[BugFix] Fix stuck stats/metrics after requests are aborted #22995

Status: Open, wants to merge 5 commits into base: main

95 changes: 94 additions & 1 deletion tests/entrypoints/openai/test_metrics.py
@@ -1,6 +1,6 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

import asyncio
import subprocess
import sys
import tempfile
@@ -294,6 +294,99 @@ async def test_metrics_exist(server: RemoteOpenAIServer,
        assert metric in response.text


@pytest.mark.asyncio
async def test_abort_metrics_reset(server: RemoteOpenAIServer,
                                   client: openai.AsyncClient, use_v1: bool):

    running_requests, waiting_requests, kv_cache_usage = (
        _get_running_metrics_from_api(server))

    # Expect no running requests or kvcache usage
    assert running_requests == 0
    assert waiting_requests == 0
    assert kv_cache_usage == 0.0

    # Start some long-running requests that we can abort
    tasks = []
    for _ in range(3):
        task = asyncio.create_task(
            client.completions.create(
                model=MODEL_NAME,
                prompt=_TOKENIZED_PROMPT,
                max_tokens=100,  # Long generation to give time to abort
                temperature=0.0))
        tasks.append(task)

    # Wait a bit for requests to start processing
    await asyncio.sleep(0.5)

    # Check that we have running requests
    running_requests, waiting_requests, kv_cache_usage = (
        _get_running_metrics_from_api(server))

    # Expect running requests and kvcache usage
    assert running_requests > 0
    assert kv_cache_usage > 0

    # Cancel all tasks to abort the requests
    for task in tasks:
        task.cancel()

    # Wait for cancellations to be processed
    await asyncio.sleep(1.0)

    # Check that metrics have reset to zero
    response = requests.get(server.url_for("metrics"))
    assert response.status_code == HTTPStatus.OK

    # Verify running and waiting requests counts and KV cache usage are zero
    running_requests_after, waiting_requests_after, kv_cache_usage_after = (
        _get_running_metrics_from_api(server))

    assert running_requests_after == 0,\
        (f"Expected 0 running requests after abort, got "
         f"{running_requests_after}")
    assert waiting_requests_after == 0,\
        (f"Expected 0 waiting requests after abort, got "
         f"{waiting_requests_after}")
    assert kv_cache_usage_after == 0,\
        (f"Expected 0% KV cache usage after abort, got "
         f"{kv_cache_usage_after}")


def _get_running_metrics_from_api(server: RemoteOpenAIServer):
    """Return (running_count, waiting_count, kv_cache_usage)"""

    response = requests.get(server.url_for("metrics"))
    assert response.status_code == HTTPStatus.OK

    # Extract the running/waiting request counts and KV cache usage
    # from the Prometheus text output.
    running_requests, waiting_requests, kv_cache_usage = None, None, None

    for family in text_string_to_metric_families(response.text):
        if family.name == "vllm:num_requests_running":
            for sample in family.samples:
                if sample.name == "vllm:num_requests_running":
                    running_requests = sample.value
                    break
        elif family.name == "vllm:num_requests_waiting":
            for sample in family.samples:
                if sample.name == "vllm:num_requests_waiting":
                    waiting_requests = sample.value
                    break
        elif family.name == "vllm:gpu_cache_usage_perc":
            for sample in family.samples:
                if sample.name == "vllm:gpu_cache_usage_perc":
                    kv_cache_usage = sample.value
                    break

    assert running_requests is not None
    assert waiting_requests is not None
    assert kv_cache_usage is not None

    return running_requests, waiting_requests, kv_cache_usage


def test_metrics_exist_run_batch(use_v1: bool):
    input_batch = """{"custom_id": "request-0", "method": "POST", "url": "/v1/embeddings", "body": {"model": "intfloat/multilingual-e5-small", "input": "You are a helpful assistant."}}"""  # noqa: E501

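A note on the fixed one-second sleep in the test above: if abort propagation is slower on a loaded CI machine, the assertions could flake. Below is a minimal sketch of a polling alternative that reuses the _get_running_metrics_from_api helper from this diff; the function name, timeout, and interval are illustrative and not part of the PR.

import time

def _wait_for_metrics_reset(server, timeout: float = 10.0,
                            interval: float = 0.25) -> bool:
    """Poll /metrics until running/waiting counts and KV usage reach zero."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        running, waiting, kv_usage = _get_running_metrics_from_api(server)
        if running == 0 and waiting == 0 and kv_usage == 0.0:
            return True
        time.sleep(interval)
    return False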
5 changes: 4 additions & 1 deletion vllm/v1/core/block_pool.py
@@ -298,7 +298,10 @@ def get_usage(self) -> float:
         Returns:
             The KV cache usage (between 0.0 and 1.0).
         """
-        return 1.0 - (self.get_num_free_blocks() / self.num_gpu_blocks)
+
+        # Subtract 1 to account for null block.
+        total_gpu_blocks = self.num_gpu_blocks - 1
+        return 1.0 - (self.get_num_free_blocks() / total_gpu_blocks)
 
     def take_events(self) -> list[KVCacheEvent]:
         """Atomically takes all events and clears the queue.
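To see why the null block matters for this gauge, here is a small worked example; the block counts are made up for illustration and only the arithmetic mirrors the diff. With 1001 GPU blocks, one of which is the reserved null block, an idle pool has 1000 free usable blocks: the old formula never quite reaches zero, while the corrected one does.

# Hypothetical numbers; only the formulas come from the diff above.
num_gpu_blocks = 1001      # includes the reserved null block
num_free_blocks = 1000     # every usable block is free (idle pool)

old_usage = 1.0 - (num_free_blocks / num_gpu_blocks)
new_usage = 1.0 - (num_free_blocks / (num_gpu_blocks - 1))

print(f"old: {old_usage:.4f}")  # ~0.0010, gauge appears stuck above zero
print(f"new: {new_usage:.4f}")  # 0.0000, gauge fully resets after aborts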
9 changes: 6 additions & 3 deletions vllm/v1/core/sched/scheduler.py
@@ -917,10 +917,13 @@ def update_from_output(
                         finished_requests=finished_set)
             finished_req_ids.clear()
 
-        if engine_core_outputs:
+        if (stats := self.make_stats(spec_decoding_stats)) is not None:
             # Return stats to only one of the front-ends.
-            next(iter(engine_core_outputs.values())).scheduler_stats = (
-                self.make_stats(spec_decoding_stats))
+            if (eco := next(iter(engine_core_outputs.values()), None)) is None:
+                # We must return the stats even if there are no request
+                # outputs this step.
+                engine_core_outputs[0] = eco = EngineCoreOutputs()
+            eco.scheduler_stats = stats
 
         return engine_core_outputs

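The scheduler change boils down to: compute the stats first, and if no client produced request outputs this step (for example because every request was just aborted), still emit an empty EngineCoreOutputs so the refreshed stats reach a frontend. Below is a minimal standalone sketch of that control flow using simplified stand-in types; SchedulerStats and EngineCoreOutputs here are bare dataclasses for illustration, not vLLM's real classes.

from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SchedulerStats:
    num_running_reqs: int = 0
    num_waiting_reqs: int = 0
    kv_cache_usage: float = 0.0


@dataclass
class EngineCoreOutputs:
    outputs: list = field(default_factory=list)
    scheduler_stats: Optional[SchedulerStats] = None


def attach_stats(engine_core_outputs: dict,
                 stats: Optional[SchedulerStats]) -> dict:
    """Sketch of the fixed logic: stats ride along even with no outputs."""
    if stats is not None:
        # Pick one client's outputs, or create an empty container if the
        # step produced none (the previously "stuck metrics" case).
        if (eco := next(iter(engine_core_outputs.values()), None)) is None:
            engine_core_outputs[0] = eco = EngineCoreOutputs()
        eco.scheduler_stats = stats
    return engine_core_outputs


# After aborting everything there are no per-request outputs, but the
# zeroed-out stats are still delivered to client 0.
outs = attach_stats({}, SchedulerStats())
assert outs[0].scheduler_stats is not None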