Skip to content

Commit 6a24a6a

Browse files
authored
Manual backport of #131658 (#131955)
1 parent c5d3722 commit 6a24a6a

File tree

15 files changed

+687
-298
lines changed

15 files changed

+687
-298
lines changed

docs/changelog/131658.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 131658
2+
summary: Fix `aggregate_metric_double` sorting and `mv_expand` issues
3+
area: ES|QL
4+
type: bug
5+
issues: []
Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.data;
9+
10+
import org.elasticsearch.common.io.stream.StreamInput;
11+
import org.elasticsearch.common.io.stream.StreamOutput;
12+
import org.elasticsearch.common.unit.ByteSizeValue;
13+
import org.elasticsearch.core.ReleasableIterator;
14+
import org.elasticsearch.core.Releasables;
15+
16+
import java.io.IOException;
17+
import java.util.List;
18+
import java.util.stream.Stream;
19+
20+
public final class AggregateMetricDoubleArrayBlock extends AbstractNonThreadSafeRefCounted implements AggregateMetricDoubleBlock {
21+
private final DoubleBlock minBlock;
22+
private final DoubleBlock maxBlock;
23+
private final DoubleBlock sumBlock;
24+
private final IntBlock countBlock;
25+
private final int positionCount;
26+
27+
public AggregateMetricDoubleArrayBlock(DoubleBlock minBlock, DoubleBlock maxBlock, DoubleBlock sumBlock, IntBlock countBlock) {
28+
this.minBlock = minBlock;
29+
this.maxBlock = maxBlock;
30+
this.sumBlock = sumBlock;
31+
this.countBlock = countBlock;
32+
this.positionCount = minBlock.getPositionCount();
33+
for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
34+
if (b.getPositionCount() != positionCount) {
35+
assert false : "expected positionCount=" + positionCount + " but was " + b;
36+
throw new IllegalArgumentException("expected positionCount=" + positionCount + " but was " + b);
37+
}
38+
if (b.isReleased()) {
39+
assert false : "can't build aggregate_metric_double block out of released blocks but [" + b + "] was released";
40+
throw new IllegalArgumentException(
41+
"can't build aggregate_metric_double block out of released blocks but [" + b + "] was released"
42+
);
43+
}
44+
}
45+
}
46+
47+
public static AggregateMetricDoubleArrayBlock fromCompositeBlock(CompositeBlock block) {
48+
assert block.getBlockCount() == 4
49+
: "Can't make AggregateMetricDoubleBlock out of CompositeBlock with " + block.getBlockCount() + " blocks";
50+
DoubleBlock min = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex());
51+
DoubleBlock max = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex());
52+
DoubleBlock sum = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex());
53+
IntBlock count = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex());
54+
return new AggregateMetricDoubleArrayBlock(min, max, sum, count);
55+
}
56+
57+
public CompositeBlock asCompositeBlock() {
58+
final Block[] blocks = new Block[4];
59+
blocks[AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()] = minBlock;
60+
blocks[AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()] = maxBlock;
61+
blocks[AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()] = sumBlock;
62+
blocks[AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()] = countBlock;
63+
return new CompositeBlock(blocks);
64+
}
65+
66+
@Override
67+
protected void closeInternal() {
68+
Releasables.close(minBlock, maxBlock, sumBlock, countBlock);
69+
}
70+
71+
@Override
72+
public Vector asVector() {
73+
return null;
74+
}
75+
76+
@Override
77+
public int getTotalValueCount() {
78+
int totalValueCount = 0;
79+
for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
80+
totalValueCount += b.getTotalValueCount();
81+
}
82+
return totalValueCount;
83+
}
84+
85+
@Override
86+
public int getPositionCount() {
87+
return positionCount;
88+
}
89+
90+
@Override
91+
public int getFirstValueIndex(int position) {
92+
return minBlock.getFirstValueIndex(position);
93+
}
94+
95+
@Override
96+
public int getValueCount(int position) {
97+
int max = 0;
98+
for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
99+
max = Math.max(max, b.getValueCount(position));
100+
}
101+
return max;
102+
}
103+
104+
@Override
105+
public ElementType elementType() {
106+
return ElementType.AGGREGATE_METRIC_DOUBLE;
107+
}
108+
109+
@Override
110+
public BlockFactory blockFactory() {
111+
return minBlock.blockFactory();
112+
}
113+
114+
@Override
115+
public void allowPassingToDifferentDriver() {
116+
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
117+
block.allowPassingToDifferentDriver();
118+
}
119+
}
120+
121+
@Override
122+
public boolean isNull(int position) {
123+
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
124+
if (block.isNull(position) == false) {
125+
return false;
126+
}
127+
}
128+
return true;
129+
}
130+
131+
@Override
132+
public boolean mayHaveNulls() {
133+
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::mayHaveNulls);
134+
}
135+
136+
@Override
137+
public boolean areAllValuesNull() {
138+
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).allMatch(Block::areAllValuesNull);
139+
}
140+
141+
@Override
142+
public boolean mayHaveMultivaluedFields() {
143+
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::mayHaveMultivaluedFields);
144+
}
145+
146+
@Override
147+
public boolean doesHaveMultivaluedFields() {
148+
if (Stream.of(minBlock, maxBlock, sumBlock, countBlock).noneMatch(Block::mayHaveMultivaluedFields)) {
149+
return false;
150+
}
151+
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::doesHaveMultivaluedFields);
152+
}
153+
154+
@Override
155+
public AggregateMetricDoubleBlock filter(int... positions) {
156+
AggregateMetricDoubleArrayBlock result = null;
157+
DoubleBlock newMinBlock = null;
158+
DoubleBlock newMaxBlock = null;
159+
DoubleBlock newSumBlock = null;
160+
IntBlock newCountBlock = null;
161+
try {
162+
newMinBlock = minBlock.filter(positions);
163+
newMaxBlock = maxBlock.filter(positions);
164+
newSumBlock = sumBlock.filter(positions);
165+
newCountBlock = countBlock.filter(positions);
166+
result = new AggregateMetricDoubleArrayBlock(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
167+
return result;
168+
} finally {
169+
if (result == null) {
170+
Releasables.close(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
171+
}
172+
}
173+
}
174+
175+
@Override
176+
public AggregateMetricDoubleBlock keepMask(BooleanVector mask) {
177+
AggregateMetricDoubleArrayBlock result = null;
178+
DoubleBlock newMinBlock = null;
179+
DoubleBlock newMaxBlock = null;
180+
DoubleBlock newSumBlock = null;
181+
IntBlock newCountBlock = null;
182+
try {
183+
newMinBlock = minBlock.keepMask(mask);
184+
newMaxBlock = maxBlock.keepMask(mask);
185+
newSumBlock = sumBlock.keepMask(mask);
186+
newCountBlock = countBlock.keepMask(mask);
187+
result = new AggregateMetricDoubleArrayBlock(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
188+
return result;
189+
} finally {
190+
if (result == null) {
191+
Releasables.close(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
192+
}
193+
}
194+
}
195+
196+
@Override
197+
public ReleasableIterator<? extends AggregateMetricDoubleBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
198+
// TODO: support
199+
throw new UnsupportedOperationException("can't lookup values from AggregateMetricDoubleBlock");
200+
}
201+
202+
@Override
203+
public MvOrdering mvOrdering() {
204+
// TODO: determine based on sub-blocks
205+
return MvOrdering.UNORDERED;
206+
}
207+
208+
@Override
209+
public AggregateMetricDoubleBlock expand() {
210+
this.incRef();
211+
return this;
212+
}
213+
214+
@Override
215+
public void writeTo(StreamOutput out) throws IOException {
216+
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
217+
block.writeTo(out);
218+
}
219+
}
220+
221+
public static Block readFrom(StreamInput in) throws IOException {
222+
boolean success = false;
223+
DoubleBlock minBlock = null;
224+
DoubleBlock maxBlock = null;
225+
DoubleBlock sumBlock = null;
226+
IntBlock countBlock = null;
227+
BlockStreamInput blockStreamInput = (BlockStreamInput) in;
228+
try {
229+
minBlock = DoubleBlock.readFrom(blockStreamInput);
230+
maxBlock = DoubleBlock.readFrom(blockStreamInput);
231+
sumBlock = DoubleBlock.readFrom(blockStreamInput);
232+
countBlock = IntBlock.readFrom(blockStreamInput);
233+
AggregateMetricDoubleArrayBlock result = new AggregateMetricDoubleArrayBlock(minBlock, maxBlock, sumBlock, countBlock);
234+
success = true;
235+
return result;
236+
} finally {
237+
if (success == false) {
238+
Releasables.close(minBlock, maxBlock, sumBlock, countBlock);
239+
}
240+
}
241+
}
242+
243+
@Override
244+
public long ramBytesUsed() {
245+
return minBlock.ramBytesUsed() + maxBlock.ramBytesUsed() + sumBlock.ramBytesUsed() + countBlock.ramBytesUsed();
246+
}
247+
248+
@Override
249+
public boolean equals(Object obj) {
250+
if (obj instanceof AggregateMetricDoubleBlock that) {
251+
return AggregateMetricDoubleBlock.equals(this, that);
252+
}
253+
return false;
254+
}
255+
256+
@Override
257+
public int hashCode() {
258+
return AggregateMetricDoubleBlock.hash(this);
259+
}
260+
261+
public DoubleBlock minBlock() {
262+
return minBlock;
263+
}
264+
265+
public DoubleBlock maxBlock() {
266+
return maxBlock;
267+
}
268+
269+
public DoubleBlock sumBlock() {
270+
return sumBlock;
271+
}
272+
273+
public IntBlock countBlock() {
274+
return countBlock;
275+
}
276+
277+
public Block getMetricBlock(int index) {
278+
if (index == AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()) {
279+
return minBlock;
280+
}
281+
if (index == AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()) {
282+
return maxBlock;
283+
}
284+
if (index == AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()) {
285+
return sumBlock;
286+
}
287+
if (index == AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()) {
288+
return countBlock;
289+
}
290+
throw new UnsupportedOperationException("Received an index (" + index + ") outside of range for AggregateMetricDoubleBlock.");
291+
}
292+
}

0 commit comments

Comments
 (0)