Skip to content

Commit ef14880

Browse files
authored
Merge branch 'main' into ayushag/worker-to-mdc-parser-info
2 parents ec51035 + 5045f13 commit ef14880

File tree

5 files changed

+22
-16
lines changed

5 files changed

+22
-16
lines changed

examples/multimodal/README.md

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,12 @@ flowchart LR
5959
pd_worker --> encode_worker
6060
```
6161

62+
***Note:*** Only the LLaVA 1.5 7B model is currently supported. Support for Qwen2.5-VL and Phi3V will be added in the future.
63+
6264
```bash
6365
cd $DYNAMO_HOME/examples/multimodal
6466
# Serve a LLaVA 1.5 7B model:
6567
bash launch/agg.sh --model llava-hf/llava-1.5-7b-hf
66-
# Serve a Qwen2.5-VL model:
67-
# bash launch/agg.sh --model Qwen/Qwen2.5-VL-7B-Instruct
68-
# Serve a Phi3V model:
69-
# bash launch/agg.sh --model microsoft/Phi-3.5-vision-instruct
7068
```
7169

7270
### Client
@@ -100,8 +98,6 @@ curl http://localhost:8080/v1/chat/completions \
10098
}'
10199
```
102100

103-
If serving the example Qwen model, replace `"llava-hf/llava-1.5-7b-hf"` in the `"model"` field with `"Qwen/Qwen2.5-VL-7B-Instruct"`. If serving the example Phi3V model, replace `"llava-hf/llava-1.5-7b-hf"` in the `"model"` field with `"microsoft/Phi-3.5-vision-instruct"`.
104-
105101
You should see a response similar to this:
106102
```json
107103
{"id": "c37b946e-9e58-4d54-88c8-2dbd92c47b0c", "object": "chat.completion", "created": 1747725277, "model": "llava-hf/llava-1.5-7b-hf", "choices": [{"index": 0, "message": {"role": "assistant", "content": " In the image, there is a city bus parked on a street, with a street sign nearby on the right side. The bus appears to be stopped out of service. The setting is in a foggy city, giving it a slightly moody atmosphere."}, "finish_reason": "stop"}]}

lib/llm/src/migration.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,13 @@ impl RetryManager {
100100
if let Some(response) = response_stream.next().await {
101101
if let Some(err) = response.err() {
102102
const STREAM_ERR_MSG: &str = "Stream ended before generation completed";
103-
if format!("{:?}", err) == STREAM_ERR_MSG {
103+
if err
104+
.chain()
105+
.any(|e| e.to_string().starts_with(STREAM_ERR_MSG))
106+
{
104107
tracing::warn!("Stream disconnected... recreating stream...");
105108
if let Err(err) = self.new_stream().await {
106-
tracing::warn!("Cannot recreate stream: {:?}", err);
109+
tracing::warn!("Cannot recreate stream: {:#}", err);
107110
} else {
108111
continue;
109112
}
@@ -462,6 +465,7 @@ mod tests {
462465
/// Expected behavior: All 10 responses should be received successfully.
463466
#[tokio::test]
464467
async fn test_retry_manager_no_migration() {
468+
dynamo_runtime::logging::init();
465469
let request = create_mock_request(10);
466470
let mock_engine = Arc::new(MockEngine::new(MockBehavior::Success, 10, 100));
467471
let next_generate: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>> =
@@ -493,6 +497,7 @@ mod tests {
493497
/// Expected behavior: All 10 responses should be received successfully after retry.
494498
#[tokio::test]
495499
async fn test_retry_manager_new_request_migration() {
500+
dynamo_runtime::logging::init();
496501
let request = create_mock_request(10);
497502
let mock_engine = Arc::new(MockEngine::new(MockBehavior::FailThenSuccess, 10, 100));
498503
let next_generate: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>> =
@@ -524,6 +529,8 @@ mod tests {
524529
/// Expected behavior: 5 responses from first stream + 5 responses from retry stream = 10 total.
525530
#[tokio::test]
526531
async fn test_retry_manager_ongoing_request_migration() {
532+
dynamo_runtime::logging::init();
533+
527534
let request = create_mock_request(10);
528535
let mock_engine = Arc::new(MockEngine::new(
529536
MockBehavior::MidStreamFail { fail_after: 5 },
@@ -560,6 +567,7 @@ mod tests {
560567
/// Expected behavior: Should receive an error after all retries are exhausted, with the original error.
561568
#[tokio::test]
562569
async fn test_retry_manager_new_request_migration_indefinite_failure() {
570+
dynamo_runtime::logging::init();
563571
let request = create_mock_request(0);
564572
let mock_engine = Arc::new(MockEngine::new(MockBehavior::AlwaysFail, 0, 100));
565573
let next_generate: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>> =
@@ -580,6 +588,7 @@ mod tests {
580588
/// Expected behavior: Should receive some responses from first stream, then error after retries exhausted.
581589
#[tokio::test]
582590
async fn test_retry_manager_ongoing_request_migration_indefinite_failure() {
591+
dynamo_runtime::logging::init();
583592
let request = create_mock_request(10);
584593
let mock_engine = Arc::new(MockEngine::new(
585594
MockBehavior::MidStreamFailAlways { fail_after: 3 },
@@ -627,6 +636,7 @@ mod tests {
627636
/// Expected behavior: Should receive some responses from first stream, then error after retries exhausted.
628637
#[tokio::test]
629638
async fn test_retry_manager_ongoing_request_migration_indefinite_failure_stream_error() {
639+
dynamo_runtime::logging::init();
630640
let request = create_mock_request(10);
631641
let mock_engine = Arc::new(MockEngine::new(
632642
MockBehavior::MidStreamFailAlwaysStreamError { fail_after: 3 },

lib/llm/src/protocols/common/llm_backend.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,9 @@ impl MaybeError for LLMEngineOutput {
157157
LLMEngineOutput::error(format!("{:?}", err))
158158
}
159159

160-
fn err(&self) -> Option<Box<dyn std::error::Error + Send + Sync>> {
160+
fn err(&self) -> Option<anyhow::Error> {
161161
if let Some(FinishReason::Error(err_msg)) = &self.finish_reason {
162-
Some(anyhow::Error::msg(err_msg.clone()).into())
162+
Some(anyhow::Error::msg(err_msg.clone()))
163163
} else {
164164
None
165165
}

lib/runtime/src/protocols/annotated.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,14 @@ where
143143
Annotated::from_error(format!("{:?}", err))
144144
}
145145

146-
fn err(&self) -> Option<Box<dyn std::error::Error + Send + Sync>> {
146+
fn err(&self) -> Option<anyhow::Error> {
147147
if self.is_error() {
148148
if let Some(comment) = &self.comment {
149149
if !comment.is_empty() {
150-
return Some(anyhow::Error::msg(comment.join("; ")).into());
150+
return Some(anyhow::Error::msg(comment.join("; ")));
151151
}
152152
}
153-
Some(anyhow::Error::msg("unknown error").into())
153+
Some(anyhow::Error::msg("unknown error"))
154154
} else {
155155
None
156156
}

lib/runtime/src/protocols/maybe_error.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pub trait MaybeError {
2020
fn from_err(err: Box<dyn Error + Send + Sync>) -> Self;
2121

2222
/// Return the error this instance represents, or `None` if it is not an error.
23-
fn err(&self) -> Option<Box<dyn Error + Send + Sync>>;
23+
fn err(&self) -> Option<anyhow::Error>;
2424

2525
/// Check if the current instance represents a success.
2626
fn is_ok(&self) -> bool {
@@ -46,8 +46,8 @@ mod tests {
4646
message: err.to_string(),
4747
}
4848
}
49-
fn err(&self) -> Option<Box<dyn Error + Send + Sync>> {
50-
Some(anyhow::Error::msg(self.message.clone()).into())
49+
fn err(&self) -> Option<anyhow::Error> {
50+
Some(anyhow::Error::msg(self.message.clone()))
5151
}
5252
}
5353

0 commit comments

Comments
 (0)