
Refactor ClusterPubSub to reuse NodesManager-managed connections #4037

Open
petyaslavova wants to merge 1 commit into master from ps_cluster_pubsub_to_use_nodes_managers_managed_connections

Collaborator

@petyaslavova petyaslavova commented Apr 17, 2026

Refactor ClusterPubSub to reuse NodesManager-managed connections

Previously, both sync and async ClusterPubSub constructed detached
ConnectionPool instances from connection_kwargs whenever a target
node was supplied or resolved. This bypassed NodesManager's authoritative
pool, so credential refreshes, maintenance events, and connection limits
applied to the cluster did not propagate to the PubSub socket.
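
The divergence described above can be illustrated with a minimal sketch. `Pool`, `Manager`, `pool_for`, and `rotate_password` are hypothetical stand-ins invented for this example, not redis-py classes or methods; the only point is that a pool built separately from the same kwargs never sees updates applied through the manager.

```python
class Pool:
    """Hypothetical pool that keeps its own copy of the connection kwargs."""

    def __init__(self, **connection_kwargs):
        self.connection_kwargs = dict(connection_kwargs)


class Manager:
    """Hypothetical stand-in for a nodes manager owning one pool per node."""

    def __init__(self, **connection_kwargs):
        self.connection_kwargs = dict(connection_kwargs)
        self._pools = {}

    def pool_for(self, node):
        # Create the node's pool lazily and hand back the same object each time.
        if node not in self._pools:
            self._pools[node] = Pool(**self.connection_kwargs)
        return self._pools[node]

    def rotate_password(self, password):
        # A credential refresh only reaches pools the manager knows about.
        self.connection_kwargs["password"] = password
        for pool in self._pools.values():
            pool.connection_kwargs["password"] = password


manager = Manager(host="127.0.0.1", password="old")

shared = manager.pool_for("node-1")            # managed pool (the new behavior)
detached = Pool(**manager.connection_kwargs)   # standalone pool (the old behavior)

manager.rotate_password("new")

shared_password = shared.connection_kwargs["password"]
detached_password = detached.connection_kwargs["password"]
```

After the rotation, the managed pool carries the new credential while the detached pool still holds the stale one, which is the class of problem this change is meant to remove.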

Sync (redis/cluster.py):

  • ClusterPubSub.__init__, _get_node_pubsub and execute_command now
    obtain the pool via cluster.get_redis_connection(node).connection_pool.

Async (redis/asyncio/cluster.py):

  • Introduce _ClusterNodePoolAdapter that exposes a ClusterNode as a
    ConnectionPool-compatible object (get_connection / release / get_encoder).
  • ClusterPubSub.__init__, _get_node_pubsub and execute_command wrap
    the target node with the adapter instead of building a standalone pool.
  • Drop the now-unused ConnectionPool import.
  • keyspace_notifications.py re-imports the adapter from
    redis.asyncio.cluster to preserve the existing public re-export.
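
As a rough sketch of the adapter idea (not the actual _ClusterNodePoolAdapter code), the example below wraps a node-like object behind the three pool methods named above. `FakeNode`, `FakeEncoder`, `NodePoolAdapter`, and the node methods `acquire_connection` / `release_connection` are assumptions made for illustration only.

```python
import asyncio


class FakeEncoder:
    """Hypothetical encoder stand-in."""

    def encode(self, value):
        return value if isinstance(value, bytes) else str(value).encode()


class FakeNode:
    """Hypothetical node that owns its connections and enforces a limit."""

    def __init__(self, max_connections=2):
        self.max_connections = max_connections
        self.encoder = FakeEncoder()
        self._free = []
        self._in_use = set()

    def acquire_connection(self):
        if self._free:
            conn = self._free.pop()
        elif len(self._in_use) < self.max_connections:
            conn = object()  # placeholder for a real connection
        else:
            raise RuntimeError("node connection limit reached")
        self._in_use.add(conn)
        return conn

    def release_connection(self, conn):
        self._in_use.discard(conn)
        self._free.append(conn)


class NodePoolAdapter:
    """Exposes a node through a pool-shaped surface for a pubsub to use."""

    def __init__(self, node):
        self._node = node

    async def get_connection(self):
        return self._node.acquire_connection()

    async def release(self, conn):
        self._node.release_connection(conn)

    def get_encoder(self):
        return self._node.encoder


async def demo():
    pool = NodePoolAdapter(FakeNode(max_connections=1))
    conn = await pool.get_connection()
    try:
        await pool.get_connection()  # second checkout exceeds the node's budget
        limited = False
    except RuntimeError:
        limited = True
    await pool.release(conn)
    reused = await pool.get_connection()
    return limited, reused is conn


limited, reused_same_connection = asyncio.run(demo())
```

Because the adapter holds no connections of its own, every checkout counts against the node's limit and released connections go back to the node for reuse.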

Tests:

  • Drop module-level onlycluster from tests/test_asyncio/test_cluster.py
    and apply it per class so mock-based unit tests can run in standalone CI.
  • Add TestClusterPubSubWithMocks (sync + async) under
    @pytest.mark.fixed_client covering connection extraction, pool reuse,
    and shard routing on mocks.

Analysis:

  • .agent/cluster_pubsub_connection_extraction_analysis.md documents the
    original divergence, the resource model, and the chosen fix.

Fix PubSub.aclose() hang by disconnecting with nowait=True (#3941)

PubSub.aclose() called self.connection.disconnect() with the default
nowait=False, which awaits StreamWriter.wait_closed(). When a
concurrent reader task (e.g. a background pubsub.run() task or a
get_message(block=True) call) still holds the transport, the writer
cannot finish closing and aclose() hangs indefinitely — made permanent
by socket_connect_timeout defaulting to None.

Pass nowait=True in PubSub.aclose() to match every other internal
disconnect() call site in redis/asyncio/connection.py and the
Python-3.13 GC fallback path from #3856. The reader task now observes
a normal ConnectionError on the torn-down socket, which is already
the documented contract for a pubsub whose transport is closed.

No sync mirror is required: redis.client.Connection.disconnect() has
no wait_closed() analogue and cannot deadlock.
ClusterPubSub.aclose() delegates to super().aclose() and picks up
the fix transparently.

Add TestPubSubAcloseWithMocks (fixed_client) with two regressions:

  • asserts connection.disconnect(nowait=True) is called and the
    full cleanup sequence (deregister callback, pool release, clear
    self.connection) runs;
  • drives a mock disconnect that hangs forever on nowait=False
    and verifies aclose() still returns under a bounded timeout.
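
The shape of the second regression can be sketched without redis-py at all. `HangingConnection` and `MiniPubSub` below are hypothetical stand-ins, and the real test presumably uses mocks rather than these classes; the sketch only shows why `nowait=True` lets `aclose()` return while the blocking path would not.

```python
import asyncio


class HangingConnection:
    """Hypothetical connection whose blocking disconnect never completes."""

    def __init__(self):
        self.calls = []
        self._never_set = asyncio.Event()

    async def disconnect(self, nowait=False):
        self.calls.append(nowait)
        if not nowait:
            # Models awaiting wait_closed() while a reader still holds the transport.
            await self._never_set.wait()


class MiniPubSub:
    """Hypothetical pubsub reduced to the shutdown path."""

    def __init__(self, connection):
        self.connection = connection

    async def aclose(self):
        if self.connection is None:
            return
        conn, self.connection = self.connection, None
        await conn.disconnect(nowait=True)


async def main():
    conn = HangingConnection()
    pubsub = MiniPubSub(conn)

    # With nowait=True the close finishes well inside the timeout.
    await asyncio.wait_for(pubsub.aclose(), timeout=1.0)

    # For contrast, the blocking path only ends because of the timeout.
    try:
        await asyncio.wait_for(conn.disconnect(), timeout=0.05)
        blocked = False
    except asyncio.TimeoutError:
        blocked = True

    return conn.calls, pubsub.connection, blocked


calls, remaining_connection, blocked = asyncio.run(main())
```

Wrapping the call in `asyncio.wait_for` keeps the test bounded, so a reintroduced hang would surface as a timeout failure instead of a stalled test run.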

Fixes #3941


Note

Medium Risk
Medium risk because it changes how cluster pubsub acquires/releases connections (sync + asyncio) and modifies async pubsub shutdown semantics, which can affect connection reuse and cleanup behavior under concurrency.

Overview
Cluster pubsub connection management is refactored to stop creating detached pools. Async ClusterPubSub now wraps a ClusterNode with a new internal _ClusterNodePoolAdapter and uses it for both primary and sharded pubsub connections, ensuring pubsub sockets participate in the node’s own connection budgeting/maintenance behavior; the adapter is re-used by asyncio/keyspace_notifications.

Sync ClusterPubSub is tightened for lazy node materialization and safer cleanup. Sharded pubsubs are created via cluster.get_redis_connection(node).pubsub(...) (instead of node.redis_connection), and disconnect() now tolerates shard pubsubs whose connection was never created.

Async pubsub shutdown is made non-blocking under concurrent readers. PubSub.aclose() now calls connection.disconnect(nowait=True) to avoid deadlocks waiting on StreamWriter.wait_closed().

Tests/docs are updated. Adds mock-based unit tests for sync/async ClusterPubSub pool/adapter usage and disconnect behavior, adds regressions for the aclose() hang, and adjusts asyncio cluster tests to apply onlycluster per-class so mock tests can run without a live cluster.

Reviewed by Cursor Bugbot for commit 353c9ea.

jit-ci bot commented Apr 17, 2026

🛡️ Jit Security Scan Results


✅ No security findings were detected in this PR


Security scan by Jit

Development

Successfully merging this pull request may close these issues.

Pubsub aclose causes hanging due to wait_closed called internally
