Commit 385d397

chore(engine/executor): reduce cost of executing DataObjScan plan node (#18679)
1 parent a5c722c · commit 385d397

File tree: 3 files changed, +71 −3 lines changed

pkg/engine/executor/executor.go

Lines changed: 10 additions & 1 deletion
```diff
@@ -68,7 +68,16 @@ func (c *Context) execute(ctx context.Context, node physical.Node) Pipeline {
 
 	switch n := node.(type) {
 	case *physical.DataObjScan:
-		return tracePipeline("physical.DataObjScan", c.executeDataObjScan(ctx, n))
+		// DataObjScan reads from object storage to determine the full pipeline to
+		// construct, making it expensive to call during planning time.
+		//
+		// TODO(rfratto): find a way to remove the logic from executeDataObjScan
+		// which wraps the pipeline with a topk/limit without reintroducing
+		// planning cost for thousands of scan nodes.
+		return newLazyPipeline(func(ctx context.Context, _ []Pipeline) Pipeline {
+			return tracePipeline("physical.DataObjScan", c.executeDataObjScan(ctx, n))
+		}, inputs)
+
 	case *physical.SortMerge:
 		return tracePipeline("physical.SortMerge", c.executeSortMerge(ctx, n, inputs))
 	case *physical.Limit:
```
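
In practice, this means translating a physical plan into pipelines no longer reads from object storage for every DataObjScan node; that work now happens on the first Read of the resulting pipeline. The snippet below is a minimal, self-contained sketch of the deferral pattern only — the names buildScan and lazyScan are invented for illustration and do not come from the Loki code:

```go
package main

import "fmt"

// buildScan stands in for executeDataObjScan: in the real code this reads from
// object storage, which is what made eager construction expensive. Hypothetical
// name, used only for this illustration.
func buildScan(id int) func() string {
	fmt.Printf("expensive build for scan %d\n", id)
	return func() string { return fmt.Sprintf("rows from scan %d", id) }
}

// lazyScan defers buildScan until the returned function is first invoked,
// mirroring how newLazyPipeline defers pipeline construction to Read time.
func lazyScan(id int) func() string {
	var built func() string
	return func() string {
		if built == nil {
			built = buildScan(id) // cost is paid here, at execution time
		}
		return built()
	}
}

func main() {
	// "Planning": wrap many scans without doing any expensive work yet.
	scans := make([]func() string, 0, 1000)
	for i := 0; i < 1000; i++ {
		scans = append(scans, lazyScan(i))
	}

	// "Execution": only a scan that is actually read pays its build cost.
	fmt.Println(scans[0]())
}
```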

pkg/engine/executor/pipeline.go

Lines changed: 54 additions & 0 deletions
```diff
@@ -283,3 +283,57 @@ func (p *tracedPipeline) Close() { p.inner.Close() }
 func (p *tracedPipeline) Inputs() []Pipeline { return p.inner.Inputs() }
 
 func (p *tracedPipeline) Transport() Transport { return Local }
+
+type lazyPipeline struct {
+	ctor func(ctx context.Context, inputs []Pipeline) Pipeline
+
+	inputs []Pipeline
+	built  Pipeline
+}
+
+// newLazyPipeline allows for defering construction of a [Pipeline] to query
+// execution time instead of planning time. This is useful for pipelines which
+// are expensive to construct, or have dependencies which are only available
+// during execution.
+//
+// The ctor function will be invoked on the first call to [Pipeline.Read].
+func newLazyPipeline(ctor func(ctx context.Context, inputs []Pipeline) Pipeline, inputs []Pipeline) *lazyPipeline {
+	return &lazyPipeline{
+		ctor:   ctor,
+		inputs: inputs,
+	}
+}
+
+var _ Pipeline = (*lazyPipeline)(nil)
+
+// Read reads the next value from the inner pipeline. If this is the first call
+// to Read, the inner pipeline will be constructed using the provided context.
+func (lp *lazyPipeline) Read(ctx context.Context) error {
+	if lp.built == nil {
+		lp.built = lp.ctor(ctx, lp.inputs)
+	}
+	return lp.built.Read(ctx)
+}
+
+// Value returns the current value from the lazily constructed pipeline. If the
+// pipeline has not been constructed yet, it returns an error.
+func (lp *lazyPipeline) Value() (arrow.Record, error) {
+	if lp.built == nil {
+		return nil, fmt.Errorf("lazyPipeline not built yet")
+	}
+	return lp.built.Value()
+}
+
+// Close closes the lazily constructed pipeline if it has been built.
+func (lp *lazyPipeline) Close() {
+	if lp.built != nil {
+		lp.built.Close()
+	}
+	lp.built = nil
+}
+
+// Inputs implements [Pipeline].
+func (lp *lazyPipeline) Inputs() []Pipeline { return lp.inputs }
+
+// Transport implements [Pipeline].
+func (lp *lazyPipeline) Transport() Transport { return Local }
```
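
The contract spelled out in the comments above (the ctor runs exactly once, on the first Read, and Value fails before that) could be pinned down with a test along these lines. This is a hypothetical sketch, not a test from the commit; fakePipeline embeds the Pipeline interface so it still compiles even if Pipeline declares methods beyond the ones visible in this diff:

```go
package executor

import (
	"context"
	"testing"
)

// fakePipeline is a hypothetical test double. Embedding Pipeline satisfies the
// interface without implementing every method; only Read, which the test
// exercises through the lazy wrapper, is overridden.
type fakePipeline struct {
	Pipeline
	reads int
}

func (f *fakePipeline) Read(ctx context.Context) error { f.reads++; return nil }

func TestLazyPipeline_ConstructsOnFirstRead(t *testing.T) {
	builds := 0
	fake := &fakePipeline{}

	lp := newLazyPipeline(func(ctx context.Context, _ []Pipeline) Pipeline {
		builds++
		return fake
	}, nil)

	// Value before the first Read must fail: nothing has been built yet.
	if _, err := lp.Value(); err == nil {
		t.Fatal("expected an error from Value before the first Read")
	}

	// The ctor should run exactly once, on the first Read only.
	if err := lp.Read(context.Background()); err != nil {
		t.Fatal(err)
	}
	if err := lp.Read(context.Background()); err != nil {
		t.Fatal(err)
	}
	if builds != 1 {
		t.Fatalf("expected ctor to run once, ran %d times", builds)
	}
	if fake.reads != 2 {
		t.Fatalf("expected 2 reads on the built pipeline, got %d", fake.reads)
	}
}
```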

pkg/engine/executor/streams_view.go

Lines changed: 7 additions & 2 deletions
```diff
@@ -26,6 +26,7 @@ type streamsView struct {
 	idColumn      *streams.Column
 	searchColumns []*streams.Column // stream ID + labels
 	batchSize     int
+	pageCacheSize int
 
 	initialized bool
 	streams     arrow.Table
@@ -45,6 +46,9 @@ type streamsViewOptions struct {
 
 	// Maximum number of stream records to read at once. Defaults to 128.
 	BatchSize int
+
+	// The size of the page cache to use for reading sections.
+	CacheSize int
 }
 
 // newStreamsView creates a new view of the given streams section. Only the
@@ -149,8 +153,9 @@ func (v *streamsView) init(ctx context.Context) (err error) {
 	}
 
 	readerOptions := streams.ReaderOptions{
-		Columns:   v.searchColumns,
-		Allocator: memory.DefaultAllocator,
+		Columns:       v.searchColumns,
+		Allocator:     memory.DefaultAllocator,
+		PageCacheSize: v.pageCacheSize,
 	}
 
 	var scalarIDs []scalar.Scalar
```
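
The new CacheSize option ends up in the view's pageCacheSize field (the copy presumably happens in newStreamsView, outside the hunks shown here) and is forwarded to streams.ReaderOptions.PageCacheSize when the reader is built. Below is a hedged usage sketch: only the BatchSize and CacheSize fields are confirmed by this diff, the helper name is invented, and the units of CacheSize are not specified in the hunks above:

```go
package executor

// exampleStreamsViewOptions is a hypothetical helper, not code from this
// commit. It only shows the new option alongside the existing one; the call
// that passes these options to newStreamsView is omitted because that
// constructor's signature is not visible in this diff.
func exampleStreamsViewOptions() streamsViewOptions {
	return streamsViewOptions{
		BatchSize: 128,      // stream records per read (the documented default)
		CacheSize: 16 << 20, // page cache for reading sections; illustrative value, units unspecified here
	}
}
```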
