feat: efm v2 support #930


Status: Open · wants to merge 1 commit into main
506 changes: 506 additions & 0 deletions aws_advanced_python_wrapper/host_monitoring_v2_plugin.py

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions aws_advanced_python_wrapper/plugin_service.py
@@ -75,6 +75,8 @@
    HostListProviderService, StaticHostListProvider)
from aws_advanced_python_wrapper.host_monitoring_plugin import \
    HostMonitoringPluginFactory
from aws_advanced_python_wrapper.host_monitoring_v2_plugin import \
    HostMonitoringV2PluginFactory
from aws_advanced_python_wrapper.hostinfo import HostInfo, HostRole
from aws_advanced_python_wrapper.iam_plugin import IamAuthPluginFactory
from aws_advanced_python_wrapper.plugin import CanReleaseResources
@@ -755,6 +757,7 @@ class PluginManager(CanReleaseResources):
        "aws_secrets_manager": AwsSecretsManagerPluginFactory,
        "aurora_connection_tracker": AuroraConnectionTrackerPluginFactory,
        "host_monitoring": HostMonitoringPluginFactory,
        "host_monitoring_v2": HostMonitoringV2PluginFactory,
        "failover": FailoverPluginFactory,
        "read_write_splitting": ReadWriteSplittingPluginFactory,
        "fastest_response_strategy": FastestResponseStrategyPluginFactory,
@@ -783,6 +786,7 @@ class PluginManager(CanReleaseResources):
        ReadWriteSplittingPluginFactory: 300,
        FailoverPluginFactory: 400,
        HostMonitoringPluginFactory: 500,
        HostMonitoringV2PluginFactory: 510,
        BlueGreenPluginFactory: 550,
        FastestResponseStrategyPluginFactory: 600,
        IamAuthPluginFactory: 700,
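For context on the two hunks above: the first table maps `plugins` parameter names to factories, and the second fixes each factory's position in the plugin pipeline. A minimal sketch of that ordering (an assumption about the wrapper's internals, not code from this PR):

```python
# Sketch only: assumes factories are sorted by ascending weight, so 510
# slots the v2 monitor directly after the original host_monitoring (500).
weights = {
    "ReadWriteSplittingPluginFactory": 300,
    "FailoverPluginFactory": 400,
    "HostMonitoringPluginFactory": 500,
    "HostMonitoringV2PluginFactory": 510,
    "BlueGreenPluginFactory": 550,
}
pipeline = sorted(weights, key=weights.__getitem__)
print(pipeline[2:4])  # ['HostMonitoringPluginFactory', 'HostMonitoringV2PluginFactory']
```
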
@@ -176,6 +176,30 @@ HostMonitoringPlugin.ConfigurationNotSupported=[HostMonitoringPlugin] Aborting c
HostMonitoringPlugin.UnableToIdentifyConnection=[HostMonitoringPlugin] Unable to identify the connected database instance: '{}', please ensure the correct host list provider is specified. The host list provider in use is: '{}'.
HostMonitoringPlugin.UnavailableHost=[HostMonitoringPlugin] Host '{}' is unavailable.

HostMonitoringV2Plugin.ActivatedMonitoring=[HostMonitoringV2Plugin] Executing method '{}', monitoring is activated.
HostMonitoringV2Plugin.ClusterEndpointHostInfo=[HostMonitoringV2Plugin] The HostInfo to monitor is associated with a cluster endpoint. The plugin will attempt to identify the connected database instance.
HostMonitoringV2Plugin.ErrorIdentifyingConnection=[HostMonitoringV2Plugin] An error occurred while identifying the connected database instance: '{}'.
HostMonitoringV2Plugin.MonitoringDeactivated=[HostMonitoringV2Plugin] Monitoring deactivated for method '{}'.
HostMonitoringV2Plugin.ConnectionNone=[HostMonitoringV2Plugin] Attempted to execute method '{}' but the current connection is None.
HostMonitoringV2Plugin.HostInfoNone=[HostMonitoringV2Plugin] Could not find HostInfo to monitor for the current connection.
HostMonitoringV2Plugin.ConfigurationNotSupported=[HostMonitoringV2Plugin] Aborting connections from a separate thread is not supported for the detected driver dialect: '{}'. The EFM V2 plugin requires this feature to be supported.
HostMonitoringV2Plugin.UnableToIdentifyConnection=[HostMonitoringV2Plugin] Unable to identify the connected database instance: '{}', please ensure the correct host list provider is specified. The host list provider in use is: '{}'.

I think there are a couple unnecessary new lines in this file

HostMonitorV2.ExceptionDuringMonitoringStop=[HostMonitorV2] Stopping monitoring after unhandled exception was thrown in monitoring thread for node '{}'.
HostMonitorV2.MonitorIsStopped=[HostMonitorV2] Monitoring was already stopped for node '{}'.
HostMonitorV2.StartMonitoringThreadNewContext=[HostMonitorV2] Start monitoring thread for checking new contexts for '{}'.
HostMonitorV2.StopMonitoringThreadNewContext=[HostMonitorV2] Stop monitoring thread for checking new contexts for '{}'.
HostMonitorV2.StartMonitoringThread=[HostMonitorV2] Start monitoring thread for '{}'.
HostMonitorV2.OpeningMonitoringConnection=[HostMonitorV2] Opening a monitoring connection to '{}'
HostMonitorV2.OpenedMonitoringConnection=[HostMonitorV2] Opened monitoring connection: '{}'
HostMonitorV2.ExceptionAbortingConnection=[HostMonitorV2] Exception during aborting connection: '{}'
Contributor suggested change:
HostMonitorV2.ExceptionAbortingConnection=[HostMonitorV2] Exception during aborting connection: '{}'
HostMonitorV2.ExceptionAbortingConnection=[HostMonitorV2] Exception while aborting connection: '{}'

HostMonitorV2.HostDead=[HostMonitorV2] Host '{}' is *dead*.
HostMonitorV2.HostNotResponding=[HostMonitorV2] Host '{}' is not *responding* '{}'.
HostMonitorV2.HostAlive=[HostMonitorV2] Host '{}' is *alive*.

MonitorServiceV2.ExceptionAbortingConnection=[MonitorServiceV2] Exception during aborting connection: '{}'

Suggested change:
MonitorServiceV2.ExceptionAbortingConnection=[MonitorServiceV2] Exception during aborting connection: '{}'
MonitorServiceV2.ExceptionAbortingConnection=[MonitorServiceV2] Exception while aborting connection: '{}'

MonitorServiceV2.HostNotResponding=[MonitorServiceV2] Host '{}' is not *responding* '{}'.

This is a duplicate of line 197.


HostSelector.NoEligibleHost=[HostSelector] No Eligible Hosts Found.
HostSelector.NoHostsMatchingRole=[HostSelector] No hosts were found matching the requested role: '{}'.

29 changes: 29 additions & 0 deletions aws_advanced_python_wrapper/utils/atomic.py
@@ -13,6 +13,7 @@
# limitations under the License.

from threading import Lock
from typing import Any


class AtomicInt:
@@ -59,3 +60,31 @@ def compare_and_set(self, expected_value: int, new_value: int) -> bool:
                self._value = new_value
                return True
            return False


class AtomicBoolean:
    def __init__(self, initial_value: bool):
        self._value: bool = initial_value
        self._lock: Lock = Lock()

    def get(self) -> bool:
        with self._lock:
            return self._value

    def set(self, value: bool) -> None:
        with self._lock:
            self._value = value


class AtomicReference:
    def __init__(self, initial_value):
        self._value = initial_value
        self._lock: Lock = Lock()

    def get(self) -> Any:
        with self._lock:
            return self._value

    def set(self, new_value: Any) -> None:
        with self._lock:
            self._value = new_value

Contributor: does it make sense to use generics here instead of Any? might be an overkill
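To make the generics suggestion concrete, a typed variant could look like the following (a minimal sketch, illustrative only and not part of this PR):

```python
from threading import Lock
from typing import Generic, TypeVar

T = TypeVar("T")


class GenericAtomicReference(Generic[T]):
    """Like AtomicReference, but get() and set() are typed as T rather than Any."""

    def __init__(self, initial_value: T):
        self._value: T = initial_value
        self._lock: Lock = Lock()

    def get(self) -> T:
        with self._lock:
            return self._value

    def set(self, new_value: T) -> None:
        with self._lock:
            self._value = new_value
```

A caller could then declare `GenericAtomicReference[HostInfo]` and let the type checker catch mismatched payloads at no runtime cost.
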
@@ -79,3 +79,19 @@ conn = AwsWrapperConnection.connect(
> We recommend you either disable the Host Monitoring Connection Plugin or avoid using RDS Proxy endpoints when the Host Monitoring Connection Plugin is active.
>
> Although using RDS Proxy endpoints with the AWS Advanced Python Driver with Enhanced Failure Monitoring doesn't cause any critical issues, we don't recommend this approach. The main reason is that RDS Proxy transparently re-routes requests to a single database instance. RDS Proxy decides which database instance is used based on many criteria (on a per-request basis). Switching between different instances makes the Host Monitoring Connection Plugin useless in terms of instance health monitoring because the plugin will be unable to identify which instance it's connected to, and which one it's monitoring. This could result in false positive failure detections. At the same time, the plugin will still proactively monitor network connectivity to RDS Proxy endpoints and report outages back to a user application if they occur.

# Host Monitoring Plugin v2

Host Monitoring Plugin v2, also known as `host_monitoring_v2`, is an alternative implementation of Enhanced Failure Monitoring that is functionally equivalent to the Host Monitoring Plugin described above. Both plugins share the same set of [configuration parameters](#enhanced-failure-monitoring-parameters), and the `host_monitoring_v2` plugin is designed to be a drop-in replacement for the `host_monitoring` plugin.
The `host_monitoring_v2` plugin can be used in any scenario where the `host_monitoring` plugin is mentioned. It is enabled by default; the original EFM plugin can still be used by specifying `host_monitoring` in the `plugins` parameter.
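For example, opting into the v2 plugin explicitly might look like this (a sketch modeled on the connect example earlier in this document; the endpoint, credentials, and psycopg target driver are placeholders):

```python
import psycopg

from aws_advanced_python_wrapper import AwsWrapperConnection

# "host_monitoring_v2" selects the new EFM implementation; swapping in
# "host_monitoring" would select the original plugin instead.
with AwsWrapperConnection.connect(
        psycopg.Connection.connect,
        host="database.cluster-xyz.us-east-2.rds.amazonaws.com",
        dbname="postgres",
        user="john",
        password="pwd",
        plugins="host_monitoring_v2",
        connect_timeout=10) as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT 1")
        print(cursor.fetchone())
```
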

The value DEFAULT_PLUGINS = "aurora_connection_tracker,failover,host_monitoring"
in properties.py needs to be updated for this to be true. Should also double check other documentation for mentions of default plugins and update those

Contributor: I think we need to update the DEFAULT_PLUGINS value in utils/properties.py so that it uses the new plugin by default instead of v1.
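If the reviewers' suggestion is adopted, the follow-up would presumably be a one-line change (hypothetical, not part of this diff; the current value is quoted from the first comment above):

```python
# utils/properties.py (hypothetical follow-up commit)
DEFAULT_PLUGINS = "aurora_connection_tracker,failover,host_monitoring_v2"
```
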


> [!NOTE]\
> Since these two plugins are separate plugins, users may decide to use them together with a single connection. While this should not have any negative side effects, it is not recommended. It is recommended to use either the `host_monitoring` plugin, or the `host_monitoring_v2` plugin where it's needed.

Nit:

Suggested change
> Since these two plugins are separate plugins, users may decide to use them together with a single connection. While this should not have any negative side effects, it is not recommended. It is recommended to use either the `host_monitoring` plugin, or the `host_monitoring_v2` plugin where it's needed.
> Since these two plugins are separate plugins, users may decide to use them together with a single connection. While this should not have any negative side effects, it is not recommended. It is recommended to use either the `host_monitoring_v2` plugin, or the `host_monitoring` plugin where it's needed.



The `host_monitoring_v2` plugin is designed to address [some of the issues](https://github.com/aws/aws-advanced-jdbc-wrapper/issues/675) that have been reported by multiple users. The following changes have been made:
- Used weak pointers to ease garbage collection (see the sketch after this list)
- Split monitoring logic into two separate threads to increase overall monitoring stability
- Reviewed locks for monitoring context
- Reviewed and redesigned stopping of idle monitoring threads
- Reviewed and simplified monitoring logic
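To illustrate the first bullet, a monitor can keep its per-connection contexts in a `weakref.WeakSet`, so contexts owned by abandoned connections become collectible instead of pinning the monitor's bookkeeping (a sketch of the general pattern, not the plugin's actual code):

```python
import weakref


class MonitoringContext:
    """Stand-in for the per-connection state a monitor tracks."""

    def __init__(self, host: str):
        self.host = host


class MonitorSketch:
    def __init__(self):
        # Entries vanish once no connection holds a strong reference,
        # so an idle monitor does not keep dead contexts alive.
        self._contexts: weakref.WeakSet = weakref.WeakSet()

    def start_monitoring(self, context: MonitoringContext) -> None:
        self._contexts.add(context)

    def active_contexts(self) -> list:
        return list(self._contexts)
```
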
6 changes: 4 additions & 2 deletions tests/integration/container/test_aurora_failover.py
@@ -116,6 +116,7 @@ def test_fail_from_writer_to_new_writer_fail_on_connection_bound_object_invocati
        assert aurora_utility.is_db_instance_writer(current_connection_id) is True
        assert current_connection_id != initial_writer_id

    @pytest.mark.parametrize("plugins", ["failover,host_monitoring", "failover,host_monitoring_v2"])
    @enable_on_features([TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED,
                         TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
    def test_fail_from_reader_to_writer(
@@ -124,12 +125,13 @@ def test_fail_from_reader_to_writer(
            test_driver: TestDriver,
            conn_utils,
            proxied_props,
            aurora_utility):
            aurora_utility,
            plugins):
        target_driver_connect = DriverHelper.get_connect_func(test_driver)
        reader: TestInstanceInfo = test_environment.get_proxy_instances()[1]
        writer_id: str = test_environment.get_proxy_writer().get_instance_id()

        proxied_props["plugins"] = "failover,host_monitoring"
        proxied_props["plugins"] = plugins
        with AwsWrapperConnection.connect(
                target_driver_connect,
                **conn_utils.get_proxy_connect_params(reader.get_host()),
5 changes: 3 additions & 2 deletions tests/integration/container/test_basic_connectivity.py
@@ -126,15 +126,16 @@ def test_proxied_wrapper_connection_failed(
            # That is expected exception. Test pass.
            assert True

    @pytest.mark.parametrize("plugins", ["host_monitoring", "host_monitoring_v2"])
    @enable_on_num_instances(min_instances=2)
    @enable_on_deployments([DatabaseEngineDeployment.AURORA, DatabaseEngineDeployment.RDS_MULTI_AZ_CLUSTER])
    @enable_on_features([TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
    def test_wrapper_connection_reader_cluster_with_efm_enabled(self, test_driver: TestDriver, conn_utils):
    def test_wrapper_connection_reader_cluster_with_efm_enabled(self, test_driver: TestDriver, conn_utils, plugins):
        target_driver_connect = DriverHelper.get_connect_func(test_driver)
        conn = AwsWrapperConnection.connect(
            target_driver_connect,
            **conn_utils.get_connect_params(conn_utils.reader_cluster_host),
            plugins="host_monitoring", connect_timeout=10)
            plugins=plugins, connect_timeout=10)
        cursor = conn.cursor()
        cursor.execute("SELECT 1")
        result = cursor.fetchone()
26 changes: 16 additions & 10 deletions tests/integration/container/test_failover_performance.py
@@ -18,7 +18,7 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from logging import getLogger
from time import perf_counter_ns, sleep
from typing import TYPE_CHECKING, List, Optional
from typing import TYPE_CHECKING, Any, Dict, List, Optional

import pytest

@@ -131,8 +131,9 @@ def props(self):

        return props

@pytest.mark.parametrize("plugins", ["host_monitoring", "host_monitoring_v2"])
def test_failure_detection_time_efm(self, test_environment: TestEnvironment, test_driver: TestDriver, conn_utils,
props: Properties):
props: Properties, plugins):
enhanced_failure_monitoring_perf_data_list: List[PerfStatBase] = []
target_driver_connect_func = DriverHelper.get_connect_func(test_driver)
try:
@@ -147,7 +148,7 @@
                WrapperProperties.FAILURE_DETECTION_TIME_MS.set(props, str(detection_time))
                WrapperProperties.FAILURE_DETECTION_INTERVAL_MS.set(props, str(detection_interval))
                WrapperProperties.FAILURE_DETECTION_COUNT.set(props, str(detection_count))
                WrapperProperties.PLUGINS.set(props, "host_monitoring")
                WrapperProperties.PLUGINS.set(props, plugins)

                data: PerfStatMonitoring = PerfStatMonitoring()
                self._measure_performance(test_environment, target_driver_connect_func, conn_utils, sleep_delay_sec, props, data)
@@ -159,11 +160,13 @@
        PerformanceUtil.write_perf_data_to_file(
            f"/app/tests/integration/container/reports/"
            f"DbEngine_{test_environment.get_engine()}_"
            f"Plugins_{plugins}_"
            f"FailureDetectionPerformanceResults_EnhancedMonitoringEnabled.csv",
            TestPerformance.PERF_STAT_MONITORING_HEADER, enhanced_failure_monitoring_perf_data_list)

@pytest.mark.parametrize("plugins", ["failover,host_monitoring", "failover,host_monitoring_v2"])
def test_failure_detection_time_failover_and_efm(self, test_environment: TestEnvironment, test_driver: TestDriver, conn_utils,
props: Properties):
props: Properties, plugins):
enhanced_failure_monitoring_perf_data_list: List[PerfStatBase] = []
try:
for i in range(len(TestPerformance.failure_detection_time_params)):
@@ -177,7 +180,7 @@
                WrapperProperties.FAILURE_DETECTION_TIME_MS.set(props, str(detection_time))
                WrapperProperties.FAILURE_DETECTION_INTERVAL_MS.set(props, str(detection_interval))
                WrapperProperties.FAILURE_DETECTION_COUNT.set(props, str(detection_count))
                WrapperProperties.PLUGINS.set(props, "failover,host_monitoring")
                WrapperProperties.PLUGINS.set(props, plugins)
                WrapperProperties.FAILOVER_TIMEOUT_SEC.set(props, TestPerformance.PERF_FAILOVER_TIMEOUT_SEC)
                WrapperProperties.FAILOVER_MODE.set(props, "strict_reader")
@@ -191,6 +194,7 @@
        PerformanceUtil.write_perf_data_to_file(
            f"/app/tests/integration/container/reports/"
            f"DbEngine_{test_environment.get_engine()}_"
            f"Plugins_{plugins}_"
            f"FailureDetectionPerformanceResults_FailoverAndEnhancedMonitoringEnabled.csv",
            TestPerformance.PERF_STAT_MONITORING_HEADER, enhanced_failure_monitoring_perf_data_list)

@@ -205,12 +209,14 @@ def _measure_performance(
query: str = "SELECT pg_sleep(600)"
downtime: AtomicInt = AtomicInt()
elapsed_times: List[int] = []
connection_str = conn_utils.get_proxy_conn_string(test_environment.get_proxy_writer().get_host())

for _ in range(TestPerformance.REPEAT_TIMES):
downtime.set(0)

with self._open_connect_with_retry(connect_func, connection_str, props) as aws_conn, ThreadPoolExecutor() as executor:
with self._open_connect_with_retry(connect_func,
conn_utils.get_proxy_connect_params(
test_environment.get_proxy_writer().get_host()),
props) as aws_conn, ThreadPoolExecutor() as executor:
try:
futures = [
executor.submit(self._stop_network_thread, test_environment, sleep_delay_sec, downtime),
@@ -236,21 +242,21 @@
        data.max_failure_detection_time_millis = PerformanceUtil.to_millis(max_val)
        data.avg_failure_detection_time_millis = PerformanceUtil.to_millis(avg_val)

    def _open_connect_with_retry(self, connect_func, conn_str: str, props: Properties):
    def _open_connect_with_retry(self, connect_func, connect_params: Dict[str, Any], props: Properties):
        connection_attempts: int = 0
        conn: Optional[Connection] = None
        while conn is None and connection_attempts < 10:
            try:
                conn = AwsWrapperConnection.connect(
                    connect_func,
                    conn_str,
                    **connect_params,
                    **props)
            except Exception as e:
                TestPerformance.logger.debug("OpenConnectionFailed", str(e))
                connection_attempts += 1

        if conn is None:
            pytest.fail(f"Unable to connect to {conn_str}")
            pytest.fail(f"Unable to connect to {connect_params}")
        return conn

    def _stop_network_thread(self, test_environment: TestEnvironment, sleep_delay_seconds: int, downtime: AtomicInt):