Skip to content

Commit 9727b6a

Browse files
committed
feat: efm v2 support
# Conflicts: # aws_advanced_python_wrapper/plugin_service.py # aws_advanced_python_wrapper/utils/atomic.py
1 parent f830594 commit 9727b6a

12 files changed

+959
-23
lines changed

aws_advanced_python_wrapper/host_monitoring_v2_plugin.py

Lines changed: 504 additions & 0 deletions
Large diffs are not rendered by default.

aws_advanced_python_wrapper/plugin_service.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@
7575
HostListProviderService, StaticHostListProvider)
7676
from aws_advanced_python_wrapper.host_monitoring_plugin import \
7777
HostMonitoringPluginFactory
78+
from aws_advanced_python_wrapper.host_monitoring_v2_plugin import \
79+
HostMonitoringV2PluginFactory
7880
from aws_advanced_python_wrapper.hostinfo import HostInfo, HostRole
7981
from aws_advanced_python_wrapper.iam_plugin import IamAuthPluginFactory
8082
from aws_advanced_python_wrapper.plugin import CanReleaseResources
@@ -755,6 +757,7 @@ class PluginManager(CanReleaseResources):
755757
"aws_secrets_manager": AwsSecretsManagerPluginFactory,
756758
"aurora_connection_tracker": AuroraConnectionTrackerPluginFactory,
757759
"host_monitoring": HostMonitoringPluginFactory,
760+
"host_monitoring_v2": HostMonitoringV2PluginFactory,
758761
"failover": FailoverPluginFactory,
759762
"read_write_splitting": ReadWriteSplittingPluginFactory,
760763
"fastest_response_strategy": FastestResponseStrategyPluginFactory,
@@ -783,6 +786,7 @@ class PluginManager(CanReleaseResources):
783786
ReadWriteSplittingPluginFactory: 300,
784787
FailoverPluginFactory: 400,
785788
HostMonitoringPluginFactory: 500,
789+
HostMonitoringV2PluginFactory: 510,
786790
BlueGreenPluginFactory: 550,
787791
FastestResponseStrategyPluginFactory: 600,
788792
IamAuthPluginFactory: 700,

aws_advanced_python_wrapper/resources/aws_advanced_python_wrapper_messages.properties

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,30 @@ HostMonitoringPlugin.ConfigurationNotSupported=[HostMonitoringPlugin] Aborting c
176176
HostMonitoringPlugin.UnableToIdentifyConnection=[HostMonitoringPlugin] Unable to identify the connected database instance: '{}', please ensure the correct host list provider is specified. The host list provider in use is: '{}'.
177177
HostMonitoringPlugin.UnavailableHost=[HostMonitoringPlugin] Host '{}' is unavailable.
178178

179+
HostMonitoringV2Plugin.ActivatedMonitoring=[HostMonitoringV2Plugin] Executing method '{}', monitoring is activated.
180+
HostMonitoringV2Plugin.ClusterEndpointHostInfo=[HostMonitoringV2Plugin] The HostInfo to monitor is associated with a cluster endpoint. The plugin will attempt to identify the connected database instance.
181+
HostMonitoringV2Plugin.ErrorIdentifyingConnection=[HostMonitoringV2Plugin] An error occurred while identifying the connection database instance: '{}'.
182+
HostMonitoringV2Plugin.MonitoringDeactivated=[HostMonitoringV2Plugin] Monitoring deactivated for method '{}'.
183+
HostMonitoringV2Plugin.ConnectionNone=[HostMonitoringV2Plugin] Attempted to execute method '{}' but the current connection is None.
184+
HostMonitoringV2Plugin.HostInfoNone=[HostMonitoringV2Plugin] Could not find HostInfo to monitor for the current connection.
185+
HostMonitoringV2Plugin.ConfigurationNotSupported=[HostMonitoringV2Plugin] Aborting connections from a separate thread is not supported for the detected driver dialect: '{}'. The EFM V2 plugin requires this feature to be supported.
186+
HostMonitoringV2Plugin.UnableToIdentifyConnection=[HostMonitoringV2Plugin] Unable to identify the connected database instance: '{}', please ensure the correct host list provider is specified. The host list provider in use is: '{}'.
187+
188+
HostMonitorV2.ExceptionDuringMonitoringStop=[HostMonitorV2] Stopping monitoring after unhandled exception was thrown in monitoring thread for node '{}'.
189+
HostMonitorV2.MonitorIsStopped=[HostMonitorV2] Monitoring was already stopped for node '{}'.
190+
HostMonitorV2.StartMonitoringThreadNewContext=[HostMonitorV2] Start monitoring thread for checking new contexts for '{}'.
191+
HostMonitorV2.StopMonitoringThreadNewContext=[HostMonitorV2] Stop monitoring thread for checking new contexts for '{}'.
192+
HostMonitorV2.StartMonitoringThread=[HostMonitorV2] Start monitoring thread for '{}'.
193+
HostMonitorV2.OpeningMonitoringConnection=[HostMonitorV2] Opening a monitoring connection to '{}'
194+
HostMonitorV2.OpenedMonitoringConnection=[HostMonitorV2] Opened monitoring connection: '{}'
195+
HostMonitorV2.ExceptionAbortingConnection=[HostMonitorV2] Exception during aborting connection: '{}'
196+
HostMonitorV2.HostDead=[HostMonitorV2] Host '{}' is *dead*.
197+
HostMonitorV2.HostNotResponding=[HostMonitorV2] Host '{}' is not *responding* '{}'.
198+
HostMonitorV2.HostAlive=[HostMonitorV2] Host '{}' is *alive*.
199+
200+
MonitorServiceV2.ExceptionAbortingConnection=[MonitorServiceV2] Exception during aborting connection: '{}'
201+
MonitorServiceV2.HostNotResponding=[MonitorServiceV2] Host '{}' is not *responding* '{}'.
202+
179203
HostSelector.NoEligibleHost=[HostSelector] No Eligible Hosts Found.
180204
HostSelector.NoHostsMatchingRole=[HostSelector] No hosts were found matching the requested role: '{}'.
181205

