
Commit f50c87f

feat(engine): Prefetch inputs of Merge pipeline concurrently (#19039)
This PR adds prefetching to the inputs of the Merge node. The Merge is a pipeline that takes N inputs and sequentially consumes each one of them, completely exhausting an input before moving to the next one. However, the inputs do not prefetch any data, so calling Read() may wait on I/O (fetching data from object storage, decoding, etc.). Prefetching allows the next batch to be fetched while the caller node is still processing the current batch.

This PR changes the behaviour of the Merge so that its inputs are prefetched. Since the inputs of the Merge pipeline are read sequentially, the current input is always prefetched. The setting `-querier.engine.merge-prefetch-count` controls how many of the next inputs are prefetched concurrently.

Signed-off-by: Christian Haudum <[email protected]>
Co-authored-by: Ashwanth Goli <[email protected]>
1 parent 04f8929 commit f50c87f
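For reference, a minimal sketch of how the new setting might be written in a Loki configuration file. The exact nesting of the `engine` block is an assumption inferred from the `-querier.engine.` flag prefix; only the key name and default come from this PR:

```yaml
querier:
  engine:
    # Assumed placement, mirroring the -querier.engine.* flag prefix.
    batch_size: 100
    # Prefetch the current input and the next two inputs of each Merge node.
    merge_prefetch_count: 2
```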

File tree

6 files changed, +210 -111 lines changed

docs/sources/shared/configuration.md

Lines changed: 7 additions & 0 deletions
```diff
@@ -4913,6 +4913,13 @@ engine:
   # CLI flag: -querier.engine.batch-size
   [batch_size: <int> | default = 100]
 
+  # Experimental: The number of inputs that are prefetched simultaneously by any
+  # Merge node. A value of 0 means that only the currently processed input is
+  # prefetched, 1 means that only the next input is prefetched, and so on. A
+  # negative value means that all inputs are prefetched in parallel.
+  # CLI flag: -querier.engine.merge-prefetch-count
+  [merge_prefetch_count: <int> | default = 0]
+
   # Experimental: Maximum total size of future pages for DataObjScan to download
   # before they are needed, for roundtrip reduction to object storage. Setting
   # to zero disables downloading future pages. Only used in the next generation
```

pkg/engine/engine.go

Lines changed: 1 addition & 0 deletions
```diff
@@ -194,6 +194,7 @@ func (e *QueryEngine) Execute(ctx context.Context, params logql.Params) (logqlmo
 
 	cfg := executor.Config{
 		BatchSize:                int64(e.opts.BatchSize),
+		MergePrefetchCount:       e.opts.MergePrefetchCount,
 		Bucket:                   e.bucket,
 		DataobjScanPageCacheSize: int64(e.opts.DataobjScanPageCacheSize),
 	}
```
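The `MergePrefetchCount` option presumably reaches `e.opts` via the engine's flag registration, which is not part of this excerpt. A hedged sketch of what that registration could look like; the `Opts` struct shape and `RegisterFlags` signature are assumptions, while the flag name, default, and help text come from the docs diff above:

```go
package engine

import "flag"

// Opts is a sketch only: the struct name and RegisterFlags signature are
// assumptions; the flag name, default, and help text come from this PR.
type Opts struct {
	BatchSize                int
	MergePrefetchCount       int
	DataobjScanPageCacheSize int
}

func (o *Opts) RegisterFlags(f *flag.FlagSet) {
	f.IntVar(&o.MergePrefetchCount, "querier.engine.merge-prefetch-count", 0,
		"Experimental: The number of inputs that are prefetched simultaneously by any Merge node. "+
			"A value of 0 means that only the currently processed input is prefetched, 1 means that "+
			"only the next input is prefetched, and so on. A negative value means that all inputs "+
			"are prefetched in parallel.")
}
```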

pkg/engine/executor/executor.go

Lines changed: 9 additions & 5 deletions
```diff
@@ -27,14 +27,16 @@ type Config struct {
 	Bucket objstore.Bucket
 
 	DataobjScanPageCacheSize int64
+	MergePrefetchCount       int
 }
 
 func Run(ctx context.Context, cfg Config, plan *physical.Plan, logger log.Logger) Pipeline {
 	c := &Context{
-		plan:      plan,
-		batchSize: cfg.BatchSize,
-		bucket:    cfg.Bucket,
-		logger:    logger,
+		plan:               plan,
+		batchSize:          cfg.BatchSize,
+		mergePrefetchCount: cfg.MergePrefetchCount,
+		bucket:             cfg.Bucket,
+		logger:             logger,
 
 		dataobjScanPageCacheSize: cfg.DataobjScanPageCacheSize,
 	}
@@ -51,12 +53,14 @@ func Run(ctx context.Context, cfg Config, plan *physical.Plan, logger log.Logger
 // Context is the execution context
 type Context struct {
 	batchSize int64
+
 	logger    log.Logger
 	plan      *physical.Plan
 	evaluator expressionEvaluator
 	bucket    objstore.Bucket
 
 	dataobjScanPageCacheSize int64
+	mergePrefetchCount       int
 }
 
 func (c *Context) execute(ctx context.Context, node physical.Node) Pipeline {
@@ -314,7 +318,7 @@ func (c *Context) executeMerge(ctx context.Context, _ *physical.Merge, inputs []
 		return emptyPipeline()
 	}
 
-	pipeline, err := NewMergePipeline(inputs)
+	pipeline, err := newMergePipeline(inputs, c.mergePrefetchCount)
 	if err != nil {
 		return errorPipeline(ctx, err)
 	}
```
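To make the prefetch window concrete: with `maxPrefetch = K`, inputs `0..K` start prefetching when the Merge is first read, and whenever input `i` is exhausted, input `i+1+K` starts. A small, self-contained sketch of that indexing (the names here are illustrative, not part of the executor):

```go
package main

import "fmt"

// prefetchSchedule prints which input starts prefetching at which point,
// mirroring the indexing in Merge.init and Merge.read.
func prefetchSchedule(numInputs, maxPrefetch int) {
	// Same clamping as newMergePipeline: negative or too-large values
	// mean "prefetch everything".
	if maxPrefetch < 0 || maxPrefetch >= numInputs {
		maxPrefetch = numInputs - 1
	}
	for i := 0; i <= maxPrefetch; i++ {
		fmt.Printf("init: input %d starts prefetching\n", i)
	}
	for curr := 0; curr < numInputs; curr++ {
		if next := curr + 1 + maxPrefetch; next < numInputs {
			fmt.Printf("input %d exhausted: input %d starts prefetching\n", curr, next)
		}
	}
}

func main() {
	prefetchSchedule(5, 1) // five inputs, prefetch current plus the next one
}
```

For five inputs and `maxPrefetch = 1`, inputs 0 and 1 prefetch at init, and inputs 2, 3, and 4 start as inputs 0, 1, and 2 are exhausted, so at most two inputs ever fetch concurrently.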

pkg/engine/executor/merge.go

Lines changed: 73 additions & 30 deletions
```diff
@@ -4,103 +4,146 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"slices"
 
 	"github.com/apache/arrow-go/v18/arrow"
 )
 
 // Merge is a pipeline that takes N inputs and sequentially consumes each one of them.
 // It completely exhausts an input before moving to the next one.
 type Merge struct {
-	inputs    []Pipeline
-	exhausted []bool
-	state     state
+	inputs      []Pipeline
+	maxPrefetch int
+	initialized bool
+	currInput   int // index of the currently processed input
+	state       state
 }
 
 var _ Pipeline = (*Merge)(nil)
 
-func NewMergePipeline(inputs []Pipeline) (*Merge, error) {
+// newMergePipeline creates a new merge pipeline that merges N inputs into a single output.
+//
+// The argument maxPrefetch controls how many inputs are prefetched simultaneously while the current one is consumed.
+// Set maxPrefetch to 0 to disable prefetching of the next input.
+// Set maxPrefetch to 1 to prefetch only the next input, and so on.
+// Set maxPrefetch to -1 to prefetch all inputs at once.
+func newMergePipeline(inputs []Pipeline, maxPrefetch int) (*Merge, error) {
 	if len(inputs) == 0 {
-		return nil, fmt.Errorf("no inputs provided for merge pipeline")
+		return nil, fmt.Errorf("merge pipeline: no inputs provided")
 	}
 
+	// Default to the number of inputs if maxPrefetch is negative or exceeds the number of inputs.
+	if maxPrefetch < 0 || maxPrefetch >= len(inputs) {
+		maxPrefetch = len(inputs) - 1
+	}
+
+	// Wrap inputs into prefetching pipelines.
 	for i := range inputs {
+		// Only wrap the input, but do not call init() on it, as that would start prefetching.
+		// Prefetching is started in the [Merge.init] function.
 		inputs[i] = newPrefetchingPipeline(inputs[i])
 	}
 
 	return &Merge{
-		inputs:    inputs,
-		exhausted: make([]bool, len(inputs)),
+		inputs:      inputs,
+		maxPrefetch: maxPrefetch,
 	}, nil
 }
 
+func (m *Merge) init(ctx context.Context) {
+	if m.initialized {
+		return
+	}
+
+	// Initialize prefetching of the inputs defined by maxPrefetch.
+	// The first/current input is always initialized.
+	for i := range m.inputs {
+		if i <= m.maxPrefetch {
+			m.startPrefetchingInputAtIndex(ctx, i)
+		}
+	}
+
+	m.initialized = true
+}
+
+// startPrefetchingInputAtIndex initializes the input at the given index i,
+// if the index is not out of bounds and the input is of type [prefetchWrapper].
+// Initializing the input starts its prefetching.
+func (m *Merge) startPrefetchingInputAtIndex(ctx context.Context, i int) {
+	if i >= len(m.inputs) {
+		return
+	}
+	inp, ok := m.inputs[i].(*prefetchWrapper)
+	if ok {
+		inp.init(ctx)
+	}
+}
+
 // Read reads the next value into its state.
 // It returns an error if reading fails or when the pipeline is exhausted.
 func (m *Merge) Read(ctx context.Context) error {
 	if m.state.err != nil {
 		return m.state.err
 	}
 
+	m.init(ctx)
 	record, err := m.read(ctx)
 	m.state = newState(record, err)
 
 	if err != nil {
-		return fmt.Errorf("run merge: %w", err)
+		return err
 	}
 
 	return nil
 }
 
 func (m *Merge) read(ctx context.Context) (arrow.Record, error) {
-	if !slices.Contains(m.exhausted, false) {
+	// All inputs have been consumed and are exhausted.
+	if m.currInput >= len(m.inputs) {
 		return nil, EOF
 	}
 
-	for i, input := range m.inputs {
-		if m.exhausted[i] {
-			continue
-		}
+	for m.currInput < len(m.inputs) {
+		input := m.inputs[m.currInput]
 
 		if err := input.Read(ctx); err != nil {
 			if errors.Is(err, EOF) {
 				input.Close()
-				m.exhausted[i] = true
+				// Proceed to the next input.
+				m.currInput++
+				// Initialize the next input so it starts prefetching.
+				m.startPrefetchingInputAtIndex(ctx, m.currInput+m.maxPrefetch)
 				continue
 			}
 
 			return nil, err
 		}
 
-		// not updating reference counts as this pipeline is not consuming
-		// the record.
 		return input.Value()
 	}
 
-	// return EOF if none of the inputs returned a record.
+	// Return EOF if none of the inputs returned a record.
 	return nil, EOF
 }
 
+// Value returns the current value in state.
+func (m *Merge) Value() (arrow.Record, error) {
+	return m.state.Value()
+}
+
 // Close implements Pipeline.
 func (m *Merge) Close() {
-	for i, input := range m.inputs {
-		// exhausted inputs are already closed
-		if !m.exhausted[i] {
-			input.Close()
-		}
+	// Exhausted inputs are already closed.
+	for _, input := range m.inputs[m.currInput:] {
+		input.Close()
 	}
 }
 
-// Inputs implements Pipeline.
+// Inputs returns the inputs of the pipeline.
 func (m *Merge) Inputs() []Pipeline {
 	return m.inputs
 }
 
-// Transport implements Pipeline.
+// Transport returns the type of transport of the implementation.
 func (m *Merge) Transport() Transport {
 	return Local
 }
-
-// Value implements Pipeline.
-func (m *Merge) Value() (arrow.Record, error) {
-	return m.state.Value()
-}
```
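The Merge relies on `newPrefetchingPipeline` and `prefetchWrapper`, which are defined elsewhere and not shown in this diff. As an illustration only, not Loki's actual implementation, a read-ahead wrapper along these lines would satisfy the contract Merge depends on: `init()` starts background reads, and `Read()` hands out the next already-fetched result. All type and method shapes below are assumptions:

```go
package prefetch

import (
	"context"
	"io"
)

// Minimal stand-ins for the executor types; names are assumptions.
type Pipeline interface {
	Read(ctx context.Context) error
	Value() (any, error)
	Close()
}

// EOF marks pipeline exhaustion; the real executor defines its own sentinel.
var EOF = io.EOF

type result struct {
	value any
	err   error
}

// prefetchWrapper reads ahead from its inner pipeline on a goroutine, so the
// consumer's Read() usually returns a batch that has already been fetched.
type prefetchWrapper struct {
	inner   Pipeline
	results chan result
	curr    result
	cancel  context.CancelFunc
}

func newPrefetchingPipeline(inner Pipeline) *prefetchWrapper {
	return &prefetchWrapper{inner: inner}
}

// init starts the background prefetch loop. Keeping it separate from the
// constructor is what lets Merge decide when each input begins doing I/O.
func (p *prefetchWrapper) init(ctx context.Context) {
	if p.results != nil {
		return // already prefetching
	}
	ctx, p.cancel = context.WithCancel(ctx)
	p.results = make(chan result, 1) // buffer of 1: one batch read ahead
	go func() {
		defer close(p.results)
		for {
			if err := p.inner.Read(ctx); err != nil {
				select {
				case p.results <- result{err: err}:
				case <-ctx.Done():
				}
				return
			}
			v, err := p.inner.Value()
			select {
			case p.results <- result{value: v, err: err}:
			case <-ctx.Done():
				return
			}
		}
	}()
}

func (p *prefetchWrapper) Read(ctx context.Context) error {
	p.init(ctx) // no-op if Merge already started prefetching
	r, ok := <-p.results
	if !ok {
		return EOF // producer finished and channel drained
	}
	p.curr = r
	return r.err
}

func (p *prefetchWrapper) Value() (any, error) { return p.curr.value, p.curr.err }

func (p *prefetchWrapper) Close() {
	if p.cancel != nil {
		p.cancel()
	}
	p.inner.Close()
}
```

In a design like this, the buffered channel is what bounds memory: each wrapped input holds at most one batch in flight, so `merge_prefetch_count` caps concurrent I/O rather than total buffered data.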
