Skip to content

Commit c583a02

Browse files
authored
Avoid reuse of stopped monitor threads (#362)
1 parent 0fc33e2 commit c583a02

File tree

8 files changed

+200
-250
lines changed

8 files changed

+200
-250
lines changed

aws_advanced_python_wrapper/host_monitoring_plugin.py

Lines changed: 153 additions & 155 deletions
Large diffs are not rendered by default.

aws_advanced_python_wrapper/resources/aws_advanced_python_wrapper_messages.properties

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -112,6 +112,9 @@ IamPlugin.IsNoneOrEmpty=[IamPlugin] Property "{}" is None or empty.
112112
LogUtils.Topology=[LogUtils] Topology {}
113113

114114
Monitor.ContextNone=[Monitor] Parameter 'context' should not evaluate to None.
115+
Monitor.ExceptionInMonitorLoop=[Monitor] Continuing monitoring after an unhandled exception was thrown in the monitoring thread for host '{}'.
116+
Monitor.StoppingMonitorUnhandledException=[Monitor] Stopping thread after an unhandled exception was thrown in the monitoring thread for host '{}'.
117+
Monitor.InterruptedException=[Monitor] Monitoring thread for host '{}' was interrupted.
115118
Monitor.OpenedMonitorConnection=[Monitor] Opened a monitoring connection to '{}'.
116119
Monitor.OpeningMonitorConnection=[Monitor] Opening a monitoring connection to '{}'.
117120

aws_advanced_python_wrapper/utils/log.py

Lines changed: 12 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -26,38 +26,38 @@ class Logger:
2626
def __init__(self, name: str):
2727
self.logger = getLogger(name)
2828

29-
def debug(self, msg, *args):
29+
def debug(self, msg, *args, **kwargs):
3030
if not self.logger.isEnabledFor(logging.DEBUG):
3131
return
3232

3333
if args is not None and len(args) > 0:
34-
self.logger.debug(Messages.get_formatted(msg, *args))
34+
self.logger.debug(Messages.get_formatted(msg, *args), **kwargs)
3535
else:
3636
try:
37-
self.logger.debug(Messages.get(msg))
37+
self.logger.debug(Messages.get(msg), **kwargs)
3838
except NotInResourceBundleError:
39-
self.logger.debug(msg)
39+
self.logger.debug(msg, **kwargs)
4040

41-
def error(self, msg, *args):
41+
def error(self, msg, *args, **kwargs):
4242
if not self.logger.isEnabledFor(logging.ERROR):
4343
return
4444

4545
if args is not None and len(args) > 0:
46-
self.logger.error(Messages.get_formatted(msg, *args))
46+
self.logger.error(Messages.get_formatted(msg, *args), **kwargs)
4747
else:
4848
try:
49-
self.logger.error(Messages.get(msg))
49+
self.logger.error(Messages.get(msg), **kwargs)
5050
except NotInResourceBundleError:
51-
self.logger.error(msg)
51+
self.logger.error(msg, **kwargs)
5252

53-
def warning(self, msg, *args):
53+
def warning(self, msg, *args, **kwargs):
5454
if not self.logger.isEnabledFor(logging.WARNING):
5555
return
5656

5757
if args is not None and len(args) > 0:
58-
self.logger.warning(Messages.get_formatted(msg, *args))
58+
self.logger.warning(Messages.get_formatted(msg, *args), **kwargs)
5959
else:
6060
try:
61-
self.logger.warning(Messages.get(msg))
61+
self.logger.warning(Messages.get(msg), **kwargs)
6262
except NotInResourceBundleError:
63-
self.logger.warning(msg)
63+
self.logger.warning(msg, **kwargs)

aws_advanced_python_wrapper/utils/properties.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -156,7 +156,7 @@ class WrapperProperties:
156156
MONITOR_DISPOSAL_TIME_MS = WrapperProperty(
157157
"monitor_disposal_time_ms",
158158
"Interval in milliseconds after which a monitor should be considered inactive and marked for disposal.",
159-
60_000)
159+
600_000) # 10 minutes
160160

