[da-vinci][test] Global RT DIV: fix remote VT DIV tracking + repro tests for batch-only NR stores #2733
Conversation
batch-only stores

Three new tests in TestGlobalRtDiv:
1. testBatchOnlyStoreWithGlobalRtDiv - positive control confirming vtSegments are correctly populated in a single-cluster batch-only store with Global RT DIV.
2. testBatchOnlyStoreWithGlobalRtDivAndMaxAge - demonstrates the maxAge eviction mechanism, where cloneVtProducerStates destructively removes VT producer states.
3. testBatchOnlyNRStoreWithGlobalRtDivHasEmptyVtSegments - deterministic reproduction of the root cause: when NR is enabled, the dc-1 leader has consumeRemotely()=true, causing shouldProduceToVersionTopic() to return true. This routes VT message validation to REALTIME_TOPIC_TYPE instead of VERSION_TOPIC, putting producer states into rtSegments instead of vtSegments.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Pull request overview
Adds new integration tests in TestGlobalRtDiv to reproduce and document the production “Global RT DIV size:0” symptom for batch-only stores, including a deterministic multi-region/native-replication scenario.
Changes:
- Adds a batch-only + Global RT DIV “positive control” integration test asserting VT DIV state is present and Global RT DIV state is absent.
- Adds an integration test intended to exercise maxAge-based producer-state pruning behavior.
- Adds a 2-region native-replication integration test intended to reproduce VT→RT topicType misrouting (empty vtSegments) on the remote leader.
```java
// With a 1ms maxAge, the cloneVtProducerStates staleness filter evicts VT producer states
// during SOP-triggered sync (promotion delay > 1ms makes entries stale). Data records
// may repopulate vtSegments, and the final EOP sync may or may not evict again (timing-dependent).
// The key evidence is in server logs: "event=globalRtDiv Removed N stale VT producer state(s)"
// and "event=globalRtDiv Syncing LCVP ... size: 0" — verifiable in test report HTML.
maxAgeCluster.getVeniceServers().forEach(server -> {
  if (!server.isRunning()) {
    return;
  }
  StoreIngestionTask sit =
      server.getVeniceServer().getKafkaStoreIngestionService().getStoreIngestionTask(topicName);
  if (sit == null) {
    return;
  }
  DataIntegrityValidator div = sit.getDataIntegrityValidator();
  if (div == null) {
    return;
  }
  boolean isLeader = server.getPort() == leaderNode.getPort();
  boolean hasVtState = div.hasVtDivState(PARTITION);
  boolean hasRtState = div.hasGlobalRtDivState(PARTITION);
  // Log the state for debugging. The actual bug evidence is in the server logs showing
  // "Removed N stale VT producer state(s)" during the SOP-triggered sync.
  LOGGER.info(
      "maxAge test: {} ({}) hasVtDivState={} hasGlobalRtDivState={}",
      server.getAddress(),
      isLeader ? "leader" : "follower",
      hasVtState,
      hasRtState);

  // For a batch-only store, there should be NO Global RT DIV state (no RT writers)
  assertFalse(
      hasRtState,
      "Global RT DIV state should NOT exist for batch-only store on " + server.getAddress());
});
```
testBatchOnlyStoreWithGlobalRtDivAndMaxAge doesn’t currently assert any maxAge-eviction behavior; it only logs and asserts hasGlobalRtDivState is false (which is already covered by the previous batch-only test). To make this test actionable, add a deterministic assertion that maxAge pruning actually occurred (e.g., assert VT state is absent/empty after the expected sync point, or create multiple producer GUIDs with sufficiently different message timestamps so pruning is guaranteed). Otherwise the test will remain a no-op regression-wise and will be hard to maintain.
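The reviewer's "multiple producer GUIDs with sufficiently different message timestamps" idea can be modeled outside Venice. Below is a minimal sketch assuming a data-relative staleness filter; the `prune` helper and the GUID names are hypothetical stand-ins, not the PartitionTracker API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DeterministicPruneDemo {
  // Data-relative staleness filter: anchor on the latest message timestamp seen,
  // then drop any producer state older than (latest - maxAge).
  static Map<String, Long> prune(Map<String, Long> producerStates, long maxAgeMs) {
    long latest = producerStates.values().stream().mapToLong(Long::longValue).max().orElse(0L);
    long earliestAllowable = latest - maxAgeMs;
    Map<String, Long> kept = new LinkedHashMap<>();
    producerStates.forEach((guid, ts) -> {
      if (ts >= earliestAllowable) {
        kept.put(guid, ts);
      }
    });
    return kept;
  }

  public static void main(String[] args) {
    long maxAgeMs = 1;
    Map<String, Long> states = new LinkedHashMap<>();
    states.put("guid-old", 1_000L);                 // hypothetical older producer
    states.put("guid-new", 1_000L + maxAgeMs + 1);  // newer producer, > maxAge apart
    Map<String, Long> kept = prune(states, maxAgeMs);
    // Because the timestamps differ by more than maxAge, eviction of the older
    // state is guaranteed regardless of wall-clock timing, so a test can assert it.
    if (kept.containsKey("guid-old")) throw new AssertionError("old state should be pruned");
    if (!kept.containsKey("guid-new")) throw new AssertionError("new state should survive");
    System.out.println("kept=" + kept.keySet());
  }
}
```

With timestamps spaced this way, an assertion on the pruned state becomes deterministic instead of timing-dependent.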
```java
List<VeniceMultiClusterWrapper> childDatacenters = multiRegion.getChildRegions();
VeniceControllerWrapper parentController = multiRegion.getParentControllers().get(0);

File inputDir = getTempDataDirectory();
String inputDirPath = "file:" + inputDir.getAbsolutePath();
String storeName = Utils.getUniqueString("batchOnlyNrGlobalRtDiv");
String topicName = Version.composeKafkaTopic(storeName, 1);

Properties vpjProps = IntegrationTestPushUtils.defaultVPJProps(multiRegion, inputDirPath, storeName);
vpjProps.put(SEND_CONTROL_MESSAGES_DIRECTLY, true);
```
In the NR reproduction test, the test assumes dc-0 is the source fabric and dc-1 is the remote consumer, but IntegrationTestPushUtils.defaultVPJProps(multiRegion, ...) sets SOURCE_GRID_FABRIC by taking the first entry from a HashMap of child regions (see TestWriteUtils.defaultVPJPropsWithD2Routing: childRegionNamesToZkAddress.entrySet().iterator().next()). That ordering isn’t guaranteed, so the source fabric (and therefore which DC is “remote”) can be non-deterministic across JVMs, making this test potentially flaky. Set SOURCE_GRID_FABRIC explicitly (e.g., to childDatacenters.get(0).getRegionName()) and derive the “remote” DC from that choice.
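One way to make the fabric choice deterministic, sketched with plain-Java stand-ins. The property key `"source.grid.fabric"`, the region names, and both helper methods are assumptions for illustration, not the actual Venice constants or wrapper APIs:

```java
import java.util.List;
import java.util.Properties;

public class ExplicitSourceFabricDemo {
  // Deterministic: always take the first region in the ordered child-region list,
  // instead of relying on HashMap iteration order (which is unspecified).
  static String chooseSourceFabric(List<String> childRegionNames) {
    return childRegionNames.get(0);
  }

  // Derive the "remote" DC from the explicit source choice, never guess it.
  static String chooseRemoteRegion(List<String> childRegionNames, String sourceFabric) {
    return childRegionNames.stream()
        .filter(r -> !r.equals(sourceFabric))
        .findFirst()
        .orElseThrow();
  }

  public static void main(String[] args) {
    List<String> regions = List.of("dc-0", "dc-1"); // illustrative region names
    String source = chooseSourceFabric(regions);
    String remote = chooseRemoteRegion(regions, source);
    Properties vpjProps = new Properties();
    vpjProps.setProperty("source.grid.fabric", source); // placeholder for SOURCE_GRID_FABRIC
    System.out.println("source=" + source + " remote=" + remote);
  }
}
```

Pinning the source fabric this way removes the JVM-dependent HashMap ordering from the test's behavior.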
```java
    hasVtState,
    "BUG REPRODUCTION: VT DIV state should be empty on dc-1 leader (consuming remotely) "
        + "due to topicType misrouting caused by consumeRemotely()=true. " + "Server: "
        + server.getAddress());
```
The NR bug reproduction assertion only checks hasVtDivState is false on the dc-1 leader, but it doesn’t assert that the VT producer state was actually misrouted into RT segments (i.e., hasGlobalRtDivState is true). As written, the test could pass even if DIV isn’t populated at all due to an unrelated failure. Add an assertion on the leader that hasGlobalRtDivState is true (or otherwise validate the expected misrouting signal) to make the reproduction unambiguous.
Suggested change:
```diff
          + server.getAddress());
+ assertTrue(
+     hasRtState,
+     "BUG REPRODUCTION: Global RT DIV state should be populated on dc-1 leader because VT "
+         + "producer state is expected to be misrouted into RT segments when "
+         + "consumeRemotely()=true. Server: " + server.getAddress());
```
```java
 * due to the wall-clock-based maxAge staleness check. When DIV_PRODUCER_STATE_MAX_AGE_MS is configured,
 * each sync (triggered by control messages or size threshold) runs:
 *   earliestAllowableTimestamp = System.currentTimeMillis() - maxAgeInMs
 * Any segment with lastRecordProducerTimestamp < earliestAllowableTimestamp is evicted from BOTH the clone
 * and the source vtSegments. For batch-only stores, once the last data record is consumed and a sync is
 * triggered (e.g., at EOP), the time gap between the last record's timestamp and the sync may exceed maxAge,
 * causing all entries to be evicted.
```
The Javadoc for the maxAge test describes the staleness threshold as wall-clock based (System.currentTimeMillis() - maxAgeInMs), but the actual DIV pruning uses a data-relative anchor: earliestAllowableTimestamp = latestMessageTimeInMs - maxAgeInMs (see PartitionTracker.computeEarliestAllowableTimestamp / cloneVtProducerStates). This comment is currently misleading and should be updated to match the implementation and the ConfigKeys.DIV_PRODUCER_STATE_MAX_AGE_MS documentation (age is based on producer message timestamps, not wall clock).
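The difference between the two anchors can be shown with a small self-contained model. The `countSurvivors` helper and its timestamps are hypothetical; this is only an illustration of why the anchor choice matters, not the PartitionTracker implementation:

```java
import java.util.Map;

public class MaxAgeAnchorDemo {
  // Count producer states that survive pruning against a given anchor timestamp:
  // earliestAllowable = anchor - maxAge; anything older is evicted.
  static long countSurvivors(Map<String, Long> states, long anchorTs, long maxAgeMs) {
    long earliestAllowable = anchorTs - maxAgeMs;
    return states.values().stream().filter(ts -> ts >= earliestAllowable).count();
  }

  public static void main(String[] args) {
    long maxAgeMs = 1;
    long hourAgo = System.currentTimeMillis() - 3_600_000L;
    // Two producer states whose message timestamps are an hour old but identical.
    Map<String, Long> states = Map.of("guid-a", hourAgo, "guid-b", hourAgo);

    // Wall-clock anchor (what the Javadoc claimed): every hour-old entry looks stale.
    long wallClockSurvivors = countSurvivors(states, System.currentTimeMillis(), maxAgeMs);
    // Data-relative anchor (latestMessageTimeInMs): nothing is stale relative to
    // the latest message, so both entries survive.
    long dataRelativeSurvivors = countSurvivors(states, hourAgo, maxAgeMs);

    System.out.println("wallClock=" + wallClockSurvivors
        + " dataRelative=" + dataRelativeSurvivors);
    // → wallClock=0 dataRelative=2
  }
}
```

This is why the comment's wall-clock framing overstates eviction: age is measured against the newest producer message timestamp, not against "now".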
Suggested change:
```diff
- * due to the wall-clock-based maxAge staleness check. When DIV_PRODUCER_STATE_MAX_AGE_MS is configured,
- * each sync (triggered by control messages or size threshold) runs:
- *   earliestAllowableTimestamp = System.currentTimeMillis() - maxAgeInMs
- * Any segment with lastRecordProducerTimestamp < earliestAllowableTimestamp is evicted from BOTH the clone
- * and the source vtSegments. For batch-only stores, once the last data record is consumed and a sync is
- * triggered (e.g., at EOP), the time gap between the last record's timestamp and the sync may exceed maxAge,
- * causing all entries to be evicted.
+ * due to the producer-timestamp-based maxAge staleness check. When DIV_PRODUCER_STATE_MAX_AGE_MS is configured,
+ * each sync (triggered by control messages or size threshold) computes:
+ *   earliestAllowableTimestamp = latestMessageTimeInMs - maxAgeInMs
+ * Any segment with lastRecordProducerTimestamp < earliestAllowableTimestamp is evicted from BOTH the clone
+ * and the source vtSegments. For batch-only stores, if the latest consumed message timestamp advances past
+ * older VT producer state timestamps by more than maxAge, a sync triggered at EOP can evict all entries.
```
…ead of rtSegments

When native replication is enabled, the dc-1 leader has consumeRemotely()=true, which caused shouldProduceToVersionTopic() to return true and route remote VT message validation to REALTIME_TOPIC_TYPE. Producer states ended up in rtSegments instead of vtSegments, so cloneVtProducerStates always returned size 0.

Fix: add && topicPartition.getPubSubTopic().isRealTime() to the REALTIME_TOPIC_TYPE condition in validateAndFilterOutDuplicateMessagesFromLeaderTopic. Remote VT messages now keep VERSION_TOPIC type, landing in vtSegments and syncing to OffsetRecord via the existing VT snapshot path. Only true RT topic messages use REALTIME_TOPIC_TYPE.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Force-pushed ca65d64 to b8dd82d.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
```java
 * Reproduces a bug where cloneVtProducerStates destructively evicts VT producer states from the consumer DIV
 * due to the wall-clock-based maxAge staleness check. When DIV_PRODUCER_STATE_MAX_AGE_MS is configured,
 * each sync (triggered by control messages or size threshold) runs:
 *   earliestAllowableTimestamp = System.currentTimeMillis() - maxAgeInMs
 * Any segment with lastRecordProducerTimestamp < earliestAllowableTimestamp is evicted from BOTH the clone
 * and the source vtSegments. For batch-only stores, once the last data record is consumed and a sync is
 * triggered (e.g., at EOP), the time gap between the last record's timestamp and the sync may exceed maxAge,
 * causing all entries to be evicted.
 *
 * This test uses a tiny maxAge (1ms) to reliably trigger this eviction, demonstrating the mechanism
 * that causes size=0 in the "event=globalRtDiv Syncing LCVP" log for batch-only stores in production.
```
The Javadoc for testBatchOnlyStoreWithGlobalRtDivAndMaxAge says it “reproduces a bug” where VT producer states are evicted due to DIV_PRODUCER_STATE_MAX_AGE_MS, but the test does not assert that eviction occurred (it only asserts no Global RT DIV state). Either add an assertion that verifies the VT state was actually evicted (or reduced) under the tiny maxAge, or soften/adjust the comment so the test description matches what it validates.
Suggested change:
```diff
- * Reproduces a bug where cloneVtProducerStates destructively evicts VT producer states from the consumer DIV
- * due to the wall-clock-based maxAge staleness check. When DIV_PRODUCER_STATE_MAX_AGE_MS is configured,
- * each sync (triggered by control messages or size threshold) runs:
- *   earliestAllowableTimestamp = System.currentTimeMillis() - maxAgeInMs
- * Any segment with lastRecordProducerTimestamp < earliestAllowableTimestamp is evicted from BOTH the clone
- * and the source vtSegments. For batch-only stores, once the last data record is consumed and a sync is
- * triggered (e.g., at EOP), the time gap between the last record's timestamp and the sync may exceed maxAge,
- * causing all entries to be evicted.
- *
- * This test uses a tiny maxAge (1ms) to reliably trigger this eviction, demonstrating the mechanism
- * that causes size=0 in the "event=globalRtDiv Syncing LCVP" log for batch-only stores in production.
+ * Exercises the batch-only-store path with Global RT DIV enabled while configuring
+ * {@link com.linkedin.venice.ConfigKeys#DIV_PRODUCER_STATE_MAX_AGE_MS} to a tiny value.
+ *
+ * A very small maxAge makes producer-state aging more aggressive around sync points such as EOP for
+ * batch-only stores. This test uses that configuration to cover the scenario that historically exposed
+ * issues in this area, while validating the externally observable outcome for the store: no Global RT DIV
+ * state is persisted for the batch-only case.
+ *
+ * Note that this test does not directly assert VT producer-state eviction; it only verifies the resulting
+ * Global RT DIV state behavior under the tiny maxAge configuration.
```
```diff
  TopicType topicType = PartitionTracker.VERSION_TOPIC;
- // shouldProduceToVersionTopic() ensures this is a LEADER that is consuming from RT or remote VT
- if (isGlobalRtDivEnabled() && shouldProduceToVersionTopic(pcs)) {
+ // shouldProduceToVersionTopic() ensures this is a LEADER that is consuming from RT or remote VT.
+ // For RT topics, use REALTIME_TOPIC_TYPE so segments are tracked per broker URL in rtSegments.
+ // For remote VT, keep VERSION_TOPIC so segments are tracked in vtSegments and synced to OffsetRecord.
+ if (isGlobalRtDivEnabled() && shouldProduceToVersionTopic(pcs)
+     && topicPartition.getPubSubTopic().isRealTime()) {
    topicType = TopicType.of(REALTIME_TOPIC_TYPE, kafkaUrl);
  }
```
PR description mentions moving the bytes-based sync cadence counter from a SIT-level consumedBytesSinceLastSync map to a per-partition field on PartitionConsumptionState. In the current codebase after this change, StoreIngestionTask still has consumedBytesSinceLastSync (and LeaderFollowerStoreIngestionTask still resets it), so that described change doesn’t appear to be present. If the refactor is intended in this PR, it looks incomplete/missing; otherwise the PR description should be updated to match the actual diff.
```java
// BUG REPRODUCTION: On the leader in dc-1, consumeRemotely() = true causes
// shouldProduceToVersionTopic() = true, which routes VT message validation to
// REALTIME_TOPIC_TYPE instead of VERSION_TOPIC. VT producer states end up in
// rtSegments instead of vtSegments. cloneVtProducerStates returns size: 0.
if (isLeader) {
  assertFalse(
      hasVtState,
      "BUG REPRODUCTION: VT DIV state should be empty on dc-1 leader (consuming remotely) "
          + "due to topicType misrouting caused by consumeRemotely()=true. " + "Server: "
          + server.getAddress());
}
```
testBatchOnlyNRStoreWithGlobalRtDivHasEmptyVtSegments currently asserts the buggy behavior (dc-1 leader has hasVtDivState == false). With the fix in LeaderFollowerStoreIngestionTask (remote VT should remain VERSION_TOPIC), this test should be updated to assert the corrected behavior (VT DIV state present) and renamed/commented as a regression test rather than a bug reproduction.
```java
    .build())) {

  List<VeniceMultiClusterWrapper> childDatacenters = multiRegion.getChildRegions();
  VeniceControllerWrapper parentController = multiRegion.getParentControllers().get(0);
```
parentController is created but never used. Please remove the unused local variable (or use it if it was intended for additional assertions) to avoid dead code and potential static-analysis failures.
Suggested change:
```diff
- VeniceControllerWrapper parentController = multiRegion.getParentControllers().get(0);
```
Summary
Fix: Remote VT producer states misrouted to rtSegments
When native replication is enabled, the dc-1 leader has `consumeRemotely()=true`, which caused `shouldProduceToVersionTopic()` to return `true` and route remote VT message validation to `REALTIME_TOPIC_TYPE` instead of `VERSION_TOPIC`. Producer states ended up in `rtSegments` instead of `vtSegments`, so `cloneVtProducerStates` always returned size 0.
Fix: Add `&& topicPartition.getPubSubTopic().isRealTime()` to the `REALTIME_TOPIC_TYPE` condition in `validateAndFilterOutDuplicateMessagesFromLeaderTopic`. Remote VT messages now keep `VERSION_TOPIC` type, landing in `vtSegments` and syncing to `OffsetRecord` via the existing VT snapshot path. Only true RT topic messages use `REALTIME_TOPIC_TYPE`.
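The condition change described above can be illustrated as a plain boolean sketch. The helper names are hypothetical and the booleans merely mirror the named predicates; this is not the `LeaderFollowerStoreIngestionTask` code itself:

```java
public class TopicTypeRoutingDemo {
  // Old condition: any leader producing to VT got REALTIME_TOPIC_TYPE,
  // including leaders consuming a remote version topic.
  static boolean usesRtTypeBeforeFix(boolean globalRtDivEnabled, boolean shouldProduceToVt) {
    return globalRtDivEnabled && shouldProduceToVt;
  }

  // Fixed condition: additionally require the source topic to actually be real-time.
  static boolean usesRtTypeAfterFix(
      boolean globalRtDivEnabled, boolean shouldProduceToVt, boolean topicIsRealTime) {
    return globalRtDivEnabled && shouldProduceToVt && topicIsRealTime;
  }

  public static void main(String[] args) {
    // Remote-VT case on the dc-1 leader: consumeRemotely()=true makes
    // shouldProduceToVersionTopic() true, but the topic itself is a version topic.
    boolean before = usesRtTypeBeforeFix(true, true);       // misrouted to REALTIME_TOPIC_TYPE
    boolean after = usesRtTypeAfterFix(true, true, false);  // stays VERSION_TOPIC
    boolean trueRt = usesRtTypeAfterFix(true, true, true);  // real RT still uses REALTIME_TOPIC_TYPE
    System.out.println("before=" + before + " after=" + after + " trueRt=" + trueRt);
    // → before=true after=false trueRt=true
  }
}
```

The extra `topicIsRealTime` conjunct is what keeps remote VT producer states in `vtSegments` while leaving true RT traffic on the per-broker `rtSegments` path.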
New integration tests
Three tests in `TestGlobalRtDiv` reproducing the size:0 bug for batch-only NR stores:
Testing Done
🤖 Generated with Claude Code