Skip to content

Commit 15aa315

Browse files
allenwang28 authored and facebook-github-bot committed
TCPBuffer (#753)
Summary: X-link: #753. Adds a TCPBuffer implementation to tensor_engine as a fallback example for RDMABuffer. The main change from the original TCP-based buffer is that it uses ZMQ for communication, in order to 1) decouple from Monarch's message-passing system and 2) mimic RDMABuffer's design and showcase how other backends can be easily added. Differential Revision: D79588454
1 parent 6978e61 commit 15aa315

File tree

5 files changed

+596
-0
lines changed

5 files changed

+596
-0
lines changed

python/monarch/_src/actor/proc_mesh.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@
7878
_RdmaManager,
7979
)
8080

81+
# type: ignore[import]
82+
from monarch._src.tensor_engine.tcp import TCPManager # @manual
83+
8184
# type: ignore[16]
8285
HAS_TENSOR_ENGINE = torch.cuda.is_available()
8386
except ImportError:
@@ -132,6 +135,9 @@ def __init__(
132135
self._slice = False
133136
# type: ignore[21]
134137
self._rdma_manager: Optional["_RdmaManager"] = None
138+
# type: ignore[21]
139+
self._tcp_manager: Optional["TCPManager"] = None
140+
135141
self._debug_manager: Optional[DebugManager] = None
136142
self._code_sync_client: Optional[CodeSyncMeshClient] = None
137143
self._logging_mesh_client: Optional[LoggingMeshClient] = None
@@ -182,12 +188,20 @@ async def _init_manager_actors_coro(
182188
else None
183189
)
184190

191+
_tcp_manager = (
192+
# type: ignore[16]
193+
await self._spawn_nonblocking_on(proc_mesh, "tcp_manager", TCPManager)
194+
if HAS_TENSOR_ENGINE
195+
else None
196+
)
197+
185198
_debug_manager = await self._spawn_nonblocking_on(
186199
proc_mesh, _DEBUG_MANAGER_ACTOR_NAME, DebugManager, await _debug_client()
187200
)
188201

189202
self._debug_manager = _debug_manager
190203
self._rdma_manager = _rdma_manager
204+
self._tcp_manager = _tcp_manager
191205

192206
if setup is not None:
193207
# If the user has passed the setup lambda, we need to call

python/monarch/_src/tensor_engine/rdma.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,14 @@ def read_into(
101101
102102
Returns an ActorFuture that can be awaited or called with .get() for blocking operation.
103103
"""
104+
try:
105+
MonarchContext.get()
106+
except LookupError:
107+
raise RuntimeError(
108+
"RDMABuffer.read_into() can only be called from within a Monarch actor context. "
109+
"Make sure you're calling this from within an actor method."
110+
)
111+
104112
_assert_tensor_is_1d_contiguous_uint8(dst)
105113
dst_gpu = None
106114
if dst.device.type != "cpu":
@@ -148,6 +156,14 @@ def write_from(
148156
149157
Returns an ActorFuture that can be awaited or called with .get() for blocking operation.
150158
"""
159+
try:
160+
MonarchContext.get()
161+
except LookupError:
162+
raise RuntimeError(
163+
"RDMABuffer.write_from() can only be called from within a Monarch actor context. "
164+
"Make sure you're calling this from within an actor method."
165+
)
166+
151167
_assert_tensor_is_1d_contiguous_uint8(src)
152168
src_gpu = None
153169
if src.device.type != "cpu":

0 commit comments

Comments
 (0)