Skip to content

Commit 08742bb

Browse files
authored
feat: event store support for resumability (#101)
* Add Streamable HTTP Client and multiple refactoring and improvements * chore: typos * chore: update readme * feat: introduce event-store * chore: add event store to the app state * chore: refactor event store integration * chore: add tracing to inmemory store * chore: update examples to use event store * chore: improve flow * chore: replay mechanism * cleanup * test: add new test for event-store * chore: add tracing to tests * chore: add test * chore: refactor replaying logic * chore: cleanup * typo
1 parent abb0c36 commit 08742bb

File tree

24 files changed

+863
-106
lines changed

24 files changed

+863
-106
lines changed

README.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ let server = hyper_server::create_server(
153153
HyperServerOptions {
154154
host: "127.0.0.1".to_string(),
155155
sse_support: false,
156+
event_store: Some(Arc::new(InMemoryEventStore::default())), // enable resumability
156157
..Default::default()
157158
},
158159
);
@@ -191,7 +192,6 @@ impl ServerHandler for MyServerHandler {
191192
}
192193

193194
/// Handles requests to call a specific tool.
194-
195195
async fn handle_call_tool_request( &self, request: CallToolRequest, runtime: Arc<dyn McpServer> ) -> Result<CallToolResult, CallToolError> {
196196

197197
if request.tool_name() == SayHelloTool::tool_name() {
@@ -416,6 +416,7 @@ server.start().await?;
416416

417417
Here is a list of available options with descriptions for configuring the HyperServer:
418418
```rs
419+
419420
pub struct HyperServerOptions {
420421
/// Hostname or IP address the server will bind to (default: "127.0.0.1")
421422
pub host: String,
@@ -432,6 +433,10 @@ pub struct HyperServerOptions {
432433
/// Shared transport configuration used by the server
433434
pub transport_options: Arc<TransportOptions>,
434435

436+
/// Event store for resumability support
437+
/// If provided, resumability will be enabled, allowing clients to reconnect and resume messages
438+
pub event_store: Option<Arc<dyn EventStore>>,
439+
435440
/// This setting only applies to streamable HTTP.
436441
/// If true, the server will return JSON responses instead of starting an SSE stream.
437442
/// This can be useful for simple request/response scenarios without streaming.
@@ -500,8 +505,8 @@ The `rust-mcp-sdk` crate provides several features that can be enabled or disabl
500505
- `macros`: Provides procedural macros for simplifying the creation and manipulation of MCP Tool structures.
501506
- `sse`: Enables support for the `Server-Sent Events (SSE)` transport.
502507
- `streamable-http`: Enables support for the `Streamable HTTP` transport.
503-
- `stdio`: Enables support for the `standard input/output (stdio)` transport.
504508

509+
- `stdio`: Enables support for the `standard input/output (stdio)` transport.
505510
- `tls-no-provider`: Enables TLS without a crypto provider. This is useful if you are already using a different crypto provider than the aws-lc default.
506511

507512
#### MCP Protocol Versions with Corresponding Features

crates/rust-mcp-sdk/README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ let server = hyper_server::create_server(
153153
HyperServerOptions {
154154
host: "127.0.0.1".to_string(),
155155
sse_support: false,
156+
event_store: Some(Arc::new(InMemoryEventStore::default())), // enable resumability
156157
..Default::default()
157158
},
158159
);
@@ -415,6 +416,7 @@ server.start().await?;
415416

416417
Here is a list of available options with descriptions for configuring the HyperServer:
417418
```rs
419+
418420
pub struct HyperServerOptions {
419421
/// Hostname or IP address the server will bind to (default: "127.0.0.1")
420422
pub host: String,
@@ -431,6 +433,10 @@ pub struct HyperServerOptions {
431433
/// Shared transport configuration used by the server
432434
pub transport_options: Arc<TransportOptions>,
433435

436+
/// Event store for resumability support
437+
/// If provided, resumability will be enabled, allowing clients to reconnect and resume messages
438+
pub event_store: Option<Arc<dyn EventStore>>,
439+
434440
/// This setting only applies to streamable HTTP.
435441
/// If true, the server will return JSON responses instead of starting an SSE stream.
436442
/// This can be useful for simple request/response scenarios without streaming.
@@ -499,8 +505,8 @@ The `rust-mcp-sdk` crate provides several features that can be enabled or disabl
499505
- `macros`: Provides procedural macros for simplifying the creation and manipulation of MCP Tool structures.
500506
- `sse`: Enables support for the `Server-Sent Events (SSE)` transport.
501507
- `streamable-http`: Enables support for the `Streamable HTTP` transport.
502-
- `stdio`: Enables support for the `standard input/output (stdio)` transport.
503508

509+
- `stdio`: Enables support for the `standard input/output (stdio)` transport.
504510
- `tls-no-provider`: Enables TLS without a crypto provider. This is useful if you are already using a different crypto provider than the aws-lc default.
505511

506512
#### MCP Protocol Versions with Corresponding Features

crates/rust-mcp-sdk/src/hyper_servers/app_state.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::{sync::Arc, time::Duration};
33
use super::session_store::SessionStore;
44
use crate::mcp_traits::mcp_handler::McpServerHandler;
55
use crate::{id_generator::FastIdGenerator, mcp_traits::IdGenerator, schema::InitializeResult};
6+
use rust_mcp_transport::event_store::EventStore;
67
use rust_mcp_transport::{SessionId, TransportOptions};
78

89
/// Application state struct for the Hyper server
@@ -30,6 +31,9 @@ pub struct AppState {
3031
/// Enable DNS rebinding protection (requires allowedHosts and/or allowedOrigins to be configured).
3132
/// Default is false for backwards compatibility.
3233
pub dns_rebinding_protection: bool,
34+
/// Event store for resumability support
35+
/// If provided, resumability will be enabled, allowing clients to reconnect and resume messages
36+
pub event_store: Option<Arc<dyn EventStore>>,
3337
}
3438

3539
impl AppState {

crates/rust-mcp-sdk/src/hyper_servers/routes/hyper_utils.rs

Lines changed: 97 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ use axum::{
2323
use futures::stream;
2424
use hyper::{header, HeaderMap, StatusCode};
2525
use rust_mcp_transport::{
26-
SessionId, SseTransport, StreamId, MCP_PROTOCOL_VERSION_HEADER, MCP_SESSION_ID_HEADER,
26+
EventId, McpDispatch, SessionId, SseTransport, StreamId, ID_SEPARATOR,
27+
MCP_PROTOCOL_VERSION_HEADER, MCP_SESSION_ID_HEADER,
2728
};
2829
use std::{sync::Arc, time::Duration};
2930
use tokio::io::{duplex, AsyncBufReadExt, BufReader};
@@ -36,6 +37,7 @@ async fn create_sse_stream(
3637
state: Arc<AppState>,
3738
payload: Option<&str>,
3839
standalone: bool,
40+
last_event_id: Option<EventId>,
3941
) -> TransportServerResult<hyper::Response<axum::body::Body>> {
4042
let payload_string = payload.map(|p| p.to_string());
4143

@@ -53,50 +55,85 @@ async fn create_sse_stream(
5355
// writable stream to deliver message to the client
5456
let (write_tx, write_rx) = duplex(DUPLEX_BUFFER_SIZE);
5557

56-
let transport = Arc::new(
57-
SseTransport::<ClientMessage>::new(
58-
read_rx,
59-
write_tx,
60-
read_tx,
61-
Arc::clone(&state.transport_options),
62-
)
63-
.map_err(|err| TransportServerError::TransportError(err.to_string()))?,
64-
);
65-
66-
let stream_id: StreamId = if standalone {
67-
DEFAULT_STREAM_ID.to_string()
58+
let session_id = Arc::new(session_id);
59+
let stream_id: Arc<StreamId> = if standalone {
60+
Arc::new(DEFAULT_STREAM_ID.to_string())
6861
} else {
69-
state.stream_id_gen.generate()
62+
Arc::new(state.stream_id_gen.generate())
7063
};
64+
65+
let event_store = state.event_store.as_ref().map(Arc::clone);
66+
let resumability_enabled = event_store.is_some();
67+
68+
let mut transport = SseTransport::<ClientMessage>::new(
69+
read_rx,
70+
write_tx,
71+
read_tx,
72+
Arc::clone(&state.transport_options),
73+
)
74+
.map_err(|err| TransportServerError::TransportError(err.to_string()))?;
75+
if let Some(event_store) = event_store.clone() {
76+
transport.make_resumable((*session_id).clone(), (*stream_id).clone(), event_store);
77+
}
78+
let transport = Arc::new(transport);
79+
7180
let ping_interval = state.ping_interval;
7281
let runtime_clone = Arc::clone(&runtime);
82+
let stream_id_clone = stream_id.clone();
83+
let transport_clone = transport.clone();
7384

7485
//Start the server runtime
7586
tokio::spawn(async move {
7687
match runtime_clone
77-
.start_stream(transport, &stream_id, ping_interval, payload_string)
88+
.start_stream(
89+
transport_clone,
90+
&stream_id_clone,
91+
ping_interval,
92+
payload_string,
93+
)
7894
.await
7995
{
80-
Ok(_) => tracing::trace!("stream {} exited gracefully.", &stream_id),
81-
Err(err) => tracing::info!("stream {} exited with error : {}", &stream_id, err),
96+
Ok(_) => tracing::trace!("stream {} exited gracefully.", &stream_id_clone),
97+
Err(err) => tracing::info!("stream {} exited with error : {}", &stream_id_clone, err),
8298
}
83-
let _ = runtime.remove_transport(&stream_id).await;
99+
let _ = runtime.remove_transport(&stream_id_clone).await;
84100
});
85101

86102
// Construct SSE stream
87103
let reader = BufReader::new(write_rx);
88104

89-
// outgoing messages from server to the client
90-
let message_stream = stream::unfold(reader, |mut reader| async move {
91-
let mut line = String::new();
92-
93-
match reader.read_line(&mut line).await {
94-
Ok(0) => None, // EOF
95-
Ok(_) => {
96-
let trimmed_line = line.trim_end_matches('\n').to_owned();
97-
Some((Ok(Event::default().data(trimmed_line)), reader))
105+
// send outgoing messages from server to the client over the sse stream
106+
let message_stream = stream::unfold(reader, move |mut reader| {
107+
async move {
108+
let mut line = String::new();
109+
110+
match reader.read_line(&mut line).await {
111+
Ok(0) => None, // EOF
112+
Ok(_) => {
113+
let trimmed_line = line.trim_end_matches('\n').to_owned();
114+
115+
// empty sse comment to keep-alive
116+
if is_empty_sse_message(&trimmed_line) {
117+
return Some((Ok(Event::default()), reader));
118+
}
119+
120+
let (event_id, message) = match (
121+
resumability_enabled,
122+
trimmed_line.split_once(char::from(ID_SEPARATOR)),
123+
) {
124+
(true, Some((id, msg))) => (Some(id.to_string()), msg.to_string()),
125+
_ => (None, trimmed_line),
126+
};
127+
128+
let event = match event_id {
129+
Some(id) => Event::default().data(message).id(id),
130+
None => Event::default().data(message),
131+
};
132+
133+
Some((Ok(event), reader))
134+
}
135+
Err(e) => Some((Err(e), reader)),
98136
}
99-
Err(e) => Some((Err(e), reader)),
100137
}
101138
});
102139

@@ -111,6 +148,23 @@ async fn create_sse_stream(
111148
HeaderValue::from_str(&session_id).unwrap(),
112149
);
113150

151+
// if last_event_id exists we replay messages from the event-store
152+
tokio::spawn(async move {
153+
if let Some(last_event_id) = last_event_id {
154+
if let Some(event_store) = state.event_store.as_ref() {
155+
if let Some(events) = event_store.events_after(last_event_id).await {
156+
for message_payload in events.messages {
157+
// skip storing replay messages
158+
let error = transport.write_str(&message_payload, true).await;
159+
if let Err(error) = error {
160+
tracing::trace!("Error replaying message: {error}")
161+
}
162+
}
163+
}
164+
}
165+
}
166+
});
167+
114168
if !payload_contains_request {
115169
*response.status_mut() = StatusCode::ACCEPTED;
116170
}
@@ -148,6 +202,7 @@ fn is_result(json_str: &str) -> Result<bool, serde_json::Error> {
148202

149203
pub async fn create_standalone_stream(
150204
session_id: SessionId,
205+
last_event_id: Option<EventId>,
151206
state: Arc<AppState>,
152207
) -> TransportServerResult<hyper::Response<axum::body::Body>> {
153208
let runtime = state.session_store.get(&session_id).await.ok_or(
@@ -161,12 +216,20 @@ pub async fn create_standalone_stream(
161216
return Ok((StatusCode::CONFLICT, Json(error)).into_response());
162217
}
163218

219+
if let Some(last_event_id) = last_event_id.as_ref() {
220+
tracing::trace!(
221+
"SSE stream re-connected with last-event-id: {}",
222+
last_event_id
223+
);
224+
}
225+
164226
let mut response = create_sse_stream(
165227
runtime.clone(),
166228
session_id.clone(),
167229
state.clone(),
168230
None,
169231
true,
232+
last_event_id,
170233
)
171234
.await?;
172235
*response.status_mut() = StatusCode::OK;
@@ -195,6 +258,7 @@ pub async fn start_new_session(
195258
state.clone(),
196259
Some(payload),
197260
false,
261+
None,
198262
)
199263
.await;
200264

@@ -354,6 +418,7 @@ pub async fn process_incoming_message(
354418
state.clone(),
355419
Some(payload),
356420
false,
421+
None,
357422
)
358423
.await
359424
}
@@ -365,6 +430,10 @@ pub async fn process_incoming_message(
365430
}
366431
}
367432

433+
pub fn is_empty_sse_message(sse_payload: &str) -> bool {
434+
sse_payload.is_empty() || sse_payload.trim() == ":"
435+
}
436+
368437
pub async fn delete_session(
369438
session_id: SessionId,
370439
state: Arc<AppState>,

crates/rust-mcp-sdk/src/hyper_servers/routes/streamable_http_routes.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use axum::{
2323
Json, Router,
2424
};
2525
use hyper::{HeaderMap, StatusCode};
26-
use rust_mcp_transport::{SessionId, MCP_SESSION_ID_HEADER};
26+
use rust_mcp_transport::{SessionId, MCP_LAST_EVENT_ID_HEADER, MCP_SESSION_ID_HEADER};
2727
use std::{collections::HashMap, sync::Arc};
2828

2929
pub fn routes(state: Arc<AppState>, streamable_http_endpoint: &str) -> Router<Arc<AppState>> {
@@ -60,9 +60,14 @@ pub async fn handle_streamable_http_get(
6060
.and_then(|value| value.to_str().ok())
6161
.map(|s| s.to_string());
6262

63+
let last_event_id: Option<SessionId> = headers
64+
.get(MCP_LAST_EVENT_ID_HEADER)
65+
.and_then(|value| value.to_str().ok())
66+
.map(|s| s.to_string());
67+
6368
match session_id {
6469
Some(session_id) => {
65-
let res = create_standalone_stream(session_id, state).await?;
70+
let res = create_standalone_stream(session_id, last_event_id, state).await?;
6671
Ok(res.into_response())
6772
}
6873
None => {

crates/rust-mcp-sdk/src/hyper_servers/server.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use super::{
2323
};
2424
use crate::schema::InitializeResult;
2525
use axum::Router;
26-
use rust_mcp_transport::{SessionId, TransportOptions};
26+
use rust_mcp_transport::{event_store::EventStore, SessionId, TransportOptions};
2727

2828
// Default client ping interval (12 seconds)
2929
const DEFAULT_CLIENT_PING_INTERVAL: Duration = Duration::from_secs(12);
@@ -53,6 +53,10 @@ pub struct HyperServerOptions {
5353
/// Shared transport configuration used by the server
5454
pub transport_options: Arc<TransportOptions>,
5555

56+
/// Event store for resumability support
57+
/// If provided, resumability will be enabled, allowing clients to reconnect and resume messages
58+
pub event_store: Option<Arc<dyn EventStore>>,
59+
5660
/// This setting only applies to streamable HTTP.
5761
/// If true, the server will return JSON responses instead of starting an SSE stream.
5862
/// This can be useful for simple request/response scenarios without streaming.
@@ -225,6 +229,7 @@ impl Default for HyperServerOptions {
225229
allowed_hosts: None,
226230
allowed_origins: None,
227231
dns_rebinding_protection: false,
232+
event_store: None,
228233
}
229234
}
230235
}
@@ -271,6 +276,7 @@ impl HyperServer {
271276
allowed_hosts: server_options.allowed_hosts.take(),
272277
allowed_origins: server_options.allowed_origins.take(),
273278
dns_rebinding_protection: server_options.dns_rebinding_protection,
279+
event_store: server_options.event_store.as_ref().map(Arc::clone),
274280
});
275281
let app = app_routes(Arc::clone(&state), &server_options);
276282
Self {

crates/rust-mcp-sdk/src/mcp_runtimes/server_runtime.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,16 +368,17 @@ impl ServerRuntime {
368368
Ok(())
369369
}
370370

371+
//TODO: re-visit and simplify unnecessary hashmap
371372
pub(crate) async fn remove_transport(&self, stream_id: &str) -> SdkResult<()> {
372373
if stream_id != DEFAULT_STREAM_ID {
373374
return Ok(());
374375
}
375-
let mut transport_map = self.transport_map.write().await;
376+
let transport_map = self.transport_map.read().await;
376377
tracing::trace!("removing transport for stream id : {}", stream_id);
377378
if let Some(transport) = transport_map.get(stream_id) {
378379
transport.shut_down().await?;
379380
}
380-
transport_map.remove(stream_id);
381+
// transport_map.remove(stream_id);
381382
Ok(())
382383
}
383384

@@ -435,6 +436,7 @@ impl ServerRuntime {
435436
};
436437

437438
// in case there is a payload, we consume it by transport to get processed
439+
// payload would be message payload coming from the client
438440
if let Some(payload) = payload {
439441
if let Err(err) = transport.consume_string_payload(&payload).await {
440442
let _ = self.remove_transport(stream_id).await;

0 commit comments

Comments
 (0)