Skip to content

Commit a1ffdd6

Browse files
[serve] support running user code on same event loop (#54219)
## Why are these changes needed? Currently, all user code runs on a separate event loop (the user code event loop, running in its own thread) so that user code cannot block the replica's system event loop. However, if users are confident that they are writing proper, non-blocking async Python code, they can opt out (by setting `RAY_SERVE_RUN_USER_CODE_IN_SEPARATE_THREAD=0`) and run user code on the same event loop as the replica. This improves performance because it avoids cross-thread communication. --------- Signed-off-by: Cindy Zhang <cindyzyx9@gmail.com> Co-authored-by: akyang-anyscale <alexyang@anyscale.com>
1 parent 17c0e65 commit a1ffdd6

File tree

4 files changed

+281
-184
lines changed

4 files changed

+281
-184
lines changed

python/ray/serve/_private/constants.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,3 +455,10 @@ def str_to_list(s: str) -> List[str]:
455455

456456
# Name of deployment request routing stats method implemented by user.
457457
REQUEST_ROUTING_STATS_METHOD = "record_routing_stats"
458+
459+
# By default, we run user code on a separate event loop in its own thread.
460+
# This flag can be set to 0 to run user code in the same event loop as the
461+
# replica's main event loop.
462+
RAY_SERVE_RUN_USER_CODE_IN_SEPARATE_THREAD = (
463+
os.environ.get("RAY_SERVE_RUN_USER_CODE_IN_SEPARATE_THREAD", "1") == "1"
464+
)

python/ray/serve/_private/local_testing_mode.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ def make_local_deployment_handle(
7070
deployment.init_kwargs,
7171
deployment_id=deployment_id,
7272
run_sync_methods_in_threadpool=RAY_SERVE_RUN_SYNC_IN_THREADPOOL,
73+
run_user_code_in_separate_thread=True,
7374
local_testing_mode=True,
7475
)
7576
try:

python/ray/serve/_private/replica.py

Lines changed: 63 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
RAY_SERVE_REPLICA_AUTOSCALING_METRIC_RECORD_PERIOD_S,
5656
RAY_SERVE_RUN_SYNC_IN_THREADPOOL,
5757
RAY_SERVE_RUN_SYNC_IN_THREADPOOL_WARNING,
58+
RAY_SERVE_RUN_USER_CODE_IN_SEPARATE_THREAD,
5859
RECONFIGURE_METHOD,
5960
REQUEST_LATENCY_BUCKETS_MS,
6061
REQUEST_ROUTING_STATS_METHOD,
@@ -371,6 +372,7 @@ def __init__(
371372
init_kwargs,
372373
deployment_id=self._deployment_id,
373374
run_sync_methods_in_threadpool=RAY_SERVE_RUN_SYNC_IN_THREADPOOL,
375+
run_user_code_in_separate_thread=RAY_SERVE_RUN_USER_CODE_IN_SEPARATE_THREAD,
374376
local_testing_mode=False,
375377
)
376378
self._semaphore = Semaphore(lambda: self.max_ongoing_requests)
@@ -1169,6 +1171,7 @@ def __init__(
11691171
*,
11701172
deployment_id: DeploymentID,
11711173
run_sync_methods_in_threadpool: bool,
1174+
run_user_code_in_separate_thread: bool,
11721175
local_testing_mode: bool,
11731176
):
11741177
if not (inspect.isfunction(deployment_def) or inspect.isclass(deployment_def)):
@@ -1185,33 +1188,44 @@ def __init__(
11851188
self._local_testing_mode = local_testing_mode
11861189
self._destructor_called = False
11871190
self._run_sync_methods_in_threadpool = run_sync_methods_in_threadpool
1191+
self._run_user_code_in_separate_thread = run_user_code_in_separate_thread
11881192
self._warned_about_sync_method_change = False
11891193
self._cached_user_method_info: Dict[str, UserMethodInfo] = {}
11901194

11911195
# Will be populated in `initialize_callable`.
11921196
self._callable = None
11931197

1194-
# All interactions with user code run on this loop to avoid blocking the
1195-
# replica's main event loop.
1196-
self._user_code_event_loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()
1198+
if self._run_user_code_in_separate_thread:
1199+
# All interactions with user code run on this loop to avoid blocking the
1200+
# replica's main event loop.
1201+
self._user_code_event_loop: asyncio.AbstractEventLoop = (
1202+
asyncio.new_event_loop()
1203+
)
11971204

1198-
def _run_user_code_event_loop():
1199-
# Required so that calls to get the current running event loop work
1200-
# properly in user code.
1201-
asyncio.set_event_loop(self._user_code_event_loop)
1202-
self._user_code_event_loop.run_forever()
1205+
def _run_user_code_event_loop():
1206+
# Required so that calls to get the current running event loop work
1207+
# properly in user code.
1208+
asyncio.set_event_loop(self._user_code_event_loop)
1209+
self._user_code_event_loop.run_forever()
12031210

1204-
self._user_code_event_loop_thread = threading.Thread(
1205-
daemon=True,
1206-
target=_run_user_code_event_loop,
1207-
)
1208-
self._user_code_event_loop_thread.start()
1211+
self._user_code_event_loop_thread = threading.Thread(
1212+
daemon=True,
1213+
target=_run_user_code_event_loop,
1214+
)
1215+
self._user_code_event_loop_thread.start()
1216+
else:
1217+
self._user_code_event_loop = asyncio.get_running_loop()
1218+
1219+
@property
1220+
def event_loop(self) -> asyncio.AbstractEventLoop:
1221+
return self._user_code_event_loop
12091222

12101223
def _run_user_code(f: Callable) -> Callable:
12111224
"""Decorator to run a coroutine method on the user code event loop.
12121225
12131226
The method will be modified to be a sync function that returns a
1214-
`asyncio.Future`.
1227+
`asyncio.Future` if user code is running in a separate event loop.
1228+
Otherwise, it will return the coroutine directly.
12151229
"""
12161230
assert inspect.iscoroutinefunction(
12171231
f
@@ -1220,11 +1234,14 @@ def _run_user_code(f: Callable) -> Callable:
12201234
@functools.wraps(f)
12211235
def wrapper(self, *args, **kwargs) -> Any:
12221236
coro = f(self, *args, **kwargs)
1223-
fut = asyncio.run_coroutine_threadsafe(coro, self._user_code_event_loop)
1224-
if self._local_testing_mode:
1225-
return fut
1237+
if self._run_user_code_in_separate_thread:
1238+
fut = asyncio.run_coroutine_threadsafe(coro, self._user_code_event_loop)
1239+
if self._local_testing_mode:
1240+
return fut
12261241

1227-
return asyncio.wrap_future(fut)
1242+
return asyncio.wrap_future(fut)
1243+
else:
1244+
return coro
12281245

12291246
return wrapper
12301247

@@ -1576,18 +1593,22 @@ async def call_http_entrypoint(
15761593
# `asyncio.Event`s are not thread safe, so `call_soon_threadsafe` must be
15771594
# used to interact with the result queue from the user callable thread.
15781595
system_event_loop = asyncio.get_running_loop()
1596+
user_method_info = self.get_user_method_info(request_metadata.call_method)
15791597

15801598
async def enq(item: Any):
15811599
system_event_loop.call_soon_threadsafe(result_queue.put_nowait, item)
15821600

1583-
user_method_info = self.get_user_method_info(request_metadata.call_method)
1584-
call_user_method_future = self._call_http_entrypoint(
1585-
user_method_info, scope, receive, enq
1586-
)
1601+
if self._run_user_code_in_separate_thread:
1602+
call_future = self._call_http_entrypoint(
1603+
user_method_info, scope, receive, enq
1604+
)
1605+
else:
1606+
call_future = asyncio.create_task(
1607+
self._call_http_entrypoint(user_method_info, scope, receive, enq)
1608+
)
1609+
15871610
first_message_peeked = False
1588-
async for messages in result_queue.fetch_messages_from_queue(
1589-
call_user_method_future
1590-
):
1611+
async for messages in result_queue.fetch_messages_from_queue(call_future):
15911612
# HTTP (ASGI) messages are only consumed by the proxy so batch them
15921613
# and use vanilla pickle (we know it's safe because these messages
15931614
# only contain primitive Python types).
@@ -1702,16 +1723,24 @@ async def call_user_generator(
17021723
def _enqueue_thread_safe(item: Any):
17031724
system_event_loop.call_soon_threadsafe(result_queue.put_nowait, item)
17041725

1705-
call_user_method_future = self._call_user_generator(
1706-
request_metadata,
1707-
request_args,
1708-
request_kwargs,
1709-
generator_result_callback=_enqueue_thread_safe,
1710-
)
1726+
if self._run_user_code_in_separate_thread:
1727+
call_future = self._call_user_generator(
1728+
request_metadata,
1729+
request_args,
1730+
request_kwargs,
1731+
generator_result_callback=_enqueue_thread_safe,
1732+
)
1733+
else:
1734+
call_future = asyncio.create_task(
1735+
self._call_user_generator(
1736+
request_metadata,
1737+
request_args,
1738+
request_kwargs,
1739+
generator_result_callback=_enqueue_thread_safe,
1740+
)
1741+
)
17111742

1712-
async for messages in result_queue.fetch_messages_from_queue(
1713-
call_user_method_future
1714-
):
1743+
async for messages in result_queue.fetch_messages_from_queue(call_future):
17151744
for msg in messages:
17161745
yield msg
17171746

0 commit comments

Comments
 (0)