diff --git a/django_prometheus/db/__init__.py b/django_prometheus/db/__init__.py index cca58c0f..a4f4c20d 100644 --- a/django_prometheus/db/__init__.py +++ b/django_prometheus/db/__init__.py @@ -7,6 +7,9 @@ execute_many_total, execute_total, query_duration_seconds, + aws_failover_success_total, + aws_failover_failed_total, + aws_transaction_resolution_unknown_total, ) __all__ = [ @@ -17,4 +20,7 @@ "execute_many_total", "execute_total", "query_duration_seconds", + "aws_failover_success_total", + "aws_failover_failed_total", + "aws_transaction_resolution_unknown_total", ] diff --git a/django_prometheus/db/backends/README.md b/django_prometheus/db/backends/README.md index ef53478b..53a1792c 100644 --- a/django_prometheus/db/backends/README.md +++ b/django_prometheus/db/backends/README.md @@ -1,3 +1,109 @@ +# Database Backends + +This directory contains Django database backends with Prometheus metrics integration. + +## Available Backends + +### Standard Backends + +- **postgresql/** - PostgreSQL backend with Prometheus metrics +- **mysql/** - MySQL backend with Prometheus metrics +- **sqlite3/** - SQLite3 backend with Prometheus metrics +- **postgis/** - PostGIS (PostgreSQL + GIS) backend with Prometheus metrics +- **spatialite/** - SpatiaLite (SQLite + GIS) backend with Prometheus metrics + +### Enhanced Backends + +- **postgresql_aws/** - PostgreSQL backend with AWS Advanced Python Wrapper integration + +## PostgreSQL AWS Backend + +The `postgresql_aws` backend extends the standard PostgreSQL backend with AWS Advanced Python Wrapper integration, providing automatic failover capabilities for Amazon RDS clusters while maintaining comprehensive Prometheus metrics collection. 
+ +### Features + +- **Automatic Failover**: Seamlessly handles RDS cluster failovers using AWS Advanced Python Wrapper +- **Prometheus Metrics**: Collects all standard database metrics plus AWS-specific failover metrics +- **Connection Monitoring**: Built-in health checks and connection monitoring +- **Query Retry**: Automatically retries queries after successful failover +- **Error Handling**: Proper handling for failed failovers and transaction resolution issues + +### AWS-Specific Metrics + +The backend adds these additional Prometheus metrics: + +- `django_db_aws_failover_success_total` - Counter of successful database failovers +- `django_db_aws_failover_failed_total` - Counter of failed database failovers +- `django_db_aws_transaction_resolution_unknown_total` - Counter of transactions with unknown resolution status + +### Usage + +```python +DATABASES = { + 'default': { + 'ENGINE': 'django_prometheus.db.backends.postgresql_aws', + 'HOST': 'database.cluster-xyz.us-east-1.rds.amazonaws.com', + 'NAME': 'mydb', + 'USER': 'myuser', + 'PASSWORD': 'mypassword', + 'PORT': '5432', + 'OPTIONS': { + 'aws_plugins': 'failover,host_monitoring', # AWS wrapper plugins + 'connect_timeout': 30, # Connection timeout in seconds + 'socket_timeout': 30, # Socket timeout in seconds + # Additional psycopg connection options can be added here + }, + } +} +``` + +### Prerequisites + +1. Install the AWS Advanced Python Wrapper: + ```bash + pip install aws-advanced-python-wrapper + ``` + +2. Configure your RDS cluster for failover (reader/writer endpoints) + +3. 
Ensure proper IAM permissions for RDS cluster access + +### Configuration Options + +| Option | Default | Description | +|--------|---------|-------------| +| `aws_plugins` | `'failover,host_monitoring'` | Comma-separated list of AWS wrapper plugins | +| `connect_timeout` | `30` | Connection timeout in seconds | +| `socket_timeout` | `30` | Socket timeout in seconds | + +### Monitoring + +The backend automatically logs failover events and metrics. Monitor these key indicators: + +- Connection success/failure rates +- Failover frequency and success rates +- Query execution times during normal operation vs. failover +- Transaction resolution status + +### Best Practices + +1. **Connection Pooling**: Use with Django's database connection pooling +2. **Health Checks**: Monitor the failover metrics to detect cluster issues +3. **Timeout Configuration**: Tune timeout values based on your application requirements +4. **Testing**: Test failover scenarios in a staging environment +5. **Monitoring**: Set up alerts for failover events and failures + +### Troubleshooting + +- **ImportError**: Ensure `aws-advanced-python-wrapper` is installed +- **Connection Issues**: Verify RDS cluster configuration and IAM permissions +- **Slow Queries**: Monitor query duration metrics during failover events +- **Transaction Issues**: Check transaction resolution unknown metrics for application logic issues + +For more information, see the [AWS Advanced Python Wrapper documentation](https://github.com/aws/aws-advanced-python-wrapper). + +--- + # Adding new database wrapper types Unfortunately, I don't have the resources to create wrappers for all diff --git a/django_prometheus/db/backends/postgresql_aws/__init__.py b/django_prometheus/db/backends/postgresql_aws/__init__.py new file mode 100644 index 00000000..8488af18 --- /dev/null +++ b/django_prometheus/db/backends/postgresql_aws/__init__.py @@ -0,0 +1,30 @@ +"""PostgreSQL database backend with AWS Advanced Python Wrapper integration. 
"""PostgreSQL database backend with AWS Advanced Python Wrapper integration.

Connections are opened through ``AwsWrapperConnection`` so that Amazon RDS
cluster failovers are handled by the wrapper, while cursors keep exporting
the django-prometheus database metrics plus three failover counters.

Usage in Django settings:

    DATABASES = {
        'default': {
            'ENGINE': 'django_prometheus.db.backends.postgresql_aws',
            'HOST': 'database.cluster-xyz.us-east-1.rds.amazonaws.com',
            'NAME': 'mydb',
            'USER': 'myuser',
            'PASSWORD': 'mypassword',
            'PORT': '5432',
            'OPTIONS': {
                'aws_plugins': 'failover,host_monitoring',
                'connect_timeout': 30,
                'socket_timeout': 30,
            },
        }
    }
"""

