From 3dcd99a2d16b9f02d067fe9eaa90a3841e24a4b6 Mon Sep 17 00:00:00 2001 From: Minimega12121 Date: Tue, 8 Jul 2025 17:48:57 +0530 Subject: [PATCH 1/2] todo: handle timeout --- libp2p/stream_muxer/mplex/mplex_stream.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index 3b640df1d..b86357e7b 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -176,8 +176,6 @@ async def close(self) -> None: Closing a stream closes it for writing and closes the remote end for reading but allows writing in the other direction. """ - # TODO error handling with timeout - async with self.close_lock: if self.event_local_closed.is_set(): return @@ -185,8 +183,17 @@ async def close(self) -> None: flag = ( HeaderTags.CloseInitiator if self.is_initiator else HeaderTags.CloseReceiver ) - # TODO: Raise when `muxed_conn.send_message` fails and `Mplex` isn't shutdown. - await self.muxed_conn.send_message(flag, None, self.stream_id) + + try: + with trio.fail_after(5): # timeout in seconds + await self.muxed_conn.send_message(flag, None, self.stream_id) + except trio.TooSlowError: + raise TimeoutError("Timeout while trying to close the stream") + except MuxedConnUnavailable: + if not self.muxed_conn.event_shutting_down.is_set(): + raise RuntimeError( + "Failed to send close message and Mplex isn't shutting down" + ) _is_remote_closed: bool async with self.close_lock: From eca548851b0fced9526efeaf3a902aa9ad21dde7 Mon Sep 17 00:00:00 2001 From: Minimega12121 Date: Fri, 25 Jul 2025 16:19:29 +0530 Subject: [PATCH 2/2] added new fragment and tests --- newsfragments/752.internal.rst | 1 + tests/core/stream_muxer/test_mplex_stream.py | 37 ++++++++++++++++++++ 2 files changed, 38 insertions(+) create mode 100644 newsfragments/752.internal.rst diff --git a/newsfragments/752.internal.rst b/newsfragments/752.internal.rst new file mode 100644 index 000000000..b0aed33d1 --- /dev/null +++ b/newsfragments/752.internal.rst @@ -0,0 +1 @@ +[mplex] Add timeout and error handling during stream close diff --git a/tests/core/stream_muxer/test_mplex_stream.py b/tests/core/stream_muxer/test_mplex_stream.py index 62d384c2b..1d9c22340 100644 --- a/tests/core/stream_muxer/test_mplex_stream.py +++ b/tests/core/stream_muxer/test_mplex_stream.py @@ -8,6 +8,7 @@ MplexStreamClosed, MplexStreamEOF, MplexStreamReset, + MuxedConnUnavailable, ) from libp2p.stream_muxer.mplex.mplex import ( MPLEX_MESSAGE_CHANNEL_SIZE, @@ -213,3 +214,39 @@ async def test_mplex_stream_reset(mplex_stream_pair): # `reset` should do nothing as well. await stream_0.reset() await stream_1.reset() + + +@pytest.mark.trio +async def test_mplex_stream_close_timeout(monkeypatch, mplex_stream_pair): + stream_0, stream_1 = mplex_stream_pair + + # (simulate hanging) + async def fake_send_message(*args, **kwargs): + await trio.sleep_forever() + + monkeypatch.setattr(stream_0.muxed_conn, "send_message", fake_send_message) + + with pytest.raises(TimeoutError): + await stream_0.close() + + +@pytest.mark.trio +async def test_mplex_stream_close_mux_unavailable(monkeypatch, mplex_stream_pair): + stream_0, _ = mplex_stream_pair + + # Patch send_message to raise MuxedConnUnavailable + def raise_unavailable(*args, **kwargs): + raise MuxedConnUnavailable("Simulated conn drop") + + monkeypatch.setattr(stream_0.muxed_conn, "send_message", raise_unavailable) + + # Case 1: Mplex is shutting down — should not raise + stream_0.muxed_conn.event_shutting_down.set() + await stream_0.close() # Should NOT raise + + # Case 2: Mplex is NOT shutting down — should raise RuntimeError + stream_0.event_local_closed = trio.Event() # Reset since it was set in first call + stream_0.muxed_conn.event_shutting_down = trio.Event() # Unset the shutdown flag + + with pytest.raises(RuntimeError, match="Failed to send close message"): + await stream_0.close()