83
83
84
84
class BufferingSocket (object ):
85
85
86
- def __init__ (self , socket ):
87
- self .address = socket .getpeername ()
88
- self .socket = socket
86
+ def __init__ (self , connection ):
87
+ self .connection = connection
88
+ self .socket = connection .socket
89
+ self .address = self .socket .getpeername ()
89
90
self .buffer = bytearray ()
90
91
91
92
def fill (self ):
@@ -96,6 +97,10 @@ def fill(self):
96
97
self .buffer [len (self .buffer ):] = received
97
98
else :
98
99
if ready_to_read is not None :
100
+ # If this connection fails, remove this address from the
101
+ # connection pool to which this connection belongs.
102
+ if self .connection .pool :
103
+ self .connection .pool .remove (self .address )
99
104
raise ServiceUnavailable ("Failed to read from connection %r" % (self .address ,))
100
105
101
106
def read_message (self ):
@@ -211,9 +216,12 @@ class Connection(object):
211
216
.. note:: logs at INFO level
212
217
"""
213
218
219
+ #: The pool of which this connection is a member
220
+ pool = None
221
+
214
222
def __init__ (self , sock , ** config ):
215
223
self .socket = sock
216
- self .buffering_socket = BufferingSocket (sock )
224
+ self .buffering_socket = BufferingSocket (self )
217
225
self .address = sock .getpeername ()
218
226
self .channel = ChunkChannel (sock )
219
227
self .packer = Packer (self .channel )
@@ -411,6 +419,7 @@ def acquire(self, address):
411
419
connection .in_use = True
412
420
return connection
413
421
connection = self .connector (address )
422
+ connection .pool = self
414
423
connection .in_use = True
415
424
connections .append (connection )
416
425
return connection
0 commit comments