Skip to content

Commit 4197302

Browse files
authored
Add internal poll to consumer.position() (#2696)
1 parent 8d38aa7 commit 4197302

File tree

2 files changed

+27
-6
lines changed

2 files changed

+27
-6
lines changed

kafka/consumer/group.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -779,13 +779,12 @@ def position(self, partition, timeout_ms=None):
779779

780780
timer = Timer(timeout_ms)
781781
position = self._subscription.assignment[partition].position
782-
while position is None:
782+
while position is None and not timer.expired:
783783
# batch update fetch positions for any partitions without a valid position
784-
if self._update_fetch_positions(timeout_ms=timer.timeout_ms):
785-
position = self._subscription.assignment[partition].position
786-
if timer.expired:
787-
return None
788-
else:
784+
self._update_fetch_positions(timeout_ms=timer.timeout_ms)
785+
self._client.poll(timeout_ms=timer.timeout_ms)
786+
position = self._subscription.assignment[partition].position
787+
if position is not None:
789788
return position.offset
790789

791790
def highwater(self, partition):

test/integration/test_consumer_integration.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,3 +302,25 @@ def test_kafka_consumer_offsets_for_times_errors(kafka_consumer_factory, topic):
302302

303303
with pytest.raises(KafkaTimeoutError):
304304
consumer.offsets_for_times({bad_tp: 0})
305+
306+
307+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
308+
def test_kafka_consumer_position_after_seek_to_end(kafka_consumer_factory, topic, send_messages):
309+
send_messages(range(0, 10), partition=0)
310+
311+
# Start a consumer with manual partition assignment.
312+
consumer = kafka_consumer_factory(
313+
topics=(),
314+
group_id=None,
315+
enable_auto_commit=False,
316+
)
317+
tp = TopicPartition(topic, 0)
318+
consumer.assign([tp])
319+
320+
# Seek to the end of the partition, and call position() to synchronize the
321+
# partition's offset without calling poll().
322+
consumer.seek_to_end(tp)
323+
position = consumer.position(tp, timeout_ms=1000)
324+
325+
# Verify we got the expected position
326+
assert position == 10, f"Expected position 10, got {position}"

0 commit comments

Comments
 (0)