Skip to content

Commit c7a6039

Browse files
authored
fix(tasks) Improve scheduler metrics and fix an error (#95960)
While debugging why the scheduler in s4s occasionally stops scheduling tasks, I found some gaps in our metrics and logging that would have helped solve the problem, and also a Sentry error that occurs when the 'next' runtime for a task is `now`: the computed expiry is then zero seconds, and Redis doesn't allow `ex=0`. Fixes SENTRY-FOR-SENTRY-B8D
1 parent 7d52415 commit c7a6039

File tree

2 files changed

+43
-6
lines changed

2 files changed

+43
-6
lines changed

src/sentry/taskworker/scheduler/runner.py

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ def set(self, taskname: str, next_runtime: datetime) -> bool:
4646
Returns False when the key is set and a task should not be spawned.
4747
"""
4848
now = timezone.now()
49-
duration = next_runtime - now
49+
# next_runtime & now could be the same second, and redis gets sad if ex=0
50+
duration = max(int((next_runtime - now).total_seconds()), 1)
5051

5152
result = self._redis.set(self._make_key(taskname), now.isoformat(), ex=duration, nx=True)
5253
return bool(result)
@@ -59,6 +60,8 @@ def read(self, taskname: str) -> datetime | None:
5960
result = self._redis.get(self._make_key(taskname))
6061
if result:
6162
return datetime.fromisoformat(result)
63+
64+
metrics.incr("taskworker.scheduler.run_storage.read.miss", tags={"taskname": taskname})
6265
return None
6366

6467
def read_many(self, tasknames: list[str]) -> Mapping[str, datetime | None]:
@@ -203,6 +206,7 @@ def tick(self) -> float:
203206
self._update_heap()
204207

205208
if not self._heap:
209+
logger.warning("taskworker.scheduler.no_heap")
206210
return 60
207211

208212
while True:
@@ -239,13 +243,22 @@ def _try_spawn(self, entry: ScheduleEntry) -> None:
239243
},
240244
)
241245
else:
242-
# sync with last_run state in storage
243-
entry.set_last_run(self._run_storage.read(entry.fullname))
246+
# We were not able to set a key, load last run from storage.
247+
run_state = self._run_storage.read(entry.fullname)
248+
entry.set_last_run(run_state)
244249

245-
logger.debug(
246-
"taskworker.scheduler.sync_with_storage", extra={"fullname": entry.fullname}
250+
logger.info(
251+
"taskworker.scheduler.sync_with_storage",
252+
extra={
253+
"taskname": entry.taskname,
254+
"namespace": entry.namespace,
255+
"last_runtime": run_state.isoformat() if run_state else None,
256+
},
257+
)
258+
metrics.incr(
259+
"taskworker.scheduler.sync_with_storage",
260+
tags={"taskname": entry.taskname, "namespace": entry.namespace},
247261
)
248-
metrics.incr("taskworker.scheduler.sync_with_storage")
249262

250263
def _update_heap(self) -> None:
251264
"""update the heap to reflect current remaining time"""
@@ -267,3 +280,10 @@ def _load_last_run(self) -> None:
267280
for item in self._entries:
268281
last_run = last_run_times.get(item.fullname, None)
269282
item.set_last_run(last_run)
283+
logger.info(
284+
"taskworker.scheduler.load_last_run",
285+
extra={
286+
"entry_count": len(self._entries),
287+
"loaded_count": len(last_run_times),
288+
},
289+
)

tests/sentry/taskworker/scheduler/test_runner.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,23 @@ def run_storage() -> RunStorage:
3434
return RunStorage(redis)
3535

3636

37+
def test_runstorage_zero_duration(run_storage: RunStorage) -> None:
38+
with freeze_time("2025-07-19 14:25:00"):
39+
now = timezone.now()
40+
result = run_storage.set("test:do_stuff", now)
41+
assert result is True
42+
43+
44+
def test_runstorage_double_set(run_storage: RunStorage) -> None:
45+
with freeze_time("2025-07-19 14:25:00"):
46+
now = timezone.now()
47+
first = run_storage.set("test:do_stuff", now)
48+
second = run_storage.set("test:do_stuff", now)
49+
50+
assert first is True, "initial set should return true"
51+
assert second is False, "writing a key that exists should fail"
52+
53+
3754
@pytest.mark.django_db
3855
def test_schedulerunner_add_invalid(taskregistry) -> None:
3956
run_storage = Mock(spec=RunStorage)

0 commit comments

Comments
 (0)