Skip to content

Commit dde7142

Browse files
vidhyavmeta-codesync[bot]
authored andcommitted
Added endpoint latency for stream method. (#1753)
Summary: Pull Request resolved: #1753 As stated here. Reviewed By: pzhan9 Differential Revision: D86232946
1 parent fa55131 commit dde7142

File tree

1 file changed

+17
-1
lines changed

1 file changed

+17
-1
lines changed

python/monarch/_src/actor/endpoint.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@
5151
description="Latency of endpoint call_one operations in microseconds",
5252
)
5353

54+
# Histogram for measuring endpoint stream latency per yield
55+
endpoint_stream_latency_histogram: Histogram = METER.create_histogram(
56+
name="endpoint_stream_latency.us",
57+
description="Latency of endpoint stream operations per yield in microseconds",
58+
)
59+
5460
T = TypeVar("T")
5561

5662

@@ -247,13 +253,23 @@ def stream(
247253
they become available. Returns an async generator of response values.
248254
"""
249255
p, r_port = self._port()
256+
start_time: int = time.monotonic_ns()
250257
# pyre-ignore[6]: ParamSpec kwargs is compatible with Dict[str, Any]
251258
extent: Extent = self._send(args, kwargs, port=p)
252259
r: "PortReceiver[R]" = r_port
253260

261+
method_name: str = self._get_method_name()
262+
254263
def _stream() -> Generator[Future[R], None, None]:
255264
for _ in range(extent.nelements):
256-
yield r.recv()
265+
measured_coro = _measure_latency(
266+
r._recv(),
267+
start_time,
268+
endpoint_stream_latency_histogram,
269+
method_name,
270+
extent.nelements,
271+
)
272+
yield Future(coro=measured_coro)
257273

258274
return _stream()
259275

0 commit comments

Comments
 (0)