Skip to content

Commit d1501ee

Browse files
committed
Remove CursorProcessorCompiler
PageFunctionCompiler is already capable of handling cursor based source through the RecordPageSource. Switching to it completely is more efficient and significantly reduces code complexity BenchmarkPageProcessor2 (columnCount) (dictionaryBlocks) (type) Mode Cnt columnOriented rowOriented Units 2 false varchar thrpt 10 24094.594 ± 298.176 23104.763 ± 1703.015 ops/s 2 false bigint thrpt 10 227586.727 ± 2931.301 98167.939 ± 1298.626 ops/s 2 true varchar thrpt 10 522379.404 ± 17311.851 22438.570 ± 293.261 ops/s 2 true bigint thrpt 10 531809.004 ± 3907.024 80766.463 ± 2149.091 ops/s 8 false varchar thrpt 10 10905.569 ± 74.438 7881.082 ± 309.976 ops/s 8 false bigint thrpt 10 77246.491 ± 715.483 35992.972 ± 868.800 ops/s 8 true varchar thrpt 10 202403.928 ± 5096.054 7619.441 ± 254.538 ops/s 8 true bigint thrpt 10 210286.557 ± 9252.181 30140.612 ± 579.285 ops/s 32 false varchar thrpt 10 3265.821 ± 39.931 2532.048 ± 95.691 ops/s 32 false bigint thrpt 10 21298.000 ± 141.566 8812.520 ± 193.922 ops/s 32 true varchar thrpt 10 62201.438 ± 883.989 2225.182 ± 37.580 ops/s 32 true bigint thrpt 10 62372.831 ± 452.341 7131.056 ± 98.403 ops/s 200 false varchar thrpt 10 546.173 ± 13.762 300.797 ± 11.186 ops/s 200 false bigint thrpt 10 3428.100 ± 48.497 1189.737 ± 6.415 ops/s 200 true varchar thrpt 10 10243.644 ± 36.385 231.297 ± 4.015 ops/s 200 true bigint thrpt 10 10014.793 ± 366.954 843.738 ± 9.865 ops/s
1 parent 4d9873f commit d1501ee

17 files changed

+10
-997
lines changed

core/trino-main/src/main/java/io/trino/operator/ScanFilterAndProjectOperator.java

Lines changed: 1 addition & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,14 @@
2626
import io.trino.metadata.TableHandle;
2727
import io.trino.operator.WorkProcessor.ProcessState;
2828
import io.trino.operator.WorkProcessor.TransformationState;
29-
import io.trino.operator.project.CursorProcessor;
30-
import io.trino.operator.project.CursorProcessorOutput;
3129
import io.trino.operator.project.PageProcessor;
3230
import io.trino.operator.project.PageProcessorMetrics;
3331
import io.trino.spi.Page;
34-
import io.trino.spi.PageBuilder;
3532
import io.trino.spi.connector.ColumnHandle;
3633
import io.trino.spi.connector.ConnectorPageSource;
3734
import io.trino.spi.connector.ConnectorSession;
3835
import io.trino.spi.connector.DynamicFilter;
3936
import io.trino.spi.connector.EmptyPageSource;
40-
import io.trino.spi.connector.RecordCursor;
41-
import io.trino.spi.connector.RecordPageSource;
4237
import io.trino.spi.connector.SourcePage;
4338
import io.trino.spi.metrics.Metrics;
4439
import io.trino.spi.type.Type;
@@ -56,7 +51,6 @@
5651
import java.util.function.Consumer;
5752
import java.util.function.Function;
5853
import java.util.function.LongConsumer;
59-
import java.util.function.Supplier;
6054

6155
import static com.google.common.base.Preconditions.checkState;
6256
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
@@ -74,8 +68,6 @@ public class ScanFilterAndProjectOperator
7468
private final WorkProcessor<Page> pages;
7569
private final PageProcessorMetrics pageProcessorMetrics = new PageProcessorMetrics();
7670

77-
@Nullable
78-
private RecordCursor cursor;
7971
@Nullable
8072
private ConnectorPageSource pageSource;
8173