161161
# Failover
162162
ENABLE_FAILOVER = WrapperProperty(

tests/unit/test_monitor.py

Lines changed: 9 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -71,24 +71,12 @@ def mock_plugin_service(mocker, mock_conn, mock_driver_dialect):
7171

7272

7373
@pytest.fixture
74-
def mock_monitor_service(mocker):
75-
service = mocker.MagicMock()
76-
container = MonitoringThreadContainer()
77-
78-
def release_monitor(monitor_to_release):
79-
container.release_monitor(monitor_to_release)
80-
81-
service.notify_unused.side_effect = release_monitor
82-
return service
83-
84-
85-
@pytest.fixture
86-
def monitor(mock_plugin_service, mock_monitor_service, host_info, props):
74+
def monitor(mock_plugin_service, host_info, props):
8775
return Monitor(
8876
mock_plugin_service,
8977
host_info,
9078
props,
91-
mock_monitor_service)
79+
MonitoringThreadContainer())
9280

9381

9482
@pytest.fixture(autouse=True)
@@ -138,7 +126,6 @@ def test_run_host_available(
138126
monitor,
139127
host_info,
140128
monitoring_conn_props,
141-
mock_monitor_service,
142129
mock_plugin_service,
143130
mock_conn,
144131
mock_driver_dialect):
@@ -159,7 +146,6 @@ def test_run_host_available(
159146

160147
mock_plugin_service.force_connect.assert_called_once_with(host_info, monitoring_conn_props, None)
161148
mock_driver_dialect.abort_connection.assert_not_called()
162-
mock_monitor_service.notify_unused.assert_called_once()
163149
mock_conn.close.assert_called_once()
164150
assert context._is_host_unavailable is False
165151
assert monitor._is_stopped.is_set()
@@ -172,28 +158,27 @@ def test_run_host_unavailable(
172158
monitor,
173159
host_info,
174160
monitoring_conn_props,
175-
mock_monitor_service,
176161
mock_plugin_service,
177162
mock_conn,
178163
mock_driver_dialect):
179164
remove_delays()
180165
executor = ThreadPoolExecutor()
181166
context = MonitoringContext(monitor, mock_conn, mock_driver_dialect, 30, 10, 3)
182167

183-
mocker.patch("aws_advanced_python_wrapper.host_monitoring_plugin.Monitor._execute_conn_check", side_effect=TimeoutError())
168+
mocker.patch(
169+
"aws_advanced_python_wrapper.host_monitoring_plugin.Monitor._execute_conn_check", side_effect=TimeoutError())
184170
monitor.start_monitoring(context)
185171
future = executor.submit(monitor.run)
186172
wait([future], 3)
187173

188174
mock_plugin_service.force_connect.assert_called_once_with(host_info, monitoring_conn_props, None)
189175
mock_driver_dialect.abort_connection.assert_called_once()
190-
mock_monitor_service.notify_unused.assert_called_once()
191176
mock_conn.close.assert_called_once()
192177
assert context._is_host_unavailable is True
193178
assert monitor._is_stopped.is_set()
194179

195180

196-
def test_run__no_contexts(mocker, mock_monitor_service, monitor):
181+
def test_run__no_contexts(mocker, monitor):
197182
host_alias = "host-1"
198183
container = MonitoringThreadContainer()
199184
container._monitor_map.put_if_absent(host_alias, monitor)
@@ -209,7 +194,8 @@ def test_run__no_contexts(mocker, mock_monitor_service, monitor):
209194

210195
def test_check_connection_status__valid_then_invalid(mocker, monitor):
211196
mock_execute_conn_check = mocker.patch(
212-
"aws_advanced_python_wrapper.host_monitoring_plugin.Monitor._execute_conn_check", side_effect=[None, TimeoutError()])
197+
"aws_advanced_python_wrapper.host_monitoring_plugin.Monitor._execute_conn_check",
198+
side_effect=[None, TimeoutError()])
213199

214200
status = monitor._check_host_status(30) # Initiate a monitoring connection
215201
assert status.is_available
@@ -221,7 +207,8 @@ def test_check_connection_status__valid_then_invalid(mocker, monitor):
221207

222208

