@@ -110,7 +110,7 @@ private static GroupByColumnSelectorStrategy forStringArrays()
);
}

private static class UniValueDimensionIdCodec implements DimensionIdCodec<Object>
public static class UniValueDimensionIdCodec implements DimensionIdCodec<Object>
{
/**
* Dictionary for mapping the dimension value to an index. i-th position in the dictionary holds the value represented
@@ -27,8 +27,7 @@ public class MemoryFootprint<T>
private final T value;
private final int footprintIncrease;

// Reduced visibility
MemoryFootprint(T value, int footprintIncrease)
public MemoryFootprint(T value, int footprintIncrease)
{
this.value = value;
this.footprintIncrease = footprintIncrease;
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.groupby.epinephelinae.vector;

import org.apache.druid.query.groupby.epinephelinae.column.DictionaryBuildingGroupByColumnSelectorStrategy;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.vector.VectorObjectSelector;

/**
* Selector that groups complex columns using a dictionary.
*
* @see DictionaryBuildingSingleValueStringGroupByVectorColumnSelector similar selector for non-dict-encoded strings
* @see DictionaryBuildingGroupByColumnSelectorStrategy#forType(ColumnType) which creates the nonvectorized version
*/
public class DictionaryBuildingComplexGroupByVectorColumnSelector
extends DictionaryBuildingGroupByVectorColumnSelector<Object>
{
public DictionaryBuildingComplexGroupByVectorColumnSelector(
final VectorObjectSelector selector,
final ColumnType columnType
)
{
super(
selector,
new DictionaryBuildingGroupByColumnSelectorStrategy.UniValueDimensionIdCodec(columnType.getNullableStrategy())
);
}

@Override
protected Object convertValue(final Object rawValue)
{
return rawValue;
}
}
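
For context, the selector above delegates all dictionary bookkeeping to the codec it hands to its base class. Below is a minimal, hypothetical usage sketch of that codec contract, using only names that appear in this diff; it assumes UniValueDimensionIdCodec behaves like the String codec shown later in this PR (a repeat lookup of the same value returns the same id with no additional footprint). The wrapper class and printed checks are illustrative only, not part of the change.

import org.apache.druid.query.groupby.epinephelinae.column.DictionaryBuildingGroupByColumnSelectorStrategy.UniValueDimensionIdCodec;
import org.apache.druid.query.groupby.epinephelinae.column.MemoryFootprint;
import org.apache.druid.segment.column.ColumnType;

public class DimensionIdCodecContractSketch
{
  public static void main(String[] args)
  {
    // Built the same way the complex selector does: from the column type's nullable strategy.
    final UniValueDimensionIdCodec codec =
        new UniValueDimensionIdCodec(ColumnType.STRING.getNullableStrategy());

    // First lookup assigns a new dictionary id and reports a positive footprint increase.
    final MemoryFootprint<Integer> first = codec.lookupId("foo");
    // A repeat lookup of the same value is expected to return the same id with no further increase.
    final MemoryFootprint<Integer> again = codec.lookupId("foo");

    System.out.println(first.value().equals(again.value()));  // true: stable id per value
    System.out.println(again.memoryIncrease());               // expected 0 for an existing entry
    System.out.println(codec.idToKey(first.value()));         // round-trips back to "foo"
  }
}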
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.groupby.epinephelinae.vector;

import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
import org.apache.druid.query.groupby.epinephelinae.column.DictionaryBuildingGroupByColumnSelectorStrategy;
import org.apache.druid.query.groupby.epinephelinae.column.DimensionIdCodec;
import org.apache.druid.query.groupby.epinephelinae.column.MemoryFootprint;
import org.apache.druid.segment.vector.VectorObjectSelector;

/**
* Base class for {@link GroupByVectorColumnSelector} implementations that build dictionaries for values that are not
* natively dictionary-encoded.
*
* @see DictionaryBuildingGroupByColumnSelectorStrategy the nonvectorized version
*/
public abstract class DictionaryBuildingGroupByVectorColumnSelector<T> implements GroupByVectorColumnSelector
{
protected final VectorObjectSelector selector;
protected final DimensionIdCodec<T> dimensionIdCodec;

protected DictionaryBuildingGroupByVectorColumnSelector(
final VectorObjectSelector selector,
final DimensionIdCodec<T> dimensionIdCodec
)
{
this.selector = selector;
this.dimensionIdCodec = dimensionIdCodec;
}

@Override
public final int getGroupingKeySize()
{
return Integer.BYTES;
}

@Override
public final int writeKeys(
final WritableMemory keySpace,
final int keySize,
final int keyOffset,
final int startRow,
final int endRow
)
{
final Object[] vector = selector.getObjectVector();
int stateFootprintIncrease = 0;

for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) {
final T value = convertValue(vector[i]);
final MemoryFootprint<Integer> idAndMemoryIncrease = dimensionIdCodec.lookupId(value);
keySpace.putInt(j, idAndMemoryIncrease.value());
stateFootprintIncrease += idAndMemoryIncrease.memoryIncrease();
}

return stateFootprintIncrease;
}

@Override
public final void writeKeyToResultRow(
final MemoryPointer keyMemory,
final int keyOffset,
final ResultRow resultRow,
final int resultRowPosition
)
{
final int id = keyMemory.memory().getInt(keyMemory.position() + keyOffset);
final T value = dimensionIdCodec.idToKey(id);
resultRow.set(resultRowPosition, value);
}

@Override
public final void reset()
{
dimensionIdCodec.reset();
}

/**
* Converts a raw value from the vector to the appropriate type for this selector.
*/
protected abstract T convertValue(Object rawValue);
}
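
To make the writeKeys() arithmetic above concrete: each row's grouping key is keySize bytes wide, and this selector contributes one int (the dictionary id) at keyOffset within that row's key. The standalone sketch below mirrors the loop's offset math with made-up numbers; keySize and keyOffset values are illustrative, not taken from the PR.

public class KeyLayoutSketch
{
  public static void main(String[] args)
  {
    final int keySize = 12;   // hypothetical total key width, e.g. three int-sized selectors
    final int keyOffset = 4;  // hypothetical byte position of this selector within each key

    // Mirrors the loop in writeKeys(): one dictionary id per row, rows spaced keySize bytes apart.
    for (int row = 0, pos = keyOffset; row < 3; row++, pos += keySize) {
      System.out.println("row " + row + " -> dictionary id written at byte offset " + pos);
    }
    // Prints offsets 4, 16, 28.
  }
}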
@@ -19,12 +19,14 @@

package org.apache.druid.query.groupby.epinephelinae.vector;

import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.DictionaryBuildingUtils;
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
import org.apache.druid.query.groupby.epinephelinae.column.DictionaryBuildingGroupByColumnSelectorStrategy;
import org.apache.druid.query.groupby.epinephelinae.column.DimensionIdCodec;
import org.apache.druid.query.groupby.epinephelinae.column.MemoryFootprint;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.vector.VectorObjectSelector;

import java.util.ArrayList;
@@ -34,85 +36,65 @@
* A {@link GroupByVectorColumnSelector} that builds an internal String<->Integer dictionary, used for grouping
* single-valued STRING columns which are not natively dictionary encoded, e.g. expression virtual columns.
*
* This is effectively the {@link VectorGroupByEngine} analog of
* {@link org.apache.druid.query.groupby.epinephelinae.column.DictionaryBuildingGroupByColumnSelectorStrategy} for
* String columns
* @see DictionaryBuildingComplexGroupByVectorColumnSelector similar selector for complex columns
* @see DictionaryBuildingGroupByColumnSelectorStrategy#forType(ColumnType) which creates the nonvectorized version
*/
public class DictionaryBuildingSingleValueStringGroupByVectorColumnSelector implements GroupByVectorColumnSelector
public class DictionaryBuildingSingleValueStringGroupByVectorColumnSelector
extends DictionaryBuildingGroupByVectorColumnSelector<String>
{
private static final int GROUP_BY_MISSING_VALUE = -1;

private final VectorObjectSelector selector;

private final List<String> dictionary = new ArrayList<>();
private final Object2IntOpenHashMap<String> reverseDictionary = new Object2IntOpenHashMap<>();

public DictionaryBuildingSingleValueStringGroupByVectorColumnSelector(VectorObjectSelector selector)
public DictionaryBuildingSingleValueStringGroupByVectorColumnSelector(final VectorObjectSelector selector)
{
this.selector = selector;
this.reverseDictionary.defaultReturnValue(-1);
super(selector, new StringDimensionIdCodec());
}

@Override
public int getGroupingKeySize()
protected String convertValue(final Object rawValue)
{
return Integer.BYTES;
return DimensionHandlerUtils.convertObjectToString(rawValue);
}

@Override
public int writeKeys(
final WritableMemory keySpace,
final int keySize,
final int keyOffset,
final int startRow,
final int endRow
)
private static class StringDimensionIdCodec implements DimensionIdCodec<String>
{
final Object[] vector = selector.getObjectVector();
int stateFootprintIncrease = 0;
private final List<String> dictionary = new ArrayList<>();
private final Object2IntMap<String> reverseDictionary = new Object2IntOpenHashMap<>();

for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) {
final String value = DimensionHandlerUtils.convertObjectToString(vector[i]);
final int dictId = reverseDictionary.getInt(value);
StringDimensionIdCodec()
{
reverseDictionary.defaultReturnValue(-1);
}

@Override
public MemoryFootprint<Integer> lookupId(final String value)
{
int dictId = reverseDictionary.getInt(value);
int footprintIncrease = 0;
if (dictId < 0) {
final int nextId = dictionary.size();
dictId = dictionary.size();
dictionary.add(value);
reverseDictionary.put(value, nextId);
keySpace.putInt(j, nextId);

// Use same ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY as the nonvectorized version; dictionary structure is the same.
stateFootprintIncrease +=
DictionaryBuildingUtils.estimateEntryFootprint((value == null ? 0 : value.length()) * Character.BYTES);
} else {
keySpace.putInt(j, dictId);
reverseDictionary.put(value, dictId);
footprintIncrease =
DictionaryBuildingUtils.estimateEntryFootprint(value == null ? 0 : value.length() * Character.BYTES);
}
return new MemoryFootprint<>(dictId, footprintIncrease);
}

return stateFootprintIncrease;
}
@Override
public String idToKey(final int id)
{
return dictionary.get(id);
}

@Override
public void writeKeyToResultRow(
final MemoryPointer keyMemory,
final int keyOffset,
final ResultRow resultRow,
final int resultRowPosition
)
{
final int id = keyMemory.memory().getInt(keyMemory.position() + keyOffset);
// GROUP_BY_MISSING_VALUE is used to indicate empty rows, which are omitted from the result map.
if (id != GROUP_BY_MISSING_VALUE) {
final String value = dictionary.get(id);
resultRow.set(resultRowPosition, value);
} else {
resultRow.set(resultRowPosition, null);
@Override
public boolean canCompareIds()
{
return false;
}
}

@Override
public void reset()
{
dictionary.clear();
reverseDictionary.clear();
@Override
public void reset()
{
dictionary.clear();
reverseDictionary.clear();
}
}
}
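
For the footprint bookkeeping in lookupId() above: the estimate is the string's character payload (two bytes per char, zero for null) plus a rough per-entry overhead covering the dictionary and reverse-dictionary structures. The arithmetic sketch below is illustrative; the overhead constant is invented for the example, and the real value is encapsulated in DictionaryBuildingUtils.estimateEntryFootprint().

public class FootprintArithmeticSketch
{
  public static void main(String[] args)
  {
    final String value = "abcdef";
    final int roughOverheadPerEntry = 100;  // hypothetical constant, for illustration only

    // Character payload mirrors the codec above: length * Character.BYTES, or 0 for null.
    final int payloadBytes = (value == null ? 0 : value.length()) * Character.BYTES;
    System.out.println("estimated entry footprint ~= " + (payloadBytes + roughOverheadPerEntry) + " bytes");
  }
}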
@@ -127,8 +127,11 @@ public GroupByVectorColumnSelector makeObjectProcessor(
);
}
return new DictionaryBuildingSingleValueStringGroupByVectorColumnSelector(selector);
} else if (capabilities.is(ValueType.COMPLEX)) {
return new DictionaryBuildingComplexGroupByVectorColumnSelector(selector, capabilities.toColumnType());
} else {
return NilGroupByVectorColumnSelector.INSTANCE;
}
return NilGroupByVectorColumnSelector.INSTANCE;
}

/**
@@ -185,8 +185,8 @@ public static boolean canVectorizeDimensions(
return false;
}

if (!dimension.getOutputType().isPrimitive()) {
// group by on arrays and complex types is not currently supported in the vector processing engine
if (dimension.getOutputType().isArray()) {
// group by on arrays is not currently supported in the vector processing engine
return false;
}
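
The guard change above is what opens the vectorized path for complex types: previously any non-primitive dimension output type fell back to the non-vectorized engine, while now only ARRAY output types do. A hedged illustration follows; the helper method is not part of this PR, and the ColumnType constants are standard Druid types used here only as examples.

import org.apache.druid.segment.column.ColumnType;

public class VectorizationGuardSketch
{
  // Mirrors the new check: only array output types block vectorized grouping.
  static boolean blockedFromVectorization(ColumnType outputType)
  {
    return outputType.isArray();
  }

  public static void main(String[] args)
  {
    System.out.println(blockedFromVectorization(ColumnType.STRING));        // false
    System.out.println(blockedFromVectorization(ColumnType.STRING_ARRAY));  // true
    System.out.println(blockedFromVectorization(ColumnType.NESTED_DATA));   // false: complex types may now vectorize
  }
}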
