337 changes: 336 additions & 1 deletion Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions lib/llm/src/engines.rs
@@ -183,7 +183,7 @@ impl
incoming_request: SingleIn<NvCreateChatCompletionRequest>,
) -> Result<ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>, Error> {
let (request, context) = incoming_request.transfer(());
let mut deltas = request.response_generator();
let mut deltas = request.response_generator(None, None);
let ctx = context.context();
let req = request.inner.messages.into_iter().next_back().unwrap();

@@ -204,12 +204,12 @@ impl
for c in prompt.chars() {
// we are returning characters not tokens, so there will be some postprocessing overhead
tokio::time::sleep(*TOKEN_ECHO_DELAY).await;
let response = deltas.create_choice(0, Some(c.to_string()), None, None);
let response = deltas.create_choice(0, Some(c.to_string()), None, None, None);
yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, comment: None };
id += 1;
}

let response = deltas.create_choice(0, None, Some(dynamo_async_openai::types::FinishReason::Stop), None);
let response = deltas.create_choice(0, None, None, Some(dynamo_async_openai::types::FinishReason::Stop), None);
yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None };
};

6 changes: 5 additions & 1 deletion lib/llm/src/preprocessor.rs
@@ -94,6 +94,7 @@ pub struct OpenAIPreprocessor {
formatter: Arc<dyn OAIPromptFormatter>,
tokenizer: Arc<dyn Tokenizer>,
model_info: Arc<dyn ModelInfo>,
vocab: HashMap<String, u32>,
}

impl OpenAIPreprocessor {
@@ -113,6 +114,7 @@ impl OpenAIPreprocessor {
);
}
};
let vocab = tokenizer.get_vocab(true);
let tokenizer = Arc::new(tokenizer);

let Some(model_info) = mdc.model_info else {
@@ -127,6 +129,7 @@ impl OpenAIPreprocessor {
tokenizer,
model_info,
mdcsum,
vocab,
}))
}

@@ -499,7 +502,8 @@ impl
let (request, context) = request.into_parts();

// create a response generator
let response_generator = request.response_generator();
let response_generator =
request.response_generator(Some(&self.tokenizer), Some(&self.vocab));
let mut response_generator = Box::new(response_generator);
Comment on lines +505 to 507
💡 Verification agent

🧩 Analysis chain

Passing tokenizer and vocab downstream is correct and future-proof.

Providing both Some(&self.tokenizer) and Some(&self.vocab) enables token-ID-aware postprocessing. Confirm that all other call sites have been updated to the new signature.

Run the following to locate any lingering old-signature usages:


🏁 Script executed:

#!/bin/bash
# Find old zero-arg response_generator calls for chat completions
rg -nP --type=rust '\.response_generator\s*\(\s*\)' -C2

# Find four-arg create_choice calls in chat completions path (should be five-arg now)
rg -nP --type=rust '\.create_choice\s*\(\s*[^\)]*(?:,[^\)]*){3}\s*\)' -g '!*completions/**' -C2

Length of output: 1772


Critical: Lingering old response_generator() and four-arg create_choice() call sites must be updated

The grep run uncovered remaining zero-argument calls to response_generator() and a four-argument call to create_choice(). These must be changed to the new signatures before merging:

• lib/engines/mistralrs/src/lib.rs:489

// before
let response_generator = request.response_generator();
// after
let response_generator =
    request.response_generator(Some(&self.tokenizer), Some(&self.vocab));

• lib/llm/src/preprocessor.rs:564

// before
let response_generator = request.response_generator();
// after
let response_generator =
    request.response_generator(Some(&self.tokenizer), Some(&self.vocab));

• lib/llm/src/engines.rs:233

// before
let deltas = request.response_generator();
// after
let deltas =
    request.response_generator(Some(&self.tokenizer), Some(&self.vocab));

• lib/llm/src/engines.rs:212

// before (four args)
let response = deltas.create_choice(
    0,
    None,
    Some(dynamo_async_openai::types::FinishReason::Stop),
    None,
);
// after (five args – insert None for the new reasoning_content parameter to match the updated signature)
let response = deltas.create_choice(
    0,
    None,
    None,
    Some(dynamo_async_openai::types::FinishReason::Stop),
    None,
);

Please update these call sites to pass the tokenizer and vocab (or the appropriate fifth argument) so that token-ID-aware postprocessing works everywhere.

🤖 Prompt for AI Agents
In lib/llm/src/preprocessor.rs around lines 505-507, the code calls
request.response_generator() without the new tokenizer and vocab args and then
boxes it; change the call to request.response_generator(Some(&self.tokenizer),
Some(&self.vocab)) and then Box::new that result (i.e., replace the zero-arg
invocation with the two-arg form and keep the Box::new wrapping).


// convert the chat completion request to a common completion request
2 changes: 1 addition & 1 deletion lib/llm/src/protocols/openai/chat_completions.rs
@@ -28,7 +28,7 @@ use super::{
};

pub mod aggregator;
mod delta;
pub mod delta;

pub use aggregator::DeltaAggregator;
pub use delta::DeltaGenerator;
92 changes: 70 additions & 22 deletions lib/llm/src/protocols/openai/chat_completions/delta.rs
@@ -1,6 +1,9 @@
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::tokenizers::traits::Tokenizer;
use std::{collections::HashMap, sync::Arc};

use dynamo_parsers::{ParserResult, ReasoningParser, ReasoningParserType, ReasoningParserWrapper};

use super::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse};
@@ -15,14 +18,18 @@ impl NvCreateChatCompletionRequest {
///
/// # Returns
/// * [`DeltaGenerator`] configured with model name and response options.
pub fn response_generator(&self) -> DeltaGenerator {
pub fn response_generator(
&self,
tokenizer: Option<&Arc<(dyn Tokenizer + 'static)>>,
vocab: Option<&HashMap<String, u32>>,
) -> DeltaGenerator {
let options = DeltaGeneratorOptions {
enable_usage: true,
enable_logprobs: self.inner.logprobs.unwrap_or(false)
|| self.inner.top_logprobs.unwrap_or(0) > 0,
};

DeltaGenerator::new(self.inner.model.clone(), options)
DeltaGenerator::new(self.inner.model.clone(), options, tokenizer, vocab)
}
}

@@ -36,7 +43,6 @@ pub struct DeltaGeneratorOptions {
}

/// Generates incremental chat completion responses in a streaming fashion.
#[derive(Debug)]
pub struct DeltaGenerator {
/// Unique identifier for the chat completion session.
id: String,
@@ -58,7 +64,9 @@ pub struct DeltaGenerator {

/// Reasoning Parser object
/// This is used to parse reasoning content in the response.
reasoning_parser: ReasoningParserWrapper,
reasoning_parser: Option<ReasoningParserWrapper>,

tokenizer: Option<Arc<(dyn Tokenizer + 'static)>>,
}

impl DeltaGenerator {
@@ -70,7 +78,12 @@ impl DeltaGenerator {
///
/// # Returns
/// * A new instance of [`DeltaGenerator`].
pub fn new(model: String, options: DeltaGeneratorOptions) -> Self {
pub fn new(
model: String,
options: DeltaGeneratorOptions,
tokenizer: Option<&Arc<(dyn Tokenizer + 'static)>>,
vocab: Option<&HashMap<String, u32>>,
) -> Self {
let now = std::time::SystemTime::now()
Comment on lines +81 to 87

🛠️ Refactor suggestion

Avoid constructing a parser when markers are missing (use try_get + and_then).

If vocab is present but lacks "<think>" or "</think>", BasicReasoningParser::new will panic. Use a fallible acquisition to skip reasoning gracefully.

Apply this diff (paired with mod.rs change):

-        let reasoning_parser_type = ReasoningParserType::Basic;
-        // Reasoning parser wrapper
-        let reasoning_parser = vocab.map(|v| reasoning_parser_type.get_reasoning_parser(v));
+        let reasoning_parser_type = ReasoningParserType::Basic;
+        // Try to create a reasoning parser only if vocab has required markers.
+        let reasoning_parser = vocab.and_then(|v| reasoning_parser_type.try_get_reasoning_parser(v));

Also applies to: 104-111, 123-124

🤖 Prompt for AI Agents
In lib/llm/src/protocols/openai/chat_completions/delta.rs around lines 81-87
(and similarly at 104-111 and 123-124), avoid unconditionally constructing
BasicReasoningParser when the vocab may be missing the "<think>" or "</think>"
markers; instead use fallible lookups (e.g., vocab.try_get or vocab.get
and_then) to retrieve both token IDs and only call BasicReasoningParser::new if
both markers are present, otherwise leave the reasoning parser as None so
reasoning is skipped gracefully; update the mapping logic to chain Option
results (and_then/map) and propagate None when either marker is absent.
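
For illustration, here is a minimal, self-contained sketch of the fallible acquisition being suggested. The helper name try_get_reasoning_marker_ids, the marker strings, and the returned tuple are assumptions made for this example only; the real dynamo_parsers API would wrap the result in a ReasoningParserWrapper (e.g., via BasicReasoningParser::new) and may key off different marker tokens.

use std::collections::HashMap;

// Hypothetical marker strings; the actual parser may use different tokens.
const THINK_OPEN: &str = "<think>";
const THINK_CLOSE: &str = "</think>";

/// Sketch of a fallible constructor: succeed only when both reasoning
/// markers exist in the vocabulary; otherwise return None so the caller
/// can skip reasoning parsing instead of panicking.
fn try_get_reasoning_marker_ids(vocab: &HashMap<String, u32>) -> Option<(u32, u32)> {
    let open_id = *vocab.get(THINK_OPEN)?;
    let close_id = *vocab.get(THINK_CLOSE)?;
    // In the real crate this pair would feed BasicReasoningParser::new,
    // wrapped in a ReasoningParserWrapper.
    Some((open_id, close_id))
}

fn main() {
    let mut vocab: HashMap<String, u32> = HashMap::new();
    vocab.insert(THINK_OPEN.to_string(), 10);

    // Closing marker missing: the helper yields None instead of panicking.
    assert!(try_get_reasoning_marker_ids(&vocab).is_none());

    vocab.insert(THINK_CLOSE.to_string(), 11);
    assert_eq!(try_get_reasoning_marker_ids(&vocab), Some((10, 11)));
}

With a helper shaped like this, the vocab.and_then(...) call in the diff above degrades to reasoning_parser = None for models whose vocabulary lacks the markers, and the rest of the streaming path continues without reasoning content.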

.duration_since(std::time::UNIX_EPOCH)
.unwrap()
@@ -94,7 +107,7 @@ impl DeltaGenerator {
let reasoning_parser_type = ReasoningParserType::Basic;

// Reasoning parser wrapper
let reasoning_parser = reasoning_parser_type.get_reasoning_parser();
let reasoning_parser = vocab.map(|v| reasoning_parser_type.get_reasoning_parser(v));

Self {
id: format!("chatcmpl-{}", uuid::Uuid::new_v4()),
@@ -107,6 +120,7 @@ impl DeltaGenerator {
msg_counter: 0,
options,
reasoning_parser,
tokenizer: tokenizer.cloned(),
}
}

@@ -183,13 +197,16 @@ impl DeltaGenerator {
})
}

fn create_reasoning_content(&mut self, text: Option<String>) -> Option<ParserResult> {
let text = text?;
let parser_result = self
.reasoning_parser
.parse_reasoning_streaming_incremental(&text);

Some(parser_result)
fn create_reasoning_content(&mut self, token_ids: Vec<u32>) -> Option<ParserResult> {
if self.tokenizer.is_none() || self.reasoning_parser.is_none() {
return None;
}
Some(
self.reasoning_parser
.as_mut()
.unwrap()
.parse_reasoning_streaming_incremental(&token_ids),
)
}

/// Creates a choice within a chat completion response.
@@ -207,17 +224,12 @@ impl DeltaGenerator {
&mut self,
index: u32,
text: Option<String>,
reasoning_content: Option<String>,
finish_reason: Option<dynamo_async_openai::types::FinishReason>,
logprobs: Option<dynamo_async_openai::types::ChatChoiceLogprobs>,
) -> NvCreateChatCompletionStreamResponse {
let reasoning_parser_result = self.create_reasoning_content(text).unwrap_or_default();

let (normal_text, reasoning_content) = (
reasoning_parser_result.get_some_normal_text(),
reasoning_parser_result.get_some_reasoning(),
);
let delta = dynamo_async_openai::types::ChatCompletionStreamResponseDelta {
content: normal_text,
content: text,
function_call: None,
tool_calls: None,
role: if self.msg_counter == 0 {
@@ -292,7 +304,7 @@ impl crate::protocols::openai::DeltaGeneratorExt<NvCreateChatCompletionStreamRes

let logprobs = self.create_logprobs(
delta.tokens,
delta.token_ids,
delta.token_ids.clone(),
delta.log_probs,
delta.top_logprobs,
);
@@ -319,8 +331,44 @@ impl crate::protocols::openai::DeltaGeneratorExt<NvCreateChatCompletionStreamRes
};

// Create the streaming response.
let reasoning_parser_result = self.create_reasoning_content(delta.token_ids);

let (normal_text, reasoning_content) = if let Some(parser_result) = reasoning_parser_result
{
let none_if_empty = |vec: String| {
if vec.is_empty() {
None
} else {
Some(vec)
}
};
(
none_if_empty(
self.tokenizer
.as_ref()
.unwrap()
.decode(&parser_result.normal_token_ids, false)
.unwrap(),
),
none_if_empty(
self.tokenizer
.as_ref()
.unwrap()
.decode(&parser_result.reasoning_token_ids, false)
.unwrap(),
),
)
} else {
(delta.text, None)
};
Comment on lines 333 to +363

🛠️ Refactor suggestion

Don’t unwrap on tokenizer.decode during streaming; fall back gracefully.

Decoding can fail (e.g., invalid IDs or tokenizer errors). Unwrap will crash the stream. Handle decode errors and treat them as “no delta” or log and continue.

Apply this diff:

-        let (normal_text, reasoning_content) = if let Some(parser_result) = reasoning_parser_result
-        {
-            let none_if_empty = |vec: String| {
-                if vec.is_empty() {
-                    None
-                } else {
-                    Some(vec)
-                }
-            };
-            (
-                none_if_empty(
-                    self.tokenizer
-                        .as_ref()
-                        .unwrap()
-                        .decode(&parser_result.normal_token_ids, false)
-                        .unwrap(),
-                ),
-                none_if_empty(
-                    self.tokenizer
-                        .as_ref()
-                        .unwrap()
-                        .decode(&parser_result.reasoning_token_ids, false)
-                        .unwrap(),
-                ),
-            )
-        } else {
-            (delta.text, None)
-        };
+        let (normal_text, reasoning_content) = if let Some(parser_result) = reasoning_parser_result {
+            let decode = |ids: &Vec<u32>| {
+                self.tokenizer
+                    .as_ref()
+                    .unwrap()
+                    .decode(ids, false)
+                    .ok()
+                    .and_then(|s| if s.is_empty() { None } else { Some(s) })
+            };
+            (decode(&parser_result.normal_token_ids), decode(&parser_result.reasoning_token_ids))
+        } else {
+            (delta.text, None)
+        };
📝 Committable suggestion


Suggested change
// Create the streaming response.
let reasoning_parser_result = self.create_reasoning_content(delta.token_ids);
let (normal_text, reasoning_content) = if let Some(parser_result) = reasoning_parser_result
{
let none_if_empty = |vec: String| {
if vec.is_empty() {
None
} else {
Some(vec)
}
};
(
none_if_empty(
self.tokenizer
.as_ref()
.unwrap()
.decode(&parser_result.normal_token_ids, false)
.unwrap(),
),
none_if_empty(
self.tokenizer
.as_ref()
.unwrap()
.decode(&parser_result.reasoning_token_ids, false)
.unwrap(),
),
)
} else {
(delta.text, None)
};
// Create the streaming response.
let reasoning_parser_result = self.create_reasoning_content(delta.token_ids);
let (normal_text, reasoning_content) = if let Some(parser_result) = reasoning_parser_result {
let decode = |ids: &Vec<u32>| {
self.tokenizer
.as_ref()
.unwrap()
.decode(ids, false)
.ok()
.and_then(|s| if s.is_empty() { None } else { Some(s) })
};
(decode(&parser_result.normal_token_ids), decode(&parser_result.reasoning_token_ids))
} else {
(delta.text, None)
};
🤖 Prompt for AI Agents
In lib/llm/src/protocols/openai/chat_completions/delta.rs around lines 333 to
363, the code calls tokenizer.decode(...).unwrap() which will panic on decode
errors during streaming; instead handle decode failures gracefully by replacing
unwraps with error-aware logic: attempt to decode each token id vector with
tokenizer.decode(...), map successful results to Some(string) if non-empty, and
on Err return None (or log the error and return None) so streaming continues;
ensure the closure none_if_empty is applied after successful decode and that in
the else branch you still use delta.text when appropriate.

let index = 0;
let stream_response = self.create_choice(index, delta.text, finish_reason, logprobs);
let stream_response = self.create_choice(
index,
normal_text,
reasoning_content,
finish_reason,
logprobs,
);

Ok(stream_response)
}
6 changes: 6 additions & 0 deletions lib/llm/src/tokenizers/hf.rs
@@ -13,6 +13,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use tokenizers::tokenizer::Tokenizer as HfTokenizer;

use super::{
@@ -35,6 +37,10 @@ impl HuggingFaceTokenizer {
pub fn from_tokenizer(tokenizer: HfTokenizer) -> Self {
HuggingFaceTokenizer { tokenizer }
}

pub fn get_vocab(&self, with_added_tokens: bool) -> HashMap<String, u32> {
self.tokenizer.get_vocab(with_added_tokens)
}
}

impl Encoder for HuggingFaceTokenizer {
4 changes: 2 additions & 2 deletions lib/llm/tests/http-service.rs
@@ -95,12 +95,12 @@ impl
let max_tokens = request.inner.max_tokens.unwrap_or(0) as u64;

// let generator = NvCreateChatCompletionStreamResponse::generator(request.model.clone());
let mut generator = request.response_generator();
let mut generator = request.response_generator(None, None);

let stream = stream! {
tokio::time::sleep(std::time::Duration::from_millis(max_tokens)).await;
for i in 0..10 {
let output = generator.create_choice(i,Some(format!("choice {i}")), None, None);
let output = generator.create_choice(i,Some(format!("choice {i}")), None, None, None);

yield Annotated::from_data(output);
}
3 changes: 2 additions & 1 deletion lib/parsers/Cargo.toml
@@ -32,4 +32,5 @@ serde_json = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true }

regex = "1"
regex = "1"
openai-harmony = { git = "https://github.com/openai/harmony", version = "0.0.4" }