-
Notifications
You must be signed in to change notification settings - Fork 28.7k
[SPARK-52798] [SQL] Add function approx_top_k_combine #51505
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
override def serialize(buffer: CombineInternal[Any]): Array[Byte] = { | ||
val sketchBytes = buffer.getSketch.toByteArray( | ||
ApproxTopK.genSketchSerDe(buffer.getItemDataType)) | ||
val maxItemsTrackedByte = buffer.getMaxItemsTracked.toByte |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this is casting an Int
as a Byte
. Could this cause any overflow?
InternalRow.apply(sketchBytes, null, maxItemsTracked, typeCode) | ||
} | ||
|
||
override def serialize(buffer: CombineInternal[Any]): Array[Byte] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Serialization and deserialization here are a bit concerning. The current implementation has brittle byte-level manipulations with no robust error checking or flexibility. Hardcoded assumptions about byte sizes and type conversions create significant risks for future compatibility and data integrity. I think it may be worth looking into using a ByteBuffer and adding versioning to the sketch buffer, or any other approaches!
@@ -177,6 +177,8 @@ object ApproxTopK { | |||
val DEFAULT_K: Int = 5 | |||
val DEFAULT_MAX_ITEMS_TRACKED: Int = 10000 | |||
private val MAX_ITEMS_TRACKED_LIMIT: Int = 1000000 | |||
val VOID_MAX_ITEMS_TRACKED = -1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we document what exactly this VOID_MAX_ITEMS_TRACKED and SKETCH_SIZE_PLACEHOLDER mean?
What changes were proposed in this pull request?
This PR adds a SQL function:
approx_top_k_accumulate
, an aggregation function that merges multiple sketches into a single sketch.Syntax
Arguments
expr
: An expression of sketch structsmaxItemsTracked
: An optional INTEGER literal. If maxItemsTracked is specified, use this value for the newly generated combined sketch. If maxItemsTracked is not specified, all input sketches must have the same maxItemsTracked, and the output sketch would use the same value as well.Returns
The return of this function is a STRUCT with four fields: sketch, itemDataType, maxItemsTracked and typeCode. The return is exactly the same as for approx_top_k_accumulate.
Why are the changes needed?
They are useful sibling functions for approx_top_k queries.
Does this PR introduce any user-facing change?
Yes, this PR introduces a new user-facing SQL function. See user examples as below.
How was this patch tested?
Unit tests for end-to-end SQL queries and invalid input for expressions.
Was this patch authored or co-authored using generative AI tooling?