[Optimization] Reduce preemption occurrence when blocks are insufficient (#5696)

* [Optimize] Reduce preemption occurrence when blocks are insufficient for decoding

* fix

* fix

* fix spell

* optimize performance

* fix
This commit is contained in:
chenjian
2026-01-07 20:01:16 +08:00
committed by GitHub
parent 78adf83549
commit c883a2d3ec
2 changed files with 57 additions and 2 deletions
+47 -2
View File
@@ -200,6 +200,19 @@ class ResourceManagerV1(ResourceManager):
self.bos_client = None
self.async_preprocess_pool = ThreadPoolExecutor(max_workers=4)
self.init_reserve_output_block_num = (
envs.FD_RESERVE_OUTPUT_BLOCK_NUM_FOR_DECODE_WHEN_SCHEDULE_NEW_PREFILL
) # int
self.decay_output_block_num = (
envs.FD_RESERVE_DECAY_OUTPUT_BLOCK_NUM_FOR_DECODE_WHEN_SCHEDULE_NEW_PREFILL
) # float
self.min_reserve_output_block_num = (
envs.FD_RESERVE_MIN_OUTPUT_BLOCK_NUM_FOR_DECODE_WHEN_SCHEDULE_NEW_PREFILL
) # int
self.current_reserve_output_block_num = self.init_reserve_output_block_num
self.current_reserve_output_block_num_float = self.init_reserve_output_block_num
self.can_relax_prefill_strategy = True
def allocated_slots(self, request: Request):
    """Return the total number of token slots allocated to *request*.

    Every entry in ``request.block_tables`` is one GPU cache block, so the
    slot count is simply the block count times the configured block size.
    """
    block_size = self.config.cache_config.block_size
    num_blocks = len(request.block_tables)
    return num_blocks * block_size
@@ -295,8 +308,24 @@ class ResourceManagerV1(ResourceManager):
# The request can be scheduled.
can_schedule = True
break
self.current_reserve_output_block_num = self.init_reserve_output_block_num
self.current_reserve_output_block_num_float = self.init_reserve_output_block_num
self.can_relax_prefill_strategy = False
return can_schedule
def _get_can_schedule_prefill_threshold_block(self, request, num_chunk_new_block):
if self.can_relax_prefill_strategy:
can_schedule_block_num_threshold = num_chunk_new_block
else:
can_schedule_block_num_threshold = (
request.need_prefill_tokens + self.config.cache_config.block_size - 1
) // self.config.cache_config.block_size + len(self.running) * self.current_reserve_output_block_num
if self.config.speculative_config.method is not None:
can_schedule_block_num_threshold = min(
can_schedule_block_num_threshold + 1, self.config.cache_config.max_block_num_per_seq
)
return can_schedule_block_num_threshold
def _update_mm_hashes(self, request):
if request.multimodal_inputs is None:
return
@@ -756,7 +785,11 @@ class ResourceManagerV1(ResourceManager):
# Allocate blocks for the tokens that does not hit cache
num_new_tokens = self._get_num_new_tokens(request, token_budget)
num_new_block = self.get_new_block_nums(request, num_new_tokens)
if self.cache_manager.can_allocate_gpu_blocks(num_new_block):
can_schedule_block_num_threshold = self._get_can_schedule_prefill_threshold_block(
request, num_new_block
)
# Allocate blocks to prefill
if self.cache_manager.can_allocate_gpu_blocks(can_schedule_block_num_threshold):
if not request.get("skip_allocate", False):
extra_gpu_block_ids = self.cache_manager.allocate_gpu_blocks(num_new_block)
request.block_tables.extend(extra_gpu_block_ids)
@@ -802,7 +835,11 @@ class ResourceManagerV1(ResourceManager):
# Allocate blocks for the tokens that does not hit cache
num_new_tokens = self._get_num_new_tokens(request, token_budget)
num_new_block = self.get_new_block_nums(request, num_new_tokens)
if self.cache_manager.can_allocate_gpu_blocks(num_new_block):
can_schedule_block_num_threshold = self._get_can_schedule_prefill_threshold_block(
request, num_new_block
)
# Allocate blocks to prefill
if self.cache_manager.can_allocate_gpu_blocks(can_schedule_block_num_threshold):
if not request.get("skip_allocate", False):
extra_gpu_block_ids = self.cache_manager.allocate_gpu_blocks(num_new_block)
request.block_tables.extend(extra_gpu_block_ids)
@@ -829,6 +866,14 @@ class ResourceManagerV1(ResourceManager):
if scheduled_reqs:
llm_logger.debug(f"schedued_reqs: {scheduled_reqs}")
self.current_reserve_output_block_num_float -= self.decay_output_block_num
self.current_reserve_output_block_num = max(
int(self.current_reserve_output_block_num_float),
self.min_reserve_output_block_num,
0,
)
if self.current_reserve_output_block_num == 0:
self.can_relax_prefill_strategy = True
self.update_metrics()