Skip to content

Commit ca160e7

Browse files
committed
feat: add kafka healthcheck
1 parent 9cfe2ea commit ca160e7

File tree

4 files changed

+131
-0
lines changed

4 files changed

+131
-0
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
import django
2+
3+
if django.VERSION < (3, 2):
4+
default_app_config = "health_check.contrib.kafka.apps.HealthCheckConfig"

health_check/contrib/kafka/apps.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from django.apps import AppConfig
2+
3+
from health_check.plugins import plugin_dir
4+
5+
6+
class HealthCheckConfig(AppConfig):
7+
name = "health_check.contrib.kafka"
8+
9+
def ready(self):
10+
from .backends import KafkaHealthCheck
11+
12+
plugin_dir.register(KafkaHealthCheck)
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import logging
2+
import importlib
3+
4+
from django.conf import settings
5+
6+
from health_check.backends import BaseHealthCheckBackend
7+
from health_check.exceptions import ServiceUnavailable
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
# On essaye de charger kafka-python-ng en priorité, sinon kafka-python
13+
try:
14+
kafka_module = importlib.import_module("kafka")
15+
except ImportError:
16+
kafka_module = None
17+
18+
if not kafka_module:
19+
raise ImportError(
20+
"Aucune bibliothèque kafka-python ou kafka-python-ng trouvée. Installez l'une d'elles."
21+
)
22+
23+
KafkaAdminClient = getattr(kafka_module, "KafkaAdminClient", None)
24+
KafkaError = getattr(importlib.import_module("kafka.errors"), "KafkaError", None)
25+
26+
if not KafkaAdminClient or not KafkaError:
27+
raise ImportError(
28+
"KafkaAdminClient ou KafkaError non disponible. Vérifiez vos installations."
29+
)
30+
31+
32+
class KafkaHealthCheck(BaseHealthCheckBackend):
33+
"""Health check for Kafka."""
34+
35+
namespace = None
36+
37+
def check_status(self):
38+
"""Check Kafka service by opening and closing a broker channel."""
39+
logger.debug("Checking for a KAFKA_URL on django settings...")
40+
41+
bootstrap_servers = getattr(settings, "KAFKA_URL", None)
42+
43+
logger.debug(
44+
"Got %s as the kafka_url. Connecting to kafka...", bootstrap_servers
45+
)
46+
47+
logger.debug("Attempting to connect to kafka...")
48+
try:
49+
admin_client = KafkaAdminClient(
50+
bootstrap_servers=bootstrap_servers,
51+
request_timeout_ms=3000, # 3 secondes max
52+
api_version_auto_timeout_ms=1000,
53+
)
54+
# Ping léger : on liste les topics (lecture metadata)
55+
admin_client.list_topics()
56+
except KafkaError.BrokerResponseError as e:
57+
self.add_error(ServiceUnavailable("Unknown error"), e)
58+
else:
59+
logger.debug("Connection established. Kafka is healthy.")

tests/test_kafka.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from unittest import mock
2+
3+
from kafka.errors import BrokerResponseError
4+
5+
from health_check.contrib.kafka.backends import KafkaHealthCheck
6+
7+
8+
class TestKafkaHealthCheck:
9+
"""Test Kafka health check."""
10+
11+
@mock.patch("health_check.contrib.kafka.backends.getattr")
12+
@mock.patch("health_check.contrib.kafka.backends.Connection")
13+
def test_broker_refused_connection(self, mocked_connection, mocked_getattr):
14+
"""Test when the connection to Kafka is refused."""
15+
mocked_getattr.return_value = "kafka_url"
16+
17+
conn_exception = ConnectionRefusedError("Refused connection")
18+
19+
# mock returns
20+
mocked_conn = mock.MagicMock()
21+
mocked_connection.return_value.__enter__.return_value = mocked_conn
22+
mocked_conn.connect.side_effect = conn_exception
23+
24+
# instantiates the class
25+
kafka_healthchecker = KafkaHealthCheck()
26+
27+
# invokes the method check_status()
28+
kafka_healthchecker.check_status()
29+
assert len(kafka_healthchecker.errors), 1
30+
31+
# mock assertions
32+
mocked_connection.assert_called_once_with("kafka_url")
33+
34+
@mock.patch("health_check.contrib.kafka.backends.getattr")
35+
@mock.patch("health_check.contrib.kafka.backends.Connection")
36+
def test_broker_auth_error(self, mocked_connection, mocked_getattr):
37+
"""Test that the connection to Kafka has an authentication error."""
38+
mocked_getattr.return_value = "kafka_url"
39+
40+
conn_exception = BrokerResponseError("Refused connection")
41+
42+
# mock returns
43+
mocked_conn = mock.MagicMock()
44+
mocked_connection.return_value.__enter__.return_value = mocked_conn
45+
mocked_conn.connect.side_effect = conn_exception
46+
47+
# instantiates the class
48+
rabbitmq_healthchecker = KafkaHealthCheck()
49+
50+
# invokes the method check_status()
51+
rabbitmq_healthchecker.check_status()
52+
assert len(rabbitmq_healthchecker.errors), 1
53+
54+
# mock assertions
55+
mocked_connection.assert_called_once_with("kafka_url")
56+

0 commit comments

Comments
 (0)