@@ -165,8 +165,8 @@ class BrokerConnection(object):
165165 or other configuration forbids use of all the specified ciphers),
166166 an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers
167167 api_version (tuple): Specify which Kafka API version to use.
168- Accepted values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9),
169- (0, 10). Default: (0, 8, 2)
168+ Must be None or >= (0, 10, 0) to enable SASL authentication.
169+ Default: None
170170 api_version_auto_timeout_ms (int): number of milliseconds to throw a
171171 timeout exception from the constructor when checking the broker
172172 api version. Only applies if api_version is None
@@ -214,7 +214,7 @@ class BrokerConnection(object):
214214 'ssl_crlfile' : None ,
215215 'ssl_password' : None ,
216216 'ssl_ciphers' : None ,
217- 'api_version' : ( 0 , 8 , 2 ), # default to most restrictive
217+ 'api_version' : None ,
218218 'selector' : selectors .DefaultSelector ,
219219 'state_change_callback' : lambda node_id , sock , conn : True ,
220220 'metrics' : None ,
@@ -522,7 +522,7 @@ def _try_handshake(self):
522522 return False
523523
524524 def _try_authenticate (self ):
525- assert self .config ['api_version' ] is None or self .config ['api_version' ] >= (0 , 10 )
525+ assert self .config ['api_version' ] is None or self .config ['api_version' ] >= (0 , 10 , 0 )
526526
527527 if self ._sasl_auth_future is None :
528528 # Build a SaslHandShakeRequest message
@@ -1178,16 +1178,16 @@ def _infer_broker_version_from_api_versions(self, api_versions):
11781178 test_cases = [
11791179 # format (<broker version>, <needed struct>)
11801180 # Make sure to update consumer_integration test check when adding newer versions.
1181- ((2 , 6 , 0 ), DescribeClientQuotasRequest [0 ]),
1182- ((2 , 5 , 0 ), DescribeAclsRequest_v2 ),
1183- ((2 , 4 , 0 ), ProduceRequest [8 ]),
1184- ((2 , 3 , 0 ), FetchRequest [11 ]),
1185- ((2 , 2 , 0 ), OffsetRequest [5 ]),
1186- ((2 , 1 , 0 ), FetchRequest [10 ]),
1187- ((2 , 0 , 0 ), FetchRequest [8 ]),
1188- ((1 , 1 , 0 ), FetchRequest [7 ]),
1189- ((1 , 0 , 0 ), MetadataRequest [5 ]),
1190- ((0 , 11 , 0 ), MetadataRequest [4 ]),
1181+ ((2 , 6 ), DescribeClientQuotasRequest [0 ]),
1182+ ((2 , 5 ), DescribeAclsRequest_v2 ),
1183+ ((2 , 4 ), ProduceRequest [8 ]),
1184+ ((2 , 3 ), FetchRequest [11 ]),
1185+ ((2 , 2 ), OffsetRequest [5 ]),
1186+ ((2 , 1 ), FetchRequest [10 ]),
1187+ ((2 , 0 ), FetchRequest [8 ]),
1188+ ((1 , 1 ), FetchRequest [7 ]),
1189+ ((1 , 0 ), MetadataRequest [5 ]),
1190+ ((0 , 11 ), MetadataRequest [4 ]),
11911191 ((0 , 10 , 2 ), OffsetFetchRequest [2 ]),
11921192 ((0 , 10 , 1 ), MetadataRequest [2 ]),
11931193 ]
@@ -1209,7 +1209,7 @@ def check_version(self, timeout=2, strict=False, topics=[]):
12091209
12101210 Note: This is a blocking call.
12111211
1212- Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
1212+ Returns: version tuple, i.e. (3, 9), (2, 4), etc ...
12131213 """
12141214 timeout_at = time .time () + timeout
12151215 log .info ('Probing node %s broker version' , self .node_id )
@@ -1240,7 +1240,7 @@ def reset_override_configs():
12401240 test_cases = [
12411241 # All cases starting from 0.10 will be based on ApiVersionsResponse
12421242 ((0 , 11 ), ApiVersionsRequest [1 ]()),
1243- ((0 , 10 ), ApiVersionsRequest [0 ]()),
1243+ ((0 , 10 , 0 ), ApiVersionsRequest [0 ]()),
12441244 ((0 , 9 ), ListGroupsRequest [0 ]()),
12451245 ((0 , 8 , 2 ), GroupCoordinatorRequest [0 ]('kafka-python-default-group' )),
12461246 ((0 , 8 , 1 ), OffsetFetchRequest [0 ]('kafka-python-default-group' , [])),
@@ -1273,7 +1273,7 @@ def reset_override_configs():
12731273 selector .close ()
12741274
12751275 if f .succeeded ():
1276- if version >= (0 , 10 ):
1276+ if version >= (0 , 10 , 0 ):
12771277 # Starting from 0.10 kafka broker we determine version
12781278 # by looking at ApiVersionsResponse
12791279 api_versions = self ._handle_api_versions_response (f .value )
@@ -1303,7 +1303,7 @@ def reset_override_configs():
13031303 # requests (bug...). In this case we expect to see a correlation
13041304 # id mismatch
13051305 elif (isinstance (f .exception , Errors .CorrelationIdError ) and
1306- version == (0 , 10 )):
1306+ version > (0 , 9 )):
13071307 pass
13081308 elif six .PY2 :
13091309 assert isinstance (f .exception .args [0 ], socket .error )
0 commit comments