diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java index b33c2a331e..009ce064b8 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java @@ -4,19 +4,26 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.uber.m3.tally.Scope; +import io.temporal.api.workflowservice.v1.DescribeNamespaceRequest; +import io.temporal.api.workflowservice.v1.DescribeNamespaceResponse; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowClientOptions; import io.temporal.common.converter.DataConverter; import io.temporal.internal.client.WorkflowClientInternal; import io.temporal.internal.sync.WorkflowThreadExecutor; import io.temporal.internal.task.VirtualThreadDelegate; -import io.temporal.internal.worker.*; +import io.temporal.internal.worker.ShutdownManager; import io.temporal.internal.worker.WorkflowExecutorCache; +import io.temporal.internal.worker.WorkflowRunLockManager; import io.temporal.serviceclient.MetricsTag; import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.concurrent.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -196,9 +203,14 @@ public synchronized void start() { // Workers check and require that Temporal Server is available during start to fail-fast in case // of configuration issues. - // TODO(https://github.com/temporalio/sdk-java/issues/2060) consider using describeNamespace as - // a connection check. - workflowClient.getWorkflowServiceStubs().getServerCapabilities(); + DescribeNamespaceResponse response = + workflowClient + .getWorkflowServiceStubs() + .blockingStub() + .describeNamespace( + DescribeNamespaceRequest.newBuilder() + .setNamespace(workflowClient.getOptions().getNamespace()) + .build()); for (Worker worker : workers.values()) { worker.start(); diff --git a/temporal-sdk/src/test/java/io/temporal/workerFactory/WorkerFactoryTests.java b/temporal-sdk/src/test/java/io/temporal/workerFactory/WorkerFactoryTests.java index e228508376..856ba8dcce 100644 --- a/temporal-sdk/src/test/java/io/temporal/workerFactory/WorkerFactoryTests.java +++ b/temporal-sdk/src/test/java/io/temporal/workerFactory/WorkerFactoryTests.java @@ -1,9 +1,14 @@ package io.temporal.workerFactory; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.serviceclient.WorkflowServiceStubsOptions; import io.temporal.worker.WorkerFactory; @@ -128,4 +133,24 @@ public void factoryCanBeShutdownMoreThanOnce() { factory.shutdown(); factory.awaitTermination(1, TimeUnit.MILLISECONDS); } + + @Test + public void startFailsOnNonexistentNamespace() { + WorkflowServiceStubs serviceLocal = + WorkflowServiceStubs.newServiceStubs( + WorkflowServiceStubsOptions.newBuilder().setTarget(serviceAddress).build()); + WorkflowClient clientLocal = + WorkflowClient.newInstance( + serviceLocal, WorkflowClientOptions.newBuilder().setNamespace("i_dont_exist").build()); + WorkerFactory factoryLocal = WorkerFactory.newInstance(clientLocal); + factoryLocal.newWorker("task-queue"); + + StatusRuntimeException ex = assertThrows(StatusRuntimeException.class, factoryLocal::start); + assertEquals(Status.Code.NOT_FOUND, ex.getStatus().getCode()); + + factoryLocal.shutdownNow(); + factoryLocal.awaitTermination(5, TimeUnit.SECONDS); + serviceLocal.shutdownNow(); + serviceLocal.awaitTermination(5, TimeUnit.SECONDS); + } } diff --git a/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/WorkerVersioningTest.java b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/WorkerVersioningTest.java index a21380b9c1..5c0481b4ab 100644 --- a/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/WorkerVersioningTest.java +++ b/temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/WorkerVersioningTest.java @@ -2,6 +2,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.enums.v1.EventType; import io.temporal.api.enums.v1.VersioningBehavior; @@ -11,7 +13,14 @@ import io.temporal.common.WorkflowExecutionHistory; import io.temporal.spring.boot.autoconfigure.workerversioning.TestWorkflow; import io.temporal.spring.boot.autoconfigure.workerversioning.TestWorkflow2; -import org.junit.jupiter.api.*; +import io.temporal.worker.WorkerFactory; +import java.time.Duration; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.Timeout; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.context.ConfigurableApplicationContext; @@ -20,7 +29,7 @@ import org.springframework.test.context.ActiveProfiles; @SpringBootTest(classes = WorkerVersioningTest.Configuration.class) -@ActiveProfiles(profiles = "worker-versioning") +@ActiveProfiles(profiles = {"worker-versioning", "disable-start-workers"}) @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class WorkerVersioningTest { @Autowired ConfigurableApplicationContext applicationContext; @@ -43,15 +52,13 @@ void setUp() { @Test @Timeout(value = 10) public void testAutoDiscovery() { - workflowClient - .getWorkflowServiceStubs() - .blockingStub() - .setWorkerDeploymentCurrentVersion( - SetWorkerDeploymentCurrentVersionRequest.newBuilder() - .setNamespace(workflowClient.getOptions().getNamespace()) - .setDeploymentName("dname") - .setVersion("dname.bid") - .build()); + // Manually start the worker because we disable automatic worker start, due to + // automatic worker start running prior to the docker check, which causes namespace + // errors when running in-mem unit tests + WorkerFactory workerFactory = applicationContext.getBean(WorkerFactory.class); + workerFactory.start(); + + setCurrentVersionWithRetry(); TestWorkflow testWorkflow = workflowClient.newWorkflowStub( @@ -84,6 +91,36 @@ public void testAutoDiscovery() { == VersioningBehavior.VERSIONING_BEHAVIOR_AUTO_UPGRADE)); } + @SuppressWarnings("deprecation") + private void setCurrentVersionWithRetry() { + long deadline = System.currentTimeMillis() + Duration.ofSeconds(10).toMillis(); + while (true) { + try { + workflowClient + .getWorkflowServiceStubs() + .blockingStub() + .setWorkerDeploymentCurrentVersion( + SetWorkerDeploymentCurrentVersionRequest.newBuilder() + .setNamespace(workflowClient.getOptions().getNamespace()) + .setDeploymentName("dname") + .setVersion("dname.bid") + .build()); + return; + } catch (StatusRuntimeException e) { + if (e.getStatus().getCode() != Status.Code.NOT_FOUND + || System.currentTimeMillis() > deadline) { + throw e; + } + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ie); + } + } + } + } + @ComponentScan( excludeFilters = @ComponentScan.Filter(