Skip to content

Commit 629dd67

Browse files
committed
feat: Implement Scan/Projection/Filter and HashJoin operators.
1 parent 32939eb commit 629dd67

File tree

11 files changed

+520
-13
lines changed

11 files changed

+520
-13
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package co.clflushopt.glint.query.physical.expr;
2+
3+
import co.clflushopt.glint.types.ArrowTypes;
4+
import co.clflushopt.glint.types.ColumnVector;
5+
import co.clflushopt.glint.types.LiteralValueVector;
6+
import co.clflushopt.glint.types.RecordBatch;
7+
8+
public class LiteralIntExpr implements Expr {
9+
private int value;
10+
11+
public LiteralIntExpr(int value) {
12+
this.value = value;
13+
}
14+
15+
@Override
16+
public String toString() {
17+
return Long.toString(value);
18+
}
19+
20+
@Override
21+
public ColumnVector eval(RecordBatch input) {
22+
return new LiteralValueVector(ArrowTypes.Int32Type, value, input.getRowSize());
23+
}
24+
25+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package co.clflushopt.glint.query.physical.plan;
2+
3+
import java.util.Iterator;
4+
import java.util.List;
5+
import java.util.stream.Collectors;
6+
import java.util.stream.IntStream;
7+
8+
import org.apache.arrow.memory.RootAllocator;
9+
import org.apache.arrow.vector.BitVector;
10+
import org.apache.arrow.vector.FieldVector;
11+
import org.apache.arrow.vector.VarCharVector;
12+
13+
import co.clflushopt.glint.query.physical.expr.Expr;
14+
import co.clflushopt.glint.types.ArrowFieldVector;
15+
import co.clflushopt.glint.types.ArrowVectorBuilder;
16+
import co.clflushopt.glint.types.ColumnVector;
17+
import co.clflushopt.glint.types.RecordBatch;
18+
import co.clflushopt.glint.types.Schema;
19+
20+
/**
 * Physical operator that applies a boolean filter expression to each input
 * batch and emits only the rows for which the expression evaluates to true.
 */
public class FilterOperator implements PhysicalPlan {
    private final PhysicalPlan input;
    private final Expr expr; // boolean-valued predicate; must evaluate to an Arrow BitVector

    /**
     * Creates a filter over the given child operator.
     *
     * @param input the child operator supplying record batches.
     * @param expr  the predicate evaluated against every batch.
     */
    public FilterOperator(PhysicalPlan input, Expr expr) {
        this.input = input;
        this.expr = expr;
    }

    /**
     * Streams filtered batches lazily: each input batch is filtered only when
     * the caller advances the returned iterator.
     */
    @Override
    public Iterator<RecordBatch> execute() {
        Iterator<RecordBatch> inputIterator = input.execute();

        // Wrap the child's iterator so filtering happens one batch at a time.
        return new Iterator<RecordBatch>() {
            @Override
            public boolean hasNext() {
                return inputIterator.hasNext();
            }

            @Override
            public RecordBatch next() {
                RecordBatch batch = inputIterator.next();
                // The predicate yields one bit per row; a set bit keeps the row.
                BitVector result = (BitVector) ((ArrowFieldVector) expr.eval(batch)).getField();
                Schema schema = batch.getSchema();
                int columnCount = schema.getFields().size();

                // Apply the selection bitmap to every column independently.
                List<FieldVector> filteredFields = IntStream.range(0, columnCount)
                        .mapToObj(i -> filter(batch.getField(i), result))
                        .collect(Collectors.toList());

                // Re-wrap the raw Arrow vectors as ColumnVectors for the batch.
                List<ColumnVector> fields = filteredFields.stream().map(ArrowFieldVector::new)
                        .collect(Collectors.toList());

                return new RecordBatch(schema, fields);
            }
        };
    }

