From 0431ddf43c32347f22476f8541a88e14396ca80e Mon Sep 17 00:00:00 2001
From: Nick Hill
Date: Tue, 21 Jan 2025 19:43:59 -0800
Subject: [PATCH 1/3] [V1][Frontend] Coalesce bunched RequestOutputs

Under high load it's possible for the frontend per-request asyncio
queues to back up, with the next token(s) arriving before existing ones
have been streamed back to the user. In this case there's no reason to
emit them as separate outputs in subsequent iterations. Concatenating
them into a single output reduces the number of tasks, context switches,
and response messages, and means those additional "ready" tokens should
reach the user faster.

Signed-off-by: Nick Hill
Co-authored-by: Robert Shaw
---
 vllm/outputs.py             | 22 +++++++++++++++++++++-
 vllm/v1/engine/async_llm.py | 19 ++++++++++++-------
 2 files changed, 33 insertions(+), 8 deletions(-)

diff --git a/vllm/outputs.py b/vllm/outputs.py
index 63df7dcf519b..25b2265285d1 100644
--- a/vllm/outputs.py
+++ b/vllm/outputs.py
@@ -1,6 +1,6 @@
 import time
 from dataclasses import dataclass
-from typing import Dict, Generic, List, Optional
+from typing import Dict, Generic, List, MutableSequence, Optional
 from typing import Sequence as GenericSequence
 from typing import Union
 
@@ -162,6 +162,26 @@ def new(
             finished=finished,
         )
 
+    def add(self, next_output: "RequestOutput") -> None:
+        """Merge subsequent RequestOutput into this one"""
+
+        self.prompt = next_output.prompt
+        self.prompt_token_ids = next_output.prompt_token_ids
+        self.prompt_logprobs = next_output.prompt_logprobs
+        self.finished |= next_output.finished
+
+        # TODO: assuming n == 1 for now
+        completion = self.outputs[0]
+        next_completion = next_output.outputs[0]
+        completion.text += next_completion.text
+        if not isinstance(completion.token_ids, MutableSequence):
+            completion.token_ids = list(completion.token_ids)
+        completion.token_ids.extend(next_completion.token_ids)
+        if next_completion.logprobs:
+            assert completion.logprobs is not None
+            completion.logprobs.extend(next_completion.logprobs)
+        completion.cumulative_logprob = next_completion.cumulative_logprob
+
     @classmethod
     def from_seq_group(
         cls, seq_group: SequenceGroup, use_cache: bool,
diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py
index a74699f7513e..7a80114f49c2 100644
--- a/vllm/v1/engine/async_llm.py
+++ b/vllm/v1/engine/async_llm.py
@@ -12,7 +12,7 @@
 from vllm.outputs import RequestOutput
 from vllm.pooling_params import PoolingParams
 from vllm.prompt_adapter.request import PromptAdapterRequest
-from vllm.sampling_params import SamplingParams
+from vllm.sampling_params import RequestOutputKind, SamplingParams
 from vllm.transformers_utils.tokenizer import AnyTokenizer
 from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs
 from vllm.usage.usage_lib import UsageContext
@@ -205,17 +205,22 @@ async def generate(
 
             # The output_handler task pushes items into the queue.
             # This task pulls from the queue and yields to caller.
-            while True:
+            finished = False
+            while not finished:
                 # Note: drain queue without await if possible (avoids
                 # task switching under load which helps performance).
-                out = q.get_nowait() if q.qsize() > 0 else await q.get()
+                out = q.get_nowait() if not q.empty() else await q.get()
+
+                # Coalesce any additional queued outputs
+                while not q.empty():
+                    if sampling_params.output_kind == RequestOutputKind.DELTA:
+                        out.add(q.get_nowait())
+                    else:
+                        out = q.get_nowait()
 
                 # Note: both OutputProcessor and EngineCore handle their
                 # own request cleanup based on finished.
-                if out.finished:
-                    yield out
-                    break
-
+                finished = out.finished
                 yield out
 
         # If the request is disconnected by the client, the

From 3ff92d0064f2b28591b84a7cb8e21bc48c80fb9c Mon Sep 17 00:00:00 2001
From: Nick Hill
Date: Thu, 23 Jan 2025 08:31:42 -0800
Subject: [PATCH 2/3] Update AsyncLLM tests

Signed-off-by: Nick Hill
---
 tests/v1/engine/test_async_llm.py | 21 +++++++++++----------
 1 file changed, 11 insertions(+), 10 deletions(-)

diff --git a/tests/v1/engine/test_async_llm.py b/tests/v1/engine/test_async_llm.py
index 2c805e18eeba..edb46dc9b689 100644
--- a/tests/v1/engine/test_async_llm.py
+++ b/tests/v1/engine/test_async_llm.py
@@ -1,4 +1,5 @@
 import asyncio
+from contextlib import ExitStack
 from typing import List, Tuple
 
 import pytest
@@ -20,12 +21,13 @@ async def generate(engine: AsyncLLM, request_id: str,
                    max_tokens: int) -> Tuple[int, str]:
     count = 0
-    async for _ in engine.generate(request_id=request_id,
-                                   prompt="Hello my name is Robert and",
-                                   sampling_params=SamplingParams(
-                                       max_tokens=max_tokens, temperature=0)):
+    async for out in engine.generate(
+            request_id=request_id,
+            prompt="Hello my name is Robert and",
+            sampling_params=SamplingParams(max_tokens=max_tokens,
+                                           temperature=0)):
 
-        count += 1
+        count += len(out.outputs[0].token_ids)
         await asyncio.sleep(0.)
 
     return count, request_id
 
@@ -36,10 +38,11 @@ async def test_load(monkeypatch):
     # TODO(rickyx): Remove monkeypatch once we have a better way to test V1
     # so that in the future when we switch, we don't have to change all the
     # tests.
-    with monkeypatch.context() as m:
+    with monkeypatch.context() as m, ExitStack() as after:
         m.setenv("VLLM_USE_V1", "1")
 
         engine = AsyncLLM.from_engine_args(ENGINE_ARGS)
+        after.callback(engine.shutdown)
 
         NUM_REQUESTS = 10000
         NUM_EXPECTED_TOKENS = 10
@@ -61,16 +64,16 @@ async def test_load(monkeypatch):
                 f"expected {NUM_EXPECTED_TOKENS}")
 
         assert not engine.output_processor.has_unfinished_requests()
-        engine.shutdown()
 
 
 @pytest.mark.asyncio
 async def test_abort(monkeypatch):
 
-    with monkeypatch.context() as m:
+    with monkeypatch.context() as m, ExitStack() as after:
         m.setenv("VLLM_USE_V1", "1")
 
         engine = AsyncLLM.from_engine_args(ENGINE_ARGS)
+        after.callback(engine.shutdown)
 
         NUM_REQUESTS = 100
         NUM_EXPECTED_TOKENS = 100
@@ -112,5 +115,3 @@ async def test_abort(monkeypatch):
         num_generated_tokens, request_id = await task
         assert num_generated_tokens == NUM_EXPECTED_TOKENS
         assert not engine.output_processor.has_unfinished_requests()
-
-        engine.shutdown()

From e7aa463cfd45cd09c4d53b9e1de8084e1b9ca1b5 Mon Sep 17 00:00:00 2001
From: Nick Hill
Date: Thu, 23 Jan 2025 10:36:30 -0800
Subject: [PATCH 3/3] Fix AsyncLLM tests

Signed-off-by: Nick Hill
---
 tests/v1/engine/test_async_llm.py | 42 ++++++++++++++++++++++---------
 1 file changed, 30 insertions(+), 12 deletions(-)

diff --git a/tests/v1/engine/test_async_llm.py b/tests/v1/engine/test_async_llm.py
index edb46dc9b689..10f783b21a9e 100644
--- a/tests/v1/engine/test_async_llm.py
+++ b/tests/v1/engine/test_async_llm.py
@@ -7,6 +7,7 @@
 from vllm import SamplingParams
 from vllm.engine.arg_utils import AsyncEngineArgs
 from vllm.platforms import current_platform
+from vllm.sampling_params import RequestOutputKind
 from vllm.v1.engine.async_llm import AsyncLLM
 
 if not current_platform.is_cuda():
@@ -19,22 +20,31 @@
 
 async def generate(engine: AsyncLLM, request_id: str,
+                   output_kind: RequestOutputKind,
                    max_tokens: int) -> Tuple[int, str]:
     count = 0
-    async for out in engine.generate(
-            request_id=request_id,
-            prompt="Hello my name is Robert and",
-            sampling_params=SamplingParams(max_tokens=max_tokens,
-                                           temperature=0)):
+    sampling_params = SamplingParams(max_tokens=max_tokens,
+                                     output_kind=output_kind,
+                                     temperature=0)
+    async for out in engine.generate(request_id=request_id,
+                                     prompt="Hello my name is Robert and",
+                                     sampling_params=sampling_params):
+
+        num_tokens = len(out.outputs[0].token_ids)
+        if output_kind == RequestOutputKind.DELTA:
+            count += num_tokens
+        else:
+            count = num_tokens
 
-        count += len(out.outputs[0].token_ids)
         await asyncio.sleep(0.)
 
     return count, request_id
 
 
+@pytest.mark.parametrize(
+    "output_kind", [RequestOutputKind.DELTA, RequestOutputKind.FINAL_ONLY])
 @pytest.mark.asyncio
-async def test_load(monkeypatch):
+async def test_load(monkeypatch, output_kind: RequestOutputKind):
     # TODO(rickyx): Remove monkeypatch once we have a better way to test V1
     # so that in the future when we switch, we don't have to change all the
     # tests.
@@ -54,10 +64,15 @@ async def test_load(monkeypatch):
         for request_id in request_ids:
             tasks.append(
                 asyncio.create_task(
-                    generate(engine, request_id, NUM_EXPECTED_TOKENS)))
+                    generate(engine, request_id, output_kind,
+                             NUM_EXPECTED_TOKENS)))
 
         # Confirm that we got all the EXPECTED tokens from the requests.
-        for task in tasks:
+        done, pending = await asyncio.wait(tasks,
+                                           return_when=asyncio.FIRST_EXCEPTION)
+        for task in pending:
+            task.cancel()
+        for task in done:
             num_generated_tokens, request_id = await task
             assert num_generated_tokens == NUM_EXPECTED_TOKENS, (
                 f"{request_id} generated {num_generated_tokens} but "
@@ -66,8 +81,10 @@ async def test_load(monkeypatch):
                 f"expected {NUM_EXPECTED_TOKENS}")
 
         assert not engine.output_processor.has_unfinished_requests()
 
 
+@pytest.mark.parametrize(
+    "output_kind", [RequestOutputKind.DELTA, RequestOutputKind.FINAL_ONLY])
 @pytest.mark.asyncio
-async def test_abort(monkeypatch):
+async def test_abort(monkeypatch, output_kind: RequestOutputKind):
 
     with monkeypatch.context() as m, ExitStack() as after:
         m.setenv("VLLM_USE_V1", "1")
@@ -86,7 +103,8 @@ async def test_abort(monkeypatch):
         for request_id in request_ids:
             tasks.append(
                 asyncio.create_task(
-                    generate(engine, request_id, NUM_EXPECTED_TOKENS)))
+                    generate(engine, request_id, output_kind,
+                             NUM_EXPECTED_TOKENS)))
 
         # API server cancels requests when they disconnect.
         for idx in REQUEST_IDS_TO_ABORT:
@@ -111,7 +129,7 @@ async def test_abort(monkeypatch):
         # Confirm we can do another generation.
        request_id = f"request-{REQUEST_IDS_TO_ABORT[0]}"
         task = asyncio.create_task(
-            generate(engine, request_id, NUM_EXPECTED_TOKENS))
+            generate(engine, request_id, output_kind, NUM_EXPECTED_TOKENS))
         num_generated_tokens, request_id = await task
         assert num_generated_tokens == NUM_EXPECTED_TOKENS
         assert not engine.output_processor.has_unfinished_requests()
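
The coalescing loop added in PATCH 1/3 boils down to a small asyncio pattern:
take one item from the queue (without awaiting if something is already there),
then fold any further queued items into it before yielding. The standalone
sketch below reproduces that flow with a plain asyncio.Queue and string chunks
standing in for vLLM's RequestOutput; the helper names (drain_coalesced, main)
are illustrative only and are not part of the vLLM API.

import asyncio
from typing import AsyncIterator


async def drain_coalesced(q: "asyncio.Queue[str]") -> AsyncIterator[str]:
    """Yield queued chunks, merging any that are already waiting.

    Mirrors the loop added to AsyncLLM.generate(): grab one item
    (without awaiting if the queue is non-empty), then coalesce any
    additional queued items into it before yielding.
    """
    finished = False
    while not finished:
        # Avoid an await (and a task switch) when items are already queued.
        out = q.get_nowait() if not q.empty() else await q.get()

        # Coalesce anything else that arrived in the meantime.
        while not q.empty():
            out += q.get_nowait()

        # Stand-in for RequestOutput.finished.
        finished = out.endswith(".")
        yield out


async def main() -> None:
    q: "asyncio.Queue[str]" = asyncio.Queue()
    # Simulate tokens arriving faster than the consumer drains them.
    for chunk in ["Hello", " world", ", this", " is", " bunched output."]:
        q.put_nowait(chunk)

    async for merged in drain_coalesced(q):
        # One merged delta instead of five small ones.
        print(repr(merged))


asyncio.run(main())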