135135
136136
137137# pylint: disable=too-many-public-methods, too-many-instance-attributes
138+ # pylint: disable=too-many-lines
138139class Ankaios :
139140 """
140141 This class is used to interact with the Ankaios using an intuitive API.
@@ -240,17 +241,27 @@ def disconnect(self) -> None:
240241 Disconnect from the control interface by stopping to read
241242 from the input fifo.
242243 """
244+ if not self ._connected :
245+ self .logger .debug ("Already disconnected." )
246+ return
243247 self .logger .debug ("Disconnecting.." )
244- self ._connected = False
245248 self ._disconnect_event .set ()
246249 if self ._read_thread is not None :
247250 self ._read_thread .join (timeout = 2 )
248251 if self ._read_thread .is_alive ():
249252 self .logger .error ("Read thread did not stop." )
250253 self ._read_thread = None
254+ self ._cleanup ()
255+
256+ def _cleanup (self ) -> None :
257+ """
258+ Clean up the resources.
259+ """
260+ self ._connected = False
251261 if self ._output_file is not None :
252262 self ._output_file .close ()
253263 self ._output_file = None
264+ self .logger .debug ("Cleanup happened" )
254265
255266 # pylint: disable=too-many-branches
256267 def _read_from_control_interface (self ) -> None :
@@ -287,7 +298,7 @@ def _read_from_control_interface(self) -> None:
287298
288299 # Buffer for reading in the byte size of the proto msg
289300 varint_buffer = bytearray ()
290- while True :
301+ while not self . _disconnect_event . is_set () :
291302 # Consume byte for byte
292303 next_byte = input_fifo .read (1 )
293304 if not next_byte : # pragma: no cover
@@ -297,6 +308,10 @@ def _read_from_control_interface(self) -> None:
297308 if next_byte [0 ] & MOST_SIGNIFICANT_BIT_MASK == 0 :
298309 break
299310 if not varint_buffer : # pragma: no cover
311+ self .logger .error (
312+ "Nothing to read from the input fifo pipe. Is the agent still there?"
313+ )
314+ time .sleep (1 )
300315 continue
301316 # Decode the varint and receive the proto msg length
302317 msg_len , _ = _DecodeVarint (varint_buffer , 0 )
@@ -331,11 +346,11 @@ def _read_from_control_interface(self) -> None:
331346 self ._responses [request_id ].set ()
332347 except ConnectionClosedException as e : # pragma: no cover
333348 self .logger .error ("Connection closed: %s" , e )
334- self .disconnect ()
335349 except Exception as e : # pylint: disable=broad-exception-caught
336350 self .logger .error ("Error while reading fifo file: %s" , e )
337351 finally :
338352 input_fifo .close ()
353+ self ._cleanup ()
339354
340355 def _write_to_pipe (self , to_ankaios : _control_api .ToAnkaios ) -> None :
341356 """
0 commit comments