    // Copies the rows of `v` selected by `selection` into a new vector,
    // compacting them to the front (output index `count`).
    // NOTE(review): the output is always a VarCharVector regardless of the
    // source column's type, so non-string columns are materialized as strings
    // — confirm this is intentional. Also, the RootAllocator created per call
    // is never closed; consider reusing a shared allocator.
    private FieldVector filter(ColumnVector v, BitVector selection) {
        VarCharVector filteredVector = new VarCharVector("v", new RootAllocator(Long.MAX_VALUE));
        filteredVector.allocateNew();

        ArrowVectorBuilder builder = new ArrowVectorBuilder(filteredVector);

        int count = 0;
        for (int i = 0; i < selection.getValueCount(); i++) {
            if (selection.get(i) == 1) {
                builder.setValue(count, v.getValue(i));
                count++;
            }
        }
        filteredVector.setValueCount(count);
        return filteredVector;
    }

    /** Filtering removes rows, never columns, so the schema is unchanged. */
    @Override
    public Schema getSchema() {
        return input.getSchema();
    }

    /** Single child: the operator whose output is being filtered. */
    @Override
    public List<PhysicalPlan> getChildren() {
        return List.of(input);
    }
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package co.clflushopt.glint.query.physical.plan;
2+
3+
import java.util.Collections;
4+
import java.util.HashMap;
5+
import java.util.Iterator;
6+
import java.util.List;
7+
import java.util.Map;
8+
import java.util.stream.Collectors;
9+
10+
import org.apache.arrow.memory.RootAllocator;
11+
import org.apache.arrow.vector.VectorSchemaRoot;
12+
13+
import co.clflushopt.glint.query.functional.Accumulator;
14+
import co.clflushopt.glint.query.physical.expr.AggregateExpr;
15+
import co.clflushopt.glint.query.physical.expr.Expr;
16+
import co.clflushopt.glint.types.ArrowFieldVector;
17+
import co.clflushopt.glint.types.ArrowVectorBuilder;
18+
import co.clflushopt.glint.types.ColumnVector;
19+
import co.clflushopt.glint.types.RecordBatch;
20+
import co.clflushopt.glint.types.Schema;
21+
22+
/**
23+
* HashJoinOperator implements the Hash Aggregate Join algorithm where the input
24+
* is consumed from two sources and the join is performed in two phases: build
25+
* and probe.
26+
*
27+
* The build phase consumes the left input and builds a hash table using the
28+
* join key. The probe phase consumes the right input and probes the hash table
29+
* to find matching rows.
30+
*
31+
*/
32+
/**
 * NOTE(review): despite its name, this operator implements hash AGGREGATION
 * (group-by keys feeding per-group accumulators), not a hash join — there is
 * no build/probe phase and no left/right input. The previous Javadoc described
 * a join that does not match the code; consider renaming to
 * HashAggregateOperator.
 *
 * The operator fully consumes its input, groups rows by the group-by
 * expressions, feeds each aggregate's input expression into that group's
 * accumulator, and emits a single output batch with one row per group.
 */
public class HashJoinOperator implements PhysicalPlan {
    private PhysicalPlan input;
    private List<Expr> groupByExpr;
    private List<AggregateExpr> aggregateExpr;
    private Schema schema;

    /**
     * Creates the aggregation operator.
     *
     * @param input         the child operator supplying record batches.
     * @param groupByExpr   expressions producing the grouping key columns.
     * @param aggregateExpr aggregate expressions; each supplies an input
     *                      expression and an accumulator factory.
     * @param schema        output schema: grouping columns first, then one
     *                      column per aggregate, in declaration order.
     */
    public HashJoinOperator(PhysicalPlan input, List<Expr> groupByExpr,
            List<AggregateExpr> aggregateExpr, Schema schema) {
        this.input = input;
        this.groupByExpr = groupByExpr;
        this.aggregateExpr = aggregateExpr;
        this.schema = schema;
    }

