Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ jobs:

python-compatibility-tests:
runs-on: ubuntu-latest
needs: [ redis_version, tests ]
needs: [ redis_version ]
timeout-minutes: 60
strategy:
max-parallel: 15
Expand All @@ -118,7 +118,7 @@ jobs:

hiredis-tests:
runs-on: ubuntu-latest
needs: [redis_version, tests]
needs: [redis_version]
timeout-minutes: 60
strategy:
max-parallel: 15
Expand All @@ -144,7 +144,7 @@ jobs:

uvloop-tests:
runs-on: ubuntu-latest
needs: [redis_version, tests]
needs: [redis_version]
timeout-minutes: 60
strategy:
max-parallel: 15
Expand All @@ -170,7 +170,7 @@ jobs:
build-and-test-package:
name: Validate building and installing the package
runs-on: ubuntu-latest
needs: [tests]
needs: [redis_version]
strategy:
fail-fast: false
matrix:
Expand Down
17 changes: 15 additions & 2 deletions dev_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,17 +1,30 @@
build
build==1.2.2.post1 ; platform_python_implementation == "PyPy"
click==8.0.4
invoke==2.2.0
mock
mock==5.1.0 ; platform_python_implementation == "PyPy"
packaging>=20.4
packaging==24.2 ; platform_python_implementation == "PyPy"

pytest
pytest==8.3.4 ; platform_python_implementation == "PyPy"
pytest-asyncio>=0.23.0
pytest-asyncio==1.1.0 ; platform_python_implementation == "PyPy"
pytest-cov
pytest-cov==6.0.0 ; platform_python_implementation == "PyPy"
coverage==7.6.12 ; platform_python_implementation == "PyPy"
pytest-profiling==1.8.1
pytest-timeout
pytest-timeout==2.3.1 ; platform_python_implementation == "PyPy"

ruff==0.9.6
ujson>=4.2.0
uvloop
uvloop<=0.21.0; platform_python_implementation == "CPython"
vulture>=2.3.0
numpy>=1.24.0

numpy>=1.24.0 ; platform_python_implementation == "CPython"
numpy>=1.24.0,<2.0 ; platform_python_implementation == "PyPy"

redis-entraid==1.0.0
pybreaker>=1.4.0
36 changes: 33 additions & 3 deletions tests/test_multidb/test_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import threading
import time
from time import sleep
from unittest.mock import patch, Mock

Expand All @@ -16,6 +17,16 @@
from tests.test_multidb.conftest import create_weighted_list


# tiny helper to poll until a condition is true
def validate_condition(condition, timeout=2.0, interval=0.02, msg="condition not met"):
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if condition():
return
time.sleep(interval)
pytest.fail(msg)


