Skip to content

Commit a8bd097

Browse files
committed
Add stop event
Signed-off-by: Samuel Monson <[email protected]>
1 parent f148426 commit a8bd097

File tree

2 files changed

+24
-12
lines changed

2 files changed

+24
-12
lines changed

src/guidellm/scheduler/scheduler.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from multiprocessing import Manager
77
from queue import Empty as QueueEmpty
88
from queue import Queue
9+
from threading import Event
910
from typing import (
1011
Any,
1112
Generic,
@@ -126,7 +127,7 @@ async def run(
126127
) as executor,
127128
):
128129
requests_iter: Optional[Iterator[Any]] = None
129-
futures, queues = await self._start_processes(
130+
futures, queues, stop_event = await self._start_processes(
130131
manager, executor, scheduling_strategy
131132
)
132133
run_info, requests_iter, times_iter = self._run_setup(
@@ -178,7 +179,7 @@ async def run(
178179
run_info=run_info,
179180
)
180181

181-
await self._stop_processes(futures, queues.requests)
182+
await self._stop_processes(futures, stop_event)
182183

183184
async def _start_processes(
184185
self,
@@ -188,6 +189,7 @@ async def _start_processes(
188189
) -> tuple[
189190
list[asyncio.Future],
190191
MPQueues[RequestT, ResponseT],
192+
Event,
191193
]:
192194
await self.worker.prepare_multiprocessing()
193195
queues: MPQueues[RequestT, ResponseT] = MPQueues(
@@ -197,6 +199,7 @@ async def _start_processes(
197199
times=manager.Queue(maxsize=scheduling_strategy.processing_requests_limit),
198200
responses=manager.Queue(),
199201
)
202+
stop_event = manager.Event()
200203

201204
num_processes = min(
202205
scheduling_strategy.processes_limit,
@@ -226,6 +229,7 @@ async def _start_processes(
226229
executor,
227230
self.worker.process_loop_asynchronous,
228231
queues,
232+
stop_event,
229233
False, # TODO: Make configurable
230234
requests_limit,
231235
id_,
@@ -234,7 +238,7 @@ async def _start_processes(
234238

235239
await asyncio.sleep(0.1) # give time for processes to start
236240

237-
return futures, queues
241+
return futures, queues, stop_event
238242

239243
def _run_setup(
240244
self,
@@ -369,10 +373,9 @@ def _check_result_ready(
369373
async def _stop_processes(
370374
self,
371375
futures: list[asyncio.Future],
372-
requests_queue: Queue[RequestSession[RequestT, ResponseT]],
376+
stop_event: Event,
373377
):
374-
# FIXME: Need new method for stopping workers
375-
for _ in futures:
376-
requests_queue.put(None)
378+
# stop all processes
379+
stop_event.set()
377380

378381
await asyncio.gather(*futures)

src/guidellm/scheduler/worker.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from dataclasses import dataclass
77
from queue import Empty as QueueEmpty
88
from queue import Queue
9+
from threading import Event
910
from typing import (
1011
Any,
1112
Generic,
@@ -109,9 +110,11 @@ async def resolve(
109110
...
110111

111112
async def get_request_time(
112-
self, times_queue: Queue[WorkerProcessRequestTime]
113+
self,
114+
times_queue: Queue[WorkerProcessRequestTime],
115+
timeout: Optional[int] = None,
113116
) -> WorkerProcessRequestTime:
114-
return await asyncio.to_thread(times_queue.get) # type: ignore[attr-defined]
117+
return await asyncio.to_thread(times_queue.get, timeout=timeout) # type: ignore[attr-defined]
115118

116119
async def send_result(
117120
self,
@@ -181,6 +184,7 @@ async def resolve_scheduler_request(
181184
def process_loop_asynchronous(
182185
self,
183186
queues: MPQueues[RequestT, ResponseT],
187+
stop_event: Event,
184188
prioritize_sessions: bool,
185189
max_concurrency: int,
186190
process_id: int,
@@ -189,7 +193,7 @@ async def _process_runner():
189193
lock = asyncio.Semaphore(max_concurrency)
190194
pending_sessions: list[RequestSession[RequestT, ResponseT]] = []
191195

192-
while True: # TODO: Exit condition
196+
while True:
193197
await asyncio.sleep(0) # Yield control to the event loop
194198
await lock.acquire()
195199

@@ -201,13 +205,16 @@ async def _process_runner():
201205
else queues.requests.get_nowait()
202206
)
203207
dequeued_time = time.time()
204-
request_times = await self.get_request_time(queues.times)
208+
request_times = await self.get_request_time(queues.times, 5)
205209
except (QueueEmpty, IndexError):
206210
# Requeue the session if we don't have a next time yet
207211
if request_session is not None:
208212
pending_sessions.append(request_session)
209213
lock.release()
210-
continue
214+
if stop_event.is_set():
215+
return # Exit if stop event is set
216+
else:
217+
continue
211218

212219
async def wait_then_requeue(
213220
session: RequestSession[RequestT, ResponseT],
@@ -309,13 +316,15 @@ async def prepare_multiprocessing(self):
309316
def process_loop_asynchronous(
310317
self,
311318
queues: MPQueues[GenerationRequest, ResponseSummary],
319+
stop_event: Event,
312320
prioritize_sessions: bool,
313321
max_concurrency: int,
314322
process_id: int,
315323
):
316324
asyncio.run(self.backend.validate())
317325
super().process_loop_asynchronous(
318326
queues=queues,
327+
stop_event=stop_event,
319328
prioritize_sessions=prioritize_sessions,
320329
max_concurrency=max_concurrency,
321330
process_id=process_id,

0 commit comments

Comments
 (0)