Skip to content

Commit 796f385

Browse files
committed
feat: efm v2 support
# Conflicts: # aws_advanced_python_wrapper/plugin_service.py # aws_advanced_python_wrapper/utils/atomic.py # Conflicts: # docs/using-the-python-driver/UsingThePythonDriver.md
1 parent ee58b5e commit 796f385

19 files changed

+1155
-43
lines changed

aws_advanced_python_wrapper/host_monitoring_plugin.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424

2525
from concurrent.futures import (Executor, Future, ThreadPoolExecutor,
2626
TimeoutError)
27-
from copy import copy
2827
from dataclasses import dataclass
2928
from queue import Queue
3029
from threading import Event, Lock, RLock
@@ -43,6 +42,7 @@
4342
from aws_advanced_python_wrapper.utils.notifications import (
4443
ConnectionEvent, HostEvent, OldConnectionSuggestedAction)
4544
from aws_advanced_python_wrapper.utils.properties import (Properties,
45+
PropertiesUtils,
4646
WrapperProperties)
4747
from aws_advanced_python_wrapper.utils.rdsutils import RdsUtils
4848
from aws_advanced_python_wrapper.utils.telemetry.telemetry import (
@@ -335,7 +335,6 @@ class Monitor:
335335
_DEFAULT_CONNECT_TIMEOUT_SEC = 10
336336
_INACTIVE_SLEEP_MS = 100
337337
_MIN_HOST_CHECK_TIMEOUT_MS = 3000
338-
_MONITORING_PROPERTY_PREFIX = "monitoring-"
339338
_QUERY = "SELECT 1"
340339

341340
def __init__(
@@ -511,19 +510,15 @@ def _check_host_status(self, host_check_timeout_ms: int) -> HostStatus:
511510
try:
512511
driver_dialect = self._plugin_service.driver_dialect
513512
if self._monitoring_conn is None or driver_dialect.is_closed(self._monitoring_conn):
514-
props_copy: Properties = copy(self._props)
515-
for key, value in self._props.items():
516-
if key.startswith(Monitor._MONITORING_PROPERTY_PREFIX):
517-
props_copy[key[len(Monitor._MONITORING_PROPERTY_PREFIX):len(key)]] = value
518-
props_copy.pop(key, None)
513+
monitoring_properties: Properties = PropertiesUtils.create_monitoring_properties(self._props)
519514

520515
# Set a default connect timeout if the user hasn't configured one
521-
if props_copy.get(WrapperProperties.CONNECT_TIMEOUT_SEC.name, None) is None:
522-
props_copy[WrapperProperties.CONNECT_TIMEOUT_SEC.name] = Monitor._DEFAULT_CONNECT_TIMEOUT_SEC
516+
if monitoring_properties.get(WrapperProperties.CONNECT_TIMEOUT_SEC.name, None) is None:
517+
monitoring_properties[WrapperProperties.CONNECT_TIMEOUT_SEC.name] = Monitor._DEFAULT_CONNECT_TIMEOUT_SEC
523518

524519
logger.debug("Monitor.OpeningMonitorConnection", self._host_info.url)
525520
start_ns = perf_counter_ns()
526-
self._monitoring_conn = self._plugin_service.force_connect(self._host_info, props_copy, None)
521+
self._monitoring_conn = self._plugin_service.force_connect(self._host_info, monitoring_properties, None)
527522
logger.debug("Monitor.OpenedMonitorConnection", self._host_info.url)
528523
return Monitor.HostStatus(True, perf_counter_ns() - start_ns)
529524

aws_advanced_python_wrapper/host_monitoring_v2_plugin.py

Lines changed: 518 additions & 0 deletions
Large diffs are not rendered by default.

aws_advanced_python_wrapper/plugin_service.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@
7575
HostListProviderService, StaticHostListProvider)
7676
from aws_advanced_python_wrapper.host_monitoring_plugin import \
7777
HostMonitoringPluginFactory
78+
from aws_advanced_python_wrapper.host_monitoring_v2_plugin import \
79+
HostMonitoringV2PluginFactory
7880
from aws_advanced_python_wrapper.hostinfo import HostInfo, HostRole
7981
from aws_advanced_python_wrapper.iam_plugin import IamAuthPluginFactory
8082
from aws_advanced_python_wrapper.plugin import CanReleaseResources
@@ -755,6 +757,7 @@ class PluginManager(CanReleaseResources):
755757
"aws_secrets_manager": AwsSecretsManagerPluginFactory,
756758
"aurora_connection_tracker": AuroraConnectionTrackerPluginFactory,
757759
"host_monitoring": HostMonitoringPluginFactory,
760+
"host_monitoring_v2": HostMonitoringV2PluginFactory,
758761
"failover": FailoverPluginFactory,
759762
"read_write_splitting": ReadWriteSplittingPluginFactory,
760763
"fastest_response_strategy": FastestResponseStrategyPluginFactory,
@@ -783,6 +786,7 @@ class PluginManager(CanReleaseResources):
783786
ReadWriteSplittingPluginFactory: 300,
784787
FailoverPluginFactory: 400,
785788
HostMonitoringPluginFactory: 500,
789+
HostMonitoringV2PluginFactory: 510,
786790
BlueGreenPluginFactory: 550,
787791
FastestResponseStrategyPluginFactory: 600,
788792
IamAuthPluginFactory: 700,

aws_advanced_python_wrapper/resources/aws_advanced_python_wrapper_messages.properties

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,28 @@ HostMonitoringPlugin.ConfigurationNotSupported=[HostMonitoringPlugin] Aborting c
176176
HostMonitoringPlugin.UnableToIdentifyConnection=[HostMonitoringPlugin] Unable to identify the connected database instance: '{}', please ensure the correct host list provider is specified. The host list provider in use is: '{}'.
177177
HostMonitoringPlugin.UnavailableHost=[HostMonitoringPlugin] Host '{}' is unavailable.
178178

179+
HostMonitoringV2Plugin.ActivatedMonitoring=[HostMonitoringV2Plugin] Executing method '{}', monitoring is activated.
180+
HostMonitoringV2Plugin.ClusterEndpointHostInfo=[HostMonitoringV2Plugin] The HostInfo to monitor is associated with a cluster endpoint. The plugin will attempt to identify the connected database instance.
181+
HostMonitoringV2Plugin.ErrorIdentifyingConnection=[HostMonitoringV2Plugin] An error occurred while identifying the connection database instance: '{}'.
182+
HostMonitoringV2Plugin.MonitoringDeactivated=[HostMonitoringV2Plugin] Monitoring deactivated for method '{}'.
183+
HostMonitoringV2Plugin.ConnectionNone=[HostMonitoringV2Plugin] Attempted to execute method '{}' but the current connection is None.
184+
HostMonitoringV2Plugin.HostInfoNone=[HostMonitoringV2Plugin] Could not find HostInfo to monitor for the current connection.
185+
HostMonitoringV2Plugin.ConfigurationNotSupported=[HostMonitoringV2Plugin] Aborting connections from a separate thread is not supported for the detected driver dialect: '{}'. The EFM V2 plugin requires this feature to be supported.
186+
HostMonitoringV2Plugin.UnableToIdentifyConnection=[HostMonitoringV2Plugin] Unable to identify the connected database instance: '{}', please ensure the correct host list provider is specified. The host list provider in use is: '{}'.
187+
HostMonitorV2.ExceptionDuringMonitoringStop=[HostMonitorV2] Stopping monitoring after unhandled exception was thrown in monitoring thread for host '{}'. Exception: '{}'
188+
HostMonitorV2.MonitorIsStopped=[HostMonitorV2] Monitoring was already stopped for host '{}'.
189+
HostMonitorV2.StartMonitoringThreadNewContext=[HostMonitorV2] Start monitoring thread for checking new contexts for '{}'.
190+
HostMonitorV2.StopMonitoringThreadNewContext=[HostMonitorV2] Stop monitoring thread for checking new contexts for '{}'.
191+
HostMonitorV2.StartMonitoringThread=[HostMonitorV2] Start monitoring thread for '{}'.
192+
HostMonitorV2.OpeningMonitoringConnection=[HostMonitorV2] Opening a monitoring connection to '{}'
193+
HostMonitorV2.OpenedMonitoringConnection=[HostMonitorV2] Opened monitoring connection: '{}'
194+
HostMonitorV2.ExceptionAbortingConnection=[HostMonitorV2] Exception while aborting connection: '{}'
195+
HostMonitorV2.HostDead=[HostMonitorV2] Host '{}' is *dead*.
196+
HostMonitorV2.HostNotResponding=[HostMonitorV2] Host '{}' is not *responding* '{}'.
197+
HostMonitorV2.HostAlive=[HostMonitorV2] Host '{}' is *alive*.
198+
HostMonitorV2.StopMonitoringThread=[HostMonitorV2] Stop monitoring thread for '{}'.
199+
MonitorServiceV2.ExceptionAbortingConnection=[MonitorServiceV2] Exception during aborting connection: '{}'
200+
179201
HostSelector.NoEligibleHost=[HostSelector] No Eligible Hosts Found.
180202
HostSelector.NoHostsMatchingRole=[HostSelector] No hosts were found matching the requested role: '{}'.
181203

@@ -193,15 +215,15 @@ LimitlessPlugin.UnsupportedDialectOrDatabase=[LimitlessPlugin] Unsupported diale
193215

194216
LimitlessQueryHelper.UnsupportedDialectOrDatabase=[LimitlessQueryHelper] Unsupported dialect '{}' encountered. Please ensure JDBC connection parameters are correct, and refer to the documentation to ensure that the connecting database is compatible with the Limitless Connection Plugin.
195217

196-
LimitlessRouterMonitor.errorDuringMonitoringStop=[LimitlessRouterMonitor] Stopping monitoring after unhandled error was thrown in Limitless Router Monitoring thread for node {}. Error: {}
197-
LimitlessRouterMonitor.InterruptedErrorDuringMonitoring=[LimitlessRouterMonitor] Limitless Router Monitoring thread for node {} was interrupted.
218+
LimitlessRouterMonitor.errorDuringMonitoringStop=[LimitlessRouterMonitor] Stopping monitoring after unhandled error was thrown in Limitless Router Monitoring thread for host {}. Error: {}
219+
LimitlessRouterMonitor.InterruptedErrorDuringMonitoring=[LimitlessRouterMonitor] Limitless Router Monitoring thread for host {} was interrupted.
198220
LimitlessRouterMonitor.InvalidQuery=[LimitlessRouterMonitor] Limitless Connection Plugin has encountered an error obtaining Limitless Router endpoints. Please ensure that you are connecting to an Aurora Limitless Database Shard Group Endpoint URL.
199221
LimitlessRouterMonitor.InvalidRouterLoad=[LimitlessRouterMonitor] Invalid load metric value of '{}' from the transaction router query aurora_limitless_router_endpoints() for transaction router '{}'. The load metric value must be a decimal value between 0 and 1. Host weight be assigned a default weight of 1.
200222
LimitlessRouterMonitor.GetNetworkTimeoutError=[LimitlessRouterMonitor] An error occurred while getting the connection network timeout: {}
201223
LimitlessRouterMonitor.OpeningConnection=[LimitlessRouterMonitor] Opening Limitless Router Monitor connection to '{}'.
202224
LimitlessRouterMonitor.OpenedConnection=[LimitlessRouterMonitor] Opened Limitless Router Monitor connection: {}.
203-
LimitlessRouterMonitor.Running=[LimitlessRouterMonitor] Limitless Router Monitor thread running on node {}.
204-
LimitlessRouterMonitor.Stopped=[LimitlessRouterMonitor] Limitless Router Monitor thread stopped on node {}.
225+
LimitlessRouterMonitor.Running=[LimitlessRouterMonitor] Limitless Router Monitor thread running on host {}.
226+
LimitlessRouterMonitor.Stopped=[LimitlessRouterMonitor] Limitless Router Monitor thread stopped on host {}.
205227

206228
LimitlessRouterService.ConnectWithHost=[LimitlessRouterService] Connecting to host {}.
207229
LimitlessRouterService.ErrorClosingMonitor=[LimitlessRouterService] An error occurred while closing Limitless Router Monitor: {}

aws_advanced_python_wrapper/utils/atomic.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
# limitations under the License.
1414

1515
from threading import Lock
16+
from typing import Generic, TypeVar
17+
18+
T = TypeVar('T')
1619

1720

1821
class AtomicInt:
@@ -59,3 +62,31 @@ def compare_and_set(self, expected_value: int, new_value: int) -> bool:
5962
self._value = new_value
6063
return True
6164
return False
65+
66+
67+
class AtomicBoolean:
68+
def __init__(self, initial_value: bool):
69+
self._value: bool = initial_value
70+
self._lock: Lock = Lock()
71+
72+
def get(self) -> bool:
73+
with self._lock:
74+
return self._value
75+
76+
def set(self, value: bool) -> None:
77+
with self._lock:
78+
self._value = value
79+
80+
81+
class AtomicReference(Generic[T]):
82+
def __init__(self, initial_value: T):
83+
self._value: T = initial_value
84+
self._lock: Lock = Lock()
85+
86+
def get(self) -> T:
87+
with self._lock:
88+
return self._value
89+
90+
def set(self, new_value: T) -> None:
91+
with self._lock:
92+
self._value = new_value

aws_advanced_python_wrapper/utils/properties.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
14+
import copy
1515
from typing import Any, Dict, Optional
1616
from urllib.parse import unquote
1717

@@ -73,7 +73,7 @@ def set(self, props: Properties, value: Any):
7373

7474

7575
class WrapperProperties:
76-
DEFAULT_PLUGINS = "aurora_connection_tracker,failover,host_monitoring"
76+
DEFAULT_PLUGINS = "aurora_connection_tracker,failover,host_monitoring_v2"
7777
_DEFAULT_TOKEN_EXPIRATION_SEC = 15 * 60
7878

7979
PROFILE_NAME = WrapperProperty("profile_name", "Driver configuration profile name", None)
@@ -464,6 +464,8 @@ class WrapperProperties:
464464

465465

466466
class PropertiesUtils:
467+
_MONITORING_PROPERTY_PREFIX = "monitoring-"
468+
467469
@staticmethod
468470
def parse_properties(conn_info: str, **kwargs: Any) -> Properties:
469471
if conn_info.startswith("postgresql://") or conn_info.startswith("postgres://"):
@@ -582,7 +584,7 @@ def remove_wrapper_props(props: Properties):
582584
if attr_val.name not in persisting_properties:
583585
props.pop(attr_val.name, None)
584586

585-
monitor_prop_keys = [key for key in props if key.startswith("monitoring-")]
587+
monitor_prop_keys = [key for key in props if key.startswith(PropertiesUtils._MONITORING_PROPERTY_PREFIX)]
586588
for key in monitor_prop_keys:
587589
props.pop(key, None)
588590

@@ -610,3 +612,12 @@ def mask_properties(props: Properties) -> Properties:
610612
masked_properties[WrapperProperties.PASSWORD.name] = "***"
611613

612614
return masked_properties
615+
616+
@staticmethod
617+
def create_monitoring_properties(props: Properties) -> Properties:
618+
monitoring_properties = copy.deepcopy(props)
619+
for property_key in monitoring_properties.keys():
620+
if property_key.startswith(PropertiesUtils._MONITORING_PROPERTY_PREFIX):
621+
monitoring_properties[property_key[len(PropertiesUtils._MONITORING_PROPERTY_PREFIX):]] = \
622+
monitoring_properties[property_key]
623+
return monitoring_properties

docs/using-the-python-driver/UsingThePythonDriver.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ The AWS Advanced Python Driver has several built-in plugins that are available t
6464
| Plugin name | Plugin Code | Database Compatibility | Description | Additional Required Dependencies |
6565
|--------------------------------------------------------------------------------------------------------|-----------------------------|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------|
6666
| [Failover Connection Plugin](./using-plugins/UsingTheFailoverPlugin.md) | `failover` | Aurora | Enables the failover functionality supported by Amazon Aurora clusters. Prevents opening a wrong connection to an old writer host dues to stale DNS after failover event. This plugin is enabled by default. | None |
67-
| [Host Monitoring Connection Plugin](./using-plugins/UsingTheHostMonitoringPlugin.md) | `host_monitoring` | Aurora | Enables enhanced host connection failure monitoring, allowing faster failure detection rates. This plugin is enabled by default. | None |
67+
| [Host Monitoring Connection Plugin](./using-plugins/UsingTheHostMonitoringPlugin.md) | `host_monitoring_v2` or `host_monitoring` | Aurora | Enables enhanced host connection failure monitoring, allowing faster failure detection rates. This plugin is enabled by default. | None |
6868
| [IAM Authentication Connection Plugin](./using-plugins/UsingTheIamAuthenticationPlugin.md) | `iam` | Any database | Enables users to connect to their Amazon Aurora clusters using AWS Identity and Access Management (IAM). | [Boto3 - AWS SDK for Python](https://aws.amazon.com/sdk-for-python/) |
6969
| [AWS Secrets Manager Connection Plugin](./using-plugins/UsingTheAwsSecretsManagerPlugin.md) | `aws_secrets_manager` | Any database | Enables fetching database credentials from the AWS Secrets Manager service. | [Boto3 - AWS SDK for Python](https://aws.amazon.com/sdk-for-python/) |
7070
| [Federated Authentication Connection Plugin](./using-plugins/UsingTheFederatedAuthenticationPlugin.md) | `federated_auth` | Any database | Enables users to authenticate via Federated Identity and then database access via IAM. | [Boto3 - AWS SDK for Python](https://aws.amazon.com/sdk-for-python/) |

docs/using-the-python-driver/using-plugins/UsingTheFastestResponseStrategyPlugin.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ The plugin can be loaded by adding the plugin code `fastest_response_strategy` t
1313
1414
```python
1515
params = {
16-
"plugins": "read_write_splitting,fastest_response_strategy,failover,host_monitoring",
16+
"plugins": "read_write_splitting,fastest_response_strategy,failover,host_monitoring_v2",
1717
"reader_response_strategy": "fastest_response"
1818
# Add other connection properties below...
1919
}

0 commit comments

Comments
 (0)