Skip to content

Commit 1174077

Browse files
committed
Added health check to watch.stream for silent connection drops
1 parent 3762c1d commit 1174077

File tree

2 files changed

+247
-5
lines changed

2 files changed

+247
-5
lines changed

kubernetes/base/watch/watch.py

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
import json
1616
import pydoc
1717
import sys
18+
import time
19+
20+
from urllib3.exceptions import ProtocolError, ReadTimeoutError
1821

1922
from kubernetes import client
2023

@@ -154,6 +157,14 @@ def stream(self, func, *args, **kwargs):
154157
:param func: The API function pointer. Any parameter to the function
155158
can be passed after this parameter.
156159
160+
:param int _health_check_interval: Optional. Number of seconds to wait
161+
for data before assuming the connection has been silently dropped
162+
(e.g., during a control plane upgrade). When set to a value > 0,
163+
the watch will automatically detect silent connection drops and
164+
reconnect using the last known resource_version. Default is 0
165+
(disabled). This is useful for long-running watches that need to
166+
survive cluster upgrades.
167+
157168
:return: Event object with these keys:
158169
'type': The type of event such as "ADDED", "DELETED", etc.
159170
'raw_object': a dict representing the watched object.
@@ -172,6 +183,11 @@ def stream(self, func, *args, **kwargs):
172183
...
173184
if should_stop:
174185
watch.stop()
186+
187+
Example with health check (survives control plane upgrades):
188+
for e in watch.stream(v1.list_namespace, _health_check_interval=30):
189+
# If no data received for 30s, watch reconnects automatically
190+
print(e['type'], e['object'].metadata.name)
175191
"""
176192

177193
self._stop = False
@@ -187,6 +203,20 @@ def stream(self, func, *args, **kwargs):
187203
disable_retries = ('timeout_seconds' in kwargs)
188204
retry_after_410 = False
189205
deserialize = kwargs.pop('deserialize', True)
206+
207+
# Health check interval: when > 0, sets a read timeout on the
208+
# HTTP connection so that silent connection drops (e.g., during
209+
# control plane upgrades) are detected and the watch reconnects.
210+
# A value of 0 (default) disables this feature.
211+
health_check_interval = kwargs.pop('_health_check_interval', 0)
212+
213+
# If health check is enabled and user hasn't set an explicit
214+
# request timeout, use the health check interval as the read
215+
# timeout. This causes urllib3 to raise ReadTimeoutError if no
216+
# data arrives within the interval, which we catch below.
217+
if health_check_interval > 0 and '_request_timeout' not in kwargs:
218+
kwargs['_request_timeout'] = (None, health_check_interval)
219+
190220
while True:
191221
resp = func(*args, **kwargs)
192222
try:
@@ -217,12 +247,26 @@ def stream(self, func, *args, **kwargs):
217247
retry_after_410 = False
218248
yield event
219249
else:
220-
if line:
250+
if line:
221251
yield line # Normal non-empty line
222-
else:
223-
yield '' # Only yield one empty line
252+
else:
253+
yield '' # Only yield one empty line
224254
if self._stop:
225255
break
256+
except (ReadTimeoutError, ProtocolError) as e:
257+
# Only treat a read timeout / protocol error as a silent connection drop
258+
should_retry = (
259+
health_check_interval > 0
260+
and not disable_retries
261+
and watch_arg == "watch"
262+
and self.resource_version is not None
263+
)
264+
if should_retry:
265+
# Add a small sleep to avoid a tight reconnect loop
266+
# in case the endpoint is hard-down or errors immediately.
267+
time.sleep(min(1.0,health_check_interval))
268+
else:
269+
raise
226270
finally:
227271
resp.close()
228272
resp.release_conn()
@@ -232,4 +276,4 @@ def stream(self, func, *args, **kwargs):
232276
self._stop = True
233277

234278
if self._stop or disable_retries:
235-
break
279+
break

kubernetes/base/watch/watch_test.py

Lines changed: 199 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import unittest
1818
from unittest.mock import Mock, call
1919

20+
from urllib3.exceptions import ReadTimeoutError
21+
2022
from kubernetes import client, config
2123
from kubernetes.client import ApiException
2224

@@ -626,5 +628,201 @@ def test_pod_log_empty_lines(self):
626628
# ])
627629

628630

631+
def test_health_check_detects_silent_drop_and_reconnects(self):
632+
"""Test that _health_check_interval detects a silent connection drop
633+
(simulated by ReadTimeoutError) and reconnects automatically,
634+
continuing to yield events from the new connection."""
635+
636+
# First response: yields one event, then raises ReadTimeoutError
637+
# (simulating a silent connection drop during control plane upgrade)
638+
fake_resp_1 = Mock()
639+
fake_resp_1.close = Mock()
640+
fake_resp_1.release_conn = Mock()
641+
642+
def stream_then_timeout(*args, **kwargs):
643+
yield '{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}}}\n'
644+
raise ReadTimeoutError(pool=None, url=None, message="Read timed out")
645+
646+
fake_resp_1.stream = Mock(side_effect=stream_then_timeout)
647+
648+
# Second response: yields another event (after reconnect)
649+
fake_resp_2 = Mock()
650+
fake_resp_2.close = Mock()
651+
fake_resp_2.release_conn = Mock()
652+
fake_resp_2.stream = Mock(
653+
return_value=[
654+
'{"type": "ADDED", "object": {"metadata": {"name": "test2", "resourceVersion": "2"}}}\n'
655+
])
656+
657+
fake_api = Mock()
658+
# First call returns the stream that will timeout,
659+
# second call returns the stream with new events
660+
fake_api.get_namespaces = Mock(side_effect=[fake_resp_1, fake_resp_2])
661+
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
662+
663+
w = Watch()
664+
events = []
665+
# Note: we do NOT pass timeout_seconds here because that disables
666+
# retries. The watch should reconnect after the ReadTimeoutError
667+
# and then stop naturally when the second stream ends (resource_version
668+
# is set from the first event, so the finally block won't set _stop).
669+
for e in w.stream(fake_api.get_namespaces,
670+
_health_check_interval=5):
671+
events.append(e)
672+
# Stop after collecting events from both connections
673+
if len(events) == 2:
674+
w.stop()
675+
676+
# Should have received events from both connections
677+
self.assertEqual(2, len(events))
678+
self.assertEqual("test1", events[0]['object'].metadata.name)
679+
self.assertEqual("test2", events[1]['object'].metadata.name)
680+
681+
# Verify the API was called twice (initial + reconnect)
682+
self.assertEqual(2, fake_api.get_namespaces.call_count)
683+
684+
# Verify both responses were properly closed
685+
fake_resp_1.close.assert_called_once()
686+
fake_resp_1.release_conn.assert_called_once()
687+
fake_resp_2.close.assert_called_once()
688+
fake_resp_2.release_conn.assert_called_once()
689+
690+
def test_health_check_disabled_by_default(self):
691+
"""Test that without _health_check_interval, a ReadTimeoutError
692+
propagates to the caller (backward compatibility)."""
693+
694+
fake_resp = Mock()
695+
fake_resp.close = Mock()
696+
fake_resp.release_conn = Mock()
697+
698+
def stream_then_timeout(*args, **kwargs):
699+
yield '{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}}}\n'
700+
raise ReadTimeoutError(pool=None, url=None, message="Read timed out")
701+
702+
fake_resp.stream = Mock(side_effect=stream_then_timeout)
703+
704+
fake_api = Mock()
705+
fake_api.get_namespaces = Mock(return_value=fake_resp)
706+
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
707+
708+
w = Watch()
709+
events = []
710+
with self.assertRaises(ReadTimeoutError):
711+
for e in w.stream(fake_api.get_namespaces):
712+
events.append(e)
713+
714+
# Should have received the one event before the timeout
715+
self.assertEqual(1, len(events))
716+
self.assertEqual("test1", events[0]['object'].metadata.name)
717+
718+
# Verify the response was properly closed even after exception
719+
fake_resp.close.assert_called_once()
720+
fake_resp.release_conn.assert_called_once()
721+
722+
def test_health_check_sets_request_timeout(self):
723+
"""Test that _health_check_interval sets _request_timeout on the
724+
API call when no explicit _request_timeout is provided."""
725+
726+
fake_resp = Mock()
727+
fake_resp.close = Mock()
728+
fake_resp.release_conn = Mock()
729+
fake_resp.stream = Mock(
730+
return_value=[
731+
'{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}}}\n'
732+
])
733+
734+
fake_api = Mock()
735+
fake_api.get_namespaces = Mock(return_value=fake_resp)
736+
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
737+
738+
w = Watch()
739+
for e in w.stream(fake_api.get_namespaces,
740+
_health_check_interval=30,
741+
timeout_seconds=10):
742+
pass
743+
744+
# Verify _request_timeout was set to the health check interval
745+
fake_api.get_namespaces.assert_called_once_with(
746+
_preload_content=False, watch=True,
747+
timeout_seconds=10, _request_timeout=(None,30))
748+
749+
def test_health_check_preserves_explicit_request_timeout(self):
750+
"""Test that _health_check_interval does NOT override an explicit
751+
_request_timeout provided by the user."""
752+
753+
fake_resp = Mock()
754+
fake_resp.close = Mock()
755+
fake_resp.release_conn = Mock()
756+
fake_resp.stream = Mock(
757+
return_value=[
758+
'{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}}}\n'
759+
])
760+
761+
fake_api = Mock()
762+
fake_api.get_namespaces = Mock(return_value=fake_resp)
763+
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
764+
765+
w = Watch()
766+
for e in w.stream(fake_api.get_namespaces,
767+
_health_check_interval=30,
768+
_request_timeout=60,
769+
timeout_seconds=10):
770+
pass
771+
772+
# Verify the user's _request_timeout (60) was preserved, not overridden
773+
fake_api.get_namespaces.assert_called_once_with(
774+
_preload_content=False, watch=True,
775+
timeout_seconds=10, _request_timeout=60)
776+
777+
def test_health_check_reconnects_with_resource_version(self):
778+
"""Test that after a silent drop, the reconnect uses the last known
779+
resource_version so no events are missed."""
780+
781+
# First response: yields events with resource versions, then times out
782+
fake_resp_1 = Mock()
783+
fake_resp_1.close = Mock()
784+
fake_resp_1.release_conn = Mock()
785+
786+
def stream_then_timeout(*args, **kwargs):
787+
yield '{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "100"}}}\n'
788+
yield '{"type": "MODIFIED", "object": {"metadata": {"name": "test1", "resourceVersion": "101"}}}\n'
789+
raise ReadTimeoutError(pool=None, url=None, message="Read timed out")
790+
791+
fake_resp_1.stream = Mock(side_effect=stream_then_timeout)
792+
793+
# Second response: yields more events
794+
fake_resp_2 = Mock()
795+
fake_resp_2.close = Mock()
796+
fake_resp_2.release_conn = Mock()
797+
fake_resp_2.stream = Mock(
798+
return_value=[
799+
'{"type": "ADDED", "object": {"metadata": {"name": "test2", "resourceVersion": "102"}}}\n'
800+
])
801+
802+
fake_api = Mock()
803+
fake_api.get_namespaces = Mock(side_effect=[fake_resp_1, fake_resp_2])
804+
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
805+
806+
w = Watch()
807+
events = []
808+
# Note: no timeout_seconds so retries are enabled
809+
for e in w.stream(fake_api.get_namespaces,
810+
_health_check_interval=5):
811+
events.append(e)
812+
if len(events) == 3:
813+
w.stop()
814+
815+
self.assertEqual(3, len(events))
816+
817+
# Verify the second call used the last resource_version from the
818+
# first connection (101) so no events are missed
819+
calls = fake_api.get_namespaces.call_args_list
820+
self.assertEqual(2, len(calls))
821+
# First call: no resource_version
822+
self.assertNotIn('resource_version', calls[0].kwargs)
823+
# Second call: should have resource_version=101
824+
self.assertEqual('101', calls[1].kwargs['resource_version'])
825+
826+
629827
if __name__ == '__main__':
630-
unittest.main()
828+
unittest.main()

0 commit comments

Comments
 (0)