diff --git a/.gitignore b/.gitignore
index 59045e2..1ae5b93 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,6 +21,12 @@ var/
*.egg
.eggs/
+# Virtualenv
+.venv/
+
+# Pytest cache
+.pytest_cache/
+
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..26d3352
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,3 @@
+# Default ignored files
+/shelf/
+/workspace.xml
diff --git a/.idea/MongoDBProxy.iml b/.idea/MongoDBProxy.iml
new file mode 100644
index 0000000..fc3b1c4
--- /dev/null
+++ b/.idea/MongoDBProxy.iml
@@ -0,0 +1,15 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml
new file mode 100644
index 0000000..105ce2d
--- /dev/null
+++ b/.idea/inspectionProfiles/profiles_settings.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..707ea97
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..eac9046
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..35eb1dd
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/MANIFEST.in b/MANIFEST.in
index ffb8823..e7d1914 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1 +1,3 @@
include *.txt *.md *.rst
+include LICENSE
+include NOTICE
diff --git a/NOTICE b/NOTICE
new file mode 100644
index 0000000..c5b768b
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1,14 @@
+MongoDBProxy
+Copyright 2013 Gustav Arngarden
+
+This product includes software developed by
+Gustav Arngarden (https://github.com/arngarden).
+
+==========================================================================
+
+This work is a derivative of the original MongoDBProxy.
+Modifications to support modern Python 3 and PyMongo 3.x versions,
+along with a comprehensive test suite and other enhancements,
+were made by Martin Alge (https://github.com/Alge).
+
+Copyright 2025 Martin Alge
diff --git a/README.rst b/README.rst
index fe50581..1f7a471 100644
--- a/README.rst
+++ b/README.rst
@@ -1,30 +1,36 @@
MongoDBProxy
============
-MongoDBProxy is used to create a proxy around a MongoDB-connection in order to
-automatically handle AutoReconnect-exceptions. You use MongoDBProxy in the
-same way you would an ordinary MongoDB-connection but don't need to worry about
+MongoDBProxy is used to create a proxy around a MongoDB connection in order to
+automatically handle AutoReconnect exceptions. You use MongoDBProxy in the
+same way you would an ordinary MongoDB connection but don't need to worry about
handling AutoReconnects by yourself.
Usage::
>>> import pymongo
- >>> import mongo_proxy
- >>> safe_conn = mongo_proxy.MongoProxy(pymongo.MongoReplicaSetClient(replicaSet='blog_rs')
- >>> safe_conn.blogs.posts.insert(post) # Automatically handles AutoReconnect.
+ >>> from mongo_proxy import MongoProxy
+ >>>
+ >>> client = pymongo.MongoClient(replicaSet='blog_rs')
+ >>> safe_conn = MongoProxy(client)
+ >>> safe_conn.blogs.posts.insert_one({'some': 'post'}) # Automatically handles AutoReconnect.
+Fork Information
+----------------
-**See here for more details:**
-``_
-
-**Contributors**:
-
-- Jonathan Kamens (``_)
-- Michael Cetrulo (``_)
-- Richard Frank (``_)
-- David Lindquist (``_)
+This is a modernized fork of Gustav Arngarden's original MongoDBProxy. The primary goal of this version is to provide a stable, well-tested proxy compatible with modern Python 3 environments while maintaining support for legacy MongoDB databases.
Installation
------------
-pip3 install MongoDBProxy-official
+To install the package with its testing dependencies, run:
+
+    pip install -e ".[test]"
+
+
+Compatibility
+--------------
+
+This library is compatible with **Python 3.6+** and **PyMongo 3.12+** (version < 4.0).
+
+This focus on PyMongo 3.x is a deliberate choice to ensure compatibility with older MongoDB server versions, such as MongoDB 3.4, which are not supported by PyMongo 4.x.
diff --git a/example.py b/example.py
new file mode 100644
index 0000000..6db84f5
--- /dev/null
+++ b/example.py
@@ -0,0 +1,87 @@
+import logging
+import time
+from pymongo import MongoClient
+from mongo_proxy import MongoProxy
+
+
+def main():
+ # --- Configuration ---
+ # Define your MongoDB replica set nodes here
+ mongo_nodes = [
+ "192.168.0.113:27017",
+ "192.168.0.113:27018",
+ "192.168.0.113:27019",
+ ]
+ # Define your replica set name
+ replica_set_name = "rs0"
+ # --- End Configuration ---
+
+ # Configure basic logging to capture WARNING and above
+ logging.basicConfig(
+ level=logging.WARNING,
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+ )
+
+ # Enable DEBUG logging specifically for the mongo_proxy module
+ proxy_logger = logging.getLogger("mongo_proxy")
+ proxy_logger.setLevel(logging.DEBUG)
+
+ # Also enable INFO level for our script
+ script_logger = logging.getLogger(__name__)
+ script_logger.setLevel(logging.INFO)
+
+ # Add a console handler to our script logger to see our messages
+ if not script_logger.handlers:
+ handler = logging.StreamHandler()
+ handler.setLevel(logging.INFO)
+ formatter = logging.Formatter(
+ '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+ handler.setFormatter(formatter)
+ script_logger.addHandler(handler)
+ script_logger.propagate = False
+
+ # Connect to the MongoDB replica set using the list of nodes
+ script_logger.info(
+ f"Attempting to connect to replica set '{replica_set_name}' with nodes: {mongo_nodes}")
+ client = MongoClient(mongo_nodes, replicaSet=replica_set_name)
+
+ # Test the connection
+ try:
+ # The ismaster command is cheap and does not require auth.
+ client.admin.command('ismaster')
+ script_logger.info("✓ Successfully connected to MongoDB replica set")
+ except Exception as e:
+ script_logger.error(f"✗ Failed to connect: {e}")
+ return
+
+ # Create MongoProxy - this handles AutoReconnect automatically
+ proxy = MongoProxy(client)
+
+ collection = proxy.testdb.mycollection
+
+ script_logger.info("MongoProxy example started.")
+ script_logger.info(
+ "Try: rs.stepDown() in the mongo shell to trigger a failover and see reconnection logs.")
+ script_logger.info("Press Ctrl+C to exit.\n")
+
+ counter = 0
+ while True:
+ try:
+ counter += 1
+
+ # Simple database operation
+ doc = {"counter": counter, "timestamp": time.time()}
+ result = collection.insert_one(doc)
+ script_logger.info(f"[{counter}] Inserted document: {result.inserted_id}")
+
+ except KeyboardInterrupt:
+ script_logger.info("\nExiting...")
+ break
+ except Exception as e:
+ script_logger.error(f"[{counter}] An error occurred: {e}")
+
+ time.sleep(3)
+
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file
diff --git a/manual_tests.py b/manual_tests.py
new file mode 100644
index 0000000..8a1083f
--- /dev/null
+++ b/manual_tests.py
@@ -0,0 +1,147 @@
+import logging
+import time
+import random
+from pymongo import MongoClient, errors
+from mongo_proxy import MongoProxy, DurableCursor
+
+# --- ‼️ IMPORTANT CONFIGURATION ‼️ ---
+# Change this to the connection string for your replica set.
+MONGO_URI = "mongodb://192.168.0.116:27017,192.168.0.116:27018,192.168.0.116:27019/"
+REPLICA_SET = "rs0" # Change this to your replica set name.
+# --- End Configuration ---
+
+
+def setup_logging():
+ """Configures logging to show detailed info from the proxy."""
+ logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+ )
+ logging.getLogger("mongo_proxy").setLevel(logging.DEBUG)
+ # Silence the noisy pymongo connection pool logs for this test
+ logging.getLogger("pymongo.pool").setLevel(logging.WARNING)
+
+
+def test_writer_resilience():
+ # ... (this function remains the same and is correct)
+ print("\n--- 🚀 Starting Write Resilience Test ---")
+ print("This test will insert a new document every 4 seconds.")
+ print("While it's running, perform actions on your MongoDB cluster, such as:")
+ print(" - Use `rs.stepDown()` in mongosh to trigger a failover.")
+ print(" - Stop the primary node's container (`docker stop ...`).")
+ print("Watch the output to see the proxy handle the errors and reconnect.\n")
+
+ client = MongoClient(MONGO_URI, replicaSet=REPLICA_SET, serverSelectionTimeoutMS=5000)
+ proxy = MongoProxy(client)
+ collection = proxy.testdb.manual_test_collection
+
+ try:
+ collection.drop()
+ print("✓ Dropped old test collection.")
+ except Exception as e:
+ print(f"✗ Could not drop collection (this is okay on first run): {e}")
+
+ counter = 0
+ while True:
+ try:
+ counter += 1
+ doc = {"counter": counter, "time": time.ctime()}
+ result = collection.insert_one(doc)
+ print(f"[{counter}] ✓ Inserted document: {result.inserted_id}")
+
+ except KeyboardInterrupt:
+ print("\n--- 🛑 Test stopped by user. ---")
+ break
+ except Exception as e:
+ print(f"[{counter}] ✗ An error occurred: {type(e).__name__} - {e}")
+ print(f"[{counter}] ⏳ Proxy will now attempt to reconnect...")
+
+ time.sleep(4)
+
+
+def test_durable_cursor():
+ """
+ Tests the DurableCursor's ability to survive a failover mid-iteration
+ without losing its place.
+ """
+ print("\n--- 🚀 Starting Durable Cursor Test ---")
+
+ client = MongoClient(MONGO_URI, replicaSet=REPLICA_SET)
+ setup_collection = client.testdb.durable_cursor_test
+
+ # ** THE FIX: Use a number of documents GREATER than the default batch size (101). **
+ num_docs = 300
+ failover_point = 105 # A point safely after the first batch is exhausted.
+
+ print(f"Setting up the collection with {num_docs} documents...")
+ setup_collection.drop()
+ setup_collection.insert_many([{'doc_num': i} for i in range(1, num_docs + 1)])
+ print("✓ Collection setup complete.")
+
+ print(f"\nStarting slow iteration (1 doc every 2 seconds).")
+ print("\n‼️ WATCH THE COUNTER ‼️")
+ print(f"TRIGGER A FAILOVER AFTER YOU SEE DOCUMENT #{failover_point} IS RETRIEVED.")
+ print("This guarantees the next fetch will require a network call.\n")
+
+ proxy = MongoProxy(client)
+ proxied_collection = proxy.testdb.durable_cursor_test
+
+ retrieved_docs = []
+ try:
+ # We still use batch_size=1 to be explicit, but the number of documents is the key.
+ durable_cursor = DurableCursor(
+ proxied_collection,
+ sort=[('doc_num', 1)],
+ batch_size=1
+ )
+
+ for doc in durable_cursor:
+ print(f"[Cursor] -> Retrieved document {doc['doc_num']}/{num_docs}", end='\r')
+ retrieved_docs.append(doc['doc_num'])
+
+ if doc['doc_num'] == failover_point:
+ print(f"\n[!] NOW IS A GOOD TIME TO TRIGGER THE FAILOVER (`rs.stepDown()`) [!]")
+
+ time.sleep(2)
+
+ print(f"\n\n--- ✅ Test Complete ---")
+ if len(retrieved_docs) == num_docs and sorted(retrieved_docs) == list(range(1, num_docs + 1)):
+ print(f"🎉 SUCCESS! All {num_docs} documents were retrieved in order without duplicates.")
+ else:
+ print(f"🔥 FAILURE! Expected {num_docs} unique documents, but got {len(retrieved_docs)}.")
+ print(f" The first few retrieved documents: {sorted(retrieved_docs)[:20]}...")
+
+ except KeyboardInterrupt:
+ print("\n--- 🛑 Test stopped by user. ---")
+ except Exception as e:
+ print(f"\n--- ✗ TEST FAILED WITH AN UNEXPECTED ERROR ---")
+ print(f"Error: {type(e).__name__} - {e}")
+ import traceback
+ traceback.print_exc()
+
+
+def main():
+ setup_logging()
+ print("========================================")
+ print(" MongoDBProxy Manual Test Suite")
+ print("========================================")
+
+ while True:
+ print("\nChoose a test to run:")
+ print(" [1] Write Resilience Test (Failover/Outage)")
+ print(" [2] Durable Cursor Resilience Test")
+ print(" [q] Quit")
+
+ choice = input("> ")
+
+ if choice == '1':
+ test_writer_resilience()
+ elif choice == '2':
+ test_durable_cursor()
+ elif choice.lower() == 'q':
+ break
+ else:
+ print("Invalid choice, please try again.")
+
+if __name__ == "__main__":
+ main()
diff --git a/mongo_proxy/__init__.py b/mongo_proxy/__init__.py
index b23a6cd..be0e1e5 100644
--- a/mongo_proxy/__init__.py
+++ b/mongo_proxy/__init__.py
@@ -1,10 +1,8 @@
from .mongodb_proxy import MongoProxy
from .durable_cursor import DurableCursor, MongoReconnectFailure
-from .pymongo3_durable_cursor import PyMongo3DurableCursor
__all__ = [
'MongoProxy',
'DurableCursor',
'MongoReconnectFailure',
- 'PyMongo3DurableCursor',
-]
+]
\ No newline at end of file
diff --git a/mongo_proxy/durable_cursor.py b/mongo_proxy/durable_cursor.py
index 9cc135d..469bfbb 100644
--- a/mongo_proxy/durable_cursor.py
+++ b/mongo_proxy/durable_cursor.py
@@ -1,5 +1,6 @@
"""
Copyright 2015 Quantopian Inc.
+Copyright 2025 Martin Alge
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -16,12 +17,16 @@
import logging
import time
+from pymongo.cursor import CursorType
from pymongo.errors import (
AutoReconnect,
CursorNotFound,
ExecutionTimeout,
OperationFailure,
WTimeoutError,
+ NotPrimaryError,
+ NetworkTimeout,
+ ServerSelectionTimeoutError
)
# How long we are willing to attempt to reconnect when the replicaset
@@ -30,62 +35,47 @@
MAX_SLEEP = 5
RECONNECT_INITIAL_DELAY = 1
RETRYABLE_OPERATION_FAILURE_CLASSES = (
- AutoReconnect, # AutoReconnect is raised when the primary node fails
+ AutoReconnect,
CursorNotFound,
ExecutionTimeout,
WTimeoutError,
+ NetworkTimeout,
+ ServerSelectionTimeoutError,
)
+ALL_RETRYABLE_EXCEPTIONS = RETRYABLE_OPERATION_FAILURE_CLASSES + (OperationFailure,)
+log = logging.getLogger(__name__)
class MongoReconnectFailure(Exception):
- """
- Exception raised when we fail AutoReconnect more than
- the allowed number of times.
- """
pass
class DurableCursor(object):
- """
- Wrapper class around a pymongo cursor that detects and handles
- replica set failovers and cursor timeouts. Upon successful
- reconnect this class automatically skips over previously returned
- records, resuming iteration as though no error occurred.
- """
-
- # Replace this or override it in a subclass for different logging.
- logger = logging.getLogger(__name__)
+ logger = log
def __init__(
self,
collection,
- spec=None,
- fields=None,
+ filter=None,
+ projection=None,
sort=None,
- slave_okay=True,
hint=None,
tailable=False,
- max_reconnect_time=MAX_RECONNECT_TIME,
- initial_reconnect_interval=RECONNECT_INITIAL_DELAY,
+ max_reconnect_time=60,
+ initial_reconnect_interval=1,
skip=0,
limit=0,
disconnect_on_timeout=True,
**kwargs):
self.collection = collection
- self.spec = spec
- self.fields = fields
+ self.filter = filter or {}
+ self.projection = projection
self.sort = sort
- self.slave_okay = slave_okay
self.hint = hint
self.tailable = tailable
-
- # The number of times we attempt to reconnect to a replica set.
self.max_reconnect_time = max_reconnect_time
-
- # The amount of time, in seconds, between reconnect attempts.
self.initial_reconnect_interval = initial_reconnect_interval
-
self.counter = self.skip = skip
self.limit = limit
self.disconnect_on_timeout = disconnect_on_timeout
@@ -98,49 +88,47 @@ def __iter__(self):
def fetch_cursor(self, count, cursor_kwargs):
"""
- Gets a cursor for the options set in the object.
-
- Used to both get the initial cursor and reloaded cursor.
-
- The difference between initial load and reload is the
- value of count.
- count is 0 on initial load,
- where as count > 0 is used during reload.
+ Gets a cursor for the options set in the object, using the
+ correct API for PyMongo 3.x.
"""
- limit_is_zero = False # as opposed to 0 meaning no limit
+ log.debug("DurableCursor: Entering fetch_cursor with count=%d, limit=%d, initial_skip=%d",
+ count, self.limit, self.skip)
+
+ limit_is_zero = False
if self.limit:
limit = self.limit - (count - self.skip)
+ log.debug("DurableCursor: fetch_cursor calculated new limit=%d", limit)
if limit <= 0:
limit = 1
limit_is_zero = True
else:
limit = 0
+ # For PyMongo 3.x, 'tailable' is controlled via cursor_type
+ cursor_type = CursorType.TAILABLE_AWAIT if self.tailable else CursorType.NON_TAILABLE
+
cursor = self.collection.find(
- spec=self.spec,
- fields=self.fields,
+ filter=self.filter,
+ projection=self.projection,
sort=self.sort,
- slave_okay=self.slave_okay,
- tailable=self.tailable,
skip=count,
limit=limit,
- hint=self.hint,
+ cursor_type=cursor_type,
**cursor_kwargs
)
+
+ # 'hint' is a separate method call on the cursor in PyMongo 3.x
+ if self.hint:
+ cursor.hint(self.hint)
+
if limit_is_zero:
- # we can't use 0, since that's no limit, so instead we set it to 1
- # and then move the cursor forward by one element here
next(cursor, None)
+
+ log.debug("DurableCursor: fetch_cursor returning new cursor.")
return cursor
def reload_cursor(self):
- """
- Reload our internal pymongo cursor with a new query. Use
- self.counter to skip the records we've already
- streamed. Assuming the database remains unchanged we should be
- able to call this method as many times as we want without
- affecting the events we stream.
- """
+ log.debug("DurableCursor: reload_cursor called. Current counter is %d.", self.counter)
self.cursor = self.fetch_cursor(self.counter, self.kwargs)
@property
@@ -148,90 +136,57 @@ def alive(self):
return self.tailable and self.cursor.alive
def __next__(self):
+ log.debug("DurableCursor: __next__ called. About to call _with_retry.")
next_record = self._with_retry(get_next=True, f=lambda: next(self.cursor))
- # Increment count before returning so we know how many records
- # to skip if a failure occurs later.
self.counter += 1
+ log.debug("DurableCursor: __next__ success. Counter is now %d.", self.counter)
return next_record
next = __next__
def _with_retry(self, get_next, f, *args, **kwargs):
try:
- next_record = f(*args, **kwargs)
- except RETRYABLE_OPERATION_FAILURE_CLASSES as exc:
- self.logger.info(
- "Got {!r}; attempting recovery. The query spec was: {}"
- .format(exc, self.spec)
- )
- # Try to reload the cursor and continue where we left off
- next_record = self.try_reconnect(get_next=get_next)
- self.logger.info("Cursor reload after {!r} successful."
- .format(exc))
-
- except OperationFailure as exc:
- # No special subclass for this:
- if 'interrupted at shutdown' in str(exc.args[0]):
- self.logger.info(
- "Got {!r}; attempting recovery. The query spec was: {}"
- .format(exc, self.spec)
+ return f(*args, **kwargs)
+ except ALL_RETRYABLE_EXCEPTIONS as exc:
+ log.warning("DurableCursor: _with_retry caught exception: %r", exc)
+
+ if isinstance(exc, OperationFailure):
+ is_retryable_op_failure = (
+ 'interrupted at shutdown' in str(exc.args[0]) or
+ exc.__class__ in RETRYABLE_OPERATION_FAILURE_CLASSES
)
- next_record = self.try_reconnect(get_next=get_next)
- self.logger.info("Cursor reload after {!r} successful."
- .format(exc))
- else:
- raise
+ if not is_retryable_op_failure:
+ log.error("DurableCursor: Unhandleable OperationFailure. Re-raising.")
+ raise
- return next_record
+ log.debug("DurableCursor: Exception is retryable. Calling try_reconnect.")
+ return self.try_reconnect(get_next=get_next)
def try_reconnect(self, get_next=True):
- """
- Attempt to reconnect to our collection after a replicaset failover.
- Returns a flag indicating whether the reconnect attempt was successful
- along with the next record to return if applicable. This should only
- be called when trying to recover from an AutoReconnect exception.
- """
- attempts = 0
- round = 1
+ log.debug("DurableCursor: Entered try_reconnect.")
start = time.time()
interval = self.initial_reconnect_interval
- disconnected = False
- max_time = self.max_reconnect_time
while True:
try:
- # Attempt to reload and get the next batch.
self.reload_cursor()
+ log.debug("DurableCursor: try_reconnect successfully reloaded cursor. Calling next().")
return next(self.cursor) if get_next else True
+ except RETRYABLE_OPERATION_FAILURE_CLASSES as e:
+ log.warning("DurableCursor: try_reconnect caught %r during inner loop.", e)
+ if time.time() - start > self.max_reconnect_time:
+ log.error('DurableCursor: Reconnect timed out.')
+ raise MongoReconnectFailure()
- # Replica set hasn't come online yet.
- except AutoReconnect:
- if time.time() - start > max_time:
- if not self.disconnect_on_timeout or disconnected:
- break
- self.cursor.collection.database.connection.disconnect()
- disconnected = True
- interval = self.initial_reconnect_interval
- round = 2
- attempts = 0
- max_time *= 2
- self.logger.warning('Resetting clock for round 2 after '
- 'disconnecting')
- delta = time.time() - start
- self.logger.warning(
- "AutoReconnecting, try %d.%d, (%.1f seconds elapsed)" %
- (round, attempts, delta))
- # Give the database time to reload between attempts.
+ log.debug("DurableCursor: Reconnecting... sleeping for %.1f seconds.", interval)
time.sleep(interval)
interval = min(interval * 2, MAX_SLEEP)
- attempts += 1
-
- self.logger.error('Replica set reconnect failed.')
- raise MongoReconnectFailure()
def count(self, with_limit_and_skip=False):
- return self._with_retry(
- get_next=False,
- f=self.cursor.count,
- with_limit_and_skip=with_limit_and_skip,
- )
+ cursor = self.collection.find(self.filter)
+ if with_limit_and_skip:
+ if self.skip:
+ cursor = cursor.skip(self.skip)
+ if self.limit:
+ cursor = cursor.limit(self.limit)
+ return cursor.count(with_limit_and_skip=with_limit_and_skip)
\ No newline at end of file
diff --git a/mongo_proxy/mongodb_proxy.py b/mongo_proxy/mongodb_proxy.py
index e554bcc..5e18e87 100644
--- a/mongo_proxy/mongodb_proxy.py
+++ b/mongo_proxy/mongodb_proxy.py
@@ -1,5 +1,6 @@
"""
Copyright 2013 Gustav Arngarden
+Copyright 2025 Martin Alge
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -16,157 +17,128 @@
import time
import pymongo
-
-
-def get_methods(*objs):
- return set(
- attr
- for obj in objs
- for attr in dir(obj)
- if not attr.startswith('_') and hasattr(getattr(obj, attr), '__call__')
- )
-
-
-try:
- # will fail to import from older versions of pymongo
- from pymongo import MongoClient, MongoReplicaSetClient
-except ImportError:
- MongoClient, MongoReplicaSetClient = None, None
-
+from pymongo import MongoClient
+
+# A curated set of method names that perform network I/O and need to be
+# wrapped in the Executable class for retry-logic.
+EXECUTABLE_MONGO_METHODS = {
+ 'aggregate', 'bulk_write', 'count', 'count_documents', 'create_index',
+ 'delete_many', 'delete_one', 'distinct', 'drop', 'drop_index',
+ 'find', 'find_one', 'find_one_and_delete', 'find_one_and_replace',
+ 'find_one_and_update', 'insert_many', 'insert_one', 'list_indexes',
+ 'map_reduce', 'replace_one', 'update_many', 'update_one',
+ 'command', 'create_collection', 'drop_collection', 'list_collection_names',
+ 'validate_collection',
+ 'drop_database', 'list_database_names',
+}
+
+# Add mongomock types to our check if it's installed for testing
try:
- from pymongo import Connection, ReplicaSetConnection
+ import mongomock
+ MONGOMOCK_TYPES = (mongomock.MongoClient, mongomock.database.Database, mongomock.collection.Collection)
except ImportError:
- Connection, ReplicaSetConnection = None, None
+ MONGOMOCK_TYPES = ()
-EXECUTABLE_MONGO_METHODS = get_methods(pymongo.collection.Collection,
- pymongo.database.Database,
- Connection,
- ReplicaSetConnection,
- MongoClient, MongoReplicaSetClient,
- pymongo)
+PYMONGO_TYPES = (pymongo.MongoClient, pymongo.database.Database, pymongo.collection.Collection)
+CHAINABLE_TYPES = PYMONGO_TYPES + MONGOMOCK_TYPES
-def get_connection(obj):
+def get_client(obj):
if isinstance(obj, pymongo.collection.Collection):
- return obj.database.connection
+ return obj.database.client
elif isinstance(obj, pymongo.database.Database):
- return obj.connection
- elif isinstance(obj, (Connection, ReplicaSetConnection,
- MongoClient, MongoReplicaSetClient)):
+ return obj.client
+ elif isinstance(obj, MongoClient):
return obj
else:
return None
class Executable(object):
- """ Wrap a MongoDB-method and handle AutoReconnect-exceptions
- using the safe_mongocall decorator.
- """
+ """ Wrap a MongoDB-method and handle AutoReconnect-exceptions. """
def __init__(self, method, logger, wait_time=None,
disconnect_on_timeout=True):
self.method = method
self.logger = logger
- # MongoDB's documentation claims that replicaset elections
- # shouldn't take more than a minute. In our experience, we've
- # seen them take as long as a minute and a half, so regardless
- # of what the documentation says, we're going to give the
- # connection two minutes to recover.
- self.wait_time = wait_time or 60
+ self.wait_time = wait_time or 120
self.disconnect_on_timeout = disconnect_on_timeout
def __call__(self, *args, **kwargs):
- """ Automatic handling of AutoReconnect-exceptions.
- """
start = time.time()
- round = 1
+ round_num = 1
i = 0
disconnected = False
max_time = self.wait_time
+ reconnect_errors = (pymongo.errors.AutoReconnect, pymongo.errors.NetworkTimeout, pymongo.errors.ServerSelectionTimeoutError)
while True:
try:
+ # The result is handled by MongoProxy.__call__, so we just return it
return self.method(*args, **kwargs)
- except pymongo.errors.AutoReconnect:
+ except reconnect_errors as e:
end = time.time()
delta = end - start
if delta >= max_time:
if not self.disconnect_on_timeout or disconnected:
- break
- conn = get_connection(self.method.__self__)
- if conn:
- conn.disconnect()
+ self.logger.error("AutoReconnect timed out after %.1f seconds.", delta)
+ raise
+ client = get_client(self.method.__self__)
+ if client:
+ client.close()
disconnected = True
max_time *= 2
- round = 2
+ round_num = 2
i = 0
- self.logger.warning('Resetting clock for round 2 '
- 'after disconnecting')
- self.logger.warning('AutoReconnecting, '
- 'try %d.%d (%.1f seconds elapsed)'
- % (round, i, delta))
- time.sleep(min(5, pow(2, i)))
+ self.logger.warning('Resetting clock for round 2 after disconnecting')
+ self.logger.warning('AutoReconnecting due to %s, try %d.%d (%.1f seconds elapsed)',
+ type(e).__name__, round_num, i, delta)
+ time.sleep(min(5, pow(2, i) * 0.5))
i += 1
- # Try one more time, but this time, if it fails, let the
- # exception bubble up to the caller.
- return self.method(*args, **kwargs)
-
- def __dir__(self):
- return dir(self.method)
-
- def __str__(self):
- return self.method.__str__()
-
- def __repr__(self):
- return self.method.__repr__()
class MongoProxy(object):
- """ Proxy for MongoDB connection.
- Methods that are executable, i.e find, insert etc, get wrapped in an
- Executable-instance that handles AutoReconnect-exceptions transparently.
-
- """
+ """ Proxy for MongoDB connection. """
def __init__(self, conn, logger=None, wait_time=None,
disconnect_on_timeout=True):
- """ conn is an ordinary MongoDB-connection.
-
- """
if logger is None:
import logging
logger = logging.getLogger(__name__)
- self.conn = conn
- self.logger = logger
- self.wait_time = wait_time
- self.disconnect_on_timeout = disconnect_on_timeout
+ object.__setattr__(self, 'conn', conn)
+ object.__setattr__(self, 'logger', logger)
+ object.__setattr__(self, 'wait_time', wait_time)
+ object.__setattr__(self, 'disconnect_on_timeout', disconnect_on_timeout)
def __getitem__(self, key):
- """ Create and return proxy around the method in the connection
- named "key".
-
- """
item = self.conn[key]
- if hasattr(item, '__call__'):
- return MongoProxy(item, self.logger, self.wait_time)
- return item
+ return MongoProxy(item, self.logger, self.wait_time, self.disconnect_on_timeout)
def __getattr__(self, key):
- """ If key is the name of an executable method in the MongoDB connection,
- for instance find or insert, wrap this method in Executable-class that
- handles AutoReconnect-Exception.
+ attr = getattr(self.conn, key)
+ if key in EXECUTABLE_MONGO_METHODS:
+ return Executable(attr, self.logger, self.wait_time, self.disconnect_on_timeout)
+ return MongoProxy(attr, self.logger, self.wait_time, self.disconnect_on_timeout)
+ def __call__(self, *args, **kwargs):
"""
+ Executes a call on the proxied object and wraps the result if it's a new
+ chainable object (like a Collection from with_options).
+ """
+ # Get the result from the wrapped object (e.g. the with_options method)
+ result = self.conn(*args, **kwargs)
- attr = getattr(self.conn, key)
- if hasattr(attr, '__call__'):
- if key in EXECUTABLE_MONGO_METHODS:
- return Executable(attr, self.logger, self.wait_time)
- else:
- return MongoProxy(attr, self.logger, self.wait_time)
- return attr
+ # If the result is a new chainable object, re-wrap it in a proxy
+ if isinstance(result, CHAINABLE_TYPES):
+ return MongoProxy(result, self.logger, self.wait_time, self.disconnect_on_timeout)
- def __call__(self, *args, **kwargs):
- return self.conn(*args, **kwargs)
+ # Otherwise, return the raw result (e.g., a dict, a cursor, etc.)
+ return result
+
+ def __setattr__(self, key, value):
+ if key in ('conn', 'logger', 'wait_time', 'disconnect_on_timeout'):
+ object.__setattr__(self, key, value)
+ else:
+ setattr(self.conn, key, value)
def __dir__(self):
return dir(self.conn)
@@ -177,5 +149,8 @@ def __str__(self):
def __repr__(self):
return self.conn.__repr__()
- def __nonzero__(self):
- return True
+ def __eq__(self, other):
+ return self.conn == other
+
+ def __bool__(self):
+ return True
\ No newline at end of file
diff --git a/mongo_proxy/pymongo3_durable_cursor.py b/mongo_proxy/pymongo3_durable_cursor.py
deleted file mode 100644
index 6dff175..0000000
--- a/mongo_proxy/pymongo3_durable_cursor.py
+++ /dev/null
@@ -1,87 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-Cursor that handles AutoReconnect, NetworkTimeout & NotMasterError problems
-when iterating over values and replicate set elections happen.
-(node crash or shutdown)
-
-Copyright 2018 IQ Payments Oy
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-"""
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import logging
-import time
-from pymongo.cursor import Cursor
-from pymongo.errors import AutoReconnect
-from pymongo.errors import NetworkTimeout
-from pymongo.errors import NotMasterError
-
-logger = logging.getLogger(__name__)
-MAX_ATTEMPTS = 15
-SLEEP_BETWEEN_RETRIES = 3
-
-
-class TooManyRetries(Exception):
- """When we reach the limit, we raise this exception"""
- pass
-
-
-class PyMongo3DurableCursor(Cursor):
- """Cursor that on AutoReconnect error waits and spawns a new Cursor,
- keeping track of previous location in iteration.
- """
-
- def __init__(self, *args, **kwargs):
- """Store original query args & kwargs for reuse if failure"""
- self.retry_args = args
- self.retry_kwargs = dict(kwargs) # Copy values to keep original "skip"
-
- self.iterator_count = kwargs.pop('iterator_count', 0)
- self.retry_attempt = kwargs.pop('retry_attempt', 0)
- self.retry_cursor = None
-
- if self.iterator_count:
- kwargs['skip'] = kwargs.get('skip', 0) + self.iterator_count
-
- super(PyMongo3DurableCursor, self).__init__(*args, **kwargs)
-
- def next(self):
- """If Autoreconnect problems, wait and spawn new cursor,
- If new cursor already exists, pass the next() onto it.
- """
- if self.retry_cursor:
- return self.retry_cursor.next()
-
- try:
- next_item = super(PyMongo3DurableCursor, self).next()
- self.iterator_count += 1
- self.retry_attempt = 0 # Works (again), reset counter
- return next_item
- except (AutoReconnect, NetworkTimeout, NotMasterError) as exception:
- self.retry_attempt += 1
- if self.retry_attempt > MAX_ATTEMPTS:
- raise TooManyRetries('Failed too many times.')
- time.sleep(SLEEP_BETWEEN_RETRIES)
- logger.critical('Caught Exception: {}, spawning new cursor. '
- 'retry: {}/{}'.format(repr(exception),
- self.retry_attempt,
- MAX_ATTEMPTS))
- self.retry_kwargs['retry_attempt'] = self.retry_attempt
- self.retry_kwargs['iterator_count'] = self.iterator_count
- self.retry_cursor = PyMongo3DurableCursor(*self.retry_args,
- **self.retry_kwargs)
- return self.retry_cursor.next()
diff --git a/setup.py b/setup.py
index 42bcc79..344e185 100644
--- a/setup.py
+++ b/setup.py
@@ -1,5 +1,6 @@
"""
Copyright 2013 Gustav Arngarden
+Copyright 2025 Martin Alge
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -20,15 +21,28 @@
LONG_DESCRIPTION = readme_file.read()
setup(
- name='MongoDBProxy-official',
+ name='MongoDBProxy-alge',
packages=find_packages(),
- version='0.1.0',
+ version='0.3.0',
description='Proxy around MongoDB connection that automatically handles AutoReconnect exceptions.',
- author='Gustav Arngarden',
+ author='Martin Alge',
+ author_email='martin@alge.se',
+ maintainer='Martin Alge',
+ maintainer_email='martin@alge.se',
long_description=LONG_DESCRIPTION,
classifiers=[
'License :: OSI Approved :: Apache Software License',
+ 'Programming Language :: Python :: 3',
+ 'Programming Language :: Python :: 3.6',
+ 'Programming Language :: Python :: 3.7',
+ 'Programming Language :: Python :: 3.8',
+ 'Programming Language :: Python :: 3.9',
],
- install_requires=['pymongo'],
- url="https://github.com/arngarden/MongoDBProxy"
+ python_requires='>=3.6',
+ install_requires=['pymongo>=3.12,<4.0'],
+ test_suite='tests',
+ extras_require={
+ 'test': ['pytest', 'pytest-mock', 'mongomock'],
+ },
+ url="https://github.com/Alge/MongoDBProxy.git",
)
diff --git a/tests/test_mongo_proxy.py b/tests/test_mongo_proxy.py
new file mode 100644
index 0000000..9965fa1
--- /dev/null
+++ b/tests/test_mongo_proxy.py
@@ -0,0 +1,154 @@
+import pytest
+from unittest.mock import call, MagicMock
+import pymongo
+import pymongo.errors
+from pymongo.read_preferences import ReadPreference
+from pymongo.write_concern import WriteConcern
+import mongomock
+
+from mongo_proxy import MongoProxy, DurableCursor, MongoReconnectFailure
+from mongo_proxy.mongodb_proxy import Executable
+
+
+class TestMongoProxy:
+    # Unit tests for MongoProxy's attribute/item wrapping behaviour.
+ @pytest.fixture
+ def mongo_setup(self):
+ mock_client = mongomock.MongoClient()
+ proxy = MongoProxy(mock_client)
+ return mock_client, proxy
+
+ def test_getitem_returns_proxy(self, mongo_setup):
+ _, proxy = mongo_setup
+ assert isinstance(proxy['testdb'], MongoProxy)
+
+ def test_getattr_returns_proxy(self, mongo_setup):
+ _, proxy = mongo_setup
+ assert isinstance(proxy.testdb, MongoProxy)
+
+ def test_executable_methods_are_wrapped(self, mongo_setup):
+ _, proxy = mongo_setup
+ collection_proxy = proxy.testdb.testcollection
+ assert isinstance(collection_proxy.find, Executable)
+
+ def test_getattr_on_property(self, mongo_setup):
+ mock_client, proxy = mongo_setup
+ assert proxy.read_preference == mock_client.read_preference
+
+ def test_with_options_returns_proxied_object(self, mongo_setup):
+ _, proxy = mongo_setup
+ wc = WriteConcern(w=2)
+ new_proxy = proxy.testdb.testcollection.with_options(write_concern=wc)
+ assert isinstance(new_proxy, MongoProxy)
+ assert new_proxy.write_concern == wc
+
+
+class TestExecutable:
+    # Unit tests for Executable's retry/backoff handling of transient errors.
+ @pytest.mark.parametrize("exception_type", [
+ pymongo.errors.AutoReconnect, pymongo.errors.NotPrimaryError,
+ pymongo.errors.NetworkTimeout, pymongo.errors.ServerSelectionTimeoutError,
+ ])
+ def test_reconnect_on_various_exceptions(self, mocker, exception_type):
+ mocker.patch('time.sleep')
+ method = mocker.Mock(side_effect=[exception_type("mock error"), 'Success'])
+ executable = Executable(method, mocker.Mock())
+ assert executable() == 'Success'
+
+ def test_autoreconnect_with_exponential_backoff(self, mocker):
+ sleep_mock = mocker.patch('time.sleep')
+ method = mocker.Mock(side_effect=[
+ pymongo.errors.AutoReconnect("failure 1"),
+ pymongo.errors.AutoReconnect("failure 2"), "Success"
+ ])
+ executable = Executable(method, mocker.Mock())
+ executable()
+ assert sleep_mock.call_count == 2
+ sleep_mock.assert_has_calls([call(0.5), call(1.0)])
+
+
+class TestDurableCursor:
+ @pytest.fixture
+ def populated_collection(self):
+ client = mongomock.MongoClient()
+ collection = client.testdb.testcollection
+ test_data = [{'i': i} for i in range(1, 11)]
+ collection.insert_many(test_data)
+ return collection, test_data
+
+ def test_iteration_on_empty_collection(self):
+ collection = mongomock.MongoClient().db.collection
+ cursor = DurableCursor(collection)
+ assert list(cursor) == []
+
+ @pytest.mark.parametrize("error_type", [
+ pymongo.errors.AutoReconnect,
+ pymongo.errors.CursorNotFound,
+ ])
+ def test_reconnect_during_iteration(self, populated_collection, mocker, error_type):
+ mocker.patch('time.sleep')
+ collection, test_data = populated_collection
+ original_find = collection.find
+
+ # Create a pure mock iterator for the first, failing cursor.
+ failing_cursor_mock = MagicMock()
+ failing_cursor_mock.__iter__.return_value = failing_cursor_mock
+ failing_cursor_mock.__next__.side_effect = [
+ test_data[0],
+ test_data[1],
+ error_type("mock failure")
+ ]
+
+ # Use a router function for the mock's side_effect.
+ def find_router(*args, **kwargs):
+ # The initial call has skip=0. Return the failing mock.
+ if kwargs.get('skip', 0) == 0:
+ return failing_cursor_mock
+ # The reconnect call has skip=2. Let the original method handle it
+ # so we get a real cursor that respects the skip argument.
+ return original_find(*args, **kwargs)
+
+ find_mock = mocker.patch.object(collection, 'find', side_effect=find_router)
+
+ d_cursor = DurableCursor(collection)
+ results = list(d_cursor)
+
+ assert len(results) == 10
+ assert [doc['i'] for doc in results] == list(range(1, 11))
+ assert find_mock.call_count == 2
+
+ def test_reconnect_with_initial_skip_and_limit(self, populated_collection, mocker):
+ mocker.patch('time.sleep')
+ collection, test_data = populated_collection
+ original_find = collection.find
+
+ initial_skip = 2
+ initial_limit = 5
+
+ failing_cursor_mock = MagicMock()
+ failing_cursor_mock.__iter__.return_value = failing_cursor_mock
+ failing_cursor_mock.__next__.side_effect = [
+ test_data[2], # i=3
+ test_data[3], # i=4
+ pymongo.errors.AutoReconnect("fail")
+ ]
+
+ # Use the same robust router strategy.
+ def find_router(*args, **kwargs):
+ if kwargs.get('skip') == initial_skip:
+ return failing_cursor_mock
+ return original_find(*args, **kwargs)
+
+ find_mock = mocker.patch.object(collection, 'find', side_effect=find_router)
+
+ d_cursor = DurableCursor(collection, skip=initial_skip, limit=initial_limit)
+ results = list(d_cursor)
+
+ assert len(results) == initial_limit
+ assert [doc['i'] for doc in results] == [3, 4, 5, 6, 7]
+
+ assert find_mock.call_count == 2
+
+ reconnect_call_args = find_mock.call_args_list[1]
+ assert reconnect_call_args.kwargs.get('skip') == 4
+ assert reconnect_call_args.kwargs.get('limit') == 3
\ No newline at end of file