@@ -303,7 +303,7 @@ def _can_connect(self, node_id):
303303
304304 def _conn_state_change (self , node_id , sock , conn ):
305305 with self ._lock :
306- if conn .connecting () :
306+ if conn .state is ConnectionStates . CONNECTING :
307307 # SSL connections can enter this state 2x (second during Handshake)
308308 if node_id not in self ._connecting :
309309 self ._connecting .add (node_id )
@@ -315,7 +315,19 @@ def _conn_state_change(self, node_id, sock, conn):
315315 if self .cluster .is_bootstrap (node_id ):
316316 self ._last_bootstrap = time .time ()
317317
318- elif conn .connected ():
318+ elif conn .state is ConnectionStates .API_VERSIONS_SEND :
319+ try :
320+ self ._selector .register (sock , selectors .EVENT_WRITE , conn )
321+ except KeyError :
322+ self ._selector .modify (sock , selectors .EVENT_WRITE , conn )
323+
324+ elif conn .state in (ConnectionStates .API_VERSIONS_RECV , ConnectionStates .AUTHENTICATING ):
325+ try :
326+ self ._selector .register (sock , selectors .EVENT_READ , conn )
327+ except KeyError :
328+ self ._selector .modify (sock , selectors .EVENT_READ , conn )
329+
330+ elif conn .state is ConnectionStates .CONNECTED :
319331 log .debug ("Node %s connected" , node_id )
320332 if node_id in self ._connecting :
321333 self ._connecting .remove (node_id )
@@ -332,6 +344,8 @@ def _conn_state_change(self, node_id, sock, conn):
332344
333345 if self .cluster .is_bootstrap (node_id ):
334346 self ._bootstrap_fails = 0
347+ if self ._api_versions is None :
348+ self ._api_versions = conn ._api_versions
335349
336350 else :
337351 for node_id in list (self ._conns .keys ()):
@@ -970,15 +984,14 @@ def refresh_done(val_or_error):
970984 def get_api_versions (self ):
971985 """Return the ApiVersions map, if available.
972986
973- Note: A call to check_version must previously have succeeded and returned
974- version 0.10.0 or later
987+ Note: Only available after bootstrap; requires broker version 0.10.0 or later.
975988
976989 Returns: a map of dict mapping {api_key : (min_version, max_version)},
977990 or None if ApiVersion is not supported by the kafka cluster.
978991 """
979992 return self ._api_versions
980993
981- def check_version (self , node_id = None , timeout = None , strict = False ):
994+ def check_version (self , node_id = None , timeout = None , ** kwargs ):
982995 """Attempt to guess the version of a Kafka broker.
983996
984997 Keyword Arguments:
@@ -994,50 +1007,45 @@ def check_version(self, node_id=None, timeout=None, strict=False):
9941007 Raises:
9951008 NodeNotReadyError (if node_id is provided)
9961009 NoBrokersAvailable (if node_id is None)
997- UnrecognizedBrokerVersion: please file bug if seen!
998- AssertionError (if strict=True): please file bug if seen!
9991010 """
10001011 timeout = timeout or (self .config ['api_version_auto_timeout_ms' ] / 1000 )
1001- self ._lock .acquire ()
1002- end = time .time () + timeout
1003- while time .time () < end :
1004-
1005- # It is possible that least_loaded_node falls back to bootstrap,
1006- # which can block for an increasing backoff period
1007- try_node = node_id or self .least_loaded_node ()
1008- if try_node is None :
1009- self ._lock .release ()
1010- raise Errors .NoBrokersAvailable ()
1011- if not self ._init_connect (try_node ):
1012- if try_node == node_id :
1013- raise Errors .NodeNotReadyError ("Connection failed to %s" % node_id )
1014- else :
1012+ with self ._lock :
1013+ end = time .time () + timeout
1014+ while time .time () < end :
1015+ time_remaining = max (end - time .time (), 0 )
1016+ if node_id is not None and self .connection_delay (node_id ) > 0 :
1017+ sleep_time = min (time_remaining , self .connection_delay (node_id ) / 1000.0 )
1018+ if sleep_time > 0 :
1019+ time .sleep (sleep_time )
10151020 continue
1016-
1017- conn = self ._conns [try_node ]
1018-
1019- # We will intentionally cause socket failures
1020- # These should not trigger metadata refresh
1021- self ._refresh_on_disconnects = False
1022- try :
1023- remaining = end - time .time ()
1024- version = conn .check_version (timeout = remaining , strict = strict , topics = list (self .config ['bootstrap_topics_filter' ]))
1025- if not self ._api_versions :
1026- self ._api_versions = conn .get_api_versions ()
1027- self ._lock .release ()
1028- return version
1029- except Errors .NodeNotReadyError :
1030- # Only raise to user if this is a node-specific request
1021+ try_node = node_id or self .least_loaded_node ()
1022+ if try_node is None :
1023+ sleep_time = min (time_remaining , self .least_loaded_node_refresh_ms () / 1000.0 )
1024+ if sleep_time > 0 :
1025+ log .warning ('No node available during check_version; sleeping %.2f secs' , sleep_time )
1026+ time .sleep (sleep_time )
1027+ continue
1028+ log .debug ('Attempting to check version with node %s' , try_node )
1029+ if not self ._init_connect (try_node ):
1030+ if try_node == node_id :
1031+ raise Errors .NodeNotReadyError ("Connection failed to %s" % node_id )
1032+ else :
1033+ continue
1034+ conn = self ._conns [try_node ]
1035+
1036+ while conn .connecting () and time .time () < end :
1037+ timeout_ms = min ((end - time .time ()) * 1000 , 200 )
1038+ self .poll (timeout_ms = timeout_ms )
1039+
1040+ if conn ._api_version is not None :
1041+ return conn ._api_version
1042+
1043+ # Timeout
1044+ else :
10311045 if node_id is not None :
1032- self ._lock .release ()
1033- raise
1034- finally :
1035- self ._refresh_on_disconnects = True
1036-
1037- # Timeout
1038- else :
1039- self ._lock .release ()
1040- raise Errors .NoBrokersAvailable ()
1046+ raise Errors .NodeNotReadyError (node_id )
1047+ else :
1048+ raise Errors .NoBrokersAvailable ()
10411049
10421050 def api_version (self , operation , max_version = None ):
10431051 """Find the latest version of the protocol operation supported by both
@@ -1063,15 +1071,15 @@ def api_version(self, operation, max_version=None):
10631071 broker_api_versions = self ._api_versions
10641072 api_key = operation [0 ].API_KEY
10651073 if broker_api_versions is None or api_key not in broker_api_versions :
1066- raise IncompatibleBrokerVersion (
1074+ raise Errors . IncompatibleBrokerVersion (
10671075 "Kafka broker does not support the '{}' Kafka protocol."
10681076 .format (operation [0 ].__name__ ))
10691077 broker_min_version , broker_max_version = broker_api_versions [api_key ]
10701078 version = min (max_version , broker_max_version )
10711079 if version < broker_min_version :
10721080 # max library version is less than min broker version. Currently,
10731081 # no Kafka versions specify a min msg version. Maybe in the future?
1074- raise IncompatibleBrokerVersion (
1082+ raise Errors . IncompatibleBrokerVersion (
10751083 "No version of the '{}' Kafka protocol is supported by both the client and broker."
10761084 .format (operation [0 ].__name__ ))
10771085 return version
0 commit comments