diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml
index cbd210c14a..5adf7f41ac 100644
--- a/.github/workflows/integration.yaml
+++ b/.github/workflows/integration.yaml
@@ -94,7 +94,7 @@ jobs:
   python-compatibility-tests:
     runs-on: ubuntu-latest
-    needs: [ redis_version, tests ]
+    needs: [ redis_version ]
     timeout-minutes: 60
     strategy:
       max-parallel: 15
@@ -118,7 +118,7 @@ jobs:
   hiredis-tests:
     runs-on: ubuntu-latest
-    needs: [redis_version, tests]
+    needs: [redis_version]
     timeout-minutes: 60
     strategy:
       max-parallel: 15
@@ -144,7 +144,7 @@ jobs:
   uvloop-tests:
     runs-on: ubuntu-latest
-    needs: [redis_version, tests]
+    needs: [redis_version]
     timeout-minutes: 60
     strategy:
       max-parallel: 15
@@ -170,7 +170,7 @@ jobs:
   build-and-test-package:
     name: Validate building and installing the package
    runs-on: ubuntu-latest
-    needs: [tests]
+    needs: [redis_version]
     strategy:
       fail-fast: false
       matrix:
diff --git a/dev_requirements.txt b/dev_requirements.txt
index e61f37f101..e0093c0bf1 100644
--- a/dev_requirements.txt
+++ b/dev_requirements.txt
@@ -1,17 +1,30 @@
 build
+build==1.2.2.post1 ; platform_python_implementation == "PyPy"
 click==8.0.4
 invoke==2.2.0
 mock
+mock==5.1.0 ; platform_python_implementation == "PyPy"
 packaging>=20.4
+packaging==24.2 ; platform_python_implementation == "PyPy"
+
 pytest
+pytest==8.3.4 ; platform_python_implementation == "PyPy"
 pytest-asyncio>=0.23.0
+pytest-asyncio==1.1.0 ; platform_python_implementation == "PyPy"
 pytest-cov
+pytest-cov==6.0.0 ; platform_python_implementation == "PyPy"
+coverage==7.6.12 ; platform_python_implementation == "PyPy"
 pytest-profiling==1.8.1
 pytest-timeout
+pytest-timeout==2.3.1 ; platform_python_implementation == "PyPy"
+
 ruff==0.9.6
 ujson>=4.2.0
-uvloop
+uvloop<=0.21.0 ; platform_python_implementation == "CPython"
 vulture>=2.3.0
-numpy>=1.24.0
+
+numpy>=1.24.0 ; platform_python_implementation == "CPython"
+numpy>=1.24.0,<2.0 ; platform_python_implementation == "PyPy"
+
 redis-entraid==1.0.0
 pybreaker>=1.4.0
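
A note on the `; platform_python_implementation == "PyPy"` suffixes above: these are standard PEP 508 environment markers, evaluated by pip at install time, so the pinned lines only take effect on the matching interpreter. How a marker resolves can be checked with the `packaging` library already listed in this file; a minimal sketch:

    from packaging.markers import Marker

    marker = Marker('platform_python_implementation == "PyPy"')

    # Evaluated against the running interpreter's environment:
    print(marker.evaluate())  # False on CPython, True on PyPy

    # Or against an explicit environment, to preview the PyPy resolution:
    print(marker.evaluate({"platform_python_implementation": "PyPy"}))  # True
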
diff --git a/tests/test_multidb/test_client.py b/tests/test_multidb/test_client.py
index cbc81b15ed..f39fc286db 100644
--- a/tests/test_multidb/test_client.py
+++ b/tests/test_multidb/test_client.py
@@ -1,4 +1,5 @@
 import threading
+import time
 from time import sleep
 from unittest.mock import patch, Mock
 
@@ -16,6 +17,16 @@
 from tests.test_multidb.conftest import create_weighted_list
 
 
+# tiny helper to poll until a condition is true
+def validate_condition(condition, timeout=2.0, interval=0.02, msg="condition not met"):
+    deadline = time.monotonic() + timeout
+    while time.monotonic() < deadline:
+        if condition():
+            return
+        time.sleep(interval)
+    pytest.fail(msg)
+
+
 @pytest.mark.onlynoncluster
 class TestMultiDbClient:
     @pytest.mark.parametrize(
@@ -54,6 +65,7 @@ def test_execute_command_against_correct_db_on_successful_initialization(
         assert mock_db1.circuit.state == CBState.CLOSED
         assert mock_db2.circuit.state == CBState.CLOSED
 
+    # @pytest.mark.repeat(600)
     @pytest.mark.parametrize(
         "mock_multi_db_config,mock_db, mock_db1, mock_db2",
         [
@@ -91,7 +103,8 @@ def test_execute_command_against_correct_db_and_closed_circuit(
             client = MultiDBClient(mock_multi_db_config)
             assert mock_multi_db_config.failover_strategy.set_databases.call_count == 1
 
-            assert client.set("key", "value") == "OK1"
+            result = client.set("key", "value")
+            assert result == "OK1"
 
             assert mock_hc.check_health.call_count == 7
             assert mock_db.circuit.state == CBState.CLOSED
@@ -278,11 +291,28 @@ def mock_check_health(database):
             mock_multi_db_config.failover_strategy = WeightBasedFailoverStrategy()
 
             client = MultiDBClient(mock_multi_db_config)
+
+            # 1) Initially should pick highest weight (db1)
             assert client.set("key", "value") == "OK1"
+
+            # 2) Wait until the SECOND db1 check actually ran (db1 becomes unhealthy)
             error_event.wait(timeout=0.5)
-            assert client.set("key", "value") == "OK2"
+
+            # 3) Eventually, fallback should route to db2 (next highest weight)
+            validate_condition(
+                lambda: client.set("key", "value") == "OK2",
+                timeout=1.0,
+                msg="Timeout waiting for fallback to db2",
+            )
+
             sleep(0.5)
-            assert client.set("key", "value") == "OK1"
+            # 4) After auto-fallback interval + another health cycle, db1 returns healthy.
+            #    Eventually we should route back to db1.
+            validate_condition(
+                lambda: client.set("key", "value") == "OK1",
+                timeout=1.0,
+                msg="Timeout waiting for fallback back to db1 after successful health check",
+            )
 
     @pytest.mark.parametrize(
         "mock_multi_db_config,mock_db, mock_db1, mock_db2",
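
One caveat on the `validate_condition` helper introduced above: the loop re-checks the condition only while `time.monotonic() < deadline`, so a condition that becomes true during the very last sleep interval is never observed and the test fails anyway. A slightly more defensive variant (a sketch, not part of this patch) always performs one final check before giving up:

    import time

    import pytest


    def validate_condition(condition, timeout=2.0, interval=0.02, msg="condition not met"):
        # Poll until the condition holds; unlike the version above, the
        # condition is checked one last time after the deadline passes,
        # so a success during the final sleep is not missed.
        deadline = time.monotonic() + timeout
        while True:
            if condition():
                return
            if time.monotonic() >= deadline:
                pytest.fail(msg)
            time.sleep(interval)
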
diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py
index 549eeb49a2..eb195f182c 100644
--- a/tests/test_multiprocessing.py
+++ b/tests/test_multiprocessing.py
@@ -54,6 +54,10 @@ def target(conn):
         proc = self._mp_context.Process(target=target, args=(conn,))
         proc.start()
         proc.join(3)
+        if proc.exitcode is None:
+            proc.terminate()
+            proc.join(3)
+            pytest.xfail("Intermittent PyPy/Linux fork+Event hang; see pypy/pypy#5268")
         assert proc.exitcode == 0
 
         # The connection was created in the parent but disconnected in the
@@ -80,7 +84,7 @@ def target(conn, ev):
             with pytest.raises(ConnectionError):
                 conn.send_command("ping")
 
-        ev = multiprocessing.Event()
+        ev = self._mp_context.Event()
         proc = self._mp_context.Process(target=target, args=(conn, ev))
         proc.start()
 
@@ -88,6 +92,10 @@ def target(conn, ev):
 
         ev.set()
         proc.join(3)
+        if proc.exitcode is None:
+            proc.terminate()
+            proc.join(3)
+            pytest.xfail("Intermittent PyPy/Linux fork+Event hang; see pypy/pypy#5268")
         assert proc.exitcode == 0
 
     @pytest.mark.parametrize("max_connections", [2, None])
@@ -122,6 +130,10 @@ def target(pool, parent_conn):
         proc = self._mp_context.Process(target=target, args=(pool, parent_conn))
         proc.start()
         proc.join(3)
+        if proc.exitcode is None:
+            proc.terminate()
+            proc.join(3)
+            pytest.xfail("Intermittent PyPy/Linux fork+Event hang; see pypy/pypy#5268")
         assert proc.exitcode == 0
 
     @pytest.mark.parametrize("max_connections", [1, 2, None])
@@ -152,6 +164,10 @@ def target(pool):
         proc = self._mp_context.Process(target=target, args=(pool,))
         proc.start()
         proc.join(3)
+        if proc.exitcode is None:
+            proc.terminate()
+            proc.join(3)
+            pytest.xfail("Intermittent PyPy/Linux fork+Event hang; see pypy/pypy#5268")
         assert proc.exitcode == 0
 
         # Check that connection is still alive after fork process has exited
@@ -185,7 +201,7 @@ def target(pool, disconnect_event):
             assert conn.send_command("ping") is None
             assert conn.read_response() == b"PONG"
 
-        ev = multiprocessing.Event()
+        ev = self._mp_context.Event()
         proc = self._mp_context.Process(target=target, args=(pool, ev))
         proc.start()
 
@@ -193,6 +209,10 @@ def target(pool, disconnect_event):
         pool.disconnect()
         ev.set()
         proc.join(3)
+        if proc.exitcode is None:
+            proc.terminate()
+            proc.join(3)
+            pytest.xfail("Intermittent PyPy/Linux fork+Event hang; see pypy/pypy#5268")
         assert proc.exitcode == 0
 
     def test_redis_client(self, r):
diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py
index 3dc07caf51..db313e2437 100644
--- a/tests/test_pubsub.py
+++ b/tests/test_pubsub.py
@@ -942,8 +942,16 @@ def poll(ps, expected_res):
     def test_get_message_wait_for_subscription_not_being_called(self, r):
         p = r.pubsub()
         p.subscribe("foo")
-        with patch.object(threading.Event, "wait") as mock:
-            assert p.subscribed is True
+        assert p.subscribed is True
+
+        # Ensure p has the event attribute that wait_for_message would wait on:
+        ev = getattr(p, "subscribed_event", None)
+
+        assert ev is not None, (
+            "PubSub event attribute not found (check redis-py version)"
+        )
+
+        with patch.object(ev, "wait") as mock:
             assert wait_for_message(p) == make_message("subscribe", "foo", 1)
             assert mock.called is False
 
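
The pubsub change above replaces a class-level patch of `threading.Event.wait` with a patch on the single `Event` instance the test actually owns. Patching the class intercepts every `Event` in the process, including ones created by unrelated machinery, which is exactly the kind of cross-talk that makes a test flaky. A self-contained sketch of the difference (illustrative only, not from the patch):

    import threading
    from unittest.mock import patch

    ev = threading.Event()
    other = threading.Event()
    other.set()

    # Instance-level patch: only ev.wait is mocked; other Events are untouched.
    with patch.object(ev, "wait") as mock_wait:
        other.wait(0.01)  # real wait(); returns immediately since the flag is set
        assert mock_wait.called is False

    # Class-level patch: every Event's wait now hits the mock, even other's.
    with patch.object(threading.Event, "wait") as mock_wait:
        other.wait(0.01)  # intercepted by the mock
        assert mock_wait.called is True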