From 90eebc4317561a2813d775f8eb6bbf648c95c2d7 Mon Sep 17 00:00:00 2001 From: Toshiki Kataoka Date: Tue, 18 Jun 2024 17:11:09 +0900 Subject: [PATCH 1/5] fix(chunked prefill): don't schedule prefill if freeing kv cache --- vllm/core/scheduler.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index 48c34625c08a..0ca71c54e02b 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -861,16 +861,17 @@ def _schedule_chunked_prefill(self): fcfs_policy, enable_chunking=True) - # Schedule swapped out requests. - # If preemption happens, it means we don't have space for swap-in. + # If preemption happens, it means we don't have space for other requests. if len(running_scheduled.preempted) + len( running_scheduled.swapped_out) == 0: + # Schedule swapped out requests. remaining_swapped, swapped_in = self._schedule_swapped( self.swapped, budget, curr_loras, fcfs_policy) - # Schedule new prefills. - remaining_waiting, prefills = self._schedule_prefills( - self.waiting, budget, curr_loras, enable_chunking=True) + # Schedule new prefills. + if len(remaining_swapped) == 0: + remaining_waiting, prefills = self._schedule_prefills( + self.waiting, budget, curr_loras, enable_chunking=True) assert (budget.num_batched_tokens <= self.scheduler_config.max_num_batched_tokens) From c06e940a852140bf2f8d28a65eb9aa56a01c7aec Mon Sep 17 00:00:00 2001 From: Toshiki Kataoka Date: Wed, 19 Jun 2024 00:37:42 +0900 Subject: [PATCH 2/5] chore: format --- vllm/core/scheduler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index 0ca71c54e02b..4ca96ed664e3 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -861,7 +861,8 @@ def _schedule_chunked_prefill(self): fcfs_policy, enable_chunking=True) - # If preemption happens, it means we don't have space for other requests. + # If preemption happens, it means we don't have space for other + # requests. if len(running_scheduled.preempted) + len( running_scheduled.swapped_out) == 0: # Schedule swapped out requests. From 231ec60db4bfed65d2c92ee2cadacecccb619ae0 Mon Sep 17 00:00:00 2001 From: Toshiki Kataoka Date: Sat, 6 Jul 2024 21:41:16 +0900 Subject: [PATCH 3/5] test: rewrite test_running_prefill_prioritized_over_swap --- tests/core/test_chunked_prefill_scheduler.py | 61 ++++++++++++-------- 1 file changed, 38 insertions(+), 23 deletions(-) diff --git a/tests/core/test_chunked_prefill_scheduler.py b/tests/core/test_chunked_prefill_scheduler.py index a3b76327e0a5..a25c5bd37dcc 100644 --- a/tests/core/test_chunked_prefill_scheduler.py +++ b/tests/core/test_chunked_prefill_scheduler.py @@ -383,7 +383,12 @@ def test_running_prefill_prioritized_over_swap(): cache_config.num_gpu_blocks = 8 scheduler = Scheduler(scheduler_config, cache_config, None) - _, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2) + # Artificial priority is needed for testing with the fcfs policy in + # _schedule_running. This seq will be prioritized more among running + # seqs but less among waiting seqs. + _, seq_group2 = create_dummy_prompt("2", prompt_length=20 + 30 + 30 + 30) + + _, seq_group = create_dummy_prompt("1", prompt_length=30 + 10, best_of=2) scheduler.add_seq_group(seq_group) _, out = schedule_and_update_computed_tokens(scheduler) # The request is chunked. 
@@ -393,7 +398,27 @@ def test_running_prefill_prioritized_over_swap(): assert seq_group.is_prefill() assert out.num_batched_tokens == max_num_batched_tokens - # The request should be swapped out. + # Add 1 more task. + scheduler.add_seq_group(seq_group2) + _, out = schedule_and_update_computed_tokens(scheduler) + # task 1 finished the last 10 tokens of prefill. + # task 2 started the first 20 tokens of prefill. + assert len(out.scheduled_seq_groups) == 2 + assert out.num_prefill_groups == 2 + assert not seq_group.is_prefill() + assert seq_group2.is_prefill() + assert out.num_batched_tokens == max_num_batched_tokens + + # seq_group starts decoding with best_of=2 + # see vllm/engine/output_processor/single_step.py + seq = seq_group.seqs_dict[1] + new_seq_id = 3 + new_seq = seq.fork(new_seq_id) + seq_group.add(new_seq) + scheduler.fork_seq(seq, new_seq) + append_new_token(seq_group, 1) + + # The first request should be swapped out. scheduler.block_manager.can_append_slots = MagicMock() def cannot_append_second_group(seq_group, num_lookahead_slots): @@ -402,32 +427,29 @@ def cannot_append_second_group(seq_group, num_lookahead_slots): scheduler.block_manager.can_append_slots.side_effect = ( cannot_append_second_group) - # The running prefill is now swapped. _, out = schedule_and_update_computed_tokens(scheduler) - assert len(out.scheduled_seq_groups) == 0 - assert out.num_batched_tokens == 0 - assert out.blocks_to_swap_out != [] + assert len(out.scheduled_seq_groups) == 1 + assert out.num_batched_tokens == 30 assert out.blocks_to_swap_in == [] + assert out.blocks_to_swap_out != [] + assert out.scheduled_seq_groups[0].seq_group == seq_group2 - # Add 1 more task. Swap is not possible, so prefill is running. + # Swap is not possible, so prefill is running. scheduler.block_manager.can_swap_in = MagicMock() scheduler.block_manager.can_swap_in.return_value = AllocStatus.LATER - _, seq_group2 = create_dummy_prompt("2", prompt_length=60) - scheduler.add_seq_group(seq_group2) _, out = schedule_and_update_computed_tokens(scheduler) assert len(out.scheduled_seq_groups) == 1 - # 3 decodes. It is swapped in. assert out.num_batched_tokens == 30 assert out.blocks_to_swap_in == [] assert out.blocks_to_swap_out == [] + assert seq_group2.is_prefill() assert out.scheduled_seq_groups[0].seq_group == seq_group2 # Now although swap is possible, running prefill is prioritized. scheduler.block_manager.can_swap_in.return_value = AllocStatus.OK _, out = schedule_and_update_computed_tokens(scheduler) assert len(out.scheduled_seq_groups) == 1 - # 3 decodes. It is swapped in. assert out.num_batched_tokens == 30 assert out.blocks_to_swap_in == [] assert out.blocks_to_swap_out == [] @@ -437,23 +459,16 @@ def cannot_append_second_group(seq_group, num_lookahead_slots): # Decoding is prioritized. _, out = schedule_and_update_computed_tokens(scheduler) - assert len(out.scheduled_seq_groups) == 1 + assert len(out.scheduled_seq_groups) == 2 # 3 decodes. It is swapped in. - assert out.num_batched_tokens == 1 - assert out.blocks_to_swap_in == [] + assert out.num_batched_tokens == 3 + assert out.blocks_to_swap_in != [] assert out.blocks_to_swap_out == [] + assert not seq_group.is_prefill() assert not seq_group2.is_prefill() - assert out.scheduled_seq_groups[0].seq_group == seq_group2 + append_new_token(seq_group, 1) append_new_token(seq_group2, 1) - # Since we abort the sequence group, we can finally swap. 
- scheduler.abort_seq_group(seq_group2.request_id) - _, out = schedule_and_update_computed_tokens(scheduler) - assert len(out.scheduled_seq_groups) == 1 - assert out.num_batched_tokens == 30 - assert out.blocks_to_swap_in != [] - assert out.blocks_to_swap_out == [] - def test_chunked_prefill_preempt(): """Verify preempt works with chunked prefill requests""" From 25984378a6dcdb0e50bbee97dacc861ec2924072 Mon Sep 17 00:00:00 2001 From: Toshiki Kataoka Date: Wed, 10 Jul 2024 14:57:01 +0900 Subject: [PATCH 4/5] empty commit From af63d5291cfc2b3febc61ed26ffc2306ca1c8d67 Mon Sep 17 00:00:00 2001 From: Toshiki Kataoka Date: Fri, 6 Sep 2024 14:26:20 +0900 Subject: [PATCH 5/5] remove test --- tests/core/test_chunked_prefill_scheduler.py | 102 ------------------- 1 file changed, 102 deletions(-) diff --git a/tests/core/test_chunked_prefill_scheduler.py b/tests/core/test_chunked_prefill_scheduler.py index 11835c660d84..4290f6b65610 100644 --- a/tests/core/test_chunked_prefill_scheduler.py +++ b/tests/core/test_chunked_prefill_scheduler.py @@ -4,7 +4,6 @@ import pytest # noqa from vllm.config import CacheConfig, SchedulerConfig -from vllm.core.interfaces import AllocStatus from vllm.core.scheduler import Scheduler from vllm.sequence import Logprob, SequenceGroup @@ -369,107 +368,6 @@ def cannot_append_second_group(seq_group, num_lookahead_slots): assert out.blocks_to_swap_out == [] -def test_running_prefill_prioritized_over_swap(): - block_size = 4 - max_seqs = 30 - max_model_len = 200 - max_num_batched_tokens = 30 - scheduler_config = SchedulerConfig(max_num_batched_tokens, - max_seqs, - max_model_len, - enable_chunked_prefill=True) - cache_config = CacheConfig(block_size, 1.0, 1, "auto") - cache_config.num_cpu_blocks = 8 - cache_config.num_gpu_blocks = 8 - scheduler = Scheduler(scheduler_config, cache_config, None) - - # Artificial priority is needed for testing with the fcfs policy in - # _schedule_running. This seq will be prioritized more among running - # seqs but less among waiting seqs. - _, seq_group2 = create_dummy_prompt("2", prompt_length=20 + 30 + 30 + 30) - - _, seq_group = create_dummy_prompt("1", prompt_length=30 + 10, best_of=2) - scheduler.add_seq_group(seq_group) - _, out = schedule_and_update_computed_tokens(scheduler) - # The request is chunked. - # prefill scheduled now. - assert len(out.scheduled_seq_groups) == 1 - assert out.num_prefill_groups == 1 - assert seq_group.is_prefill() - assert out.num_batched_tokens == max_num_batched_tokens - - # Add 1 more task. - scheduler.add_seq_group(seq_group2) - _, out = schedule_and_update_computed_tokens(scheduler) - # task 1 finished the last 10 tokens of prefill. - # task 2 started the first 20 tokens of prefill. - assert len(out.scheduled_seq_groups) == 2 - assert out.num_prefill_groups == 2 - assert not seq_group.is_prefill() - assert seq_group2.is_prefill() - assert out.num_batched_tokens == max_num_batched_tokens - - # seq_group starts decoding with best_of=2 - # see vllm/engine/output_processor/single_step.py - seq = seq_group.seqs_dict[1] - new_seq_id = 3 - new_seq = seq.fork(new_seq_id) - seq_group.add(new_seq) - scheduler.fork_seq(seq, new_seq) - append_new_token(seq_group, 1) - - # The first request should be swapped out. 
- scheduler.block_manager.can_append_slots = MagicMock() - - def cannot_append_second_group(seq_group, num_lookahead_slots): - return seq_group.request_id != "1" - - scheduler.block_manager.can_append_slots.side_effect = ( - cannot_append_second_group) - - _, out = schedule_and_update_computed_tokens(scheduler) - assert len(out.scheduled_seq_groups) == 1 - assert out.num_batched_tokens == 30 - assert out.blocks_to_swap_in == [] - assert out.blocks_to_swap_out != [] - assert out.scheduled_seq_groups[0].seq_group == seq_group2 - - # Swap is not possible, so prefill is running. - scheduler.block_manager.can_swap_in = MagicMock() - scheduler.block_manager.can_swap_in.return_value = AllocStatus.LATER - - _, out = schedule_and_update_computed_tokens(scheduler) - assert len(out.scheduled_seq_groups) == 1 - assert out.num_batched_tokens == 30 - assert out.blocks_to_swap_in == [] - assert out.blocks_to_swap_out == [] - assert seq_group2.is_prefill() - assert out.scheduled_seq_groups[0].seq_group == seq_group2 - - # Now although swap is possible, running prefill is prioritized. - scheduler.block_manager.can_swap_in.return_value = AllocStatus.OK - _, out = schedule_and_update_computed_tokens(scheduler) - assert len(out.scheduled_seq_groups) == 1 - assert out.num_batched_tokens == 30 - assert out.blocks_to_swap_in == [] - assert out.blocks_to_swap_out == [] - assert not seq_group2.is_prefill() - assert out.scheduled_seq_groups[0].seq_group == seq_group2 - append_new_token(seq_group2, 1) - - # Decoding is prioritized. - _, out = schedule_and_update_computed_tokens(scheduler) - assert len(out.scheduled_seq_groups) == 2 - # 3 decodes. It is swapped in. - assert out.num_batched_tokens == 3 - assert out.blocks_to_swap_in != [] - assert out.blocks_to_swap_out == [] - assert not seq_group.is_prefill() - assert not seq_group2.is_prefill() - append_new_token(seq_group, 1) - append_new_token(seq_group2, 1) - - def test_chunked_prefill_preempt(): """Verify preempt works with chunked prefill requests""" block_size = 4
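
For reference, the net effect of PATCH 1/5 on `_schedule_chunked_prefill` can be distilled into the standalone sketch below. This is an illustrative sketch only: the function name, the helper names (`schedule_running`, `schedule_swapped`, `schedule_prefills`), and the duck-typed `scheduler` object are simplified stand-ins for the real vLLM scheduler internals, not its actual API. The ordering it demonstrates is the one the patch enforces: running requests are scheduled first; swapped-out requests are considered only when nothing was preempted or swapped out in that pass; and new prefills are admitted only once no requests remain swapped out, so a fresh prefill can never claim the KV cache blocks that older, swapped-out requests need in order to come back.

# Illustrative sketch of the scheduling order after PATCH 1/5.
# All names below are simplified stand-ins, not the real vLLM API.
def schedule_chunked_prefill_sketch(scheduler, budget, curr_loras, policy):
    # Step 1: schedule already-running requests; this may preempt or
    # swap out lower-priority requests to free KV cache blocks.
    running_scheduled = scheduler.schedule_running(
        scheduler.running, budget, curr_loras, policy, enable_chunking=True)

    swapped_in, prefills = [], []
    remaining_swapped = scheduler.swapped

    # Step 2: only if nothing was preempted or swapped out is there
    # spare KV cache capacity to bring swapped-out requests back in.
    if not running_scheduled.preempted and not running_scheduled.swapped_out:
        remaining_swapped, swapped_in = scheduler.schedule_swapped(
            scheduler.swapped, budget, curr_loras, policy)

        # Step 3 (the fix): admit new prefills only when no requests
        # remain swapped out, so prefills cannot starve the swap-ins.
        if not remaining_swapped:
            _, prefills = scheduler.schedule_prefills(
                scheduler.waiting, budget, curr_loras, enable_chunking=True)

    return running_scheduled, swapped_in, prefills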