Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ let server = hyper_server::create_server(
HyperServerOptions {
host: "127.0.0.1".to_string(),
sse_support: false,
event_store: Some(Arc::new(InMemoryEventStore::default())), // enable resumability
..Default::default()
},
);
Expand Down Expand Up @@ -191,7 +192,6 @@ impl ServerHandler for MyServerHandler {
}

/// Handles requests to call a specific tool.

async fn handle_call_tool_request( &self, request: CallToolRequest, runtime: Arc<dyn McpServer> ) -> Result<CallToolResult, CallToolError> {

if request.tool_name() == SayHelloTool::tool_name() {
Expand Down Expand Up @@ -416,6 +416,7 @@ server.start().await?;

Here is a list of available options with descriptions for configuring the HyperServer:
```rs

pub struct HyperServerOptions {
/// Hostname or IP address the server will bind to (default: "127.0.0.1")
pub host: String,
Expand All @@ -432,6 +433,10 @@ pub struct HyperServerOptions {
/// Shared transport configuration used by the server
pub transport_options: Arc<TransportOptions>,

/// Event store for resumability support
/// If provided, resumability will be enabled, allowing clients to reconnect and resume messages
pub event_store: Option<Arc<dyn EventStore>>,

/// This setting only applies to streamable HTTP.
/// If true, the server will return JSON responses instead of starting an SSE stream.
/// This can be useful for simple request/response scenarios without streaming.
Expand Down Expand Up @@ -500,8 +505,8 @@ The `rust-mcp-sdk` crate provides several features that can be enabled or disabl
- `macros`: Provides procedural macros for simplifying the creation and manipulation of MCP Tool structures.
- `sse`: Enables support for the `Server-Sent Events (SSE)` transport.
- `streamable-http`: Enables support for the `Streamable HTTP` transport.
- `stdio`: Enables support for the `standard input/output (stdio)` transport.

- `stdio`: Enables support for the `standard input/output (stdio)` transport.
- `tls-no-provider`: Enables TLS without a crypto provider. This is useful if you are already using a different crypto provider than the aws-lc default.

#### MCP Protocol Versions with Corresponding Features
Expand Down
8 changes: 7 additions & 1 deletion crates/rust-mcp-sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ let server = hyper_server::create_server(
HyperServerOptions {
host: "127.0.0.1".to_string(),
sse_support: false,
event_store: Some(Arc::new(InMemoryEventStore::default())), // enable resumability
..Default::default()
},
);
Expand Down Expand Up @@ -415,6 +416,7 @@ server.start().await?;

Here is a list of available options with descriptions for configuring the HyperServer:
```rs

pub struct HyperServerOptions {
/// Hostname or IP address the server will bind to (default: "127.0.0.1")
pub host: String,
Expand All @@ -431,6 +433,10 @@ pub struct HyperServerOptions {
/// Shared transport configuration used by the server
pub transport_options: Arc<TransportOptions>,

/// Event store for resumability support
/// If provided, resumability will be enabled, allowing clients to reconnect and resume messages
pub event_store: Option<Arc<dyn EventStore>>,

/// This setting only applies to streamable HTTP.
/// If true, the server will return JSON responses instead of starting an SSE stream.
/// This can be useful for simple request/response scenarios without streaming.
Expand Down Expand Up @@ -499,8 +505,8 @@ The `rust-mcp-sdk` crate provides several features that can be enabled or disabl
- `macros`: Provides procedural macros for simplifying the creation and manipulation of MCP Tool structures.
- `sse`: Enables support for the `Server-Sent Events (SSE)` transport.
- `streamable-http`: Enables support for the `Streamable HTTP` transport.
- `stdio`: Enables support for the `standard input/output (stdio)` transport.

- `stdio`: Enables support for the `standard input/output (stdio)` transport.
- `tls-no-provider`: Enables TLS without a crypto provider. This is useful if you are already using a different crypto provider than the aws-lc default.

#### MCP Protocol Versions with Corresponding Features
Expand Down
4 changes: 4 additions & 0 deletions crates/rust-mcp-sdk/src/hyper_servers/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{sync::Arc, time::Duration};
use super::session_store::SessionStore;
use crate::mcp_traits::mcp_handler::McpServerHandler;
use crate::{id_generator::FastIdGenerator, mcp_traits::IdGenerator, schema::InitializeResult};
use rust_mcp_transport::event_store::EventStore;
use rust_mcp_transport::{SessionId, TransportOptions};

/// Application state struct for the Hyper server
Expand Down Expand Up @@ -30,6 +31,9 @@ pub struct AppState {
/// Enable DNS rebinding protection (requires allowedHosts and/or allowedOrigins to be configured).
/// Default is false for backwards compatibility.
pub dns_rebinding_protection: bool,
/// Event store for resumability support
/// If provided, resumability will be enabled, allowing clients to reconnect and resume messages
pub event_store: Option<Arc<dyn EventStore>>,
}

impl AppState {
Expand Down
125 changes: 97 additions & 28 deletions crates/rust-mcp-sdk/src/hyper_servers/routes/hyper_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use axum::{
use futures::stream;
use hyper::{header, HeaderMap, StatusCode};
use rust_mcp_transport::{
SessionId, SseTransport, StreamId, MCP_PROTOCOL_VERSION_HEADER, MCP_SESSION_ID_HEADER,
EventId, McpDispatch, SessionId, SseTransport, StreamId, ID_SEPARATOR,
MCP_PROTOCOL_VERSION_HEADER, MCP_SESSION_ID_HEADER,
};
use std::{sync::Arc, time::Duration};
use tokio::io::{duplex, AsyncBufReadExt, BufReader};
Expand All @@ -36,6 +37,7 @@ async fn create_sse_stream(
state: Arc<AppState>,
payload: Option<&str>,
standalone: bool,
last_event_id: Option<EventId>,
) -> TransportServerResult<hyper::Response<axum::body::Body>> {
let payload_string = payload.map(|p| p.to_string());

Expand All @@ -53,50 +55,85 @@ async fn create_sse_stream(
// writable stream to deliver message to the client
let (write_tx, write_rx) = duplex(DUPLEX_BUFFER_SIZE);

let transport = Arc::new(
SseTransport::<ClientMessage>::new(
read_rx,
write_tx,
read_tx,
Arc::clone(&state.transport_options),
)
.map_err(|err| TransportServerError::TransportError(err.to_string()))?,
);

let stream_id: StreamId = if standalone {
DEFAULT_STREAM_ID.to_string()
let session_id = Arc::new(session_id);
let stream_id: Arc<StreamId> = if standalone {
Arc::new(DEFAULT_STREAM_ID.to_string())
} else {
state.stream_id_gen.generate()
Arc::new(state.stream_id_gen.generate())
};

let event_store = state.event_store.as_ref().map(Arc::clone);
let resumability_enabled = event_store.is_some();

let mut transport = SseTransport::<ClientMessage>::new(
read_rx,
write_tx,
read_tx,
Arc::clone(&state.transport_options),
)
.map_err(|err| TransportServerError::TransportError(err.to_string()))?;
if let Some(event_store) = event_store.clone() {
transport.make_resumable((*session_id).clone(), (*stream_id).clone(), event_store);
}
let transport = Arc::new(transport);

let ping_interval = state.ping_interval;
let runtime_clone = Arc::clone(&runtime);
let stream_id_clone = stream_id.clone();
let transport_clone = transport.clone();

//Start the server runtime
tokio::spawn(async move {
match runtime_clone
.start_stream(transport, &stream_id, ping_interval, payload_string)
.start_stream(
transport_clone,
&stream_id_clone,
ping_interval,
payload_string,
)
.await
{
Ok(_) => tracing::trace!("stream {} exited gracefully.", &stream_id),
Err(err) => tracing::info!("stream {} exited with error : {}", &stream_id, err),
Ok(_) => tracing::trace!("stream {} exited gracefully.", &stream_id_clone),
Err(err) => tracing::info!("stream {} exited with error : {}", &stream_id_clone, err),
}
let _ = runtime.remove_transport(&stream_id).await;
let _ = runtime.remove_transport(&stream_id_clone).await;
});

// Construct SSE stream
let reader = BufReader::new(write_rx);

// outgoing messages from server to the client
let message_stream = stream::unfold(reader, |mut reader| async move {
let mut line = String::new();

match reader.read_line(&mut line).await {
Ok(0) => None, // EOF
Ok(_) => {
let trimmed_line = line.trim_end_matches('\n').to_owned();
Some((Ok(Event::default().data(trimmed_line)), reader))
// send outgoing messages from server to the client over the sse stream
let message_stream = stream::unfold(reader, move |mut reader| {
async move {
let mut line = String::new();

match reader.read_line(&mut line).await {
Ok(0) => None, // EOF
Ok(_) => {
let trimmed_line = line.trim_end_matches('\n').to_owned();

// empty sse comment to keep-alive
if is_empty_sse_message(&trimmed_line) {
return Some((Ok(Event::default()), reader));
}

let (event_id, message) = match (
resumability_enabled,
trimmed_line.split_once(char::from(ID_SEPARATOR)),
) {
(true, Some((id, msg))) => (Some(id.to_string()), msg.to_string()),
_ => (None, trimmed_line),
};

let event = match event_id {
Some(id) => Event::default().data(message).id(id),
None => Event::default().data(message),
};

Some((Ok(event), reader))
}
Err(e) => Some((Err(e), reader)),
}
Err(e) => Some((Err(e), reader)),
}
});

Expand All @@ -111,6 +148,23 @@ async fn create_sse_stream(
HeaderValue::from_str(&session_id).unwrap(),
);

// if last_event_id exists we replay messages from the event-store
tokio::spawn(async move {
if let Some(last_event_id) = last_event_id {
if let Some(event_store) = state.event_store.as_ref() {
if let Some(events) = event_store.events_after(last_event_id).await {
for message_payload in events.messages {
// skip storing replay messages
let error = transport.write_str(&message_payload, true).await;
if let Err(error) = error {
tracing::trace!("Error replaying message: {error}")
}
}
}
}
}
});

if !payload_contains_request {
*response.status_mut() = StatusCode::ACCEPTED;
}
Expand Down Expand Up @@ -148,6 +202,7 @@ fn is_result(json_str: &str) -> Result<bool, serde_json::Error> {

pub async fn create_standalone_stream(
session_id: SessionId,
last_event_id: Option<EventId>,
state: Arc<AppState>,
) -> TransportServerResult<hyper::Response<axum::body::Body>> {
let runtime = state.session_store.get(&session_id).await.ok_or(
Expand All @@ -161,12 +216,20 @@ pub async fn create_standalone_stream(
return Ok((StatusCode::CONFLICT, Json(error)).into_response());
}

if let Some(last_event_id) = last_event_id.as_ref() {
tracing::trace!(
"SSE stream re-connected with last-event-id: {}",
last_event_id
);
}

let mut response = create_sse_stream(
runtime.clone(),
session_id.clone(),
state.clone(),
None,
true,
last_event_id,
)
.await?;
*response.status_mut() = StatusCode::OK;
Expand Down Expand Up @@ -195,6 +258,7 @@ pub async fn start_new_session(
state.clone(),
Some(payload),
false,
None,
)
.await;

Expand Down Expand Up @@ -354,6 +418,7 @@ pub async fn process_incoming_message(
state.clone(),
Some(payload),
false,
None,
)
.await
}
Expand All @@ -365,6 +430,10 @@ pub async fn process_incoming_message(
}
}

pub fn is_empty_sse_message(sse_payload: &str) -> bool {
sse_payload.is_empty() || sse_payload.trim() == ":"
}

pub async fn delete_session(
session_id: SessionId,
state: Arc<AppState>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use axum::{
Json, Router,
};
use hyper::{HeaderMap, StatusCode};
use rust_mcp_transport::{SessionId, MCP_SESSION_ID_HEADER};
use rust_mcp_transport::{SessionId, MCP_LAST_EVENT_ID_HEADER, MCP_SESSION_ID_HEADER};
use std::{collections::HashMap, sync::Arc};

pub fn routes(state: Arc<AppState>, streamable_http_endpoint: &str) -> Router<Arc<AppState>> {
Expand Down Expand Up @@ -60,9 +60,14 @@ pub async fn handle_streamable_http_get(
.and_then(|value| value.to_str().ok())
.map(|s| s.to_string());

let last_event_id: Option<SessionId> = headers
.get(MCP_LAST_EVENT_ID_HEADER)
.and_then(|value| value.to_str().ok())
.map(|s| s.to_string());

match session_id {
Some(session_id) => {
let res = create_standalone_stream(session_id, state).await?;
let res = create_standalone_stream(session_id, last_event_id, state).await?;
Ok(res.into_response())
}
None => {
Expand Down
8 changes: 7 additions & 1 deletion crates/rust-mcp-sdk/src/hyper_servers/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use super::{
};
use crate::schema::InitializeResult;
use axum::Router;
use rust_mcp_transport::{SessionId, TransportOptions};
use rust_mcp_transport::{event_store::EventStore, SessionId, TransportOptions};

// Default client ping interval (12 seconds)
const DEFAULT_CLIENT_PING_INTERVAL: Duration = Duration::from_secs(12);
Expand Down Expand Up @@ -53,6 +53,10 @@ pub struct HyperServerOptions {
/// Shared transport configuration used by the server
pub transport_options: Arc<TransportOptions>,

/// Event store for resumability support
/// If provided, resumability will be enabled, allowing clients to reconnect and resume messages
pub event_store: Option<Arc<dyn EventStore>>,

/// This setting only applies to streamable HTTP.
/// If true, the server will return JSON responses instead of starting an SSE stream.
/// This can be useful for simple request/response scenarios without streaming.
Expand Down Expand Up @@ -225,6 +229,7 @@ impl Default for HyperServerOptions {
allowed_hosts: None,
allowed_origins: None,
dns_rebinding_protection: false,
event_store: None,
}
}
}
Expand Down Expand Up @@ -271,6 +276,7 @@ impl HyperServer {
allowed_hosts: server_options.allowed_hosts.take(),
allowed_origins: server_options.allowed_origins.take(),
dns_rebinding_protection: server_options.dns_rebinding_protection,
event_store: server_options.event_store.as_ref().map(Arc::clone),
});
let app = app_routes(Arc::clone(&state), &server_options);
Self {
Expand Down
6 changes: 4 additions & 2 deletions crates/rust-mcp-sdk/src/mcp_runtimes/server_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,16 +368,17 @@ impl ServerRuntime {
Ok(())
}

//TODO: re-visit and simplify unnecessary hashmap
pub(crate) async fn remove_transport(&self, stream_id: &str) -> SdkResult<()> {
if stream_id != DEFAULT_STREAM_ID {
return Ok(());
}
let mut transport_map = self.transport_map.write().await;
let transport_map = self.transport_map.read().await;
tracing::trace!("removing transport for stream id : {}", stream_id);
if let Some(transport) = transport_map.get(stream_id) {
transport.shut_down().await?;
}
transport_map.remove(stream_id);
// transport_map.remove(stream_id);
Ok(())
}

Expand Down Expand Up @@ -435,6 +436,7 @@ impl ServerRuntime {
};

// in case there is a payload, we consume it by transport to get processed
// payload would be message payload coming from the client
if let Some(payload) = payload {
if let Err(err) = transport.consume_string_payload(&payload).await {
let _ = self.remove_transport(stream_id).await;
Expand Down
Loading