import logging

from django.core.exceptions import ImproperlyConfigured
from django.db.backends.postgresql import base
from django.db.backends.postgresql.base import Cursor

from django_prometheus.db import (
    aws_failover_failed_total,
    aws_failover_success_total,
    aws_transaction_resolution_unknown_total,
    connection_errors_total,
    connections_total,
    errors_total,
    execute_many_total,
    execute_total,
    query_duration_seconds,
)
from django_prometheus.db.common import DatabaseWrapperMixin, ExceptionCounterByType

try:
    import psycopg
    from aws_advanced_python_wrapper import AwsWrapperConnection
    from aws_advanced_python_wrapper.errors import (
        FailoverFailedError,
        FailoverSuccessError,
        TransactionResolutionUnknownError,
    )
except ImportError as e:
    raise ImproperlyConfigured(
        "AWS Advanced Python Wrapper is required for this backend. "
        "Install it with: pip install aws-advanced-python-wrapper"
    ) from e

logger = logging.getLogger(__name__)


class AwsPrometheusCursor(Cursor):
    """Cursor that records Prometheus metrics and reacts to failover errors.

    ``execute`` and ``executemany`` count the call, time it, count raised
    exceptions by type, and translate the AWS wrapper's failover exceptions
    into the ``aws_*`` counters.

    NOTE(review): the failover exceptions can only be observed if the parent
    cursor's ``execute``/``executemany`` is routed through the AWS wrapper --
    confirm against the wrapper's cursor implementation.
    """

    def __init__(self, connection, alias, vendor):
        """Bind the cursor to ``connection`` and remember the metric labels."""
        super().__init__(connection)
        self.alias = alias
        self.vendor = vendor
        self._labels = {"alias": alias, "vendor": vendor}

    def execute(self, sql, params=None):
        """Run one statement, exporting count, duration and error metrics."""
        execute_total.labels(self.alias, self.vendor).inc()
        with (
            query_duration_seconds.labels(**self._labels).time(),
            ExceptionCounterByType(errors_total, extra_labels=self._labels),
        ):
            return self._execute_with_failover_handling(sql, params)

    def executemany(self, sql, param_list):
        """Run a statement once per parameter set, exporting metrics.

        Both ``execute_total`` and ``execute_many_total`` are incremented by
        the number of parameter sets (0 when ``param_list`` is empty/None).
        """
        param_count = len(param_list) if param_list else 0
        execute_total.labels(self.alias, self.vendor).inc(param_count)
        execute_many_total.labels(self.alias, self.vendor).inc(param_count)
        with (
            query_duration_seconds.labels(**self._labels).time(),
            ExceptionCounterByType(errors_total, extra_labels=self._labels),
        ):
            return self._executemany_with_failover_handling(sql, param_list)

    def _execute_with_failover_handling(self, sql, params=None):
        """Call the parent ``execute`` with failover accounting and one retry."""
        return self._run_with_failover(
            super().execute,
            (sql, params),
            retry_message="Database failover completed successfully, retrying query",
            failed_message="Database failover failed: %s",
            unknown_message="Transaction resolution unknown after failover: %s",
        )

    def _executemany_with_failover_handling(self, sql, param_list):
        """Call the parent ``executemany`` with failover accounting and one retry."""
        return self._run_with_failover(
            super().executemany,
            (sql, param_list),
            retry_message="Database failover completed successfully, retrying executemany",
            failed_message="Database failover failed during executemany: %s",
            unknown_message="Transaction resolution unknown during executemany: %s",
        )

    def _run_with_failover(
        self, operation, args, *, retry_message, failed_message, unknown_message
    ):
        """Invoke ``operation(*args)``, retrying once after a successful failover.

        Args:
            operation: Callable performing the database call.
            args: Positional arguments forwarded to ``operation``.
            retry_message: Logged at INFO before the retry.
            failed_message: ``%s`` format logged at ERROR for a failed failover.
            unknown_message: ``%s`` format logged at ERROR when the transaction
                outcome is unknown.

        Returns:
            Whatever ``operation`` returns.

        Raises:
            FailoverFailedError: Re-raised after counting it.
            TransactionResolutionUnknownError: Re-raised after counting it.
        """
        labels = (self.alias, self.vendor)
        try:
            try:
                return operation(*args)
            except FailoverSuccessError:
                logger.info(retry_message)
                aws_failover_success_total.labels(*labels).inc()
                self._configure_session_state()
                # The retry runs inside the outer ``try`` so that a failure on
                # the second attempt is counted like a first-attempt failure.
                return operation(*args)
        except FailoverFailedError as e:
            logger.error(failed_message, e)
            aws_failover_failed_total.labels(*labels).inc()
            raise
        except TransactionResolutionUnknownError as e:
            logger.error(unknown_message, e)
            aws_transaction_resolution_unknown_total.labels(*labels).inc()
            raise

    def _configure_session_state(self):
        """Hook called after a successful failover, before the retry.

        Intentionally a no-op; subclasses may override it to restore
        session-level settings on the new connection.
        """
        pass


