Skip to content

Commit 0a29784

Browse files
committed
Allow selecting which celery worker to ping
1 parent c6ca36b commit 0a29784

File tree

3 files changed

+80
-5
lines changed

3 files changed

+80
-5
lines changed

docs/settings.rst

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,26 @@ Using `django.settings` you may exert more fine-grained control over the behavio
9595
- Number
9696
- `3`
9797
- Specifies the maximum total time for a task to complete and return a result, including queue time.
98+
99+
100+
Celery-Ping Health Check
101+
-------------------
102+
103+
Using `django.settings` you may exert more fine-grained control over the behavior of the celery-ping health check
104+
105+
.. list-table:: Additional Settings
106+
:widths: 25 10 10 55
107+
:header-rows: 1
108+
109+
* - Name
110+
- Type
111+
- Default
112+
- Description
113+
* - `HEALTHCHECK_CELERY_PING_TIMEOUT`
114+
- Number
115+
- `1`
116+
- Specifies the maximum total time (in seconds) for which "pong" responses are awaited.
117+
* - `HEALTHCHECK_CELERY_PING_DESTINATION`
118+
- List of Strings
119+
- `None`
120+
- Specifies the list of workers which will receive the "ping" request.

health_check/contrib/celery_ping/backends.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ class CeleryPingHealthCheck(BaseHealthCheckBackend):
1010

1111
def check_status(self):
1212
timeout = getattr(settings, "HEALTHCHECK_CELERY_PING_TIMEOUT", 1)
13+
destination = getattr(settings, "HEALTHCHECK_CELERY_PING_DESTINATION", None)
1314

1415
try:
15-
ping_result = app.control.ping(timeout=timeout)
16+
ping_result = app.control.ping(destination=destination, timeout=timeout)
1617
except IOError as e:
1718
self.add_error(ServiceUnavailable("IOError"), e)
1819
except NotImplementedError as exc:
@@ -30,9 +31,9 @@ def check_status(self):
3031
ServiceUnavailable("Celery workers unavailable"),
3132
)
3233
else:
33-
self._check_ping_result(ping_result)
34+
self._check_ping_result(ping_result, destination)
3435

35-
def _check_ping_result(self, ping_result):
36+
def _check_ping_result(self, ping_result, destination):
3637
active_workers = []
3738

3839
for result in ping_result:
@@ -46,6 +47,15 @@ def _check_ping_result(self, ping_result):
4647
continue
4748
active_workers.append(worker)
4849

50+
if destination:
51+
inactive_workers = set(destination) - set(active_workers)
52+
if inactive_workers:
53+
self.add_error(
54+
ServiceUnavailable(
55+
f"Celery workers {inactive_workers} did not respond"
56+
)
57+
)
58+
4959
if not self.errors:
5060
self._check_active_queues(active_workers)
5161

tests/test_celery_ping.py

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import pytest
44
from django.apps import apps
5-
from django.conf import settings
65

76
from health_check.contrib.celery_ping.apps import HealthCheckConfig
87
from health_check.contrib.celery_ping.backends import CeleryPingHealthCheck
@@ -20,7 +19,9 @@ class TestCeleryPingHealthCheck:
2019
def health_check(self):
2120
return CeleryPingHealthCheck()
2221

23-
def test_check_status_doesnt_add_errors_when_ping_successful(self, health_check):
22+
def test_check_status_doesnt_add_errors_when_ping_successful(
23+
self, health_check, settings
24+
):
2425
celery_worker = "celery@4cc150a7b49b"
2526

2627
with patch(
@@ -59,6 +60,7 @@ def test_check_status_reports_errors_if_ping_responses_are_incorrect(
5960
def test_check_status_adds_errors_when_ping_successfull_but_not_all_defined_queues_have_consumers(
6061
self,
6162
health_check,
63+
settings,
6264
):
6365
celery_worker = "celery@4cc150a7b49b"
6466
queues = list(settings.CELERY_QUEUES)
@@ -123,6 +125,46 @@ def test_check_status_add_error_when_ping_result_failed(
123125
assert len(health_check.errors) == 1
124126
assert "workers unavailable" in health_check.errors[0].message.lower()
125127

128+
def test_check_status_reports_errors_if_ping_responses_are_missing(
129+
self,
130+
health_check,
131+
settings,
132+
):
133+
settings.HEALTHCHECK_CELERY_PING_DESTINATION = [
134+
"celery1@4cc150a7b49b",
135+
"celery2@4cc150a7b49b",
136+
]
137+
with patch(
138+
self.CELERY_APP_CONTROL_PING,
139+
return_value=[
140+
{"celery1@4cc150a7b49b": CeleryPingHealthCheck.CORRECT_PING_RESPONSE},
141+
],
142+
):
143+
health_check.check_status()
144+
145+
assert len(health_check.errors) == 1
146+
147+
def test_check_status_reports_destinations(
148+
self,
149+
health_check,
150+
settings,
151+
):
152+
settings.HEALTHCHECK_CELERY_PING_DESTINATION = [
153+
"celery1@4cc150a7b49b",
154+
"celery2@4cc150a7b49b",
155+
]
156+
with patch(
157+
self.CELERY_APP_CONTROL_PING,
158+
return_value=[
159+
{"celery1@4cc150a7b49b": CeleryPingHealthCheck.CORRECT_PING_RESPONSE},
160+
{"celery2@4cc150a7b49b": CeleryPingHealthCheck.CORRECT_PING_RESPONSE},
161+
{"celery3@4cc150a7b49b": CeleryPingHealthCheck.CORRECT_PING_RESPONSE},
162+
],
163+
):
164+
health_check.check_status()
165+
166+
assert len(health_check.errors) == 1
167+
126168

127169
class TestCeleryPingHealthCheckApps:
128170
def test_apps(self):

0 commit comments

Comments
 (0)