diff --git a/kafka/client_async.py b/kafka/client_async.py index 67014488f..be19cf80b 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -25,6 +25,7 @@ from kafka.metrics import AnonMeasurable from kafka.metrics.stats import Avg, Count, Rate from kafka.metrics.stats.rate import TimeUnit +from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS from kafka.protocol.metadata import MetadataRequest from kafka.util import Dict, WeakMethod # Although this looks unused, it actually monkey-patches socket.socketpair() @@ -239,6 +240,25 @@ def __init__(self, **configs): if self.config['api_version'] is None: check_timeout = self.config['api_version_auto_timeout_ms'] / 1000 self.config['api_version'] = self.check_version(timeout=check_timeout) + elif self.config['api_version'] in BROKER_API_VERSIONS: + self._api_versions = BROKER_API_VERSIONS[self.config['api_version']] + elif (self.config['api_version'] + (0,)) in BROKER_API_VERSIONS: + log.warning('Configured api_version %s is ambiguous; using %s', + self.config['api_version'], self.config['api_version'] + (0,)) + self.config['api_version'] = self.config['api_version'] + (0,) + self._api_versions = BROKER_API_VERSIONS[self.config['api_version']] + else: + compatible_version = None + for v in sorted(BROKER_API_VERSIONS.keys(), reverse=True): + if v <= self.config['api_version']: + compatible_version = v + break + if compatible_version: + log.warning('Configured api_version %s not supported; using %s', + self.config['api_version'], compatible_version) + self._api_versions = BROKER_API_VERSIONS[compatible_version] + else: + raise Errors.UnrecognizedBrokerVersion(self.config['api_version']) def _init_wakeup_socketpair(self): self._wake_r, self._wake_w = socket.socketpair() @@ -849,8 +869,8 @@ def _maybe_refresh_metadata(self, wakeup=False): topics = list(self.config['bootstrap_topics_filter']) if self.cluster.need_all_topic_metadata or not topics: - topics = [] if self.config['api_version'] < (0, 10) else None - api_version = 0 if self.config['api_version'] < (0, 10) else 1 + topics = [] if self.config['api_version'] < (0, 10, 0) else None + api_version = 0 if self.config['api_version'] < (0, 10, 0) else 1 request = MetadataRequest[api_version](topics) log.debug("Sending metadata request %s to node %s", request, node_id) future = self.send(node_id, request, wakeup=wakeup) @@ -898,7 +918,7 @@ def check_version(self, node_id=None, timeout=2, strict=False): is down and the client enters a bootstrap backoff sleep. This is only possible if node_id is None. - Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ... + Returns: version tuple, i.e. (3, 9), (2, 0), (0, 10, 2) etc Raises: NodeNotReadyError (if node_id is provided) @@ -925,9 +945,7 @@ def check_version(self, node_id=None, timeout=2, strict=False): try: remaining = end - time.time() version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter'])) - if version >= (0, 10, 0): - # cache the api versions map if it's available (starting - # in 0.10 cluster version) + if not self._api_versions: self._api_versions = conn.get_api_versions() self._lock.release() return version diff --git a/kafka/conn.py b/kafka/conn.py index ab3fc6944..4d1c36b95 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -165,8 +165,8 @@ class BrokerConnection(object): or other configuration forbids use of all the specified ciphers), an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers api_version (tuple): Specify which Kafka API version to use. - Accepted values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9), - (0, 10). Default: (0, 8, 2) + Must be None or >= (0, 10, 0) to enable SASL authentication. + Default: None api_version_auto_timeout_ms (int): number of milliseconds to throw a timeout exception from the constructor when checking the broker api version. Only applies if api_version is None @@ -214,7 +214,7 @@ class BrokerConnection(object): 'ssl_crlfile': None, 'ssl_password': None, 'ssl_ciphers': None, - 'api_version': (0, 8, 2), # default to most restrictive + 'api_version': None, 'selector': selectors.DefaultSelector, 'state_change_callback': lambda node_id, sock, conn: True, 'metrics': None, @@ -522,7 +522,7 @@ def _try_handshake(self): return False def _try_authenticate(self): - assert self.config['api_version'] is None or self.config['api_version'] >= (0, 10) + assert self.config['api_version'] is None or self.config['api_version'] >= (0, 10, 0) if self._sasl_auth_future is None: # Build a SaslHandShakeRequest message @@ -1154,9 +1154,10 @@ def next_ifr_request_timeout_ms(self): else: return float('inf') - def _handle_api_version_response(self, response): + def _handle_api_versions_response(self, response): error_type = Errors.for_code(response.error_code) - assert error_type is Errors.NoError, "API version check failed" + if error_type is not Errors.NoError: + return False self._api_versions = dict([ (api_key, (min_version, max_version)) for api_key, min_version, max_version in response.api_versions @@ -1168,12 +1169,7 @@ def get_api_versions(self): return self._api_versions version = self.check_version() - if version < (0, 10, 0): - raise Errors.UnsupportedVersionError( - "ApiVersion not supported by cluster version {} < 0.10.0" - .format(version)) - # _api_versions is set as a side effect of check_versions() on a cluster - # that supports 0.10.0 or later + # _api_versions is set as a side effect of check_versions() return self._api_versions def _infer_broker_version_from_api_versions(self, api_versions): @@ -1182,16 +1178,16 @@ def _infer_broker_version_from_api_versions(self, api_versions): test_cases = [ # format (, ) # Make sure to update consumer_integration test check when adding newer versions. - ((2, 6, 0), DescribeClientQuotasRequest[0]), - ((2, 5, 0), DescribeAclsRequest_v2), - ((2, 4, 0), ProduceRequest[8]), - ((2, 3, 0), FetchRequest[11]), - ((2, 2, 0), OffsetRequest[5]), - ((2, 1, 0), FetchRequest[10]), - ((2, 0, 0), FetchRequest[8]), - ((1, 1, 0), FetchRequest[7]), - ((1, 0, 0), MetadataRequest[5]), - ((0, 11, 0), MetadataRequest[4]), + ((2, 6), DescribeClientQuotasRequest[0]), + ((2, 5), DescribeAclsRequest_v2), + ((2, 4), ProduceRequest[8]), + ((2, 3), FetchRequest[11]), + ((2, 2), OffsetRequest[5]), + ((2, 1), FetchRequest[10]), + ((2, 0), FetchRequest[8]), + ((1, 1), FetchRequest[7]), + ((1, 0), MetadataRequest[5]), + ((0, 11), MetadataRequest[4]), ((0, 10, 2), OffsetFetchRequest[2]), ((0, 10, 1), MetadataRequest[2]), ] @@ -1204,7 +1200,7 @@ def _infer_broker_version_from_api_versions(self, api_versions): if min_version <= struct.API_VERSION <= max_version: return broker_version - # We know that ApiVersionResponse is only supported in 0.10+ + # We know that ApiVersionsResponse is only supported in 0.10+ # so if all else fails, choose that return (0, 10, 0) @@ -1213,7 +1209,7 @@ def check_version(self, timeout=2, strict=False, topics=[]): Note: This is a blocking call. - Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ... + Returns: version tuple, i.e. (3, 9), (2, 4), etc ... """ timeout_at = time.time() + timeout log.info('Probing node %s broker version', self.node_id) @@ -1236,12 +1232,15 @@ def reset_override_configs(): # vanilla MetadataRequest. If the server did not recognize the first # request, both will be failed with a ConnectionError that wraps # socket.error (32, 54, or 104) - from kafka.protocol.admin import ApiVersionRequest, ListGroupsRequest + from kafka.protocol.admin import ListGroupsRequest + from kafka.protocol.api_versions import ApiVersionsRequest + from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS from kafka.protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest test_cases = [ - # All cases starting from 0.10 will be based on ApiVersionResponse - ((0, 10), ApiVersionRequest[0]()), + # All cases starting from 0.10 will be based on ApiVersionsResponse + ((0, 11), ApiVersionsRequest[1]()), + ((0, 10, 0), ApiVersionsRequest[0]()), ((0, 9), ListGroupsRequest[0]()), ((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')), ((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])), @@ -1274,11 +1273,17 @@ def reset_override_configs(): selector.close() if f.succeeded(): - if isinstance(request, ApiVersionRequest[0]): + if version >= (0, 10, 0): # Starting from 0.10 kafka broker we determine version - # by looking at ApiVersionResponse - api_versions = self._handle_api_version_response(f.value) + # by looking at ApiVersionsResponse + api_versions = self._handle_api_versions_response(f.value) + if not api_versions: + continue version = self._infer_broker_version_from_api_versions(api_versions) + else: + if version not in BROKER_API_VERSIONS: + raise Errors.UnrecognizedBrokerVersion(version) + self._api_versions = BROKER_API_VERSIONS[version] log.info('Broker version identified as %s', '.'.join(map(str, version))) log.info('Set configuration api_version=%s to skip auto' ' check_version requests on startup', version) @@ -1298,7 +1303,7 @@ def reset_override_configs(): # requests (bug...). In this case we expect to see a correlation # id mismatch elif (isinstance(f.exception, Errors.CorrelationIdError) and - version == (0, 10)): + version > (0, 9)): pass elif six.PY2: assert isinstance(f.exception.args[0], socket.error) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 0b5df4e9a..333c97758 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -702,11 +702,11 @@ def _create_fetch_requests(self): log.log(0, "Skipping fetch for partition %s because there is an inflight request to node %s", partition, node_id) - if self.config['api_version'] >= (0, 11, 0): + if self.config['api_version'] >= (0, 11): version = 4 elif self.config['api_version'] >= (0, 10, 1): version = 3 - elif self.config['api_version'] >= (0, 10): + elif self.config['api_version'] >= (0, 10, 0): version = 2 elif self.config['api_version'] == (0, 9): version = 1 diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 247d31b13..75d9c903d 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -461,7 +461,7 @@ def _send_join_group_request(self): self._generation.member_id, self.protocol_type(), member_metadata) - elif (0, 10, 1) <= self.config['api_version'] < (0, 11, 0): + elif (0, 10, 1) <= self.config['api_version'] < (0, 11): request = JoinGroupRequest[1]( self.group_id, self.config['session_timeout_ms'], @@ -562,7 +562,7 @@ def _handle_join_group_response(self, future, send_time, response): def _on_join_follower(self): # send follower's sync group with an empty assignment - version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + version = 0 if self.config['api_version'] < (0, 11) else 1 request = SyncGroupRequest[version]( self.group_id, self._generation.generation_id, @@ -590,7 +590,7 @@ def _on_join_leader(self, response): except Exception as e: return Future().failure(e) - version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + version = 0 if self.config['api_version'] < (0, 11) else 1 request = SyncGroupRequest[version]( self.group_id, self._generation.generation_id, @@ -771,7 +771,7 @@ def maybe_leave_group(self): # this is a minimal effort attempt to leave the group. we do not # attempt any resending if the request fails or times out. log.info('Leaving consumer group (%s).', self.group_id) - version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + version = 0 if self.config['api_version'] < (0, 11) else 1 request = LeaveGroupRequest[version](self.group_id, self._generation.member_id) future = self._client.send(self.coordinator_id, request) future.add_callback(self._handle_leave_group_response) @@ -799,7 +799,7 @@ def _send_heartbeat_request(self): e = Errors.NodeNotReadyError(self.coordinator_id) return Future().failure(e) - version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + version = 0 if self.config['api_version'] < (0, 11) else 1 request = HeartbeatRequest[version](self.group_id, self._generation.generation_id, self._generation.member_id) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 155e9eee3..5c44a8a81 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -393,7 +393,7 @@ def __init__(self, **configs): assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers' if self.config['compression_type'] == 'zstd': - assert self.config['api_version'] >= (2, 1, 0), 'Zstd Requires >= Kafka 2.1.0 Brokers' + assert self.config['api_version'] >= (2, 1), 'Zstd Requires >= Kafka 2.1 Brokers' # Check compression_type for library support ct = self.config['compression_type'] @@ -524,7 +524,7 @@ def partitions_for(self, topic): def _max_usable_produce_magic(self): if self.config['api_version'] >= (0, 11): return 2 - elif self.config['api_version'] >= (0, 10): + elif self.config['api_version'] >= (0, 10, 0): return 1 else: return 0 diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index c6cd76c69..ac9c5a96f 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -313,7 +313,7 @@ def _produce_request(self, node_id, acks, timeout, batches): elif self.config['api_version'] >= (0, 11): version = 3 kwargs = dict(transactional_id=None) - elif self.config['api_version'] >= (0, 10): + elif self.config['api_version'] >= (0, 10, 0): version = 2 elif self.config['api_version'] == (0, 9): version = 1 diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 87768f839..3da5c5419 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -4,66 +4,6 @@ from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields -class ApiVersionResponse_v0(Response): - API_KEY = 18 - API_VERSION = 0 - SCHEMA = Schema( - ('error_code', Int16), - ('api_versions', Array( - ('api_key', Int16), - ('min_version', Int16), - ('max_version', Int16))) - ) - - -class ApiVersionResponse_v1(Response): - API_KEY = 18 - API_VERSION = 1 - SCHEMA = Schema( - ('error_code', Int16), - ('api_versions', Array( - ('api_key', Int16), - ('min_version', Int16), - ('max_version', Int16))), - ('throttle_time_ms', Int32) - ) - - -class ApiVersionResponse_v2(Response): - API_KEY = 18 - API_VERSION = 2 - SCHEMA = ApiVersionResponse_v1.SCHEMA - - -class ApiVersionRequest_v0(Request): - API_KEY = 18 - API_VERSION = 0 - RESPONSE_TYPE = ApiVersionResponse_v0 - SCHEMA = Schema() - - -class ApiVersionRequest_v1(Request): - API_KEY = 18 - API_VERSION = 1 - RESPONSE_TYPE = ApiVersionResponse_v1 - SCHEMA = ApiVersionRequest_v0.SCHEMA - - -class ApiVersionRequest_v2(Request): - API_KEY = 18 - API_VERSION = 2 - RESPONSE_TYPE = ApiVersionResponse_v1 - SCHEMA = ApiVersionRequest_v0.SCHEMA - - -ApiVersionRequest = [ - ApiVersionRequest_v0, ApiVersionRequest_v1, ApiVersionRequest_v2, -] -ApiVersionResponse = [ - ApiVersionResponse_v0, ApiVersionResponse_v1, ApiVersionResponse_v2, -] - - class CreateTopicsResponse_v0(Response): API_KEY = 19 API_VERSION = 0 diff --git a/kafka/protocol/api_versions.py b/kafka/protocol/api_versions.py new file mode 100644 index 000000000..9a782928b --- /dev/null +++ b/kafka/protocol/api_versions.py @@ -0,0 +1,88 @@ +from __future__ import absolute_import + +from kafka.protocol.api import Request, Response +from kafka.protocol.types import Array, Int16, Int32, Schema + + +class BaseApiVersionsResponse(Response): + API_KEY = 18 + API_VERSION = 0 + SCHEMA = Schema( + ('error_code', Int16), + ('api_versions', Array( + ('api_key', Int16), + ('min_version', Int16), + ('max_version', Int16))) + ) + + @classmethod + def decode(cls, data): + if isinstance(data, bytes): + data = BytesIO(data) + # Check error_code, decode as v0 if any error + curr = data.tell() + err = Int16.decode(data) + data.seek(curr) + if err != 0: + return ApiVersionsResponse_v0.decode(data) + return super(BaseApiVersionsResponse, cls).decode(data) + + +class ApiVersionsResponse_v0(Response): + API_KEY = 18 + API_VERSION = 0 + SCHEMA = Schema( + ('error_code', Int16), + ('api_versions', Array( + ('api_key', Int16), + ('min_version', Int16), + ('max_version', Int16))) + ) + + +class ApiVersionsResponse_v1(BaseApiVersionsResponse): + API_KEY = 18 + API_VERSION = 1 + SCHEMA = Schema( + ('error_code', Int16), + ('api_versions', Array( + ('api_key', Int16), + ('min_version', Int16), + ('max_version', Int16))), + ('throttle_time_ms', Int32) + ) + + +class ApiVersionsResponse_v2(BaseApiVersionsResponse): + API_KEY = 18 + API_VERSION = 2 + SCHEMA = ApiVersionsResponse_v1.SCHEMA + + +class ApiVersionsRequest_v0(Request): + API_KEY = 18 + API_VERSION = 0 + RESPONSE_TYPE = ApiVersionsResponse_v0 + SCHEMA = Schema() + + +class ApiVersionsRequest_v1(Request): + API_KEY = 18 + API_VERSION = 1 + RESPONSE_TYPE = ApiVersionsResponse_v1 + SCHEMA = ApiVersionsRequest_v0.SCHEMA + + +class ApiVersionsRequest_v2(Request): + API_KEY = 18 + API_VERSION = 2 + RESPONSE_TYPE = ApiVersionsResponse_v1 + SCHEMA = ApiVersionsRequest_v0.SCHEMA + + +ApiVersionsRequest = [ + ApiVersionsRequest_v0, ApiVersionsRequest_v1, ApiVersionsRequest_v2, +] +ApiVersionsResponse = [ + ApiVersionsResponse_v0, ApiVersionsResponse_v1, ApiVersionsResponse_v2, +] diff --git a/kafka/protocol/broker_api_versions.py b/kafka/protocol/broker_api_versions.py new file mode 100644 index 000000000..db7567180 --- /dev/null +++ b/kafka/protocol/broker_api_versions.py @@ -0,0 +1,64 @@ +BROKER_API_VERSIONS = { + # api_versions responses prior to (0, 10) are synthesized for compatibility + (0, 8, 0): {0: (0, 0), 1: (0, 0), 2: (0, 0), 3: (0, 0)}, + # adds offset commit + fetch + (0, 8, 1): {0: (0, 0), 1: (0, 0), 2: (0, 0), 3: (0, 0), 8: (0, 0), 9: (0, 0)}, + # adds find coordinator + (0, 8, 2): {0: (0, 0), 1: (0, 0), 2: (0, 0), 3: (0, 0), 8: (0, 1), 9: (0, 1), 10: (0, 0)}, + # adds group management (join/sync/leave/heartbeat) + (0, 9): {0: (0, 1), 1: (0, 1), 2: (0, 0), 3: (0, 0), 8: (0, 2), 9: (0, 1), 10: (0, 0), 11: (0, 0), 12: (0, 0), 13: (0, 0), 14: (0, 0), 15: (0, 0), 16: (0, 0)}, + # adds message format v1, sasl, and api versions api + (0, 10, 0): {0: (0, 2), 1: (0, 2), 2: (0, 0), 3: (0, 1), 4: (0, 0), 5: (0, 0), 6: (0, 2), 7: (1, 1), 8: (0, 2), 9: (0, 1), 10: (0, 0), 11: (0, 0), 12: (0, 0), 13: (0, 0), 14: (0, 0), 15: (0, 0), 16: (0, 0), 17: (0, 0), 18: (0, 0)}, + + # All data below is copied from brokers via api_versions_response (see make servers/*/api_versions) + # adds admin apis create/delete topics, and bumps fetch/listoffsets/metadata/joingroup + (0, 10, 1): {0: (0, 2), 1: (0, 3), 2: (0, 1), 3: (0, 2), 4: (0, 0), 5: (0, 0), 6: (0, 2), 7: (1, 1), 8: (0, 2), 9: (0, 1), 10: (0, 0), 11: (0, 1), 12: (0, 0), 13: (0, 0), 14: (0, 0), 15: (0, 0), 16: (0, 0), 17: (0, 0), 18: (0, 0), 19: (0, 0), 20: (0, 0)}, + + # bumps offsetfetch/create-topics + (0, 10, 2): {0: (0, 2), 1: (0, 3), 2: (0, 1), 3: (0, 2), 4: (0, 0), 5: (0, 0), 6: (0, 3), 7: (1, 1), 8: (0, 2), 9: (0, 2), 10: (0, 0), 11: (0, 1), 12: (0, 0), 13: (0, 0), 14: (0, 0), 15: (0, 0), 16: (0, 0), 17: (0, 0), 18: (0, 0), 19: (0, 1), 20: (0, 0)}, + + # Adds message format v2, and more admin apis (describe/create/delete acls, describe/alter configs, etc) + (0, 11): {0: (0, 3), 1: (0, 5), 2: (0, 2), 3: (0, 4), 4: (0, 0), 5: (0, 0), 6: (0, 3), 7: (1, 1), 8: (0, 3), 9: (0, 3), 10: (0, 1), 11: (0, 2), 12: (0, 1), 13: (0, 1), 14: (0, 1), 15: (0, 1), 16: (0, 1), 17: (0, 0), 18: (0, 1), 19: (0, 2), 20: (0, 1), 21: (0, 0), 22: (0, 0), 23: (0, 0), 24: (0, 0), 25: (0, 0), 26: (0, 0), 27: (0, 0), 28: (0, 0), 29: (0, 0), 30: (0, 0), 31: (0, 0), 32: (0, 0), 33: (0, 0)}, + + # Adds Sasl Authenticate, and additional admin apis (describe/alter log dirs, etc) + (1, 0): {0: (0, 5), 1: (0, 6), 2: (0, 2), 3: (0, 5), 4: (0, 1), 5: (0, 0), 6: (0, 4), 7: (0, 1), 8: (0, 3), 9: (0, 3), 10: (0, 1), 11: (0, 2), 12: (0, 1), 13: (0, 1), 14: (0, 1), 15: (0, 1), 16: (0, 1), 17: (0, 1), 18: (0, 1), 19: (0, 2), 20: (0, 1), 21: (0, 0), 22: (0, 0), 23: (0, 0), 24: (0, 0), 25: (0, 0), 26: (0, 0), 27: (0, 0), 28: (0, 0), 29: (0, 0), 30: (0, 0), 31: (0, 0), 32: (0, 0), 33: (0, 0), 34: (0, 0), 35: (0, 0), 36: (0, 0), 37: (0, 0)}, + + (2, 0): {0: (0, 6), 1: (0, 8), 2: (0, 3), 3: (0, 6), 4: (0, 1), 5: (0, 0), 6: (0, 4), 7: (0, 1), 8: (0, 4), 9: (0, 4), 10: (0, 2), 11: (0, 3), 12: (0, 2), 13: (0, 2), 14: (0, 2), 15: (0, 2), 16: (0, 2), 17: (0, 1), 18: (0, 2), 19: (0, 3), 20: (0, 2), 21: (0, 1), 22: (0, 1), 23: (0, 1), 24: (0, 1), 25: (0, 1), 26: (0, 1), 27: (0, 0), 28: (0, 1), 29: (0, 1), 30: (0, 1), 31: (0, 1), 32: (0, 2), 33: (0, 1), 34: (0, 1), 35: (0, 1), 36: (0, 0), 37: (0, 1), 38: (0, 1), 39: (0, 1), 40: (0, 1), 41: (0, 1), 42: (0, 1)}, + + (2, 1): {0: (0, 7), 1: (0, 10), 2: (0, 4), 3: (0, 7), 4: (0, 1), 5: (0, 0), 6: (0, 4), 7: (0, 1), 8: (0, 6), 9: (0, 5), 10: (0, 2), 11: (0, 3), 12: (0, 2), 13: (0, 2), 14: (0, 2), 15: (0, 2), 16: (0, 2), 17: (0, 1), 18: (0, 2), 19: (0, 3), 20: (0, 3), 21: (0, 1), 22: (0, 1), 23: (0, 2), 24: (0, 1), 25: (0, 1), 26: (0, 1), 27: (0, 0), 28: (0, 2), 29: (0, 1), 30: (0, 1), 31: (0, 1), 32: (0, 2), 33: (0, 1), 34: (0, 1), 35: (0, 1), 36: (0, 0), 37: (0, 1), 38: (0, 1), 39: (0, 1), 40: (0, 1), 41: (0, 1), 42: (0, 1)}, + + (2, 2): {0: (0, 7), 1: (0, 10), 2: (0, 5), 3: (0, 7), 4: (0, 2), 5: (0, 1), 6: (0, 5), 7: (0, 2), 8: (0, 6), 9: (0, 5), 10: (0, 2), 11: (0, 4), 12: (0, 2), 13: (0, 2), 14: (0, 2), 15: (0, 2), 16: (0, 2), 17: (0, 1), 18: (0, 2), 19: (0, 3), 20: (0, 3), 21: (0, 1), 22: (0, 1), 23: (0, 2), 24: (0, 1), 25: (0, 1), 26: (0, 1), 27: (0, 0), 28: (0, 2), 29: (0, 1), 30: (0, 1), 31: (0, 1), 32: (0, 2), 33: (0, 1), 34: (0, 1), 35: (0, 1), 36: (0, 1), 37: (0, 1), 38: (0, 1), 39: (0, 1), 40: (0, 1), 41: (0, 1), 42: (0, 1), 43: (0, 0)}, + + (2, 3): {0: (0, 7), 1: (0, 11), 2: (0, 5), 3: (0, 8), 4: (0, 2), 5: (0, 1), 6: (0, 5), 7: (0, 2), 8: (0, 7), 9: (0, 5), 10: (0, 2), 11: (0, 5), 12: (0, 3), 13: (0, 2), 14: (0, 3), 15: (0, 3), 16: (0, 2), 17: (0, 1), 18: (0, 2), 19: (0, 3), 20: (0, 3), 21: (0, 1), 22: (0, 1), 23: (0, 3), 24: (0, 1), 25: (0, 1), 26: (0, 1), 27: (0, 0), 28: (0, 2), 29: (0, 1), 30: (0, 1), 31: (0, 1), 32: (0, 2), 33: (0, 1), 34: (0, 1), 35: (0, 1), 36: (0, 1), 37: (0, 1), 38: (0, 1), 39: (0, 1), 40: (0, 1), 41: (0, 1), 42: (0, 1), 43: (0, 0), 44: (0, 0)}, + + (2, 4): {0: (0, 8), 1: (0, 11), 2: (0, 5), 3: (0, 9), 4: (0, 4), 5: (0, 2), 6: (0, 6), 7: (0, 3), 8: (0, 8), 9: (0, 6), 10: (0, 3), 11: (0, 6), 12: (0, 4), 13: (0, 4), 14: (0, 4), 15: (0, 5), 16: (0, 3), 17: (0, 1), 18: (0, 3), 19: (0, 5), 20: (0, 4), 21: (0, 1), 22: (0, 2), 23: (0, 3), 24: (0, 1), 25: (0, 1), 26: (0, 1), 27: (0, 0), 28: (0, 2), 29: (0, 1), 30: (0, 1), 31: (0, 1), 32: (0, 2), 33: (0, 1), 34: (0, 1), 35: (0, 1), 36: (0, 1), 37: (0, 1), 38: (0, 2), 39: (0, 1), 40: (0, 1), 41: (0, 1), 42: (0, 2), 43: (0, 2), 44: (0, 1), 45: (0, 0), 46: (0, 0), 47: (0, 0)}, + + (2, 5): {0: (0, 8), 1: (0, 11), 2: (0, 5), 3: (0, 9), 4: (0, 4), 5: (0, 2), 6: (0, 6), 7: (0, 3), 8: (0, 8), 9: (0, 7), 10: (0, 3), 11: (0, 7), 12: (0, 4), 13: (0, 4), 14: (0, 5), 15: (0, 5), 16: (0, 3), 17: (0, 1), 18: (0, 3), 19: (0, 5), 20: (0, 4), 21: (0, 1), 22: (0, 3), 23: (0, 3), 24: (0, 1), 25: (0, 1), 26: (0, 1), 27: (0, 0), 28: (0, 3), 29: (0, 2), 30: (0, 2), 31: (0, 2), 32: (0, 2), 33: (0, 1), 34: (0, 1), 35: (0, 1), 36: (0, 2), 37: (0, 2), 38: (0, 2), 39: (0, 2), 40: (0, 2), 41: (0, 2), 42: (0, 2), 43: (0, 2), 44: (0, 1), 45: (0, 0), 46: (0, 0), 47: (0, 0)}, + + (2, 6): {0: (0, 8), 1: (0, 11), 2: (0, 5), 3: (0, 9), 4: (0, 4), 5: (0, 3), 6: (0, 6), 7: (0, 3), 8: (0, 8), 9: (0, 7), 10: (0, 3), 11: (0, 7), 12: (0, 4), 13: (0, 4), 14: (0, 5), 15: (0, 5), 16: (0, 4), 17: (0, 1), 18: (0, 3), 19: (0, 5), 20: (0, 4), 21: (0, 2), 22: (0, 3), 23: (0, 3), 24: (0, 1), 25: (0, 1), 26: (0, 1), 27: (0, 0), 28: (0, 3), 29: (0, 2), 30: (0, 2), 31: (0, 2), 32: (0, 3), 33: (0, 1), 34: (0, 1), 35: (0, 2), 36: (0, 2), 37: (0, 2), 38: (0, 2), 39: (0, 2), 40: (0, 2), 41: (0, 2), 42: (0, 2), 43: (0, 2), 44: (0, 1), 45: (0, 0), 46: (0, 0), 47: (0, 0), 48: (0, 0), 49: (0, 0)}, + + (2, 7): {0: (0, 8), 1: (0, 12), 2: (0, 5), 3: (0, 9), 4: (0, 4), 5: (0, 3), 6: (0, 6), 7: (0, 3), 8: (0, 8), 9: (0, 7), 10: (0, 3), 11: (0, 7), 12: (0, 4), 13: (0, 4), 14: (0, 5), 15: (0, 5), 16: (0, 4), 17: (0, 1), 18: (0, 3), 19: (0, 6), 20: (0, 5), 21: (0, 2), 22: (0, 4), 23: (0, 3), 24: (0, 2), 25: (0, 2), 26: (0, 2), 27: (0, 0), 28: (0, 3), 29: (0, 2), 30: (0, 2), 31: (0, 2), 32: (0, 3), 33: (0, 1), 34: (0, 1), 35: (0, 2), 36: (0, 2), 37: (0, 3), 38: (0, 2), 39: (0, 2), 40: (0, 2), 41: (0, 2), 42: (0, 2), 43: (0, 2), 44: (0, 1), 45: (0, 0), 46: (0, 0), 47: (0, 0), 48: (0, 0), 49: (0, 0), 50: (0, 0), 51: (0, 0), 56: (0, 0), 57: (0, 0)}, + + (2, 8): {0: (0, 9), 1: (0, 12), 2: (0, 6), 3: (0, 11), 4: (0, 5), 5: (0, 3), 6: (0, 7), 7: (0, 3), 8: (0, 8), 9: (0, 7), 10: (0, 3), 11: (0, 7), 12: (0, 4), 13: (0, 4), 14: (0, 5), 15: (0, 5), 16: (0, 4), 17: (0, 1), 18: (0, 3), 19: (0, 7), 20: (0, 6), 21: (0, 2), 22: (0, 4), 23: (0, 4), 24: (0, 3), 25: (0, 3), 26: (0, 3), 27: (0, 1), 28: (0, 3), 29: (0, 2), 30: (0, 2), 31: (0, 2), 32: (0, 4), 33: (0, 2), 34: (0, 2), 35: (0, 2), 36: (0, 2), 37: (0, 3), 38: (0, 2), 39: (0, 2), 40: (0, 2), 41: (0, 2), 42: (0, 2), 43: (0, 2), 44: (0, 1), 45: (0, 0), 46: (0, 0), 47: (0, 0), 48: (0, 1), 49: (0, 1), 50: (0, 0), 51: (0, 0), 56: (0, 0), 57: (0, 0), 60: (0, 0), 61: (0, 0)}, + + (3, 0): {0: (0, 9), 1: (0, 12), 2: (0, 7), 3: (0, 11), 4: (0, 5), 5: (0, 3), 6: (0, 7), 7: (0, 3), 8: (0, 8), 9: (0, 8), 10: (0, 4), 11: (0, 7), 12: (0, 4), 13: (0, 4), 14: (0, 5), 15: (0, 5), 16: (0, 4), 17: (0, 1), 18: (0, 3), 19: (0, 7), 20: (0, 6), 21: (0, 2), 22: (0, 4), 23: (0, 4), 24: (0, 3), 25: (0, 3), 26: (0, 3), 27: (0, 1), 28: (0, 3), 29: (0, 2), 30: (0, 2), 31: (0, 2), 32: (0, 4), 33: (0, 2), 34: (0, 2), 35: (0, 2), 36: (0, 2), 37: (0, 3), 38: (0, 2), 39: (0, 2), 40: (0, 2), 41: (0, 2), 42: (0, 2), 43: (0, 2), 44: (0, 1), 45: (0, 0), 46: (0, 0), 47: (0, 0), 48: (0, 1), 49: (0, 1), 50: (0, 0), 51: (0, 0), 56: (0, 0), 57: (0, 0), 60: (0, 0), 61: (0, 0), 65: (0, 0), 66: (0, 0), 67: (0, 0)}, + + (3, 1): {0: (0, 9), 1: (0, 13), 2: (0, 7), 3: (0, 12), 4: (0, 5), 5: (0, 3), 6: (0, 7), 7: (0, 3), 8: (0, 8), 9: (0, 8), 10: (0, 4), 11: (0, 7), 12: (0, 4), 13: (0, 4), 14: (0, 5), 15: (0, 5), 16: (0, 4), 17: (0, 1), 18: (0, 3), 19: (0, 7), 20: (0, 6), 21: (0, 2), 22: (0, 4), 23: (0, 4), 24: (0, 3), 25: (0, 3), 26: (0, 3), 27: (0, 1), 28: (0, 3), 29: (0, 2), 30: (0, 2), 31: (0, 2), 32: (0, 4), 33: (0, 2), 34: (0, 2), 35: (0, 2), 36: (0, 2), 37: (0, 3), 38: (0, 2), 39: (0, 2), 40: (0, 2), 41: (0, 2), 42: (0, 2), 43: (0, 2), 44: (0, 1), 45: (0, 0), 46: (0, 0), 47: (0, 0), 48: (0, 1), 49: (0, 1), 50: (0, 0), 51: (0, 0), 56: (0, 0), 57: (0, 0), 60: (0, 0), 61: (0, 0), 65: (0, 0), 66: (0, 0), 67: (0, 0)}, + + (3, 2): {0: (0, 9), 1: (0, 13), 2: (0, 7), 3: (0, 12), 4: (0, 6), 5: (0, 3), 6: (0, 7), 7: (0, 3), 8: (0, 8), 9: (0, 8), 10: (0, 4), 11: (0, 9), 12: (0, 4), 13: (0, 5), 14: (0, 5), 15: (0, 5), 16: (0, 4), 17: (0, 1), 18: (0, 3), 19: (0, 7), 20: (0, 6), 21: (0, 2), 22: (0, 4), 23: (0, 4), 24: (0, 3), 25: (0, 3), 26: (0, 3), 27: (0, 1), 28: (0, 3), 29: (0, 2), 30: (0, 2), 31: (0, 2), 32: (0, 4), 33: (0, 2), 34: (0, 2), 35: (0, 3), 36: (0, 2), 37: (0, 3), 38: (0, 2), 39: (0, 2), 40: (0, 2), 41: (0, 2), 42: (0, 2), 43: (0, 2), 44: (0, 1), 45: (0, 0), 46: (0, 0), 47: (0, 0), 48: (0, 1), 49: (0, 1), 50: (0, 0), 51: (0, 0), 56: (0, 1), 57: (0, 0), 60: (0, 0), 61: (0, 0), 65: (0, 0), 66: (0, 0), 67: (0, 0)}, + + (3, 3): {0: (0, 9), 1: (0, 13), 2: (0, 7), 3: (0, 12), 4: (0, 6), 5: (0, 3), 6: (0, 7), 7: (0, 3), 8: (0, 8), 9: (0, 8), 10: (0, 4), 11: (0, 9), 12: (0, 4), 13: (0, 5), 14: (0, 5), 15: (0, 5), 16: (0, 4), 17: (0, 1), 18: (0, 3), 19: (0, 7), 20: (0, 6), 21: (0, 2), 22: (0, 4), 23: (0, 4), 24: (0, 3), 25: (0, 3), 26: (0, 3), 27: (0, 1), 28: (0, 3), 29: (0, 3), 30: (0, 3), 31: (0, 3), 32: (0, 4), 33: (0, 2), 34: (0, 2), 35: (0, 4), 36: (0, 2), 37: (0, 3), 38: (0, 3), 39: (0, 2), 40: (0, 2), 41: (0, 3), 42: (0, 2), 43: (0, 2), 44: (0, 1), 45: (0, 0), 46: (0, 0), 47: (0, 0), 48: (0, 1), 49: (0, 1), 50: (0, 0), 51: (0, 0), 56: (0, 2), 57: (0, 1), 60: (0, 0), 61: (0, 0), 65: (0, 0), 66: (0, 0), 67: (0, 0)}, + + (3, 4): {0: (0, 9), 1: (0, 13), 2: (0, 7), 3: (0, 12), 4: (0, 7), 5: (0, 4), 6: (0, 8), 7: (0, 3), 8: (0, 8), 9: (0, 8), 10: (0, 4), 11: (0, 9), 12: (0, 4), 13: (0, 5), 14: (0, 5), 15: (0, 5), 16: (0, 4), 17: (0, 1), 18: (0, 3), 19: (0, 7), 20: (0, 6), 21: (0, 2), 22: (0, 4), 23: (0, 4), 24: (0, 3), 25: (0, 3), 26: (0, 3), 27: (0, 1), 28: (0, 3), 29: (0, 3), 30: (0, 3), 31: (0, 3), 32: (0, 4), 33: (0, 2), 34: (0, 2), 35: (0, 4), 36: (0, 2), 37: (0, 3), 38: (0, 3), 39: (0, 2), 40: (0, 2), 41: (0, 3), 42: (0, 2), 43: (0, 2), 44: (0, 1), 45: (0, 0), 46: (0, 0), 47: (0, 0), 48: (0, 1), 49: (0, 1), 50: (0, 0), 51: (0, 0), 56: (0, 2), 57: (0, 1), 58: (0, 0), 60: (0, 0), 61: (0, 0), 65: (0, 0), 66: (0, 0), 67: (0, 0)}, + + (3, 5): {0: (0, 9), 1: (0, 15), 2: (0, 8), 3: (0, 12), 4: (0, 7), 5: (0, 4), 6: (0, 8), 7: (0, 3), 8: (0, 8), 9: (0, 8), 10: (0, 4), 11: (0, 9), 12: (0, 4), 13: (0, 5), 14: (0, 5), 15: (0, 5), 16: (0, 4), 17: (0, 1), 18: (0, 3), 19: (0, 7), 20: (0, 6), 21: (0, 2), 22: (0, 4), 23: (0, 4), 24: (0, 3), 25: (0, 3), 26: (0, 3), 27: (0, 1), 28: (0, 3), 29: (0, 3), 30: (0, 3), 31: (0, 3), 32: (0, 4), 33: (0, 2), 34: (0, 2), 35: (0, 4), 36: (0, 2), 37: (0, 3), 38: (0, 3), 39: (0, 2), 40: (0, 2), 41: (0, 3), 42: (0, 2), 43: (0, 2), 44: (0, 1), 45: (0, 0), 46: (0, 0), 47: (0, 0), 48: (0, 1), 49: (0, 1), 50: (0, 0), 51: (0, 0), 56: (0, 3), 57: (0, 1), 58: (0, 0), 60: (0, 0), 61: (0, 0), 65: (0, 0), 66: (0, 0), 67: (0, 0)}, + + (3, 6): {0: (0, 9), 1: (0, 15), 2: (0, 8), 3: (0, 12), 4: (0, 7), 5: (0, 4), 6: (0, 8), 7: (0, 3), 8: (0, 8), 9: (0, 8), 10: (0, 4), 11: (0, 9), 12: (0, 4), 13: (0, 5), 14: (0, 5), 15: (0, 5), 16: (0, 4), 17: (0, 1), 18: (0, 3), 19: (0, 7), 20: (0, 6), 21: (0, 2), 22: (0, 4), 23: (0, 4), 24: (0, 4), 25: (0, 3), 26: (0, 3), 27: (0, 1), 28: (0, 3), 29: (0, 3), 30: (0, 3), 31: (0, 3), 32: (0, 4), 33: (0, 2), 34: (0, 2), 35: (0, 4), 36: (0, 2), 37: (0, 3), 38: (0, 3), 39: (0, 2), 40: (0, 2), 41: (0, 3), 42: (0, 2), 43: (0, 2), 44: (0, 1), 45: (0, 0), 46: (0, 0), 47: (0, 0), 48: (0, 1), 49: (0, 1), 50: (0, 0), 51: (0, 0), 56: (0, 3), 57: (0, 1), 58: (0, 0), 60: (0, 0), 61: (0, 0), 65: (0, 0), 66: (0, 0), 67: (0, 0)}, + + (3, 7): {0: (0, 10), 1: (0, 16), 2: (0, 8), 3: (0, 12), 4: (0, 7), 5: (0, 4), 6: (0, 8), 7: (0, 3), 8: (0, 9), 9: (0, 9), 10: (0, 4), 11: (0, 9), 12: (0, 4), 13: (0, 5), 14: (0, 5), 15: (0, 5), 16: (0, 4), 17: (0, 1), 18: (0, 3), 19: (0, 7), 20: (0, 6), 21: (0, 2), 22: (0, 4), 23: (0, 4), 24: (0, 4), 25: (0, 3), 26: (0, 3), 27: (0, 1), 28: (0, 3), 29: (0, 3), 30: (0, 3), 31: (0, 3), 32: (0, 4), 33: (0, 2), 34: (0, 2), 35: (0, 4), 36: (0, 2), 37: (0, 3), 38: (0, 3), 39: (0, 2), 40: (0, 2), 41: (0, 3), 42: (0, 2), 43: (0, 2), 44: (0, 1), 45: (0, 0), 46: (0, 0), 47: (0, 0), 48: (0, 1), 49: (0, 1), 50: (0, 0), 51: (0, 0), 56: (0, 3), 57: (0, 1), 58: (0, 0), 60: (0, 1), 61: (0, 0), 65: (0, 0), 66: (0, 0), 67: (0, 0), 68: (0, 0)}, + + (3, 8): {0: (0, 11), 1: (0, 16), 2: (0, 8), 3: (0, 12), 4: (0, 7), 5: (0, 4), 6: (0, 8), 7: (0, 3), 8: (0, 9), 9: (0, 9), 10: (0, 5), 11: (0, 9), 12: (0, 4), 13: (0, 5), 14: (0, 5), 15: (0, 5), 16: (0, 5), 17: (0, 1), 18: (0, 3), 19: (0, 7), 20: (0, 6), 21: (0, 2), 22: (0, 5), 23: (0, 4), 24: (0, 5), 25: (0, 4), 26: (0, 4), 27: (0, 1), 28: (0, 4), 29: (0, 3), 30: (0, 3), 31: (0, 3), 32: (0, 4), 33: (0, 2), 34: (0, 2), 35: (0, 4), 36: (0, 2), 37: (0, 3), 38: (0, 3), 39: (0, 2), 40: (0, 2), 41: (0, 3), 42: (0, 2), 43: (0, 2), 44: (0, 1), 45: (0, 0), 46: (0, 0), 47: (0, 0), 48: (0, 1), 49: (0, 1), 50: (0, 0), 51: (0, 0), 56: (0, 3), 57: (0, 1), 58: (0, 0), 60: (0, 1), 61: (0, 0), 65: (0, 0), 66: (0, 1), 67: (0, 0), 68: (0, 0), 69: (0, 0)}, + + (3, 9): {0: (0, 11), 1: (0, 17), 2: (0, 9), 3: (0, 12), 4: (0, 7), 5: (0, 4), 6: (0, 8), 7: (0, 3), 8: (0, 9), 9: (0, 9), 10: (0, 6), 11: (0, 9), 12: (0, 4), 13: (0, 5), 14: (0, 5), 15: (0, 5), 16: (0, 5), 17: (0, 1), 18: (0, 4), 19: (0, 7), 20: (0, 6), 21: (0, 2), 22: (0, 5), 23: (0, 4), 24: (0, 5), 25: (0, 4), 26: (0, 4), 27: (0, 1), 28: (0, 4), 29: (0, 3), 30: (0, 3), 31: (0, 3), 32: (0, 4), 33: (0, 2), 34: (0, 2), 35: (0, 4), 36: (0, 2), 37: (0, 3), 38: (0, 3), 39: (0, 2), 40: (0, 2), 41: (0, 3), 42: (0, 2), 43: (0, 2), 44: (0, 1), 45: (0, 0), 46: (0, 0), 47: (0, 0), 48: (0, 1), 49: (0, 1), 50: (0, 0), 51: (0, 0), 56: (0, 3), 57: (0, 1), 58: (0, 0), 60: (0, 1), 61: (0, 0), 65: (0, 0), 66: (0, 1), 67: (0, 0), 68: (0, 0), 69: (0, 0)}, + +} diff --git a/test/test_client_async.py b/test/test_client_async.py index ec5e2c0ae..b9b415012 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -266,7 +266,7 @@ def test_least_loaded_node(): def test_set_topics(mocker): request_update = mocker.patch.object(ClusterMetadata, 'request_update') request_update.side_effect = lambda: Future() - cli = KafkaClient(api_version=(0, 10)) + cli = KafkaClient(api_version=(0, 10, 0)) # replace 'empty' with 'non empty' request_update.reset_mock() diff --git a/test/test_consumer.py b/test/test_consumer.py index 436fe55c0..8186125df 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -18,7 +18,7 @@ def test_request_timeout_larger_than_connections_max_idle_ms_raises(self): KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), request_timeout_ms=50000, connections_max_idle_ms=40000) def test_subscription_copy(self): - consumer = KafkaConsumer('foo', api_version=(0, 10)) + consumer = KafkaConsumer('foo', api_version=(0, 10, 0)) sub = consumer.subscription() assert sub is not consumer.subscription() assert sub == set(['foo']) diff --git a/test/test_sender.py b/test/test_sender.py index 2a68defcf..f3bbf4275 100644 --- a/test/test_sender.py +++ b/test/test_sender.py @@ -37,9 +37,9 @@ def sender(client, accumulator, metrics): @pytest.mark.parametrize(("api_version", "produce_version"), [ - ((0, 10), 2), + ((0, 10, 0), 2), ((0, 9), 1), - ((0, 8), 0) + ((0, 8, 0), 0) ]) def test_produce_request(sender, mocker, api_version, produce_version): sender.config['api_version'] = api_version