Skip to content

Commit c4b8add

Browse files
authored
Fixing timer firing bug (#461)
* Trying to fix the timer firing bug that causes timer to not firing according to their schedule when used in Async threads. * Add extra protection * More fixes to the timer logic
1 parent e503a2c commit c4b8add

File tree

3 files changed

+51
-22
lines changed

3 files changed

+51
-22
lines changed

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ public void runUntilAllBlocked() throws Throwable {
240240
toExecuteInWorkflowThread.clear();
241241
progress = false;
242242
Iterator<WorkflowThread> ci = threads.iterator();
243-
nextWakeUpTime = 0;
243+
nextWakeUpTime = Long.MAX_VALUE;
244244
while (ci.hasNext()) {
245245
WorkflowThread c = ci.next();
246246
progress = c.runUntilBlocked() || progress;
@@ -256,7 +256,7 @@ public void runUntilAllBlocked() throws Throwable {
256256
}
257257
} else {
258258
long t = c.getBlockedUntil();
259-
if (t > nextWakeUpTime) {
259+
if (t > currentTimeMillis() && t < nextWakeUpTime) {
260260
nextWakeUpTime = t;
261261
}
262262
}
@@ -269,7 +269,8 @@ public void runUntilAllBlocked() throws Throwable {
269269
threads.addLast(c);
270270
}
271271
} while (progress && !threads.isEmpty());
272-
if (nextWakeUpTime < currentTimeMillis()) {
272+
273+
if (nextWakeUpTime < currentTimeMillis() || nextWakeUpTime == Long.MAX_VALUE) {
273274
nextWakeUpTime = 0;
274275
}
275276
} finally {

src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import java.util.List;
6464
import java.util.Map;
6565
import java.util.Optional;
66+
import java.util.UUID;
6667
import java.util.concurrent.CancellationException;
6768
import java.util.concurrent.CompletableFuture;
6869
import java.util.concurrent.ExecutionException;
@@ -543,16 +544,17 @@ public static class TestTimerCancellationWorkflow implements TestWorkflow {
543544

544545
@Override
545546
public String workflow1(String input) {
546-
Promise<Void> s = Async.procedure(() -> Workflow.sleep(Duration.ofDays(1)));
547+
long startTime = Workflow.currentTimeMillis();
548+
Promise<Void> s = Async.procedure(() -> Workflow.sleep(Duration.ofHours(3)));
547549
TestActivity activity = Workflow.newActivityStub(TestActivity.class);
548-
try {
549-
activity.activity1("input");
550-
Workflow.sleep(Duration.ofDays(3));
551-
} catch (CancellationException e) {
552-
return "cancelled";
553-
} finally {
554-
s.get();
550+
activity.activity1("input");
551+
Workflow.sleep(Duration.ofHours(1));
552+
s.get();
553+
long endTime = Workflow.currentTimeMillis();
554+
if (Duration.ofMillis(endTime - startTime).compareTo(Duration.ofHours(3)) < 0) {
555+
fail("workflow sleep interrupted unexpectedly");
555556
}
557+
556558
return "result";
557559
}
558560
}
@@ -564,22 +566,20 @@ public void testTimerCancellation() throws TException {
564566
worker.registerActivitiesImplementations(new ActivityImpl());
565567
testEnvironment.start();
566568
WorkflowClient client = testEnvironment.newWorkflowClient();
567-
TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class);
568-
WorkflowExecution execution = WorkflowClient.start(workflow::workflow1, "input1");
569-
WorkflowStub untyped = client.newUntypedWorkflowStub(execution, Optional.empty());
570-
testEnvironment.sleep(Duration.ofHours(1));
571-
untyped.cancel();
572-
try {
573-
untyped.getResult(String.class);
574-
fail("unreacheable");
575-
} catch (CancellationException e) {
576-
}
569+
570+
String workflowID = UUID.randomUUID().toString();
571+
TestWorkflow workflow =
572+
client.newWorkflowStub(
573+
TestWorkflow.class, new WorkflowOptions.Builder().setWorkflowId(workflowID).build());
574+
String result = workflow.workflow1("input1");
575+
assertEquals("result", result);
576+
577577
History history =
578578
testEnvironment
579579
.getWorkflowService()
580580
.GetWorkflowExecutionHistory(
581581
new GetWorkflowExecutionHistoryRequest()
582-
.setExecution(execution)
582+
.setExecution(new WorkflowExecution().setWorkflowId(workflowID))
583583
.setDomain(client.getDomain()))
584584
.getHistory();
585585
List<HistoryEvent> historyEvents = history.getEvents();

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,34 @@ public void testSync() {
432432
"executeActivity TestActivities::activity2");
433433
}
434434

435+
public interface TestMultipleTimers {
436+
@WorkflowMethod
437+
long execute();
438+
}
439+
440+
public static class TestMultipleTimersImpl implements TestMultipleTimers {
441+
442+
@Override
443+
public long execute() {
444+
Promise<Void> t1 = Async.procedure(() -> Workflow.sleep(Duration.ofSeconds(1)));
445+
Promise<Void> t2 = Async.procedure(() -> Workflow.sleep(Duration.ofSeconds(2)));
446+
long start = Workflow.currentTimeMillis();
447+
Promise.anyOf(t1, t2).get();
448+
long elapsed = Workflow.currentTimeMillis() - start;
449+
return elapsed;
450+
}
451+
}
452+
453+
@Test
454+
public void testMultipleTimers() {
455+
startWorkerFor(TestMultipleTimersImpl.class);
456+
TestMultipleTimers workflowStub =
457+
workflowClient.newWorkflowStub(
458+
TestMultipleTimers.class, newWorkflowOptionsBuilder(taskList).build());
459+
long result = workflowStub.execute();
460+
assertTrue("should be around 1 second: " + result, result < 2000);
461+
}
462+
435463
public static class TestActivityRetryWithMaxAttempts implements TestWorkflow1 {
436464

437465
@Override

0 commit comments

Comments
 (0)