Skip to content

Commit 02d2ca4

Browse files
authored
Merge pull request #93 from stealthrocket/clamp-min-results
Clamp min_results if greater than outstanding calls
2 parents 2a4c856 + 341b8ea commit 02d2ca4

File tree

2 files changed

+36
-3
lines changed

2 files changed

+36
-3
lines changed

src/dispatch/scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ def _run(self, input: Input) -> Output:
440440
return Output.poll(
441441
state=serialized_state,
442442
calls=pending_calls,
443-
min_results=max(1, self.poll_min_results),
443+
min_results=max(1, min(state.outstanding_calls, self.poll_min_results)),
444444
max_results=max(1, min(state.outstanding_calls, self.poll_max_results)),
445445
max_wait_seconds=self.poll_max_wait_seconds,
446446
)

tests/dispatch/test_scheduler.py

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,9 +291,42 @@ async def main():
291291
output = self.start(main)
292292
self.assert_exit_result_error(output, ValueError, "oops")
293293

294-
def start(self, main: Callable, *args: Any, **kwargs: Any) -> Output:
294+
def test_min_max_results_clamping(self):
295+
@durable
296+
async def main():
297+
return await call_concurrently("a", "b", "c")
298+
299+
output = self.start(main, poll_min_results=1, poll_max_results=10)
300+
self.assert_poll_call_functions(
301+
output, ["a", "b", "c"], min_results=1, max_results=3
302+
)
303+
304+
output = self.start(main, poll_min_results=1, poll_max_results=2)
305+
self.assert_poll_call_functions(
306+
output, ["a", "b", "c"], min_results=1, max_results=2
307+
)
308+
309+
output = self.start(main, poll_min_results=10, poll_max_results=10)
310+
self.assert_poll_call_functions(
311+
output, ["a", "b", "c"], min_results=3, max_results=3
312+
)
313+
314+
def start(
315+
self,
316+
main: Callable,
317+
*args: Any,
318+
poll_min_results=1,
319+
poll_max_results=10,
320+
poll_max_wait_seconds=None,
321+
**kwargs: Any,
322+
) -> Output:
295323
input = Input.from_input_arguments(main.__qualname__, *args, **kwargs)
296-
return OneShotScheduler(main).run(input)
324+
return OneShotScheduler(
325+
main,
326+
poll_min_results=poll_min_results,
327+
poll_max_results=poll_max_results,
328+
poll_max_wait_seconds=poll_max_wait_seconds,
329+
).run(input)
297330

298331
def resume(
299332
self,

0 commit comments

Comments
 (0)