File tree Expand file tree Collapse file tree 1 file changed +17
-1
lines changed
python/monarch/_src/actor Expand file tree Collapse file tree 1 file changed +17
-1
lines changed Original file line number Diff line number Diff line change 5151 description = "Latency of endpoint call_one operations in microseconds" ,
5252)
5353
54+ # Histogram for measuring endpoint stream latency per yield
55+ endpoint_stream_latency_histogram : Histogram = METER .create_histogram (
56+ name = "endpoint_stream_latency.us" ,
57+ description = "Latency of endpoint stream operations per yield in microseconds" ,
58+ )
59+
5460T = TypeVar ("T" )
5561
5662
@@ -247,13 +253,23 @@ def stream(
247253 they become available. Returns an async generator of response values.
248254 """
249255 p , r_port = self ._port ()
256+ start_time : int = time .monotonic_ns ()
250257 # pyre-ignore[6]: ParamSpec kwargs is compatible with Dict[str, Any]
251258 extent : Extent = self ._send (args , kwargs , port = p )
252259 r : "PortReceiver[R]" = r_port
253260
261+ method_name : str = self ._get_method_name ()
262+
254263 def _stream () -> Generator [Future [R ], None , None ]:
255264 for _ in range (extent .nelements ):
256- yield r .recv ()
265+ measured_coro = _measure_latency (
266+ r ._recv (),
267+ start_time ,
268+ endpoint_stream_latency_histogram ,
269+ method_name ,
270+ extent .nelements ,
271+ )
272+ yield Future (coro = measured_coro )
257273
258274 return _stream ()
259275
You can’t perform that action at this time.
0 commit comments