Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions libp2p/stream_muxer/mplex/mplex_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,17 +247,24 @@ async def close(self) -> None:
Closing a stream closes it for writing and closes the remote end for
reading but allows writing in the other direction.
"""
# TODO error handling with timeout

async with self.close_lock:
if self.event_local_closed.is_set():
return

flag = (
HeaderTags.CloseInitiator if self.is_initiator else HeaderTags.CloseReceiver
)
# TODO: Raise when `muxed_conn.send_message` fails and `Mplex` isn't shutdown.
await self.muxed_conn.send_message(flag, None, self.stream_id)

try:
with trio.fail_after(5): # timeout in seconds
await self.muxed_conn.send_message(flag, None, self.stream_id)
except trio.TooSlowError:
raise TimeoutError("Timeout while trying to close the stream")
except MuxedConnUnavailable:
if not self.muxed_conn.event_shutting_down.is_set():
raise RuntimeError(
"Failed to send close message and Mplex isn't shutting down"
)

_is_remote_closed: bool
async with self.close_lock:
Expand Down
1 change: 1 addition & 0 deletions newsfragments/752.internal.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[mplex] Add timeout and error handling during stream close
37 changes: 37 additions & 0 deletions tests/core/stream_muxer/test_mplex_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
MplexStreamClosed,
MplexStreamEOF,
MplexStreamReset,
MuxedConnUnavailable,
)
from libp2p.stream_muxer.mplex.mplex import (
MPLEX_MESSAGE_CHANNEL_SIZE,
Expand Down Expand Up @@ -213,3 +214,39 @@ async def test_mplex_stream_reset(mplex_stream_pair):
# `reset` should do nothing as well.
await stream_0.reset()
await stream_1.reset()


@pytest.mark.trio
async def test_mplex_stream_close_timeout(monkeypatch, mplex_stream_pair):
stream_0, stream_1 = mplex_stream_pair

# (simulate hanging)
async def fake_send_message(*args, **kwargs):
await trio.sleep_forever()

monkeypatch.setattr(stream_0.muxed_conn, "send_message", fake_send_message)

with pytest.raises(TimeoutError):
await stream_0.close()


@pytest.mark.trio
async def test_mplex_stream_close_mux_unavailable(monkeypatch, mplex_stream_pair):
stream_0, _ = mplex_stream_pair

# Patch send_message to raise MuxedConnUnavailable
def raise_unavailable(*args, **kwargs):
raise MuxedConnUnavailable("Simulated conn drop")

monkeypatch.setattr(stream_0.muxed_conn, "send_message", raise_unavailable)

# Case 1: Mplex is shutting down — should not raise
stream_0.muxed_conn.event_shutting_down.set()
await stream_0.close() # Should NOT raise

# Case 2: Mplex is NOT shutting down — should raise RuntimeError
stream_0.event_local_closed = trio.Event() # Reset since it was set in first call
stream_0.muxed_conn.event_shutting_down = trio.Event() # Unset the shutdown flag

with pytest.raises(RuntimeError, match="Failed to send close message"):
await stream_0.close()