Skip to content

Commit 23bc5d0

Browse files
authored
fix: Clean shutdown for Sink Async server using asyncio.Event (#323)
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
1 parent 50fdebb commit 23bc5d0

File tree

6 files changed

+77
-21
lines changed

6 files changed

+77
-21
lines changed

packages/pynumaflow/poetry.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/pynumaflow/pynumaflow/_constants.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@
6161
# If NUM_THREADS_DEFAULT env is not set default to 4
6262
NUM_THREADS_DEFAULT = int(os.getenv("MAX_THREADS", "4"))
6363

64+
# Grace period in seconds for the GRPC server to shutdown
65+
NUMAFLOW_GRPC_SHUTDOWN_GRACE_PERIOD_SECONDS = int(
66+
os.getenv("NUMAFLOW_GRPC_SHUTDOWN_GRACE_PERIOD_SECONDS", "60")
67+
)
68+
6469
_LOGGER = setup_logging(__name__)
6570
if os.getenv("PYTHONDEBUG"):
6671
_LOGGER.setLevel(logging.DEBUG)

packages/pynumaflow/pynumaflow/sinker/async_server.py

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
import asyncio
2+
import contextlib
13
import os
4+
import sys
25

36
import aiorun
47
import grpc
58

9+
from pynumaflow.info.server import write as info_server_write
610
from pynumaflow.info.types import ContainerType, ServerInfo, MINIMUM_NUMAFLOW_VERSION
711
from pynumaflow.sinker.servicer.async_servicer import AsyncSinkServicer
812
from pynumaflow.proto.sinker import sink_pb2_grpc
@@ -22,9 +26,10 @@
2226
ON_SUCCESS_SINK_SOCK_PATH,
2327
ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH,
2428
MAX_NUM_THREADS,
29+
NUMAFLOW_GRPC_SHUTDOWN_GRACE_PERIOD_SECONDS,
2530
)
2631

27-
from pynumaflow.shared.server import NumaflowServer, start_async_server
32+
from pynumaflow.shared.server import NumaflowServer
2833
from pynumaflow.sinker._dtypes import SinkAsyncCallable
2934

3035

@@ -118,13 +123,17 @@ def __init__(
118123
]
119124

120125
self.servicer = AsyncSinkServicer(sinker_instance)
126+
self._error: BaseException | None = None
121127

122128
def start(self):
123129
"""
124130
Starter function for the Async server class, need a separate caller
125131
so that all the async coroutines can be started from a single context
126132
"""
127133
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)
134+
if self._error:
135+
_LOGGER.critical("Server exiting due to UDF error: %s", self._error)
136+
sys.exit(1)
128137

