Skip to content

Commit b7a2eaf

Browse files
committed
Revert loop logic changes
Signed-off-by: Samuel Monson <[email protected]>
1 parent 721e33a commit b7a2eaf

File tree

1 file changed

+16
-8
lines changed

1 file changed

+16
-8
lines changed

src/guidellm/scheduler/scheduler.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -130,15 +130,17 @@ async def run(
130130
futures, queues, stop_event = await self._start_processes(
131131
manager, executor, scheduling_strategy
132132
)
133-
run_info, requests_iter = self._run_setup(
133+
run_info, requests_iter, times_iter = self._run_setup(
134134
futures, scheduling_strategy, max_number, max_duration
135135
)
136136

137137
# Add some initial requests to the queue
138138
requests_iter = self._add_requests(
139139
requests_iter,
140140
queues.requests,
141+
times_iter,
141142
run_info,
143+
loop_limit=run_info.strategy.queued_requests_limit,
142144
)
143145
# Wait for the test to start
144146
await asyncio.sleep(time.time() - scheduling_strategy.start_time)
@@ -169,6 +171,7 @@ async def run(
169171
requests_iter = self._add_requests(
170172
requests_iter,
171173
queues.requests,
174+
times_iter,
172175
run_info,
173176
)
174177
await asyncio.sleep(0) # enable requests to start
@@ -257,8 +260,9 @@ def _run_setup(
257260
scheduling_strategy: SchedulingStrategy,
258261
max_number: Optional[int],
259262
max_duration: Optional[float],
260-
) -> tuple[SchedulerRunInfo, Iterator[Any]]:
263+
) -> tuple[SchedulerRunInfo, Iterator[Any], Iterator[float]]:
261264
requests_iter = iter(self.request_loader)
265+
times_iter = iter(scheduling_strategy.request_times())
262266
end_time = scheduling_strategy.start_time + (max_duration or math.inf)
263267
end_number = max_number or math.inf
264268

@@ -284,28 +288,32 @@ def _run_setup(
284288
strategy=scheduling_strategy,
285289
)
286290

287-
return info, requests_iter
291+
return info, requests_iter, times_iter
288292

289293
def _add_requests(
290294
self,
291295
requests_iter: Optional[Iterator[Any]],
292296
requests_queue: Queue[WorkerProcessRequest[RequestT, ResponseT]],
297+
times_iter: Iterator[float],
293298
run_info: SchedulerRunInfo,
299+
loop_limit: Optional[int] = None,
294300
) -> Optional[Iterator[Any]]:
295301
if requests_iter is not None:
296302
try:
297303
added_count = 0
298304

299-
if time.time() >= run_info.end_time:
300-
raise StopIteration
301-
302305
while not requests_queue.full() and added_count < (
303-
run_info.strategy.queued_requests_limit
304-
or settings.max_add_requests_per_loop
306+
loop_limit or settings.max_add_requests_per_loop
305307
):
306308
if run_info.created_requests >= run_info.end_number:
307309
raise StopIteration
308310

311+
if (
312+
next(times_iter) >= run_info.end_time
313+
or time.time() >= run_info.end_time
314+
):
315+
raise StopIteration
316+
309317
work_req = WorkerProcessRequest[RequestT, ResponseT](
310318
request=next(requests_iter),
311319
timeout_time=run_info.end_time,

0 commit comments

Comments
 (0)