    @Override
    public Iterator<RecordBatch> execute() {
        // Grouping key (one Object per group-by expression) -> accumulators
        // for that group, in aggregateExpr order.
        Map<List<Object>, List<Accumulator>> map = new HashMap<>();

        // Accumulation phase: drain the child and fold every row into its
        // group's accumulators.
        Iterator<RecordBatch> inputIter = input.execute();
        while (inputIter.hasNext()) {
            RecordBatch batch = inputIter.next();

            // Evaluate grouping expressions once per batch (columnar).
            List<ColumnVector> groupKeys = groupByExpr.stream().map(expr -> expr.eval(batch))
                    .collect(Collectors.toList());

            // Evaluate aggregate input expressions once per batch.
            List<ColumnVector> aggrInputValues = aggregateExpr.stream()
                    .map(expr -> expr.getInputExpr().eval(batch)).collect(Collectors.toList());

            for (int rowIndex = 0; rowIndex < batch.getRowSize(); rowIndex++) {
                // Effectively-final copy for use inside the lambda below.
                final int currentRow = rowIndex;
                // Materialize this row's grouping key. byte[] values are
                // converted to String so equals/hashCode behave as map keys
                // (raw arrays compare by identity).
                List<Object> rowKey = groupKeys.stream().map(key -> {
                    Object value = key.getValue(currentRow);
                    if (value instanceof byte[]) {
                        return new String((byte[]) value);
                    }
                    return value;
                }).collect(Collectors.toList());

                // Lazily create one accumulator per aggregate for a new group.
                List<Accumulator> accumulators = map.computeIfAbsent(rowKey, k -> aggregateExpr
                        .stream().map(acc -> acc.getAccumulator()).collect(Collectors.toList()));

                // Fold this row's aggregate inputs into the group.
                for (int i = 0; i < accumulators.size(); i++) {
                    Object value = aggrInputValues.get(i).getValue(rowIndex);
                    accumulators.get(i).accumulate(value);
                }
            }
        }

        // Emit phase: build one output row per group.
        // NOTE(review): the RootAllocator created here is never closed — TODO
        // confirm the project's allocator lifetime policy.
        VectorSchemaRoot root = VectorSchemaRoot.create(schema.toArrow(),
                new RootAllocator(Long.MAX_VALUE));
        root.allocateNew();
        root.setRowCount(map.size());

        List<ArrowVectorBuilder> builders = root.getFieldVectors().stream()
                .map(ArrowVectorBuilder::new).collect(Collectors.toList());

        int rowIndex = 0;
        for (Map.Entry<List<Object>, List<Accumulator>> entry : map.entrySet()) {
            List<Object> groupingKey = entry.getKey();
            List<Accumulator> accumulators = entry.getValue();

            // Grouping key columns come first in the output schema.
            for (int i = 0; i < groupByExpr.size(); i++) {
                builders.get(i).setValue(rowIndex, groupingKey.get(i));
            }

            // Aggregate result columns follow the grouping columns.
            for (int i = 0; i < aggregateExpr.size(); i++) {
                builders.get(groupByExpr.size() + i).setValue(rowIndex,
                        accumulators.get(i).getResult());
            }
            rowIndex++;
        }

        RecordBatch outputBatch = new RecordBatch(schema, root.getFieldVectors().stream()
                .map(ArrowFieldVector::new).collect(Collectors.toList()));

        // Blocking operator: the entire result is returned as a single batch.
        return Collections.singletonList(outputBatch).iterator();
    }

    /** Returns the output schema (group-by columns, then aggregates). */
    @Override
    public Schema getSchema() {
        return schema;
    }

