From dee77e735540c4eaf80d7fdf0cd06d1baada8bb4 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Mon, 24 Feb 2025 10:03:40 -0800
Subject: [PATCH 01/12] make servers/*/api_versions.txt and make
 servers/*/messages

---
 Makefile         |  8 ++++++++
 test/fixtures.py | 34 ++++++++++++++++++++++++++++++++++
 2 files changed, 42 insertions(+)

diff --git a/Makefile b/Makefile
index 0e5838735..b8fdbbfc5 100644
--- a/Makefile
+++ b/Makefile
@@ -93,6 +93,14 @@ servers/%/kafka-bin: servers/dist/$$(call kafka_artifact_name,$$*) | servers/dis
 	tar xzvf $< -C $@ --strip-components 1
 	if [[ "$*" < "1" ]]; then make servers/patch-libs/$*; fi
 
+servers/%/api_versions.txt: servers/$$*/kafka-bin
+	KAFKA_VERSION=$* python -m test.fixtures get_api_versions >servers/$*/api_versions.txt
+
+servers/%/messages: servers/$$*/kafka-bin
+	cd servers/$*/ && jar xvf kafka-bin/libs/kafka-clients-$*.jar common/message/
+	mv servers/$*/common/message/ servers/$*/messages/
+	rmdir servers/$*/common
+
 servers/patch-libs/%: servers/dist/jakarta.xml.bind-api-2.3.3.jar | servers/$$*/kafka-bin
 	cp $< servers/$*/kafka-bin/libs/
 
diff --git a/test/fixtures.py b/test/fixtures.py
index 673c0ac66..f99caeba0 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -686,3 +686,37 @@ def get_producers(self, cnt, **params):
         params = self._enrich_client_params(params, client_id='producer')
         for client in self._create_many_clients(cnt, KafkaProducer, **params):
             yield client
+
+
+def get_api_versions():
+    import logging
+    logging.basicConfig(level=logging.ERROR)
+
+    from test.fixtures import ZookeeperFixture, KafkaFixture
+    zk = ZookeeperFixture.instance()
+    k = KafkaFixture.instance(0, zk)
+
+    from kafka import KafkaClient
+    client = KafkaClient(bootstrap_servers='localhost:{}'.format(k.port))
+    client.check_version()
+
+    from pprint import pprint
+
+    pprint(client.get_api_versions())
+
+    client.close()
+    k.close()
+    zk.close()
+
+
+if __name__ == '__main__':
+    import sys
+    if len(sys.argv) < 2:
+        print("Commands: get_api_versions")
+        exit(0)
+    cmd = sys.argv[1]
+    if cmd == 'get_api_versions':
+        get_api_versions()
+    else:
+        print("Unknown cmd: %s" % cmd)
+        exit(1)

From 289358537c45f903c9e0815696925de874d36818 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Sun, 23 Feb 2025 10:49:51 -0800
Subject: [PATCH 02/12] 200ms timeout for client.poll in ensure_active_group
 and admin client

---
 kafka/admin/client.py     | 2 +-
 kafka/coordinator/base.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index db1d522a0..fa453e179 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -390,7 +390,7 @@ def _send_request_to_node(self, node_id, request, wakeup=True):
         while not self._client.ready(node_id):
             # poll until the connection to broker is ready, otherwise send()
             # will fail with NodeNotReadyError
-            self._client.poll()
+            self._client.poll(timeout_ms=200)
         return self._client.send(node_id, request, wakeup)
 
     def _send_request_to_controller(self, request):
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index f3832c531..77c35f154 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -371,7 +371,7 @@ def ensure_active_group(self):
             while not self.coordinator_unknown():
                 if not self._client.in_flight_request_count(self.coordinator_id):
                     break
-                self._client.poll()
+                self._client.poll(timeout_ms=200)
             else:
                 continue
 
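Note on PATCH 02: KafkaClient.poll() with no timeout_ms can block for a long
stretch inside a single call, so the ready-wait loops above were slow to
notice a connection becoming usable; capping each poll at 200ms keeps the
loop responsive. A minimal standalone sketch of the pattern, assuming an
already-configured kafka-python KafkaClient named `client`; `wait_until_ready`
and `max_wait_s` are illustrative names, not library API:

    import time

    def wait_until_ready(client, node_id, max_wait_s=5.0):
        # Hypothetical helper: poll in short slices so ready() is re-checked
        # promptly instead of blocking inside one long poll() call.
        deadline = time.time() + max_wait_s
        while not client.ready(node_id):
            if time.time() > deadline:
                raise RuntimeError('node %s not ready after %.1fs' % (node_id, max_wait_s))
            client.poll(timeout_ms=200)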
From afd15fa334dda82f3d23ec423660013e1c27a5ea Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Sun, 23 Feb 2025 16:47:08 -0800
Subject: [PATCH 03/12] only acquire coordinator lock in heartbeat thread
 close if not self thread

---
 kafka/coordinator/base.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 77c35f154..247d31b13 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -923,8 +923,6 @@ def close(self):
         if self.closed:
             return
         self.closed = True
-        with self.coordinator._lock:
-            self.coordinator._lock.notify()
 
         # Generally this should not happen - close() is triggered
         # by the coordinator. But in some cases GC may close the coordinator
@@ -932,6 +930,9 @@ def close(self):
        if threading.current_thread() == self:
             return
 
+        with self.coordinator._lock:
+            self.coordinator._lock.notify()
+
         if self.is_alive():
             self.join(self.coordinator.config['heartbeat_interval_ms'] / 1000)
         if self.is_alive():
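Note on PATCH 03: the current-thread check has to run before the lock/notify,
because close() can be reached from the heartbeat thread itself (e.g. via GC),
and a thread that notifies the shared condition and then tries to join()
itself will raise RuntimeError ("cannot join current thread"). A standalone
stdlib-only sketch of the ordering; the Worker class and its names are
illustrative, not kafka-python code, and `lock` is a threading.Condition like
coordinator._lock:

    import threading

    class Worker(threading.Thread):
        def __init__(self, lock):
            super(Worker, self).__init__()
            self.lock = lock  # shared threading.Condition
            self.closed = False

        def close(self):
            if self.closed:
                return
            self.closed = True
            # Bail out first if close() is running on this thread itself;
            # the notify/join below must only run from another thread.
            if threading.current_thread() == self:
                return
            with self.lock:
                self.lock.notify()
            if self.is_alive():
                self.join(timeout=1.0)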
From 399f5f08fc1fffcea389665a750791c8f91b86d5 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Sun, 23 Feb 2025 16:47:36 -0800
Subject: [PATCH 04/12] Check for -1 controller_id in admin client

---
 kafka/admin/client.py | 42 ++++++++++++++++++++++++++----------------
 1 file changed, 26 insertions(+), 16 deletions(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index fa453e179..7f2aff6f3 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -1,9 +1,10 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
 
 from collections import defaultdict
 import copy
 import logging
 import socket
+import time
 
 from . import ConfigResourceType
 from kafka.vendor import six
@@ -273,24 +274,33 @@ def _validate_timeout(self, timeout_ms):
         """
         return timeout_ms or self.config['request_timeout_ms']
 
-    def _refresh_controller_id(self):
+    def _refresh_controller_id(self, timeout_ms=30000):
         """Determine the Kafka cluster controller."""
         version = self._matching_api_version(MetadataRequest)
         if 1 <= version <= 6:
-            request = MetadataRequest[version]()
-            future = self._send_request_to_node(self._client.least_loaded_node(), request)
-
-            self._wait_for_futures([future])
-
-            response = future.value
-            controller_id = response.controller_id
-            # verify the controller is new enough to support our requests
-            controller_version = self._client.check_version(controller_id, timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
-            if controller_version < (0, 10, 0):
-                raise IncompatibleBrokerVersion(
-                    "The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0."
-                    .format(controller_version))
-            self._controller_id = controller_id
+            timeout_at = time.time() + timeout_ms / 1000
+            while time.time() < timeout_at:
+                request = MetadataRequest[version]()
+                future = self._send_request_to_node(self._client.least_loaded_node(), request)
+
+                self._wait_for_futures([future])
+
+                response = future.value
+                controller_id = response.controller_id
+                if controller_id == -1:
+                    log.warning("Controller ID not available, got -1")
+                    time.sleep(1)
+                    continue
+                # verify the controller is new enough to support our requests
+                controller_version = self._client.check_version(controller_id, timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
+                if controller_version < (0, 10, 0):
+                    raise IncompatibleBrokerVersion(
+                        "The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0."
+                        .format(controller_version))
+                self._controller_id = controller_id
+                return
+            else:
+                raise Errors.NodeNotAvailableError('controller')
         else:
             raise UnrecognizedBrokerVersion(
                 "Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."

From b0216b70da6587ab8434807731abc79c9c9214bc Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Sun, 23 Feb 2025 16:48:51 -0800
Subject: [PATCH 05/12] Fixup variable interpolation in test fixture error

---
 test/fixtures.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/fixtures.py b/test/fixtures.py
index f99caeba0..7c9e52130 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -546,7 +546,7 @@ def _failure(error):
                 break
             self._client.poll(timeout_ms=100)
         else:
-            raise RuntimeError('Could not connect to broker with node id %d' % (node_id,))
+            raise RuntimeError('Could not connect to broker with node id %s' % (node_id,))
 
         try:
             future = self._client.send(node_id, request)

From 03882812b937aa2ea6c8e0569a68d598309142b8 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Sun, 23 Feb 2025 16:49:15 -0800
Subject: [PATCH 06/12] Retry on error in fixture _create_topic_via_metadata

---
 test/fixtures.py | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/test/fixtures.py b/test/fixtures.py
index 7c9e52130..654f15533 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
 
 import atexit
 import logging
@@ -583,7 +583,15 @@ def _create_topic(self, topic_name, num_partitions=None, replication_factor=None
             self._create_topic_via_cli(topic_name, num_partitions, replication_factor)
 
     def _create_topic_via_metadata(self, topic_name, timeout_ms=10000):
-        self._send_request(MetadataRequest[0]([topic_name]), timeout_ms)
+        timeout_at = time.time() + timeout_ms / 1000
+        while time.time() < timeout_at:
+            response = self._send_request(MetadataRequest[0]([topic_name]), timeout_ms)
+            if response.topics[0][0] == 0:
+                return
+            log.warning("Unable to create topic via MetadataRequest: err %d", response.topics[0][0])
+            time.sleep(1)
+        else:
+            raise RuntimeError('Unable to create topic via MetadataRequest')
 
     def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_factor, timeout_ms=10000):
         request = CreateTopicsRequest[0]([(topic_name, num_partitions,
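Note on PATCHes 04 and 06: both retry loops lean on Python's while/else. The
else suite runs only when the loop condition itself goes false (the deadline
expires) without a break or return, which makes it a natural home for the
timeout error. A self-contained sketch of the idiom; `fetch` is a stand-in
for the metadata call, not a real API:

    import time

    def retry_until(fetch, timeout_s=30, interval_s=1):
        deadline = time.time() + timeout_s
        while time.time() < deadline:
            result = fetch()
            if result is not None:
                return result        # success exits the loop directly
            time.sleep(interval_s)   # transient failure: pause, then retry
        else:
            # reached only when the while condition failed, i.e. on timeout
            raise RuntimeError('timed out after %ds' % timeout_s)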
From 24ecb730116f9325daf3ddd6f1c65eab3a9186d6 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Sun, 23 Feb 2025 16:49:43 -0800
Subject: [PATCH 07/12] Add error str to assert_message_count checks

---
 test/testutil.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/test/testutil.py b/test/testutil.py
index ec4d70bf6..dd4e267a8 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -28,12 +28,12 @@ def env_kafka_version():
 def assert_message_count(messages, num_messages):
     """Check that we received the expected number of messages with no duplicates."""
     # Make sure we got them all
-    assert len(messages) == num_messages
+    assert len(messages) == num_messages, 'Expected %d messages, got %d' % (num_messages, len(messages))
     # Make sure there are no duplicates
     # Note: Currently duplicates are identified only using key/value. Other attributes like topic, partition, headers,
     # timestamp, etc are ignored... this could be changed if necessary, but will be more tolerant of dupes.
     unique_messages = {(m.key, m.value) for m in messages}
-    assert len(unique_messages) == num_messages
+    assert len(unique_messages) == num_messages, 'Expected %d unique messages, got %d' % (num_messages, len(unique_messages))
 
 
 class Timer(object):

From 9d491016e50053fe2293b2eb7b6cea97d280eec6 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Sun, 23 Feb 2025 16:50:44 -0800
Subject: [PATCH 08/12] Add timeout to test_kafka_consumer

---
 test/test_consumer_integration.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 10ea0495c..6789329b4 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -27,7 +27,7 @@ def test_kafka_version_infer(kafka_consumer_factory):
 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
 def test_kafka_consumer(kafka_consumer_factory, send_messages):
     """Test KafkaConsumer"""
-    consumer = kafka_consumer_factory(auto_offset_reset='earliest')
+    consumer = kafka_consumer_factory(auto_offset_reset='earliest', consumer_timeout_ms=2000)
     send_messages(range(0, 100), partition=0)
     send_messages(range(0, 100), partition=1)
     cnt = 0

From ecbb2c7071c33d9c8a875f6595ae3587a0493b80 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Sun, 23 Feb 2025 16:51:31 -0800
Subject: [PATCH 09/12] Refactor sasl_integration test_client - wait for node
 ready; use send future

---
 test/test_sasl_integration.py | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/test/test_sasl_integration.py b/test/test_sasl_integration.py
index e3a4813ae..0f404da20 100644
--- a/test/test_sasl_integration.py
+++ b/test/test_sasl_integration.py
@@ -1,5 +1,6 @@
 import logging
 import uuid
+import time
 
 import pytest
 
@@ -69,12 +70,17 @@ def test_client(request, sasl_kafka):
     client, = sasl_kafka.get_clients(1)
     request = MetadataRequest_v1(None)
-    client.send(0, request)
-    for _ in range(10):
-        result = client.poll(timeout_ms=10000)
-        if len(result) > 0:
-            break
-    else:
+    timeout_at = time.time() + 1
+    while not client.is_ready(0):
+        client.maybe_connect(0)
+        client.poll(timeout_ms=100)
+        if time.time() > timeout_at:
+            raise RuntimeError("Couldn't connect to node 0")
+    future = client.send(0, request)
+    client.poll(future=future, timeout_ms=10000)
+    if not future.is_done:
         raise RuntimeError("Couldn't fetch topic response from Broker.")
-    result = result[0]
+    elif future.failed():
+        raise future.exception
+    result = future.value
 
     assert topic_name in [t[1] for t in result.topics]
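Note on PATCH 09: kafka-python's send() returns a Future, and poll(future=...)
drives I/O until that future resolves, which is more precise than polling for
"any responses" and taking the first one. A minimal sketch of the
request/response flow, assuming `client` is a connected KafkaClient and
`request` is any request struct (e.g. a MetadataRequest); `send_and_wait` is
an illustrative helper, not library API:

    def send_and_wait(client, node_id, request, timeout_ms=10000):
        future = client.send(node_id, request)
        client.poll(future=future, timeout_ms=timeout_ms)
        if not future.is_done:
            raise RuntimeError('no response within %d ms' % timeout_ms)
        if future.failed():
            raise future.exception  # exception instance stored on the future
        return future.value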
From b17f4f246a605b6862d50b4829a53e175a0be4cf Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Sun, 23 Feb 2025 16:52:36 -0800
Subject: [PATCH 10/12] Also sleep when waiting for consumers in
 test_describe_consumer_group_exists

---
 test/test_admin_integration.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
index 06c40a223..bd2fd216e 100644
--- a/test/test_admin_integration.py
+++ b/test/test_admin_integration.py
@@ -168,7 +168,7 @@ def consumer_thread(i, group_id):
         stop[i] = Event()
         consumers[i] = kafka_consumer_factory(group_id=group_id)
         while not stop[i].is_set():
-            consumers[i].poll(20)
+            consumers[i].poll(timeout_ms=200)
         consumers[i].close()
         consumers[i] = None
         stop[i] = None
@@ -183,6 +183,7 @@ def consumer_thread(i, group_id):
     try:
         timeout = time() + 35
         while True:
+            info('Checking consumers...')
            for c in range(num_consumers):
 
                # Verify all consumers have been created
@@ -212,9 +213,9 @@ def consumer_thread(i, group_id):
 
             if not rejoining and is_same_generation:
                 break
-            else:
-                sleep(1)
             assert time() < timeout, "timeout waiting for assignments"
+            info('sleeping...')
+            sleep(1)
 
         info('Group stabilized; verifying assignment')
         output = kafka_admin_client.describe_consumer_groups(group_id_list)
@@ -236,6 +237,8 @@ def consumer_thread(i, group_id):
         for c in range(num_consumers):
             info('Stopping consumer %s', c)
             stop[c].set()
+        for c in range(num_consumers):
+            info('Waiting for consumer thread %s', c)
             threads[c].join()
             threads[c] = None
 

From effe90e3b55b44b175d4ef6da81b81ffbc23feaa Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Sun, 23 Feb 2025 17:44:47 -0800
Subject: [PATCH 11/12] include client_id in BrokerConnection __str__ output

---
 kafka/conn.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/kafka/conn.py b/kafka/conn.py
index c9ad9cc27..ab3fc6944 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -1315,8 +1315,8 @@ def reset_override_configs():
         return version
 
     def __str__(self):
-        return "<BrokerConnection node_id=%s host=%s:%d %s [%s %s]>" % (
-            self.node_id, self.host, self.port, self.state,
+        return "<BrokerConnection client_id=%s, node_id=%s host=%s:%d %s [%s %s]>" % (
+            self.config['client_id'], self.node_id, self.host, self.port, self.state,
             AFI_NAMES[self._sock_afi], self._sock_addr)
 
 

From d2335dc3e9bd8e16915bb381c4ee9a4ef77515c9 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Mon, 24 Feb 2025 12:14:22 -0800
Subject: [PATCH 12/12] admin client: check_version only if needed, node_id
 kwarg for controller

---
 kafka/admin/client.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 7f2aff6f3..c9e51e5c9 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -213,11 +213,13 @@ def __init__(self, **configs):
             metric_group_prefix='admin',
             **self.config
         )
-        self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
 
         # Get auto-discovered version from client if necessary
         if self.config['api_version'] is None:
             self.config['api_version'] = self._client.config['api_version']
+        else:
+            # need to run check_version for get_api_versions()
+            self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
 
         self._closed = False
         self._refresh_controller_id()
@@ -292,7 +294,7 @@ def _refresh_controller_id(self, timeout_ms=30000):
                     time.sleep(1)
                     continue
                 # verify the controller is new enough to support our requests
-                controller_version = self._client.check_version(controller_id, timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
+                controller_version = self._client.check_version(node_id=controller_id, timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
                 if controller_version < (0, 10, 0):
                     raise IncompatibleBrokerVersion(
                         "The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0."
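Note on PATCH 12: the unconditional check_version() probe is dropped. When
api_version is left unset, the inner KafkaClient has already auto-discovered
it, so the admin client just copies that value; only a pinned api_version
still triggers one explicit probe, so that get_api_versions() has data to
serve. A hedged usage sketch (the broker address and version tuple are
placeholders):

    from kafka import KafkaAdminClient

    # api_version unset: reuses the version the inner client auto-discovered,
    # with no second probe
    admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

    # api_version pinned: one explicit check_version() call still runs so
    # get_api_versions() has data to serve
    admin_pinned = KafkaAdminClient(bootstrap_servers='localhost:9092',
                                    api_version=(2, 6, 0))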