129138
async def aexec(self):
130139
"""
@@ -133,17 +142,52 @@ async def aexec(self):
133142
# As the server is async, we need to create a new server instance in the
134143
# same thread as the event loop so that all the async calls are made in the
135144
# same context
136-
# Create a new server instance, add the servicer to it and start the server
137145
server = grpc.aio.server(options=self._server_options)
138146
server.add_insecure_port(self.sock_path)
147+
148+
# The asyncio.Event must be created here (inside aexec) rather than in __init__,
149+
# because it must be bound to the running event loop that aiorun creates.
150+
# At __init__ time no event loop exists yet.
151+
shutdown_event = asyncio.Event()
152+
self.servicer.set_shutdown_event(shutdown_event)
153+
139154
sink_pb2_grpc.add_SinkServicer_to_server(self.servicer, server)
155+
140156
serv_info = ServerInfo.get_default_server_info()
141157
serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[ContainerType.Sinker]
142-
await start_async_server(
143-
server_async=server,
144-
sock_path=self.sock_path,
145-
max_threads=self.max_threads,
146-
cleanup_coroutines=list(),
147-
server_info_file=self.server_info_file,
148-
server_info=serv_info,
158+
159+
await server.start()
160+
info_server_write(server_info=serv_info, info_file=self.server_info_file)
161+
162+
_LOGGER.info(
163+
"Async GRPC Server listening on: %s with max threads: %s",
164+
self.sock_path,
165+
self.max_threads,
149166
)
167+
168+
async def _watch_for_shutdown():
169+
"""Wait for the shutdown event and stop the server with a grace period."""
170+
await shutdown_event.wait()
171+
_LOGGER.info("Shutdown signal received, stopping server gracefully...")
172+
# Stop accepting new requests and wait for a maximum of
173+
# NUMAFLOW_GRPC_SHUTDOWN_GRACE_PERIOD_SECONDS seconds for in-flight requests to complete
174+
await server.stop(NUMAFLOW_GRPC_SHUTDOWN_GRACE_PERIOD_SECONDS)
175+
176+
shutdown_task = asyncio.create_task(_watch_for_shutdown())
177+
await server.wait_for_termination()
178+
179+
# Propagate error so start() can exit with a non-zero code
180+
self._error = self.servicer._error
181+
182+
shutdown_task.cancel()
183+
with contextlib.suppress(asyncio.CancelledError):
184+
await shutdown_task
185+
186+
_LOGGER.info("Stopping event loop...")
187+
# We use aiorun to manage the event loop. The aiorun.run() runs
188+
# forever until loop.stop() is called. If we don't stop the
189+
# event loop explicitly here, the python process will not exit.
190+
# It reamins stuck for 5 minutes until liveness and readiness probe
191+
# fails enough times and k8s sends a SIGTERM
192+
asyncio.get_event_loop().stop()
193+
_LOGGER.info("Event loop stopped")

packages/pynumaflow/pynumaflow/sinker/servicer/async_servicer.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from google.protobuf import empty_pb2 as _empty_pb2
55
from pynumaflow.shared.asynciter import NonBlockingIterator
66

7-
from pynumaflow.shared.server import handle_async_error
7+
from pynumaflow.shared.server import update_context_err
88
from pynumaflow.sinker._dtypes import Datum, SinkAsyncCallable
99
from pynumaflow.proto.sinker import sink_pb2_grpc, sink_pb2
1010
from pynumaflow.sinker.servicer.utils import (
@@ -30,6 +30,12 @@ def __init__(
3030
self.background_tasks = set()
3131
self.__sink_handler: SinkAsyncCallable = handler
3232
self.cleanup_coroutines = []
33+
self._shutdown_event: asyncio.Event | None = None
34+
self._error: BaseException | None = None
35+
36+
def set_shutdown_event(self, event: asyncio.Event):
37+
"""Wire up the shutdown event created by the server's aexec() coroutine."""
38+
self._shutdown_event = event
3339

3440
async def SinkFn(
3541
self,
@@ -82,10 +88,13 @@ async def SinkFn(
8288
datum = datum_from_sink_req(d)
8389
await req_queue.put(datum)
8490
except BaseException as err:
85-
# if there is an exception, we will mark all the responses as a failure
86-
err_msg = f"UDSinkError: {repr(err)}"
91+
err_msg = f"UDSinkError, {ERR_UDF_EXCEPTION_STRING}: {repr(err)}"
8792
_LOGGER.critical(err_msg, exc_info=True)
88-
await handle_async_error(context, err, ERR_UDF_EXCEPTION_STRING)
93+
update_context_err(context, err, err_msg)
94+
# Store the error and signal the server to shut down gracefully
95+
self._error = err
96+
if self._shutdown_event is not None:
97+
self._shutdown_event.set()
8998
return
9099

91100
async def __invoke_sink(

packages/pynumaflow/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ grpcio-tools = "^1.48.1"
2828
google-api-core = "^2.11.0"
2929
grpcio-status = "^1.48.1"
3030
protobuf = ">=6.31.1,<7.0"
31-
aiorun = "^2023.7"
31+
aiorun = "^2025.1.1"
3232
uvloop = ">=0.21.0"
3333
psutil = "^6.0.0"
3434

packages/pynumaflow/tests/sink/test_async_sink.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import threading
44
import unittest
55
from collections.abc import AsyncIterable
6-
from unittest.mock import patch
76

87
import grpc
98
from google.protobuf import empty_pb2 as _empty_pb2
@@ -32,7 +31,7 @@
3231
mock_fallback_message,
3332
mockenv,
3433
)
35-
from tests.testing_utils import get_time_args, mock_terminate_on_stop
34+
from tests.testing_utils import get_time_args
3635

3736
LOGGER = setup_logging(__name__)
3837

@@ -128,7 +127,6 @@ async def start_server():
128127

129128

130129
# We are mocking the terminate function from the psutil to not exit the program during testing
131-
@patch("psutil.Process.kill", mock_terminate_on_stop)
132130
class TestAsyncSink(unittest.TestCase):
133131
@classmethod
134132
def setUpClass(cls) -> None:

0 commit comments

Comments
 (0)