@@ -165,8 +165,8 @@ class BrokerConnection(object):
165165 or other configuration forbids use of all the specified ciphers),
166166 an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers
167167 api_version (tuple): Specify which Kafka API version to use.
168- Accepted values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9),
169- (0, 10). Default: (0, 8, 2)
168+ Must be None or >= (0, 10, 0) to enable SASL authentication.
169+ Default: None
170170 api_version_auto_timeout_ms (int): number of milliseconds to throw a
171171 timeout exception from the constructor when checking the broker
172172 api version. Only applies if api_version is None
@@ -214,7 +214,7 @@ class BrokerConnection(object):
214214 'ssl_crlfile' : None ,
215215 'ssl_password' : None ,
216216 'ssl_ciphers' : None ,
217- 'api_version' : ( 0 , 8 , 2 ), # default to most restrictive
217+ 'api_version' : None ,
218218 'selector' : selectors .DefaultSelector ,
219219 'state_change_callback' : lambda node_id , sock , conn : True ,
220220 'metrics' : None ,
@@ -522,7 +522,7 @@ def _try_handshake(self):
522522 return False
523523
524524 def _try_authenticate (self ):
525- assert self .config ['api_version' ] is None or self .config ['api_version' ] >= (0 , 10 )
525+ assert self .config ['api_version' ] is None or self .config ['api_version' ] >= (0 , 10 , 0 )
526526
527527 if self ._sasl_auth_future is None :
528528 # Build a SaslHandShakeRequest message
@@ -1154,9 +1154,10 @@ def next_ifr_request_timeout_ms(self):
11541154 else :
11551155 return float ('inf' )
11561156
1157- def _handle_api_version_response (self , response ):
1157+ def _handle_api_versions_response (self , response ):
11581158 error_type = Errors .for_code (response .error_code )
1159- assert error_type is Errors .NoError , "API version check failed"
1159+ if error_type is not Errors .NoError :
1160+ return False
11601161 self ._api_versions = dict ([
11611162 (api_key , (min_version , max_version ))
11621163 for api_key , min_version , max_version in response .api_versions
@@ -1168,12 +1169,7 @@ def get_api_versions(self):
11681169 return self ._api_versions
11691170
11701171 version = self .check_version ()
1171- if version < (0 , 10 , 0 ):
1172- raise Errors .UnsupportedVersionError (
1173- "ApiVersion not supported by cluster version {} < 0.10.0"
1174- .format (version ))
1175- # _api_versions is set as a side effect of check_versions() on a cluster
1176- # that supports 0.10.0 or later
1172+ # _api_versions is set as a side effect of check_versions()
11771173 return self ._api_versions
11781174
11791175 def _infer_broker_version_from_api_versions (self , api_versions ):
@@ -1182,16 +1178,16 @@ def _infer_broker_version_from_api_versions(self, api_versions):
11821178 test_cases = [
11831179 # format (<broker version>, <needed struct>)
11841180 # Make sure to update consumer_integration test check when adding newer versions.
1185- ((2 , 6 , 0 ), DescribeClientQuotasRequest [0 ]),
1186- ((2 , 5 , 0 ), DescribeAclsRequest_v2 ),
1187- ((2 , 4 , 0 ), ProduceRequest [8 ]),
1188- ((2 , 3 , 0 ), FetchRequest [11 ]),
1189- ((2 , 2 , 0 ), OffsetRequest [5 ]),
1190- ((2 , 1 , 0 ), FetchRequest [10 ]),
1191- ((2 , 0 , 0 ), FetchRequest [8 ]),
1192- ((1 , 1 , 0 ), FetchRequest [7 ]),
1193- ((1 , 0 , 0 ), MetadataRequest [5 ]),
1194- ((0 , 11 , 0 ), MetadataRequest [4 ]),
1181+ ((2 , 6 ), DescribeClientQuotasRequest [0 ]),
1182+ ((2 , 5 ), DescribeAclsRequest_v2 ),
1183+ ((2 , 4 ), ProduceRequest [8 ]),
1184+ ((2 , 3 ), FetchRequest [11 ]),
1185+ ((2 , 2 ), OffsetRequest [5 ]),
1186+ ((2 , 1 ), FetchRequest [10 ]),
1187+ ((2 , 0 ), FetchRequest [8 ]),
1188+ ((1 , 1 ), FetchRequest [7 ]),
1189+ ((1 , 0 ), MetadataRequest [5 ]),
1190+ ((0 , 11 ), MetadataRequest [4 ]),
11951191 ((0 , 10 , 2 ), OffsetFetchRequest [2 ]),
11961192 ((0 , 10 , 1 ), MetadataRequest [2 ]),
11971193 ]
@@ -1204,7 +1200,7 @@ def _infer_broker_version_from_api_versions(self, api_versions):
12041200 if min_version <= struct .API_VERSION <= max_version :
12051201 return broker_version
12061202
1207- # We know that ApiVersionResponse is only supported in 0.10+
1203+ # We know that ApiVersionsResponse is only supported in 0.10+
12081204 # so if all else fails, choose that
12091205 return (0 , 10 , 0 )
12101206
@@ -1213,7 +1209,7 @@ def check_version(self, timeout=2, strict=False, topics=[]):
12131209
12141210 Note: This is a blocking call.
12151211
1216- Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
1212+ Returns: version tuple, i.e. (3, 9), (2, 4), etc ...
12171213 """
12181214 timeout_at = time .time () + timeout
12191215 log .info ('Probing node %s broker version' , self .node_id )
@@ -1236,12 +1232,15 @@ def reset_override_configs():
12361232 # vanilla MetadataRequest. If the server did not recognize the first
12371233 # request, both will be failed with a ConnectionError that wraps
12381234 # socket.error (32, 54, or 104)
1239- from kafka .protocol .admin import ApiVersionRequest , ListGroupsRequest
1235+ from kafka .protocol .admin import ListGroupsRequest
1236+ from kafka .protocol .api_versions import ApiVersionsRequest
1237+ from kafka .protocol .broker_api_versions import BROKER_API_VERSIONS
12401238 from kafka .protocol .commit import OffsetFetchRequest , GroupCoordinatorRequest
12411239
12421240 test_cases = [
1243- # All cases starting from 0.10 will be based on ApiVersionResponse
1244- ((0 , 10 ), ApiVersionRequest [0 ]()),
1241+ # All cases starting from 0.10 will be based on ApiVersionsResponse
1242+ ((0 , 11 ), ApiVersionsRequest [1 ]()),
1243+ ((0 , 10 , 0 ), ApiVersionsRequest [0 ]()),
12451244 ((0 , 9 ), ListGroupsRequest [0 ]()),
12461245 ((0 , 8 , 2 ), GroupCoordinatorRequest [0 ]('kafka-python-default-group' )),
12471246 ((0 , 8 , 1 ), OffsetFetchRequest [0 ]('kafka-python-default-group' , [])),
@@ -1274,11 +1273,17 @@ def reset_override_configs():
12741273 selector .close ()
12751274
12761275 if f .succeeded ():
1277- if isinstance ( request , ApiVersionRequest [ 0 ] ):
1276+ if version >= ( 0 , 10 , 0 ):
12781277 # Starting from 0.10 kafka broker we determine version
1279- # by looking at ApiVersionResponse
1280- api_versions = self ._handle_api_version_response (f .value )
1278+ # by looking at ApiVersionsResponse
1279+ api_versions = self ._handle_api_versions_response (f .value )
1280+ if not api_versions :
1281+ continue
12811282 version = self ._infer_broker_version_from_api_versions (api_versions )
1283+ else :
1284+ if version not in BROKER_API_VERSIONS :
1285+ raise Errors .UnrecognizedBrokerVersion (version )
1286+ self ._api_versions = BROKER_API_VERSIONS [version ]
12821287 log .info ('Broker version identified as %s' , '.' .join (map (str , version )))
12831288 log .info ('Set configuration api_version=%s to skip auto'
12841289 ' check_version requests on startup' , version )
@@ -1298,7 +1303,7 @@ def reset_override_configs():
12981303 # requests (bug...). In this case we expect to see a correlation
12991304 # id mismatch
13001305 elif (isinstance (f .exception , Errors .CorrelationIdError ) and
1301- version == (0 , 10 )):
1306+ version > (0 , 9 )):
13021307 pass
13031308 elif six .PY2 :
13041309 assert isinstance (f .exception .args [0 ], socket .error )
0 commit comments