diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java index cc0e1d4e7d5..386efb708c6 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java @@ -172,10 +172,11 @@ public DataWriter load(final GenericRecord key) new CloseOnFlushWriterWrapper(new Supplier>() { @Override public DataWriter get() { + Future> future = null; try { log.info(String.format("Adding one more writer to loading cache of existing writer " + "with size = %d", partitionWriters.size())); - Future> future = createWriterPool.submit(() -> createPartitionWriter(key)); + future = createWriterPool.submit(() -> createPartitionWriter(key)); state.setProp(CURRENT_PARTITIONED_WRITERS_COUNTER, partitionWriters.size() + 1); return future.get(writeTimeoutInterval, TimeUnit.SECONDS); } catch (ExecutionException | InterruptedException e) { @@ -183,6 +184,11 @@ public DataWriter get() { } catch (TimeoutException e) { throw new RuntimeException(String.format("Failed to create writer due to timeout. The operation timed out after %s seconds.", writeTimeoutInterval), e); } + finally { + if (future != null && !future.isDone()) { + future.cancel(true); + } + } } }, state), state, key); }