diff --git a/candle_demo/Cargo.toml b/candle_demo/Cargo.toml
index a04e9f8..e52db58 100644
--- a/candle_demo/Cargo.toml
+++ b/candle_demo/Cargo.toml
@@ -25,6 +25,10 @@ intel-mkl-src = { version = "0.8.1", features = ["mkl-static-lp64-iomp"] }
 rand = "0.8.5"
 owo-colors = "4.0.0"
 codegeex4 = {path = "./codegeex4"}
-
-
-
+api-server = {path = "./api-server"}
+flume = "0.11.0"
+serde = { version = "1.0.204", features = ["derive"] }
+futures = "0.3.30"
+axum = "0.7.5"
+tokio = {version = "1.39.1", features = ["full"]}
+uuid = { version = "1.10.0", features = ["v4"] }
\ No newline at end of file
diff --git a/candle_demo/README.org b/candle_demo/README.org
index 1eab25a..4d467c4 100644
--- a/candle_demo/README.org
+++ b/candle_demo/README.org
@@ -7,7 +7,14 @@ THUDM/CodeGeeX4 is a versatile model for all AI software development scenarios,
 - [[https://huggingface.co/THUDM/codegeex4-all-9b][huggingface]]
 - [[https://github.com/huggingface/candle/blob/main/candle-examples/examples/codegeex4-9b/README.org][Candle]]
-- 目前openai-api正在开发中
+** api-server
+#+begin_src shell
+  cargo build --release -p api-server --features cuda
+  ./target/release/api-server 0.0.0.0:3000
+#+end_src
+
+[[file:../resources/rust-api-server.png][file:../resources/rust-api-server.png]]
+
 
 ** Cli
 #+begin_src shell
   cargo build --release -p codegeex4-cli # Cpu
@@ -86,3 +93,4 @@ THUDM/CodeGeeX4 is a versatile model for all AI software development scenarios,
   year={2023}
 }
 #+end_src
+** Candle-vllm (this demo reuses parts of the candle-vllm code)
diff --git a/candle_demo/api-server/Cargo.toml b/candle_demo/api-server/Cargo.toml
index 1524216..337489b 100644
--- a/candle_demo/api-server/Cargo.toml
+++ b/candle_demo/api-server/Cargo.toml
@@ -23,11 +23,17 @@ intel-mkl-src = { workspace = true ,optional = true}
 rand = { workspace = true}
 owo-colors = {workspace = true}
 codegeex4 = {workspace = true}
-tokio = {version = "1.39.1", features = ["full"]}
-actix-web = "4.8.0"
-serde = { version = "1.0.204", features = ["derive"] }
-shortuuid = "0.0.1"
-short-uuid = "0.1.2"
+# for async runtime and executor
+tokio= {workspace=true}
+
+# for api-server
+axum = {workspace= true}
+serde = {workspace=true}
+# for CORS and uuid generation
+tower-http = { version = "0.5.2", features = ["cors"] }
+uuid= {workspace=true}
+futures = {workspace = true}
+flume = {workspace=true}
 [build-dependencies]
 bindgen_cuda = { version = "0.1.1", optional = true }
 [features]
diff --git a/candle_demo/api-server/src/api.rs b/candle_demo/api-server/src/api.rs
index cf28a73..e69de29 100644
--- a/candle_demo/api-server/src/api.rs
+++ b/candle_demo/api-server/src/api.rs
@@ -1,123 +0,0 @@
-use actix_web::{
-    get, post,
-    web::{self, Data},
-    HttpRequest, Responder,
-};
-use owo_colors::OwoColorize;
-use serde::{Deserialize, Serialize};
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct ChatMessage {
-    pub role: String,
-    pub content: String,
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct ChatCompletionRequest {
-    pub model: String,
-    pub messages: Vec<ChatMessage>,
-    pub temperature: f64,
-    pub top_p: f64,
-    pub max_tokens: usize,
-    pub stop: Vec<String>,
-    pub stream: bool,
-    pub presence_penalty: Option<f64>,
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct DeltaMessage {
-    pub role: String,
-    pub content: String,
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct ChatCompletionResponseStreamChoice {
-    pub index: i32,
-    pub delta: DeltaMessage,
-    pub finish_reason: Option<String>,
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct ChatCompletionStreamResponse {
-    pub id: String,
-    pub object: String,
-    pub created: i32,
-    pub model: String,
-    pub choices: Vec<ChatCompletionResponseStreamChoice>,
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct ChatCompletionResponseChoice {
-    pub index: i32,
-    pub message: ChatMessage,
-    pub finish_reason: Option<String>,
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct ChatCompletionResponse {
-    pub id: String,
-    pub object: String,
-    pub created: u64,
-    pub model: String,
-    pub choices: Vec<ChatCompletionResponseChoice>,
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub enum FinishResaon{
-    STOP,
-    LENGTH,
-}
-use std::time::{SystemTime, UNIX_EPOCH};
-impl ChatCompletionResponse {
-    pub fn empty() -> Self {
-        let current_time = SystemTime::now();
-        Self {
-            id: format!("chatcmpl-{}", short_uuid::ShortUuid::generate()),
-            object: "chat.completion".to_string(),
-            created: current_time
-                .duration_since(UNIX_EPOCH)
-                .expect("failed to get time")
-                .as_secs()
-                .into(),
-            model: "codegeex4".to_string(),
-            choices: vec![ChatCompletionResponseChoice::empty()],
-        }
-    }
-}
-
-impl ChatCompletionResponseChoice {
-    pub fn empty() -> Self {
-        Self {
-            index: 0,
-            message: ChatMessage {
-                role: "assistant".to_string(),
-                content: "".to_string(),
-            },
-            finish_reason: None,
-        }
-    }
-}
-
-impl ChatCompletionRequest {
-    pub fn empty() -> Self {
-        Self{
-            model: "codegeex4".to_string(),
-            messages: vec!(ChatMessage {
-                role: "assistant".to_string(),
-                content: "".to_string(),
-            }),
-            temperature: 0.2_f64,
-            top_p: 0.2_f64,
-            max_tokens: 1024_usize,
-            stop: vec!("<|user|>".to_string(), "<|assistant|>".to_string(), "<|observation|>".to_string(), "<|endoftext|>".to_string()),
-            stream: true,
-            presence_penalty: None,
-        }
-    }
-}
-
-// impl DeltaMessage {
-//     pub fn new() -> Self {
-//         role:
-//     }
-// }
diff --git a/candle_demo/api-server/src/args.rs b/candle_demo/api-server/src/args.rs
index 349bb72..4f8221b 100644
--- a/candle_demo/api-server/src/args.rs
+++ b/candle_demo/api-server/src/args.rs
@@ -7,4 +7,50 @@ pub struct Args {
     pub address: String,
     #[arg(short, long, default_value_t = 1)]
     pub workers: usize,
+    /// The folder used as a cache for model files.
+    #[arg(name = "cache", short, long, default_value = ".")]
+    pub cache_path: String,
+    /// Run on CPU rather than on GPU.
+    #[arg(long)]
+    pub cpu: bool,
+
+    /// Display the token for the specified prompt.
+    #[arg(long)]
+    pub verbose_prompt: bool,
+
+    /// The temperature used to generate samples.
+    #[arg(long)]
+    pub temperature: Option<f64>,
+
+    /// Nucleus sampling probability cutoff.
+    #[arg(long)]
+    pub top_p: Option<f64>,
+
+    /// The seed to use when generating random samples.
+    #[arg(long)]
+    pub seed: Option<u64>,
+
+    /// The length of the sample to generate (in tokens).
+    #[arg(long, short = 'n', default_value_t = 5000)]
+    pub sample_len: usize,
+
+    #[arg(long)]
+    pub model_id: Option<String>,
+
+    #[arg(long)]
+    pub revision: Option<String>,
+
+    #[arg(long)]
+    pub weight_file: Option<String>,
+
+    #[arg(long)]
+    pub tokenizer: Option<String>,
+
+    /// Penalty to be applied for repeating tokens, 1. means no penalty.
+    #[arg(long, default_value_t = 1.1)]
+    pub repeat_penalty: f32,
+
+    /// The context size to consider for the repeat penalty.
+    #[arg(long, default_value_t = 64)]
+    pub repeat_last_n: usize,
 }
diff --git a/candle_demo/api-server/src/main.rs b/candle_demo/api-server/src/main.rs
index 15a2d50..dc7ad2c 100644
--- a/candle_demo/api-server/src/main.rs
+++ b/candle_demo/api-server/src/main.rs
@@ -1,19 +1,113 @@
-mod api;
 mod args;
-mod server;
 mod model;
+mod server;
+mod streamer;
+use axum::{routing, Router};
+use candle_core::DType;
+use candle_nn::VarBuilder;
 use clap::Parser;
-use owo_colors::OwoColorize;
+use codegeex4::codegeex4::*;
+use codegeex4::TextGenerationApiServer;
+use hf_hub::{Repo, RepoType};
+use owo_colors::{self, OwoColorize};
+use rand::Rng;
+use server::chat;
+use std::sync::{Arc, Mutex};
+use tokenizers::Tokenizer;
+use tower_http::cors::{AllowOrigin, CorsLayer};
+
+pub struct Data {
+    pub pipeline: Mutex<TextGenerationApiServer>,
+}
 #[tokio::main]
 async fn main() {
     let args = args::Args::parse();
-    let server = server::Server::new(args.clone());
     println!(
         "{} Server Binding On {} with {} workers",
         "[INFO]".green(),
         &args.address.purple(),
         &args.workers.purple()
    );
-    server.run().await;
+
+    let mut seed: u64 = 0;
+    if let Some(_seed) = args.seed {
+        seed = _seed;
+    } else {
+        let mut rng = rand::thread_rng();
+        seed = rng.gen();
+    }
+    println!("Using Seed {}", seed.red());
+    let api = hf_hub::api::sync::ApiBuilder::from_cache(hf_hub::Cache::new(args.cache_path.into()))
+        .build()
+        .unwrap();
+
+    let model_id = match args.model_id {
+        Some(model_id) => model_id.to_string(),
+        None => "THUDM/codegeex4-all-9b".to_string(),
+    };
+    let revision = match args.revision {
+        Some(rev) => rev.to_string(),
+        None => "main".to_string(),
+    };
+    let repo = api.repo(Repo::with_revision(model_id, RepoType::Model, revision));
+    let tokenizer_filename = match args.tokenizer {
+        Some(file) => std::path::PathBuf::from(file),
+        None => api
+            .model("THUDM/codegeex4-all-9b".to_string())
+            .get("tokenizer.json")
+            .unwrap(),
+    };
+    let filenames = match args.weight_file {
+        Some(weight_file) => vec![std::path::PathBuf::from(weight_file)],
+        None => {
+            candle_examples::hub_load_safetensors(&repo, "model.safetensors.index.json").unwrap()
+        }
+    };
+    let tokenizer = Tokenizer::from_file(tokenizer_filename).expect("Tokenizer Error");
+    let start = std::time::Instant::now();
+    let config = Config::codegeex4();
+    let device = candle_examples::device(args.cpu).unwrap();
+    let dtype = if device.is_cuda() {
+        DType::BF16
+    } else {
+        DType::F32
+    };
+    println!("DType is {:?}", dtype.yellow());
+    let vb = unsafe { VarBuilder::from_mmaped_safetensors(&filenames, dtype, &device).unwrap() };
+    let model = Model::new(&config, vb).unwrap();
+
+    println!("Model loaded in {:?}s", start.elapsed().as_secs().green());
+
+    let pipeline = TextGenerationApiServer::new(
+        model,
+        tokenizer,
+        seed,
+        args.temperature,
+        args.top_p,
+        args.repeat_penalty,
+        args.repeat_last_n,
+        args.verbose_prompt,
+        &device,
+        dtype,
+    );
+    let server_data = Arc::new(Data {
+        pipeline: Mutex::new(pipeline),
+    });
+
+    let allow_origin = AllowOrigin::any();
+    let allow_methods = tower_http::cors::AllowMethods::any();
+    let allow_headers = tower_http::cors::AllowHeaders::any();
+    let cors_layer = CorsLayer::new()
+        .allow_methods(allow_methods)
+        .allow_headers(allow_headers)
+        .allow_origin(allow_origin);
+    let chat = Router::new()
+        // .route("/v1/chat/completions", routing::post(raw))
+        .route("/v1/chat/completions", routing::post(chat))
+        .layer(cors_layer)
+        .with_state(server_data);
+    // .with_state(Arc::new(server_data));
+    let listener = tokio::net::TcpListener::bind(args.address).await.unwrap();
+    axum::serve(listener, chat).await.unwrap();
 }
diff --git a/candle_demo/api-server/src/model.rs b/candle_demo/api-server/src/model.rs
index feb1d6d..8b13789 100644
--- a/candle_demo/api-server/src/model.rs
+++ b/candle_demo/api-server/src/model.rs
@@ -1,6 +1 @@
-use codegeex4::codegeex4::Config;
-use crate::api::ChatCompletionRequest;
-fn stream_chat(request: ChatCompletionRequest) {
-    let default_config = codegeex4::codegeex4::Config::codegeex4();
-
-}
+
diff --git a/candle_demo/api-server/src/server.rs b/candle_demo/api-server/src/server.rs
index bd554d5..a9fcfc3 100644
--- a/candle_demo/api-server/src/server.rs
+++ b/candle_demo/api-server/src/server.rs
@@ -1,31 +1,80 @@
-use crate::args::Args;
-use actix_web::{web, App, HttpResponse, HttpServer};
-use owo_colors::OwoColorize;
+use crate::Data;
+use axum::response::IntoResponse;
+use axum::{
+    extract::{Json, State},
+    response::sse::{KeepAlive, Sse},
+};
+use codegeex4::{
+    api::{ChatCompletionRequest, ChatCompletionResponse},
+    stream::Streamer,
+};
+use std::sync::Arc;
+use std::time::Duration;
+pub enum ChatResponder {
+    Streamer(Sse<Streamer>),
+    Completion(ChatCompletionResponse),
+}
-#[derive(Debug)]
-pub struct Server {
-    config: Args,
+impl IntoResponse for ChatResponder {
+    fn into_response(self) -> axum::response::Response {
+        match self {
+            ChatResponder::Streamer(s) => s.into_response(),
+            ChatResponder::Completion(s) => Json(s).into_response(),
+        }
+    }
 }
-impl Server {
-    pub fn new(config: Args) -> Self {
-        return Server { config };
+pub async fn chat(
+    State(data): State<Arc<Data>>,
+    request: Json<ChatCompletionRequest>,
+) -> ChatResponder {
+    // debug
+    let max_tokens = match request.max_tokens {
+        Some(max_tokens) => max_tokens,
+        None => 1024,
+    };
+
+    let mut prompt = String::new();
+    for message in &request.messages {
+        prompt.push_str(message.get("content").unwrap());
     }
-    pub async fn run(&self) -> () {
-        HttpServer::new(move || App::new())
-            .bind(&self.config.address)
-            .expect(&format!("{}", "Unable To Bind Server !".red()))
-            .workers(self.config.workers)
-            .run()
-            .await
-            .expect(&format!("{}", "Unable To Run the Server !".red()));
+
+    if request.stream.is_some_and(|x| x == false) {
+        println!("Connectivity test request");
+        return ChatResponder::Completion(ChatCompletionResponse {
+            id: "".to_string(),
+            choices: vec![],
+            object: "chat.completion".to_string(),
+            created: 0,
+            model: "codegeex4".to_string(),
+        });
     }
-}
-// use super::api::*;
-// use uuid;
-// pub async fn chat(request: ChatCompletionRequest) ->impl Responder {
-//     if request.stream == true {
-//         return Htt
-//     }
+    let (response_tx, rx) = flume::unbounded();
+
+    let _ = tokio::task::spawn_blocking(move || {
+        tokio::runtime::Handle::current().block_on(async move {
+            data.pipeline
+                .lock()
+                .unwrap()
+                .run(prompt, max_tokens, response_tx.clone()).await;
+        });
+    });
+    println!("Opening SSE stream");
+    let streamer = ChatResponder::Streamer(
+        Sse::new(Streamer {
+            rx,
+            status: codegeex4::stream::StreamingStatus::Uninitilized,
+        })
+        .keep_alive(
+            KeepAlive::new()
+                .interval(Duration::from_secs(20))
+                .text("keep-alive-text"),
+        ),
+    );
+
+    return streamer;
+}
+// pub async fn sse() -> Sse>> {
+//     let stream =
 // }
diff --git a/candle_demo/api-server/src/streamer.rs b/candle_demo/api-server/src/streamer.rs
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/candle_demo/api-server/src/streamer.rs
@@ -0,0 +1 @@
+
diff --git a/candle_demo/codegeex4/Cargo.toml b/candle_demo/codegeex4/Cargo.toml
index db038cd..3cc0115 100644
--- a/candle_demo/codegeex4/Cargo.toml
+++ b/candle_demo/codegeex4/Cargo.toml
@@ -22,9 +22,11 @@ accelerate-src = { workspace = true, optional = true}
 intel-mkl-src = { workspace = true ,optional = true}
 rand = { workspace = true}
 owo-colors = {workspace = true}
-
-
-
+flume = {workspace = true}
+serde = {workspace = true}
+axum = {workspace = true}
+futures= {workspace=true}
+uuid = {workspace=true}
 [build-dependencies]
 bindgen_cuda = { version = "0.1.1", optional = true }
 [features]
diff --git a/candle_demo/codegeex4/src/api.rs b/candle_demo/codegeex4/src/api.rs
new file mode 100644
index 0000000..6485cb4
--- /dev/null
+++ b/candle_demo/codegeex4/src/api.rs
@@ -0,0 +1,86 @@
+use std::collections::HashMap;
+
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ChatCompletionChunk {
+    pub id: String,
+    pub object: String,
+    pub created: u64,
+    pub model: String,
+    pub choices: Vec<ChatChoiceStream>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Choice {
+    pub delta: ChatChoiceData,
+    pub finish_reason: Option<String>,
+    pub index: usize,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ChatCompletionRequest {
+    pub messages: Vec<HashMap<String, String>>,
+    pub model: String,
+    #[serde(default)]
+    pub stream: Option<bool>, //false
+    pub temperature: Option<f64>, //0.7
+    #[serde(default)]
+    pub top_p: Option<f64>, //1.0
+    #[serde(default)]
+    pub max_tokens: Option<usize>, //None
+    #[serde(default)]
+    pub stop: Option<Vec<String>>,
+    #[serde(default)]
+    presence_penalty: Option<f64>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ChatCompletionResponse {
+    pub id: String,
+    pub choices: Vec<ChatChoice>,
+    pub created: u64,
+    pub model: String,
+    pub object: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ChatCompletionResponseStream {
+    pub id: String,
+    pub choices: Vec<ChatChoiceStream>,
+    pub created: u64,
+    pub model: String,
+    pub object: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ChatChoice {
+    pub index: usize,
+    pub message: ChatChoiceData,
+    pub finish_reason: Option<String>,
+    pub logprobs: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ChatChoiceStream {
+    pub index: usize,
+    pub delta: ChatChoiceData,
+    pub finish_reason: Option<String>,
+    pub logprobs: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ChatChoiceData {
+    pub role: String,
+    pub content: Option<String>,
+}
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ChatCompletionUsageResponse {
+    pub request_id: String,
+    pub created: u64,
+    pub completion_tokens: usize,
+    pub prompt_tokens: usize,
+    pub total_tokens: usize,
+    pub prompt_time_costs: usize,
+    pub completion_time_costs: usize,
+}
diff --git a/candle_demo/codegeex4/src/lib.rs b/candle_demo/codegeex4/src/lib.rs
index de7a75a..cda2875 100644
--- a/candle_demo/codegeex4/src/lib.rs
+++ b/candle_demo/codegeex4/src/lib.rs
@@ -1,15 +1,21 @@
 pub mod codegeex4;
+pub mod api;
 pub mod args;
-
+pub mod stream;
+use api::ChatChoiceData;
+use api::ChatChoiceStream;
+use api::ChatCompletionChunk;
 use candle_core::{DType, Device, Tensor};
 use candle_transformers::generation::LogitsProcessor;
 use codegeex4::*;
+
+use flume::Sender;
 use owo_colors::{self, OwoColorize};
 use std::io::BufRead;
 use std::io::BufReader;
+use stream::ChatResponse;
 use tokenizers::Tokenizer;
-
 pub struct TextGeneration {
     model: Model,
     device: Device,
@@ -138,3 +144,141 @@ impl TextGeneration {
         Ok(())
     }
 }
+
+pub struct TextGenerationApiServer {
+    model: Model,
+    device: Device,
+    tokenizer: Tokenizer,
+    logits_processor: LogitsProcessor,
+    repeat_penalty: f32,
+    repeat_last_n: usize,
+    verbose_prompt: bool,
+    dtype: DType,
+}
+
+impl TextGenerationApiServer {
+    #[allow(clippy::too_many_arguments)]
+    pub fn new(
+        model: Model,
+        tokenizer: Tokenizer,
+        seed: u64,
+        temp: Option<f64>,
+        top_p: Option<f64>,
+        repeat_penalty: f32,
+        repeat_last_n: usize,
+        verbose_prompt: bool,
+        device: &Device,
+        dtype: DType,
+    ) -> Self {
+        let logits_processor = LogitsProcessor::new(seed, temp, top_p);
+        Self {
+            model,
+            tokenizer,
+            logits_processor,
+            repeat_penalty,
+            repeat_last_n,
+            verbose_prompt,
+            device: device.clone(),
+            dtype,
+        }
+    }
+
+    pub async fn run(&mut self, prompt: String, sample_len: usize, sender: Sender<ChatResponse>) -> () {
+        let tokens = self.tokenizer.encode(prompt, true).expect("tokens error");
+        if tokens.is_empty() {
+            panic!("Empty prompts are not supported in the chatglm model.")
+        }
+        if self.verbose_prompt {
+            for (token, id) in tokens.get_tokens().iter().zip(tokens.get_ids().iter()) {
+                let token = token.replace('▁', " ").replace("<0x0A>", "\n");
+                println!("{id:7} -> '{token}'");
+            }
+        }
+        let eos_token = match self.tokenizer.get_vocab(true).get("<|endoftext|>") {
+            Some(token) => *token,
+            None => panic!("cannot find the endoftext token"),
+        };
+        let mut tokens = tokens.get_ids().to_vec();
+        let mut generated_tokens = 0usize;
+
+        let start_gen = std::time::Instant::now();
+
+        for index in 0..sample_len {
+            let context_size = if index > 0 { 1 } else { tokens.len() };
+            let ctxt = &tokens[tokens.len().saturating_sub(context_size)..];
+            let input = Tensor::new(ctxt, &self.device)
+                .unwrap()
+                .unsqueeze(0)
+                .expect("create tensor input error");
+            let logits = self.model.forward(&input).unwrap();
+            let logits = logits.squeeze(0).unwrap().to_dtype(self.dtype).unwrap();
+            let logits = if self.repeat_penalty == 1. {
+                logits
+            } else {
+                let start_at = tokens.len().saturating_sub(self.repeat_last_n);
+                candle_transformers::utils::apply_repeat_penalty(
+                    &logits,
+                    self.repeat_penalty,
+                    &tokens[start_at..],
+                )
+                .unwrap()
+            };
+
+            let next_token = self.logits_processor.sample(&logits).unwrap();
+            tokens.push(next_token);
+            generated_tokens += 1;
+            if next_token == eos_token {
+                break;
+            }
+            let token = self
+                .tokenizer
+                .decode(&[next_token], true)
+                .expect("Token error");
+            if self.verbose_prompt {
+                println!(
+                    "[Index: {}] [Raw Token: {}] [Decode Token: {}]",
+                    index.blue(),
+                    next_token.green(),
+                    token.yellow()
+                );
+            }
+            let chunk = ChatResponse::Chunk(build_response_chunk(token).await);
+            let _ = sender.send(chunk);
+            println!("send");
+        }
+        // send the Done flag to close the stream
+        let _ = sender.send(ChatResponse::Done);
+        let dt = start_gen.elapsed();
+        if self.verbose_prompt {
+            println!(
+                "\n{generated_tokens} tokens generated ({:.2} token/s)",
+                generated_tokens as f64 / dt.as_secs_f64(),
+            );
+        }
+        self.model.reset_kv_cache(); // clear the model's KV cache
+    }
+}
+
+async fn build_response_chunk(tokens: String) -> ChatCompletionChunk {
+    let uuid = uuid::Uuid::new_v4();
+    let completion_id = format!("chatcmpl-{}", uuid);
+
+    let choice_data = ChatChoiceData {
+        role: "assistant".to_string(),
+        content: Some(tokens),
+    };
+    let choice = ChatChoiceStream {
+        delta: choice_data.clone(),
+        //finish_reason: Some("stop".to_string()),
+        finish_reason: None,
+        index: 0,
+        logprobs: None,
+    };
+    return ChatCompletionChunk {
+        id: completion_id,
+        object: "chat.completion.chunk".to_string(),
+        created: 0,
+        model: "codegeex4".to_string(),
+        choices: vec![choice],
+    };
+}
diff --git a/candle_demo/codegeex4/src/stream.rs b/candle_demo/codegeex4/src/stream.rs
new file mode 100644
index 0000000..859b66f
--- /dev/null
+++ b/candle_demo/codegeex4/src/stream.rs
@@ -0,0 +1,69 @@
+use crate::api::ChatCompletionChunk;
+use axum::response::sse::Event;
+use flume::Receiver;
+use futures::Stream;
+use owo_colors::OwoColorize;
+use std::{
+    pin::Pin,
+    task::{Context, Poll},
+};
+#[derive(PartialEq)]
+pub enum StreamingStatus {
+    Uninitilized,
+    Started,
+    Interrupted,
+    Stopped,
+}
+pub enum ChatResponse {
+    InternalError(String),
+    ValidationError(String),
+    ModelError(String),
+    Chunk(ChatCompletionChunk),
+    Done, //finish flag
+}
+pub struct Streamer {
+    pub rx: Receiver<ChatResponse>,
+    pub status: StreamingStatus,
+}
+
+impl Stream for Streamer {
+    type Item = Result<Event, axum::Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        if self.status == StreamingStatus::Stopped {
+            return Poll::Ready(None);
+        }
+        match self.rx.try_recv() {
+            Ok(resp) => match resp {
+                ChatResponse::InternalError(e) => Poll::Ready(Some(Ok(Event::default().data(e)))),
+                ChatResponse::ValidationError(e) => Poll::Ready(Some(Ok(Event::default().data(e)))),
+                ChatResponse::ModelError(e) => Poll::Ready(Some(Ok(Event::default().data(e)))),
+                ChatResponse::Chunk(response) => {
+                    if self.status != StreamingStatus::Started {
+                        self.status = StreamingStatus::Started;
+                    }
+                    Poll::Ready(Some(Event::default().json_data(response)))
+                }
+                ChatResponse::Done => {
+                    println!("{}", "SSE channel closed".yellow());
+                    self.status = StreamingStatus::Stopped;
+                    Poll::Ready(Some(Ok(Event::default().data("[DONE]"))))
+                }
+            },
+
+            Err(e) => {
+                {
+                    if self.status == StreamingStatus::Started
+                        && e == flume::TryRecvError::Disconnected
+                    {
+                        //no TryRecvError::Disconnected returned even if the client closed the stream or disconnected
+                        self.status = StreamingStatus::Interrupted;
+                        Poll::Ready(None)
+                    } else {
+                        Poll::Pending
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/resources/rust-api-server.png b/resources/rust-api-server.png
new file mode 100644
index 0000000..602443a
Binary files /dev/null and b/resources/rust-api-server.png differ
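
For a quick check of the new endpoint, a request along the following lines should work once the api-server is running (a minimal sketch: the host and port follow the README example above, and the JSON fields mirror ChatCompletionRequest in codegeex4/src/api.rs; the model field is required by the deserializer, and stream: true selects the SSE path):

#+begin_src shell
  # stream tokens from the chat completions endpoint; -N disables curl's output buffering
  curl -N http://127.0.0.1:3000/v1/chat/completions \
    -H "Content-Type: application/json" \
    -d '{
          "model": "codegeex4",
          "stream": true,
          "max_tokens": 256,
          "messages": [{"role": "user", "content": "Write a quicksort in Python."}]
        }'
#+end_src

The stream is a sequence of chat.completion.chunk events and ends with a [DONE] data frame, matching ChatResponse::Done in codegeex4/src/stream.rs.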