diff --git a/xllm/core/distributed_runtime/worker_server.cpp b/xllm/core/distributed_runtime/worker_server.cpp index 22d10f90..8587c322 100644 --- a/xllm/core/distributed_runtime/worker_server.cpp +++ b/xllm/core/distributed_runtime/worker_server.cpp @@ -24,6 +24,7 @@ limitations under the License. #include #include +#include #include #include #include @@ -50,6 +51,9 @@ limitations under the License. extern char** environ; namespace xllm { +namespace { +void handle_signal(int signum) { _exit(0); } +} // namespace void WorkerServer::create_server( const runtime::Options& options, @@ -217,6 +221,10 @@ WorkerServer::WorkerServer(int local_worker_idx, local_worker_idx, master_node_addr, done, parallel_args, d, options); return; } else { + // The worker process should handle SIGTERM and SIGINT signals. + signal(SIGINT, handle_signal); + signal(SIGTERM, handle_signal); + std::unique_ptr input_shm_manager = nullptr; std::unique_ptr output_shm_manager = nullptr; prepare_shm( diff --git a/xllm/core/runtime/master.cpp b/xllm/core/runtime/master.cpp index 0ade6f56..df75c81c 100644 --- a/xllm/core/runtime/master.cpp +++ b/xllm/core/runtime/master.cpp @@ -45,11 +45,20 @@ limitations under the License. #include #endif +namespace brpc { +DECLARE_bool(graceful_quit_on_sigterm); +DECLARE_bool(graceful_quit_on_sighup); +} // namespace brpc + namespace xllm { Master::Master(const Options& options, EngineType type) : options_(options) { LOG(INFO) << "Master init options: " << options.to_string(); + // Enable brpc's graceful quit on SIGTERM and SIGHUP. 
+ brpc::FLAGS_graceful_quit_on_sigterm = true; + brpc::FLAGS_graceful_quit_on_sighup = true; + #if defined(USE_NPU) if (options.rank_tablefile().has_value()) { FLAGS_rank_tablefile = options.rank_tablefile().value(); diff --git a/xllm/core/runtime/vlm_engine.cpp b/xllm/core/runtime/vlm_engine.cpp old mode 100755 new mode 100644 index 3a75a180..74706cd3 --- a/xllm/core/runtime/vlm_engine.cpp +++ b/xllm/core/runtime/vlm_engine.cpp @@ -21,6 +21,7 @@ limitations under the License. #include #include +#include #include #include "common/metrics.h" @@ -33,7 +34,16 @@ limitations under the License. namespace xllm { +namespace { +void handle_signal(int signum) { _exit(0); } +} // namespace + VLMEngine::VLMEngine(const runtime::Options& options) : options_(options) { + // The worker process should handle SIGTERM and SIGINT signals. + // TODO: delete this code when multi-process impl is supported. + signal(SIGINT, handle_signal); + signal(SIGTERM, handle_signal); + const auto& devices = options_.devices(); CHECK_GT(devices.size(), 0) << "At least one device is required"; diff --git a/xllm/pybind/embedding.py b/xllm/pybind/embedding.py index 77de11fb..85bfba5a 100644 --- a/xllm/pybind/embedding.py +++ b/xllm/pybind/embedding.py @@ -34,6 +34,9 @@ def __init__( is_local: bool = True, **kwargs, ) -> None: + signal.signal(signal.SIGTERM, lambda s, f: sys.exit(0)) + signal.signal(signal.SIGINT, lambda s, f: sys.exit(0)) + if not os.path.exists(model): raise ValueError(f"model {model} not exists") @@ -79,7 +82,8 @@ def __init__( def finish(self): try: #os.kill(os.getpid(), signal.SIGTERM) - os.kill(os.getpid(), signal.SIGKILL) + #os.kill(os.getpid(), signal.SIGKILL) + util.terminate_process(os.getpid()) except Exception as e: pass diff --git a/xllm/pybind/llm.py b/xllm/pybind/llm.py index d20bc83e..efe874dd 100644 --- a/xllm/pybind/llm.py +++ b/xllm/pybind/llm.py @@ -47,6 +47,8 @@ def __init__( is_local: bool = True, **kwargs, ) -> None: + signal.signal(signal.SIGTERM, lambda s, f: 
sys.exit(0)) + signal.signal(signal.SIGINT, lambda s, f: sys.exit(0)) if not os.path.exists(model): raise ValueError(f"model {model} not exists") @@ -102,7 +104,8 @@ def __init__( def finish(self): try: #os.kill(os.getpid(), signal.SIGTERM) - os.kill(os.getpid(), signal.SIGKILL) + #os.kill(os.getpid(), signal.SIGKILL) + util.terminate_process(os.getpid()) except Exception as e: pass diff --git a/xllm/pybind/util.py b/xllm/pybind/util.py index 36c29f2d..3f857c95 100644 --- a/xllm/pybind/util.py +++ b/xllm/pybind/util.py @@ -1,4 +1,30 @@ +import os +import psutil +import signal import socket +import sys + +def terminate_process(pid, timeout=30): + try: + parent = psutil.Process(pid) + except psutil.NoSuchProcess: + return + + children = parent.children(recursive=True) + procs = children + [parent] + + for p in procs: + try: + p.terminate() + except psutil.NoSuchProcess: + pass + + gone, alive = psutil.wait_procs(procs, timeout=timeout) + for p in alive: + try: + p.kill() + except psutil.NoSuchProcess: + pass def get_free_port(): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: diff --git a/xllm/pybind/vlm.py b/xllm/pybind/vlm.py index aa5f0628..a32ed6fc 100644 --- a/xllm/pybind/vlm.py +++ b/xllm/pybind/vlm.py @@ -45,6 +45,8 @@ def __init__( is_local: bool = True, **kwargs, ) -> None: + signal.signal(signal.SIGTERM, lambda s, f: sys.exit(0)) + signal.signal(signal.SIGINT, lambda s, f: sys.exit(0)) if not os.path.exists(model): raise ValueError(f"model {model} not exists") @@ -97,7 +99,8 @@ def __init__( def finish(self): try: #os.kill(os.getpid(), signal.SIGTERM) - os.kill(os.getpid(), signal.SIGKILL) + #os.kill(os.getpid(), signal.SIGKILL) + util.terminate_process(os.getpid()) except Exception as e: pass