Skip to content

Commit 80d044a

Browse files
Omar Abdelkaderclaude
andcommitted
Add Cron dependency for cron-style task scheduling
Introduces a new Cron dependency that extends Perpetual to support wall-clock scheduled tasks using cron expressions. Unlike Perpetual which uses relative intervals, Cron schedules tasks at exact times (e.g., "0 9 * * 1" for Mondays at 9 AM). Key changes: - Add Cron class with croniter integration for expression parsing - Support standard 5-field cron syntax and Vixie keywords (@daily, etc.) - Automatic scheduling at worker startup (automatic=True by default) - Fix Perpetual.__aenter__ to preserve the automatic flag Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent aecd46a commit 80d044a

File tree

8 files changed

+257
-11
lines changed

8 files changed

+257
-11
lines changed

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ classifiers = [
2424
]
2525
dependencies = [
2626
"cloudpickle>=3.1.1",
27+
"croniter>=6",
2728
"exceptiongroup>=1.2.0; python_version < '3.11'",
2829
"taskgroup>=0.2.2; python_version < '3.11'",
2930
"fakeredis[lua]>=2.32.1",
@@ -69,6 +70,7 @@ dev = [
6970
"pytest-timeout>=2.4.0",
7071
"pytest-xdist>=3.6.1",
7172
"ruff>=0.14.14",
73+
"types-croniter>=6",
7274
]
7375

7476
docs = [

src/docket/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from .annotations import Logged
1313
from .dependencies import (
1414
ConcurrencyLimit,
15+
Cron,
1516
CurrentDocket,
1617
CurrentExecution,
1718
CurrentWorker,
@@ -36,6 +37,7 @@
3637
"__version__",
3738
"Agenda",
3839
"ConcurrencyLimit",
40+
"Cron",
3941
"CurrentDocket",
4042
"CurrentExecution",
4143
"CurrentWorker",

src/docket/dependencies/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
format_duration,
1717
)
1818
from ._concurrency import ConcurrencyBlocked, ConcurrencyLimit
19+
from ._cron import Cron
1920
from ._contextual import (
2021
CurrentDocket,
2122
CurrentExecution,
@@ -74,6 +75,7 @@
7475
"AdmissionBlocked",
7576
"ConcurrencyBlocked",
7677
"ConcurrencyLimit",
78+
"Cron",
7779
"Perpetual",
7880
"Progress",
7981
"Timeout",

src/docket/dependencies/_cron.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
"""Cron-style scheduling dependency."""
2+
3+
from __future__ import annotations
4+
5+
from datetime import datetime, timezone
6+
from typing import TYPE_CHECKING
7+
8+
from croniter import croniter
9+
10+
from ._perpetual import Perpetual
11+
12+
if TYPE_CHECKING: # pragma: no cover
13+
from ._base import TaskOutcome
14+
from ..execution import Execution
15+
16+
17+
class Cron(Perpetual):
18+
"""Declare a task that should run on a cron schedule. Cron tasks are automatically
19+
rescheduled for the next matching time after they finish (whether they succeed or
20+
fail). By default, a cron task is scheduled at worker startup with `automatic=True`.
21+
22+
Unlike `Perpetual` which schedules based on intervals from the current time, `Cron`
23+
schedules based on wall-clock time, ensuring tasks run at consistent times regardless
24+
of execution duration or delays.
25+
26+
Supports standard cron expressions and Vixie cron-style keywords (@daily, @hourly, etc.)
27+
via the croniter library.
28+
29+
Example:
30+
31+
```python
32+
@task
33+
async def weekly_report(cron: Cron = Cron("0 9 * * 1")) -> None:
34+
# Runs every Monday at 9:00 AM UTC
35+
...
36+
37+
@task
38+
async def daily_cleanup(cron: Cron = Cron("@daily")) -> None:
39+
# Runs every day at midnight UTC
40+
...
41+
```
42+
"""
43+
44+
expression: str
45+
46+
_croniter: croniter[datetime]
47+
48+
def __init__(
49+
self,
50+
expression: str,
51+
automatic: bool = True,
52+
) -> None:
53+
"""
54+
Args:
55+
expression: A cron expression string. Supports:
56+
- Standard 5-field syntax: "minute hour day month weekday"
57+
(e.g., "0 9 * * 1" for Mondays at 9 AM)
58+
- Vixie cron keywords: @yearly, @annually, @monthly, @weekly,
59+
@daily, @midnight, @hourly
60+
automatic: If set, this task will be automatically scheduled during worker
61+
startup and continually through the worker's lifespan. This ensures
62+
that the task will always be scheduled despite crashes and other
63+
adverse conditions. Automatic tasks must not require any arguments.
64+
"""
65+
super().__init__(automatic=automatic)
66+
self.expression = expression
67+
self._croniter = croniter(expression, datetime.now(timezone.utc), datetime)
68+
69+
async def __aenter__(self) -> Cron:
70+
execution = self.execution.get()
71+
cron = Cron(expression=self.expression, automatic=self.automatic)
72+
cron.args = execution.args
73+
cron.kwargs = execution.kwargs
74+
return cron
75+
76+
def get_next(self) -> datetime:
77+
return self._croniter.get_next()
78+
79+
async def on_complete(self, execution: Execution, outcome: TaskOutcome) -> bool:
80+
"""Handle completion by scheduling the next execution at the exact cron time.
81+
82+
This overrides Perpetual's on_complete to ensure we hit the exact wall-clock
83+
time rather than adjusting for task duration.
84+
"""
85+
self.at(self.get_next())
86+
return await super().on_complete(execution, outcome)

src/docket/dependencies/_perpetual.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def __init__(
6161

6262
async def __aenter__(self) -> Perpetual:
6363
execution = self.execution.get()
64-
perpetual = Perpetual(every=self.every)
64+
perpetual = Perpetual(every=self.every, automatic=self.automatic)
6565
perpetual.args = execution.args
6666
perpetual.kwargs = execution.kwargs
6767
return perpetual

src/docket/worker.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
from .dependencies import (
4343
AdmissionBlocked,
4444
CompletionHandler,
45+
Cron,
4546
CurrentExecution,
4647
Dependency,
4748
FailedDependency,
@@ -763,15 +764,14 @@ async def _schedule_all_automatic_perpetual_tasks(self) -> None:
763764
perpetual = get_single_dependency_parameter_of_type(
764765
task_function, Perpetual
765766
)
766-
if perpetual is None:
767-
continue
768-
769-
if not perpetual.automatic:
770-
continue
771-
772-
key = task_function.__name__
773-
774-
await self.docket.add(task_function, key=key)()
767+
if perpetual is not None and perpetual.automatic:
768+
key = task_function.__name__
769+
when = (
770+
perpetual.get_next()
771+
if isinstance(perpetual, Cron)
772+
else None
773+
)
774+
await self.docket.add(task_function, when=when, key=key)()
775775
except LockError: # pragma: no cover
776776
return
777777

tests/fundamentals/test_cron.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
"""Tests for Cron dependency (cron-style scheduled tasks)."""
2+
3+
from datetime import datetime, timedelta, timezone
4+
from unittest.mock import patch
5+
6+
from docket import Docket, Worker
7+
from docket.dependencies import Cron
8+
9+
10+
async def test_cron_task_reschedules_itself(docket: Docket, worker: Worker):
11+
"""Cron tasks automatically reschedule after each execution."""
12+
runs = 0
13+
14+
async def my_cron_task(cron: Cron = Cron("0 9 * * *", automatic=False)):
15+
nonlocal runs
16+
runs += 1
17+
18+
# Patch get_next to return a time 10ms in the future (instead of waiting for 9 AM)
19+
with patch.object(
20+
Cron,
21+
"get_next",
22+
return_value=datetime.now(timezone.utc) + timedelta(milliseconds=10),
23+
):
24+
execution = await docket.add(my_cron_task)()
25+
await worker.run_at_most({execution.key: 3})
26+
27+
assert runs == 3
28+
29+
30+
async def test_cron_tasks_are_automatically_scheduled(docket: Docket, worker: Worker):
31+
"""Cron tasks with automatic=True are scheduled at worker startup."""
32+
calls = 0
33+
34+
async def my_automatic_cron(
35+
cron: Cron = Cron("0 0 * * *"),
36+
): # automatic=True is default
37+
nonlocal calls
38+
calls += 1
39+
40+
docket.register(my_automatic_cron)
41+
42+
with patch.object(
43+
Cron,
44+
"get_next",
45+
return_value=datetime.now(timezone.utc) + timedelta(milliseconds=10),
46+
):
47+
await worker.run_at_most({"my_automatic_cron": 2})
48+
49+
assert calls == 2
50+
51+
52+
async def test_cron_tasks_continue_after_errors(docket: Docket, worker: Worker):
53+
"""Cron tasks keep rescheduling even when they raise exceptions."""
54+
calls = 0
55+
56+
async def flaky_cron_task(cron: Cron = Cron("0 * * * *", automatic=False)):
57+
nonlocal calls
58+
calls += 1
59+
raise ValueError("Task failed!")
60+
61+
with patch.object(
62+
Cron,
63+
"get_next",
64+
return_value=datetime.now(timezone.utc) + timedelta(milliseconds=10),
65+
):
66+
execution = await docket.add(flaky_cron_task)()
67+
await worker.run_at_most({execution.key: 3})
68+
69+
assert calls == 3
70+
71+
72+
async def test_cron_tasks_can_cancel_themselves(docket: Docket, worker: Worker):
73+
"""A cron task can stop rescheduling by calling cron.cancel()."""
74+
calls = 0
75+
76+
async def limited_cron_task(cron: Cron = Cron("0 * * * *", automatic=False)):
77+
nonlocal calls
78+
calls += 1
79+
if calls >= 3:
80+
cron.cancel()
81+
82+
with patch.object(
83+
Cron,
84+
"get_next",
85+
return_value=datetime.now(timezone.utc) + timedelta(milliseconds=10),
86+
):
87+
await docket.add(limited_cron_task)()
88+
await worker.run_until_finished()
89+
90+
assert calls == 3
91+
92+
93+
async def test_cron_supports_vixie_keywords(docket: Docket, worker: Worker):
94+
"""Cron supports Vixie cron keywords like @daily, @weekly, @hourly."""
95+
runs = 0
96+
97+
# @daily is equivalent to "0 0 * * *" (midnight every day)
98+
async def daily_task(cron: Cron = Cron("@daily", automatic=False)):
99+
nonlocal runs
100+
runs += 1
101+
102+
with patch.object(
103+
Cron,
104+
"get_next",
105+
return_value=datetime.now(timezone.utc) + timedelta(milliseconds=10),
106+
):
107+
execution = await docket.add(daily_task)()
108+
await worker.run_at_most({execution.key: 1})
109+
110+
assert runs == 1
111+
112+
113+
def test_cron_get_next_returns_future_time():
114+
"""Cron.get_next() returns a datetime in the future via croniter."""
115+
cron = Cron("* * * * *", automatic=False) # Every minute
116+
next_time = cron.get_next()
117+
118+
assert isinstance(next_time, datetime)
119+
assert next_time > datetime.now(timezone.utc)

uv.lock

Lines changed: 36 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)