diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientDiskFullTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientDiskFullTest.java
index 75aa0fb9658..f303daf60d8 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientDiskFullTest.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientDiskFullTest.java
@@ -32,10 +32,14 @@
 
 import com.linkedin.d2.balancer.D2Client;
 import com.linkedin.d2.balancer.D2ClientBuilder;
+import com.linkedin.davinci.DaVinciBackend;
+import com.linkedin.davinci.VersionBackend;
+import com.linkedin.davinci.client.AvroGenericDaVinciClient;
 import com.linkedin.davinci.client.DaVinciClient;
 import com.linkedin.davinci.client.DaVinciConfig;
 import com.linkedin.davinci.client.StorageClass;
 import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory;
+import com.linkedin.davinci.kafka.consumer.StoreIngestionTask;
 import com.linkedin.venice.D2.D2ClientUtils;
 import com.linkedin.venice.PushJobCheckpoints;
 import com.linkedin.venice.client.store.AvroGenericStoreClient;
@@ -54,6 +58,7 @@
 import com.linkedin.venice.utils.DataProviderUtils;
 import com.linkedin.venice.utils.DiskUsage;
 import com.linkedin.venice.utils.PropertyBuilder;
+import com.linkedin.venice.utils.ReferenceCounted;
 import com.linkedin.venice.utils.TestUtils;
 import com.linkedin.venice.utils.TestWriteUtils;
 import com.linkedin.venice.utils.Utils;
@@ -324,4 +329,112 @@ public void testDaVinciDiskFullFailure(boolean useDaVinciSpecificExecutionStatus
       }
     }
   }
+
+  /**
+   * Validates that Da Vinci client reads survive a StoreIngestionTask failure on the live version.
+   *
+   * Flow:
+   * 1. Push data via VPJ, DaVinci client subscribes and ingests successfully
+   * 2. Verify all records are readable from DaVinci
+   * 3. Inject a task-level exception into the live version's StoreIngestionTask via
+   *    {@code setLastStoreIngestionException()}
+   * 4. The SIT's run loop picks up the exception in {@code checkIngestionProgress()}, which triggers
+   *    {@code handleIngestionException()} and reports error for all partitions
+   * 5. Since partition futures in {@link VersionBackend} were already completed successfully,
+   *    {@code completePartitionExceptionally()} is a no-op ({@link java.util.concurrent.CompletableFuture}
+   *    can only be completed once)
+   * 6. Verify all records are STILL readable — the SIT failure must not break existing reads
+   */
+  @Test(timeOut = TEST_TIMEOUT)
+  public void testReadsFromLiveVersionSurviveSITFailure() throws Exception {
+    int recordCount = 10;
+    String storeName = Utils.getUniqueString("davinci_sit_failure_read_test");
+    File inputDir = getTempDataDirectory();
+    String inputDirPath = "file://" + inputDir.getAbsolutePath();
+    Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToStringSchema(inputDir, recordCount);
+    Properties vpjProperties = defaultVPJProps(venice, inputDirPath, storeName);
+    vpjProperties.putAll(
+        PubSubBrokerWrapper.getBrokerDetailsForClients(Collections.singletonList(venice.getPubSubBrokerWrapper())));
+
+    try (ControllerClient controllerClient = createStoreForJob(venice.getClusterName(), recordSchema, vpjProperties)) {
+      venice.createMetaSystemStore(storeName);
+      venice.createPushStatusSystemStore(storeName);
+
+      StoreResponse storeResponse = controllerClient.getStore(storeName);
+      assertFalse(storeResponse.isError(), "Store response error: " + storeResponse.getError());
+      assertTrue(storeResponse.getStore().isDaVinciPushStatusStoreEnabled());
+
+      // Push v1
+      runVPJ(vpjProperties, 1, controllerClient);
+
+      VeniceProperties backendConfig = getDaVinciBackendConfig(true);
+      MetricsRepository metricsRepository = new MetricsRepository();
+      try (CachingDaVinciClientFactory factory = getCachingDaVinciClientFactory(
+          d2Client,
+          VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME,
+          metricsRepository,
+          backendConfig,
+          venice)) {
+
+        DaVinciClient<Object, Object> daVinciClient = factory.getGenericAvroClient(
+            storeName,
+            new DaVinciConfig().setIsolated(true).setStorageClass(StorageClass.MEMORY_BACKED_BY_DISK));
+        daVinciClient.start();
+        daVinciClient.subscribeAll().get(30, TimeUnit.SECONDS);
+
+        // Verify all records are readable after successful ingestion
+        for (int i = 1; i <= recordCount; i++) {
+          String key = Integer.toString(i);
+          Object value = daVinciClient.get(key).get();
+          assertNotNull(value, "Key " + key + " should be readable after v1 ingestion");
+          assertEquals(value.toString(), TestWriteUtils.DEFAULT_USER_DATA_VALUE_PREFIX + i);
+        }
+        LOGGER.info("All records verified readable from Da Vinci client before SIT failure injection");
+
+        // Get the live version's StoreIngestionTask and inject a task-level exception
+        DaVinciBackend backend = AvroGenericDaVinciClient.getBackend();
+        assertNotNull(backend, "Da Vinci backend should not be null");
+        StoreIngestionTask sit;
+        // The version reference is only needed to resolve the topic and look up the SIT; try-with-resources closes it
+        try (ReferenceCounted<VersionBackend> versionRef =
+            backend.getStoreOrThrow(storeName).getDaVinciCurrentVersion()) {
+          VersionBackend versionBackend = versionRef.get();
+          assertNotNull(versionBackend, "Current version backend should not be null");
+          String versionTopic = versionBackend.getVersion().kafkaTopicName();
+          LOGGER.info("Injecting SIT failure for version topic: {}", versionTopic);
+
+          sit = backend.getIngestionBackend().getStoreIngestionService().getStoreIngestionTask(versionTopic);
+          assertNotNull(sit, "StoreIngestionTask should exist for " + versionTopic);
+          assertTrue(sit.isRunning(), "StoreIngestionTask should be running before failure injection");
+
+          // Inject task-level exception — this causes the SIT's run loop to throw from
+          // checkIngestionProgress() and report error for all partitions
+          sit.setLastStoreIngestionException(new VeniceException("Injected SIT failure for testing"));
+        }
+
+        // Wait for the SIT to actually process the injected exception and stop.
+        // This ensures the error propagation path has been fully exercised before we check reads.
+        TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> {
+          assertFalse(sit.isRunning(), "StoreIngestionTask should stop after processing injected failure");
+        });
+
+        // Core assertion: reads from the live version must still work after SIT failure.
+        // VersionBackend.completePartitionExceptionally() is a no-op on already-completed futures,
+        // so the partition remains ready-to-serve and reads should succeed.
+        for (int i = 1; i <= recordCount; i++) {
+          String key = Integer.toString(i);
+          Object value = daVinciClient.get(key).get();
+          assertNotNull(value, "Key " + key + " should still be readable after SIT failure");
+          assertEquals(
+              value.toString(),
+              TestWriteUtils.DEFAULT_USER_DATA_VALUE_PREFIX + i,
+              "Key " + key + " value mismatch after SIT failure");
+        }
+        LOGGER.info("All records still readable after SIT failure — test passed");
+      } finally {
+        // Always remove the store, even when an assertion above fails
+        controllerClient.disableAndDeleteStore(storeName);
+      }
+    }
+  }
 }