Skip to content

Commit 6004436

Browse files
authored
fix: Release all stopped monitors (#397)
1 parent 00080a8 commit 6004436

File tree

2 files changed

+36
-2
lines changed

2 files changed

+36
-2
lines changed

aws_advanced_python_wrapper/host_monitoring_plugin.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,7 @@ def run(self):
432432
self._monitor_container.release_monitor(self)
433433
break
434434

435-
sleep(Monitor._INACTIVE_SLEEP_MS / 1000)
435+
self.sleep(Monitor._INACTIVE_SLEEP_MS / 1000)
436436
continue
437437

438438
status_check_start_time_ns = perf_counter_ns()
@@ -482,7 +482,7 @@ def run(self):
482482
# Use this delay for all active contexts
483483
self._host_check_timeout_ms = delay_ms
484484

485-
sleep(delay_ms / 1000)
485+
self.sleep(delay_ms / 1000)
486486
except InterruptedError as e:
487487
raise e
488488
except Exception as e:
@@ -494,6 +494,7 @@ def run(self):
494494
logger.debug("Monitor.StoppingMonitorUnhandledException", self._host_info.host)
495495
logger.debug(e, exc_info=True)
496496
finally:
497+
self._monitor_container.release_monitor(self)
497498
self.stop()
498499
if self._monitoring_conn is not None:
499500
try:
@@ -543,6 +544,10 @@ def _execute_conn_check(self, conn: Connection, timeout_sec: float):
543544
driver_dialect.execute("Cursor.execute", lambda: cursor.execute(query), query, exec_timeout=timeout_sec)
544545
cursor.fetchone()
545546

547+
# Used to help with testing
548+
def sleep(self, duration: int):
549+
sleep(duration)
550+
546551

547552
class MonitoringThreadContainer:
548553
"""

tests/unit/test_monitor.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,35 @@ def test_run_host_available(
153153
assert container._tasks_map.get(monitor) is None
154154

155155

156+
def test_ensure_stopped_monitor_removed_from_map(
157+
mocker,
158+
monitor,
159+
host_info,
160+
monitoring_conn_props,
161+
mock_plugin_service,
162+
mock_conn,
163+
mock_driver_dialect):
164+
remove_delays()
165+
host_alias = "host-1"
166+
container = MonitoringThreadContainer()
167+
container._monitor_map.put_if_absent(host_alias, monitor)
168+
container._tasks_map.put_if_absent(monitor, mocker.MagicMock())
169+
170+
executor = ThreadPoolExecutor()
171+
context = MonitoringContext(monitor, mock_conn, mock_driver_dialect, 10, 1, 3)
172+
173+
mocker.patch(
174+
"aws_advanced_python_wrapper.host_monitoring_plugin.Monitor.sleep", side_effect=InterruptedError())
175+
monitor.start_monitoring(context)
176+
177+
future = executor.submit(monitor.run)
178+
sleep(0.1) # Allow some time for the monitor to loop
179+
wait([future], 3)
180+
181+
assert container._monitor_map.get(host_alias) is None
182+
assert container._tasks_map.get(monitor) is None
183+
184+
156185
def test_run_host_unavailable(
157186
mocker,
158187
monitor,

0 commit comments

Comments
 (0)