[BugFix] resource_manager_v1 lock PD (#5616)

* Fix: guard ResourceManagerV1 P/D (prefill/decode) resource checks with the manager lock

* Also hold the lock in add_prefilled_request

---------

Co-authored-by: YuBaoku <49938469+EmmonsCurse@users.noreply.github.com>
Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>
This commit is contained in:
Daci
2026-01-08 10:02:54 +08:00
committed by GitHub
parent 5088d4acdb
commit d8c6ba61f3
+24 -22
View File
@@ -1163,36 +1163,38 @@ class ResourceManagerV1(ResourceManager):
Check whether there are enough slot and gpu resource for the prefilled request,
of which the cache is saved in cpu buffer.
"""
assert self.config.scheduler_config.splitwise_role == "decode", "Only D instance can call this method"
assert request_id in self.preallocated_reqs, "request_id must be in preallocate"
need_blocks_num = len(self.preallocated_reqs[request_id].disaggregate_info["block_tables"])
return self.available_batch() > 0 and self.cache_manager.can_allocate_gpu_blocks(need_blocks_num)
with self.lock:
assert self.config.scheduler_config.splitwise_role == "decode", "Only D instance can call this method"
assert request_id in self.preallocated_reqs, "request_id must be in preallocate"
need_blocks_num = len(self.preallocated_reqs[request_id].disaggregate_info["block_tables"])
return self.available_batch() > 0 and self.cache_manager.can_allocate_gpu_blocks(need_blocks_num)
def add_prefilled_request(self, request_output: RequestOutput):
    """
    In P/D disaggregated deployment, D should continue to decode after receiving
    first token and cache from P.
    NOTE: GPU resources should be checked in advance to ensure they are sufficient
    for the prefilled request.

    Args:
        request_output: result emitted by the P (prefill) instance, carrying the
            request id, the first generated token, cache statistics and metrics.

    Side effects:
        Mutates the tracked ``Request`` (tokens, metrics, prefill bookkeeping)
        and appends it to ``self.running``. The whole update runs under
        ``self.lock`` so it cannot race with concurrent resource checks.
    """
    # Entire read-modify-write sequence must be atomic w.r.t. other
    # scheduler threads touching self.requests / self.running.
    with self.lock:
        assert self.config.scheduler_config.splitwise_role == "decode", "Only D instance can call this method"
        if request_output.request_id not in self.requests:
            llm_logger.error(f"Request {request_output.request_id} not found in requests")
            return
        request = self.requests[request_output.request_id]
        # update request and insert to running
        request.output_token_ids.append(request_output.outputs.token_ids[0])
        request.num_cached_tokens = request_output.num_cached_tokens
        if (
            self.config.speculative_config.method in ["mtp"]
            and self.config.scheduler_config.splitwise_role == "decode"
        ):
            # MTP speculative decoding: deep-copy so later mutation of the
            # output's draft tokens cannot alias into the tracked request.
            request.draft_token_ids = copy.deepcopy(request_output.outputs.draft_token_ids)
        # Prompt plus the first token produced by P.
        request.need_prefill_tokens = len(request.prompt_token_ids) + 1
        # Carry D-side timestamps (recorded before prefill finished) into the
        # metrics object coming from P, then adopt it wholesale.
        request_output.metrics.decode_recv_req_time = request.metrics.decode_recv_req_time
        request_output.metrics.decode_preallocate_req_time = request.metrics.decode_preallocate_req_time
        request.metrics = request_output.metrics
        self.running.append(request)
def _free_blocks(self, request: Request):
if self.config.cache_config.enable_prefix_caching: