Skip to content

Commit 2bef04f

Browse files
authored
Add a test for completion of async activity with error (#307)
1 parent 351a8b9 commit 2bef04f

File tree

2 files changed

+115
-0
lines changed

2 files changed

+115
-0
lines changed

temporal-sdk/src/main/java/io/temporal/internal/external/ManualActivityCompletionClientImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.temporal.common.converter.DataConverter;
3535
import io.temporal.failure.CanceledFailure;
3636
import io.temporal.failure.FailureConverter;
37+
import io.temporal.failure.TemporalFailure;
3738
import io.temporal.internal.common.GrpcRetryer;
3839
import io.temporal.internal.common.OptionsUtils;
3940
import io.temporal.serviceclient.WorkflowServiceStubs;
@@ -149,6 +150,9 @@ public void fail(Throwable exception) {
149150
if (exception == null) {
150151
throw new IllegalArgumentException("null exception");
151152
}
153+
if (exception instanceof TemporalFailure) {
154+
((TemporalFailure) exception).setDataConverter(dataConverter);
155+
}
152156
// When converting failures reason is class name, details are serialized exception.
153157
if (taskToken != null) {
154158
RespondActivityTaskFailedRequest request =
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.workflow;
21+
22+
import io.temporal.activity.*;
23+
import io.temporal.client.ActivityCompletionClient;
24+
import io.temporal.client.WorkflowOptions;
25+
import io.temporal.common.RetryOptions;
26+
import io.temporal.failure.ApplicationFailure;
27+
import io.temporal.testing.TestWorkflowRule;
28+
import java.time.Duration;
29+
import java.util.concurrent.ForkJoinPool;
30+
import org.junit.Assert;
31+
import org.junit.Rule;
32+
import org.junit.Test;
33+
34+
public class AsyncActivityCompleteWithErrorTest {
35+
36+
@Rule
37+
public TestWorkflowRule testWorkflowRule =
38+
TestWorkflowRule.newBuilder()
39+
.setWorkflowTypes(TestWorkflowImpl.class)
40+
.setActivityImplementations(new AsyncActivityWithManualCompletion())
41+
.setUseExternalService(Boolean.parseBoolean(System.getenv("USE_DOCKER_SERVICE")))
42+
.setTarget(System.getenv("TEMPORAL_SERVICE_ADDRESS"))
43+
.build();
44+
45+
@WorkflowInterface
46+
public interface TestWorkflow {
47+
48+
@WorkflowMethod
49+
String execute(String taskQueue);
50+
}
51+
52+
public static class TestWorkflowImpl implements TestWorkflow {
53+
54+
@Override
55+
public String execute(String taskQueue) {
56+
TestActivity activity =
57+
Workflow.newActivityStub(
58+
TestActivity.class,
59+
ActivityOptions.newBuilder()
60+
.setScheduleToStartTimeout(Duration.ofSeconds(1))
61+
.setScheduleToCloseTimeout(Duration.ofSeconds(1))
62+
.setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())
63+
.build());
64+
Promise<Integer> promise = Async.function(activity::execute);
65+
RuntimeException failure = promise.getFailure();
66+
Assert.assertNotNull(failure);
67+
Assert.assertTrue(failure.getCause() instanceof ApplicationFailure);
68+
ApplicationFailure cause = (ApplicationFailure) failure.getCause();
69+
Assert.assertEquals("simulated failure", cause.getOriginalMessage());
70+
Assert.assertEquals("some details", cause.getDetails().get(String.class));
71+
Assert.assertEquals("test", cause.getType());
72+
return "success";
73+
}
74+
}
75+
76+
@ActivityInterface
77+
public interface TestActivity {
78+
79+
@ActivityMethod
80+
int execute();
81+
}
82+
83+
public static class AsyncActivityWithManualCompletion implements TestActivity {
84+
@Override
85+
public int execute() {
86+
ActivityExecutionContext context = Activity.getExecutionContext();
87+
ActivityCompletionClient completionClient = context.useLocalManualCompletion();
88+
ForkJoinPool.commonPool().execute(() -> asyncActivityFn(context, completionClient));
89+
return 0;
90+
}
91+
92+
private void asyncActivityFn(
93+
ActivityExecutionContext context, ActivityCompletionClient completionClient) {
94+
completionClient.completeExceptionally(
95+
context.getTaskToken(),
96+
ApplicationFailure.newFailure("simulated failure", "test", "some details"));
97+
}
98+
}
99+
100+
@Test
101+
public void verifyActivityCompletionClientCompleteExceptionally() {
102+
String taskQueue = testWorkflowRule.getTaskQueue();
103+
TestWorkflow workflow =
104+
testWorkflowRule
105+
.getWorkflowClient()
106+
.newWorkflowStub(
107+
TestWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(taskQueue).build());
108+
String result = workflow.execute(taskQueue);
109+
Assert.assertEquals("success", result);
110+
}
111+
}

0 commit comments

Comments
 (0)