@pytest.mark.onlynoncluster
class TestMultiDbClient:
@pytest.mark.parametrize(
Expand Down Expand Up @@ -54,6 +65,7 @@ def test_execute_command_against_correct_db_on_successful_initialization(
assert mock_db1.circuit.state == CBState.CLOSED
assert mock_db2.circuit.state == CBState.CLOSED

# @pytest.mark.repeat(600)
@pytest.mark.parametrize(
"mock_multi_db_config,mock_db, mock_db1, mock_db2",
[
Expand Down Expand Up @@ -91,7 +103,8 @@ def test_execute_command_against_correct_db_and_closed_circuit(

client = MultiDBClient(mock_multi_db_config)
assert mock_multi_db_config.failover_strategy.set_databases.call_count == 1
assert client.set("key", "value") == "OK1"
result = client.set("key", "value")
assert result == "OK1"
assert mock_hc.check_health.call_count == 7

assert mock_db.circuit.state == CBState.CLOSED
Expand Down Expand Up @@ -278,11 +291,28 @@ def mock_check_health(database):
mock_multi_db_config.failover_strategy = WeightBasedFailoverStrategy()

client = MultiDBClient(mock_multi_db_config)

# 1) Initially should pick highest weight (db1)
assert client.set("key", "value") == "OK1"

# 2) Wait until the SECOND db1 check actually ran (db1 becomes unhealthy)
error_event.wait(timeout=0.5)
assert client.set("key", "value") == "OK2"

# 3) Eventually, fallback should route to db2 (next highest weight)
validate_condition(
lambda: client.set("key", "value") == "OK2",
timeout=1.0,
msg="Timeout waiting for fallback to db2",
)

sleep(0.5)
assert client.set("key", "value") == "OK1"
# 4) After auto-fallback interval + another health cycle, db1 returns healthy.
# Eventually we should route back to db1.
validate_condition(
lambda: client.set("key", "value") == "OK1",
timeout=1.0,
msg="Timeout waiting for fallback back to db1 after successful health check",
)

@pytest.mark.parametrize(
"mock_multi_db_config,mock_db, mock_db1, mock_db2",
Expand Down
24 changes: 22 additions & 2 deletions tests/test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ def target(conn):
proc = self._mp_context.Process(target=target, args=(conn,))
proc.start()
proc.join(3)
if proc.exitcode is None:
proc.terminate()
proc.join(3)
pytest.xfail("Intermittent PyPy/Linux fork+Event hang; see pypy/pypy#5268")
assert proc.exitcode == 0

# The connection was created in the parent but disconnected in the
Expand All @@ -80,14 +84,18 @@ def target(conn, ev):
with pytest.raises(ConnectionError):
conn.send_command("ping")

ev = multiprocessing.Event()
ev = self._mp_context.Event()
proc = self._mp_context.Process(target=target, args=(conn, ev))
proc.start()

conn.disconnect()
ev.set()

proc.join(3)
if proc.exitcode is None:
proc.terminate()
proc.join(3)
pytest.xfail("Intermittent PyPy/Linux fork+Event hang; see pypy/pypy#5268")
assert proc.exitcode == 0

@pytest.mark.parametrize("max_connections", [2, None])
Expand Down Expand Up @@ -122,6 +130,10 @@ def target(pool, parent_conn):
proc = self._mp_context.Process(target=target, args=(pool, parent_conn))
proc.start()
proc.join(3)
if proc.exitcode is None:
proc.terminate()
proc.join(3)
pytest.xfail("Intermittent PyPy/Linux fork+Event hang; see pypy/pypy#5268")
assert proc.exitcode == 0

@pytest.mark.parametrize("max_connections", [1, 2, None])
Expand Down Expand Up @@ -152,6 +164,10 @@ def target(pool):
proc = self._mp_context.Process(target=target, args=(pool,))
proc.start()
proc.join(3)
if proc.exitcode is None:
proc.terminate()
proc.join(3)
pytest.xfail("Intermittent PyPy/Linux fork+Event hang; see pypy/pypy#5268")
assert proc.exitcode == 0

# Check that connection is still alive after fork process has exited
Expand Down Expand Up @@ -185,14 +201,18 @@ def target(pool, disconnect_event):
assert conn.send_command("ping") is None
assert conn.read_response() == b"PONG"

ev = multiprocessing.Event()
ev = self._mp_context.Event()

proc = self._mp_context.Process(target=target, args=(pool, ev))
proc.start()

pool.disconnect()
ev.set()
proc.join(3)
if proc.exitcode is None:
proc.terminate()
proc.join(3)
pytest.xfail("Intermittent PyPy/Linux fork+Event hang; see pypy/pypy#5268")
assert proc.exitcode == 0

def test_redis_client(self, r):
Expand Down
12 changes: 10 additions & 2 deletions tests/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,8 +942,16 @@ def poll(ps, expected_res):
def test_get_message_wait_for_subscription_not_being_called(self, r):
p = r.pubsub()
p.subscribe("foo")
with patch.object(threading.Event, "wait") as mock:
assert p.subscribed is True
assert p.subscribed is True

# Ensure p has the event attribute your wait_for_message would call:
ev = getattr(p, "subscribed_event", None)

assert ev is not None, (
"PubSub event attribute not found (check redis-py version)"
)

with patch.object(ev, "wait") as mock:
assert wait_for_message(p) == make_message("subscribe", "foo", 1)
assert mock.called is False

Expand Down
Loading