Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.sql.tree.FrameBound;
import com.google.common.collect.ImmutableList;

import java.util.ArrayList;
import java.util.List;

import static com.facebook.presto.spi.StandardErrorCode.INVALID_WINDOW_FRAME;
Expand All @@ -28,6 +29,7 @@
import static com.facebook.presto.sql.tree.FrameBound.Type.UNBOUNDED_FOLLOWING;
import static com.facebook.presto.sql.tree.FrameBound.Type.UNBOUNDED_PRECEDING;
import static com.facebook.presto.sql.tree.WindowFrame.Type.RANGE;
import static com.facebook.presto.sql.tree.WindowFrame.Type.ROWS;
import static com.facebook.presto.util.Failures.checkCondition;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.Math.toIntExact;
Expand All @@ -46,6 +48,8 @@ public final class WindowPartition
private int peerGroupEnd;

private int currentPosition;
private List<Integer> peerGroupStartIndices = new ArrayList<>();
private List<Integer> peerGroupEndIndices = new ArrayList<>();

public WindowPartition(PagesIndex pagesIndex,
int partitionStart,
Expand Down Expand Up @@ -142,6 +146,8 @@ private void updatePeerGroup()
while ((peerGroupEnd < partitionEnd) && pagesIndex.positionEqualsPosition(peerGroupHashStrategy, peerGroupStart, peerGroupEnd)) {
peerGroupEnd++;
}
peerGroupStartIndices.add(peerGroupStart);
peerGroupEndIndices.add(peerGroupEnd);
}

private Range getFrameRange(FrameInfo frameInfo)
Expand All @@ -150,7 +156,11 @@ private Range getFrameRange(FrameInfo frameInfo)
int endPosition = partitionEnd - partitionStart - 1;

