From 1d162ca9a31098c3cb72cae830a0e9de48f979b9 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 20 Nov 2025 11:57:13 -0800 Subject: [PATCH 1/2] add test --- test/integration/test_consumer_integration.py | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/test/integration/test_consumer_integration.py b/test/integration/test_consumer_integration.py index 71cf2642d..6060dc830 100644 --- a/test/integration/test_consumer_integration.py +++ b/test/integration/test_consumer_integration.py @@ -302,3 +302,25 @@ def test_kafka_consumer_offsets_for_times_errors(kafka_consumer_factory, topic): with pytest.raises(KafkaTimeoutError): consumer.offsets_for_times({bad_tp: 0}) + + +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") +def test_kafka_consumer_position_after_seek_to_end(kafka_consumer_factory, topic, send_messages): + send_messages(range(0, 10), partition=0) + + # Start a consumer with manual partition assignment. + consumer = kafka_consumer_factory( + topics=(), + group_id=None, + enable_auto_commit=False, + ) + tp = TopicPartition(topic, 0) + consumer.assign([tp]) + + # Seek to the end of the partition, and call position() to synchronize the + # partition's offset without calling poll(). + consumer.seek_to_end(tp) + position = consumer.position(tp, timeout_ms=1000) + + # Verify we got the expected position + assert position == 10, f"Expected position 10, got {position}" From 8ab551dc7880e8706c9c411d4ab35fb872a69d14 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 20 Nov 2025 11:59:29 -0800 Subject: [PATCH 2/2] Add internal poll to consumer.position() --- kafka/consumer/group.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 36df78326..49fa3e261 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -779,13 +779,12 @@ def position(self, partition, timeout_ms=None): timer = Timer(timeout_ms) position = self._subscription.assignment[partition].position - while position is None: + while position is None and not timer.expired: # batch update fetch positions for any partitions without a valid position - if self._update_fetch_positions(timeout_ms=timer.timeout_ms): - position = self._subscription.assignment[partition].position - if timer.expired: - return None - else: + self._update_fetch_positions(timeout_ms=timer.timeout_ms) + self._client.poll(timeout_ms=timer.timeout_ms) + position = self._subscription.assignment[partition].position + if position is not None: return position.offset def highwater(self, partition):