Skip to content

Commit 55dae2f

Browse files
committed
feat: efm v2 support
# Conflicts: # aws_advanced_python_wrapper/plugin_service.py # aws_advanced_python_wrapper/utils/atomic.py
1 parent 701f4d0 commit 55dae2f

File tree

9 files changed

+950
-14
lines changed

9 files changed

+950
-14
lines changed

aws_advanced_python_wrapper/host_monitoring_v2_plugin.py

Lines changed: 505 additions & 0 deletions
Large diffs are not rendered by default.

aws_advanced_python_wrapper/plugin_service.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@
7575
HostListProviderService, StaticHostListProvider)
7676
from aws_advanced_python_wrapper.host_monitoring_plugin import \
7777
HostMonitoringPluginFactory
78+
from aws_advanced_python_wrapper.host_monitoring_v2_plugin import \
79+
HostMonitoringV2PluginFactory
7880
from aws_advanced_python_wrapper.hostinfo import HostInfo, HostRole
7981
from aws_advanced_python_wrapper.iam_plugin import IamAuthPluginFactory
8082
from aws_advanced_python_wrapper.plugin import CanReleaseResources
@@ -755,6 +757,7 @@ class PluginManager(CanReleaseResources):
755757
"aws_secrets_manager": AwsSecretsManagerPluginFactory,
756758
"aurora_connection_tracker": AuroraConnectionTrackerPluginFactory,
757759
"host_monitoring": HostMonitoringPluginFactory,
760+
"host_monitoring_v2": HostMonitoringV2PluginFactory,
758761
"failover": FailoverPluginFactory,
759762
"read_write_splitting": ReadWriteSplittingPluginFactory,
760763
"fastest_response_strategy": FastestResponseStrategyPluginFactory,
@@ -783,6 +786,7 @@ class PluginManager(CanReleaseResources):
783786
ReadWriteSplittingPluginFactory: 300,
784787
FailoverPluginFactory: 400,
785788
HostMonitoringPluginFactory: 500,
789+
HostMonitoringV2PluginFactory: 510,
786790
BlueGreenPluginFactory: 550,
787791
FastestResponseStrategyPluginFactory: 600,
788792
IamAuthPluginFactory: 700,

aws_advanced_python_wrapper/resources/aws_advanced_python_wrapper_messages.properties

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,39 @@ HostMonitoringPlugin.ConfigurationNotSupported=[HostMonitoringPlugin] Aborting c
176176
HostMonitoringPlugin.UnableToIdentifyConnection=[HostMonitoringPlugin] Unable to identify the connected database instance: '{}', please ensure the correct host list provider is specified. The host list provider in use is: '{}'.
177177
HostMonitoringPlugin.UnavailableHost=[HostMonitoringPlugin] Host '{}' is unavailable.
178178

