@@ -43,30 +43,29 @@ def __init__(
4343 self .on_result = on_result
4444
4545
46- class JupyterKernelWebSocket (BaseModel ):
47- model_config = ConfigDict (arbitrary_types_allowed = True )
46+ class JupyterKernelWebSocket :
4847
49- url : str
50-
51- _cells : Dict [str , CellExecution ] = {}
52- _waiting_for_replies : Dict [str , DeferredFuture ] = PrivateAttr (default_factory = dict )
53- _queue_in : Queue = PrivateAttr (default_factory = Queue )
54- _queue_out : Queue = PrivateAttr (default_factory = Queue )
55- _process_cleanup : List [Callable [[], Any ]] = PrivateAttr (default_factory = list )
56- _closed : bool = PrivateAttr (default = False )
48+ def __init__ (self , url : str ):
49+ self .url = url
50+ self ._cells : Dict [str , CellExecution ] = {}
51+ self ._waiting_for_replies : Dict [str , DeferredFuture ] = {}
52+ self ._queue_in = Queue ()
53+ self ._queue_out = Queue ()
54+ self ._stopped = threading .Event ()
5755
5856 def process_messages (self ):
59- while True :
60- data = self ._queue_out .get ()
57+ while not self ._stopped .is_set ():
58+ if self ._queue_out .empty ():
59+ time .sleep (0.01 )
60+ continue
6161
62+ data = self ._queue_out .get ()
6263 logger .debug (f"WebSocket received message: { data } " .strip ())
6364 self ._receive_message (json .loads (data ))
6465 self ._queue_out .task_done ()
6566
6667 def connect (self , timeout : float = TIMEOUT ):
6768 started = threading .Event ()
68- stopped = threading .Event ()
69- self ._process_cleanup .append (stopped .set )
7069
7170 threading .Thread (
7271 target = self .process_messages , daemon = True , name = "e2b-process-messages"
@@ -78,7 +77,7 @@ def connect(self, timeout: float = TIMEOUT):
7877 queue_in = self ._queue_in ,
7978 queue_out = self ._queue_out ,
8079 started = started ,
81- stopped = stopped ,
80+ stopped = self . _stopped
8281 ).run ,
8382 daemon = True ,
8483 name = "e2b-code-interpreter-websocket" ,
@@ -91,7 +90,7 @@ def connect(self, timeout: float = TIMEOUT):
9190 while (
9291 not started .is_set ()
9392 and time .time () - start_time < timeout
94- and not self ._closed
93+ and not self ._stopped . is_set ()
9594 ):
9695 time .sleep (0.1 )
9796
@@ -245,12 +244,7 @@ def _receive_message(self, data: dict):
245244
246245 def close (self ):
247246 logger .debug ("Closing WebSocket" )
248- self ._closed = True
249-
250- for cancel in self ._process_cleanup :
251- cancel ()
252-
253- self ._process_cleanup .clear ()
247+ self ._stopped .set ()
254248
255249 for handler in self ._waiting_for_replies .values ():
256250 logger .debug (f"Cancelling waiting for execution result for { handler } " )
0 commit comments