Skip to content

Commit 95025fa

Browse files
committed
remove scans batching
1 parent 914a2b7 commit 95025fa

File tree

12 files changed

+17
-172
lines changed

12 files changed

+17
-172
lines changed

docs/sources/shared/configuration.md

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4937,11 +4937,6 @@ otlp_config:
49374937
# CLI flag: -limits.debug-engine-streams
49384938
[debug_engine_streams: <boolean> | default = false]
49394939

4940-
# Experimental: Number of PointersScan targets to batch per task at the workflow
4941-
# level. 0 = one task per path.
4942-
# CLI flag: -limits.metastore-pointers-scans-per-task
4943-
[metastore_pointers_scans_per_task: <int> | default = 0]
4944-
49454940
# Experimental: Maximum size of record batches sent to sinks when draining the
49464941
# pipeline (e.g. 1MB). 0 = no batching, send each record as-is.
49474942
# CLI flag: -limits.record-batch-size

pkg/engine/engine.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ func (e *Engine) buildPhysicalPlan(ctx context.Context, tenantID string, logger
367367
}
368368

369369
func (e *Engine) metastoreSectionsResolver(ctx context.Context, tenantID string) physical.MetastoreSectionsResolver {
370-
planner := physical.NewMetastorePlanner(e.metastore, e.limits.MetastorePointersScansPerTask(tenantID))
370+
planner := physical.NewMetastorePlanner(e.metastore)
371371
return func(selector physical.Expression, predicates []physical.Expression, start time.Time, end time.Time) ([]*metastore.DataobjSectionDescriptor, error) {
372372
ctx, region := xcap.StartRegion(ctx, "ObjectMetastore.Sections")
373373
defer region.End()

pkg/engine/internal/planner/physical/metastore_planner.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,11 @@ import (
1313

1414
type MetastorePlanner struct {
1515
metastore metastore.Metastore
16-
17-
// PointersScansPerTask is the batch size for workflow-level sharding: when > 0,
18-
// the ScanSet gets ShardBatchSize set so each task runs up to this many PointersScan targets.
19-
// 0 means one task per path (current behavior).
20-
PointersScansPerTask int
2116
}
2217

23-
func NewMetastorePlanner(metastore metastore.Metastore, pointersScansPerTask int) MetastorePlanner {
18+
func NewMetastorePlanner(metastore metastore.Metastore) MetastorePlanner {
2419
return MetastorePlanner{
25-
metastore: metastore,
26-
PointersScansPerTask: pointersScansPerTask,
20+
metastore: metastore,
2721
}
2822
}
2923

@@ -51,8 +45,7 @@ func (p MetastorePlanner) Plan(ctx context.Context, selector Expression, predica
5145
plan.graph.Add(parallelize)
5246

5347
scanSet := &ScanSet{
54-
NodeID: ulid.Make(),
55-
ShardBatchSize: p.PointersScansPerTask,
48+
NodeID: ulid.Make(),
5649
}
5750
plan.graph.Add(scanSet)
5851

pkg/engine/internal/planner/physical/metastore_planner_test.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@ func TestMetastorePlanner_Plan_UsesMergeRootAndPointersTargets(t *testing.T) {
3838
start := time.Unix(10, 0)
3939
end := start.Add(time.Hour)
4040

41-
const shardSize = 2
42-
p := NewMetastorePlanner(ms, shardSize)
41+
p := NewMetastorePlanner(ms)
4342
plan, err := p.Plan(context.Background(), nil, nil, start, end)
4443
require.NoError(t, err)
4544

@@ -66,6 +65,4 @@ func TestMetastorePlanner_Plan_UsesMergeRootAndPointersTargets(t *testing.T) {
6665
require.Equal(t, start, target.Pointers.Start)
6766
require.Equal(t, end, target.Pointers.End)
6867
}
69-
70-
require.Equal(t, shardSize, set.ShardBatchSize)
7168
}

pkg/engine/internal/planner/physical/scanset.go

Lines changed: 2 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,6 @@ type ScanSet struct {
7171
// returned. Predicates would almost always contain a time range filter to
7272
// only read the logs for the requested time range.
7373
Predicates []Expression
74-
75-
// ShardBatchSize, when > 0, causes Shards() to yield ScanSet nodes with up to
76-
// ShardBatchSize targets each (workflow-level batching). Applies to all target types.
77-
// 0 means one shard per target.
78-
ShardBatchSize int
7974
}
8075

8176
// ID returns the ULID that uniquely identifies the node in the plan.
@@ -89,9 +84,8 @@ func (s *ScanSet) Clone() Node {
8984
}
9085

9186
return &ScanSet{
92-
NodeID: ulid.Make(),
93-
Targets: newTargets,
94-
ShardBatchSize: s.ShardBatchSize,
87+
NodeID: ulid.Make(),
88+
Targets: newTargets,
9589
}
9690
}
9791

@@ -104,40 +98,9 @@ func (s *ScanSet) Type() NodeType {
10498
// will be a clone. Projections and predicates on the ScanSet are cloned and
10599
// applied to each shard.
106100
//
107-
// When ShardBatchSize > 0, yields ScanSet nodes each with up to ShardBatchSize
108-
// targets (workflow-level batching). Otherwise yields one node per target
109-
// (DataObjScan or PointersScan).
110-
//
111101
// Shards panics if one of the targets is invalid.
112102
func (s *ScanSet) Shards() iter.Seq[Node] {
113103
return func(yield func(Node) bool) {
114-
// Batched sharding: when ShardBatchSize > 0, yield one ScanSet per chunk of targets.
115-
if s.ShardBatchSize > 0 {
116-
for i := 0; i < len(s.Targets); i += s.ShardBatchSize {
117-
end := i + s.ShardBatchSize
118-
if end > len(s.Targets) {
119-
end = len(s.Targets)
120-
}
121-
chunk := s.Targets[i:end]
122-
batchedTargets := make([]*ScanTarget, len(chunk))
123-
for j, t := range chunk {
124-
batchedTargets[j] = t.Clone()
125-
}
126-
shardSet := &ScanSet{
127-
NodeID: ulid.Make(),
128-
Targets: batchedTargets,
129-
Projections: s.Projections,
130-
Predicates: s.Predicates,
131-
ShardBatchSize: 0, // injected ScanSet does not re-shard
132-
}
133-
if !yield(shardSet) {
134-
return
135-
}
136-
}
137-
return
138-
}
139-
140-
// One shard per target (current behavior).
141104
for _, target := range s.Targets {
142105
switch target.Type {
143106
case ScanTypeDataObject:

pkg/engine/internal/planner/physical/scanset_test.go

Lines changed: 0 additions & 41 deletions
This file was deleted.

pkg/engine/internal/workflow/metastore_workflow_planner_test.go

Lines changed: 2 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package workflow
22

33
import (
44
"context"
5-
"fmt"
65
"testing"
76
"time"
87

@@ -29,7 +28,7 @@ func TestPlanWorkflow_MetastorePlan_UsesMergeRootAndPointersPartitions(t *testin
2928
start := time.Unix(10, 0)
3029
end := start.Add(time.Hour)
3130

32-
p := physical.NewMetastorePlanner(ms, 0)
31+
p := physical.NewMetastorePlanner(ms)
3332
plan, err := p.Plan(context.Background(), nil, nil, start, end)
3433
require.NoError(t, err)
3534

@@ -52,7 +51,7 @@ func TestPlanWorkflow_MetastorePlan_UsesMergeRootAndPointersPartitions(t *testin
5251
for _, child := range children {
5352
childRoot, err := child.Fragment.Root()
5453
require.NoError(t, err)
55-
require.IsType(t, &physical.PointersScan{}, childRoot, "partition fragment root should be PointersScan when not batching")
54+
require.IsType(t, &physical.PointersScan{}, childRoot)
5655
gotLocations[childRoot.(*physical.PointersScan).Location] = struct{}{}
5756
}
5857

@@ -61,40 +60,3 @@ func TestPlanWorkflow_MetastorePlan_UsesMergeRootAndPointersPartitions(t *testin
6160
require.True(t, ok, "missing partition for %q", indexPath)
6261
}
6362
}
64-
65-
func TestPlanWorkflow_MetastorePlan_BatchesPointersScans(t *testing.T) {
66-
paths := make([]string, 250)
67-
for i := range paths {
68-
paths[i] = fmt.Sprintf("index/%d", i)
69-
}
70-
ms := fakeMetastoreIndexes{indexPaths: paths}
71-
start := time.Unix(10, 0)
72-
end := start.Add(time.Hour)
73-
74-
p := physical.NewMetastorePlanner(ms, 100)
75-
plan, err := p.Plan(context.Background(), nil, nil, start, end)
76-
require.NoError(t, err)
77-
78-
graph, err := planWorkflow("tenant", plan)
79-
require.NoError(t, err)
80-
81-
rootTask, err := graph.Root()
82-
require.NoError(t, err)
83-
84-
rootNode, err := rootTask.Fragment.Root()
85-
require.NoError(t, err)
86-
require.IsType(t, &physical.Merge{}, rootNode)
87-
88-
children := graph.Children(rootTask)
89-
require.Len(t, children, 3, "250 paths with batch 100 should produce 3 tasks (100, 100, 50)")
90-
91-
expectedSizes := []int{100, 100, 50}
92-
for i, child := range children {
93-
childRoot, err := child.Fragment.Root()
94-
require.NoError(t, err)
95-
scanSet, ok := childRoot.(*physical.ScanSet)
96-
require.True(t, ok, "child %d: partition fragment root should be ScanSet when batching, got %T", i, childRoot)
97-
require.Len(t, scanSet.Targets, expectedSizes[i], "child %d: expected %d PointersScan targets", i, expectedSizes[i])
98-
require.Equal(t, 0, scanSet.ShardBatchSize, "batched shard should have ShardBatchSize 0")
99-
}
100-
}

pkg/logcli/client/file.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -266,10 +266,6 @@ func (l *limiter) DebugEngineStreams(_ string) bool {
266266
return false // This setting for the v2 execution engine is unused in LogCLI
267267
}
268268

269-
func (l *limiter) MetastorePointersScansPerTask(_ string) int {
270-
return 0 // This setting for the v2 execution engine is unused in LogCLI
271-
}
272-
273269
func (l *limiter) RecordBatchSize(_ string) int {
274270
return 0 // This setting for the v2 execution engine is unused in LogCLI
275271
}

pkg/logql/limits.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ type Limits interface {
2727
MaxScanTaskParallelism(string) int
2828
DebugEngineTasks(string) bool
2929
DebugEngineStreams(string) bool
30-
MetastorePointersScansPerTask(string) int
3130
RecordBatchSize(string) int
3231
}
3332

@@ -40,11 +39,10 @@ type fakeLimits struct {
4039
multiVariantQueryEnable bool
4140

4241
// v2 engine limits
43-
maxScanTaskParallelism int
44-
debugEngineTasks bool
45-
debugEngineStreams bool
46-
metastorePointersScansPerTask int
47-
recordBatchSize int
42+
maxScanTaskParallelism int
43+
debugEngineTasks bool
44+
debugEngineStreams bool
45+
recordBatchSize int
4846
}
4947

5048
func (f fakeLimits) MaxQuerySeries(_ context.Context, _ string) int {
@@ -83,10 +81,6 @@ func (f fakeLimits) DebugEngineStreams(_ string) bool {
8381
return f.debugEngineStreams
8482
}
8583

86-
func (f fakeLimits) MetastorePointersScansPerTask(_ string) int {
87-
return f.metastorePointersScansPerTask
88-
}
89-
9084
func (f fakeLimits) RecordBatchSize(_ string) int {
9185
return f.recordBatchSize
9286
}

pkg/querier/queryrange/roundtrip_test.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1549,10 +1549,6 @@ func (f fakeLimits) DebugEngineStreams(_ string) bool {
15491549
return false
15501550
}
15511551

1552-
func (f fakeLimits) MetastorePointersScansPerTask(_ string) int {
1553-
return 0
1554-
}
1555-
15561552
func (f fakeLimits) RecordBatchSize(_ string) int {
15571553
return 0
15581554
}

0 commit comments

Comments
 (0)