diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java index c5b7129ea727..5eb5b59867db 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java @@ -154,7 +154,7 @@ public void beforeClass() { UserAgentContainer userAgentContainer = new UserAgentContainer(); userAgentContainer.setSuffix(USER_AGENT_SUFFIX_GATEWAY_CLIENT); - this.gatewayClientUserAgent = userAgentContainer.getUserAgent(); + this.gatewayClientUserAgent = generateHttp2OptedInUserAgentIfRequired(userAgentContainer.getUserAgent()); directClient = new CosmosClientBuilder() .endpoint(TestConfigurations.HOST) @@ -164,7 +164,7 @@ public void beforeClass() { .directMode() .buildClient(); userAgentContainer.setSuffix(USER_AGENT_SUFFIX_DIRECT_CLIENT); - this.directClientUserAgent = userAgentContainer.getUserAgent(); + this.directClientUserAgent = generateHttp2OptedInUserAgentIfRequired(userAgentContainer.getUserAgent()); cosmosAsyncContainer = getSharedMultiPartitionCosmosContainer(this.gatewayClient.asyncClient()); cosmosAsyncDatabase = directClient.asyncClient().getDatabase(cosmosAsyncContainer.getDatabase().getId()); @@ -271,7 +271,6 @@ public void queryChangeFeedAllVersionsAndDeletes() { FeedResponse response = results.next(); String diagnostics = response.getCosmosDiagnostics().toString(); assertThat(diagnostics).contains("\"connectionMode\":\"GATEWAY\""); - assertThat(diagnostics).contains("\"userAgent\":\"" + this.gatewayClientUserAgent + "\""); assertThat(diagnostics).contains("gatewayStatisticsList"); assertThat(diagnostics).contains("\"operationType\":\"ReadFeed\""); assertThat(diagnostics).contains("\"userAgent\":\"" + this.gatewayClientUserAgent + "\""); @@ -298,7 +297,7 @@ public void queryChangeFeedIncrementalDirectMode() throws Exception { FeedResponse response = results.next(); String diagnostics = response.getCosmosDiagnostics().toString(); assertThat(diagnostics).contains("\"connectionMode\":\"DIRECT\""); - assertThat(diagnostics).contains("\"userAgent\":\"" + this.directClientUserAgent + "\""); + assertThat(diagnostics).contains("\"userAgent\":\"" + generateHttp2OptedInUserAgentIfRequired(this.directClientUserAgent) + "\""); assertThat(diagnostics).contains("\"requestOperationType\":\"ReadFeed\""); } } @@ -324,7 +323,6 @@ public void queryChangeFeedIncrementalGatewayMode() throws Exception { assertThat(diagnostics).contains("\"userAgent\":\"" + this.gatewayClientUserAgent + "\""); assertThat(diagnostics).contains("gatewayStatisticsList"); assertThat(diagnostics).contains("\"operationType\":\"ReadFeed\""); - assertThat(diagnostics).contains("\"userAgent\":\"" + this.gatewayClientUserAgent + "\""); } } @@ -349,7 +347,6 @@ public void gatewayDiagnostics() throws Exception { String diagnostics = createResponse.getDiagnostics().toString(); logger.info("DIAGNOSTICS: {}", diagnostics); assertThat(diagnostics).contains("\"connectionMode\":\"GATEWAY\""); - assertThat(diagnostics).contains("\"userAgent\":\"" + this.gatewayClientUserAgent + "\""); assertThat(diagnostics).contains("gatewayStatisticsList"); assertThat(diagnostics).contains("\"operationType\":\"Create\""); assertThat(diagnostics).contains("\"metaDataName\":\"CONTAINER_LOOK_UP\""); @@ -392,7 +389,6 @@ public void gatewayDiagnosticsOnException() throws Exception { assertThat(diagnostics).contains("gatewayStatisticsList"); assertThat(diagnostics).contains("\"statusCode\":404"); assertThat(diagnostics).contains("\"operationType\":\"Read\""); - assertThat(diagnostics).contains("\"userAgent\":\"" + this.gatewayClientUserAgent + "\""); assertThat(diagnostics).contains("\"exceptionMessage\":\"Entity with the specified id does not exist in the system."); assertThat(diagnostics).contains("\"exceptionResponseHeaders\""); assertThat(diagnostics).doesNotContain("\"exceptionResponseHeaders\": \"{}\""); @@ -1971,6 +1967,14 @@ private void validateChannelAcquisitionContext(CosmosDiagnostics diagnostics, bo } } + private String generateHttp2OptedInUserAgentIfRequired(String userAgent) { + if (Configs.isHttp2Enabled()) { + userAgent = userAgent + "|F10"; + } + + return userAgent; + } + private CosmosDiagnostics performDocumentOperation( CosmosAsyncContainer cosmosAsyncContainer, OperationType operationType, diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTestsBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTestsBase.java index d25ba51cecaf..682fc7b46773 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTestsBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTestsBase.java @@ -2359,6 +2359,15 @@ public Object[][] testConfigs_queryAfterCreation() { } }; + BiConsumer validateCtxUtmostRegions = + (ctx, expectedNumberOfRegionsContacted) -> { + assertThat(ctx).isNotNull(); + if (ctx != null) { + assertThat(ctx.getContactedRegionNames().size()).isGreaterThanOrEqualTo(1); + assertThat(ctx.getContactedRegionNames().size()).isLessThanOrEqualTo(expectedNumberOfRegionsContacted); + } + }; + Consumer validateCtxQueryPlan = (ctx) -> { assertThat(ctx).isNotNull(); @@ -2429,6 +2438,9 @@ public Object[][] testConfigs_queryAfterCreation() { Consumer validateCtxTwoRegions = (ctx) -> validateCtxRegions.accept(ctx, TWO_REGIONS); + Consumer validateCtxUtmostTwoRegions = + (ctx) -> validateCtxUtmostRegions.accept(ctx, TWO_REGIONS); + Consumer validateCtxFirstRegionFailureSecondRegionSuccessfulSingleFeedResponse = (ctx) -> { CosmosDiagnostics[] diagnostics = ctx.getDiagnostics().toArray(new CosmosDiagnostics[0]); assertThat(diagnostics.length).isEqualTo(3); @@ -3270,7 +3282,7 @@ public Object[][] testConfigs_queryAfterCreation() { CosmosDiagnostics[] diagnostics = ctx.getDiagnostics().toArray(new CosmosDiagnostics[0]); // Diagnostics of query attempt in first region not even available yet - assertThat(diagnostics.length).isEqualTo(2); + assertThat(diagnostics.length).isGreaterThanOrEqualTo(2); // query plan on first region assertThat(diagnostics[0].getContactedRegionNames().size()).isEqualTo(1); @@ -3279,13 +3291,13 @@ public Object[][] testConfigs_queryAfterCreation() { (ctx) -> { assertThat(ctx.getDiagnostics()).isNotNull(); CosmosDiagnostics[] diagnostics = ctx.getDiagnostics().toArray(new CosmosDiagnostics[0]); - assertThat(diagnostics[1].getContactedRegionNames().size()).isEqualTo(1); - assertThat(diagnostics[1].getContactedRegionNames().iterator().next()).isEqualTo(SECOND_REGION_NAME); - assertThat(diagnostics[1].getFeedResponseDiagnostics()).isNotNull(); - assertThat(diagnostics[1].getFeedResponseDiagnostics().getQueryMetricsMap()).isNotNull(); - assertThat(diagnostics[1].getFeedResponseDiagnostics().getClientSideRequestStatistics()).isNotNull(); + assertThat(diagnostics[diagnostics.length - 1].getContactedRegionNames().size()).isEqualTo(1); + assertThat(diagnostics[diagnostics.length - 1].getContactedRegionNames().iterator().next()).isEqualTo(SECOND_REGION_NAME); + assertThat(diagnostics[diagnostics.length - 1].getFeedResponseDiagnostics()).isNotNull(); + assertThat(diagnostics[diagnostics.length - 1].getFeedResponseDiagnostics().getQueryMetricsMap()).isNotNull(); + assertThat(diagnostics[diagnostics.length - 1].getFeedResponseDiagnostics().getClientSideRequestStatistics()).isNotNull(); ClientSideRequestStatistics[] clientStats = - diagnostics[1] + diagnostics[diagnostics.length - 1] .getFeedResponseDiagnostics() .getClientSideRequestStatistics() .toArray(new ClientSideRequestStatistics[0]); @@ -3301,17 +3313,17 @@ public Object[][] testConfigs_queryAfterCreation() { } ), ArrayUtils.toArray( - validateCtxSingleRegion, + validateCtxUtmostTwoRegions, (ctx) -> { assertThat(ctx.getDiagnostics()).isNotNull(); CosmosDiagnostics[] diagnostics = ctx.getDiagnostics().toArray(new CosmosDiagnostics[0]); - assertThat(diagnostics[0].getContactedRegionNames().size()).isEqualTo(1); - assertThat(diagnostics[0].getContactedRegionNames().iterator().next()).isEqualTo(SECOND_REGION_NAME); - assertThat(diagnostics[0].getFeedResponseDiagnostics()).isNotNull(); - assertThat(diagnostics[0].getFeedResponseDiagnostics().getQueryMetricsMap()).isNotNull(); - assertThat(diagnostics[0].getFeedResponseDiagnostics().getClientSideRequestStatistics()).isNotNull(); + assertThat(diagnostics[diagnostics.length - 1].getContactedRegionNames().size()).isEqualTo(1); + assertThat(diagnostics[diagnostics.length - 1].getContactedRegionNames().iterator().next()).isEqualTo(SECOND_REGION_NAME); + assertThat(diagnostics[diagnostics.length - 1].getFeedResponseDiagnostics()).isNotNull(); + assertThat(diagnostics[diagnostics.length - 1].getFeedResponseDiagnostics().getQueryMetricsMap()).isNotNull(); + assertThat(diagnostics[diagnostics.length - 1].getFeedResponseDiagnostics().getClientSideRequestStatistics()).isNotNull(); ClientSideRequestStatistics[] clientStats = - diagnostics[0] + diagnostics[diagnostics.length - 1] .getFeedResponseDiagnostics() .getClientSideRequestStatistics() .toArray(new ClientSideRequestStatistics[0]); @@ -4067,10 +4079,14 @@ public Object[][] testConfigs_readAllAfterCreation() { for (int i = start; i < diagnostics.length; i++) { CosmosDiagnostics currentDiagnostics = diagnostics[i]; - assertThat(currentDiagnostics.getFeedResponseDiagnostics()).isNotNull(); - assertThat(currentDiagnostics.getFeedResponseDiagnostics().getQueryMetricsMap()).isNotNull(); - assertThat(currentDiagnostics.getFeedResponseDiagnostics().getClientSideRequestStatistics()).isNotNull(); - assertThat(currentDiagnostics.getFeedResponseDiagnostics().getClientSideRequestStatistics().size()).isGreaterThanOrEqualTo(1); + + if (i != diagnostics.length - 1) { + assertThat(currentDiagnostics.getFeedResponseDiagnostics()).isNull(); + } else { + assertThat(currentDiagnostics.getFeedResponseDiagnostics().getQueryMetricsMap()).isNotNull(); + assertThat(currentDiagnostics.getFeedResponseDiagnostics().getClientSideRequestStatistics()).isNotNull(); + assertThat(currentDiagnostics.getFeedResponseDiagnostics().getClientSideRequestStatistics().size()).isGreaterThanOrEqualTo(1); + } } } }; @@ -4579,14 +4595,14 @@ public Object[][] testConfigs_readAllAfterCreation() { CosmosDiagnostics[] diagnostics = ctx.getDiagnostics().toArray(new CosmosDiagnostics[0]); // Diagnostics of query attempt in first region not even available yet - assertThat(diagnostics.length).isEqualTo(2); - assertThat(diagnostics[1].getContactedRegionNames().size()).isEqualTo(1); - assertThat(diagnostics[1].getContactedRegionNames().iterator().next()).isEqualTo(SECOND_REGION_NAME); - assertThat(diagnostics[1].getFeedResponseDiagnostics()).isNotNull(); - assertThat(diagnostics[1].getFeedResponseDiagnostics().getQueryMetricsMap()).isNotNull(); - assertThat(diagnostics[1].getFeedResponseDiagnostics().getClientSideRequestStatistics()).isNotNull(); + assertThat(diagnostics.length).isGreaterThanOrEqualTo(2); + assertThat(diagnostics[diagnostics.length - 1].getContactedRegionNames().size()).isEqualTo(1); + assertThat(diagnostics[diagnostics.length - 1].getContactedRegionNames().iterator().next()).isEqualTo(SECOND_REGION_NAME); + assertThat(diagnostics[diagnostics.length - 1].getFeedResponseDiagnostics()).isNotNull(); + assertThat(diagnostics[diagnostics.length - 1].getFeedResponseDiagnostics().getQueryMetricsMap()).isNotNull(); + assertThat(diagnostics[diagnostics.length - 1].getFeedResponseDiagnostics().getClientSideRequestStatistics()).isNotNull(); ClientSideRequestStatistics[] clientStats = - diagnostics[1] + diagnostics[diagnostics.length - 1] .getFeedResponseDiagnostics() .getClientSideRequestStatistics() .toArray(new ClientSideRequestStatistics[0]); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionAutomaticFailoverE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionAutomaticFailoverE2ETests.java index e53658ae0dde..c7f09c818050 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionAutomaticFailoverE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionAutomaticFailoverE2ETests.java @@ -7,6 +7,7 @@ import com.azure.cosmos.implementation.ConnectionPolicy; import com.azure.cosmos.implementation.DatabaseAccount; import com.azure.cosmos.implementation.DatabaseAccountLocation; +import com.azure.cosmos.implementation.DatabaseAccountManagerInternal; import com.azure.cosmos.implementation.ForbiddenException; import com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.GoneException; @@ -40,6 +41,7 @@ import com.azure.cosmos.models.CosmosBatch; import com.azure.cosmos.models.CosmosBatchResponse; import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; +import com.azure.cosmos.models.CosmosItemIdentity; import com.azure.cosmos.models.CosmosItemRequestOptions; import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.CosmosPatchItemRequestOptions; @@ -49,6 +51,17 @@ import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.rx.TestSuiteBase; +import com.azure.cosmos.test.faultinjection.CosmosFaultInjectionHelper; +import com.azure.cosmos.test.faultinjection.FaultInjectionCondition; +import com.azure.cosmos.test.faultinjection.FaultInjectionConditionBuilder; +import com.azure.cosmos.test.faultinjection.FaultInjectionConnectionType; +import com.azure.cosmos.test.faultinjection.FaultInjectionEndpointBuilder; +import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; +import com.azure.cosmos.test.faultinjection.FaultInjectionResultBuilders; +import com.azure.cosmos.test.faultinjection.FaultInjectionRule; +import com.azure.cosmos.test.faultinjection.FaultInjectionRuleBuilder; +import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorResult; +import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.buffer.ByteBuf; @@ -64,6 +77,7 @@ import org.testng.annotations.Factory; import org.testng.annotations.Test; import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; import java.net.SocketTimeoutException; import java.net.URI; @@ -79,10 +93,123 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import static org.assertj.core.api.Assertions.assertThat; +/** + * End-to-end test suite validating Per-Partition Automatic Failover (PPAF) behavior in the Azure Cosmos DB Java SDK. + * + *

This suite exercises and verifies: + *

    + *
  • Automatic failover and hedged routing at the granularity of a single physical partition (PK range).
  • + *
  • Dynamic enablement and disablement of PPAF at runtime by reflecting and overriding + * {@code GlobalEndpointManager}'s {@code DatabaseAccountManagerInternal} owner to toggle + * {@code DatabaseAccount#isPerPartitionFailoverBehaviorEnabled}.
  • + *
  • Write operation failover (Create, Replace, Upsert, Delete, Patch, Batch) under multiple failover‑eligible + * status/sub-status combinations (410/21005, 503/21008, 403/3, 408/*, gateway read timeouts).
  • + *
  • Non-write hedging behavior (point Read and Query variants) under region-scoped transient faults: + *
      + *
    • DIRECT mode: simulated server-generated 410 (sub-status 21005) scoped to a specific partition key range.
    • + *
    • GATEWAY mode: injected RESPONSE_DELAY faults via fault injection rules (query plan + query + read item).
    • + *
    + *
  • + *
  • Interaction with end-to-end latency policies (E2E timeout) as a gating mechanism for enabling failover logic.
  • + *
+ * + *

Connection Modes Covered: + *

    + *
  • DIRECT: Uses a mocked {@code TransportClient} to selectively throw {@code CosmosException} for a targeted + * (region + PK range) while returning success for others.
  • + *
  • GATEWAY: Uses a mocked {@code HttpClient} (or fault injection framework) to simulate service errors, network + * timeouts (socket/read), regional delays, or success responses.
  • + *
+ * + *

Phased Validation Patterns: + *

    + *
  • Pre-failover / Hedging Window: Verifies retries or region hedging (multi-region contacts) before + * PPCB-enforced failover, optionally repeated to satisfy E2E timeout activation thresholds.
  • + *
  • Post-failover / Stabilized: Ensures subsequent operations route directly to a healthy region + * (single-region contact, zero retries) unless query semantics (e.g., query plan retrieval) require multi-region access.
  • + *
  • Dynamic Enablement Toggle: For selected 503 scenarios, validates behavior transitions + * Disabled → Enabled → Disabled, confirming routing and diagnostics adapt immediately after + * {@code refreshLocationAsync(true)}.
  • + *
+ * + *

Diagnostics Assertions: + * Each test inspects {@code CosmosDiagnostics} to assert: + *

    + *
  • Contacted region count (hedged vs stabilized).
  • + *
  • Retry count bounds (min/max) aligned with scenario expectations.
  • + *
  • Final HTTP status classification (success vs expected failure when failover gated).
  • + *
  • Consistency across response types (item, batch, feed, or exception paths).
  • + *
+ * + *

Key Internal Mechanisms: + *

    + *
  • Reflection-based access to internal SDK components (e.g., {@code RxDocumentClientImpl}, + * {@code StoreReader}, {@code ConsistencyWriter}) to inject mocked transport layers.
  • + *
  • Custom delegating {@code DatabaseAccountManagerInternal} wrapper that conditionally sets + * the per-partition failover flag on retrieved {@code DatabaseAccount} snapshots.
  • + *
  • Fault injection rules in GATEWAY mode to apply controlled latency (RESPONSE_DELAY) per region and operation type.
  • + *
  • Reusable operation dispatch via a functional resolver mapping {@code OperationType} to execution lambdas + * returning a uniform {@code ResponseWrapper} abstraction.
  • + *
+ * + *

Query Variants (QueryFlavor): + *

    + *
  • {@code NONE}: Point read (readItem).
  • + *
  • {@code READ_ALL}: {@code readAllItems} over a single partition key.
  • + *
  • {@code READ_MANY}: {@code readMany} with one or more item identities.
  • + *
  • {@code QUERY_ITEMS}: Standard SQL query; may still contact original region for query plan acquisition even + * after stabilization (thus dual-region diagnostics may persist).
  • + *
+ * + *

End-to-End Latency Policy Integration: + * Tests optionally apply a short-circuit latency policy to: + *

    + *
  • Simulate threshold-based activation (e.g., property {@code COSMOS.E2E_TIMEOUT_ERROR_HIT_THRESHOLD_FOR_PPAF}).
  • + *
  • Differentiate pre-threshold (no failover) vs post-threshold (failover enabled) diagnostics for the same fault.
  • + *
+ * + *

Batch Operation Coverage: + * Batch scenarios ensure that failover and diagnostics behaviors remain consistent with single-item operations, + * including mock batch response materialization and hedging logic validation.

+ * + *

Safety & Cleanup: + * Each scenario ensures: + *

    + *
  • System properties used to gate PPAF or E2E behaviors are cleared in {@code finally} blocks.
  • + *
  • Clients are safely disposed to avoid cross-test interference.
  • + *
+ * + *

Usage Notes: + * This suite relies on internal APIs and reflection hooks not intended for production use. It is crafted specifically + * for validation of resilience, routing, and diagnostics fidelity across complex multi-region and transient-fault + * conditions. Adjustments to internal SDK contracts may require corresponding test maintenance.

+ * + *

Failure Interpretation: + * A test failure typically indicates one of: + *

    + *
  • Unexpected retry amplification or suppression.
  • + *
  • Incorrect region routing (e.g., failover not triggered or not stabilized).
  • + *
  • Diagnostics context regression (missing region names, status codes, or retry metrics).
  • + *
  • Latency policy mis-integration (threshold not honored).
  • + *
+ * + *

Extensibility: + * Additional scenarios (e.g., new fault types, new operation categories, multi-partition batch coverage, or read feed streaming) + * can be added by: + *

    + *
  1. Extending the appropriate {@code @DataProvider} with new parameter rows.
  2. + *
  3. Enhancing {@code resolveDataPlaneOperation} for new operation abstractions.
  4. + *
  5. Adding new fault injection builders or transport client predicates.
  6. + *
+ * + *

All validations aim to ensure that PPAF delivers predictable, minimal-latency routing under regional fault pressure + * while preserving observability through {@code CosmosDiagnostics}.

+ */ public class PerPartitionAutomaticFailoverE2ETests extends TestSuiteBase { private CosmosAsyncDatabase sharedDatabase; @@ -155,6 +282,184 @@ public PerPartitionAutomaticFailoverE2ETests(CosmosClientBuilder clientBuilder) super(clientBuilder); } + // Non-write dynamic enablement scenarios: READ and QUERY (with flavors) under SERVER_GENERATED_GONE and RESPONSE_DELAY + @DataProvider(name = "ppafNonWriteDynamicEnablementScenarios") + public Object[][] ppafNonWriteDynamicEnablementScenarios() { + + Set onlyDirect = new HashSet<>(); + onlyDirect.add(ConnectionMode.DIRECT); + + Set onlyGateway = new HashSet<>(); + onlyGateway.add(ConnectionMode.GATEWAY); + + return new Object[][]{ + // GONE (DIRECT only) + { + "Dynamic non-write: READ with SERVER_GENERATED_GONE (DIRECT)", + OperationType.Read, + QueryFlavor.NONE, + FaultKind.SERVER_GENERATED_GONE, + HttpConstants.StatusCodes.OK, + onlyDirect + }, + { + "Dynamic non-write: QUERY (readAll) with SERVER_GENERATED_GONE (DIRECT)", + OperationType.Query, + QueryFlavor.READ_ALL, + FaultKind.SERVER_GENERATED_GONE, + HttpConstants.StatusCodes.OK, + onlyDirect + }, + { + "Dynamic non-write: QUERY (readMany) with SERVER_GENERATED_GONE (DIRECT)", + OperationType.Query, + QueryFlavor.READ_MANY, + FaultKind.SERVER_GENERATED_GONE, + HttpConstants.StatusCodes.OK, + onlyDirect + }, + { + "Dynamic non-write: QUERY (queryItems) with SERVER_GENERATED_GONE (DIRECT)", + OperationType.Query, + QueryFlavor.QUERY_ITEMS, + FaultKind.SERVER_GENERATED_GONE, + HttpConstants.StatusCodes.OK, + onlyDirect + }, + + // RESPONSE_DELAY (GATEWAY only) + { + "Dynamic non-write: READ with RESPONSE_DELAY (GATEWAY)", + OperationType.Read, + QueryFlavor.NONE, + FaultKind.RESPONSE_DELAY, + HttpConstants.StatusCodes.OK, + onlyGateway + }, + { + "Dynamic non-write: QUERY (readAll) with RESPONSE_DELAY (GATEWAY)", + OperationType.Query, + QueryFlavor.READ_ALL, + FaultKind.RESPONSE_DELAY, + HttpConstants.StatusCodes.OK, + onlyGateway + }, + { + "Dynamic non-write: QUERY (readMany) with RESPONSE_DELAY (GATEWAY)", + OperationType.Query, + QueryFlavor.READ_MANY, + FaultKind.RESPONSE_DELAY, + HttpConstants.StatusCodes.OK, + onlyGateway + }, + { + "Dynamic non-write: QUERY (queryItems) with RESPONSE_DELAY (GATEWAY)", + OperationType.Query, + QueryFlavor.QUERY_ITEMS, + FaultKind.RESPONSE_DELAY, + HttpConstants.StatusCodes.OK, + onlyGateway + } + }; + } + + @DataProvider(name = "ppafDynamicEnablement503Only") + public Object[][] ppafDynamicEnablement503Only() { + + // When PPAF is disabled -> expect no success, single region contacted (no failover) + ExpectedResponseCharacteristics expectedWhenDisabled = new ExpectedResponseCharacteristics() + .setExpectedMinRetryCount(0) + .setShouldFinalResponseHaveSuccess(false) + .setExpectedRegionsContactedCount(1); + + // When PPAF is enabled -> expect success, single region contacted (directly routed to healthy) + ExpectedResponseCharacteristics expectedWhenEnabled = new ExpectedResponseCharacteristics() + .setExpectedMinRetryCount(1) + .setShouldFinalResponseHaveSuccess(true) + .setExpectedRegionsContactedCount(2); + + return new Object[][]{ + { + "Dynamic enablement: CREATE with SERVICE_UNAVAILABLE/503", + OperationType.Create, + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_503, + HttpConstants.StatusCodes.CREATED, + expectedWhenDisabled, + expectedWhenEnabled, + false, + false, + false, + ALL_CONNECTION_MODES + }, + { + "Dynamic enablement: REPLACE with SERVICE_UNAVAILABLE/503", + OperationType.Replace, + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_503, + HttpConstants.StatusCodes.OK, + expectedWhenDisabled, + expectedWhenEnabled, + false, + false, + false, + ALL_CONNECTION_MODES + }, + { + "Dynamic enablement: UPSERT with SERVICE_UNAVAILABLE/503", + OperationType.Upsert, + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_503, + HttpConstants.StatusCodes.OK, + expectedWhenDisabled, + expectedWhenEnabled, + false, + false, + false, + ALL_CONNECTION_MODES + }, + { + "Dynamic enablement: DELETE with SERVICE_UNAVAILABLE/503", + OperationType.Delete, + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_503, + HttpConstants.StatusCodes.NOT_MODIFIED, + expectedWhenDisabled, + expectedWhenEnabled, + false, + false, + false, + ALL_CONNECTION_MODES + }, + { + "Dynamic enablement: PATCH with SERVICE_UNAVAILABLE/503", + OperationType.Patch, + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_503, + HttpConstants.StatusCodes.OK, + expectedWhenDisabled, + expectedWhenEnabled, + false, + false, + false, + ALL_CONNECTION_MODES + }, + { + "Dynamic enablement: BATCH with SERVICE_UNAVAILABLE/503", + OperationType.Batch, + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_503, + HttpConstants.StatusCodes.OK, + expectedWhenDisabled, + expectedWhenEnabled, + false, + false, + false, + ALL_CONNECTION_MODES + } + }; + } + @BeforeClass(groups = {"multi-region"}) public void beforeClass() { CosmosAsyncClient cosmosAsyncClient = getClientBuilder().buildAsyncClient(); @@ -915,17 +1220,33 @@ public Object[][] ppafTestConfigsWithWriteOps() { }; } - // testPpafWithWriteFailoverWithEligibleErrorStatusCodes does the following: - // for DIRECT connection mode, - // an availability failure (410, 503, 408) or write forbidden failure (403/3) is injected - // for a given partitionKeyRange and region through mocking - // the first operation execution for a given operation type is expected to see failures and then failover (403/3s & 503s & 408s (not e2e timeout hit) are retried and e2e time out hit (408:20008) just see the operation fail) - // the second operation execution should see the request go straight away to the failed over region - caveat is when e2e timeout is hit, only after x failures does a failover happen - // for GATEWAY connection mode, - // an availability failure (503, 408), write forbidden failure (403/3) and I/O failures are injected - // for a given region through mocking - // the first operation execution for a given operation type is expected to see failures and then failover (403/3s & 503s & 408s (not e2e timeout hit) are retried and e2e time out hit (408:20008) just see the operation fail) - // the second operation execution should see the request go straight away to the failed over region - caveat is when e2e timeout is hit, only after x failures does a failover happen + /** + * End-to-end validation of Per-Partition Automatic Failover (PPAF) for write operations + * (Create, Replace, Upsert, Delete, Patch, Batch) when a failover-eligible fault is injected + * for one partition key range in the first preferred region. + * + *

Phases:

+ *
    + *
  • Pre-failover: injected error surfaces; request retries and/or hedges (unless gated by E2E timeout threshold).
  • + *
  • Post-failover: subsequent request routes directly to healthy region (single region, zero retries) unless + * E2E timeout gating still accumulating threshold.
  • + *
+ * + *

Mechanics:

+ *
    + *
  • DIRECT: TransportClient mocked; targeted (region + PK range) throws configured CosmosException.
  • + *
  • GATEWAY: HttpClient mocked; targeted region URI throws CosmosException or network exception (read/socket timeout) or delayed fault.
  • + *
  • GlobalEndpointManager owner replaced with delegating manager to surface dynamic PPAF enablement flag.
  • + *
  • E2E latency policy optionally applied; threshold (COSMOS.E2E_TIMEOUT_ERROR_HIT_THRESHOLD_FOR_PPAF) controls activation.
  • + *
+ * + *

Assertions:

+ *
    + *
  • Regions contacted count (before vs after failover).
  • + *
  • Retry count bounds.
  • + *
  • Success vs failure based on phase and configuration.
  • + *
+ */ @Test(groups = {"multi-region"}, dataProvider = "ppafTestConfigsWithWriteOps") public void testPpafWithWriteFailoverWithEligibleErrorStatusCodes( String testType, @@ -960,8 +1281,6 @@ public void testPpafWithWriteFailoverWithEligibleErrorStatusCodes( System.setProperty("COSMOS.E2E_TIMEOUT_ERROR_HIT_THRESHOLD_FOR_PPAF", "2"); } - System.setProperty("COSMOS.IS_PER_PARTITION_AUTOMATIC_FAILOVER_ENABLED", "true"); - CosmosClientBuilder cosmosClientBuilder = getClientBuilder(); // todo: evaluate whether Batch operation needs op-level e2e timeout and availability strategy @@ -1014,6 +1333,17 @@ public void testPpafWithWriteFailoverWithEligibleErrorStatusCodes( regionalRoutingContextWithIssues, cosmosException); + // Swap GlobalEndpointManager.owner to a delegating wrapper that toggles PPAF flag on DatabaseAccount + GlobalEndpointManager globalEndpointManager = ReflectionUtils.getGlobalEndpointManager(rxDocumentClient); + DatabaseAccountManagerInternal originalOwner = ReflectionUtils.getGlobalEndpointManagerOwner(globalEndpointManager); + + AtomicReference ppafEnabledRef = new AtomicReference<>(Boolean.TRUE); + DatabaseAccountManagerInternal overridingOwner = new DelegatingDatabaseAccountManagerInternal(originalOwner, ppafEnabledRef); + ReflectionUtils.setGlobalEndpointManagerOwner(globalEndpointManager, overridingOwner); + + DatabaseAccount latestDatabaseAccountSnapshot = globalEndpointManager.getLatestDatabaseAccount(); + globalEndpointManager.refreshLocationAsync(latestDatabaseAccountSnapshot, true).block(); + TestObject testItem = TestObject.create(); Function> dataPlaneOperation = resolveDataPlaneOperation(operationType); @@ -1060,8 +1390,6 @@ public void testPpafWithWriteFailoverWithEligibleErrorStatusCodes( System.setProperty("COSMOS.E2E_TIMEOUT_ERROR_HIT_THRESHOLD_FOR_PPAF", "2"); } - System.setProperty("COSMOS.IS_PER_PARTITION_AUTOMATIC_FAILOVER_ENABLED", "true"); - CosmosClientBuilder cosmosClientBuilder = getClientBuilder(); // todo: evaluate whether Batch operation needs op-level e2e timeout and availability strategy @@ -1086,6 +1414,16 @@ public void testPpafWithWriteFailoverWithEligibleErrorStatusCodes( GlobalEndpointManager globalEndpointManager = ReflectionUtils.getGlobalEndpointManager(rxDocumentClient); DatabaseAccount databaseAccount = globalEndpointManager.getLatestDatabaseAccount(); + // Swap GlobalEndpointManager.owner to a delegating wrapper that toggles PPAF flag on DatabaseAccount + DatabaseAccountManagerInternal originalOwner = ReflectionUtils.getGlobalEndpointManagerOwner(globalEndpointManager); + + AtomicReference ppafEnabledRef = new AtomicReference<>(Boolean.TRUE); + DatabaseAccountManagerInternal overridingOwner = new DelegatingDatabaseAccountManagerInternal(originalOwner, ppafEnabledRef); + ReflectionUtils.setGlobalEndpointManagerOwner(globalEndpointManager, overridingOwner); + + DatabaseAccount latestDatabaseAccountSnapshot = globalEndpointManager.getLatestDatabaseAccount(); + globalEndpointManager.refreshLocationAsync(latestDatabaseAccountSnapshot, true).block(); + assertThat(preferredRegions).isNotNull(); assertThat(preferredRegions.size()).isGreaterThanOrEqualTo(1); @@ -1145,6 +1483,631 @@ public void testPpafWithWriteFailoverWithEligibleErrorStatusCodes( } } + /** + * Verifies per-partition automatic failover (PPAF) dynamic enablement by toggling + * DatabaseAccount#isPerPartitionFailoverBehaviorEnabled at runtime via a reflected override + * of GlobalEndpointManager.owner (DatabaseAccountManagerInternal). + * + *

Test strategy

+ *
    + *
  • Build a CosmosAsyncClient from the provided builder.
  • + *
  • Use ReflectionUtils to obtain GlobalEndpointManager from the underlying RxDocumentClient.
  • + *
  • Replace its private owner with a delegating DatabaseAccountManagerInternal that injects + * DatabaseAccount#setIsPerPartitionFailoverBehaviorEnabled(enabledRef.get()).
  • + *
  • Mock transport (DIRECT) or HttpClient (GATEWAY) to simulate a 503 on the primary region + * and success elsewhere, mirroring the base PPAF test.
  • + *
  • Run in phases:
  • + *
+ *
    + *
  1. PPAF disabled — expect failure characteristics (no success).
  2. + *
  3. PPAF enabled — expect success characteristics (routes to healthy).
  4. + *
  5. PPAF disabled again — expect failure again (toggle verified).
  6. + *
+ * + *

After each toggle, call refreshLocationAsync(forceRefresh=true) so GlobalEndpointManager + * observes the updated DatabaseAccount flags immediately.

+ * + *

Expectations are provided by the data provider: when disabled, the request should not succeed; + * when enabled, it should succeed. Works for both DIRECT and GATEWAY connection modes.

+ */ + @Test(groups = {"multi-region"}, dataProvider = "ppafDynamicEnablement503Only") + public void testPpafWithWriteFailoverWithEligibleErrorStatusCodesWithPpafDynamicEnablement( + String testType, + OperationType operationType, + int errorStatusCodeToMockFromPartitionInUnhealthyRegion, + int errorSubStatusCodeToMockFromPartitionInUnhealthyRegion, + int successStatusCode, + ExpectedResponseCharacteristics expectedResponseCharacteristicsWhenPpafIsDisabled, + ExpectedResponseCharacteristics expectedResponseCharacteristicsWhenPpafIsEnabled, + boolean shouldThrowNetworkError, + boolean shouldThrowReadTimeoutExceptionWhenNetworkError, + boolean shouldUseE2ETimeout, + Set allowedConnectionModes) { + + ConnectionPolicy connectionPolicy = COSMOS_CLIENT_BUILDER_ACCESSOR.getConnectionPolicy(getClientBuilder()); + ConnectionMode connectionMode = connectionPolicy.getConnectionMode(); + + if (!allowedConnectionModes.contains(connectionMode)) { + throw new SkipException(String.format("Test with type : %s not eligible for specified connection mode %s.", testType, connectionMode)); + } + + // DIRECT flow: swap transport client, inject error for primary region/PK range, and verify phase-by-phase + if (connectionMode == ConnectionMode.DIRECT) { + TransportClient transportClientMock = Mockito.mock(TransportClient.class); + List preferredRegions = this.accountLevelLocationReadableLocationContext.serviceOrderedReadableRegions; + Map readableRegionNameToEndpoint = this.accountLevelLocationReadableLocationContext.regionNameToEndpoint; + Utils.ValueHolder cosmosAsyncClientValueHolder = new Utils.ValueHolder<>(); + + try { + CosmosClientBuilder cosmosClientBuilder = getClientBuilder(); + + if (operationType.equals(OperationType.Batch) && shouldUseE2ETimeout) { + cosmosClientBuilder.endToEndOperationLatencyPolicyConfig(THREE_SEC_E2E_TIMEOUT_POLICY); + } + + CosmosAsyncClient asyncClient = cosmosClientBuilder.buildAsyncClient(); + cosmosAsyncClientValueHolder.v = asyncClient; + + CosmosAsyncContainer asyncContainer = asyncClient + .getDatabase(this.sharedDatabase.getId()) + .getContainer(this.sharedSinglePartitionContainer.getId()); + + RxDocumentClientImpl rxDocumentClient = (RxDocumentClientImpl) ReflectionUtils.getAsyncDocumentClient(asyncClient); + + // Swap GlobalEndpointManager.owner to a delegating wrapper that toggles PPAF flag on DatabaseAccount + GlobalEndpointManager globalEndpointManager = ReflectionUtils.getGlobalEndpointManager(rxDocumentClient); + DatabaseAccountManagerInternal originalOwner = ReflectionUtils.getGlobalEndpointManagerOwner(globalEndpointManager); + + AtomicReference ppafEnabledRef = new AtomicReference<>(Boolean.FALSE); + DatabaseAccountManagerInternal overridingOwner = new DelegatingDatabaseAccountManagerInternal(originalOwner, ppafEnabledRef); + ReflectionUtils.setGlobalEndpointManagerOwner(globalEndpointManager, overridingOwner); + + StoreClient storeClient = ReflectionUtils.getStoreClient(rxDocumentClient); + ReplicatedResourceClient replicatedResourceClient = ReflectionUtils.getReplicatedResourceClient(storeClient); + ConsistencyReader consistencyReader = ReflectionUtils.getConsistencyReader(replicatedResourceClient); + StoreReader storeReader = ReflectionUtils.getStoreReader(consistencyReader); + + ConsistencyWriter consistencyWriter = ReflectionUtils.getConsistencyWriter(replicatedResourceClient); + Utils.ValueHolder> partitionKeyRangesForContainer + = getPartitionKeyRangesForContainer(asyncContainer, rxDocumentClient).block(); + + assertThat(partitionKeyRangesForContainer).isNotNull(); + assertThat(partitionKeyRangesForContainer.v).isNotNull(); + assertThat(partitionKeyRangesForContainer.v.size()).isGreaterThanOrEqualTo(1); + + PartitionKeyRange partitionKeyRangeWithIssues = partitionKeyRangesForContainer.v.get(0); + + assertThat(preferredRegions).isNotNull(); + assertThat(preferredRegions.size()).isGreaterThanOrEqualTo(1); + + String regionWithIssues = preferredRegions.get(0); + RegionalRoutingContext regionalRoutingContextWithIssues = new RegionalRoutingContext(new URI(readableRegionNameToEndpoint.get(regionWithIssues))); + + // Redirect all store calls through our mocked transport client + ReflectionUtils.setTransportClient(storeReader, transportClientMock); + ReflectionUtils.setTransportClient(consistencyWriter, transportClientMock); + + setupTransportClientToReturnSuccessResponse(transportClientMock, constructStoreResponse(operationType, successStatusCode)); + + CosmosException cosmosException = createCosmosException( + errorStatusCodeToMockFromPartitionInUnhealthyRegion, + errorSubStatusCodeToMockFromPartitionInUnhealthyRegion); + + setupTransportClientToThrowCosmosException( + transportClientMock, + partitionKeyRangeWithIssues, + regionalRoutingContextWithIssues, + cosmosException); + + TestObject testItem = TestObject.create(); + + Function> dataPlaneOperation = resolveDataPlaneOperation(operationType); + + OperationInvocationParamsWrapper operationInvocationParamsWrapper = new OperationInvocationParamsWrapper(); + operationInvocationParamsWrapper.asyncContainer = asyncContainer; + operationInvocationParamsWrapper.createdTestItem = testItem; + operationInvocationParamsWrapper.itemRequestOptions = shouldUseE2ETimeout ? new CosmosItemRequestOptions().setCosmosEndToEndOperationLatencyPolicyConfig(THREE_SEC_E2E_TIMEOUT_POLICY) : new CosmosItemRequestOptions(); + operationInvocationParamsWrapper.patchItemRequestOptions = shouldUseE2ETimeout ? new CosmosPatchItemRequestOptions().setCosmosEndToEndOperationLatencyPolicyConfig(THREE_SEC_E2E_TIMEOUT_POLICY) : new CosmosPatchItemRequestOptions(); + + // Phase 1: PPAF disabled -> expect characteristics provided for DISABLED + ppafEnabledRef.set(Boolean.FALSE); + globalEndpointManager.refreshLocationAsync(null, true).block(); + ResponseWrapper responseWithPpafDisabled = dataPlaneOperation.apply(operationInvocationParamsWrapper); + this.validateExpectedResponseCharacteristics.accept(responseWithPpafDisabled, expectedResponseCharacteristicsWhenPpafIsDisabled); + + // Phase 2: PPAF enabled -> expect characteristics provided for ENABLED + ppafEnabledRef.set(Boolean.TRUE); + globalEndpointManager.refreshLocationAsync(null, true).block(); + ResponseWrapper responseWithPpafEnabled = dataPlaneOperation.apply(operationInvocationParamsWrapper); + this.validateExpectedResponseCharacteristics.accept(responseWithPpafEnabled, expectedResponseCharacteristicsWhenPpafIsEnabled); + + // Phase 3: PPAF disabled again -> confirm behavior reverts + ppafEnabledRef.set(Boolean.FALSE); + globalEndpointManager.refreshLocationAsync(null, true).block(); + responseWithPpafDisabled = dataPlaneOperation.apply(operationInvocationParamsWrapper); + this.validateExpectedResponseCharacteristics.accept(responseWithPpafDisabled, expectedResponseCharacteristicsWhenPpafIsDisabled); + } catch (Exception e) { + Assertions.fail("The test ran into an exception {}", e); + } finally { + safeClose(cosmosAsyncClientValueHolder.v); + } + } + + // GATEWAY flow: swap RxGatewayStoreModel HttpClient, inject 503 on primary region and verify phases + if (connectionMode == ConnectionMode.GATEWAY) { + HttpClient mockedHttpClient = Mockito.mock(HttpClient.class); + List preferredRegions = this.accountLevelLocationReadableLocationContext.serviceOrderedReadableRegions; + Map readableRegionNameToEndpoint = this.accountLevelLocationReadableLocationContext.regionNameToEndpoint; + Utils.ValueHolder cosmosAsyncClientValueHolder = new Utils.ValueHolder<>(); + + try { + CosmosClientBuilder cosmosClientBuilder = getClientBuilder(); + + if (operationType.equals(OperationType.Batch) && shouldUseE2ETimeout) { + cosmosClientBuilder.endToEndOperationLatencyPolicyConfig(THREE_SEC_E2E_TIMEOUT_POLICY); + } + + CosmosAsyncClient asyncClient = cosmosClientBuilder.buildAsyncClient(); + cosmosAsyncClientValueHolder.v = asyncClient; + + CosmosAsyncContainer asyncContainer = asyncClient + .getDatabase(this.sharedDatabase.getId()) + .getContainer(this.sharedSinglePartitionContainer.getId()); + + // Populate collection and PK range caches to ensure routing is initialized + asyncContainer.getFeedRanges().block(); + + RxDocumentClientImpl rxDocumentClient = (RxDocumentClientImpl) ReflectionUtils.getAsyncDocumentClient(asyncClient); + RxStoreModel rxStoreModel = ReflectionUtils.getGatewayProxy(rxDocumentClient); + + // Swap GlobalEndpointManager.owner to a delegating wrapper that toggles PPAF flag on DatabaseAccount + GlobalEndpointManager globalEndpointManager = ReflectionUtils.getGlobalEndpointManager(rxDocumentClient); + DatabaseAccountManagerInternal originalOwner = ReflectionUtils.getGlobalEndpointManagerOwner(globalEndpointManager); + + AtomicReference ppafEnabledRef = new AtomicReference<>(Boolean.FALSE); + DatabaseAccountManagerInternal overridingOwner = new DelegatingDatabaseAccountManagerInternal(originalOwner, ppafEnabledRef); + ReflectionUtils.setGlobalEndpointManagerOwner(globalEndpointManager, overridingOwner); + + DatabaseAccount databaseAccountForResponses = globalEndpointManager.getLatestDatabaseAccount(); + if (databaseAccountForResponses == null) { + // Ensure we have an initial snapshot + globalEndpointManager.refreshLocationAsync(null, true).block(); + databaseAccountForResponses = globalEndpointManager.getLatestDatabaseAccount(); + } + + assertThat(preferredRegions).isNotNull(); + assertThat(preferredRegions.size()).isGreaterThanOrEqualTo(1); + + String regionWithIssues = preferredRegions.get(0); + URI locationEndpointWithIssues = new URI(readableRegionNameToEndpoint.get(regionWithIssues) + "dbs/" + this.sharedDatabase.getId() + "/colls/" + this.sharedSinglePartitionContainer.getId() + "/docs"); + + // Redirect gateway calls through our mocked HttpClient + ReflectionUtils.setGatewayHttpClient(rxStoreModel, mockedHttpClient); + + setupHttpClientToReturnSuccessResponse(mockedHttpClient, operationType, databaseAccountForResponses, successStatusCode); + + CosmosException cosmosException = createCosmosException( + errorStatusCodeToMockFromPartitionInUnhealthyRegion, + errorSubStatusCodeToMockFromPartitionInUnhealthyRegion); + + setupHttpClientToThrowCosmosException( + mockedHttpClient, + locationEndpointWithIssues, + cosmosException, + shouldThrowNetworkError, + shouldThrowReadTimeoutExceptionWhenNetworkError, + shouldUseE2ETimeout); + + TestObject testItem = TestObject.create(); + + Function> dataPlaneOperation = resolveDataPlaneOperation(operationType); + + OperationInvocationParamsWrapper operationInvocationParamsWrapper = new OperationInvocationParamsWrapper(); + operationInvocationParamsWrapper.asyncContainer = asyncContainer; + operationInvocationParamsWrapper.createdTestItem = testItem; + operationInvocationParamsWrapper.itemRequestOptions = shouldUseE2ETimeout ? new CosmosItemRequestOptions().setCosmosEndToEndOperationLatencyPolicyConfig(THREE_SEC_E2E_TIMEOUT_POLICY) : new CosmosItemRequestOptions(); + operationInvocationParamsWrapper.patchItemRequestOptions = shouldUseE2ETimeout ? new CosmosPatchItemRequestOptions().setCosmosEndToEndOperationLatencyPolicyConfig(THREE_SEC_E2E_TIMEOUT_POLICY) : new CosmosPatchItemRequestOptions(); + + // Phase 1: PPAF disabled -> expect characteristics provided for DISABLED + ppafEnabledRef.set(Boolean.FALSE); + globalEndpointManager.refreshLocationAsync(null, true).block(); + ResponseWrapper responseWithPpafDisabled = dataPlaneOperation.apply(operationInvocationParamsWrapper); + this.validateExpectedResponseCharacteristics.accept(responseWithPpafDisabled, expectedResponseCharacteristicsWhenPpafIsDisabled); + + // Phase 2: PPAF enabled -> expect characteristics provided for ENABLED + ppafEnabledRef.set(Boolean.TRUE); + globalEndpointManager.refreshLocationAsync(null, true).block(); + ResponseWrapper responseWithPpafEnabled = dataPlaneOperation.apply(operationInvocationParamsWrapper); + this.validateExpectedResponseCharacteristics.accept(responseWithPpafEnabled, expectedResponseCharacteristicsWhenPpafIsEnabled); + + // Phase 3: PPAF disabled again -> confirm behavior reverts + ppafEnabledRef.set(Boolean.FALSE); + globalEndpointManager.refreshLocationAsync(null, true).block(); + responseWithPpafDisabled = dataPlaneOperation.apply(operationInvocationParamsWrapper); + this.validateExpectedResponseCharacteristics.accept(responseWithPpafDisabled, expectedResponseCharacteristicsWhenPpafIsDisabled); + } catch (Exception e) { + Assertions.fail("The test ran into an exception {}", e); + } finally { + safeClose(cosmosAsyncClientValueHolder.v); + } + } + } + + /** + * Validates dynamic Per-Partition Automatic Failover (PPAF) hedging behavior for non-write operations + * (point Read and Query variants). + * + *

Fault models:

+ *
    + *
  • DIRECT: SERVER_GENERATED_GONE (HTTP 410 / substatus 21005) for a targeted partition key range + * in the first preferred region.
  • + *
  • GATEWAY: RESPONSE_DELAY injected (via fault injection rules) for the first preferred region + * (applied to read item, query plan, and query operations).
  • + *
+ * + *

QueryFlavor mapping:

+ *
    + *
  • NONE: point read (readItem).
  • + *
  • READ_ALL: readAllItems.
  • + *
  • READ_MANY: readMany with supplied identities.
  • + *
  • QUERY_ITEMS: queryItems (requires query plan; may still contact original region post-stabilization).
  • + *
+ * + *

Phases asserted:

+ *
    + *
  1. Hedging window (multiple consecutive injected faults): + *
      + *
    • DIRECT (410): expect >=1 retry and 2 contacted regions.
    • + *
    • GATEWAY (delay): expect 0 retries and 2 contacted regions (hedged).
    • + *
    + *
  2. + *
  3. Post-window stabilization: + *
      + *
    • Routes directly to healthy region (1 contacted region) except QUERY_ITEMS + * which may still require original region for query plan (thus 2).
    • + *
    • Expect 0 retries.
    • + *
    + *
  4. + *
+ * + *

Behavior is parameterized by the ppafNonWriteDynamicEnablementScenarios data provider: + * test type description, operationType (Read/Query), queryFlavor, faultKind, expected success + * status code, and allowed connection modes.

+ * + *

Dynamic enablement is achieved by overriding GlobalEndpointManager's owner to + * inject the PPAF flag into DatabaseAccount snapshots.

+ */ + @Test(groups = {"multi-region"}, dataProvider = "ppafNonWriteDynamicEnablementScenarios") + public void testFailoverBehaviorForNonWriteOperationsWithPpafDynamicEnablement( + String testType, + OperationType operationType, + QueryFlavor queryFlavor, + FaultKind faultKind, + int successStatusCode, + Set allowedConnectionModes) { + + ConnectionPolicy connectionPolicy = COSMOS_CLIENT_BUILDER_ACCESSOR.getConnectionPolicy(getClientBuilder()); + ConnectionMode connectionMode = connectionPolicy.getConnectionMode(); + + if (!allowedConnectionModes.contains(connectionMode)) { + throw new SkipException(String.format("Test with type : %s not eligible for specified connection mode %s.", testType, connectionMode)); + } + + final int consecutiveFaults = 10; + + // ===================== DIRECT MODE PATH ===================== + if (connectionMode == ConnectionMode.DIRECT) { + + // Build expectations (hedging window vs stabilized post-window) + ExpectedResponseCharacteristics expectedDuringWindow = new ExpectedResponseCharacteristics() + .setExpectedMinRetryCount(1) // At least one retry due to first region failure + .setShouldFinalResponseHaveSuccess(true) + .setExpectedRegionsContactedCount(2); // Hedging to healthy region + + ExpectedResponseCharacteristics expectedAfterWindow = new ExpectedResponseCharacteristics() + .setExpectedMinRetryCount(0) // Stable routing + .setExpectedMaxRetryCount(0) + .setShouldFinalResponseHaveSuccess(true) + // QUERY_ITEMS still requires query plan from original region -> 2 regions contacted + .setExpectedRegionsContactedCount(queryFlavor.equals(QueryFlavor.QUERY_ITEMS) ? 2 : 1); + + TransportClient transportClientMock = Mockito.mock(TransportClient.class); + List preferredRegions = this.accountLevelLocationReadableLocationContext.serviceOrderedReadableRegions; + Map readableRegionNameToEndpoint = this.accountLevelLocationReadableLocationContext.regionNameToEndpoint; + Utils.ValueHolder cosmosAsyncClientValueHolder = new Utils.ValueHolder<>(); + + try { + // Build client and container + CosmosClientBuilder cosmosClientBuilder = getClientBuilder(); + CosmosAsyncClient asyncClient = cosmosClientBuilder.buildAsyncClient(); + cosmosAsyncClientValueHolder.v = asyncClient; + + CosmosAsyncContainer asyncContainer = asyncClient + .getDatabase(this.sharedDatabase.getId()) + .getContainer(this.sharedSinglePartitionContainer.getId()); + + // Reflection plumbing for internal components + RxDocumentClientImpl rxDocumentClient = (RxDocumentClientImpl) ReflectionUtils.getAsyncDocumentClient(asyncClient); + GlobalEndpointManager globalEndpointManager = ReflectionUtils.getGlobalEndpointManager(rxDocumentClient); + Mockito.when(transportClientMock.getGlobalEndpointManager()).thenReturn(globalEndpointManager); + + // Enable dynamic PPAF via delegating owner + DatabaseAccountManagerInternal originalOwner = ReflectionUtils.getGlobalEndpointManagerOwner(globalEndpointManager); + AtomicReference ppafEnabledRef = new AtomicReference<>(Boolean.TRUE); + DatabaseAccountManagerInternal overridingOwner = + new DelegatingDatabaseAccountManagerInternal(originalOwner, ppafEnabledRef); + ReflectionUtils.setGlobalEndpointManagerOwner(globalEndpointManager, overridingOwner); + + // Internal store clients + StoreClient storeClient = ReflectionUtils.getStoreClient(rxDocumentClient); + ReplicatedResourceClient replicatedResourceClient = ReflectionUtils.getReplicatedResourceClient(storeClient); + ConsistencyReader consistencyReader = ReflectionUtils.getConsistencyReader(replicatedResourceClient); + StoreReader storeReader = ReflectionUtils.getStoreReader(consistencyReader); + ConsistencyWriter consistencyWriter = ReflectionUtils.getConsistencyWriter(replicatedResourceClient); + + // Identify a PK range + first preferred region to fault + Utils.ValueHolder> partitionKeyRangesForContainer = + getPartitionKeyRangesForContainer(asyncContainer, rxDocumentClient).block(); + assertThat(partitionKeyRangesForContainer).isNotNull(); + assertThat(partitionKeyRangesForContainer.v).isNotNull(); + assertThat(partitionKeyRangesForContainer.v.size()).isGreaterThanOrEqualTo(1); + PartitionKeyRange partitionKeyRangeWithIssues = partitionKeyRangesForContainer.v.get(0); + + assertThat(preferredRegions).isNotNull(); + assertThat(preferredRegions.size()).isGreaterThanOrEqualTo(1); + String regionWithIssues = preferredRegions.get(0); + RegionalRoutingContext regionalRoutingContextWithIssues = + new RegionalRoutingContext(new URI(readableRegionNameToEndpoint.get(regionWithIssues))); + + // Wire mock transport client into reader + writer paths + ReflectionUtils.setTransportClient(storeReader, transportClientMock); + ReflectionUtils.setTransportClient(consistencyWriter, transportClientMock); + + // Success response when routed to healthy region + setupTransportClientToReturnSuccessResponse( + transportClientMock, + constructStoreResponse(operationType, successStatusCode)); + + if (faultKind != FaultKind.SERVER_GENERATED_GONE) { + throw new SkipException("DIRECT path only supports SERVER_GENERATED_GONE for this test."); + } + + // Inject 410/21005 for unhealthy region + CosmosException cosmosException = createCosmosException( + HttpConstants.StatusCodes.GONE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_410); + + setupTransportClientToThrowCosmosException( + transportClientMock, + partitionKeyRangeWithIssues, + regionalRoutingContextWithIssues, + cosmosException); + + // Prepare operation invocation + TestObject testItem = TestObject.create(); + Function> dataPlaneOperation = + resolveDataPlaneOperation(operationType); + + OperationInvocationParamsWrapper params = new OperationInvocationParamsWrapper(); + params.asyncContainer = asyncContainer; + params.createdTestItem = testItem; + applyQueryFlavor(params, queryFlavor, testItem); + + // Force initial refresh so DatabaseAccount is loaded with PPAF flag + DatabaseAccount dbAccountSnapshot = globalEndpointManager.getLatestDatabaseAccount(); + if (dbAccountSnapshot == null) { + globalEndpointManager.refreshLocationAsync(null, true).block(); + } else { + globalEndpointManager.refreshLocationAsync(dbAccountSnapshot, true).block(); + } + + // Execute hedging + stabilization phases + runHedgingPhasesForNonWrite( + consecutiveFaults, + dataPlaneOperation, + params, + expectedDuringWindow, + expectedAfterWindow); + + } catch (Exception e) { + Assertions.fail("The test ran into an exception {}", e); + } finally { + safeClose(cosmosAsyncClientValueHolder.v); + } + } + + // ===================== GATEWAY MODE PATH ===================== + if (connectionMode == ConnectionMode.GATEWAY) { + + ExpectedResponseCharacteristics expectedDuringWindow = new ExpectedResponseCharacteristics() + .setExpectedMinRetryCount(0) // Delay fault causes hedging without retries + .setExpectedMaxRetryCount(0) + .setShouldFinalResponseHaveSuccess(true) + .setExpectedRegionsContactedCount(2); + + ExpectedResponseCharacteristics expectedAfterWindow = new ExpectedResponseCharacteristics() + .setExpectedMinRetryCount(0) + .setExpectedMaxRetryCount(0) + .setShouldFinalResponseHaveSuccess(true) + .setExpectedRegionsContactedCount(queryFlavor.equals(QueryFlavor.QUERY_ITEMS) ? 2 : 1); + + List preferredRegions = this.accountLevelLocationReadableLocationContext.serviceOrderedReadableRegions; + Utils.ValueHolder cosmosAsyncClientValueHolder = new Utils.ValueHolder<>(); + + try { + // Build client + container + CosmosClientBuilder cosmosClientBuilder = getClientBuilder(); + CosmosAsyncClient asyncClient = cosmosClientBuilder.buildAsyncClient(); + cosmosAsyncClientValueHolder.v = asyncClient; + + CosmosAsyncContainer asyncContainer = asyncClient + .getDatabase(this.sharedDatabase.getId()) + .getContainer(this.sharedSinglePartitionContainer.getId()); + // Warm caches + asyncContainer.getFeedRanges().block(); + + RxDocumentClientImpl rxDocumentClient = + (RxDocumentClientImpl) ReflectionUtils.getAsyncDocumentClient(asyncClient); + + GlobalEndpointManager globalEndpointManager = + ReflectionUtils.getGlobalEndpointManager(rxDocumentClient); + + // Enable PPAF dynamically + DatabaseAccountManagerInternal originalOwner = + ReflectionUtils.getGlobalEndpointManagerOwner(globalEndpointManager); + AtomicReference ppafEnabledRef = new AtomicReference<>(Boolean.TRUE); + DatabaseAccountManagerInternal overridingOwner = + new DelegatingDatabaseAccountManagerInternal(originalOwner, ppafEnabledRef); + ReflectionUtils.setGlobalEndpointManagerOwner(globalEndpointManager, overridingOwner); + + assertThat(preferredRegions).isNotNull(); + assertThat(preferredRegions.size()).isGreaterThanOrEqualTo(1); + String regionWithIssues = preferredRegions.get(0); + + // Refresh DB account snapshot + DatabaseAccount dbAccountSnapshot = globalEndpointManager.getLatestDatabaseAccount(); + if (dbAccountSnapshot == null) { + globalEndpointManager.refreshLocationAsync(null, true).block(); + } else { + globalEndpointManager.refreshLocationAsync(dbAccountSnapshot, true).block(); + } + + if (faultKind != FaultKind.RESPONSE_DELAY) { + throw new SkipException("GATEWAY path only supports RESPONSE_DELAY for this test."); + } + + // Inject RESPONSE_DELAY faults using FIR (read item + query + query plan) + FeedRange fullRange = FeedRange.forFullRange(); + + FaultInjectionServerErrorResult responseDelayError = FaultInjectionResultBuilders + .getResultBuilder(FaultInjectionServerErrorType.RESPONSE_DELAY) + .delay(Duration.ofSeconds(10)) // long enough to trigger hedging + .suppressServiceRequests(false) + .build(); + + FaultInjectionCondition conditionForReadItem = new FaultInjectionConditionBuilder() + .connectionType(FaultInjectionConnectionType.GATEWAY) + .endpoints(new FaultInjectionEndpointBuilder(fullRange).build()) + .operationType(FaultInjectionOperationType.READ_ITEM) + .region(regionWithIssues) + .build(); + + FaultInjectionCondition conditionForQueryPlan = new FaultInjectionConditionBuilder() + .connectionType(FaultInjectionConnectionType.GATEWAY) + .endpoints(new FaultInjectionEndpointBuilder(fullRange).build()) + .operationType(FaultInjectionOperationType.METADATA_REQUEST_QUERY_PLAN) + .region(regionWithIssues) + .build(); + + FaultInjectionCondition conditionForQuery = new FaultInjectionConditionBuilder() + .connectionType(FaultInjectionConnectionType.GATEWAY) + .endpoints(new FaultInjectionEndpointBuilder(fullRange).build()) + .operationType(FaultInjectionOperationType.QUERY_ITEM) + .region(regionWithIssues) + .build(); + + String ruleId = String.format("response-delay-%s", UUID.randomUUID()); + + FaultInjectionRule queryPlanResponseDelayFIRule = new FaultInjectionRuleBuilder(ruleId + "-qp") + .condition(conditionForQueryPlan) + .result(responseDelayError) + .build(); + + FaultInjectionRule queryResponseDelayFIRule = new FaultInjectionRuleBuilder(ruleId + "-q") + .condition(conditionForQuery) + .result(responseDelayError) + .build(); + + FaultInjectionRule readItemResponseDelayFIRule = new FaultInjectionRuleBuilder(ruleId + "-r") + .condition(conditionForReadItem) + .result(responseDelayError) + .build(); + + CosmosFaultInjectionHelper + .configureFaultInjectionRules( + asyncContainer, + Arrays.asList(queryPlanResponseDelayFIRule, queryResponseDelayFIRule, readItemResponseDelayFIRule)) + .block(); + + // Seed item for read/readMany scenarios + TestObject testItem = TestObject.create(); + asyncContainer.createItem(testItem).block(); + + // Prepare params + operation + Function> dataPlaneOperation = + resolveDataPlaneOperation(operationType); + OperationInvocationParamsWrapper params = new OperationInvocationParamsWrapper(); + params.asyncContainer = asyncContainer; + params.createdTestItem = testItem; + applyQueryFlavor(params, queryFlavor, testItem); + + // Execute hedging + stabilization phases + runHedgingPhasesForNonWrite( + consecutiveFaults, + dataPlaneOperation, + params, + expectedDuringWindow, + expectedAfterWindow); + + } catch (Exception e) { + Assertions.fail("The test ran into an exception {}", e); + } finally { + safeClose(cosmosAsyncClientValueHolder.v); + } + } + } + + /** + * Helper: Executes the hedging window (multiple consecutive fault attempts) followed by a single post-window verification. + */ + private void runHedgingPhasesForNonWrite( + int consecutiveFaults, + Function> dataPlaneOperation, + OperationInvocationParamsWrapper params, + ExpectedResponseCharacteristics expectedDuringWindow, + ExpectedResponseCharacteristics expectedAfterWindow) { + + // Hedging window iterations + for (int i = 0; i < consecutiveFaults; i++) { + ResponseWrapper response = dataPlaneOperation.apply(params); + this.validateExpectedResponseCharacteristics.accept(response, expectedDuringWindow); + } + + // Stabilized post-window request + ResponseWrapper postWindow = dataPlaneOperation.apply(params); + this.validateExpectedResponseCharacteristics.accept(postWindow, expectedAfterWindow); + } + + private static class DelegatingDatabaseAccountManagerInternal implements DatabaseAccountManagerInternal { + private final DatabaseAccountManagerInternal delegate; + private final AtomicReference ppafEnabledRef; + + DelegatingDatabaseAccountManagerInternal(DatabaseAccountManagerInternal delegate, AtomicReference ppafEnabledRef) { + this.delegate = delegate; + this.ppafEnabledRef = ppafEnabledRef; + } + + @Override + public Flux getDatabaseAccountFromEndpoint(URI endpoint) { + return delegate.getDatabaseAccountFromEndpoint(endpoint) + .map(dbAccount -> { + Boolean enabled = ppafEnabledRef.get(); + dbAccount.setIsPerPartitionFailoverBehaviorEnabled(enabled); + return dbAccount; + }); + } + + @Override + public ConnectionPolicy getConnectionPolicy() { + return delegate.getConnectionPolicy(); + } + + @Override + public URI getServiceEndpoint() { + return delegate.getServiceEndpoint(); + } + } + private void setupTransportClientToThrowCosmosException( TransportClient transportClientMock, PartitionKeyRange partitionKeyRange, @@ -1329,9 +2292,10 @@ private StoreResponse constructStoreResponse(OperationType operationType, int st private static class AccountLevelLocationContext { - private final List serviceOrderedReadableRegions; - private final List serviceOrderedWriteableRegions; - private final Map regionNameToEndpoint; + private final List serviceOrderedReadableRegions; + @SuppressWarnings("unused") + private final List serviceOrderedWriteableRegions; + private final Map regionNameToEndpoint; public AccountLevelLocationContext( List serviceOrderedReadableRegions, @@ -1384,7 +2348,7 @@ private Function> resolveDa CosmosItemResponse readItemResponse = asyncContainer.readItem( createdTestObject.getId(), - new PartitionKey(createdTestObject.getId()), + new PartitionKey(createdTestObject.getMypk()), itemRequestOptions, TestObject.class) .block(); @@ -1411,7 +2375,7 @@ private Function> resolveDa CosmosItemResponse upsertItemResponse = asyncContainer.upsertItem( createdTestObject, - new PartitionKey(createdTestObject.getId()), + new PartitionKey(createdTestObject.getMypk()), itemRequestOptions) .block(); @@ -1437,7 +2401,7 @@ private Function> resolveDa CosmosItemResponse createItemResponse = asyncContainer.createItem( createdTestObject, - new PartitionKey(createdTestObject.getId()), + new PartitionKey(createdTestObject.getMypk()), itemRequestOptions) .block(); @@ -1491,7 +2455,7 @@ private Function> resolveDa CosmosItemResponse patchItemResponse = asyncContainer.patchItem( createdTestObject.getId(), - new PartitionKey(createdTestObject.getId()), + new PartitionKey(createdTestObject.getMypk()), patchOperations, patchItemRequestOptions, TestObject.class) @@ -1514,13 +2478,28 @@ private Function> resolveDa CosmosAsyncContainer asyncContainer = paramsWrapper.asyncContainer; CosmosQueryRequestOptions queryRequestOptions = paramsWrapper.queryRequestOptions == null ? new CosmosQueryRequestOptions() : paramsWrapper.queryRequestOptions; queryRequestOptions = paramsWrapper.feedRangeForQuery == null ? queryRequestOptions.setFeedRange(FeedRange.forFullRange()) : queryRequestOptions.setFeedRange(paramsWrapper.feedRangeForQuery); + String sql = paramsWrapper.querySql != null ? paramsWrapper.querySql : "SELECT * FROM c"; try { + // If applyQueryFlavor requested readAllItems or readMany, use those operations instead of query + if (paramsWrapper.readAllPartitionKey != null) { + FeedResponse readAllResponse = asyncContainer + .readAllItems(paramsWrapper.readAllPartitionKey, TestObject.class) + .byPage() + .blockLast(); + return new ResponseWrapper<>(readAllResponse); + } - FeedResponse queryItemResponse = asyncContainer.queryItems( - "SELECT * FROM C", - queryRequestOptions, - TestObject.class) + if (paramsWrapper.readManyIdentities != null && !paramsWrapper.readManyIdentities.isEmpty()) { + FeedResponse readManyResponse = asyncContainer + .readMany(paramsWrapper.readManyIdentities, TestObject.class) + .block(); + return new ResponseWrapper<>(readManyResponse); + } + + // Fallback: regular queryItems + FeedResponse queryItemResponse = asyncContainer + .queryItems(sql, queryRequestOptions, TestObject.class) .byPage() .blockLast(); @@ -1656,6 +2635,11 @@ private static class OperationInvocationParamsWrapper { public CosmosItemRequestOptions patchItemRequestOptions; public FeedRange feedRangeToDrainForChangeFeed; public FeedRange feedRangeForQuery; + public String querySql; + // For QueryFlavor.READ_ALL + public PartitionKey readAllPartitionKey; + // For QueryFlavor.READ_MANY + public List readManyIdentities; } private static class ExpectedResponseCharacteristics { @@ -1703,7 +2687,8 @@ private static class FakeBatchResponse { private String retryAfterMilliseconds; - public int getStatusCode() { + @SuppressWarnings("unused") + public int getStatusCode() { return statusCode; } @@ -1712,7 +2697,8 @@ public FakeBatchResponse setStatusCode(int statusCode) { return this; } - public int getSubStatusCode() { + @SuppressWarnings("unused") + public int getSubStatusCode() { return subStatusCode; } @@ -1721,7 +2707,8 @@ public FakeBatchResponse setSubStatusCode(int subStatusCode) { return this; } - public double getRequestCharge() { + @SuppressWarnings("unused") + public double getRequestCharge() { return requestCharge; } @@ -1730,7 +2717,8 @@ public FakeBatchResponse setRequestCharge(double requestCharge) { return this; } - public String geteTag() { + @SuppressWarnings("unused") + public String geteTag() { return eTag; } @@ -1739,7 +2727,8 @@ public FakeBatchResponse seteTag(String eTag) { return this; } - public Object getResourceBody() { + @SuppressWarnings("unused") + public Object getResourceBody() { return resourceBody; } @@ -1748,7 +2737,8 @@ public FakeBatchResponse setResourceBody(Object resourceBody) { return this; } - public String getRetryAfterMilliseconds() { + @SuppressWarnings("unused") + public String getRetryAfterMilliseconds() { return retryAfterMilliseconds; } @@ -1758,6 +2748,60 @@ public FakeBatchResponse setRetryAfterMilliseconds(String retryAfterMilliseconds } } + private enum FaultKind { + SERVER_GENERATED_GONE, + RESPONSE_DELAY + } + + private enum QueryFlavor { + NONE, // Not a query + READ_ALL, // SELECT * FROM c + READ_MANY, // Simulate with IN clause + QUERY_ITEMS // Arbitrary filter + } + + private void applyQueryFlavor(OperationInvocationParamsWrapper params, QueryFlavor flavor, TestObject seed) { + if (flavor == QueryFlavor.NONE) { + // Do not set CosmosQueryRequestOptions explicitly + params.querySql = null; + params.readAllPartitionKey = null; + params.readManyIdentities = null; + return; + } + + // Do not set CosmosQueryRequestOptions explicitly; default behavior will be used + + switch (flavor) { + case READ_ALL: + // Map to readAllItems on the container using the seed's partition key + String pkReadAll = seed != null ? seed.getMypk() : UUID.randomUUID().toString(); + params.readAllPartitionKey = new PartitionKey(pkReadAll); + params.querySql = null; + params.readManyIdentities = null; + break; + case READ_MANY: + // Map to readMany with one or more identities using the seed + String id = seed != null ? seed.getId() : UUID.randomUUID().toString(); + String pkReadMany = seed != null ? seed.getMypk() : UUID.randomUUID().toString(); + PartitionKey pkValue = new PartitionKey(pkReadMany); + List identities = new ArrayList<>(); + identities.add(new CosmosItemIdentity(pkValue, id)); + params.readManyIdentities = identities; + params.readAllPartitionKey = null; + params.querySql = null; + break; + case QUERY_ITEMS: + params.querySql = "SELECT * FROM c WHERE IS_DEFINED(c.mypk)"; + params.readAllPartitionKey = null; + params.readManyIdentities = null; + break; + default: + params.querySql = "SELECT * FROM c"; + params.readAllPartitionKey = null; + params.readManyIdentities = null; + } + } + private HttpResponse createResponse(int statusCode, OperationType operationType, ResourceType resourceType, DatabaseAccount databaseAccount, TestPojo testPojo) { HttpResponse httpResponse = new HttpResponse() { @Override diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionCircuitBreakerE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionCircuitBreakerE2ETests.java index 7cbae5749237..2fa9a5e2693e 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionCircuitBreakerE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionCircuitBreakerE2ETests.java @@ -4731,7 +4731,7 @@ private static List buildGwResponseDelayInjectionRulesNotSco FaultInjectionServerErrorResult faultInjectionServerErrorResult = FaultInjectionResultBuilders .getResultBuilder(FaultInjectionServerErrorType.RESPONSE_DELAY) .delay(paramsWrapper.getResponseDelay()) - .suppressServiceRequests(false) + .suppressServiceRequests(true) .build(); List faultInjectionRules = new ArrayList<>(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/UserAgentSuffixTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/UserAgentSuffixTest.java index 9b74cba3ca05..d91b97327f80 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/UserAgentSuffixTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/UserAgentSuffixTest.java @@ -6,6 +6,7 @@ package com.azure.cosmos; +import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.models.CosmosContainerResponse; import com.azure.cosmos.rx.TestSuiteBase; import org.testng.annotations.AfterClass; @@ -53,7 +54,7 @@ public void userAgentSuffixWithoutSpecialCharacter() { assertThat(response.getProperties()).isNotNull(); assertThat(response.getProperties().getId()).isEqualTo(this.containerName); assertThat(response.getDiagnostics()).isNotNull(); - assertThat(response.getDiagnostics().getUserAgent()).endsWith("TestUserAgent"); + validateUserAgentSuffix(response.getDiagnostics().getUserAgent(), "TestUserAgent"); } @Test(groups = { "fast", "emulator" }, timeOut = TIMEOUT) @@ -70,7 +71,7 @@ public void userAgentSuffixWithSpecialCharacter() { assertThat(response.getProperties()).isNotNull(); assertThat(response.getProperties().getId()).isEqualTo(this.containerName); assertThat(response.getDiagnostics()).isNotNull(); - assertThat(response.getDiagnostics().getUserAgent()).endsWith("TestUserAgent's"); + validateUserAgentSuffix(response.getDiagnostics().getUserAgent(), "TestUserAgent's"); } @Test(groups = { "fast", "emulator" }, timeOut = TIMEOUT) @@ -87,7 +88,7 @@ public void userAgentSuffixWithUnicodeCharacter() { assertThat(response.getProperties()).isNotNull(); assertThat(response.getProperties().getId()).isEqualTo(this.containerName); assertThat(response.getDiagnostics()).isNotNull(); - assertThat(response.getDiagnostics().getUserAgent()).endsWith("UnicodeChar_InUserAgent"); + validateUserAgentSuffix(response.getDiagnostics().getUserAgent(), "UnicodeChar_InUserAgent"); } @Test(groups = { "fast", "emulator" }, timeOut = TIMEOUT) @@ -104,6 +105,15 @@ public void userAgentSuffixWithWhitespaceAndAsciiSpecialChars() { assertThat(response.getProperties()).isNotNull(); assertThat(response.getProperties().getId()).isEqualTo(this.containerName); assertThat(response.getDiagnostics()).isNotNull(); - assertThat(response.getDiagnostics().getUserAgent()).endsWith("UserAgent with space$%_^()*&"); + validateUserAgentSuffix(response.getDiagnostics().getUserAgent(), "UserAgent with space$%_^()*&"); + } + + private void validateUserAgentSuffix(String actualUserAgent, String expectedUserAgentSuffix) { + + if (Configs.isHttp2Enabled()) { + expectedUserAgentSuffix = expectedUserAgentSuffix + "|F10"; + } + + assertThat(actualUserAgent).endsWith(expectedUserAgentSuffix); } } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java index 41b0445a14cf..cf126f74b3fe 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java @@ -10,6 +10,7 @@ import com.azure.cosmos.CosmosClientBuilder; import com.azure.cosmos.implementation.ApiType; import com.azure.cosmos.implementation.AsyncDocumentClient; +import com.azure.cosmos.implementation.DatabaseAccountManagerInternal; import com.azure.cosmos.implementation.ClientSideRequestStatistics; import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.ConnectionPolicy; @@ -248,6 +249,14 @@ public static GlobalEndpointManager getGlobalEndpointManager(RxDocumentClientImp return get(GlobalEndpointManager.class, rxDocumentClient, "globalEndpointManager"); } + public static DatabaseAccountManagerInternal getGlobalEndpointManagerOwner(GlobalEndpointManager globalEndpointManager) { + return get(DatabaseAccountManagerInternal.class, globalEndpointManager, "owner"); + } + + public static void setGlobalEndpointManagerOwner(GlobalEndpointManager globalEndpointManager, DatabaseAccountManagerInternal newOwner) { + set(globalEndpointManager, newOwner, "owner"); + } + public static void setThinProxy(RxDocumentClientImpl client, RxStoreModel storeModel) { set(client, storeModel, "thinProxy"); } diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 2af84251114a..5019932cfd5f 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -3,6 +3,7 @@ ### 4.75.0-beta.1 (Unreleased) #### Features Added +* Enabled `Cosmos(Async)Client` to support per-partition automatic failover dynamically without the need to restart the application. - See [PR 46477](https://github.com/Azure/azure-sdk-for-java/pull/46477) * AAD Auth: Added a fallback mechanism for AAD audience scope. - [PR 46637](https://github.com/Azure/azure-sdk-for-java/pull/46637) #### Breaking Changes @@ -19,6 +20,7 @@ #### Features Added * Added `ThroughputBucket` support for throughput control. - [PR 46042](https://github.com/Azure/azure-sdk-for-java/pull/46042) +* AAD Auth: Adds a fallback mechanism for AAD audience scope. - [PR 46637](https://github.com/Azure/azure-sdk-for-java/pull/46637) #### Bugs Fixed * Fixed 404/1002 for query when container recreated with same name. - [PR 45930](https://github.com/Azure/azure-sdk-for-java/pull/45930) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DiagnosticsClientContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DiagnosticsClientContext.java index 4a0409a594b0..d540bf33a082 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DiagnosticsClientContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DiagnosticsClientContext.java @@ -64,7 +64,7 @@ public void serialize(DiagnosticsClientConfig clientConfig, JsonGenerator genera generator.writeStringField("machineId", ClientTelemetry.getMachineId(clientConfig)); generator.writeStringField("connectionMode", clientConfig.getConnectionMode().toString()); generator.writeNumberField("numberOfClients", clientConfig.getActiveClientsCount()); - generator.writeStringField("isPpafEnabled", Configs.isPerPartitionAutomaticFailoverEnabled()); + generator.writeStringField("isPpafEnabled", clientConfig.isPerPartitionAutomaticFailoverEnabledAsString); generator.writeStringField("isFalseProgSessionTokenMergeEnabled", Configs.isSessionTokenFalseProgressMergeEnabled() ? "true" : "false"); generator.writeStringField("excrgns", clientConfig.excludedRegionsRelatedConfig()); generator.writeObjectFieldStart("clientEndpoints"); @@ -132,6 +132,7 @@ class DiagnosticsClientConfig { private String sessionRetryOptionsAsString; private String regionScopedSessionContainerOptionsAsString; private String partitionLevelCircuitBreakerConfigAsString; + private String isPerPartitionAutomaticFailoverEnabledAsString = "false"; public DiagnosticsClientConfig withMachineId(String machineId) { this.machineId = machineId; @@ -254,6 +255,11 @@ public DiagnosticsClientConfig withPartitionLevelCircuitBreakerConfig(PartitionL return this; } + public DiagnosticsClientConfig withIsPerPartitionAutomaticFailoverEnabled(Boolean isPpafEnabled) { + this.isPerPartitionAutomaticFailoverEnabledAsString = (isPpafEnabled != null && isPpafEnabled) ? "true" : "false"; + return this; + } + public DiagnosticsClientConfig withRegionScopedSessionContainerOptions(RegionScopedSessionContainer regionScopedSessionContainer) { if (regionScopedSessionContainer == null) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java index c5fae9c43669..dd464f48a15e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java @@ -21,8 +21,10 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; import java.util.function.Function; /** @@ -46,6 +48,7 @@ public class GlobalEndpointManager implements AutoCloseable { private volatile boolean isClosed; private volatile DatabaseAccount latestDatabaseAccount; private final AtomicBoolean hasThinClientReadLocations = new AtomicBoolean(false); + private final AtomicBoolean lastRecordedPerPartitionAutomaticFailoverEnabledOnClient = new AtomicBoolean(Configs.isPerPartitionAutomaticFailoverEnabled().equalsIgnoreCase("true")); private final ReentrantReadWriteLock.WriteLock databaseAccountWriteLock; @@ -53,6 +56,8 @@ public class GlobalEndpointManager implements AutoCloseable { private volatile Throwable latestDatabaseRefreshError; + private volatile Consumer perPartitionAutomaticFailoverConfigModifier; + public void setLatestDatabaseRefreshError(Throwable latestDatabaseRefreshError) { this.latestDatabaseRefreshError = latestDatabaseRefreshError; } @@ -186,6 +191,7 @@ public boolean canUseMultipleWriteLocations(RxDocumentServiceRequest request) { public void close() { this.isClosed = true; + this.perPartitionAutomaticFailoverConfigModifier = null; this.scheduler.dispose(); logger.debug("GlobalEndpointManager closed."); } @@ -365,6 +371,22 @@ private Mono getDatabaseAccountAsync(URI serviceEndpoint) { databaseAccount.getThinClientReadableLocations(); this.hasThinClientReadLocations.set(thinClientReadLocations != null && !thinClientReadLocations.isEmpty()); + Boolean currentPerPartitionAutomaticFailoverEnabledFromService = + databaseAccount.isPerPartitionFailoverBehaviorEnabled(); + + if (currentPerPartitionAutomaticFailoverEnabledFromService != null) { + boolean newVal = currentPerPartitionAutomaticFailoverEnabledFromService; + // Attempt to flip only if the value actually changes. + if (this.lastRecordedPerPartitionAutomaticFailoverEnabledOnClient + .compareAndSet(!newVal, newVal)) { + if (this.perPartitionAutomaticFailoverConfigModifier != null) { + logger.info("ATTN: Per partition automatic failover enabled: {}, applying modifier", + currentPerPartitionAutomaticFailoverEnabledFromService); + this.perPartitionAutomaticFailoverConfigModifier.accept(databaseAccount); + } + } + } + this.setLatestDatabaseRefreshError(null); } finally { this.databaseAccountWriteLock.unlock(); @@ -411,4 +433,8 @@ private List getEffectivePreferredRegions() { this.databaseAccountReadLock.unlock(); } } + + public void setPerPartitionAutomaticFailoverConfigModifier(Consumer perPartitionAutomaticFailoverConfigModifier) { + this.perPartitionAutomaticFailoverConfigModifier = perPartitionAutomaticFailoverConfigModifier; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 61ed1ee76460..cbd29c82e3d6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -19,6 +19,7 @@ import com.azure.cosmos.CosmosItemSerializer; import com.azure.cosmos.CosmosOperationPolicy; import com.azure.cosmos.DirectConnectionConfig; +import com.azure.cosmos.Http2ConnectionConfig; import com.azure.cosmos.ReadConsistencyStrategy; import com.azure.cosmos.SessionRetryOptions; import com.azure.cosmos.ThresholdBasedAvailabilityStrategy; @@ -278,6 +279,7 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization private List operationPolicies; private final AtomicReference cachedCosmosAsyncClientSnapshot; private CosmosEndToEndOperationLatencyPolicyConfig ppafEnforcedE2ELatencyPolicyConfigForReads; + private Consumer perPartitionFailoverConfigModifier; public RxDocumentClientImpl(URI serviceEndpoint, String masterKeyOrResourceToken, @@ -792,7 +794,15 @@ public void init(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot, Func this.globalEndpointManager, this.reactorHttpClient); + this.perPartitionFailoverConfigModifier + = (databaseAccount -> { + this.initializePerPartitionFailover(databaseAccount); + this.addUserAgentSuffix(this.userAgentContainer, EnumSet.allOf(UserAgentFeatureFlags.class)); + }); + + this.globalEndpointManager.setPerPartitionAutomaticFailoverConfigModifier(this.perPartitionFailoverConfigModifier); this.globalEndpointManager.init(); + this.initializePerPartitionCircuitBreaker(); DatabaseAccount databaseAccountSnapshot = this.initializeGatewayConfigurationReader(); this.resetSessionContainerIfNeeded(databaseAccountSnapshot); @@ -857,7 +867,6 @@ public void init(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot, Func && readConsistencyStrategy != ReadConsistencyStrategy.SESSION && !sessionCapturingOverrideEnabled); this.sessionContainer.setDisableSessionCapturing(updatedDisableSessionCapturing); - this.initializePerPartitionFailover(databaseAccountSnapshot); this.addUserAgentSuffix(this.userAgentContainer, EnumSet.allOf(UserAgentFeatureFlags.class)); } catch (Exception e) { logger.error("unexpected failure in initializing client.", e); @@ -1291,6 +1300,22 @@ private Flux> createQueryInternal( UUID activityId, final AtomicBoolean isQueryCancelledOnTimeout) { + // reevaluate e2e policy config on cosmosQueryRequestOptions + if (options != null) { + CosmosEndToEndOperationLatencyPolicyConfig endToEndPolicyConfigFromRequestOptions = + getEndToEndOperationLatencyPolicyConfig( + ImplementationBridgeHelpers + .CosmosQueryRequestOptionsHelper + .getCosmosQueryRequestOptionsAccessor() + .toRequestOptions(options), + resourceTypeEnum, + OperationType.Query); + + if (endToEndPolicyConfigFromRequestOptions != null) { + options.setCosmosEndToEndOperationLatencyPolicyConfig(endToEndPolicyConfigFromRequestOptions); + } + } + Flux> executionContext = DocumentQueryExecutionContextFactory .createDocumentQueryExecutionContextAsync(diagnosticsClientContext, queryClient, resourceTypeEnum, klass, sqlQuery, @@ -1452,6 +1477,30 @@ private void addUserAgentSuffix(UserAgentContainer userAgentContainer, Set toDocumentServiceResponse(Mono toDocumentServiceResponse(Mono userAgentFeatureFlags) { + if (userAgentFeatureFlags == null || userAgentFeatureFlags.isEmpty()) { return; } - int value = 0; + writeLock.lock(); + try { - for (UserAgentFeatureFlags userAgentFeatureFlag : userAgentFeatureFlags) { - value += userAgentFeatureFlag.getValue(); - } + int value = 0; + + for (UserAgentFeatureFlags userAgentFeatureFlag : userAgentFeatureFlags) { + value += userAgentFeatureFlag.getValue(); + } - this.userAgent = this.userAgent + "|F" + value; + this.userAgent = !Strings.isNullOrEmpty(this.baseUserAgentWithSuffix) ? this.baseUserAgentWithSuffix : this.baseUserAgent; + this.userAgent = this.userAgent + "|F" + Integer.toHexString(value).toUpperCase(Locale.ROOT); + } finally { + writeLock.unlock(); + } } public void setSuffix(String suffix) { - if (suffix.length() > maxSuffixLength) { - suffix = suffix.substring(0, maxSuffixLength); + writeLock.lock(); + try { + if (suffix == null) { + suffix = ""; + } + + if (suffix.length() > maxSuffixLength) { + suffix = suffix.substring(0, maxSuffixLength); + } + + this.suffix = suffix; + this.userAgent = stripNonAsciiCharacters(baseUserAgent.concat(" ").concat(this.suffix)); + this.baseUserAgentWithSuffix = this.userAgent; + } finally { + writeLock.unlock(); } - - this.suffix = suffix; - this.userAgent = stripNonAsciiCharacters(baseUserAgent.concat(" ").concat(this.suffix)); } public String getUserAgent() { - return this.userAgent; + readLock.lock(); + try { + return this.userAgent; + } finally { + readLock.unlock(); + } } private static String stripNonAsciiCharacters(String input) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/UserAgentFeatureFlags.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/UserAgentFeatureFlags.java index da65133bbec9..aa8da74a02d3 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/UserAgentFeatureFlags.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/UserAgentFeatureFlags.java @@ -16,7 +16,10 @@ */ public enum UserAgentFeatureFlags { PerPartitionAutomaticFailover(1), - PerPartitionCircuitBreaker(1 << 1); + PerPartitionCircuitBreaker(1 << 1), + ThinClient(1 << 2), + // BinaryEncoding(1 << 3), + Http2(1 << 4); private final int value; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java index c1c0c431f8fa..d50643f0e5a9 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java @@ -325,8 +325,15 @@ public boolean isPerPartitionAutomaticFailoverApplicable(RxDocumentServiceReques return false; } - public void resetPerPartitionAutomaticFailoverEnabled(boolean isPerPartitionAutomaticFailoverEnabled) { + public synchronized void resetPerPartitionAutomaticFailoverEnabled(boolean isPerPartitionAutomaticFailoverEnabled) { this.isPerPartitionAutomaticFailoverEnabled.set(isPerPartitionAutomaticFailoverEnabled); + this.clear(); + } + + private void clear() { + this.partitionKeyRangeToFailoverInfo.clear(); + this.partitionKeyRangeToEndToEndTimeoutErrorTracker.clear(); + this.warnLevelLoggedCounts.set(0); } private static void logAsWarnOrDebug(String message, AtomicInteger warnLogThreshold) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionCircuitBreaker/GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionCircuitBreaker/GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker.java index 41f711fb4857..89bb15d10ec4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionCircuitBreaker/GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionCircuitBreaker/GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker.java @@ -48,6 +48,7 @@ public class GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker impleme private final AtomicReference globalAddressResolverSnapshot; private final ConcurrentHashMap regionalRoutingContextToRegion; private final AtomicBoolean isClosed = new AtomicBoolean(false); + private final AtomicBoolean isPartitionRecoveryTaskRunning = new AtomicBoolean(false); private final Scheduler partitionRecoveryScheduler = Schedulers.newSingle( "partition-availability-staleness-check", true); @@ -65,8 +66,8 @@ public GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker(GlobalEndpoin } public void init() { - if (this.consecutiveExceptionBasedCircuitBreaker.isPartitionLevelCircuitBreakerEnabled()) { - this.updateStaleLocationInfo().subscribeOn(this.partitionRecoveryScheduler).subscribe(); + if (this.consecutiveExceptionBasedCircuitBreaker.isPartitionLevelCircuitBreakerEnabled() && !this.isPartitionRecoveryTaskRunning.get()) { + this.updateStaleLocationInfo().subscribeOn(this.partitionRecoveryScheduler).doOnSubscribe(ignore -> this.isPartitionRecoveryTaskRunning.set(true)).subscribe(); } } @@ -554,14 +555,18 @@ public PartitionLevelCircuitBreakerConfig getCircuitBreakerConfig() { return this.consecutiveExceptionBasedCircuitBreaker.getPartitionLevelCircuitBreakerConfig(); } - public synchronized void resetCircuitBreakerConfig() { - PartitionLevelCircuitBreakerConfig partitionLevelCircuitBreakerConfig - = Configs.getPartitionLevelCircuitBreakerConfig(); - + public synchronized void resetCircuitBreakerConfig(PartitionLevelCircuitBreakerConfig partitionLevelCircuitBreakerConfig) { this.consecutiveExceptionBasedCircuitBreaker = new ConsecutiveExceptionBasedCircuitBreaker(partitionLevelCircuitBreakerConfig); this.locationSpecificHealthContextTransitionHandler = new LocationSpecificHealthContextTransitionHandler(this.consecutiveExceptionBasedCircuitBreaker); + + this.clear(); + } + + private void clear() { + this.partitionKeyRangeToLocationSpecificUnavailabilityInfo.clear(); + this.regionalRoutingContextToRegion.clear(); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionCircuitBreaker/PartitionLevelCircuitBreakerConfig.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionCircuitBreaker/PartitionLevelCircuitBreakerConfig.java index 672fa5ef830d..011337201301 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionCircuitBreaker/PartitionLevelCircuitBreakerConfig.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionCircuitBreaker/PartitionLevelCircuitBreakerConfig.java @@ -76,4 +76,20 @@ public static PartitionLevelCircuitBreakerConfig fromJsonString(String jsonStrin throw new RuntimeException("Unable to convert from Json String", e); } } + + public static PartitionLevelCircuitBreakerConfig fromExplicitArgs( + Boolean isPartitionLevelCircuitBreakerEnabled) { + + PartitionLevelCircuitBreakerConfig config = new PartitionLevelCircuitBreakerConfig(); + + if (isPartitionLevelCircuitBreakerEnabled != null) { + config.isPartitionLevelCircuitBreakerEnabled = isPartitionLevelCircuitBreakerEnabled; + } + + config.circuitBreakerType = DEFAULT.getCircuitBreakerType(); + config.consecutiveExceptionCountToleratedForReads = DEFAULT.getConsecutiveExceptionCountToleratedForReads(); + config.consecutiveExceptionCountToleratedForWrites = DEFAULT.getConsecutiveExceptionCountToleratedForWrites(); + + return config; + } }