179+
HostMonitoringV2Plugin.ActivatedMonitoring=[HostMonitoringV2Plugin] Executing method '{}', monitoring is activated.
180+
HostMonitoringV2Plugin.ClusterEndpointHostInfo=[HostMonitoringV2Plugin] The HostInfo to monitor is associated with a cluster endpoint. The plugin will attempt to identify the connected database instance.
181+
HostMonitoringV2Plugin.ErrorIdentifyingConnection=[HostMonitoringV2Plugin] An error occurred while identifying the connection database instance: '{}'.
182+
HostMonitoringV2Plugin.MonitoringDeactivated=[HostMonitoringV2Plugin] Monitoring deactivated for method '{}'.
183+
HostMonitoringV2Plugin.ConnectionNone=[HostMonitoringV2Plugin] Attempted to execute method '{}' but the current connection is None.
184+
HostMonitoringV2Plugin.HostInfoNone=[HostMonitoringV2Plugin] Could not find HostInfo to monitor for the current connection.
185+
HostMonitoringV2Plugin.HostInfoNoneForMethod=[HostMonitoringV2Plugin] Attempted to execute method '{}' but the current host info is None.
186+
HostMonitoringV2Plugin.ConfigurationNotSupported=[HostMonitoringV2Plugin] Aborting connections from a separate thread is not supported for the detected driver dialect: '{}'. The EFM plugin requires this feature to be supported.
187+
HostMonitoringV2Plugin.UnableToIdentifyConnection=[HostMonitoringV2Plugin] Unable to identify the connected database instance: '{}', please ensure the correct host list provider is specified. The host list provider in use is: '{}'.
188+
HostMonitoringV2Plugin.UnavailableHost=[HostMonitoringV2Plugin] Host '{}' is unavailable.
189+
190+
HostMonitorV2.ContextNullWarning=[HostMonitorV2] Parameter 'context' should not be null.
191+
HostMonitorV2.InterruptedExceptionDuringMonitoring=[HostMonitorV2] Monitoring thread for node {} was interrupted.
192+
HostMonitorV2.ExceptionDuringMonitoringContinue=[HostMonitorV2] Continuing monitoring after unhandled exception was thrown in monitoring thread for node '{}'.
193+
HostMonitorV2.ExceptionDuringMonitoringStop=[HostMonitorV2] Stopping monitoring after unhandled exception was thrown in monitoring thread for node '{}'.
194+
HostMonitorV2.MonitorIsStopped=[HostMonitorV2] Monitoring was already stopped for node '{}'.
195+
HostMonitorV2.Stopped=[HostMonitorV2] Stopped monitoring thread for node '{}'.
196+
HostMonitorV2.StartMonitoringThreadNewContext=[HostMonitorV2] Start monitoring thread for checking new contexts for '{}'.
197+
HostMonitorV2.StopMonitoringThreadNewContext=[HostMonitorV2] Stop monitoring thread for checking new contexts for '{}'.
198+
HostMonitorV2.StartMonitoringThread=[HostMonitorV2] Start monitoring thread for '{}'.
199+
HostMonitorV2.StopMonitoringThread=[HostMonitorV2] Stop monitoring thread for '{}'.
200+
HostMonitorV2.OpeningMonitoringConnection=[HostMonitorV2] Opening a monitoring connection to '{}'
201+
HostMonitorV2.OpenedMonitoringConnection=[HostMonitorV2] Opened monitoring connection: '{}'
202+
HostMonitorV2.ExceptionAbortingConnection=[HostMonitorV2] Exception during aborting connection: '{}'
203+
HostMonitorV2.HostDead=[HostMonitorV2] Host '{}' is *dead*.
204+
HostMonitorV2.HostNotResponding=[HostMonitorV2] Host '{}' is not *responding* '{}'.
205+
HostMonitorV2.HostAlive=[HostMonitorV2] Host '{}' is *alive*.
206+
207+
MonitorServiceV2.ExceptionAbortingConnection=[MonitorServiceV2] Exception during aborting connection: '{}'
208+
MonitorServiceV2.HostDead=[MonitorServiceV2] Host '{}' is *dead*.
209+
MonitorServiceV2.HostNotResponding=[MonitorServiceV2] Host '{}' is not *responding* '{}'.
210+
MonitorServiceV2.HostAlive=[MonitorServiceV2] Host '{}' is *alive*.
211+
179212
HostSelector.NoEligibleHost=[HostSelector] No Eligible Hosts Found.
180213
HostSelector.NoHostsMatchingRole=[HostSelector] No hosts were found matching the requested role: '{}'.
181214

aws_advanced_python_wrapper/utils/atomic.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
from threading import Lock
16+
from typing import Any
1617

1718

1819
class AtomicInt:
@@ -59,3 +60,31 @@ def compare_and_set(self, expected_value: int, new_value: int) -> bool:
5960
self._value = new_value
6061
return True
6162
return False
63+
64+
65+
class AtomicBoolean:
66+
def __init__(self, initial_value: bool):
67+
self._value = initial_value
68+
self._lock: Lock = Lock()
69+
70+
def get(self) -> bool:
71+
with self._lock:
72+
return self._value
73+
74+
def set(self, value: bool) -> None:
75+
with self._lock:
76+
self._value = value
77+
78+
79+
class AtomicReference:
80+
def __init__(self, initial_value):
81+
self._value = initial_value
82+
self._lock: Lock = Lock()
83+
84+
def get(self) -> Any:
85+
with self._lock:
86+
return self._value
87+
88+
def set(self, new_value: Any) -> None:
89+
with self._lock:
90+
self._value = new_value

tests/integration/container/test_aurora_failover.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ def test_fail_from_writer_to_new_writer_fail_on_connection_bound_object_invocati
116116
assert aurora_utility.is_db_instance_writer(current_connection_id) is True
117117
assert current_connection_id != initial_writer_id
118118