// handle empty frame
if (emptyFrame(frameInfo, rowPosition, endPosition)) {
if (frameInfo.getType() == ROWS && emptyFrame(frameInfo, rowPosition, endPosition - rowPosition)) {
return new Range(-1, -1);
}

if (frameInfo.getType() == RANGE && emptyFrame(frameInfo, peerGroupStart, endPosition - (peerGroupEnd - 1))) {
return new Range(-1, -1);
}

Expand All @@ -161,6 +171,12 @@ private Range getFrameRange(FrameInfo frameInfo)
if (frameInfo.getStartType() == UNBOUNDED_PRECEDING) {
frameStart = 0;
}
else if (frameInfo.getType() == RANGE && frameInfo.getStartType() == PRECEDING) {
frameStart = precedingStartRange(getStartValue(frameInfo));
}
else if (frameInfo.getType() == RANGE && frameInfo.getStartType() == FOLLOWING) {
frameStart = followingRange(rowPosition, endPosition, getStartValue(frameInfo));
}
else if (frameInfo.getStartType() == PRECEDING) {
frameStart = preceding(rowPosition, getStartValue(frameInfo));
}
Expand All @@ -178,6 +194,12 @@ else if (frameInfo.getType() == RANGE) {
if (frameInfo.getEndType() == UNBOUNDED_FOLLOWING) {
frameEnd = endPosition;
}
else if (frameInfo.getType() == RANGE && frameInfo.getEndType() == PRECEDING) {
frameEnd = precedingEndRange(getEndValue(frameInfo));
}
else if (frameInfo.getType() == RANGE && frameInfo.getEndType() == FOLLOWING) {
frameEnd = followingRange(peerGroupEnd, endPosition + 1, getEndValue(frameInfo)) - 1;
}
else if (frameInfo.getEndType() == PRECEDING) {
frameEnd = preceding(rowPosition, getEndValue(frameInfo));
}
Expand All @@ -194,19 +216,62 @@ else if (frameInfo.getType() == RANGE) {
return new Range(frameStart, frameEnd);
}

private boolean emptyFrame(FrameInfo frameInfo, int rowPosition, int endPosition)
private int precedingEndRange(long endValue)
{
int peerGroupEndIndex = peerGroupEndIndices.indexOf(peerGroupEnd);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd use Map<Integer, Integer> (peer group -> position) instead of ArrayList<Integer>, to avoid slowness when there are many tiny groups in a large partition.
Imagine billions of rows with ~1 row / peer group.
Then indexOf will be O(billions). But if you use a Map, it's gonna be O(log(billions)).

if (peerGroupEndIndex < endValue) {
return peerGroupEnd - 1;
}
return peerGroupEndIndices.get(toIntExact(peerGroupEndIndex - endValue)) - 1;
}

private int precedingStartRange(long startValue)
{
int peerGroupStartIndex = peerGroupStartIndices.indexOf(peerGroupStart);
if (peerGroupStartIndex < startValue) {
return 0;
}
return peerGroupStartIndices.get(toIntExact(peerGroupStartIndex - startValue));
}

private int followingRange(int followingPeerGroupStart, int endPosition, long value)
{
if (value == 0) {
return followingPeerGroupStart;
}
// TODO: Optimize this to *not* look for peers often, probably have pageIndex keep the peer groups
int followingPeerGroupEnd = 0;
int currentValue = 0;
while (currentValue < value) {
boolean peerFound = false;
followingPeerGroupEnd = followingPeerGroupStart + 1;
while ((followingPeerGroupEnd < partitionEnd) && pagesIndex.positionEqualsPosition(peerGroupHashStrategy, followingPeerGroupStart, followingPeerGroupEnd)) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if there are no peers, just each row is a distinct value within the partition?
You'd never set peerFound = true, IIUC so this loop would never end?
Can you explain this to me?

followingPeerGroupEnd++;
peerFound = true;
}
if (followingPeerGroupEnd >= partitionEnd) {
return endPosition;
}

if (!peerFound) {
currentValue++;
}
followingPeerGroupStart++;
}
return followingPeerGroupEnd;
}

private boolean emptyFrame(FrameInfo frameInfo, int rowPosition, int position)
{
FrameBound.Type startType = frameInfo.getStartType();
FrameBound.Type endType = frameInfo.getEndType();

int positions = endPosition - rowPosition;

if ((startType == UNBOUNDED_PRECEDING) && (endType == PRECEDING)) {
return getEndValue(frameInfo) > rowPosition;
}

if ((startType == FOLLOWING) && (endType == UNBOUNDED_FOLLOWING)) {
return getStartValue(frameInfo) > positions;
return getStartValue(frameInfo) > position;
}

if (startType != endType) {
Expand All @@ -225,7 +290,7 @@ private boolean emptyFrame(FrameInfo frameInfo, int rowPosition, int endPosition
return (start < end) || ((start > rowPosition) && (end > rowPosition));
}

return (start > end) || ((start > positions) && (end > positions));
return (start > end) || ((start > position) && (end > position));
}

private static int preceding(int rowPosition, long value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@
import static com.facebook.presto.sql.tree.FrameBound.Type.PRECEDING;
import static com.facebook.presto.sql.tree.FrameBound.Type.UNBOUNDED_FOLLOWING;
import static com.facebook.presto.sql.tree.FrameBound.Type.UNBOUNDED_PRECEDING;
import static com.facebook.presto.sql.tree.WindowFrame.Type.RANGE;
import static com.facebook.presto.type.UnknownType.UNKNOWN;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -1260,12 +1259,6 @@ private void analyzeWindowFrame(WindowFrame frame)
if ((startType == FOLLOWING) && (endType == CURRENT_ROW)) {
throw new SemanticException(INVALID_WINDOW_FRAME, frame, "Window frame starting from FOLLOWING cannot end with CURRENT ROW");
}
if ((frame.getType() == RANGE) && ((startType == PRECEDING) || (endType == PRECEDING))) {
throw new SemanticException(INVALID_WINDOW_FRAME, frame, "Window frame RANGE PRECEDING is only supported with UNBOUNDED");
}
if ((frame.getType() == RANGE) && ((startType == FOLLOWING) || (endType == FOLLOWING))) {
throw new SemanticException(INVALID_WINDOW_FRAME, frame, "Window frame RANGE FOLLOWING is only supported with UNBOUNDED");
}
}

private void analyzeHaving(QuerySpecification node, Scope scope)
Expand Down
Loading