diff --git a/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java b/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java index d0034fb..df506db 100644 --- a/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java +++ b/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java @@ -12,7 +12,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; -import org.apache.kafka.connect.sink.SinkTaskContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,6 +22,7 @@ import java.util.*; public class BackupSinkTask extends SinkTask { + private static final Long DEFAULT_OFFSET = Long.valueOf(-1L); private static final Logger log = LoggerFactory.getLogger(BackupSinkTask.class); private Path targetDir; private Map partitionWriters = new HashMap<>(); @@ -33,6 +33,7 @@ public class BackupSinkTask extends SinkTask { private Map currentOffsets = new HashMap<>(); private EndOffsetReader endOffsetReader; private java.util.function.Consumer exitFunction; + private Set topicPartitionsWithDefaultOffset = new HashSet(); @Override public String version() { @@ -90,9 +91,22 @@ private void terminateIfCompleted() { boolean terminate = true; for (Map.Entry partitionOffset : endOffsets.entrySet()) { Long endOffset = partitionOffset.getValue(); - Long currentOffset = currentOffsets.getOrDefault(partitionOffset.getKey(), -1L); + Long currentOffset = currentOffsets.getOrDefault(partitionOffset.getKey(), DEFAULT_OFFSET); - if (currentOffset < endOffset - 1) { + final TopicPartition topicPartition = partitionOffset.getKey(); + log.debug("check to terminate: partition {} current offset {} end offset {}", topicPartition, currentOffset, endOffset); + boolean topicPartitionIsEmpty = false; + + if (DEFAULT_OFFSET.equals(currentOffset)) { + if (topicPartitionsWithDefaultOffset.contains(topicPartition)) { + topicPartitionIsEmpty = true; + log.info("topic partition is empty {}", topicPartition); + } else { + topicPartitionsWithDefaultOffset.add(topicPartition); + } + } + + if (!topicPartitionIsEmpty && currentOffset < endOffset - 1) { return; } }