@@ -93,7 +85,6 @@ private ScanFilterAndProjectOperator(
9385
DriverYieldSignal yieldSignal,
9486
WorkProcessor<Split> split,
9587
PageSourceProvider pageSourceProvider,
96-
CursorProcessor cursorProcessor,
9788
PageProcessor pageProcessor,
9889
TableHandle table,
9990
Iterable<ColumnHandle> columns,
@@ -107,7 +98,6 @@ private ScanFilterAndProjectOperator(
10798
session,
10899
yieldSignal,
109100
pageSourceProvider,
110-
cursorProcessor,
111101
pageProcessor,
112102
table,
113103
columns,
@@ -163,9 +153,6 @@ public Metrics getConnectorMetrics()
163153
@Override
164154
public Metrics getMetrics()
165155
{
166-
if (cursor != null) {
167-
return Metrics.EMPTY;
168-
}
169156
return pageProcessorMetrics.getMetrics();
170157
}
171158

@@ -187,9 +174,6 @@ public void close()
187174
throw new UncheckedIOException(e);
188175
}
189176
}
190-
else if (cursor != null) {
191-
cursor.close();
192-
}
193177
}
194178

195179
private class SplitToPages
@@ -198,7 +182,6 @@ private class SplitToPages
198182
final Session session;
199183
final DriverYieldSignal yieldSignal;
200184
final PageSourceProvider pageSourceProvider;
201-
final CursorProcessor cursorProcessor;
202185
final PageProcessor pageProcessor;
203186
final TableHandle table;
204187
final List<ColumnHandle> columns;
@@ -215,7 +198,6 @@ private class SplitToPages
215198
Session session,
216199
DriverYieldSignal yieldSignal,
217200
PageSourceProvider pageSourceProvider,
218-
CursorProcessor cursorProcessor,
219201
PageProcessor pageProcessor,
220202
TableHandle table,
221203
Iterable<ColumnHandle> columns,
@@ -228,7 +210,6 @@ private class SplitToPages
228210
this.session = requireNonNull(session, "session is null");
229211
this.yieldSignal = requireNonNull(yieldSignal, "yieldSignal is null");
230212
this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
231-
this.cursorProcessor = requireNonNull(cursorProcessor, "cursorProcessor is null");
232213
this.pageProcessor = requireNonNull(pageProcessor, "pageProcessor is null");
233214
this.table = requireNonNull(table, "table is null");
234215
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
@@ -250,7 +231,7 @@ public TransformationState<WorkProcessor<Page>> process(Split split)
250231
return finished();
251232
}
252233

253-
checkState(cursor == null && pageSource == null, "Table scan split already set");
234+
checkState(pageSource == null, "Table scan split already set");
254235

255236
if (!dynamicFilter.getCurrentPredicate().isAll()) {
256237
dynamicFilterSplitsProcessed++;
@@ -264,22 +245,10 @@ public TransformationState<WorkProcessor<Page>> process(Split split)
264245
source = pageSourceProvider.createPageSource(session, split, table, columns, dynamicFilter);
265246
}
266247

267-
if (source instanceof RecordPageSource recordPageSource) {
268-
cursor = recordPageSource.getCursor();
269-
return ofResult(processColumnSource());
270-
}
271248
pageSource = source;
272249
return ofResult(processPageSource());
273250
}
274251

275-
WorkProcessor<Page> processColumnSource()
276-
{
277-
return WorkProcessor
278-
.create(new RecordCursorToPages(session, yieldSignal, cursorProcessor, types, pageSourceMemoryContext, outputMemoryContext))
279-
.yielding(yieldSignal::isSet)
280-
.blocking(() -> memoryContext.setBytes(localAggregatedMemoryContext.getBytes()));
281-
}
282-
283252
WorkProcessor<Page> processPageSource()
284253
{
285254
ConnectorSession connectorSession = session.toConnectorSession();
@@ -301,68 +270,6 @@ WorkProcessor<Page> processPageSource()
301270
}
302271
}
303272