223209
def test_check_connection_status__conn_check_throws_exception(mocker, monitor):
224-
mocker.patch("aws_advanced_python_wrapper.host_monitoring_plugin.Monitor._execute_conn_check", side_effect=Exception())
210+
mocker.patch("aws_advanced_python_wrapper.host_monitoring_plugin.Monitor._execute_conn_check",
211+
side_effect=Exception())
225212

226213
status = monitor._check_host_status(30) # Initiate a monitoring connection
227214
assert status.is_available

tests/unit/test_monitor_service.py

Lines changed: 12 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@
1414

1515
import psycopg
1616
import pytest
17+
from _weakref import ref
1718

1819
from aws_advanced_python_wrapper.errors import AwsWrapperError
1920
from aws_advanced_python_wrapper.host_monitoring_plugin import (
@@ -39,7 +40,9 @@ def mock_thread_container(mocker):
3940

4041
@pytest.fixture
4142
def mock_monitor(mocker):
42-
return mocker.MagicMock()
43+
monitor = mocker.MagicMock()
44+
monitor.is_stopped = False
45+
return monitor
4346

4447

4548
@pytest.fixture
@@ -57,21 +60,22 @@ def thread_container(mock_executor):
5760
@pytest.fixture
5861
def monitor_service_mocked_container(mock_plugin_service, mock_thread_container):
5962
service = MonitorService(mock_plugin_service)
60-
service._thread_container = mock_thread_container
63+
service._monitor_container = mock_thread_container
6164
return service
6265

6366

6467
@pytest.fixture
6568
def monitor_service_with_container(mock_plugin_service, thread_container):
6669
service = MonitorService(mock_plugin_service)
67-
service._thread_container = thread_container
70+
service._monitor_container = thread_container
6871
return service
6972

7073

7174
@pytest.fixture(autouse=True)
7275
def setup_teardown(mocker, mock_thread_container, mock_plugin_service, mock_monitor):
7376
mock_thread_container.get_or_create_monitor.return_value = mock_monitor
74-
mocker.patch("aws_advanced_python_wrapper.host_monitoring_plugin.MonitorService._create_monitor", return_value=mock_monitor)
77+
mocker.patch(
78+
"aws_advanced_python_wrapper.host_monitoring_plugin.MonitorService._create_monitor", return_value=mock_monitor)
7579

7680
yield
7781

@@ -91,7 +95,7 @@ def test_start_monitoring(
9195
mock_conn, aliases, HostInfo("instance-1"), Properties(), 5000, 1000, 3)
9296

9397
mock_monitor.start_monitoring.assert_called_once()
94-
assert mock_monitor == monitor_service_mocked_container._cached_monitor
98+
assert mock_monitor == monitor_service_mocked_container._cached_monitor()
9599
assert aliases == monitor_service_mocked_container._cached_monitor_aliases
96100

97101

@@ -105,14 +109,14 @@ def test_start_monitoring__multiple_calls(monitor_service_with_container, mock_m
105109

106110
assert num_calls == mock_monitor.start_monitoring.call_count
107111
mock_executor.submit.assert_called_once_with(mock_monitor.run)
108-
assert mock_monitor == monitor_service_with_container._cached_monitor
112+
assert mock_monitor == monitor_service_with_container._cached_monitor()
109113
assert aliases == monitor_service_with_container._cached_monitor_aliases
110114

111115

112116
def test_start_monitoring__cached_monitor(
113117
monitor_service_mocked_container, mock_plugin_service, mock_monitor, mock_conn, mock_thread_container):
114118
aliases = frozenset({"instance-1"})
115-
monitor_service_mocked_container._cached_monitor = mock_monitor
119+
monitor_service_mocked_container._cached_monitor = ref(mock_monitor)
116120
monitor_service_mocked_container._cached_monitor_aliases = aliases
117121

118122
monitor_service_mocked_container.start_monitoring(
@@ -121,7 +125,7 @@ def test_start_monitoring__cached_monitor(
121125
mock_plugin_service.get_dialect.assert_not_called()
122126
mock_thread_container.get_or_create_monitor.assert_not_called()
123127
mock_monitor.start_monitoring.assert_called_once()
124-
assert mock_monitor == monitor_service_mocked_container._cached_monitor
128+
assert mock_monitor == monitor_service_mocked_container._cached_monitor()
125129
assert aliases == monitor_service_mocked_container._cached_monitor_aliases
126130

127131

@@ -156,21 +160,9 @@ def test_stop_monitoring_host_connections(mocker, monitor_service_with_container
156160
mock_monitor2 = mocker.MagicMock()
157161
thread_container.get_or_create_monitor(aliases1, lambda: mock_monitor1)
158162
thread_container.get_or_create_monitor(aliases2, lambda: mock_monitor2)
159-
reset_resource_spy = mocker.spy(thread_container, 'reset_resource')
160163

161164
monitor_service_with_container.stop_monitoring_host(aliases1)
162165
mock_monitor1.clear_contexts.assert_called_once()
163-
reset_resource_spy.assert_called_once_with(mock_monitor1)
164-
reset_resource_spy.reset_mock()
165166

166167
monitor_service_with_container.stop_monitoring_host(aliases2)
167168
mock_monitor2.clear_contexts.assert_called_once()
168-
reset_resource_spy.assert_called_once_with(mock_monitor2)
169-
170-
171-
def test_release_resources(mocker, monitor_service_mocked_container):
172-
spy = mocker.spy(MonitoringThreadContainer, 'release_instance')
173-
174-
monitor_service_mocked_container.release_resources()
175-
assert monitor_service_mocked_container._thread_container is None
176-
spy.assert_called_once()

tests/unit/test_monitoring_thread_container.py

Lines changed: 1 addition & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -73,33 +73,15 @@ def release_container():
7373

7474
def test_get_or_create_monitor__monitor_created(
7575
container, mock_monitor_supplier, mock_stopped_monitor, mock_monitor1, mock_executor, mock_future):
76-
container._available_monitors.put(mock_stopped_monitor)
77-
container._tasks_map.put_if_absent(mock_stopped_monitor, mock_future)
78-
7976
result = container.get_or_create_monitor(frozenset({"alias-1", "alias-2"}), mock_monitor_supplier)
8077
assert mock_monitor1 == result
81-
assert container._tasks_map.get(mock_stopped_monitor) is None
82-
mock_future.cancel.assert_called_once()
78+
8379
mock_monitor_supplier.assert_called_once()
8480
mock_executor.submit.assert_called_once_with(mock_monitor1.run)
8581
assert mock_monitor1 == container._monitor_map.get("alias-1")
8682
assert mock_monitor1 == container._monitor_map.get("alias-2")
8783

8884

89-
def test_get_or_create_monitor__from_available_monitors(
90-
container, mock_monitor_supplier, mock_monitor1, mock_executor, mock_future):
91-
container._available_monitors.put(mock_monitor1)
92-
container._tasks_map.put_if_absent(mock_monitor1, mock_future)
93-
94-
result = container.get_or_create_monitor(frozenset({"alias-1", "alias-2"}), mock_monitor_supplier)
95-
assert mock_monitor1 == result
96-
mock_future.cancel.assert_not_called()
97-
mock_monitor_supplier.assert_not_called()
98-
mock_executor.submit.assert_not_called()
99-
assert mock_monitor1 == container._monitor_map.get("alias-1")
100-
assert mock_monitor1 == container._monitor_map.get("alias-2")
101-
102-
10385
def test_get_or_create_monitor__from_monitor_map(container, mock_monitor1):
10486
container._monitor_map.put_if_absent("alias-2", mock_monitor1)
10587

@@ -161,16 +143,6 @@ def test_get_or_create_monitor__null_monitor(container, mock_monitor_supplier):
161143
container.get_or_create_monitor(frozenset({"alias-1"}), mock_monitor_supplier)
162144

163145

164-
def test_reset_resource(mock_monitor1, mock_monitor2, container):
165-
container._monitor_map.put_if_absent("alias-1", mock_monitor1)
166-
container._monitor_map.put_if_absent("alias-2", mock_monitor2)
167-
168-
container.reset_resource(mock_monitor2)
169-
assert mock_monitor1 == container._monitor_map.get("alias-1")
170-
assert container._monitor_map.get("alias-2") is None
171-
assert mock_monitor2 == container._available_monitors.get()
172-
173-
174146
def test_release_monitor(mocker, mock_monitor1, mock_monitor2, container):
175147
container._monitor_map.put_if_absent("alias-1", mock_monitor1)
176148
container._monitor_map.put_if_absent("alias-2", mock_monitor2)
@@ -193,18 +165,13 @@ def test_release_instance(mocker, container, mock_monitor1, mock_future):
193165
container._tasks_map.put_if_absent(mock_monitor1, mock_future)
194166
mock_future.done.return_value = False
195167
mock_future.cancelled.return_value = False
196-
spy = mocker.spy(container._instance, "_release_resources")
197168

198169
container2 = MonitoringThreadContainer()
199170
assert container2 is container
200-
assert 2 == container._usage_count.get()
201171

202172
container2.release_instance()
203-
spy.assert_not_called()
204173

205-
container.release_instance()
206174
assert 0 == len(container._monitor_map)
207175
assert 0 == len(container._tasks_map)
208176
mock_future.cancel.assert_called_once()
209177
assert MonitoringThreadContainer._instance is None
210-
assert 0 == MonitoringThreadContainer._usage_count.get()

tests/unit/test_multithreaded_monitor_service.py

Lines changed: 9 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -97,14 +97,15 @@ def counter():
9797

9898
@pytest.fixture(autouse=True)
9999
def verify_concurrency(mock_monitor, mock_executor, mock_future, counter, concurrent_counter):
100+
# The ThreadPoolExecutor may have been shut down by a previous test, so we'll need to recreate it here.
101+
MonitoringThreadContainer._executor = ThreadPoolExecutor(thread_name_prefix="MonitoringThreadContainerExecutor")
100102
yield
101103

102104
counter.set(0)
103105
assert concurrent_counter.get() > 0
104106
concurrent_counter.set(0)
105107

106-
while MonitoringThreadContainer._instance is not None:
107-
MonitoringThreadContainer.release_instance()
108+
MonitoringThreadContainer.release_instance()
108109

109110

110111
def test_start_monitoring__connections_to_different_hosts(
@@ -122,12 +123,13 @@ def test_start_monitoring__connections_to_different_hosts(
122123

123124
try:
124125
mock_create_monitor = mocker.patch(
125-
"aws_advanced_python_wrapper.host_monitoring_plugin.MonitorService._create_monitor", return_value=mock_monitor)
126+
"aws_advanced_python_wrapper.host_monitoring_plugin.MonitorService._create_monitor",
127+
return_value=mock_monitor)
126128
contexts = start_monitoring(num_conns, services, host_alias_list)
127129
expected_start_monitoring_calls = [mocker.call(context) for context in contexts]
128130
mock_monitor.start_monitoring.assert_has_calls(expected_start_monitoring_calls, True)
129131
assert num_conns == len(MonitoringThreadContainer()._monitor_map)
130-
expected_create_monitor_calls = [mocker.call(host_info, props)] * num_conns
132+
expected_create_monitor_calls = [mocker.call(host_info, props, MonitoringThreadContainer())] * num_conns
131133
mock_create_monitor.assert_has_calls(expected_create_monitor_calls)
132134
finally:
133135
release_resources(services)
@@ -148,12 +150,13 @@ def test_start_monitoring__connections_to_same_host(
148150

149151
try:
150152
mock_create_monitor = mocker.patch(
151-
"aws_advanced_python_wrapper.host_monitoring_plugin.MonitorService._create_monitor", return_value=mock_monitor)
153+
"aws_advanced_python_wrapper.host_monitoring_plugin.MonitorService._create_monitor",
154+
return_value=mock_monitor)
152155
contexts = start_monitoring(num_conns, services, host_alias_list)
153156
expected_start_monitoring_calls = [mocker.call(context) for context in contexts]
154157
mock_monitor.start_monitoring.assert_has_calls(expected_start_monitoring_calls, True)
155158
assert 1 == len(MonitoringThreadContainer()._monitor_map)
156-
expected_create_monitor_calls = [mocker.call(host_info, props)]
159+
expected_create_monitor_calls = [mocker.call(host_info, props, MonitoringThreadContainer())]
157160
mock_create_monitor.assert_has_calls(expected_create_monitor_calls)
158161
finally:
159162
release_resources(services)

0 commit comments

Comments
 (0)