class DatabaseWrapper(DatabaseWrapperMixin, base.DatabaseWrapper):
    """Django PostgreSQL wrapper that connects via ``AwsWrapperConnection``.

    Reads three keys from ``settings_dict["OPTIONS"]``:
    ``aws_plugins`` (default ``"failover,host_monitoring"``),
    ``connect_timeout`` (default 30) and ``socket_timeout`` (default 30).
    """

    def __init__(self, settings_dict, alias=None):
        super().__init__(settings_dict, alias)
        # ``or {}`` also covers an explicit ``OPTIONS: None``.
        options = self.settings_dict.get("OPTIONS") or {}
        self.aws_plugins = options.get("aws_plugins", "failover,host_monitoring")
        self.connect_timeout = options.get("connect_timeout", 30)
        self.socket_timeout = options.get("socket_timeout", 30)

    def get_new_connection(self, conn_params):
        """Open a connection through the AWS wrapper and count the attempt.

        Args:
            conn_params: Mapping of connection parameters. ``host``, ``port``,
                ``user``, ``password``, the database name (``dbname`` or the
                older ``database`` key) and ``options`` are read from it.

        Returns:
            The connection returned by ``AwsWrapperConnection.connect`` with
            its ``cursor_factory`` set to build ``AwsPrometheusCursor``.

        Raises:
            Exception: Any connection error is counted in
                ``connection_errors_total``, logged, and re-raised unchanged.
        """
        connections_total.labels(self.alias, self.vendor).inc()
        try:
            host = conn_params.get("host", "localhost")
            port = conn_params.get("port", 5432)
            # Newer Django releases pass the database name as ``dbname``;
            # ``database`` is kept as a fallback for older callers.
            database = conn_params.get("dbname") or conn_params.get("database", "")
            user = conn_params.get("user", "")
            password = conn_params.get("password", "")
            extra = conn_params.get("options") or {}
            if isinstance(extra, str):
                # A libpq options string cannot be **-expanded; forward it as
                # the ``options`` connection parameter instead.
                extra = {"options": extra}

            connection = AwsWrapperConnection.connect(
                psycopg.Connection.connect,
                host=host,
                port=port,
                dbname=database,
                user=user,
                password=password,
                plugins=self.aws_plugins,
                connect_timeout=self.connect_timeout,
                socket_timeout=self.socket_timeout,
                autocommit=False,
                **extra,
            )

            connection.cursor_factory = lambda conn: AwsPrometheusCursor(conn, self.alias, self.vendor)
            logger.info("Successfully created AWS wrapper connection to %s:%s", host, port)
            return connection

        except Exception as e:
            connection_errors_total.labels(self.alias, self.vendor).inc()
            logger.error("Failed to create AWS wrapper connection: %s", e)
            raise

    def create_cursor(self, name=None):
        """Return an ``AwsPrometheusCursor`` bound to the current connection.

        NOTE(review): when ``name`` is given a named cursor is opened on the
        connection, but only its ``connection`` attribute is used; the cursor
        handed back to the caller is not the named one. Confirm whether
        server-side cursors are meant to be supported by this backend.
        """
        if name:
            cursor = self.connection.cursor(name=name)
        else:
            cursor = self.connection.cursor()
        return AwsPrometheusCursor(cursor.connection, self.alias, self.vendor)

    def _close(self):
        """Close the underlying connection, logging instead of raising."""
        if self.connection is not None:
            try:
                self.connection.close()
            except Exception as e:
                # Best effort: a failure while closing must not mask the
                # reason the connection is being discarded.
                logger.warning("Error closing AWS wrapper connection: %s", e)

    def is_usable(self):
        """Return True if ``SELECT 1`` succeeds on the current connection."""
        if self.connection is None:
            return False
        try:
            with self.connection.cursor() as cursor:
                cursor.execute("SELECT 1")
            return True
        except Exception as e:
            logger.warning("Connection is not usable: %s", e)
            return False

    def ensure_connection(self):
        """Make sure a usable connection exists, reconnecting if necessary.

        An existing connection is probed with ``is_usable()`` and closed when
        the probe fails. The probe is skipped inside an atomic block:
        replacing the connection there would silently drop the open
        transaction. Opening the connection is delegated to the parent
        implementation.

        NOTE(review): the probe issues a query on every call; consider
        whether Django's ``CONN_HEALTH_CHECKS`` setting could replace it.
        """
        if (
            self.connection is not None
            and not self.in_atomic_block
            and not self.is_usable()
        ):
            logger.info("Connection is not usable, reconnecting...")
            self.close()
        super().ensure_connection()
