Skip to content

Commit 90a5f86

Browse files
authored
Merge pull request #238 from Arthurdw/staging
[BUG] RecursionError in `_wait_with_activity_timeout` with concurrent tasks
2 parents 0d0ff5a + ee3cb70 commit 90a5f86

File tree

2 files changed

+35
-5
lines changed

2 files changed

+35
-5
lines changed

async_substrate_interface/async_substrate.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -694,9 +694,17 @@ async def _cancel(self):
694694

695695
async def connect(self, force=False):
696696
if not force:
697-
await self._lock.acquire()
697+
async with self._lock:
698+
return await self._connect_internal(force)
698699
else:
699700
logger.debug("Proceeding without acquiring lock.")
701+
return await self._connect_internal(force)
702+
703+
async def _connect_internal(self, force):
704+
# Check state again after acquiring lock to avoid duplicate connections
705+
if not force and self.state in (State.OPEN, State.CONNECTING):
706+
return None
707+
700708
logger.debug(f"Websocket connecting to {self.ws_url}")
701709
if self._sending is None or self._sending.empty():
702710
self._sending = asyncio.Queue()
@@ -725,17 +733,13 @@ async def connect(self, force=False):
725733
except socket.gaierror:
726734
logger.debug(f"Hostname not known (this is just for testing")
727735
await asyncio.sleep(10)
728-
if self._lock.locked():
729-
self._lock.release()
730736
return await self.connect(force=force)
731737
logger.debug("Connection established")
732738
self.ws = connection
733739
if self._send_recv_task is None or self._send_recv_task.done():
734740
self._send_recv_task = asyncio.get_running_loop().create_task(
735741
self._handler(self.ws)
736742
)
737-
if self._lock.locked():
738-
self._lock.release()
739743
return None
740744

741745
async def _handler(self, ws: ClientConnection) -> Union[None, Exception]:

tests/integration_tests/test_async_substrate_interface.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,3 +293,29 @@ async def test_get_payment_info():
293293
assert partial_fee_all_options > partial_fee_no_era
294294
assert partial_fee_all_options > partial_fee_era
295295
print("test_get_payment_info succeeded")
296+
297+
298+
@pytest.mark.asyncio
299+
async def test_concurrent_rpc_requests():
300+
"""
301+
Test that multiple concurrent RPC requests on a shared connection work correctly.
302+
303+
This test verifies the fix for the issue where multiple concurrent tasks
304+
re-initializing the WebSocket connection caused requests to hang.
305+
"""
306+
print("Testing test_concurrent_rpc_requests")
307+
308+
async def concurrent_task(substrate, task_id):
309+
"""Make multiple RPC calls from a single task."""
310+
for i in range(5):
311+
result = await substrate.get_block_number(None)
312+
assert isinstance(result, int)
313+
assert result > 0
314+
315+
async with AsyncSubstrateInterface(LATENT_LITE_ENTRYPOINT) as substrate:
316+
# Run 5 concurrent tasks, each making 5 RPC calls (25 total)
317+
# This tests that the connection is properly shared without re-initialization
318+
tasks = [concurrent_task(substrate, i) for i in range(5)]
319+
await asyncio.gather(*tasks)
320+
321+
print("test_concurrent_rpc_requests succeeded")

0 commit comments

Comments
 (0)