aws_advanced_python_wrapper/utils/atomic.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
from threading import Lock
16+
from typing import Any
1617

1718

1819
class AtomicInt:
@@ -59,3 +60,31 @@ def compare_and_set(self, expected_value: int, new_value: int) -> bool:
5960
self._value = new_value
6061
return True
6162
return False
63+
64+
65+
class AtomicBoolean:
66+
def __init__(self, initial_value: bool):
67+
self._value: bool = initial_value
68+
self._lock: Lock = Lock()
69+
70+
def get(self) -> bool:
71+
with self._lock:
72+
return self._value
73+
74+
def set(self, value: bool) -> None:
75+
with self._lock:
76+
self._value = value
77+
78+
79+
class AtomicReference:
80+
def __init__(self, initial_value):
81+
self._value = initial_value
82+
self._lock: Lock = Lock()
83+
84+
def get(self) -> Any:
85+
with self._lock:
86+
return self._value
87+
88+
def set(self, new_value: Any) -> None:
89+
with self._lock:
90+
self._value = new_value

docs/using-the-python-driver/using-plugins/UsingTheHostMonitoringPlugin.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,19 @@ conn = AwsWrapperConnection.connect(
7979
> We recommend you either disable the Host Monitoring Connection Plugin or avoid using RDS Proxy endpoints when the Host Monitoring Connection Plugin is active.
8080
>
8181
> Although using RDS Proxy endpoints with the AWS Advanced Python Driver with Enhanced Failure Monitoring doesn't cause any critical issues, we don't recommend this approach. The main reason is that RDS Proxy transparently re-routes requests to a single database instance. RDS Proxy decides which database instance is used based on many criteria (on a per-request basis). Switching between different instances makes the Host Monitoring Connection Plugin useless in terms of instance health monitoring because the plugin will be unable to identify which instance it's connected to, and which one it's monitoring. This could result in false positive failure detections. At the same time, the plugin will still proactively monitor network connectivity to RDS Proxy endpoints and report outages back to a user application if they occur.
82+
83+
# Host Monitoring Plugin v2
84+
85+
Host Monitoring Plugin v2, also known as `host_monitoring_v2`, is an alternative implementation of enhanced failure monitoring and it is functionally equal to the Host Monitoring Plugin described above. Both plugins share the same set of [configuration parameters](#enhanced-failure-monitoring-parameters). The `host_monitoring_v2` plugin is designed to be a drop-in replacement for the `host_monitoring` plugin.
86+
The `host_monitoring_v2` plugin can be used in any scenario where the `host_monitoring` plugin is mentioned. This plugin is enabled by default. The original EFM plugin can still be used by specifying `host_monitoring` in the `plugins` parameter.
87+
88+
> [!NOTE]\
89+
> Since these two plugins are separate plugins, users may decide to use them together with a single connection. While this should not have any negative side effects, it is not recommended. It is recommended to use either the `host_monitoring` plugin, or the `host_monitoring_v2` plugin where it's needed.
90+
91+
92+
The `host_monitoring_v2` plugin is designed to address [some of the issues](https://github.com/aws/aws-advanced-jdbc-wrapper/issues/675) that have been reported by multiple users. The following changes have been made:
93+
- Used weak pointers to ease garbage collection
94+
- Split monitoring logic into two separate threads to increase overall monitoring stability
95+
- Reviewed locks for monitoring context
96+
- Reviewed and redesigned stopping of idle monitoring threads
97+
- Reviewed and simplified monitoring logic

tests/integration/container/test_aurora_failover.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ def test_fail_from_writer_to_new_writer_fail_on_connection_bound_object_invocati
116116
assert aurora_utility.is_db_instance_writer(current_connection_id) is True
117117
assert current_connection_id != initial_writer_id
118118

119+
@pytest.mark.parametrize("plugins", ["failover,host_monitoring", "failover,host_monitoring_v2"])
119120
@enable_on_features([TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED,
120121
TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
121122
def test_fail_from_reader_to_writer(
@@ -124,12 +125,13 @@ def test_fail_from_reader_to_writer(
124125
test_driver: TestDriver,
125126
conn_utils,
126127
proxied_props,
127-
aurora_utility):
128+
aurora_utility,
129+
plugins):
128130
target_driver_connect = DriverHelper.get_connect_func(test_driver)
129131
reader: TestInstanceInfo = test_environment.get_proxy_instances()[1]
130132
writer_id: str = test_environment.get_proxy_writer().get_instance_id()
131133

132-
proxied_props["plugins"] = "failover,host_monitoring"
134+
proxied_props["plugins"] = plugins
133135
with AwsWrapperConnection.connect(
134136
target_driver_connect,
135137
**conn_utils.get_proxy_connect_params(reader.get_host()),

tests/integration/container/test_basic_connectivity.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,15 +126,16 @@ def test_proxied_wrapper_connection_failed(
126126
# That is expected exception. Test pass.
127127
assert True
128128

129+
@pytest.mark.parametrize("plugins", ["host_monitoring", "host_monitoring_v2"])
129130
@enable_on_num_instances(min_instances=2)
130131
@enable_on_deployments([DatabaseEngineDeployment.AURORA, DatabaseEngineDeployment.RDS_MULTI_AZ_CLUSTER])
131132
@enable_on_features([TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
132-
def test_wrapper_connection_reader_cluster_with_efm_enabled(self, test_driver: TestDriver, conn_utils):
133+
def test_wrapper_connection_reader_cluster_with_efm_enabled(self, test_driver: TestDriver, conn_utils, plugins):
133134
target_driver_connect = DriverHelper.get_connect_func(test_driver)
134135
conn = AwsWrapperConnection.connect(
135136
target_driver_connect,
136137
**conn_utils.get_connect_params(conn_utils.reader_cluster_host),
137-
plugins="host_monitoring", connect_timeout=10)
138+
plugins=plugins, connect_timeout=10)
138139
cursor = conn.cursor()
139140
cursor.execute("SELECT 1")
140141
result = cursor.fetchone()

tests/integration/container/test_failover_performance.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from concurrent.futures import ThreadPoolExecutor, as_completed
1919
from logging import getLogger
2020
from time import perf_counter_ns, sleep
21-
from typing import TYPE_CHECKING, List, Optional
21+
from typing import TYPE_CHECKING, Any, Dict, List, Optional
2222

2323
import pytest
2424

@@ -131,8 +131,9 @@ def props(self):
131131

132132
return props
133133

134+
@pytest.mark.parametrize("plugins", ["host_monitoring", "host_monitoring_v2"])
134135
def test_failure_detection_time_efm(self, test_environment: TestEnvironment, test_driver: TestDriver, conn_utils,
135-
props: Properties):
136+
props: Properties, plugins):
136137
enhanced_failure_monitoring_perf_data_list: List[PerfStatBase] = []
137138
target_driver_connect_func = DriverHelper.get_connect_func(test_driver)
138139
try:
@@ -147,7 +148,7 @@ def test_failure_detection_time_efm(self, test_environment: TestEnvironment, tes
147148
WrapperProperties.FAILURE_DETECTION_TIME_MS.set(props, str(detection_time))
148149
WrapperProperties.FAILURE_DETECTION_INTERVAL_MS.set(props, str(detection_interval))
149150
WrapperProperties.FAILURE_DETECTION_COUNT.set(props, str(detection_count))
150-
WrapperProperties.PLUGINS.set(props, "host_monitoring")
151+
WrapperProperties.PLUGINS.set(props, plugins)
151152

152153
data: PerfStatMonitoring = PerfStatMonitoring()
153154
self._measure_performance(test_environment, target_driver_connect_func, conn_utils, sleep_delay_sec, props, data)
@@ -159,11 +160,13 @@ def test_failure_detection_time_efm(self, test_environment: TestEnvironment, tes
159160
PerformanceUtil.write_perf_data_to_file(
160161
f"/app/tests/integration/container/reports/"
161162
f"DbEngine_{test_environment.get_engine()}_"
163+
f"Plugins_{plugins}_"
162164
f"FailureDetectionPerformanceResults_EnhancedMonitoringEnabled.csv",
163165
TestPerformance.PERF_STAT_MONITORING_HEADER, enhanced_failure_monitoring_perf_data_list)
164166

167+
@pytest.mark.parametrize("plugins", ["failover,host_monitoring", "failover,host_monitoring_v2"])
165168
def test_failure_detection_time_failover_and_efm(self, test_environment: TestEnvironment, test_driver: TestDriver, conn_utils,
166-
props: Properties):
169+
props: Properties, plugins):
167170
enhanced_failure_monitoring_perf_data_list: List[PerfStatBase] = []
168171
try:
169172
for i in range(len(TestPerformance.failure_detection_time_params)):
@@ -177,7 +180,7 @@ def test_failure_detection_time_failover_and_efm(self, test_environment: TestEnv
177180
WrapperProperties.FAILURE_DETECTION_TIME_MS.set(props, str(detection_time))
178181
WrapperProperties.FAILURE_DETECTION_INTERVAL_MS.set(props, str(detection_interval))
179182
WrapperProperties.FAILURE_DETECTION_COUNT.set(props, str(detection_count))
180-
WrapperProperties.PLUGINS.set(props, "failover,host_monitoring")
183+
WrapperProperties.PLUGINS.set(props, plugins)
181184
WrapperProperties.FAILOVER_TIMEOUT_SEC.set(props, TestPerformance.PERF_FAILOVER_TIMEOUT_SEC)
182185
WrapperProperties.FAILOVER_MODE.set(props, "strict_reader")
183186

@@ -191,6 +194,7 @@ def test_failure_detection_time_failover_and_efm(self, test_environment: TestEnv
191194
PerformanceUtil.write_perf_data_to_file(
192195
f"/app/tests/integration/container/reports/"
193196
f"DbEngine_{test_environment.get_engine()}_"
197+
f"Plugins_{plugins}_"
194198
f"FailureDetectionPerformanceResults_FailoverAndEnhancedMonitoringEnabled.csv",
195199
TestPerformance.PERF_STAT_MONITORING_HEADER, enhanced_failure_monitoring_perf_data_list)
196200

@@ -205,12 +209,14 @@ def _measure_performance(
205209
query: str = "SELECT pg_sleep(600)"
206210
downtime: AtomicInt = AtomicInt()
207211
elapsed_times: List[int] = []
208-
connection_str = conn_utils.get_proxy_conn_string(test_environment.get_proxy_writer().get_host())
209212

210213
for _ in range(TestPerformance.REPEAT_TIMES):
211214
downtime.set(0)
212215

213-
with self._open_connect_with_retry(connect_func, connection_str, props) as aws_conn, ThreadPoolExecutor() as executor:
216+
with self._open_connect_with_retry(connect_func,
217+
conn_utils.get_proxy_connect_params(
218+
test_environment.get_proxy_writer().get_host()),
219+
props) as aws_conn, ThreadPoolExecutor() as executor:
214220
try:
215221
futures = [
216222
executor.submit(self._stop_network_thread, test_environment, sleep_delay_sec, downtime),
@@ -236,21 +242,21 @@ def _measure_performance(
236242
data.max_failure_detection_time_millis = PerformanceUtil.to_millis(max_val)
237243
data.avg_failure_detection_time_millis = PerformanceUtil.to_millis(avg_val)
238244

239-
def _open_connect_with_retry(self, connect_func, conn_str: str, props: Properties):
245+
def _open_connect_with_retry(self, connect_func, connect_params: Dict[str, Any], props: Properties):
240246
connection_attempts: int = 0
241247
conn: Optional[Connection] = None
242248
while conn is None and connection_attempts < 10:
243249
try:
244250
conn = AwsWrapperConnection.connect(
245251
connect_func,
246-
conn_str,
252+
**connect_params,
247253
**props)
248254
except Exception as e:
249255
TestPerformance.logger.debug("OpenConnectionFailed", str(e))
250256
connection_attempts += 1
251257

252258
if conn is None:
253-
pytest.fail(f"Unable to connect to {conn_str}")
259+
pytest.fail(f"Unable to connect to {connect_params}")
254260
return conn
255261

256262
def _stop_network_thread(self, test_environment: TestEnvironment, sleep_delay_seconds: int, downtime: AtomicInt):

0 commit comments

Comments
 (0)