Skip to content

Commit ddc7ac2

Browse files
committed
Fix NPE due to negative offset
1 parent b9c1cfe commit ddc7ac2

File tree

1 file changed

+8
-0
lines changed

1 file changed

+8
-0
lines changed

src/main/java/de/azapps/kafkabackup/common/offset/OffsetSink.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package de.azapps.kafkabackup.common.offset;
22

33
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import de.azapps.kafkabackup.sink.BackupSinkTask;
45
import org.apache.kafka.clients.admin.AdminClient;
56
import org.apache.kafka.clients.admin.ConsumerGroupListing;
67
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
78
import org.apache.kafka.common.TopicPartition;
89
import org.apache.kafka.connect.errors.RetriableException;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
912

1013
import java.io.IOException;
1114
import java.nio.file.Files;
@@ -19,6 +22,7 @@
1922
import java.util.stream.Collectors;
2023

2124
public class OffsetSink {
25+
private static final Logger log = LoggerFactory.getLogger(OffsetSink.class);
2226
private final Path targetDir;
2327
private final Map<TopicPartition, OffsetStoreFile> topicOffsets = new HashMap<>();
2428
private List<String> consumerGroups = new ArrayList<>();
@@ -62,6 +66,10 @@ private void syncOffsetsForGroup(String consumerGroup) throws IOException {
6266
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicOffsetsAndMetadata.entrySet()) {
6367
TopicPartition tp = entry.getKey();
6468
OffsetAndMetadata offsetAndMetadata = entry.getValue();
69+
if (offsetAndMetadata == null) {
70+
log.warn("OffsetAndMetadata not available, negative offset? for tp {}", tp);
71+
return;
72+
}
6573

6674
if (validTopic(tp.topic())) {
6775
if (!this.topicOffsets.containsKey(tp)) {

0 commit comments

Comments
 (0)