"""
Tests for PostgreSQL AWS backend integration.
"""

import importlib
import sys
import unittest
from unittest.mock import MagicMock, patch

from django.core.exceptions import ImproperlyConfigured
from django.test import TestCase

from django_prometheus.db import (
    aws_failover_failed_total,
    aws_failover_success_total,
    aws_transaction_resolution_unknown_total,
    connection_errors_total,
    connections_total,
)


class PostgreSQLAWSBackendTest(TestCase):
    """Test the PostgreSQL AWS backend."""

    BACKEND_MODULE = 'django_prometheus.db.backends.postgresql_aws.base'
    CLUSTER_HOST = 'test-cluster.cluster-xyz.us-east-1.rds.amazonaws.com'

    def setUp(self):
        """Set up test fixtures."""
        self.database_config = {
            'ENGINE': 'django_prometheus.db.backends.postgresql_aws',
            'HOST': self.CLUSTER_HOST,
            'NAME': 'testdb',
            'USER': 'testuser',
            'PASSWORD': 'testpass',
            'PORT': '5432',
            'OPTIONS': {
                'aws_plugins': 'failover,host_monitoring',
                'connect_timeout': 30,
                'socket_timeout': 30,
            },
        }

    def _conn_params(self):
        """Return connection parameters as passed to get_new_connection()."""
        return {
            'host': self.CLUSTER_HOST,
            'port': 5432,
            'database': 'testdb',
            'user': 'testuser',
            'password': 'testpass',
            'options': {},
        }

    @staticmethod
    def _counter_value(counter, alias='default', vendor='postgresql'):
        """Read the current value of a labelled counter (0 if never set)."""
        # Reads the client library's private ``_value`` holder, as the
        # assertions in this module have always done.
        return counter.labels(alias, vendor)._value.get() or 0

    @staticmethod
    def _make_cursor():
        """Build an AwsPrometheusCursor on a mocked connection."""
        from django_prometheus.db.backends.postgresql_aws.base import AwsPrometheusCursor

        return AwsPrometheusCursor(MagicMock(), 'default', 'postgresql')

    @patch('django_prometheus.db.backends.postgresql_aws.base.psycopg')
    @patch('django_prometheus.db.backends.postgresql_aws.base.AwsWrapperConnection')
    def test_import_backend(self, mock_aws_wrapper, mock_psycopg):
        """Test that the backend can be imported when the AWS wrapper is available."""
        from django_prometheus.db.backends.postgresql_aws.base import DatabaseWrapper

        self.assertIsNotNone(DatabaseWrapper)

    def test_import_error_without_aws_wrapper(self):
        """Test that backend raises ImproperlyConfigured without AWS wrapper."""
        with patch.dict('sys.modules', {'aws_advanced_python_wrapper': None}):
            # Evict any cached copy so the module-level import guard really
            # runs again; patch.dict restores sys.modules on exit.
            sys.modules.pop(self.BACKEND_MODULE, None)
            with self.assertRaises(ImproperlyConfigured) as cm:
                importlib.import_module(self.BACKEND_MODULE)
        # Must be outside the assertRaises block: statements after the raising
        # call inside it are never executed.
        self.assertIn("AWS Advanced Python Wrapper is required", str(cm.exception))

    @patch('django_prometheus.db.backends.postgresql_aws.base.psycopg')
    @patch('django_prometheus.db.backends.postgresql_aws.base.AwsWrapperConnection')
    def test_database_wrapper_initialization(self, mock_aws_wrapper, mock_psycopg):
        """Test DatabaseWrapper initialization with configuration options."""
        from django_prometheus.db.backends.postgresql_aws.base import DatabaseWrapper

        wrapper = DatabaseWrapper(self.database_config, alias='default')

        self.assertEqual(wrapper.aws_plugins, 'failover,host_monitoring')
        self.assertEqual(wrapper.connect_timeout, 30)
        self.assertEqual(wrapper.socket_timeout, 30)

    @patch('django_prometheus.db.backends.postgresql_aws.base.psycopg')
    @patch('django_prometheus.db.backends.postgresql_aws.base.AwsWrapperConnection')
    def test_default_configuration(self, mock_aws_wrapper, mock_psycopg):
        """Test DatabaseWrapper with default configuration."""
        from django_prometheus.db.backends.postgresql_aws.base import DatabaseWrapper

        config_without_options = self.database_config.copy()
        config_without_options.pop('OPTIONS')

        wrapper = DatabaseWrapper(config_without_options, alias='default')

        self.assertEqual(wrapper.aws_plugins, 'failover,host_monitoring')
        self.assertEqual(wrapper.connect_timeout, 30)
        self.assertEqual(wrapper.socket_timeout, 30)

    @patch('django_prometheus.db.backends.postgresql_aws.base.logger')
    @patch('django_prometheus.db.backends.postgresql_aws.base.psycopg')
    @patch('django_prometheus.db.backends.postgresql_aws.base.AwsWrapperConnection')
    def test_connection_creation_success(self, mock_aws_wrapper, mock_psycopg, mock_logger):
        """Test successful connection creation."""
        from django_prometheus.db.backends.postgresql_aws.base import DatabaseWrapper

        # Mock successful connection
        mock_aws_wrapper.connect.return_value = MagicMock()

        wrapper = DatabaseWrapper(self.database_config, alias='default')

        # Record initial metric value
        initial_connections = self._counter_value(connections_total)

        connection = wrapper.get_new_connection(self._conn_params())

        # Verify connection was created with the expected parameters
        self.assertIsNotNone(connection)
        mock_aws_wrapper.connect.assert_called_once()
        connect_kwargs = mock_aws_wrapper.connect.call_args.kwargs
        self.assertEqual(connect_kwargs['dbname'], 'testdb')
        self.assertEqual(connect_kwargs['plugins'], 'failover,host_monitoring')

        # Verify metrics were updated
        self.assertEqual(self._counter_value(connections_total), initial_connections + 1)

        # Verify logging: the backend logs lazily, i.e. a %-format string
        # followed by its arguments, not a pre-formatted message.
        mock_logger.info.assert_called_with(
            "Successfully created AWS wrapper connection to %s:%s",
            self.CLUSTER_HOST,
            5432,
        )

    @patch('django_prometheus.db.backends.postgresql_aws.base.logger')
    @patch('django_prometheus.db.backends.postgresql_aws.base.psycopg')
    @patch('django_prometheus.db.backends.postgresql_aws.base.AwsWrapperConnection')
    def test_connection_creation_failure(self, mock_aws_wrapper, mock_psycopg, mock_logger):
        """Test connection creation failure."""
        from django_prometheus.db.backends.postgresql_aws.base import DatabaseWrapper

        # Mock connection failure
        error = Exception("Connection failed")
        mock_aws_wrapper.connect.side_effect = error

        wrapper = DatabaseWrapper(self.database_config, alias='default')

        # Record initial metric values
        initial_connections = self._counter_value(connections_total)
        initial_errors = self._counter_value(connection_errors_total)

        with self.assertRaises(Exception):
            wrapper.get_new_connection(self._conn_params())

        # Connection attempt counted
        self.assertEqual(self._counter_value(connections_total), initial_connections + 1)
        # Error counted
        self.assertEqual(self._counter_value(connection_errors_total), initial_errors + 1)

        # Verify error logging (format string plus the exception instance)
        mock_logger.error.assert_called_with("Failed to create AWS wrapper connection: %s", error)

    @patch('django_prometheus.db.backends.postgresql_aws.base.psycopg')
    @patch('django_prometheus.db.backends.postgresql_aws.base.AwsWrapperConnection')
    def test_failover_success_metrics(self, mock_aws_wrapper, mock_psycopg):
        """Test that failover success metrics are recorded."""
        from aws_advanced_python_wrapper.errors import FailoverSuccessError

        cursor = self._make_cursor()

        # Mock the parent execute to raise FailoverSuccessError on first call
        with patch.object(type(cursor).__bases__[0], 'execute') as mock_execute:
            mock_execute.side_effect = [FailoverSuccessError(), None]  # Fail then succeed

            initial_failovers = self._counter_value(aws_failover_success_total)

            cursor.execute("SELECT 1")

            # The query is retried exactly once after the failover
            self.assertEqual(mock_execute.call_count, 2)
            self.assertEqual(self._counter_value(aws_failover_success_total), initial_failovers + 1)

    @patch('django_prometheus.db.backends.postgresql_aws.base.psycopg')
    @patch('django_prometheus.db.backends.postgresql_aws.base.AwsWrapperConnection')
    def test_failover_failed_metrics(self, mock_aws_wrapper, mock_psycopg):
        """Test that failover failure metrics are recorded."""
        from aws_advanced_python_wrapper.errors import FailoverFailedError

        cursor = self._make_cursor()

        # Mock the parent execute to raise FailoverFailedError
        with patch.object(type(cursor).__bases__[0], 'execute') as mock_execute:
            mock_execute.side_effect = FailoverFailedError("Failover failed")

            initial_failures = self._counter_value(aws_failover_failed_total)

            with self.assertRaises(FailoverFailedError):
                cursor.execute("SELECT 1")

            self.assertEqual(self._counter_value(aws_failover_failed_total), initial_failures + 1)

    @patch('django_prometheus.db.backends.postgresql_aws.base.psycopg')
    @patch('django_prometheus.db.backends.postgresql_aws.base.AwsWrapperConnection')
    def test_transaction_resolution_unknown_metrics(self, mock_aws_wrapper, mock_psycopg):
        """Test that transaction resolution unknown metrics are recorded."""
        from aws_advanced_python_wrapper.errors import TransactionResolutionUnknownError

        cursor = self._make_cursor()

        # Mock the parent execute to raise TransactionResolutionUnknownError
        with patch.object(type(cursor).__bases__[0], 'execute') as mock_execute:
            mock_execute.side_effect = TransactionResolutionUnknownError("Unknown transaction state")

            initial_unknown = self._counter_value(aws_transaction_resolution_unknown_total)

            with self.assertRaises(TransactionResolutionUnknownError):
                cursor.execute("SELECT 1")

            self.assertEqual(
                self._counter_value(aws_transaction_resolution_unknown_total),
                initial_unknown + 1,
            )


if __name__ == '__main__':
    unittest.main()