From cb5ed1d7968c629c15af7428f43a5dede50a9cbe Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 20 Nov 2025 10:32:04 -0800 Subject: [PATCH 1/2] Only use coordinator time_to_next_poll if group_id is set --- kafka/consumer/group.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index c7ad0f59b..36df78326 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -747,7 +747,9 @@ def _poll_once(self, timer, max_records, update_offsets=True): # We do not want to be stuck blocking in poll if we are missing some positions # since the offset lookup may be backing off after a failure - poll_timeout_ms = min(timer.timeout_ms, self._coordinator.time_to_next_poll() * 1000) + poll_timeout_ms = timer.timeout_ms + if self.config['group_id'] is not None: + poll_timeout_ms = min(poll_timeout_ms, self._coordinator.time_to_next_poll() * 1000) if not has_all_fetch_positions: log.debug('poll: do not have all fetch positions...') poll_timeout_ms = min(poll_timeout_ms, self.config['retry_backoff_ms']) From 061659b01121ad61ed4b134411ad01f2a808de0c Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 20 Nov 2025 11:29:57 -0800 Subject: [PATCH 2/2] Connect coordinator from heartbeat thread if needed --- kafka/coordinator/base.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index d13ce4abb..e1d8d8336 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -1198,6 +1198,7 @@ def _run_once(self): self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000) elif not self.coordinator.connected(): + self.coordinator._client.maybe_connect(self.coordinator.coordinator_id) self.coordinator._client._lock.release() self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)