Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/131658.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 131658
summary: Fix `aggregate_metric_double` sorting and `mv_expand` issues
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.data;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
import java.util.List;
import java.util.stream.Stream;

public final class AggregateMetricDoubleArrayBlock extends AbstractNonThreadSafeRefCounted implements AggregateMetricDoubleBlock {
private final DoubleBlock minBlock;
private final DoubleBlock maxBlock;
private final DoubleBlock sumBlock;
private final IntBlock countBlock;
private final int positionCount;

public AggregateMetricDoubleArrayBlock(DoubleBlock minBlock, DoubleBlock maxBlock, DoubleBlock sumBlock, IntBlock countBlock) {
this.minBlock = minBlock;
this.maxBlock = maxBlock;
this.sumBlock = sumBlock;
this.countBlock = countBlock;
this.positionCount = minBlock.getPositionCount();
for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
if (b.getPositionCount() != positionCount) {
assert false : "expected positionCount=" + positionCount + " but was " + b;
throw new IllegalArgumentException("expected positionCount=" + positionCount + " but was " + b);
}
if (b.isReleased()) {
assert false : "can't build aggregate_metric_double block out of released blocks but [" + b + "] was released";
throw new IllegalArgumentException(
"can't build aggregate_metric_double block out of released blocks but [" + b + "] was released"
);
}
}
}

public static AggregateMetricDoubleArrayBlock fromCompositeBlock(CompositeBlock block) {
assert block.getBlockCount() == 4
: "Can't make AggregateMetricDoubleBlock out of CompositeBlock with " + block.getBlockCount() + " blocks";
DoubleBlock min = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex());
DoubleBlock max = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex());
DoubleBlock sum = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex());
IntBlock count = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex());
return new AggregateMetricDoubleArrayBlock(min, max, sum, count);
}

public CompositeBlock asCompositeBlock() {
final Block[] blocks = new Block[4];
blocks[AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()] = minBlock;
blocks[AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()] = maxBlock;
blocks[AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()] = sumBlock;
blocks[AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()] = countBlock;
return new CompositeBlock(blocks);
}

@Override
protected void closeInternal() {
Releasables.close(minBlock, maxBlock, sumBlock, countBlock);
}

@Override
public Vector asVector() {
return null;
}

@Override
public int getTotalValueCount() {
int totalValueCount = 0;
for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
totalValueCount += b.getTotalValueCount();
}
return totalValueCount;
}

@Override
public int getPositionCount() {
return positionCount;
}

@Override
public int getFirstValueIndex(int position) {
return minBlock.getFirstValueIndex(position);
}

@Override
public int getValueCount(int position) {
int max = 0;
for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
max = Math.max(max, b.getValueCount(position));
}
return max;
}

@Override
public ElementType elementType() {
return ElementType.AGGREGATE_METRIC_DOUBLE;
}

@Override
public BlockFactory blockFactory() {
return minBlock.blockFactory();
}

@Override
public void allowPassingToDifferentDriver() {
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
block.allowPassingToDifferentDriver();
}
}

@Override
public boolean isNull(int position) {
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
if (block.isNull(position) == false) {
return false;
}
}
return true;
}

@Override
public boolean mayHaveNulls() {
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::mayHaveNulls);
}

@Override
public boolean areAllValuesNull() {
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).allMatch(Block::areAllValuesNull);
}

@Override
public boolean mayHaveMultivaluedFields() {
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::mayHaveMultivaluedFields);
}

@Override
public boolean doesHaveMultivaluedFields() {
if (Stream.of(minBlock, maxBlock, sumBlock, countBlock).noneMatch(Block::mayHaveMultivaluedFields)) {
return false;
}
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::doesHaveMultivaluedFields);
}

@Override
public AggregateMetricDoubleBlock filter(int... positions) {
AggregateMetricDoubleArrayBlock result = null;
DoubleBlock newMinBlock = null;
DoubleBlock newMaxBlock = null;
DoubleBlock newSumBlock = null;
IntBlock newCountBlock = null;
try {
newMinBlock = minBlock.filter(positions);
newMaxBlock = maxBlock.filter(positions);
newSumBlock = sumBlock.filter(positions);
newCountBlock = countBlock.filter(positions);
result = new AggregateMetricDoubleArrayBlock(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
return result;
} finally {
if (result == null) {
Releasables.close(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
}
}
}

@Override
public AggregateMetricDoubleBlock keepMask(BooleanVector mask) {
AggregateMetricDoubleArrayBlock result = null;
DoubleBlock newMinBlock = null;
DoubleBlock newMaxBlock = null;
DoubleBlock newSumBlock = null;
IntBlock newCountBlock = null;
try {
newMinBlock = minBlock.keepMask(mask);
newMaxBlock = maxBlock.keepMask(mask);
newSumBlock = sumBlock.keepMask(mask);
newCountBlock = countBlock.keepMask(mask);
result = new AggregateMetricDoubleArrayBlock(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
return result;
} finally {
if (result == null) {
Releasables.close(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
}
}
}

@Override
public ReleasableIterator<? extends AggregateMetricDoubleBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
// TODO: support
throw new UnsupportedOperationException("can't lookup values from AggregateMetricDoubleBlock");
}

@Override
public MvOrdering mvOrdering() {
// TODO: determine based on sub-blocks
return MvOrdering.UNORDERED;
}

@Override
public AggregateMetricDoubleBlock expand() {
this.incRef();
return this;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
block.writeTo(out);
}
}

public static Block readFrom(StreamInput in) throws IOException {
boolean success = false;
DoubleBlock minBlock = null;
DoubleBlock maxBlock = null;
DoubleBlock sumBlock = null;
IntBlock countBlock = null;
BlockStreamInput blockStreamInput = (BlockStreamInput) in;
try {
minBlock = DoubleBlock.readFrom(blockStreamInput);
maxBlock = DoubleBlock.readFrom(blockStreamInput);
sumBlock = DoubleBlock.readFrom(blockStreamInput);
countBlock = IntBlock.readFrom(blockStreamInput);
AggregateMetricDoubleArrayBlock result = new AggregateMetricDoubleArrayBlock(minBlock, maxBlock, sumBlock, countBlock);
success = true;
return result;
} finally {
if (success == false) {
Releasables.close(minBlock, maxBlock, sumBlock, countBlock);
}
}
}

@Override
public long ramBytesUsed() {
return minBlock.ramBytesUsed() + maxBlock.ramBytesUsed() + sumBlock.ramBytesUsed() + countBlock.ramBytesUsed();
}

@Override
public boolean equals(Object obj) {
if (obj instanceof AggregateMetricDoubleBlock that) {
return AggregateMetricDoubleBlock.equals(this, that);
}
return false;
}

@Override
public int hashCode() {
return AggregateMetricDoubleBlock.hash(this);
}

public DoubleBlock minBlock() {
return minBlock;
}

public DoubleBlock maxBlock() {
return maxBlock;
}

public DoubleBlock sumBlock() {
return sumBlock;
}

public IntBlock countBlock() {
return countBlock;
}

public Block getMetricBlock(int index) {
if (index == AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()) {
return minBlock;
}
if (index == AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()) {
return maxBlock;
}
if (index == AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()) {
return sumBlock;
}
if (index == AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()) {
return countBlock;
}
throw new UnsupportedOperationException("Received an index (" + index + ") outside of range for AggregateMetricDoubleBlock.");
}
}
Loading
Loading