[Others] api server exits when worker process is dead (#3271)
CE Compile Job / ce_job_pre_check (push) Has been cancelled
CE Compile Job / print_ce_job_pre_check_outputs (push) Has been cancelled
CE Compile Job / FD-Clone-Linux (push) Has been cancelled
CE Compile Job / Show Code Archive Output (push) Has been cancelled
CE Compile Job / BUILD_SM8090 (push) Has been cancelled
CE Compile Job / BUILD_SM8689 (push) Has been cancelled
CE Compile Job / CE_UPLOAD (push) Has been cancelled
Deploy GitHub Pages / deploy (push) Has been cancelled

* [fix] fix terminal hangs when worker process is dead

* [chore] change sleep time of monitor

* [chore] remove redundant comments
This commit is contained in:
李泳桦
2025-10-27 10:23:48 +08:00
committed by GitHub
parent ebae69b1f8
commit cdc40cdc2a
2 changed files with 26 additions and 0 deletions
+3
View File
@@ -376,6 +376,7 @@ class LLMEngine:
exit sub services
"""
self.running = False
llm_logger.info("Engine shut down, exiting sub services...")
if hasattr(self, "cache_manager_processes"):
self.engine.resource_manager.cache_manager.shm_cache_task_flag_broadcast.clear()
@@ -394,6 +395,7 @@ class LLMEngine:
if hasattr(self, "get_profile_block_num_signal"):
self.get_profile_block_num_signal.clear()
if hasattr(self, "worker_proc") and self.worker_proc is not None:
try:
pgid = os.getpgid(self.worker_proc.pid)
@@ -403,6 +405,7 @@ class LLMEngine:
if hasattr(self, "zmq_server") and self.zmq_server is not None:
self.zmq_server.close()
if hasattr(self, "dp_processed"):
for p in self.dp_processed:
console_logger.info(f"Waiting for worker {p.pid} to exit")
@@ -17,6 +17,7 @@
import asyncio
import json
import os
import signal
import threading
import time
import traceback
@@ -649,6 +650,27 @@ def launch_controller_server():
time.sleep(1)
def launch_worker_monitor():
    """
    Start a background thread that watches the engine's worker process.

    The thread polls ``llm_engine.worker_proc`` every 5 seconds. Once the
    worker has exited, it logs the exit code and sends SIGINT to the current
    process so that the API server shuts down instead of hanging.

    This function returns after starting the thread and sleeping for one
    second; the thread itself is a daemon and never blocks interpreter exit.
    """

    def _monitor():
        while True:
            # Read the module-level ``llm_engine`` on every iteration: it may
            # not be set yet, may lack ``worker_proc``, or may have
            # ``worker_proc`` set to None. Guarding against None here keeps
            # the monitor thread from dying on an AttributeError, which would
            # silently disable the watchdog.
            worker_proc = getattr(llm_engine, "worker_proc", None)
            # ``poll()`` returns None while the process is still running.
            if worker_proc is not None and worker_proc.poll() is not None:
                console_logger.error(
                    f"Worker process has died in the background (code={worker_proc.returncode}). API server is forced to stop."
                )
                # Deliver SIGINT to ourselves to trigger the normal
                # interrupt-driven shutdown path of the API server.
                os.kill(os.getpid(), signal.SIGINT)
                break
            time.sleep(5)

    worker_monitor_thread = threading.Thread(target=_monitor, daemon=True)
    worker_monitor_thread.start()
    time.sleep(1)
def main():
"""main函数"""
if args.local_data_parallel_id == 0:
@@ -662,6 +684,7 @@ def main():
console_logger.info(f"Launching chat completion service at http://{args.host}:{args.port}/v1/chat/completions")
console_logger.info(f"Launching completion service at http://{args.host}:{args.port}/v1/completions")
launch_worker_monitor()
launch_controller_server()
launch_metrics_server()
launch_api_server()