
feat: support runai streamer for vllm #423


Status: Open. Wants to merge 16 commits into base: main.
3 changes: 3 additions & 0 deletions api/inference/v1alpha1/playground_types.go
@@ -73,6 +73,9 @@ const (
    PlaygroundProgressing = "Progressing"
    // PlaygroundAvailable indicates the corresponding inference service is available now.
    PlaygroundAvailable string = "Available"
    // SkipModelLoaderAnnoKey indicates whether to skip the model loader,
    // enabling the inference engine to manage model loading directly.
    SkipModelLoaderAnnoKey = "llmaz.io/skip-model-loader"
)

// PlaygroundStatus defines the observed state of Playground
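The constant above only defines the annotation key; as `SkipModelLoader` in `pkg/controller_helper/helper.go` further down shows, the controller reacts only to the literal value `"true"`. An illustrative kubectl command with the same key and value (the examples in this PR set the annotation in the Playground manifest at creation time instead):

```bash
# Illustrative only: the exact key/value the controller checks for.
# The shipped examples set this under metadata.annotations of the Playground manifest;
# whether annotating an existing Playground retriggers a rebuild depends on the controller's reconciliation.
kubectl annotate playground deepseek-r1-distill-qwen-1-5b llmaz.io/skip-model-loader="true"
```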
4 changes: 4 additions & 0 deletions docs/examples/README.md
@@ -68,6 +68,10 @@ llama.cpp supports speculative decoding to significantly improve inference perfo

[Speculative Decoding](https://arxiv.org/abs/2211.17192) can improve inference performance efficiently, see [example](./speculative-decoding/vllm/) here.

### Loading models with Run:ai Model Streamer in vLLM

[Run:ai Model Streamer](https://github.com/run-ai/runai-model-streamer/blob/master/docs/README.md) is a library that reads tensors concurrently and streams them to GPU memory. vLLM supports loading weights in the Safetensors format with the Run:ai Model Streamer. See the [example](./runai-streamer/) here.
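
For reference, the same load format can be exercised with vLLM directly, outside of llmaz; a minimal sketch, assuming vLLM is installed with its Run:ai streamer extra and using the example model from this PR:

```bash
# Sketch: plain vLLM with the Run:ai Model Streamer (the `runai` extra is assumed to be available).
pip install "vllm[runai]"
vllm serve deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B --load-format runai_streamer
```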

### Multi-Host Inference

Model sizes keep growing: a Llama 3.1 405B FP16 LLM requires more than 750 GB of GPU memory for the weights alone, not counting the KV cache. Even 8 x H100 NVIDIA GPUs with 80 GB of HBM each cannot fit it on a single host, so a multi-host deployment is required; see the [example](./multi-nodes/) here.
@@ -0,0 +1,44 @@
# This example demonstrates how to use the Run:ai Model Streamer to load models from the local file system.
# The model-loader initContainer first downloads the model from Hugging Face.
# By using `--load-format runai_streamer`, vLLM leverages the Run:ai Model Streamer to stream models from the local file system.
# While this approach may be slightly slower than streaming directly from S3 (due to the initial download to local disk),
# it still offers faster model loading compared to not using the Streamer,
# as it utilizes multiple threads to concurrently read tensor data from files into a dedicated CPU buffer,
# and then transfers the tensors to GPU memory.
apiVersion: llmaz.io/v1alpha1
kind: OpenModel
metadata:
  name: deepseek-r1-distill-qwen-1-5b
spec:
  familyName: deepseek
  source:
    modelHub:
      modelID: deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B
  inferenceConfig:
    flavors:
      - name: t4 # GPU type
        limits:
          nvidia.com/gpu: 1
---
apiVersion: inference.llmaz.io/v1alpha1
kind: Playground
metadata:
  name: deepseek-r1-distill-qwen-1-5b
spec:
  replicas: 1
  modelClaim:
    modelName: deepseek-r1-distill-qwen-1-5b
  backendRuntimeConfig:
    backendName: vllm # currently, only vllm supports runai streamer
    args:
      - --load-format
      - runai_streamer
    resources:
      limits:
        cpu: "4"
        memory: 16Gi
        nvidia.com/gpu: "1"
      requests:
        cpu: "4"
        memory: 16Gi
        nvidia.com/gpu: "1"
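
A minimal deployment sketch for the example above; the manifest path is a placeholder (the file name is not shown in this view), and the fully qualified resource name assumes the Playground CRD's plural is `playgrounds`:

```bash
# Placeholder path: substitute the actual file name of the manifest above.
kubectl apply -f docs/examples/runai-streamer/<local-filesystem-example>.yaml
# Wait for the Playground to report Available, then check the pods;
# the model-loader initContainer is expected here because the skip annotation is not set.
kubectl get playgrounds.inference.llmaz.io deepseek-r1-distill-qwen-1-5b
kubectl get pods | grep deepseek-r1-distill-qwen-1-5b
```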
54 changes: 54 additions & 0 deletions docs/examples/runai-streamer/playground-streaming-from-s3.yaml
@@ -0,0 +1,54 @@
# This example demonstrates how to use the Run:ai Model Streamer to load models directly from S3.
# Adding the annotation `llmaz.io/skip-model-loader: "true"` skips the model-loader initContainer,
# allowing the inference engine to load models directly from remote storage (e.g., S3).
# By using `--load-format runai_streamer`, vLLM leverages the Run:ai Model Streamer to stream models directly from S3.
# vLLM will load models into the CPU buffer and then into GPU memory, without the need to download them to local disk first.
# This can significantly improve model loading speed and reduce disk usage.
apiVersion: llmaz.io/v1alpha1
kind: OpenModel
metadata:
  name: deepseek-r1-distill-qwen-1-5b
spec:
  familyName: deepseek
  source:
    # Note: replace <YOUR_S3_BUCKET> with your actual S3 bucket name.
    # If the S3 bucket needs AWS credentials for authentication, first run:
    # `kubectl create secret generic aws-access-secret --from-literal=AWS_ACCESS_KEY_ID=<YOUR_ACCESS_KEY_ID> --from-literal=AWS_SECRET_ACCESS_KEY=<YOUR_SECRET_ACCESS_KEY>`
    uri: s3://<YOUR_S3_BUCKET>/DeepSeek-R1-Distill-Qwen-1.5B
  inferenceConfig:
    flavors:
      - name: t4 # GPU type
        limits:
          nvidia.com/gpu: 1
---
apiVersion: inference.llmaz.io/v1alpha1
kind: Playground
metadata:
  name: deepseek-r1-distill-qwen-1-5b
  annotations:
    llmaz.io/skip-model-loader: "true"
spec:
  replicas: 1
  modelClaim:
    modelName: deepseek-r1-distill-qwen-1-5b
  backendRuntimeConfig:
    backendName: vllm # currently, only vllm supports runai streamer
    args:
      - --load-format
      - runai_streamer
    envs:
      # The default value is 1 second. Increase it to 10 seconds to avoid timeouts under slow network conditions.
      - name: RUNAI_STREAMER_S3_REQUEST_TIMEOUT_MS
        value: "10000"
      # Controls the level of concurrency and the number of OS threads reading tensors from files into the CPU buffer; the default value is 16.
      #- name: RUNAI_STREAMER_CONCURRENCY
      #  value: "32"
    resources:
      limits:
        cpu: "4"
        memory: 16Gi
        nvidia.com/gpu: "1"
      requests:
        cpu: "4"
        memory: 16Gi
        nvidia.com/gpu: "1"
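
A minimal sketch of preparing credentials and deploying the S3 example, reusing the secret name and keys from the manifest comments above (bucket name and keys are placeholders):

```bash
# Create the AWS credentials secret referenced in the OpenModel comment above.
kubectl create secret generic aws-access-secret \
  --from-literal=AWS_ACCESS_KEY_ID=<YOUR_ACCESS_KEY_ID> \
  --from-literal=AWS_SECRET_ACCESS_KEY=<YOUR_SECRET_ACCESS_KEY>
# Apply the manifest after replacing <YOUR_S3_BUCKET>.
kubectl apply -f docs/examples/runai-streamer/playground-streaming-from-s3.yaml
# With llmaz.io/skip-model-loader: "true", the pods should start without a model-loader initContainer.
kubectl get pods | grep deepseek-r1-distill-qwen-1-5b
```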
9 changes: 9 additions & 0 deletions pkg/controller/inference/playground_controller.go
@@ -201,6 +201,15 @@ func buildServiceApplyConfiguration(models []*coreapi.OpenModel, playground *inf
    // Build metadata
    serviceApplyConfiguration := inferenceclientgo.Service(playground.Name, playground.Namespace)

    if annotations := playground.GetAnnotations(); annotations != nil {
        // Propagate llmaz.io/skip-model-loader annotation to Inference Service.
        if value, exists := annotations[inferenceapi.SkipModelLoaderAnnoKey]; exists {
            serviceApplyConfiguration.WithAnnotations(map[string]string{
                inferenceapi.SkipModelLoaderAnnoKey: value,
            })
        }
    }

    // Build spec.
    spec := inferenceclientgo.ServiceSpec()

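Once a Playground carrying the annotation is applied, the propagation added above can be checked on the generated inference Service; a sketch that assumes the Service CRD's plural is `services.inference.llmaz.io`:

```bash
# Sketch: confirm the annotation was copied from the Playground onto the inference Service.
kubectl get services.inference.llmaz.io deepseek-r1-distill-qwen-1-5b \
  -o jsonpath='{.metadata.annotations.llmaz\.io/skip-model-loader}'
# Expected output when the Playground has the annotation: true
```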
20 changes: 18 additions & 2 deletions pkg/controller/inference/service_controller.go
@@ -196,10 +196,26 @@ func injectModelProperties(template *applyconfigurationv1.LeaderWorkerTemplateAp

    for i, model := range models {
        source := modelSource.NewModelSourceProvider(model)
        // Skip model-loader initContainer if llmaz.io/skip-model-loader annotation is set.
        if !helper.SkipModelLoader(service) {
            if isMultiNodesInference {
                source.InjectModelLoader(template.LeaderTemplate, i)
            }
            source.InjectModelLoader(template.WorkerTemplate, i)
        } else {
            if isMultiNodesInference {
                source.InjectModelEnvVars(template.LeaderTemplate)
            }
            source.InjectModelEnvVars(template.WorkerTemplate)
        }
    }

    // If model-loader initContainer is injected, we should mount the model-volume to the model-runner container.
    if !helper.SkipModelLoader(service) {
        if isMultiNodesInference {
            modelSource.InjectModelVolume(template.LeaderTemplate)
        }
        modelSource.InjectModelVolume(template.WorkerTemplate)
    }

// We only consider the main model's requirements for now.
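The two branches above determine whether a pod gets the model-loader initContainer or only the hub/remote environment variables; a quick, illustrative way to see which path was taken (the pod name is a placeholder):

```bash
# Sketch: list the initContainers of an inference pod.
# With llmaz.io/skip-model-loader: "true" the list should be empty; otherwise it should include the model loader.
kubectl get pod <inference-pod-name> -o jsonpath='{.spec.initContainers[*].name}'
```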
4 changes: 2 additions & 2 deletions pkg/controller_helper/backendruntime/backendruntime.go
@@ -65,14 +65,14 @@ func (p *BackendRuntimeParser) Args() ([]string, error) {

    source := modelSource.NewModelSourceProvider(mainModel)
    modelInfo := map[string]string{
        "ModelPath": source.ModelPath(helper.SkipModelLoader(p.playground)),
        "ModelName": source.ModelName(),
    }

    // TODO: This is not that reliable because two models don't always mean speculative-decoding.
    // Revisit this later.
    if len(p.models) > 1 {
        modelInfo["DraftModelPath"] = modelSource.NewModelSourceProvider(p.models[1]).ModelPath(helper.SkipModelLoader(p.playground))
    }

    for _, recommend := range p.backendRuntime.Spec.RecommendedConfigs {
8 changes: 8 additions & 0 deletions pkg/controller_helper/helper.go
@@ -21,6 +21,7 @@ import (

    coreapi "github.com/inftyai/llmaz/api/core/v1alpha1"
    inferenceapi "github.com/inftyai/llmaz/api/inference/v1alpha1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "sigs.k8s.io/controller-runtime/pkg/client"
)
@@ -122,3 +123,10 @@ func FirstAssignedFlavor(model *coreapi.OpenModel, playground *inferenceapi.Play

    return nil
}

func SkipModelLoader(obj metav1.Object) bool {
    if annotations := obj.GetAnnotations(); annotations != nil {
        return annotations[inferenceapi.SkipModelLoaderAnnoKey] == "true"
    }
    return false
}
100 changes: 67 additions & 33 deletions pkg/controller_helper/modelsource/modelhub.go
@@ -50,7 +50,13 @@ func (p *ModelHubProvider) ModelName() string {
// - modelID: Qwen/Qwen2-0.5B-Instruct-GGUF
// fileName: qwen2-0_5b-instruct-q5_k_m.gguf
// modelPath: /workspace/models/qwen2-0_5b-instruct-q5_k_m.gguf
func (p *ModelHubProvider) ModelPath(skipModelLoader bool) string {
    // Skip the model loader to allow the inference engine to handle loading models directly from model hub (e.g., Hugging Face, ModelScope).
    // In this case, the model ID should be returned (e.g., facebook/opt-125m).
    if skipModelLoader {
        return p.modelID
    }

    if p.fileName != nil {
        return CONTAINER_MODEL_PATH + *p.fileName
    }
@@ -108,59 +114,87 @@ func (p *ModelHubProvider) InjectModelLoader(template *corev1.PodTemplateSpec, i
    // Both HUGGING_FACE_HUB_TOKEN and HF_TOKEN works.
    initContainer.Env = append(initContainer.Env,
        corev1.EnvVar{
            Name: HUGGING_FACE_HUB_TOKEN,
            ValueFrom: &corev1.EnvVarSource{
                SecretKeyRef: &corev1.SecretKeySelector{
                    LocalObjectReference: corev1.LocalObjectReference{
                        Name: MODELHUB_SECRET_NAME, // if secret not exists, the env is empty.
                    },
                    Key:      HUGGING_FACE_TOKEN_KEY,
                    Optional: ptr.To[bool](true),
                },
            },
        })

    initContainer.Env = append(initContainer.Env,
        corev1.EnvVar{
            Name: HUGGING_FACE_TOKEN_KEY,
            ValueFrom: &corev1.EnvVarSource{
                SecretKeyRef: &corev1.SecretKeySelector{
                    LocalObjectReference: corev1.LocalObjectReference{
                        Name: MODELHUB_SECRET_NAME,
                    },
                    Key:      HUGGING_FACE_TOKEN_KEY,
                    Optional: ptr.To[bool](true),
                },
            },
        },
    )
    template.Spec.InitContainers = append(template.Spec.InitContainers, *initContainer)
}

func spreadEnvToInitContainer(containerEnv []corev1.EnvVar, initContainer *corev1.Container) {
    initContainer.Env = append(initContainer.Env, containerEnv...)
}

Review comment (Member): Again would like to see same behaviors across all the models.

Review thread:
Member: I think we have already injected the HF token in above L115. Keep one is enough.
Contributor (author): The InjectModelEnvVars function is used to inject model credentials into the model-runner container instead of the model-loader initContainer, in case the model-runner container handles the model loading itself.

func (p *ModelHubProvider) InjectModelEnvVars(template *corev1.PodTemplateSpec) {
    for i := range template.Spec.Containers {
        // We only consider this container.
        if template.Spec.Containers[i].Name == MODEL_RUNNER_CONTAINER_NAME {
            // Check if HuggingFace token environment variables already exist
            hfHubTokenExists := false
            hfTokenExists := false
            for _, env := range template.Spec.Containers[i].Env {
                if env.Name == HUGGING_FACE_HUB_TOKEN {
                    hfHubTokenExists = true
                }
                if env.Name == HUGGING_FACE_TOKEN_KEY {
                    hfTokenExists = true
                }
            }

            // Add HUGGING_FACE_HUB_TOKEN if it doesn't exist
            if !hfHubTokenExists {
                template.Spec.Containers[i].Env = append(template.Spec.Containers[i].Env,
                    corev1.EnvVar{
                        Name: HUGGING_FACE_HUB_TOKEN,
                        ValueFrom: &corev1.EnvVarSource{
                            SecretKeyRef: &corev1.SecretKeySelector{
                                LocalObjectReference: corev1.LocalObjectReference{
                                    Name: MODELHUB_SECRET_NAME, // if secret not exists, the env is empty.
                                },
                                Key:      HUGGING_FACE_TOKEN_KEY,
                                Optional: ptr.To[bool](true),
                            },
                        },
                    })
            }

            // Add HF_TOKEN if it doesn't exist
            if !hfTokenExists {
                template.Spec.Containers[i].Env = append(template.Spec.Containers[i].Env,
                    corev1.EnvVar{
                        Name: HUGGING_FACE_TOKEN_KEY,
                        ValueFrom: &corev1.EnvVarSource{
                            SecretKeyRef: &corev1.SecretKeySelector{
                                LocalObjectReference: corev1.LocalObjectReference{
                                    Name: MODELHUB_SECRET_NAME,
                                },
                                Key:      HUGGING_FACE_TOKEN_KEY,
                                Optional: ptr.To[bool](true),
                            },
                        },
                    })
            }
        }
    }
}
