Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,27 +127,27 @@ private <P> Completion pushDownStream(List<DecatonProcessor<P>> downstreams, P t
log.error("Exception from tracing", e);
}
nextProcessor.process(nextContext, taskData);
} finally {
try {
traceHandle.processingReturn();
} catch (Exception e) {
log.error("Exception from tracing", e);
}
} finally {
completion = nextContext.deferredCompletion.get();
if (completion == null) {
// If process didn't requested for deferred completion, we understand it as process
// completed synchronously.
completion = CompletionImpl.completedCompletion();
}
}

completion.asFuture().whenComplete((unused, throwable) -> {
try {
traceHandle.processingCompletion();
} catch (Exception e) {
log.error("Exception from tracing", e);
}
});
completion.asFuture().whenComplete((unused, throwable) -> {
try {
traceHandle.processingCompletion();
} catch (Exception e) {
log.error("Exception from tracing", e);
}
});
}
return completion;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -475,7 +476,7 @@ public void testTrace_Async() throws InterruptedException {
assertFalse(comp.isComplete());
// The trace for this processor should no longer be "current"
// (because sync execution has finished)
// but it is should still be "open"
// but it should still be "open"
assertNull(TestTracingProvider.getCurrentTraceId());
assertThat(TestTracingProvider.getOpenTraces(), hasItem("testTrace_Async-Async"));

Expand All @@ -488,6 +489,127 @@ public void testTrace_Async() throws InterruptedException {
// Trace ID is not propagated unless the implementation does so manually
assertNull(traceDuringAsyncProcessing.get());
} finally {
executor.shutdownNow();
handle.processingCompletion();
}
TestTracingProvider.assertAllTracesWereClosed();
}

@Test
@Timeout(5)
public void testTrace_SyncFailure() {
RecordTraceHandle handle = new TestTraceHandle("testTrace");
final AtomicReference<String> traceDuringProcessing = new AtomicReference<>();
try {
ProcessingContextImpl<HelloTask> context = context(handle,
new NamedProcessor("SyncFail", (ctx, task) -> {
traceDuringProcessing
.set(TestTracingProvider.getCurrentTraceId());
throw new RuntimeException("task failure");
}));

RuntimeException e = assertThrows(RuntimeException.class, () -> context.push(TASK));
assertEquals("task failure", e.getMessage());
assertNull(TestTracingProvider.getCurrentTraceId());
assertEquals("testTrace-SyncFail", traceDuringProcessing.get());
// Trace is closed normally
assertThat(TestTracingProvider.getOpenTraces(), not(hasItem("testTrace-SyncFail")));
} finally {
handle.processingCompletion();
}
TestTracingProvider.assertAllTracesWereClosed();
}

@Test
@Timeout(5)
public void testTrace_AsyncFailure() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
ExecutorService executor = Executors.newSingleThreadExecutor();
RecordTraceHandle handle = new TestTraceHandle("testTrace");
final AtomicReference<String> traceDuringSyncProcessing = new AtomicReference<>();
final AtomicReference<String> traceDuringAsyncProcessing = new AtomicReference<>();
try {
ProcessingContextImpl<HelloTask> context =
context(handle, new NamedProcessor("AsyncFail", (ctx, task) -> {
DeferredCompletion comp = ctx.deferCompletion();
traceDuringSyncProcessing.set(TestTracingProvider.getCurrentTraceId());
executor.execute(() -> {
traceDuringAsyncProcessing.set(TestTracingProvider.getCurrentTraceId());
safeAwait(latch);
comp.complete();
});
// exception outside async call
throw new RuntimeException("task failure");
}));

RuntimeException e = assertThrows(RuntimeException.class, () -> context.push(TASK));
assertEquals("task failure", e.getMessage());
assertNull(TestTracingProvider.getCurrentTraceId());

// Trace is still be "open"
assertThat(TestTracingProvider.getOpenTraces(), hasItem("testTrace-AsyncFail"));
assertEquals("testTrace-AsyncFail", traceDuringSyncProcessing.get());
assertNull(traceDuringAsyncProcessing.get());

latch.countDown();
terminateExecutor(executor);
// Trace is closed normally
assertThat(TestTracingProvider.getOpenTraces(), not(hasItem("testTrace-AsyncFail")));
// Trace ID is not propagated unless the implementation does so manually
assertNull(traceDuringAsyncProcessing.get());
} finally {
executor.shutdownNow();
handle.processingCompletion();
}
TestTracingProvider.assertAllTracesWereClosed();
}

@Test
@Timeout(5)
public void testTrace_AsyncThreadFailure() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
ExecutorService executor = Executors.newSingleThreadExecutor();
RecordTraceHandle handle = new TestTraceHandle("testTrace_AsyncThreadFailure");
final AtomicReference<String> traceDuringSyncProcessing = new AtomicReference<>();
final AtomicReference<String> traceDuringAsyncProcessing = new AtomicReference<>();
final AtomicReference<CompletableFuture<Void>> asyncStageRef = new AtomicReference<>();
try {
ProcessingContextImpl<HelloTask> context =
context(handle, new NamedProcessor("AsyncThreadFail", (ctx, task) -> {
DeferredCompletion comp = ctx.deferCompletion();
traceDuringSyncProcessing.set(TestTracingProvider.getCurrentTraceId());

CompletableFuture<Void> asyncStage = CompletableFuture.runAsync(() -> {
traceDuringAsyncProcessing.set(TestTracingProvider.getCurrentTraceId());
safeAwait(latch);
// inside async call
throw new RuntimeException("async failure");
}, executor);
asyncStageRef.set(asyncStage);
comp.completeWith(asyncStage);
}));

Completion comp = context.push(TASK);
assertFalse(comp.isComplete());
assertNull(TestTracingProvider.getCurrentTraceId());

// Trace is still be "open"
assertThat(TestTracingProvider.getOpenTraces(),
hasItem("testTrace_AsyncThreadFailure-AsyncThreadFail"));
assertEquals("testTrace_AsyncThreadFailure-AsyncThreadFail", traceDuringSyncProcessing.get());
assertNull(traceDuringAsyncProcessing.get());

latch.countDown();
assertThrows(CompletionException.class, () -> asyncStageRef.get().join());
comp.asFuture().toCompletableFuture().join();
assertTrue(comp.isComplete());
// Trace is closed normally
assertThat(TestTracingProvider.getOpenTraces(),
not(hasItem("testTrace_AsyncThreadFailure-AsyncThreadFail")));
// Trace ID is not propagated unless the implementation does so manually
assertNull(traceDuringAsyncProcessing.get());
} finally {
executor.shutdownNow();
handle.processingCompletion();
}
TestTracingProvider.assertAllTracesWereClosed();
Expand Down
Loading