    /** Single child: the operator whose output is being aggregated. */
    @Override
    public List<PhysicalPlan> getChildren() {
        return List.of(input);
    }
}

glint/src/main/java/co/clflushopt/glint/query/physical/plan/PhysicalPlan.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package co.clflushopt.glint.query.physical.plan;
22

3+
import java.util.Iterator;
34
import java.util.List;
45
import java.util.stream.IntStream;
56

@@ -25,7 +26,7 @@ public interface PhysicalPlan {
2526
* is equivalent to `next()` in the Volcano paper.
2627
*
2728
*/
28-
public Iterable<RecordBatch> execute();
29+
public Iterator<RecordBatch> execute();
2930

3031
/**
3132
* Returns the pipeline structure of the plan.

glint/src/main/java/co/clflushopt/glint/query/physical/plan/ProjectionOperator.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
package co.clflushopt.glint.query.physical.plan;
22

3-
import java.util.ArrayList;
3+
import java.util.Iterator;
44
import java.util.List;
55
import java.util.stream.Collectors;
66

77
import co.clflushopt.glint.query.physical.expr.Expr;
8+
import co.clflushopt.glint.types.ColumnVector;
89
import co.clflushopt.glint.types.RecordBatch;
910
import co.clflushopt.glint.types.Schema;
1011

@@ -31,17 +32,23 @@ public ProjectionOperator(PhysicalPlan input, Schema schema, List<Expr> projecti
3132
}
3233

3334
@Override
34-
public Iterable<RecordBatch> execute() {
35-
var iter = input.execute().iterator();
36-
List<RecordBatch> result = new ArrayList<>();
35+
public Iterator<RecordBatch> execute() {
36+
Iterator<RecordBatch> inputIterator = input.execute();
3737

38-
while (iter.hasNext()) {
39-
var columns = this.projections.stream().map(expr -> expr.eval(iter.next()))
40-
.collect(Collectors.toList());
41-
result.add(new RecordBatch(schema, columns));
42-
}
38+
return new Iterator<RecordBatch>() {
39+
@Override
40+
public boolean hasNext() {
41+
return inputIterator.hasNext();
42+
}
4343

44-
return result;
44+
@Override
45+
public RecordBatch next() {
46+
RecordBatch batch = inputIterator.next();
47+
List<ColumnVector> columns = projections.stream()
48+
.map(expression -> expression.eval(batch)).collect(Collectors.toList());
49+
return new RecordBatch(schema, columns);
50+
}
51+
};
4552
}
4653

4754
@Override

glint/src/main/java/co/clflushopt/glint/query/physical/plan/ScanOperator.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package co.clflushopt.glint.query.physical.plan;
22

3+
import java.util.Iterator;
34
import java.util.List;
45

56
import co.clflushopt.glint.datasource.DataSource;
@@ -30,8 +31,8 @@ public Schema getSchema() {
3031
}
3132

3233
@Override
33-
public Iterable<RecordBatch> execute() {
34-
return dataSource.scan(projection);
34+
public Iterator<RecordBatch> execute() {
35+
return dataSource.scan(projection).iterator();
3536
}
3637

3738
@Override
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package co.clflushopt.glint.query.physical;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertTrue;
5+
6+
import java.util.Collections;
7+
import java.util.Iterator;
8+
9+
import org.junit.Test;
10+
11+
import co.clflushopt.glint.query.physical.expr.BooleanExpr;
12+
import co.clflushopt.glint.query.physical.expr.ColumnExpr;
13+
import co.clflushopt.glint.query.physical.expr.Expr;
14+
import co.clflushopt.glint.query.physical.expr.LiteralIntExpr;
15+
import co.clflushopt.glint.query.physical.plan.FilterOperator;
16+
import co.clflushopt.glint.query.physical.plan.ScanOperator;
17+
import co.clflushopt.glint.types.RecordBatch;
18+
import co.clflushopt.glint.types.Schema;
19+
20+
public class FilterOperatorTest {
21+
@Test
22+
public void testFilter() {
23+
// Create test data
24+
Schema schema = new Schema(TestUtils.createTestSchema());
25+
RecordBatch testBatch = TestUtils.createTestBatch();
26+
SyntheticDataSource dataSource = new SyntheticDataSource(schema,
27+
Collections.singletonList(testBatch));
28+
ScanOperator scan = new ScanOperator(dataSource, Collections.emptyList());
29+
30+
// Filter age > 30
31+
Expr filterExpr = new BooleanExpr.GtExpression(new ColumnExpr(2), new LiteralIntExpr(30));
32+
33+
FilterOperator filter = new FilterOperator(scan, filterExpr);
34+
Iterator<RecordBatch> result = filter.execute();
35+
36+
// Verify results
37+
assertTrue(result.hasNext());
38+
RecordBatch batch = result.next();
39+
assertEquals(1, batch.getRowSize());
40+
assertEquals("Charlie", batch.getField(1).getValue(0));
41+
assertEquals(35, Integer.parseInt((String) batch.getField(2).getValue(0)));
42+
}
43+
}

0 commit comments

Comments
 (0)