304-
private class RecordCursorToPages
305-
implements WorkProcessor.Process<Page>
306-
{
307-
final ConnectorSession session;
308-
final DriverYieldSignal yieldSignal;
309-
final CursorProcessor cursorProcessor;
310-
final PageBuilder pageBuilder;
311-
final LocalMemoryContext pageSourceMemoryContext;
312-
final LocalMemoryContext outputMemoryContext;
313-
314-
boolean finished;
315-
316-
RecordCursorToPages(
317-
Session session,
318-
DriverYieldSignal yieldSignal,
319-
CursorProcessor cursorProcessor,
320-
List<Type> types,
321-
LocalMemoryContext pageSourceMemoryContext,
322-
LocalMemoryContext outputMemoryContext)
323-
{
324-
this.session = session.toConnectorSession();
325-
this.yieldSignal = yieldSignal;
326-
this.cursorProcessor = cursorProcessor;
327-
this.pageBuilder = new PageBuilder(types);
328-
this.pageSourceMemoryContext = pageSourceMemoryContext;
329-
this.outputMemoryContext = outputMemoryContext;
330-
}
331-
332-
@Override
333-
public ProcessState<Page> process()
334-
{
335-
if (!finished) {
336-
CursorProcessorOutput output = cursorProcessor.process(session, yieldSignal, cursor, pageBuilder);
337-
pageSourceMemoryContext.setBytes(cursor.getMemoryUsage());
338-
339-
processedPositions += output.getProcessedRows();
340-
// TODO: derive better values for cursors
341-
processedBytes = cursor.getCompletedBytes();
342-
physicalBytes = cursor.getCompletedBytes();
343-
physicalPositions = processedPositions;
344-
readTimeNanos = cursor.getReadTimeNanos();
345-
if (output.isNoMoreRows()) {
346-
finished = true;
347-
}
348-
}
349-
350-
if (pageBuilder.isFull() || (finished && !pageBuilder.isEmpty())) {
351-
// only return a page if buffer is full or cursor has finished
352-
Page page = pageBuilder.build();
353-
pageBuilder.reset();
354-
outputMemoryContext.setBytes(pageBuilder.getRetainedSizeInBytes());
355-
return ProcessState.ofResult(page);
356-
}
357-
if (finished) {
358-
checkState(pageBuilder.isEmpty());
359-
return ProcessState.finished();
360-
}
361-
outputMemoryContext.setBytes(pageBuilder.getRetainedSizeInBytes());
362-
return ProcessState.yielded();
363-
}
364-
}
365-
366273
static class ProcessedBytesMonitor
367274
implements Consumer<ProcessState<Page>>
368275
{
@@ -445,7 +352,6 @@ public static class ScanFilterAndProjectOperatorFactory
445352
{
446353
private final int operatorId;
447354
private final PlanNodeId planNodeId;
448-
private final Supplier<CursorProcessor> cursorProcessor;
449355
private final Function<DynamicFilter, PageProcessor> pageProcessor;
450356
private final PlanNodeId sourceId;
451357
private final PageSourceProvider pageSourceProvider;
@@ -462,7 +368,6 @@ public ScanFilterAndProjectOperatorFactory(
462368
PlanNodeId planNodeId,
463369
PlanNodeId sourceId,
464370
PageSourceProviderFactory pageSourceProvider,
465-
Supplier<CursorProcessor> cursorProcessor,
466371
Function<DynamicFilter, PageProcessor> pageProcessor,
467372
TableHandle table,
468373
Iterable<ColumnHandle> columns,
@@ -473,7 +378,6 @@ public ScanFilterAndProjectOperatorFactory(
473378
{
474379
this.operatorId = operatorId;
475380
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
476-
this.cursorProcessor = requireNonNull(cursorProcessor, "cursorProcessor is null");
477381
this.pageProcessor = requireNonNull(pageProcessor, "pageProcessor is null");
478382
this.sourceId = requireNonNull(sourceId, "sourceId is null");
479383
this.table = requireNonNull(table, "table is null");
@@ -530,7 +434,6 @@ public WorkProcessorSourceOperator create(
530434
yieldSignal,
531435
split,
532436
pageSourceProvider,
533-
cursorProcessor.get(),
534437
pageProcessor.apply(dynamicFilter),
535438
table,
536439
columns,

core/trino-main/src/main/java/io/trino/operator/project/CursorProcessor.java

Lines changed: 0 additions & 24 deletions
This file was deleted.

core/trino-main/src/main/java/io/trino/operator/project/CursorProcessorOutput.java

Lines changed: 0 additions & 49 deletions
This file was deleted.

core/trino-main/src/main/java/io/trino/server/ServerMainModule.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@
127127
import io.trino.sql.SqlEnvironmentConfig;
128128
import io.trino.sql.analyzer.SessionTimeProvider;
129129
import io.trino.sql.analyzer.StatementAnalyzerFactory;
130-
import io.trino.sql.gen.CursorProcessorCompiler;
131130
import io.trino.sql.gen.ExpressionCompiler;
132131
import io.trino.sql.gen.JoinCompiler;
133132
import io.trino.sql.gen.JoinFilterFunctionCompiler;
@@ -276,8 +275,6 @@ protected void setup(Binder binder)
276275
newExporter(binder).export(PageFunctionCompiler.class).withGeneratedName();
277276
binder.bind(ColumnarFilterCompiler.class).in(Scopes.SINGLETON);
278277
newExporter(binder).export(ColumnarFilterCompiler.class).withGeneratedName();
279-
binder.bind(CursorProcessorCompiler.class).in(Scopes.SINGLETON);
280-
newExporter(binder).export(CursorProcessorCompiler.class).withGeneratedName();
281278
configBinder(binder).bindConfig(TaskManagerConfig.class);
282279

283280
// TODO: use conditional module

0 commit comments

Comments
 (0)