feat: Batch records before writing to sinks#20821
pkg/validation/limits.go
I think there's a fair bit of complexity being introduced here by batching by number of records. We already have batch_size as a configuration option in the engine; have you tried using that instead of adding a new limit?
I should have copied over my comment from the original PR: #20763 (comment)
by batching by number of records
We don't batch by the number of records/rows, but by their size (bytes).
My hesitation around using the already existing batch_size (row count) is that (e.g.) 6k rows of metric records will be fairly small, whereas 6k rows of log records can be a lot and cause OOMs.
In my opinion, byte-size-based batching fits all use cases; ideally, the logic for writing to sinks should be agnostic of what's actually written.
Happy to change my mind, though.
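(For concreteness, here is a minimal sketch of what byte-size-based flushing could look like. The Record and Sink interfaces and the maxBatchBytes parameter are stand-ins for illustration, not the engine's actual types; as noted further down, the PR ultimately reuses the existing row-count BatchSize instead.)

```go
package sinkbatch

import "context"

// Illustrative sketch only: flush buffered records to a sink once their
// accumulated byte size crosses a limit. Record and Sink are stand-ins,
// not the engine's real types.
type Record interface {
	SizeBytes() int64 // approximate in-memory size of the record
}

type Sink interface {
	Write(ctx context.Context, recs []Record) error
}

func writeBatched(ctx context.Context, recs <-chan Record, sink Sink, maxBatchBytes int64) error {
	var (
		pending      []Record
		pendingBytes int64
	)
	flush := func() error {
		if len(pending) == 0 {
			return nil
		}
		if err := sink.Write(ctx, pending); err != nil {
			return err
		}
		// Start a fresh slice so the sink may retain the one it was given.
		pending, pendingBytes = nil, 0
		return nil
	}

	for rec := range recs {
		pending = append(pending, rec)
		pendingBytes += rec.SizeBytes()
		if pendingBytes >= maxBatchBytes {
			if err := flush(); err != nil {
				return err
			}
		}
	}
	return flush() // write out whatever is left
}
```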
My hesitation around using the already existing batch_size (row count) is that (e.g.) 6k rows of metric records will be fairly small, whereas 6k rows of log records can be a lot and cause OOMs.
If 6k rows of log records can cause OOMs, our batch size shouldn't be 6k :) batch_size determines the largest record that a pipeline can produce, so it needs to be able to fit in memory. (Batching records by row count also helps performance, as you'll have more contiguous regions of memory.)
Based on that intent, do you think it makes sense to consistently use batch_size throughout?
our batch size shouldn't be 6k
Fair; my point is that 6k may be fine (and beneficial) for some record types and too much for others, so reducing it reduces the benefit for the smaller record types.
Still, I'm all for simplicity here, and this is something we can revisit in the future if we need to. I changed it so we use the already existing BatchSize.
pkg/engine/internal/worker/thread.go
I think a single arrowagg instance can be used in drainPipeline instead of batch []arrow.RecordBatch. arrowagg has a Reset method, so it can be reset after flushing.
Good call. Changed.
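(As a rough sketch of the single-aggregator pattern discussed above, assuming a small Aggregator interface with Append/Rows/Aggregate/Reset methods; the actual arrowagg, pipeline, and sink APIs in the repo will differ:)

```go
package worker

import (
	"context"
	"errors"
	"io"
)

// Stand-in types for illustration; the real engine uses arrow.RecordBatch,
// an arrowagg aggregator, and its own pipeline/sink interfaces.
type Record any

type Pipeline interface {
	Read(ctx context.Context) (Record, error) // returns io.EOF when drained
}

type Sink interface {
	Write(ctx context.Context, rec Record) error
}

type Aggregator interface {
	Append(rec Record)
	Rows() int64
	Aggregate() (Record, error) // merge everything appended since the last Reset
	Reset()
}

// drainPipeline reads records from p and writes them to sink in batches of at
// most batchSize rows, reusing a single aggregator instead of accumulating a
// slice of record batches.
func drainPipeline(ctx context.Context, p Pipeline, sink Sink, agg Aggregator, batchSize int64) error {
	flush := func() error {
		if agg.Rows() == 0 {
			return nil
		}
		merged, err := agg.Aggregate()
		if err != nil {
			return err
		}
		agg.Reset() // reuse the same aggregator for the next batch
		return sink.Write(ctx, merged)
	}

	for {
		rec, err := p.Read(ctx)
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return err
		}

		agg.Append(rec)
		if agg.Rows() >= batchSize {
			if err := flush(); err != nil {
				return err
			}
		}
	}
	return flush() // write out the final partial batch
}
```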
What this PR does / why we need it:
We noticed that in many queries where the physical planning phase was taking long, most of the time was spent waiting to write to sinks.
Schedulers create one task per pointerscan. Before this PR, each task would write to sinks N times.
We are adding record_batch_size to control how many responses are merged together into a single response before writing to a sink, thus reducing the number of round trips. Note that this also applies to tasks other than the metastore scan ones.
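(To make the round-trip reduction concrete, a back-of-the-envelope helper; the name and signature are purely illustrative and not code from this PR:)

```go
// sinkWrites estimates how many sink round trips a task performs: one write
// per record without batching, versus ceil(n / perBatch) with batching.
func sinkWrites(numRecords, perBatch int) int {
	if perBatch <= 1 {
		return numRecords // no batching: one write per record
	}
	return (numRecords + perBatch - 1) / perBatch
}
```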