Makefile (8 additions, 0 deletions)

@@ -93,6 +93,14 @@ servers/%/kafka-bin: servers/dist/$$(call kafka_artifact_name,$$*) | servers/dis
 	tar xzvf $< -C $@ --strip-components 1
 	if [[ "$*" < "1" ]]; then make servers/patch-libs/$*; fi
 
+servers/%/api_versions.txt: servers/$$*/kafka-bin
+	KAFKA_VERSION=$* python -m test.fixtures get_api_versions >servers/$*/api_versions.txt
+
+servers/%/messages: servers/$$*/kafka-bin
+	cd servers/$*/ && jar xvf kafka-bin/libs/kafka-clients-$*.jar common/message/
+	mv servers/$*/common/message/ servers/$*/messages/
+	rmdir servers/$*/common
+
 servers/patch-libs/%: servers/dist/jakarta.xml.bind-api-2.3.3.jar | servers/$$*/kafka-bin
 	cp $< servers/$*/kafka-bin/libs/
 
kafka/admin/client.py (30 additions, 18 deletions)

@@ -1,9 +1,10 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
 
 from collections import defaultdict
 import copy
 import logging
 import socket
+import time
 
 from . import ConfigResourceType
 from kafka.vendor import six
@@ -212,11 +213,13 @@ def __init__(self, **configs):
             metric_group_prefix='admin',
             **self.config
         )
-        self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
 
         # Get auto-discovered version from client if necessary
         if self.config['api_version'] is None:
             self.config['api_version'] = self._client.config['api_version']
+        else:
+            # need to run check_version for get_api_versions()
+            self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
 
         self._closed = False
         self._refresh_controller_id()
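
With the change above the admin client no longer probes broker versions twice: if api_version is left unset it reuses the version the underlying KafkaClient already auto-discovered, and if api_version is pinned it still runs check_version() once so that get_api_versions() has data. A minimal usage sketch (broker address and version tuple are illustrative, not part of this PR):

from kafka import KafkaAdminClient

# api_version unset: reuse the version the underlying KafkaClient negotiated.
admin_auto = KafkaAdminClient(bootstrap_servers='localhost:9092')

# api_version pinned: the probe still runs once so get_api_versions() works.
admin_pinned = KafkaAdminClient(bootstrap_servers='localhost:9092', api_version=(2, 6, 0))

admin_auto.close()
admin_pinned.close()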
@@ -273,24 +276,33 @@ def _validate_timeout(self, timeout_ms):
         """
         return timeout_ms or self.config['request_timeout_ms']
 
-    def _refresh_controller_id(self):
+    def _refresh_controller_id(self, timeout_ms=30000):
         """Determine the Kafka cluster controller."""
         version = self._matching_api_version(MetadataRequest)
         if 1 <= version <= 6:
-            request = MetadataRequest[version]()
-            future = self._send_request_to_node(self._client.least_loaded_node(), request)
-
-            self._wait_for_futures([future])
-
-            response = future.value
-            controller_id = response.controller_id
-            # verify the controller is new enough to support our requests
-            controller_version = self._client.check_version(controller_id, timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
-            if controller_version < (0, 10, 0):
-                raise IncompatibleBrokerVersion(
-                    "The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0."
-                    .format(controller_version))
-            self._controller_id = controller_id
+            timeout_at = time.time() + timeout_ms / 1000
+            while time.time() < timeout_at:
+                request = MetadataRequest[version]()
+                future = self._send_request_to_node(self._client.least_loaded_node(), request)
+
+                self._wait_for_futures([future])
+
+                response = future.value
+                controller_id = response.controller_id
+                if controller_id == -1:
+                    log.warning("Controller ID not available, got -1")
+                    time.sleep(1)
+                    continue
+                # verify the controller is new enough to support our requests
+                controller_version = self._client.check_version(node_id=controller_id, timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
+                if controller_version < (0, 10, 0):
+                    raise IncompatibleBrokerVersion(
+                        "The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0."
+                        .format(controller_version))
+                self._controller_id = controller_id
+                return
+            else:
+                raise Errors.NodeNotAvailableError('controller')
         else:
             raise UnrecognizedBrokerVersion(
                 "Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
@@ -390,7 +402,7 @@ def _send_request_to_node(self, node_id, request, wakeup=True):
         while not self._client.ready(node_id):
             # poll until the connection to broker is ready, otherwise send()
             # will fail with NodeNotReadyError
-            self._client.poll()
+            self._client.poll(timeout_ms=200)
         return self._client.send(node_id, request, wakeup)
 
     def _send_request_to_controller(self, request):
kafka/conn.py (2 additions, 2 deletions)

@@ -1315,8 +1315,8 @@ def reset_override_configs():
         return version
 
     def __str__(self):
-        return "<BrokerConnection node_id=%s host=%s:%d %s [%s %s]>" % (
-            self.node_id, self.host, self.port, self.state,
+        return "<BrokerConnection client_id=%s, node_id=%s host=%s:%d %s [%s %s]>" % (
+            self.config['client_id'], self.node_id, self.host, self.port, self.state,
             AFI_NAMES[self._sock_afi], self._sock_addr)
 
 
kafka/coordinator/base.py (4 additions, 3 deletions)

@@ -371,7 +371,7 @@ def ensure_active_group(self):
             while not self.coordinator_unknown():
                 if not self._client.in_flight_request_count(self.coordinator_id):
                     break
-                self._client.poll()
+                self._client.poll(timeout_ms=200)
             else:
                 continue
 
@@ -923,15 +923,16 @@ def close(self):
         if self.closed:
             return
         self.closed = True
-        with self.coordinator._lock:
-            self.coordinator._lock.notify()
 
         # Generally this should not happen - close() is triggered
         # by the coordinator. But in some cases GC may close the coordinator
        # from within the heartbeat thread.
         if threading.current_thread() == self:
             return
 
+        with self.coordinator._lock:
+            self.coordinator._lock.notify()
+
         if self.is_alive():
             self.join(self.coordinator.config['heartbeat_interval_ms'] / 1000)
         if self.is_alive():
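
The reordering matters because notify() should wake the heartbeat loop only when close() is called from another thread: if the heartbeat thread itself runs close() (e.g. via GC, as the comment notes), the method must bail out before notifying or joining, since a thread cannot join itself. A standalone sketch of the same shutdown ordering, with illustrative names rather than the library's classes:

import threading

class Worker(threading.Thread):
    def __init__(self):
        super(Worker, self).__init__()
        self.closed = False
        self._lock = threading.Condition()

    def run(self):
        with self._lock:
            while not self.closed:
                self._lock.wait(timeout=1.0)  # stand-in for one heartbeat iteration

    def close(self):
        if self.closed:
            return
        self.closed = True
        if threading.current_thread() == self:
            return  # never try to join our own thread
        with self._lock:
            self._lock.notify()  # wake the loop so it observes closed=True
        self.join(timeout=5.0)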
test/fixtures.py (45 additions, 3 deletions)

@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
 
 import atexit
 import logging
@@ -546,7 +546,7 @@ def _failure(error):
                     break
                 self._client.poll(timeout_ms=100)
             else:
-                raise RuntimeError('Could not connect to broker with node id %d' % (node_id,))
+                raise RuntimeError('Could not connect to broker with node id %s' % (node_id,))
 
             try:
                 future = self._client.send(node_id, request)
@@ -583,7 +583,15 @@ def _create_topic(self, topic_name, num_partitions=None, replication_factor=None
             self._create_topic_via_cli(topic_name, num_partitions, replication_factor)
 
     def _create_topic_via_metadata(self, topic_name, timeout_ms=10000):
-        self._send_request(MetadataRequest[0]([topic_name]), timeout_ms)
+        timeout_at = time.time() + timeout_ms / 1000
+        while time.time() < timeout_at:
+            response = self._send_request(MetadataRequest[0]([topic_name]), timeout_ms)
+            if response.topics[0][0] == 0:
+                return
+            log.warning("Unable to create topic via MetadataRequest: err %d", response.topics[0][0])
+            time.sleep(1)
+        else:
+            raise RuntimeError('Unable to create topic via MetadataRequest')
 
     def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_factor, timeout_ms=10000):
         request = CreateTopicsRequest[0]([(topic_name, num_partitions,
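
For reference, the error-code check in the loop above works because MetadataResponse_v0 lists each topic as a tuple whose first field is the error code. A small illustrative helper (not part of the fixture) showing the shape being inspected:

def topic_ready(metadata_response, topic_name):
    # MetadataResponse_v0.topics entries look like (error_code, topic, partitions);
    # error code 0 means the broker now knows about the (auto-created) topic.
    for error_code, name, partitions in metadata_response.topics:
        if name == topic_name:
            return error_code == 0
    return False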
@@ -686,3 +694,37 @@ def get_producers(self, cnt, **params):
         params = self._enrich_client_params(params, client_id='producer')
         for client in self._create_many_clients(cnt, KafkaProducer, **params):
             yield client
+
+
+def get_api_versions():
+    import logging
+    logging.basicConfig(level=logging.ERROR)
+
+    from test.fixtures import ZookeeperFixture, KafkaFixture
+    zk = ZookeeperFixture.instance()
+    k = KafkaFixture.instance(0, zk)
+
+    from kafka import KafkaClient
+    client = KafkaClient(bootstrap_servers='localhost:{}'.format(k.port))
+    client.check_version()
+
+    from pprint import pprint
+
+    pprint(client.get_api_versions())
+
+    client.close()
+    k.close()
+    zk.close()
+
+
+if __name__ == '__main__':
+    import sys
+    if len(sys.argv) < 2:
+        print("Commands: get_api_versions")
+        exit(0)
+    cmd = sys.argv[1]
+    if cmd == 'get_api_versions':
+        get_api_versions()
+    else:
+        print("Unknown cmd: %s" % cmd)
+        exit(1)
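
The new Makefile target drives this entry point (KAFKA_VERSION=<version> python -m test.fixtures get_api_versions, redirected into servers/<version>/api_versions.txt). As a rough sketch of consuming the same data programmatically against an already-running broker (the address is a placeholder; get_api_versions() returns roughly a mapping of api_key to (min_version, max_version)):

from kafka import KafkaClient

client = KafkaClient(bootstrap_servers='localhost:9092')
client.check_version()
for api_key, (min_version, max_version) in sorted(client.get_api_versions().items()):
    print('api_key %d: v%d..v%d' % (api_key, min_version, max_version))
client.close()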
test/test_admin_integration.py (6 additions, 3 deletions)

@@ -168,7 +168,7 @@ def consumer_thread(i, group_id):
         stop[i] = Event()
         consumers[i] = kafka_consumer_factory(group_id=group_id)
         while not stop[i].is_set():
-            consumers[i].poll(20)
+            consumers[i].poll(timeout_ms=200)
         consumers[i].close()
         consumers[i] = None
         stop[i] = None
@@ -183,6 +183,7 @@ def consumer_thread(i, group_id):
     try:
         timeout = time() + 35
         while True:
+            info('Checking consumers...')
             for c in range(num_consumers):
 
                 # Verify all consumers have been created
@@ -212,9 +213,9 @@ def consumer_thread(i, group_id):
 
                 if not rejoining and is_same_generation:
                     break
-                else:
-                    sleep(1)
             assert time() < timeout, "timeout waiting for assignments"
+            info('sleeping...')
+            sleep(1)
 
         info('Group stabilized; verifying assignment')
         output = kafka_admin_client.describe_consumer_groups(group_id_list)
@@ -236,6 +237,8 @@ def consumer_thread(i, group_id):
         for c in range(num_consumers):
             info('Stopping consumer %s', c)
             stop[c].set()
+        for c in range(num_consumers):
+            info('Waiting for consumer thread %s', c)
             threads[c].join()
             threads[c] = None
 
test/test_consumer_integration.py (1 addition, 1 deletion)

@@ -27,7 +27,7 @@ def test_kafka_version_infer(kafka_consumer_factory):
 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
 def test_kafka_consumer(kafka_consumer_factory, send_messages):
     """Test KafkaConsumer"""
-    consumer = kafka_consumer_factory(auto_offset_reset='earliest')
+    consumer = kafka_consumer_factory(auto_offset_reset='earliest', consumer_timeout_ms=2000)
     send_messages(range(0, 100), partition=0)
     send_messages(range(0, 100), partition=1)
     cnt = 0
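
For context, consumer_timeout_ms bounds how long iteration over the consumer blocks waiting for records, so the test can no longer hang indefinitely if messages stop arriving. A minimal sketch of the setting outside the fixture (topic and broker address are placeholders):

from kafka import KafkaConsumer

# The for-loop ends after 2 seconds without new records instead of blocking forever.
consumer = KafkaConsumer('example-topic', bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest', consumer_timeout_ms=2000)
for message in consumer:
    print(message.partition, message.offset, message.value)
consumer.close()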
test/test_sasl_integration.py (13 additions, 7 deletions)

@@ -1,5 +1,6 @@
 import logging
 import uuid
+import time
 
 import pytest
 
@@ -69,12 +70,17 @@ def test_client(request, sasl_kafka):
 
     client, = sasl_kafka.get_clients(1)
     request = MetadataRequest_v1(None)
-    client.send(0, request)
-    for _ in range(10):
-        result = client.poll(timeout_ms=10000)
-        if len(result) > 0:
-            break
-    else:
+    timeout_at = time.time() + 1
+    while not client.is_ready(0):
+        client.maybe_connect(0)
+        client.poll(timeout_ms=100)
+        if time.time() > timeout_at:
+            raise RuntimeError("Couldn't connect to node 0")
+    future = client.send(0, request)
+    client.poll(future=future, timeout_ms=10000)
+    if not future.is_done:
         raise RuntimeError("Couldn't fetch topic response from Broker.")
-    result = result[0]
+    elif future.failed():
+        raise future.exception
+    result = future.value
     assert topic_name in [t[1] for t in result.topics]
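
The rewritten test follows kafka-python's usual low-level request pattern: make sure the connection to the node is ready, send() to obtain a Future, then poll() until that Future resolves. A condensed, reusable sketch of the same flow (the function name and timeout values are illustrative):

import time

def send_and_wait(client, node_id, request, connect_timeout_s=1.0, request_timeout_ms=10000):
    # send() fails with NodeNotReadyError unless the connection is established first.
    deadline = time.time() + connect_timeout_s
    while not client.is_ready(node_id):
        client.maybe_connect(node_id)
        client.poll(timeout_ms=100)
        if time.time() > deadline:
            raise RuntimeError('could not connect to node %s' % node_id)
    future = client.send(node_id, request)
    client.poll(future=future, timeout_ms=request_timeout_ms)
    if future.failed():
        raise future.exception
    return future.value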
test/testutil.py (2 additions, 2 deletions)

@@ -28,12 +28,12 @@ def env_kafka_version():
 def assert_message_count(messages, num_messages):
     """Check that we received the expected number of messages with no duplicates."""
     # Make sure we got them all
-    assert len(messages) == num_messages
+    assert len(messages) == num_messages, 'Expected %d messages, got %d' % (num_messages, len(messages))
     # Make sure there are no duplicates
     # Note: Currently duplicates are identified only using key/value. Other attributes like topic, partition, headers,
     # timestamp, etc are ignored... this could be changed if necessary, but will be more tolerant of dupes.
     unique_messages = {(m.key, m.value) for m in messages}
-    assert len(unique_messages) == num_messages
+    assert len(unique_messages) == num_messages, 'Expected %d unique messages, got %d' % (num_messages, len(unique_messages))
 
 
 class Timer(object):