[common][vpj] Tag EOP with per-partition record counts for batch push verification #2663
sushantmane wants to merge 8 commits into linkedin:main
Conversation
Pull request overview
Adds per-partition record-count tracking in VPJ and propagates those counts on the End-of-Push (EOP) control message via a new PubSub header (prc), enabling downstream batch-push verification without changing the EOP Avro schema.
Changes:
- Introduces `VENICE_PARTITION_RECORD_COUNT_HEADER` (`prc`) and wires `VeniceWriter.broadcastEndOfPush(...)` to optionally attach per-partition counts as PubSub headers.
- Implements per-partition record counting in both MR (counters) and Spark (custom `MapLongAccumulator`) via `DataWriterTaskTracker` additions.
- Updates `VenicePushJob` to collect per-partition counts from the task tracker and pass them to `broadcastEndOfPush`, with unit tests for the writer, MR counters, and the Spark accumulator.
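Because the counts ride on message headers rather than the Avro payload, each EOP message presumably carries its own partition's count as an 8-byte long under the `prc` key. A minimal sketch of that encoding; the class and method names are hypothetical, only the header key and the no-schema-change constraint come from the PR:

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: names are illustrative, not the actual Venice API.
class PrcHeaderSketch {
  // Header key introduced by the PR for per-partition record counts.
  static final String VENICE_PARTITION_RECORD_COUNT_HEADER = "prc";

  // Encode a record count as an 8-byte big-endian header value.
  static byte[] encodeCount(long count) {
    return ByteBuffer.allocate(Long.BYTES).putLong(count).array();
  }

  // Decode the header value on the consumer side.
  static long decodeCount(byte[] value) {
    return ByteBuffer.wrap(value).getLong();
  }
}
```

Keeping the value a fixed-width long keeps the header small and avoids any serialization dependency between producer and consumer.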
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.
Summary per file:
| File | Description |
|---|---|
| internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java | Adds the prc header constant for per-partition record counts. |
| internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java | Adds EOP/control-message overloads to emit per-partition record counts in PubSub headers. |
| internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java | Tests broadcastEndOfPush with/without per-partition record-count headers. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/DataWriterTaskTracker.java | Extends the tracker interface with per-partition tracking + retrieval methods. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java | Calls the new per-partition tracking hook when sending a record. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/counter/MRJobCounterHelper.java | Adds MR counter group + helpers for per-partition record counts. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/task/ReporterBackedMapReduceDataWriterTaskTracker.java | Tracks per-partition counts via Reporter-backed MR counters. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/task/CounterBackedMapReduceDataWriterTaskTracker.java | Exposes per-partition counts from MR Counters after job completion. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/MapLongAccumulator.java | Adds a Spark accumulator for per-key (partition) long aggregation. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/DataWriterAccumulators.java | Registers the new per-partition accumulator with Spark. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/SparkDataWriterTaskTracker.java | Tracks and returns per-partition record counts via the Spark accumulator. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java | Collects per-partition record counts from the tracker and passes them into EOP broadcast. |
| clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/counter/MRJobCounterHelperTest.java | Adds tests for per-partition counter parsing (and related behaviors). |
| clients/venice-push-job/src/test/java/com/linkedin/venice/spark/datawriter/task/MapLongAccumulatorTest.java | Adds unit tests for accumulator add/merge/reset/copy/immutability. |
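The `MapLongAccumulator` row above is the Spark-side aggregation primitive. The real class extends Spark's `AccumulatorV2`; stripped of that dependency, the per-key long aggregation it needs looks roughly like the sketch below (class shape and method visibility are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of per-key long aggregation as in MapLongAccumulator;
// the real class implements Spark's AccumulatorV2 contract, omitted here.
class MapLongAggSketch {
  private final Map<Integer, Long> counts = new HashMap<>();

  // add(): invoked on a task's local copy for each tracked event.
  void add(int partition, long delta) {
    counts.merge(partition, delta, Long::sum);
  }

  // merge(): the driver folds each task's copy into the final result.
  void merge(MapLongAggSketch other) {
    other.counts.forEach((p, c) -> counts.merge(p, c, Long::sum));
  }

  Map<Integer, Long> value() {
    return counts;
  }
}
```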
Resolved review threads:
- internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java
- ...-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/counter/MRJobCounterHelper.java
- ...h-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/counter/MRJobCounterHelperTest.java
Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.
Resolved review threads:
- ...-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/counter/MRJobCounterHelper.java
- ...sh-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java (outdated)
- ...h-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/counter/MRJobCounterHelperTest.java
Force-pushed from c83652b to 9b2b905.
Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.
Resolved review threads:
- ...-job/src/main/java/com/linkedin/venice/spark/datawriter/task/SparkDataWriterTaskTracker.java
- ...ice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/MapLongAccumulator.java (outdated)
- internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java
The PR's accumulator-based counting is unreliable under Spark speculative execution and task retries after failures. We shouldn't rely on an inaccurate signal to validate the data on the server side; replacing the accumulator with something more deterministic … Another thought while you work on the server-side PR: we should account for at-least-once semantics in Kafka, as in …
IMO verification is a best-effort mechanism; I do not think that is a good justification for using dataFrame.count(). As for the Kafka duplicate-delivery issue, we already detect those cases and drop duplicate records as part of DIV, which happens before we increment counters.
Force-pushed from a4f4fbb to eeca5d2.
Copilot reviewed 14 out of 14 changed files in this pull request and generated no new comments.
I mean, if we are going to fail a push job based on the counts, the counts have to be accurate. This automated validation cannot be best-effort; it has to be reliable. Currently, the only Spark action in VPJ is the … Sounds good on the Kafka duplicates.
Tag EOP with per-partition record counts for batch push verification

Track per-partition record counts during batch push and embed them as PubSub message headers on the End-of-Push control message. This enables server-side verification (in a follow-up PR) without requiring changes to the EOP Avro schema.

Changes:
- PubSubMessageHeaders: new "prc" header constant
- VeniceWriter: new broadcastEndOfPush/sendControlMessage overloads with headers
- DataWriterTaskTracker: per-partition tracking interface
- MR path: counter group in MRJobCounterHelper + tracker implementations
- Spark path: new MapLongAccumulator + tracker implementations
- VenicePushJob: collect per-partition counts, pass to broadcastEndOfPush
…Map, rename test

Code review findings:
- VeniceWriter: make the 5-param sendControlMessage delegate to the 6-param overload, eliminating duplicated getDebugInfo/isEndOfSegment/synchronized logic
- MapLongAccumulator: ConcurrentHashMap → HashMap; Spark tasks are single-threaded (each gets a fresh copy()), so no concurrent access occurs
- MRJobCounterHelperTest: rename a misleading test to reflect what it actually tests (counter retrieval round-trip, not Reporter-based increment)
Move accumulator/counter update from the per-record hot path (sendMessageToKafka) to the close() method. The local messageSent field already tracks the exact count — flush it to the tracker once when the partition writer closes. This eliminates per-record Tuple2 allocations (Spark) and counter increments (MR). Changed trackRecordSentToPubSubForPartition(int) to trackRecordSentToPubSubForPartition(int, long) to accept bulk count.
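The flush-on-close pattern described in that commit can be sketched as follows; the class shell and nested Tracker interface are illustrative stand-ins for AbstractPartitionWriter and DataWriterTaskTracker, not the actual Venice types:

```java
// Hypothetical sketch of flushing the per-partition count once at close()
// instead of updating the tracker on every record.
class PartitionWriterSketch {
  interface Tracker {
    void trackRecordSentToPubSubForPartition(int partition, long count);
  }

  private final int partition;
  private final Tracker taskTracker;
  private long messageSent = 0; // already maintained per record on the hot path

  PartitionWriterSketch(int partition, Tracker tracker) {
    this.partition = partition;
    this.taskTracker = tracker;
  }

  void sendMessageToKafka() {
    messageSent++; // no tracker/accumulator work per record
  }

  // Single bulk update when the writer closes, instead of one per record.
  void close() {
    taskTracker.trackRecordSentToPubSubForPartition(partition, messageSent);
  }
}
```

This is why the tracker method takes a bulk long count rather than being an increment-by-one hook.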
Force-pushed from 59c3fa1 to 65ec100.
Copilot reviewed 14 out of 14 changed files in this pull request and generated no new comments.
TestBatchPushEopRecordCount runs a batch push with SEND_CONTROL_MESSAGES_DIRECTLY=true and verifies:
- Per-partition record counts are populated (non-empty map)
- The sum of per-partition counts equals the total records pushed (100)
- Each partition has a non-negative count
- The push completes successfully (data is readable)

Uses a VenicePushJob subclass to capture the per-partition counts via reflection on the private getPerPartitionRecordCounts() method.
Copilot reviewed 15 out of 15 changed files in this pull request and generated 4 comments.
```java
public void run() {
  super.run();
  // Capture the counts after the job completes but before close
  try {
    java.lang.reflect.Method method = VenicePushJob.class.getDeclaredMethod("getPerPartitionRecordCounts");
    method.setAccessible(true);
    capturedCounts = (Map<Integer, Long>) method.invoke(this);
  } catch (Exception e) {
    throw new RuntimeException("Failed to capture per-partition record counts", e);
  }
```
This test uses reflection to call the private VenicePushJob#getPerPartitionRecordCounts() method. That’s brittle (method rename/visibility changes will break at runtime) and bypasses compile-time checks. Prefer exposing a test-friendly hook (e.g., @VisibleForTesting protected Map<Integer,Long> getPerPartitionRecordCounts() or a protected accessor on the task tracker) so the subclass can call it directly without reflection.
Suggested change:

```java
protected void broadcastEndOfPush(Map<Integer, Long> perPartitionRecordCounts) {
  capturedCounts =
      perPartitionRecordCounts == null ? null : new java.util.HashMap<>(perPartitionRecordCounts);
  super.broadcastEndOfPush(perPartitionRecordCounts);
```
```java
// Capture all sendMessage calls (SOS + EOP + EOS for each partition)
ArgumentCaptor<Integer> partitionCaptor = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<KafkaMessageEnvelope> kmeCaptor = ArgumentCaptor.forClass(KafkaMessageEnvelope.class);
ArgumentCaptor<PubSubMessageHeaders> headersCaptor = ArgumentCaptor.forClass(PubSubMessageHeaders.class);
verify(mockedProducer, atLeast(6)).sendMessage(
    anyString(),
    partitionCaptor.capture(),
    any(),
    kmeCaptor.capture(),
    headersCaptor.capture(),
    any());
```
The verification threshold atLeast(6) is too low for what this test claims to capture (“SOS + EOP + EOS for each partition”). broadcastEndOfPush() will typically produce 3 control messages per partition (SOS triggered by first message, then EOP, then EOS), so for 3 partitions you can assert times(9) (or otherwise assert the expected minimum per-partition) to make the test catch regressions where EOS/SOS stops being emitted.
```java
// Capture all sendMessage calls
ArgumentCaptor<PubSubMessageHeaders> headersCaptor = ArgumentCaptor.forClass(PubSubMessageHeaders.class);
ArgumentCaptor<KafkaMessageEnvelope> kmeCaptor = ArgumentCaptor.forClass(KafkaMessageEnvelope.class);
verify(mockedProducer, atLeast(4))
```
Similarly here, verify(..., atLeast(4)) is lower than the expected number of control-message sends for 2 partitions when calling broadcastEndOfPush() (typically SOS + EOP + EOS per partition). Tightening this to the expected count (or at least >= 6) will make the test meaningfully assert the full control-message sequence, not just that “something” was sent.
Suggested change:

```java
verify(mockedProducer, times(partitionCount * 3))
```
```java
Assert.assertEquals(
    totalFromPartitions,
    DEFAULT_USER_DATA_RECORD_COUNT,
    "Sum of per-partition record counts (" + perPartitionCounts + ") should equal total records pushed ("
        + DEFAULT_USER_DATA_RECORD_COUNT + ")");

// Verify each partition has a non-negative count
for (Map.Entry<Integer, Long> entry: perPartitionCounts.entrySet()) {
  Assert.assertTrue(
      entry.getValue() >= 0,
      "Partition " + entry.getKey() + " should have non-negative count, got " + entry.getValue());
}

// Verify push actually succeeded — data is readable
veniceCluster.useControllerClient(controllerClient -> {
  TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, true, () -> {
    int currentVersion = controllerClient.getStore(storeName).getStore().getCurrentVersion();
    Assert.assertTrue(currentVersion > 0, "Store should have a current version");
  });
});

pushJob.close();
```
pushJob.close() is only called at the end of the test. If pushJob.run() or any assertion throws, the job/underlying resources won’t be closed, which can leak threads and make subsequent integration tests flaky. Wrap the push job in a try/finally (or try-with-resources if applicable) so close() is guaranteed to run.
Suggested change:

```java
try {
  Assert.assertEquals(
      totalFromPartitions,
      DEFAULT_USER_DATA_RECORD_COUNT,
      "Sum of per-partition record counts (" + perPartitionCounts + ") should equal total records pushed ("
          + DEFAULT_USER_DATA_RECORD_COUNT + ")");
  // Verify each partition has a non-negative count
  for (Map.Entry<Integer, Long> entry: perPartitionCounts.entrySet()) {
    Assert.assertTrue(
        entry.getValue() >= 0,
        "Partition " + entry.getKey() + " should have non-negative count, got " + entry.getValue());
  }
  // Verify push actually succeeded — data is readable
  veniceCluster.useControllerClient(controllerClient -> {
    TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, true, () -> {
      int currentVersion = controllerClient.getStore(storeName).getStore().getCurrentVersion();
      Assert.assertTrue(currentVersion > 0, "Store should have a current version");
    });
  });
} finally {
  pushJob.close();
}
```
Summary
Track per-partition record counts during batch push and embed them as PubSub message headers on the End-of-Push control message. This enables server-side verification (in a follow-up PR) without requiring changes to the EOP Avro schema.
Part 1 of 4 in the batch push record count verification series.
Changes
- `PubSubMessageHeaders`: new `prc` header constant
- `VeniceWriter`: new `broadcastEndOfPush`/`sendControlMessage` overloads with per-partition headers (8-byte long per partition)
- `DataWriterTaskTracker`: per-partition tracking interface (`trackRecordSentToPubSubForPartition`, `getPerPartitionRecordCounts`)
- `MRJobCounterHelper` + tracker implementations (MR path)
- `MapLongAccumulator` + tracker implementations (Spark path)
- `VenicePushJob`: collect per-partition counts, pass to `broadcastEndOfPush`

Backward compatibility
Old servers ignore unknown PubSub headers — no impact.
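On the server side (the follow-up PR), verification would presumably decode the header and compare it against the ingested count, skipping the check when the header is absent so old producers keep working. A hypothetical sketch; none of these names are Venice's actual interfaces:

```java
import java.nio.ByteBuffer;
import java.util.Optional;

// Hypothetical sketch of the follow-up server-side verification.
class EopVerifySketch {
  // Decode the 8-byte "prc" header value, if present and well-formed.
  static Optional<Long> decodeExpectedCount(byte[] headerValue) {
    if (headerValue == null || headerValue.length != Long.BYTES) {
      return Optional.empty(); // header absent: old producer, skip verification
    }
    return Optional.of(ByteBuffer.wrap(headerValue).getLong());
  }

  // Only fail when the header is present and disagrees with what was ingested.
  static boolean countsMatch(byte[] headerValue, long ingestedCount) {
    return decodeExpectedCount(headerValue).map(c -> c == ingestedCount).orElse(true);
  }
}
```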
Test plan
- `VeniceWriterUnitTest`: broadcastEndOfPush with/without counts
- `MRJobCounterHelperTest`: per-partition counter round-trip
- `MapLongAccumulatorTest`: add, merge, reset, copy