|
37 | 37 | import org.apache.kafka.common.config.ConfigResource;
|
38 | 38 |
|
39 | 39 | import java.util.Collections;
|
| 40 | +import java.util.HashMap; |
40 | 41 | import java.util.List;
|
41 | 42 | import java.util.Map;
|
42 | 43 | import java.util.Optional;
|
|
47 | 48 | import static com.google.common.collect.ImmutableList.toImmutableList;
|
48 | 49 | import static com.google.common.collect.ImmutableMap.toImmutableMap;
|
49 | 50 | import static com.google.common.collect.ImmutableSet.toImmutableSet;
|
50 |
| -import static com.google.common.collect.Iterables.getOnlyElement; |
51 | 51 | import static io.trino.plugin.kafka.KafkaErrorCode.KAFKA_SPLIT_ERROR;
|
52 | 52 | import static io.trino.plugin.kafka.KafkaInternalFieldManager.InternalFieldId.OFFSET_TIMESTAMP_FIELD;
|
53 | 53 | import static io.trino.plugin.kafka.KafkaInternalFieldManager.InternalFieldId.PARTITION_ID_FIELD;
|
@@ -123,13 +123,23 @@ public KafkaFilteringResult getKafkaFilterResult(
|
123 | 123 | try (KafkaConsumer<byte[], byte[]> kafkaConsumer = consumerFactory.create(session)) {
|
124 | 124 | // filter negative value to avoid java.lang.IllegalArgumentException when using KafkaConsumer offsetsForTimes
|
125 | 125 | if (offsetTimestampRanged.get().begin() > INVALID_KAFKA_RANGE_INDEX) {
|
| 126 | + Map<TopicPartition, Long> partitionBeginTimestamps = new HashMap<>(); |
| 127 | + partitionBeginOffsets.forEach((partition, partitionIndex) -> { |
| 128 | + partitionBeginTimestamps.put(partition, offsetTimestampRanged.get().begin()); |
| 129 | + }); |
| 130 | + Map<TopicPartition, Long> beginOffsets = findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partitionBeginTimestamps); |
126 | 131 | partitionBeginOffsets = overridePartitionBeginOffsets(partitionBeginOffsets,
|
127 |
| - partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, offsetTimestampRanged.get().begin())); |
| 132 | + partition -> Optional.ofNullable(beginOffsets.get(partition))); |
128 | 133 | }
|
129 | 134 | if (isTimestampUpperBoundPushdownEnabled(session, kafkaTableHandle.topicName())) {
|
130 | 135 | if (offsetTimestampRanged.get().end() > INVALID_KAFKA_RANGE_INDEX) {
|
| 136 | + Map<TopicPartition, Long> partitionEndTimestamps = new HashMap<>(); |
| 137 | + partitionEndOffsets.forEach((partition, partitionIndex) -> { |
| 138 | + partitionEndTimestamps.put(partition, offsetTimestampRanged.get().end()); |
| 139 | + }); |
| 140 | + Map<TopicPartition, Long> endOffsets = findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partitionEndTimestamps); |
131 | 141 | partitionEndOffsets = overridePartitionEndOffsets(partitionEndOffsets,
|
132 |
| - partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, offsetTimestampRanged.get().end())); |
| 142 | + partition -> Optional.ofNullable(endOffsets.get(partition))); |
133 | 143 | }
|
134 | 144 | }
|
135 | 145 | }
|
@@ -172,11 +182,17 @@ private boolean isTimestampUpperBoundPushdownEnabled(ConnectorSession session, S
|
172 | 182 | return KafkaSessionProperties.isTimestampUpperBoundPushdownEnabled(session);
|
173 | 183 | }
|
174 | 184 |
|
175 |
| - private static Optional<Long> findOffsetsForTimestampGreaterOrEqual(KafkaConsumer<byte[], byte[]> kafkaConsumer, TopicPartition topicPartition, long timestamp) |
| 185 | + private static Map<TopicPartition, Long> findOffsetsForTimestampGreaterOrEqual(KafkaConsumer<byte[], byte[]> kafkaConsumer, Map<TopicPartition, Long> timestamps) |
176 | 186 | {
|
177 |
| - final long transferTimestamp = floorDiv(timestamp, MICROSECONDS_PER_MILLISECOND); |
178 |
| - Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsets = kafkaConsumer.offsetsForTimes(ImmutableMap.of(topicPartition, transferTimestamp)); |
179 |
| - return Optional.ofNullable(getOnlyElement(topicPartitionOffsets.values(), null)).map(OffsetAndTimestamp::offset); |
| 187 | + timestamps.replaceAll((k, v) -> floorDiv(v, MICROSECONDS_PER_MILLISECOND)); |
| 188 | + Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestamps = kafkaConsumer.offsetsForTimes(timestamps); |
| 189 | + Map<TopicPartition, Long> topicPartitionOffsets = new HashMap<>(); |
| 190 | + topicPartitionOffsetAndTimestamps.forEach((topicPartition, offsetAndTimestamp) -> { |
| 191 | + if (offsetAndTimestamp != null) { |
| 192 | + topicPartitionOffsets.put(topicPartition, offsetAndTimestamp.offset()); |
| 193 | + } |
| 194 | + }); |
| 195 | + return topicPartitionOffsets; |
180 | 196 | }
|
181 | 197 |
|
182 | 198 | private static Map<TopicPartition, Long> overridePartitionBeginOffsets(Map<TopicPartition, Long> partitionBeginOffsets,
|
|
0 commit comments