119+
@pytest.mark.parametrize("plugins", ["failover,host_monitoring", "failover,host_monitoring_v2"])
119120
@enable_on_features([TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED,
120121
TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
121122
def test_fail_from_reader_to_writer(
@@ -124,12 +125,13 @@ def test_fail_from_reader_to_writer(
124125
test_driver: TestDriver,
125126
conn_utils,
126127
proxied_props,
127-
aurora_utility):
128+
aurora_utility,
129+
plugins):
128130
target_driver_connect = DriverHelper.get_connect_func(test_driver)
129131
reader: TestInstanceInfo = test_environment.get_proxy_instances()[1]
130132
writer_id: str = test_environment.get_proxy_writer().get_instance_id()
131133

132-
proxied_props["plugins"] = "failover,host_monitoring"
134+
proxied_props["plugins"] = plugins
133135
with AwsWrapperConnection.connect(
134136
target_driver_connect,
135137
**conn_utils.get_proxy_connect_params(reader.get_host()),

tests/integration/container/test_basic_connectivity.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,15 +126,32 @@ def test_proxied_wrapper_connection_failed(
126126
# That is expected exception. Test pass.
127127
assert True
128128

129+
@pytest.mark.parametrize("plugins", ["host_monitoring", "host_monitoring_v2"])
129130
@enable_on_num_instances(min_instances=2)
130131
@enable_on_deployments([DatabaseEngineDeployment.AURORA, DatabaseEngineDeployment.RDS_MULTI_AZ_CLUSTER])
131132
@enable_on_features([TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
132-
def test_wrapper_connection_reader_cluster_with_efm_enabled(self, test_driver: TestDriver, conn_utils):
133+
def test_wrapper_connection_reader_cluster_with_efm_enabled(self, test_driver: TestDriver, conn_utils, plugins):
133134
target_driver_connect = DriverHelper.get_connect_func(test_driver)
134135
conn = AwsWrapperConnection.connect(
135136
target_driver_connect,
136137
**conn_utils.get_connect_params(conn_utils.reader_cluster_host),
137-
plugins="host_monitoring", connect_timeout=10)
138+
plugins=plugins, connect_timeout=10)
139+
cursor = conn.cursor()
140+
cursor.execute("SELECT 1")
141+
result = cursor.fetchone()
142+
assert 1 == result[0]
143+
144+
conn.close()
145+
146+
@enable_on_num_instances(min_instances=2)
147+
@enable_on_deployments([DatabaseEngineDeployment.AURORA, DatabaseEngineDeployment.RDS_MULTI_AZ_CLUSTER])
148+
@enable_on_features([TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
149+
def test_wrapper_connection_reader_cluster_with_efmv2_enabled(self, test_driver: TestDriver, conn_utils):
150+
target_driver_connect = DriverHelper.get_connect_func(test_driver)
151+
conn = AwsWrapperConnection.connect(
152+
target_driver_connect,
153+
**conn_utils.get_connect_params(conn_utils.reader_cluster_host),
154+
plugins="host_monitoring_v2", connect_timeout=10)
138155
cursor = conn.cursor()
139156
cursor.execute("SELECT 1")
140157
result = cursor.fetchone()

tests/integration/container/test_failover_performance.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,9 @@ def props(self):
131131

132132
return props
133133

134+
@pytest.mark.parametrize("plugins", ["host_monitoring", "host_monitoring_v2"])
134135
def test_failure_detection_time_efm(self, test_environment: TestEnvironment, test_driver: TestDriver, conn_utils,
135-
props: Properties):
136+
props: Properties, plugins):
136137
enhanced_failure_monitoring_perf_data_list: List[PerfStatBase] = []
137138
target_driver_connect_func = DriverHelper.get_connect_func(test_driver)
138139
try:
@@ -147,7 +148,7 @@ def test_failure_detection_time_efm(self, test_environment: TestEnvironment, tes
147148
WrapperProperties.FAILURE_DETECTION_TIME_MS.set(props, str(detection_time))
148149
WrapperProperties.FAILURE_DETECTION_INTERVAL_MS.set(props, str(detection_interval))
149150
WrapperProperties.FAILURE_DETECTION_COUNT.set(props, str(detection_count))
150-
WrapperProperties.PLUGINS.set(props, "host_monitoring")
151+
WrapperProperties.PLUGINS.set(props, plugins)
151152

152153
data: PerfStatMonitoring = PerfStatMonitoring()
153154
self._measure_performance(test_environment, target_driver_connect_func, conn_utils, sleep_delay_sec, props, data)
@@ -162,8 +163,9 @@ def test_failure_detection_time_efm(self, test_environment: TestEnvironment, tes
162163
f"FailureDetectionPerformanceResults_EnhancedMonitoringEnabled.csv",
163164
TestPerformance.PERF_STAT_MONITORING_HEADER, enhanced_failure_monitoring_perf_data_list)
164165

166+
@pytest.mark.parametrize("plugins", ["failover,host_monitoring", "failover,host_monitoring_v2"])
165167
def test_failure_detection_time_failover_and_efm(self, test_environment: TestEnvironment, test_driver: TestDriver, conn_utils,
166-
props: Properties):
168+
props: Properties, plugins):
167169
enhanced_failure_monitoring_perf_data_list: List[PerfStatBase] = []
168170
try:
169171
for i in range(len(TestPerformance.failure_detection_time_params)):
@@ -177,7 +179,7 @@ def test_failure_detection_time_failover_and_efm(self, test_environment: TestEnv
177179
WrapperProperties.FAILURE_DETECTION_TIME_MS.set(props, str(detection_time))
178180
WrapperProperties.FAILURE_DETECTION_INTERVAL_MS.set(props, str(detection_interval))
179181
WrapperProperties.FAILURE_DETECTION_COUNT.set(props, str(detection_count))
180-
WrapperProperties.PLUGINS.set(props, "failover,host_monitoring")
182+
WrapperProperties.PLUGINS.set(props, plugins)
181183
WrapperProperties.FAILOVER_TIMEOUT_SEC.set(props, TestPerformance.PERF_FAILOVER_TIMEOUT_SEC)
182184
WrapperProperties.FAILOVER_MODE.set(props, "strict_reader")
183185

tests/integration/container/test_read_write_splitting.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -346,14 +346,16 @@ def test_failover_to_new_writer__switch_read_only(
346346
current_id = rds_utils.query_instance_id(conn)
347347
assert new_writer_id == current_id
348348

349+
@pytest.mark.parametrize("plugins", ["read_write_splitting,failover,host_monitoring",
350+
"read_write_splitting,failover,host_monitoring_v2"])
349351
@enable_on_features([TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED,
350352
TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
351353
@enable_on_num_instances(min_instances=3)
352354
@disable_on_engines([DatabaseEngine.MYSQL])
353355
def test_failover_to_new_reader__switch_read_only(
354356
self, test_environment: TestEnvironment, test_driver: TestDriver,
355-
proxied_failover_props, conn_utils, rds_utils):
356-
WrapperProperties.PLUGINS.set(proxied_failover_props, "read_write_splitting,failover,host_monitoring")
357+
proxied_failover_props, conn_utils, rds_utils, plugins):
358+
WrapperProperties.PLUGINS.set(proxied_failover_props, plugins)
357359
WrapperProperties.FAILOVER_MODE.set(proxied_failover_props, "reader-or-writer")
358360

359361
target_driver_connect = DriverHelper.get_connect_func(test_driver)
@@ -394,14 +396,16 @@ def test_failover_to_new_reader__switch_read_only(
394396
current_id = rds_utils.query_instance_id(conn)
395397
assert other_reader_id == current_id
396398

399+
@pytest.mark.parametrize("plugins", ["read_write_splitting,failover,host_monitoring",
400+
"read_write_splitting,failover,host_monitoring_v2"])
397401
@enable_on_features([TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED,
398402
TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
399403
@enable_on_num_instances(min_instances=3)
400404
@disable_on_engines([DatabaseEngine.MYSQL])
401405
def test_failover_reader_to_writer__switch_read_only(
402406
self, test_environment: TestEnvironment, test_driver: TestDriver,
403-
proxied_failover_props, conn_utils, rds_utils):
404-
WrapperProperties.PLUGINS.set(proxied_failover_props, "read_write_splitting,failover,host_monitoring")
407+
proxied_failover_props, conn_utils, rds_utils, plugins):
408+
WrapperProperties.PLUGINS.set(proxied_failover_props, plugins)
405409
target_driver_connect = DriverHelper.get_connect_func(test_driver)
406410
conn = AwsWrapperConnection.connect(
407411
target_driver_connect, **conn_utils.get_proxy_connect_params(), **proxied_failover_props)
@@ -513,17 +517,19 @@ def test_pooled_connection__cluster_url_failover(
513517
new_driver_conn = conn.target_connection
514518
assert initial_driver_conn is not new_driver_conn
515519

520+
@pytest.mark.parametrize("plugins", ["read_write_splitting,failover,host_monitoring",
521+
"read_write_splitting,failover,host_monitoring_v2"])
516522
@enable_on_features([TestEnvironmentFeatures.FAILOVER_SUPPORTED, TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED,
517523
TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
518524
@disable_on_engines([DatabaseEngine.MYSQL])
519525
def test_pooled_connection__failover_failed(
520526
self, test_environment: TestEnvironment, test_driver: TestDriver,
521-
rds_utils, conn_utils, proxied_failover_props):
527+
rds_utils, conn_utils, proxied_failover_props, plugins):
522528
writer_host = test_environment.get_writer().get_host()
523529
provider = SqlAlchemyPooledConnectionProvider(lambda _, __: {"pool_size": 1}, None, lambda host_info, props: writer_host in host_info.host)
524530
ConnectionProviderManager.set_connection_provider(provider)
525531

526-
WrapperProperties.PLUGINS.set(proxied_failover_props, "read_write_splitting,failover,host_monitoring")
532+
WrapperProperties.PLUGINS.set(proxied_failover_props, plugins)
527533
WrapperProperties.FAILOVER_TIMEOUT_SEC.set(proxied_failover_props, "1")
528534
WrapperProperties.FAILURE_DETECTION_TIME_MS.set(proxied_failover_props, "1000")
529535
WrapperProperties.FAILURE_DETECTION_COUNT.set(proxied_failover_props, "1")

0 commit comments

Comments
 (0)