Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.d2.balancer.D2ClientBuilder;
import com.linkedin.davinci.DaVinciBackend;
import com.linkedin.davinci.VersionBackend;
import com.linkedin.davinci.client.AvroGenericDaVinciClient;
import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
import com.linkedin.davinci.client.StorageClass;
import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory;
import com.linkedin.davinci.kafka.consumer.StoreIngestionTask;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.PushJobCheckpoints;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
Expand All @@ -54,6 +58,7 @@
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.DiskUsage;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.ReferenceCounted;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
Expand Down Expand Up @@ -324,4 +329,110 @@ public void testDaVinciDiskFullFailure(boolean useDaVinciSpecificExecutionStatus
}
}
}

/**
 * Validates that Da Vinci client reads survive a StoreIngestionTask failure on the live version.
 *
 * Flow:
 * 1. Push data via VPJ, DaVinci client subscribes and ingests successfully
 * 2. Verify all records are readable from DaVinci
 * 3. Inject a task-level exception into the live version's StoreIngestionTask via
 *    {@code setLastStoreIngestionException()}
 * 4. The SIT's run loop picks up the exception in {@code checkIngestionProgress()}, which triggers
 *    {@code handleIngestionException()} and reports error for all partitions
 * 5. Since partition futures in {@link VersionBackend} were already completed successfully,
 *    {@code completePartitionExceptionally()} is a no-op ({@link java.util.concurrent.CompletableFuture}
 *    can only be completed once)
 * 6. Verify all records are STILL readable — the SIT failure must not break existing reads
 */
@Test(timeOut = TEST_TIMEOUT)
public void testReadsFromLiveVersionSurviveSITFailure() throws Exception {
  int recordCount = 10;
  String storeName = Utils.getUniqueString("davinci_sit_failure_read_test");
  File inputDir = getTempDataDirectory();
  String inputDirPath = "file://" + inputDir.getAbsolutePath();
  Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToStringSchema(inputDir, recordCount);
  Properties vpjProperties = defaultVPJProps(venice, inputDirPath, storeName);
  vpjProperties.putAll(
      PubSubBrokerWrapper.getBrokerDetailsForClients(Collections.singletonList(venice.getPubSubBrokerWrapper())));

  try (ControllerClient controllerClient = createStoreForJob(venice.getClusterName(), recordSchema, vpjProperties)) {
    venice.createMetaSystemStore(storeName);
    venice.createPushStatusSystemStore(storeName);

    StoreResponse storeResponse = controllerClient.getStore(storeName);
    assertFalse(storeResponse.isError(), "Store response error: " + storeResponse.getError());
    assertTrue(storeResponse.getStore().isDaVinciPushStatusStoreEnabled());

    // Push v1
    runVPJ(vpjProperties, 1, controllerClient);

    VeniceProperties backendConfig = getDaVinciBackendConfig(true);
    MetricsRepository metricsRepository = new MetricsRepository();
    try (CachingDaVinciClientFactory factory = getCachingDaVinciClientFactory(
        d2Client,
        VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME,
        metricsRepository,
        backendConfig,
        venice)) {

      DaVinciClient<String, Object> daVinciClient = factory.getGenericAvroClient(
          storeName,
          new DaVinciConfig().setIsolated(true).setStorageClass(StorageClass.MEMORY_BACKED_BY_DISK));
      daVinciClient.start();
      daVinciClient.subscribeAll().get(30, TimeUnit.SECONDS);

      // Verify all records are readable after successful ingestion
      verifyAllRecordsReadable(daVinciClient, recordCount, "after v1 ingestion");
      LOGGER.info("All records verified readable from Da Vinci client before SIT failure injection");

      // Get the live version's StoreIngestionTask and inject a task-level exception
      DaVinciBackend backend = AvroGenericDaVinciClient.getBackend();
      assertNotNull(backend, "Da Vinci backend should not be null");
      StoreIngestionTask sit;
      try (ReferenceCounted<VersionBackend> versionRef =
          backend.getStoreOrThrow(storeName).getDaVinciCurrentVersion()) {
        VersionBackend versionBackend = versionRef.get();
        assertNotNull(versionBackend, "Current version backend should not be null");
        String versionTopic = versionBackend.getVersion().kafkaTopicName();
        LOGGER.info("Injecting SIT failure for version topic: {}", versionTopic);

        sit = backend.getIngestionBackend().getStoreIngestionService().getStoreIngestionTask(versionTopic);
        assertNotNull(sit, "StoreIngestionTask should exist for " + versionTopic);
        assertTrue(sit.isRunning(), "StoreIngestionTask should be running before failure injection");

        // Inject task-level exception — this causes the SIT's run loop to throw from
        // checkIngestionProgress() and report error for all partitions
        sit.setLastStoreIngestionException(new VeniceException("Injected SIT failure for testing"));
      }

      // Wait for the SIT to actually process the injected exception and stop.
      // This ensures the error propagation path has been fully exercised before we check reads.
      TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> {
        assertFalse(sit.isRunning(), "StoreIngestionTask should stop after processing injected failure");
      });

      // Core assertion: reads from the live version must still work after SIT failure.
      // VersionBackend.completePartitionExceptionally() is a no-op on already-completed futures,
      // so the partition remains ready-to-serve and reads should succeed.
      verifyAllRecordsReadable(daVinciClient, recordCount, "after SIT failure");
      LOGGER.info("All records still readable after SIT failure — test passed");
    } finally {
      controllerClient.disableAndDeleteStore(storeName);
    }
  }
}

/**
 * Asserts that keys {@code "1"} through {@code Integer.toString(recordCount)} are readable from the
 * given Da Vinci client and carry the expected VPJ-written values.
 *
 * Extracted so the pre-failure and post-failure verification passes use identical logic and cannot
 * drift apart. Each read is bounded to 10 seconds so a hung read fails fast with a
 * {@link java.util.concurrent.TimeoutException} instead of silently consuming the whole test timeout.
 *
 * @param client started and subscribed Da Vinci client to read from
 * @param recordCount number of sequentially-keyed records expected in the store
 * @param context human-readable phase description embedded in assertion failure messages
 * @throws Exception if a read fails, times out, or is interrupted
 */
private static void verifyAllRecordsReadable(
    DaVinciClient<String, Object> client,
    int recordCount,
    String context) throws Exception {
  for (int i = 1; i <= recordCount; i++) {
    String key = Integer.toString(i);
    // Bounded get: a stuck read surfaces as a TimeoutException with the offending key in context,
    // rather than hanging until the TestNG-level timeout fires with no diagnostic.
    Object value = client.get(key).get(10, TimeUnit.SECONDS);
    assertNotNull(value, "Key " + key + " should be readable " + context);
    assertEquals(
        value.toString(),
        TestWriteUtils.DEFAULT_USER_DATA_VALUE_PREFIX + i,
        "Key " + key + " value mismatch " + context);
  }
}
}