Implement Improved arrow-avro Reader Zero-Byte Record Handling #7966
Conversation
It seems like this PR just simplifies code without actually changing any behavior?
(and we could probably simplify even more)?
arrow-avro/src/reader/mod.rs (outdated)

-    while total_consumed < data.len() && self.decoded_rows < self.batch_size {
-        let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?;
-        if consumed == 0 {
+    while self.decoded_rows < self.batch_size {
+        let buffer = &data[total_consumed..];
+        if buffer.is_empty() {
+            // No more data to process.
+            break;
+        }
+        let consumed = self.record_decoder.decode(buffer, 1)?;
+        // A successful call to record_decoder.decode means one row was decoded.
+        // If `consumed` is 0 on a non-empty buffer, it implies a valid zero-byte record.
+        // We increment `decoded_rows` to mark progress and avoid an infinite loop.
+        // We add `consumed` (which can be 0) to `total_consumed`.
AFAICT, the old and new versions of this code are logically equivalent? And the original `consumed == 0` check was actually unnecessary, because the row count increases after every `record_decoder.decode` call, which would cause the loop to eventually terminate? Seems like we just want a simplified version of the original code:
let mut total_consumed = 0usize;
while total_consumed < data.len() && self.decoded_rows < self.batch_size {
let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?;
// A successful call to record_decoder.decode means one row was decoded.
// If `consumed` is 0 on a non-empty buffer, it implies a valid zero-byte record.
// We increment `decoded_rows` to mark progress and avoid an infinite loop.
// We add `consumed` (which can be 0) to `total_consumed`.
total_consumed += consumed;
self.decoded_rows += 1;
}
or maybe even (with `mut data: &[u8]`):
while !data.is_empty() && self.decoded_rows < self.batch_size {
// A successful call to record_decoder.decode means one row was decoded.
// If `consumed` is 0 on a non-empty buffer, it implies a valid zero-byte record.
// We advance the data slice by `consumed` (which can be 0)
// We increment `decoded_rows` to mark progress and avoid an infinite loop.
let consumed = self.record_decoder.decode(data, 1)?;
data = &data[consumed..];
self.decoded_rows += 1;
}
@scovich I went ahead and changed the code to:
let mut total_consumed = 0usize;
while total_consumed < data.len() && self.decoded_rows < self.batch_size {
let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?;
// A successful call to record_decoder.decode means one row was decoded.
// If `consumed` is 0 on a non-empty buffer, it implies a valid zero-byte record.
// We increment `decoded_rows` to mark progress and avoid an infinite loop.
// We add `consumed` (which can be 0) to `total_consumed`.
total_consumed += consumed;
self.decoded_rows += 1;
}
I'm working on a bigger update that covers schema resolution and a schema store. I split this code off of that work, which is the reason for the difference in structure.
My plan is to break that update up into two smaller PRs, and there's a decent chance I'll have a more fundamental refinement for the `decode` loop in one of them.
arrow-avro/src/reader/mod.rs

-    if consumed == 0 && self.block_cursor < self.block_data.len() {
-        self.block_cursor = self.block_data.len();
I think this check was never needed? Because eventually the outer loop's `self.decoder.batch_is_full()` check would fail and break the loop?
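For the record, a standalone model of that argument (the names mirror the snippet above; this is a sketch, not the crate's actual code):

// Model of the Reader's outer loop: even if every record consumes zero
// bytes, each decode call still counts one row, so the batch bound
// (standing in for `self.decoder.batch_is_full()`) stops the loop.
fn read_block(block_data: &[u8], batch_size: usize) -> (usize, usize) {
    let mut block_cursor = 0usize;
    let mut decoded_rows = 0usize;
    // `decoded_rows >= batch_size` plays the role of `batch_is_full()`.
    while decoded_rows < batch_size && block_cursor < block_data.len() {
        // Stand-in for the decoder: zero bytes consumed is the worst case.
        let consumed = 0usize;
        block_cursor += consumed;
        decoded_rows += 1;
    }
    (block_cursor, decoded_rows)
}

fn main() {
    // The loop exits after `batch_size` rows without ever moving the cursor,
    // so the `if consumed == 0` guardrail isn't needed for termination.
    assert_eq!(read_block(&[0u8; 16], 8), (0, 8));
}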
@scovich Yeah, it was an over-aggressive guardrail, and since this crate isn't public I probably should have gone with your original recommendation. I was just unsure of what work needed to be done to support zero-byte encodings and wanted time to do the research and write an integration test to confirm it.
This PR ended up being much smaller than I anticipated.
LGTM!
Thanks @jecsand838 and @scovich 🚀
Which issue does this PR close?
Part of Add Avro Support #4886
Follow up to Implement arrow-avro Reader and ReaderBuilder #7834
Rationale for this change
The initial Avro reader implementation contained an under-developed and temporary safeguard to prevent infinite loops when processing records that consumed zero bytes from the input buffer.
When the `Decoder` reported that zero bytes were consumed, the `Reader` would advance its cursor to the end of the current data block. While this successfully prevented an infinite loop, it had the critical side effect of silently discarding any remaining data in that block, leading to potential data loss.

This change enhances the decoding logic to handle these zero-byte values correctly, ensuring that the `Reader` makes proper progress without dropping data and without risking an infinite loop.
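For context, Avro's binary encoding legitimately produces zero-byte values: per the Avro specification, `null` is written as zero bytes, so a record whose fields all encode to `null` contributes nothing to the block's payload. A minimal sketch of that fact (illustrative, not crate code):

// Avro's binary encoding writes nothing for `null` (the spec states that
// "null is written as zero bytes"), so encoding it appends no bytes.
fn encode_avro_null(buf: &mut Vec<u8>) {
    let _ = buf; // intentionally a no-op
}

fn main() {
    let mut buf = Vec::new();
    // A "record" whose single field is null: zero bytes of payload.
    encode_avro_null(&mut buf);
    assert!(buf.is_empty());
}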
What changes are included in this PR?
- The `Decoder` has been updated to accurately track and report the number of bytes consumed for all values, including valid zero-length records like `null` or empty `bytes`. This ensures the decoder always makes forward progress.
- The safeguard in the `Reader` that previously advanced to the end of a block on a zero-byte read has been removed. The reader now relies on the decoder to report accurate consumption and advances its cursor incrementally and safely.
- A new `zero_byte.avro` file was created via this python script: https://gist.github.com/jecsand838/e57647d0d12853f3cf07c350a6a40395

Are these changes tested?
Yes, a new `test_read_zero_byte_avro_file` test was added that reads the new `zero_byte.avro` file and confirms the update.
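The real test drives the `Reader` over `zero_byte.avro`; as a self-contained stand-in for the invariant it protects, here is a toy version of the decode loop with an injectable per-record length (all names here are illustrative, not the PR's actual test):

// Toy version of the inner decode loop: `record_len` stands in for what
// `record_decoder.decode` would consume per record.
fn decode_rows(data: &[u8], batch_size: usize, record_len: usize) -> (usize, usize) {
    let (mut total_consumed, mut decoded_rows) = (0usize, 0usize);
    while total_consumed < data.len() && decoded_rows < batch_size {
        total_consumed += record_len; // may be 0 for zero-byte records
        decoded_rows += 1;
    }
    (total_consumed, decoded_rows)
}

#[test]
fn zero_byte_records_terminate_and_lose_no_data() {
    // Zero-byte records: the loop stops at the batch bound instead of spinning.
    assert_eq!(decode_rows(&[1, 2, 3], 5, 0), (0, 5));
    // One-byte records: all input is consumed, nothing silently dropped.
    assert_eq!(decode_rows(&[1, 2, 3], 5, 1), (3, 3));
}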
Are there any user-facing changes?
N/A
Follow-Up PRs
- Update `test_read_zero_byte_avro_file` once Added zero_byte.avro file arrow-testing#109 is merged in.