From 197206cf9c9d4cbdf49bf613abe271d138da098b Mon Sep 17 00:00:00 2001 From: Nick Angelou Date: Fri, 11 Jul 2025 09:21:11 +0200 Subject: [PATCH 01/19] Expose streaming API --- lambda-http/src/lib.rs | 2 +- lambda-http/src/streaming.rs | 48 +++++++++++++++++++++++++++++------- 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/lambda-http/src/lib.rs b/lambda-http/src/lib.rs index cea99750..d6f8dbb0 100644 --- a/lambda-http/src/lib.rs +++ b/lambda-http/src/lib.rs @@ -101,7 +101,7 @@ use std::{ }; mod streaming; -pub use streaming::run_with_streaming_response; +pub use streaming::{into_streaming_response, run_with_streaming_response}; /// Type alias for `http::Request`s with a fixed [`Body`](enum.Body.html) type pub type Request = http::Request; diff --git a/lambda-http/src/streaming.rs b/lambda-http/src/streaming.rs index a93408b4..f10ebe03 100644 --- a/lambda-http/src/streaming.rs +++ b/lambda-http/src/streaming.rs @@ -3,7 +3,12 @@ use bytes::Bytes; pub use http::{self, Response}; use http_body::Body; use lambda_runtime::Diagnostic; -pub use lambda_runtime::{self, tower::ServiceExt, Error, LambdaEvent, MetadataPrelude, Service, StreamResponse}; +pub use lambda_runtime::{ + self, + tower::util::{MapRequest, MapResponse}, + tower::ServiceExt, + Error, LambdaEvent, MetadataPrelude, Service, StreamResponse, +}; use std::{ fmt::Debug, pin::Pin, @@ -11,12 +16,20 @@ use std::{ }; use tokio_stream::Stream; -/// Starts the Lambda Rust runtime and stream response back [Configure Lambda -/// Streaming Response](https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html). +/// Converts a handler into a streaming-compatible service for use with AWS +/// Lambda. /// -/// This takes care of transforming the LambdaEvent into a [`Request`] and -/// accepts [`http::Response`] as response. -pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error> +/// This function wraps a `Service` implementation, transforming its input and +/// output to be compatible with AWS Lambda's streaming response feature. It +/// provides the necessary middleware to handle `LambdaEvent` requests and +/// converts the `http::Response` into a `StreamResponse` containing a metadata +/// prelude and body stream. +pub fn into_streaming_response<'a, S, B, E>( + handler: S, +) -> MapResponse< + MapRequest) -> Request>, + impl FnOnce(Response) -> StreamResponse> + Clone, +> where S: Service, Error = E>, S::Future: Send + 'a, @@ -25,13 +38,13 @@ where B::Data: Into + Send, B::Error: Into + Send + Debug, { - let svc = ServiceBuilder::new() + ServiceBuilder::new() .map_request(|req: LambdaEvent| { let event: Request = req.payload.into(); event.with_lambda_context(req.context) }) .service(handler) - .map_response(|res| { + .map_response(|res: Response| { let (parts, body) = res.into_parts(); let mut prelude_headers = parts.headers; @@ -54,8 +67,25 @@ where metadata_prelude, stream: BodyStream { body }, } - }); + }) +} +/// Starts the Lambda Rust runtime and stream response back [Configure Lambda +/// Streaming +/// Response](https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html). +/// +/// This takes care of transforming the LambdaEvent into a [`Request`] and +/// accepts [`http::Response`] as response. +pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error> +where + S: Service, Error = E>, + S::Future: Send + 'a, + E: Debug + Into, + B: Body + Unpin + Send + 'static, + B::Data: Into + Send, + B::Error: Into + Send + Debug, +{ + let svc = into_streaming_response(handler); lambda_runtime::run(svc).await } From 55ab900ba8c71b3283a7c2d1be6bd0bb8d09da28 Mon Sep 17 00:00:00 2001 From: Nick Angelou Date: Fri, 11 Jul 2025 09:42:04 +0200 Subject: [PATCH 02/19] fmt --- lambda-http/src/streaming.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lambda-http/src/streaming.rs b/lambda-http/src/streaming.rs index f10ebe03..e8fb2065 100644 --- a/lambda-http/src/streaming.rs +++ b/lambda-http/src/streaming.rs @@ -5,8 +5,10 @@ use http_body::Body; use lambda_runtime::Diagnostic; pub use lambda_runtime::{ self, - tower::util::{MapRequest, MapResponse}, - tower::ServiceExt, + tower::{ + util::{MapRequest, MapResponse}, + ServiceExt, + }, Error, LambdaEvent, MetadataPrelude, Service, StreamResponse, }; use std::{ From 1c39e2ee8b9fdfb83d03c63928f76b3a616a5125 Mon Sep 17 00:00:00 2001 From: Nick Angelou Date: Fri, 11 Jul 2025 09:47:08 +0200 Subject: [PATCH 03/19] silence clippy --- lambda-http/src/streaming.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/lambda-http/src/streaming.rs b/lambda-http/src/streaming.rs index e8fb2065..67d772c0 100644 --- a/lambda-http/src/streaming.rs +++ b/lambda-http/src/streaming.rs @@ -26,6 +26,7 @@ use tokio_stream::Stream; /// provides the necessary middleware to handle `LambdaEvent` requests and /// converts the `http::Response` into a `StreamResponse` containing a metadata /// prelude and body stream. +#[allow(clippy::type_complexity)] pub fn into_streaming_response<'a, S, B, E>( handler: S, ) -> MapResponse< From 2f44c5a3eb7ea3304ef79e32187341765e435702 Mon Sep 17 00:00:00 2001 From: Nick Angelou Date: Sun, 10 Aug 2025 11:04:55 +0200 Subject: [PATCH 04/19] udpate --- lambda-http/src/lib.rs | 2 +- lambda-http/src/streaming.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lambda-http/src/lib.rs b/lambda-http/src/lib.rs index 6aaa6bc1..bf481ee1 100644 --- a/lambda-http/src/lib.rs +++ b/lambda-http/src/lib.rs @@ -102,7 +102,7 @@ use std::{ }; mod streaming; -pub use streaming::{into_streaming_response, run_with_streaming_response}; +pub use streaming::{into_streaming_response_inner, run_with_streaming_response}; /// Type alias for `http::Request`s with a fixed [`Body`](enum.Body.html) type pub type Request = http::Request; diff --git a/lambda-http/src/streaming.rs b/lambda-http/src/streaming.rs index 67d772c0..755fcd8a 100644 --- a/lambda-http/src/streaming.rs +++ b/lambda-http/src/streaming.rs @@ -52,8 +52,8 @@ where let mut prelude_headers = parts.headers; - let cookies = prelude_headers.get_all(SET_COOKIE); - let cookies = cookies + let cookies = prelude_headers + .get_all(SET_COOKIE) .iter() .map(|c| String::from_utf8_lossy(c.as_bytes()).to_string()) .collect::>(); From 024876ead980c2e5431f7169a32abc0aa4d873ff Mon Sep 17 00:00:00 2001 From: Nick Angelou Date: Sun, 10 Aug 2025 11:20:09 +0200 Subject: [PATCH 05/19] update --- lambda-http/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambda-http/src/lib.rs b/lambda-http/src/lib.rs index bf481ee1..6aaa6bc1 100644 --- a/lambda-http/src/lib.rs +++ b/lambda-http/src/lib.rs @@ -102,7 +102,7 @@ use std::{ }; mod streaming; -pub use streaming::{into_streaming_response_inner, run_with_streaming_response}; +pub use streaming::{into_streaming_response, run_with_streaming_response}; /// Type alias for `http::Request`s with a fixed [`Body`](enum.Body.html) type pub type Request = http::Request; From ccd3585f0245dd9b56e998f2fdbe7b6a12d8f6c1 Mon Sep 17 00:00:00 2001 From: Nick Angelou Date: Sun, 10 Aug 2025 13:41:27 +0200 Subject: [PATCH 06/19] wip --- lambda-http/src/streaming.rs | 41 ++++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/lambda-http/src/streaming.rs b/lambda-http/src/streaming.rs index 755fcd8a..8b8c56c2 100644 --- a/lambda-http/src/streaming.rs +++ b/lambda-http/src/streaming.rs @@ -2,7 +2,6 @@ use crate::{http::header::SET_COOKIE, request::LambdaRequest, tower::ServiceBuil use bytes::Bytes; pub use http::{self, Response}; use http_body::Body; -use lambda_runtime::Diagnostic; pub use lambda_runtime::{ self, tower::{ @@ -11,6 +10,7 @@ pub use lambda_runtime::{ }, Error, LambdaEvent, MetadataPrelude, Service, StreamResponse, }; +use lambda_runtime::{tower::util::BoxService, Diagnostic}; use std::{ fmt::Debug, pin::Pin, @@ -18,17 +18,25 @@ use std::{ }; use tokio_stream::Stream; -/// Converts a handler into a streaming-compatible service for use with AWS -/// Lambda. -/// -/// This function wraps a `Service` implementation, transforming its input and -/// output to be compatible with AWS Lambda's streaming response feature. It -/// provides the necessary middleware to handle `LambdaEvent` requests and -/// converts the `http::Response` into a `StreamResponse` containing a metadata -/// prelude and body stream. -#[allow(clippy::type_complexity)] +/// Runs the Lambda runtime with a handler that returns **streaming** HTTP +/// responses. pub fn into_streaming_response<'a, S, B, E>( handler: S, +) -> BoxService, StreamResponse>, E> +where + S: Service, Error = E> + Send + 'static, + S::Future: Send + 'a, + E: Debug + Into + 'static, + B: Body + Unpin + Send + 'static, + B::Data: Into + Send, + B::Error: Into + Send + Debug, +{ + into_streaming_response_inner::(handler).boxed() +} + +#[allow(clippy::type_complexity)] +fn into_streaming_response_inner<'a, S, B, E>( + handler: S, ) -> MapResponse< MapRequest) -> Request>, impl FnOnce(Response) -> StreamResponse> + Clone, @@ -73,12 +81,13 @@ where }) } -/// Starts the Lambda Rust runtime and stream response back [Configure Lambda -/// Streaming -/// Response](https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html). +/// Runs the Lambda runtime with a handler that returns **streaming** HTTP +/// responses. +/// +/// See the [AWS docs for response streaming]. /// -/// This takes care of transforming the LambdaEvent into a [`Request`] and -/// accepts [`http::Response`] as response. +/// [AWS docs for response streaming]: +/// https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error> where S: Service, Error = E>, @@ -88,7 +97,7 @@ where B::Data: Into + Send, B::Error: Into + Send + Debug, { - let svc = into_streaming_response(handler); + let svc = into_streaming_response_inner(handler); lambda_runtime::run(svc).await } From 42fc6fda779272f0888d65df01bb6ead00ea9280 Mon Sep 17 00:00:00 2001 From: Nick Angelou Date: Sat, 16 Aug 2025 12:30:43 +0200 Subject: [PATCH 07/19] StreamAdapter --- lambda-http/src/lib.rs | 2 +- lambda-http/src/streaming.rs | 106 +++++++++++++++++------------------ 2 files changed, 54 insertions(+), 54 deletions(-) diff --git a/lambda-http/src/lib.rs b/lambda-http/src/lib.rs index 6aaa6bc1..36e2ffbd 100644 --- a/lambda-http/src/lib.rs +++ b/lambda-http/src/lib.rs @@ -102,7 +102,7 @@ use std::{ }; mod streaming; -pub use streaming::{into_streaming_response, run_with_streaming_response}; +pub use streaming::{run_with_streaming_response, StreamAdapter}; /// Type alias for `http::Request`s with a fixed [`Body`](enum.Body.html) type pub type Request = http::Request; diff --git a/lambda-http/src/streaming.rs b/lambda-http/src/streaming.rs index 8b8c56c2..edc9e1bc 100644 --- a/lambda-http/src/streaming.rs +++ b/lambda-http/src/streaming.rs @@ -1,84 +1,85 @@ -use crate::{http::header::SET_COOKIE, request::LambdaRequest, tower::ServiceBuilder, Request, RequestExt}; +use crate::{http::header::SET_COOKIE, request::LambdaRequest, Request, RequestExt}; use bytes::Bytes; -pub use http::{self, Response}; -use http_body::Body; -pub use lambda_runtime::{ - self, - tower::{ - util::{MapRequest, MapResponse}, - ServiceExt, - }, - Error, LambdaEvent, MetadataPrelude, Service, StreamResponse, -}; -use lambda_runtime::{tower::util::BoxService, Diagnostic}; -use std::{ +use core::{ fmt::Debug, + future::Future, pin::Pin, task::{Context, Poll}, }; +pub use http::{self, Response}; +use http_body::Body; +use lambda_runtime::Diagnostic; +pub use lambda_runtime::{Error, LambdaEvent, MetadataPrelude, Service, StreamResponse}; +use std::marker::PhantomData; use tokio_stream::Stream; -/// Runs the Lambda runtime with a handler that returns **streaming** HTTP +/// An adapter that lifts a standard [`Service`] into a +/// [`Service>`] which produces streaming Lambda HTTP /// responses. -pub fn into_streaming_response<'a, S, B, E>( - handler: S, -) -> BoxService, StreamResponse>, E> +pub struct StreamAdapter<'a, S, B> { + service: S, + _phantom_data: PhantomData<&'a B>, +} + +impl<'a, S, B, E> From for StreamAdapter<'a, S, B> where - S: Service, Error = E> + Send + 'static, + S: Service, Error = E>, S::Future: Send + 'a, - E: Debug + Into + 'static, + E: Debug + Into, B: Body + Unpin + Send + 'static, B::Data: Into + Send, B::Error: Into + Send + Debug, { - into_streaming_response_inner::(handler).boxed() + fn from(service: S) -> Self { + StreamAdapter { + service, + _phantom_data: PhantomData, + } + } } -#[allow(clippy::type_complexity)] -fn into_streaming_response_inner<'a, S, B, E>( - handler: S, -) -> MapResponse< - MapRequest) -> Request>, - impl FnOnce(Response) -> StreamResponse> + Clone, -> +impl<'a, S, B, E> Service> for StreamAdapter<'a, S, B> where S: Service, Error = E>, S::Future: Send + 'a, - E: Debug + Into, - B: Body + Unpin + Send + 'static, + B: Body + Send + 'static, B::Data: Into + Send, B::Error: Into + Send + Debug, + E: Debug + Into, { - ServiceBuilder::new() - .map_request(|req: LambdaEvent| { - let event: Request = req.payload.into(); - event.with_lambda_context(req.context) - }) - .service(handler) - .map_response(|res: Response| { - let (parts, body) = res.into_parts(); + type Response = StreamResponse>; + type Error = E; + type Future = Pin> + Send + 'a>>; - let mut prelude_headers = parts.headers; + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(cx) + } - let cookies = prelude_headers + fn call(&mut self, req: LambdaEvent) -> Self::Future { + let event: Request = req.payload.into(); + let fut = self.service.call(event.with_lambda_context(req.context)); + Box::pin(async move { + let res = fut.await?; + let (parts, body) = res.into_parts(); + + let mut headers = parts.headers; + let cookies = headers .get_all(SET_COOKIE) .iter() .map(|c| String::from_utf8_lossy(c.as_bytes()).to_string()) - .collect::>(); - - prelude_headers.remove(SET_COOKIE); + .collect::>(); + headers.remove(SET_COOKIE); - let metadata_prelude = MetadataPrelude { - headers: prelude_headers, - status_code: parts.status, - cookies, - }; - - StreamResponse { - metadata_prelude, + Ok(StreamResponse { + metadata_prelude: MetadataPrelude { + headers, + status_code: parts.status, + cookies, + }, stream: BodyStream { body }, - } + }) }) + } } /// Runs the Lambda runtime with a handler that returns **streaming** HTTP @@ -97,8 +98,7 @@ where B::Data: Into + Send, B::Error: Into + Send + Debug, { - let svc = into_streaming_response_inner(handler); - lambda_runtime::run(svc).await + lambda_runtime::run(StreamAdapter::from(handler)).await } pin_project_lite::pin_project! { From 035c0b046f30250ef151f45e883d2e1cb5cc97e4 Mon Sep 17 00:00:00 2001 From: Nick Angelou Date: Sat, 16 Aug 2025 13:57:48 +0200 Subject: [PATCH 08/19] use futures_util::Stream --- lambda-http/src/streaming.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambda-http/src/streaming.rs b/lambda-http/src/streaming.rs index edc9e1bc..837f61b8 100644 --- a/lambda-http/src/streaming.rs +++ b/lambda-http/src/streaming.rs @@ -6,12 +6,12 @@ use core::{ pin::Pin, task::{Context, Poll}, }; +use futures_util::Stream; pub use http::{self, Response}; use http_body::Body; use lambda_runtime::Diagnostic; pub use lambda_runtime::{Error, LambdaEvent, MetadataPrelude, Service, StreamResponse}; use std::marker::PhantomData; -use tokio_stream::Stream; /// An adapter that lifts a standard [`Service`] into a /// [`Service>`] which produces streaming Lambda HTTP From 1cdeb925478de12e7e602cfb7a068df2efc30ea7 Mon Sep 17 00:00:00 2001 From: Nick Angelou Date: Sun, 17 Aug 2025 12:53:44 +0200 Subject: [PATCH 09/19] wip --- lambda-http/src/streaming.rs | 163 ++++++++++++++++++++++++++++------- 1 file changed, 132 insertions(+), 31 deletions(-) diff --git a/lambda-http/src/streaming.rs b/lambda-http/src/streaming.rs index 837f61b8..37ecf7e2 100644 --- a/lambda-http/src/streaming.rs +++ b/lambda-http/src/streaming.rs @@ -2,16 +2,21 @@ use crate::{http::header::SET_COOKIE, request::LambdaRequest, Request, RequestEx use bytes::Bytes; use core::{ fmt::Debug, - future::Future, pin::Pin, task::{Context, Poll}, }; -use futures_util::Stream; +use futures_util::{Stream, TryFutureExt}; pub use http::{self, Response}; use http_body::Body; -use lambda_runtime::Diagnostic; +use lambda_runtime::{ + tower::{ + util::{MapRequest, MapResponse}, + ServiceBuilder, ServiceExt, + }, + Diagnostic, +}; pub use lambda_runtime::{Error, LambdaEvent, MetadataPrelude, Service, StreamResponse}; -use std::marker::PhantomData; +use std::{future::Future, marker::PhantomData}; /// An adapter that lifts a standard [`Service`] into a /// [`Service>`] which produces streaming Lambda HTTP @@ -25,7 +30,6 @@ impl<'a, S, B, E> From for StreamAdapter<'a, S, B> where S: Service, Error = E>, S::Future: Send + 'a, - E: Debug + Into, B: Body + Unpin + Send + 'static, B::Data: Into + Send, B::Error: Into + Send + Debug, @@ -42,14 +46,13 @@ impl<'a, S, B, E> Service> for StreamAdapter<'a, S, B where S: Service, Error = E>, S::Future: Send + 'a, - B: Body + Send + 'static, + B: Body + Unpin + Send + 'static, B::Data: Into + Send, B::Error: Into + Send + Debug, - E: Debug + Into, { type Response = StreamResponse>; type Error = E; - type Future = Pin> + Send + 'a>>; + type Future = Pin> + Send + 'a>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.service.poll_ready(cx) @@ -57,31 +60,73 @@ where fn call(&mut self, req: LambdaEvent) -> Self::Future { let event: Request = req.payload.into(); - let fut = self.service.call(event.with_lambda_context(req.context)); - Box::pin(async move { - let res = fut.await?; - let (parts, body) = res.into_parts(); - - let mut headers = parts.headers; - let cookies = headers - .get_all(SET_COOKIE) - .iter() - .map(|c| String::from_utf8_lossy(c.as_bytes()).to_string()) - .collect::>(); - headers.remove(SET_COOKIE); - - Ok(StreamResponse { - metadata_prelude: MetadataPrelude { - headers, - status_code: parts.status, - cookies, - }, - stream: BodyStream { body }, - }) - }) + Box::pin( + self.service + .call(event.with_lambda_context(req.context)) + .map_ok(into_stream_response), + ) } } +/// Converts an `http::Response` into a streaming Lambda response. +fn into_stream_response(res: Response) -> StreamResponse> +where + B: Body + Unpin + Send + 'static, + B::Data: Into + Send, + B::Error: Into + Send + Debug, +{ + let (parts, body) = res.into_parts(); + + let mut headers = parts.headers; + let cookies = headers + .get_all(SET_COOKIE) + .iter() + .map(|c| String::from_utf8_lossy(c.as_bytes()).to_string()) + .collect::>(); + headers.remove(SET_COOKIE); + + StreamResponse { + metadata_prelude: MetadataPrelude { + headers, + status_code: parts.status, + cookies, + }, + stream: BodyStream { body }, + } +} + +/// Builds a streaming-aware Tower service from a `Service` **without** +/// boxing its future (no heap allocation / vtable). +/// +/// Transforms `LambdaEvent` into `Request` with Lambda context +/// and wraps `Response` into `StreamResponse>`. +/// +/// Used internally by [`run_with_streaming_response`]; not part of the public +/// API. +#[allow(clippy::type_complexity)] +fn into_streaming_response<'a, S, B, E>( + handler: S, +) -> MapResponse< + MapRequest) -> Request>, + impl FnOnce(Response) -> StreamResponse> + Clone, +> +where + S: Service, Error = E>, + S::Future: Send + 'a, + E: Debug + Into, + B: Body + Unpin + Send + 'static, + B::Data: Into + Send, + B::Error: Into + Send + Debug, +{ + ServiceBuilder::new() + .map_request(|req: LambdaEvent| { + let event: Request = req.payload.into(); + event.with_lambda_context(req.context) + }) + .service(handler) + .map_response(into_stream_response) +} + /// Runs the Lambda runtime with a handler that returns **streaming** HTTP /// responses. /// @@ -98,7 +143,7 @@ where B::Data: Into + Send, B::Error: Into + Send + Debug, { - lambda_runtime::run(StreamAdapter::from(handler)).await + lambda_runtime::run(into_streaming_response(handler)).await } pin_project_lite::pin_project! { @@ -127,3 +172,59 @@ where } } } + +#[cfg(test)] +mod test_stream_adapter { + use super::*; + + use crate::{ + tower::{ServiceBuilder, ServiceExt}, + Body, Request, StreamAdapter, + }; + use http::StatusCode; + + // A middleware that logs requests before forwarding them to another service + struct LogService { + inner: S, + } + + impl Service> for LogService + where + S: Service>, + { + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, event: LambdaEvent) -> Self::Future { + // Log the request + println!("Lambda event: {event:#?}"); + + self.inner.call(event) + } + } + + /// This tests that `StreamAdapter` can be used in a `tower::Service` where + /// the user may require additional middleware between `lambda_runtime::run` + /// and where the `LambdaEvent` is converted into a `Request`. + #[test] + fn stream_adapter_is_boxable() { + let _svc = ServiceBuilder::new() + .layer_fn(|service| { + // This could be any middleware that logs, inspects, or + // manipulates the `LambdaEvent` before it's converted to a + // `Request` by `Adapter`. + + LogService { inner: service } + }) + .layer_fn(StreamAdapter::from) + .service_fn( + |_req: Request| async move { http::Response::builder().status(StatusCode::OK).body(Body::Empty) }, + ) + .boxed(); + } +} From ce636acbb478e99286a75fd47b0acffcb2b2d972 Mon Sep 17 00:00:00 2001 From: Nick Angelou Date: Sun, 17 Aug 2025 13:09:47 +0200 Subject: [PATCH 10/19] update --- lambda-http/src/streaming.rs | 54 ++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/lambda-http/src/streaming.rs b/lambda-http/src/streaming.rs index 37ecf7e2..ecaa6dbb 100644 --- a/lambda-http/src/streaming.rs +++ b/lambda-http/src/streaming.rs @@ -68,33 +68,6 @@ where } } -/// Converts an `http::Response` into a streaming Lambda response. -fn into_stream_response(res: Response) -> StreamResponse> -where - B: Body + Unpin + Send + 'static, - B::Data: Into + Send, - B::Error: Into + Send + Debug, -{ - let (parts, body) = res.into_parts(); - - let mut headers = parts.headers; - let cookies = headers - .get_all(SET_COOKIE) - .iter() - .map(|c| String::from_utf8_lossy(c.as_bytes()).to_string()) - .collect::>(); - headers.remove(SET_COOKIE); - - StreamResponse { - metadata_prelude: MetadataPrelude { - headers, - status_code: parts.status, - cookies, - }, - stream: BodyStream { body }, - } -} - /// Builds a streaming-aware Tower service from a `Service` **without** /// boxing its future (no heap allocation / vtable). /// @@ -127,6 +100,33 @@ where .map_response(into_stream_response) } +/// Converts an `http::Response` into a streaming Lambda response. +fn into_stream_response(res: Response) -> StreamResponse> +where + B: Body + Unpin + Send + 'static, + B::Data: Into + Send, + B::Error: Into + Send + Debug, +{ + let (parts, body) = res.into_parts(); + + let mut headers = parts.headers; + let cookies = headers + .get_all(SET_COOKIE) + .iter() + .map(|c| String::from_utf8_lossy(c.as_bytes()).to_string()) + .collect::>(); + headers.remove(SET_COOKIE); + + StreamResponse { + metadata_prelude: MetadataPrelude { + headers, + status_code: parts.status, + cookies, + }, + stream: BodyStream { body }, + } +} + /// Runs the Lambda runtime with a handler that returns **streaming** HTTP /// responses. /// From e87ad7400615141ece3a4f13535633aabe305e40 Mon Sep 17 00:00:00 2001 From: Nick Angelou Date: Sun, 17 Aug 2025 13:21:54 +0200 Subject: [PATCH 11/19] update --- lambda-http/src/streaming.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lambda-http/src/streaming.rs b/lambda-http/src/streaming.rs index ecaa6dbb..c60a016f 100644 --- a/lambda-http/src/streaming.rs +++ b/lambda-http/src/streaming.rs @@ -77,7 +77,7 @@ where /// Used internally by [`run_with_streaming_response`]; not part of the public /// API. #[allow(clippy::type_complexity)] -fn into_streaming_response<'a, S, B, E>( +fn into_stream_service<'a, S, B, E>( handler: S, ) -> MapResponse< MapRequest) -> Request>, @@ -143,7 +143,7 @@ where B::Data: Into + Send, B::Error: Into + Send + Debug, { - lambda_runtime::run(into_streaming_response(handler)).await + lambda_runtime::run(into_stream_service(handler)).await } pin_project_lite::pin_project! { From f45ca50a81bd345b9634eb8b5f82ade425f46fb6 Mon Sep 17 00:00:00 2001 From: Nick Angelou Date: Sun, 17 Aug 2025 14:11:23 +0200 Subject: [PATCH 12/19] update --- lambda-http/src/streaming.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/lambda-http/src/streaming.rs b/lambda-http/src/streaming.rs index c60a016f..d65d64aa 100644 --- a/lambda-http/src/streaming.rs +++ b/lambda-http/src/streaming.rs @@ -177,10 +177,7 @@ where mod test_stream_adapter { use super::*; - use crate::{ - tower::{ServiceBuilder, ServiceExt}, - Body, Request, StreamAdapter, - }; + use crate::Body; use http::StatusCode; // A middleware that logs requests before forwarding them to another service From 5c23616d04d4cf12d6ea059b18fd7f3334193bc5 Mon Sep 17 00:00:00 2001 From: Nick Angelou Date: Sun, 17 Aug 2025 16:24:12 +0200 Subject: [PATCH 13/19] add axum streaming example --- examples/http-axum-streaming/Cargo.toml | 15 ++++++ examples/http-axum-streaming/README.md | 15 ++++++ examples/http-axum-streaming/src/main.rs | 62 ++++++++++++++++++++++++ 3 files changed, 92 insertions(+) create mode 100644 examples/http-axum-streaming/Cargo.toml create mode 100644 examples/http-axum-streaming/README.md create mode 100644 examples/http-axum-streaming/src/main.rs diff --git a/examples/http-axum-streaming/Cargo.toml b/examples/http-axum-streaming/Cargo.toml new file mode 100644 index 00000000..3ee0eb8c --- /dev/null +++ b/examples/http-axum-streaming/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "http-axum-streaming" +version = "0.1.0" +edition = "2021" + +[dependencies] +axum = "0.8" +bytes = "1" +futures-util = "0.3" +lambda_http = { path = "../../lambda-http", default-features = false, features = [ + "apigw_rest", "apigw_http", "tracing" +] } +thiserror = "2.0" +tokio = { version = "1", features = ["macros"] } +tokio-stream = "0.1.2" diff --git a/examples/http-axum-streaming/README.md b/examples/http-axum-streaming/README.md new file mode 100644 index 00000000..1ae89e2b --- /dev/null +++ b/examples/http-axum-streaming/README.md @@ -0,0 +1,15 @@ +# AWS Lambda Function example + +This example demonstrates building a **streaming** HTTP response with Axum, deployed on AWS Lambda using a custom runtime. + +## Build & Deploy + +1. Install [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation) +2. Build the function with `cargo lambda build --release` +3. Deploy the function to AWS Lambda with `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE` +4. Enable Lambda streaming response on Lambda console: change the function url's invoke mode to `RESPONSE_STREAM` +5. Verify the function works: `curl -v -N `. The results should be streamed back with 0.5 second pause between each word. + +## Build for ARM 64 + +Build the function with `cargo lambda build --release --arm64` diff --git a/examples/http-axum-streaming/src/main.rs b/examples/http-axum-streaming/src/main.rs new file mode 100644 index 00000000..8ad33cd2 --- /dev/null +++ b/examples/http-axum-streaming/src/main.rs @@ -0,0 +1,62 @@ +use axum::{ + body::Body, + http, + http::{ + header::{CACHE_CONTROL, CONTENT_TYPE}, + StatusCode, + }, + response::{IntoResponse, Response}, + routing::get, + Router, +}; +use bytes::Bytes; +use lambda_http::{lambda_runtime, tracing, Error, StreamAdapter}; +use std::{convert::Infallible, time::Duration}; +use thiserror::Error; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; + +#[derive(Debug, Error)] +pub enum AppError { + #[error("{0}")] + Http(#[from] http::Error), +} + +impl IntoResponse for AppError { + fn into_response(self) -> Response { + (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()).into_response() + } +} + +type AppResult = Result; + +async fn stream_handler() -> AppResult { + let (tx, rx) = mpsc::channel::>(8); + let body = Body::from_stream(ReceiverStream::new(rx)); + + tokio::spawn(async move { + for msg in ["Hello", "world", "from", "Lambda!"] { + tokio::time::sleep(Duration::from_millis(500)).await; + if tx.send(Ok(Bytes::from(format!("{msg}\n")))).await.is_err() { + break; + } + } + }); + + Ok(Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, "text/plain; charset=utf-8") + .header(CACHE_CONTROL, "no-cache") + .body(body)?) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + tracing::init_default_subscriber(); + + let app = Router::new().route("/", get(stream_handler)); + + let runtime = lambda_runtime::Runtime::new(StreamAdapter::from(app)); + + runtime.run().await +} From ff397067e35a2f26efeedc22e7cfaeff3e545fbb Mon Sep 17 00:00:00 2001 From: Nick Angelou Date: Mon, 18 Aug 2025 19:47:11 +0200 Subject: [PATCH 14/19] Update examples --- examples/http-axum-streaming-otel/Cargo.toml | 20 +++ examples/http-axum-streaming-otel/README.md | 28 ++++ examples/http-axum-streaming-otel/src/main.rs | 147 ++++++++++++++++++ examples/http-axum-streaming/Cargo.toml | 3 +- examples/http-axum-streaming/README.md | 18 ++- examples/http-axum-streaming/src/main.rs | 58 +++++-- lambda-http/src/streaming.rs | 53 ------- 7 files changed, 258 insertions(+), 69 deletions(-) create mode 100644 examples/http-axum-streaming-otel/Cargo.toml create mode 100644 examples/http-axum-streaming-otel/README.md create mode 100644 examples/http-axum-streaming-otel/src/main.rs diff --git a/examples/http-axum-streaming-otel/Cargo.toml b/examples/http-axum-streaming-otel/Cargo.toml new file mode 100644 index 00000000..d917bb03 --- /dev/null +++ b/examples/http-axum-streaming-otel/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "http-axum-streaming-otel" +version = "0.1.0" +edition = "2021" + +[dependencies] +axum = "0.8" +bytes = "1" +lambda_http = { path = "../../lambda-http", default-features = false, features = [ + "apigw_http", "tracing", "opentelemetry" +] } +opentelemetry = "0.30" +opentelemetry_sdk = { version = "0.30", features = ["rt-tokio"] } +opentelemetry-stdout = { version = "0.30", features = ["trace"] } +thiserror = "2.0" +tokio = { version = "1", features = ["macros"] } +tokio-stream = "0.1.2" +tracing = "0.1" +tracing-opentelemetry = "0.31" +tracing-subscriber = "0.3" diff --git a/examples/http-axum-streaming-otel/README.md b/examples/http-axum-streaming-otel/README.md new file mode 100644 index 00000000..5289ccdd --- /dev/null +++ b/examples/http-axum-streaming-otel/README.md @@ -0,0 +1,28 @@ +# AWS Lambda Function example + +This example shows how to build a **streaming HTTP response** with `Axum` and +run it on AWS Lambda using a custom runtime with OpenTelemetry (OTel) support. + +Tracing data is exported as console log entries visible in CloudWatch. Note that +CloudWatch assigns a `Timestamp` to each log entry based on when it receives the +data (batch exported). To see when work actually occurred, look at the span's +event attributes, which include the precise local timestamps of those events. + +## Build & Deploy + +1. Install + [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation) +2. Build the function with `cargo lambda build --release` +3. Deploy the function to AWS Lambda with: + - `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE --env-var +USE_NUMBERS=0` to stream words + - `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE --env-var +USE_NUMBERS=1` to stream numbers. +4. Enable Lambda streaming response on Lambda console: change the function url's + invoke mode to `RESPONSE_STREAM` +5. Verify the function works: `curl -N `. The results should be + streamed back with 0.5 second pause between each word. + +## Build for ARM 64 + +Build the function with `cargo lambda build --release --arm64` diff --git a/examples/http-axum-streaming-otel/src/main.rs b/examples/http-axum-streaming-otel/src/main.rs new file mode 100644 index 00000000..153b0257 --- /dev/null +++ b/examples/http-axum-streaming-otel/src/main.rs @@ -0,0 +1,147 @@ +//! # Example: Axum Streaming Responses on AWS Lambda with OTel +//! +//! Demonstrates serving **incremental streaming responses** from Axum handlers +//! running in AWS Lambda using a **custom** `lambda_runtime::Runtime` with +//! OpenTelemetry (OTel) support. +//! +//! - Streams numbers if `USE_NUMBERS` is set, otherwise streams words. +//! - Uses `BoxService` to erase the router's concrete type so different routers +//! can be selected at runtime. +//! - Runs with a custom `Runtime` + `StreamAdapter`, which convert Axum +//! responses into streaming bodies delivered as data is produced (unlike the +//! default `run_with_streaming_response` helper). + +use axum::{ + body::Body, + extract::Request, + http::{ + self, + header::{CACHE_CONTROL, CONTENT_TYPE}, + StatusCode, + }, + response::{IntoResponse, Response}, + routing::get, + Router, +}; +use bytes::Bytes; +use core::{convert::Infallible, time::Duration}; +use lambda_http::{ + lambda_runtime::{ + layers::{OpenTelemetryFaasTrigger, OpenTelemetryLayer as OtelLayer}, + tracing::Instrument, + Runtime, + }, + tower::util::BoxService, + tracing, Error, StreamAdapter, +}; +use opentelemetry::trace::TracerProvider; +use opentelemetry_sdk::trace; +use thiserror::Error; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tracing_subscriber::prelude::*; + +#[derive(Debug, Error)] +pub enum AppError { + #[error("{0}")] + Http(#[from] http::Error), +} + +impl IntoResponse for AppError { + fn into_response(self) -> Response { + (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()).into_response() + } +} + +#[tracing::instrument(skip_all)] +async fn stream_numbers() -> Result { + let (tx, rx) = mpsc::channel::>(8); + let body = Body::from_stream(ReceiverStream::new(rx)); + + tokio::spawn( + async move { + for (idx, i) in (1..=4).enumerate() { + tokio::time::sleep(Duration::from_millis(500)).await; + let line = format!("number: {i}\n"); + tracing::info!(chunk.idx = idx, bytes = line.len(), "emit"); + if tx.send(Ok(Bytes::from(line))).await.is_err() { + break; + } + } + } + .instrument(tracing::info_span!("producer.stream_numbers")), + ); + + Ok(Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, "text/plain; charset=utf-8") + .header(CACHE_CONTROL, "no-cache") + .body(body)?) +} + +#[tracing::instrument(skip_all)] +async fn stream_words() -> Result { + let (tx, rx) = mpsc::channel::>(8); + let body = Body::from_stream(ReceiverStream::new(rx)); + + tokio::spawn( + async move { + for (idx, msg) in ["Hello", "world", "from", "Lambda!"].iter().enumerate() { + tokio::time::sleep(Duration::from_millis(500)).await; + let line = format!("{msg}\n"); + tracing::info!(chunk.idx = idx, bytes = line.len(), "emit"); + if tx.send(Ok(Bytes::from(line))).await.is_err() { + break; + } + } + } + .instrument(tracing::info_span!("producer.stream_words")), + ); + + Ok(Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, "text/plain; charset=utf-8") + .header(CACHE_CONTROL, "no-cache") + .body(body)?) +} + +// Creates a dynamic router based on the environment variable. Demonstrating how +// you can type-erase a service +fn create_svc() -> BoxService, Response, Infallible> { + if std::env::var("USE_NUMBERS").as_deref() == Ok("1") { + BoxService::new(Router::new().route("/", get(stream_numbers))) + } else { + BoxService::new(Router::new().route("/", get(stream_words))) + } +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + // Set up OpenTelemetry tracer provider that writes spans to stdout for + // debugging purposes + let exporter = opentelemetry_stdout::SpanExporter::default(); + let tracer_provider = trace::SdkTracerProvider::builder() + .with_batch_exporter(exporter) + .build(); + + // Set up link between OpenTelemetry and tracing crate + tracing_subscriber::registry() + .with(tracing_opentelemetry::OpenTelemetryLayer::new( + tracer_provider.tracer("my-streaming-app"), + )) + .init(); + + let svc = create_svc(); + + // Initialize the Lambda runtime and add OpenTelemetry tracing + let runtime = Runtime::new(StreamAdapter::from(svc)).layer( + OtelLayer::new(|| { + if let Err(err) = tracer_provider.force_flush() { + eprintln!("Error flushing traces: {err:#?}"); + } + }) + .with_trigger(OpenTelemetryFaasTrigger::Http), + ); + + runtime.run().await +} diff --git a/examples/http-axum-streaming/Cargo.toml b/examples/http-axum-streaming/Cargo.toml index 3ee0eb8c..a951562b 100644 --- a/examples/http-axum-streaming/Cargo.toml +++ b/examples/http-axum-streaming/Cargo.toml @@ -6,9 +6,8 @@ edition = "2021" [dependencies] axum = "0.8" bytes = "1" -futures-util = "0.3" lambda_http = { path = "../../lambda-http", default-features = false, features = [ - "apigw_rest", "apigw_http", "tracing" + "apigw_http", "tracing" ] } thiserror = "2.0" tokio = { version = "1", features = ["macros"] } diff --git a/examples/http-axum-streaming/README.md b/examples/http-axum-streaming/README.md index 1ae89e2b..fa37e31b 100644 --- a/examples/http-axum-streaming/README.md +++ b/examples/http-axum-streaming/README.md @@ -1,14 +1,22 @@ # AWS Lambda Function example -This example demonstrates building a **streaming** HTTP response with Axum, deployed on AWS Lambda using a custom runtime. +This example demonstrates building a **streaming** HTTP response with Axum, +deployed on AWS Lambda using a custom runtime. ## Build & Deploy -1. Install [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation) +1. Install + [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation) 2. Build the function with `cargo lambda build --release` -3. Deploy the function to AWS Lambda with `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE` -4. Enable Lambda streaming response on Lambda console: change the function url's invoke mode to `RESPONSE_STREAM` -5. Verify the function works: `curl -v -N `. The results should be streamed back with 0.5 second pause between each word. +3. Deploy the function to AWS Lambda with: + - `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE --env-var +USE_NUMBERS=0` to stream words + - `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE --env-var +USE_NUMBERS=1` to stream numbers. +4. Enable Lambda streaming response on Lambda console: change the function url's + invoke mode to `RESPONSE_STREAM` +5. Verify the function works: `curl -N `. The results should be + streamed back with 0.5 second pause between each word. ## Build for ARM 64 diff --git a/examples/http-axum-streaming/src/main.rs b/examples/http-axum-streaming/src/main.rs index 8ad33cd2..452a6053 100644 --- a/examples/http-axum-streaming/src/main.rs +++ b/examples/http-axum-streaming/src/main.rs @@ -1,7 +1,21 @@ +//! # Example: Axum Streaming Responses on AWS Lambda +//! +//! Demonstrates serving **incremental streaming responses** from Axum handlers +//! running in AWS Lambda. +//! +//! - Streams numbers if `USE_NUMBERS` is set, otherwise streams words. +//! - Uses `BoxService` to erase the router's concrete type so different routers +//! can be selected at runtime. +//! - Runs with `run_with_streaming_response`, which uses the **default Lambda +//! runtime** to convert Axum responses into streaming bodies delivered as +//! data is produced (unlike the OTel example, which used a custom `Runtime` + +//! `StreamAdapter`). + use axum::{ body::Body, - http, + extract::Request, http::{ + self, header::{CACHE_CONTROL, CONTENT_TYPE}, StatusCode, }, @@ -10,8 +24,8 @@ use axum::{ Router, }; use bytes::Bytes; -use lambda_http::{lambda_runtime, tracing, Error, StreamAdapter}; -use std::{convert::Infallible, time::Duration}; +use core::{convert::Infallible, time::Duration}; +use lambda_http::{run_with_streaming_response, tower::util::BoxService, tracing, Error}; use thiserror::Error; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; @@ -28,9 +42,25 @@ impl IntoResponse for AppError { } } -type AppResult = Result; +async fn stream_numbers() -> Result { + let (tx, rx) = mpsc::channel::>(8); + let body = Body::from_stream(ReceiverStream::new(rx)); + + tokio::spawn(async move { + for i in 1..=4 { + tokio::time::sleep(Duration::from_millis(500)).await; + let _ = tx.send(Ok(Bytes::from(format!("number: {i}\n")))).await; + } + }); -async fn stream_handler() -> AppResult { + Ok(Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, "text/plain; charset=utf-8") + .header(CACHE_CONTROL, "no-cache") + .body(body)?) +} + +async fn stream_words() -> Result { let (tx, rx) = mpsc::channel::>(8); let body = Body::from_stream(ReceiverStream::new(rx)); @@ -50,13 +80,23 @@ async fn stream_handler() -> AppResult { .body(body)?) } +// Creates a dynamic router based on the environment variable. Demonstrating how +// you can type-erase a service +fn create_svc() -> BoxService, Response, Infallible> { + if std::env::var("USE_NUMBERS").as_deref() == Ok("1") { + BoxService::new(Router::new().route("/", get(stream_numbers))) + } else { + BoxService::new(Router::new().route("/", get(stream_words))) + } +} + #[tokio::main] async fn main() -> Result<(), Error> { tracing::init_default_subscriber(); - let app = Router::new().route("/", get(stream_handler)); - - let runtime = lambda_runtime::Runtime::new(StreamAdapter::from(app)); + let svc = create_svc(); - runtime.run().await + // Automatically convert the service into a streaming response with a + // default runtime. + run_with_streaming_response(svc).await } diff --git a/lambda-http/src/streaming.rs b/lambda-http/src/streaming.rs index d65d64aa..f1441c5f 100644 --- a/lambda-http/src/streaming.rs +++ b/lambda-http/src/streaming.rs @@ -172,56 +172,3 @@ where } } } - -#[cfg(test)] -mod test_stream_adapter { - use super::*; - - use crate::Body; - use http::StatusCode; - - // A middleware that logs requests before forwarding them to another service - struct LogService { - inner: S, - } - - impl Service> for LogService - where - S: Service>, - { - type Response = S::Response; - type Error = S::Error; - type Future = S::Future; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_ready(cx) - } - - fn call(&mut self, event: LambdaEvent) -> Self::Future { - // Log the request - println!("Lambda event: {event:#?}"); - - self.inner.call(event) - } - } - - /// This tests that `StreamAdapter` can be used in a `tower::Service` where - /// the user may require additional middleware between `lambda_runtime::run` - /// and where the `LambdaEvent` is converted into a `Request`. - #[test] - fn stream_adapter_is_boxable() { - let _svc = ServiceBuilder::new() - .layer_fn(|service| { - // This could be any middleware that logs, inspects, or - // manipulates the `LambdaEvent` before it's converted to a - // `Request` by `Adapter`. - - LogService { inner: service } - }) - .layer_fn(StreamAdapter::from) - .service_fn( - |_req: Request| async move { http::Response::builder().status(StatusCode::OK).body(Body::Empty) }, - ) - .boxed(); - } -} From c475254e7dc853124ab28f3e19c9d6cb6ea3e58d Mon Sep 17 00:00:00 2001 From: Nick Angelou Date: Mon, 18 Aug 2025 19:49:56 +0200 Subject: [PATCH 15/19] remove --- examples/http-axum-streaming-otel/src/main.rs | 2 -- examples/http-axum-streaming/src/main.rs | 2 -- 2 files changed, 4 deletions(-) diff --git a/examples/http-axum-streaming-otel/src/main.rs b/examples/http-axum-streaming-otel/src/main.rs index 153b0257..51857b63 100644 --- a/examples/http-axum-streaming-otel/src/main.rs +++ b/examples/http-axum-streaming-otel/src/main.rs @@ -105,8 +105,6 @@ async fn stream_words() -> Result { .body(body)?) } -// Creates a dynamic router based on the environment variable. Demonstrating how -// you can type-erase a service fn create_svc() -> BoxService, Response, Infallible> { if std::env::var("USE_NUMBERS").as_deref() == Ok("1") { BoxService::new(Router::new().route("/", get(stream_numbers))) diff --git a/examples/http-axum-streaming/src/main.rs b/examples/http-axum-streaming/src/main.rs index 452a6053..e4f825e6 100644 --- a/examples/http-axum-streaming/src/main.rs +++ b/examples/http-axum-streaming/src/main.rs @@ -80,8 +80,6 @@ async fn stream_words() -> Result { .body(body)?) } -// Creates a dynamic router based on the environment variable. Demonstrating how -// you can type-erase a service fn create_svc() -> BoxService, Response, Infallible> { if std::env::var("USE_NUMBERS").as_deref() == Ok("1") { BoxService::new(Router::new().route("/", get(stream_numbers))) From 1ce5c3ea0d0cdb869c0737b427d48c94dbb3f70f Mon Sep 17 00:00:00 2001 From: Nick Angelou Date: Tue, 19 Aug 2025 11:37:02 +0200 Subject: [PATCH 16/19] add back --- lambda-http/src/streaming.rs | 53 ++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/lambda-http/src/streaming.rs b/lambda-http/src/streaming.rs index f1441c5f..d65d64aa 100644 --- a/lambda-http/src/streaming.rs +++ b/lambda-http/src/streaming.rs @@ -172,3 +172,56 @@ where } } } + +#[cfg(test)] +mod test_stream_adapter { + use super::*; + + use crate::Body; + use http::StatusCode; + + // A middleware that logs requests before forwarding them to another service + struct LogService { + inner: S, + } + + impl Service> for LogService + where + S: Service>, + { + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, event: LambdaEvent) -> Self::Future { + // Log the request + println!("Lambda event: {event:#?}"); + + self.inner.call(event) + } + } + + /// This tests that `StreamAdapter` can be used in a `tower::Service` where + /// the user may require additional middleware between `lambda_runtime::run` + /// and where the `LambdaEvent` is converted into a `Request`. + #[test] + fn stream_adapter_is_boxable() { + let _svc = ServiceBuilder::new() + .layer_fn(|service| { + // This could be any middleware that logs, inspects, or + // manipulates the `LambdaEvent` before it's converted to a + // `Request` by `Adapter`. + + LogService { inner: service } + }) + .layer_fn(StreamAdapter::from) + .service_fn( + |_req: Request| async move { http::Response::builder().status(StatusCode::OK).body(Body::Empty) }, + ) + .boxed(); + } +} From b39ab05c6a683ca43d1512b16404eeb6f01d10a1 Mon Sep 17 00:00:00 2001 From: Nick Angelou Date: Wed, 20 Aug 2025 10:54:39 +0200 Subject: [PATCH 17/19] update --- examples/http-axum-streaming-otel/src/main.rs | 38 +------------------ examples/http-axum-streaming/src/main.rs | 31 +-------------- lambda-http/src/streaming.rs | 25 ++++++------ 3 files changed, 16 insertions(+), 78 deletions(-) diff --git a/examples/http-axum-streaming-otel/src/main.rs b/examples/http-axum-streaming-otel/src/main.rs index 51857b63..a7b8c68f 100644 --- a/examples/http-axum-streaming-otel/src/main.rs +++ b/examples/http-axum-streaming-otel/src/main.rs @@ -13,7 +13,6 @@ use axum::{ body::Body, - extract::Request, http::{ self, header::{CACHE_CONTROL, CONTENT_TYPE}, @@ -31,7 +30,6 @@ use lambda_http::{ tracing::Instrument, Runtime, }, - tower::util::BoxService, tracing, Error, StreamAdapter, }; use opentelemetry::trace::TracerProvider; @@ -53,32 +51,6 @@ impl IntoResponse for AppError { } } -#[tracing::instrument(skip_all)] -async fn stream_numbers() -> Result { - let (tx, rx) = mpsc::channel::>(8); - let body = Body::from_stream(ReceiverStream::new(rx)); - - tokio::spawn( - async move { - for (idx, i) in (1..=4).enumerate() { - tokio::time::sleep(Duration::from_millis(500)).await; - let line = format!("number: {i}\n"); - tracing::info!(chunk.idx = idx, bytes = line.len(), "emit"); - if tx.send(Ok(Bytes::from(line))).await.is_err() { - break; - } - } - } - .instrument(tracing::info_span!("producer.stream_numbers")), - ); - - Ok(Response::builder() - .status(StatusCode::OK) - .header(CONTENT_TYPE, "text/plain; charset=utf-8") - .header(CACHE_CONTROL, "no-cache") - .body(body)?) -} - #[tracing::instrument(skip_all)] async fn stream_words() -> Result { let (tx, rx) = mpsc::channel::>(8); @@ -105,14 +77,6 @@ async fn stream_words() -> Result { .body(body)?) } -fn create_svc() -> BoxService, Response, Infallible> { - if std::env::var("USE_NUMBERS").as_deref() == Ok("1") { - BoxService::new(Router::new().route("/", get(stream_numbers))) - } else { - BoxService::new(Router::new().route("/", get(stream_words))) - } -} - #[tokio::main] async fn main() -> Result<(), Error> { // Set up OpenTelemetry tracer provider that writes spans to stdout for @@ -129,7 +93,7 @@ async fn main() -> Result<(), Error> { )) .init(); - let svc = create_svc(); + let svc = Router::new().route("/", get(stream_words)); // Initialize the Lambda runtime and add OpenTelemetry tracing let runtime = Runtime::new(StreamAdapter::from(svc)).layer( diff --git a/examples/http-axum-streaming/src/main.rs b/examples/http-axum-streaming/src/main.rs index e4f825e6..1aef6919 100644 --- a/examples/http-axum-streaming/src/main.rs +++ b/examples/http-axum-streaming/src/main.rs @@ -13,7 +13,6 @@ use axum::{ body::Body, - extract::Request, http::{ self, header::{CACHE_CONTROL, CONTENT_TYPE}, @@ -25,7 +24,7 @@ use axum::{ }; use bytes::Bytes; use core::{convert::Infallible, time::Duration}; -use lambda_http::{run_with_streaming_response, tower::util::BoxService, tracing, Error}; +use lambda_http::{run_with_streaming_response, tracing, Error}; use thiserror::Error; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; @@ -42,24 +41,6 @@ impl IntoResponse for AppError { } } -async fn stream_numbers() -> Result { - let (tx, rx) = mpsc::channel::>(8); - let body = Body::from_stream(ReceiverStream::new(rx)); - - tokio::spawn(async move { - for i in 1..=4 { - tokio::time::sleep(Duration::from_millis(500)).await; - let _ = tx.send(Ok(Bytes::from(format!("number: {i}\n")))).await; - } - }); - - Ok(Response::builder() - .status(StatusCode::OK) - .header(CONTENT_TYPE, "text/plain; charset=utf-8") - .header(CACHE_CONTROL, "no-cache") - .body(body)?) -} - async fn stream_words() -> Result { let (tx, rx) = mpsc::channel::>(8); let body = Body::from_stream(ReceiverStream::new(rx)); @@ -80,19 +61,11 @@ async fn stream_words() -> Result { .body(body)?) } -fn create_svc() -> BoxService, Response, Infallible> { - if std::env::var("USE_NUMBERS").as_deref() == Ok("1") { - BoxService::new(Router::new().route("/", get(stream_numbers))) - } else { - BoxService::new(Router::new().route("/", get(stream_words))) - } -} - #[tokio::main] async fn main() -> Result<(), Error> { tracing::init_default_subscriber(); - let svc = create_svc(); + let svc = Router::new().route("/", get(stream_words)); // Automatically convert the service into a streaming response with a // default runtime. diff --git a/lambda-http/src/streaming.rs b/lambda-http/src/streaming.rs index d65d64aa..3df2b558 100644 --- a/lambda-http/src/streaming.rs +++ b/lambda-http/src/streaming.rs @@ -198,26 +198,27 @@ mod test_stream_adapter { } fn call(&mut self, event: LambdaEvent) -> Self::Future { - // Log the request println!("Lambda event: {event:#?}"); - self.inner.call(event) } } - /// This tests that `StreamAdapter` can be used in a `tower::Service` where - /// the user may require additional middleware between `lambda_runtime::run` - /// and where the `LambdaEvent` is converted into a `Request`. + /// Works with a concrete service stack (no boxing) #[test] - fn stream_adapter_is_boxable() { + fn stream_adapter_with_concrete_stack() { let _svc = ServiceBuilder::new() - .layer_fn(|service| { - // This could be any middleware that logs, inspects, or - // manipulates the `LambdaEvent` before it's converted to a - // `Request` by `Adapter`. + .layer_fn(|service| LogService { inner: service }) + .layer_fn(StreamAdapter::from) + .service_fn( + |_req: Request| async move { http::Response::builder().status(StatusCode::OK).body(Body::Empty) }, + ); + } - LogService { inner: service } - }) + /// Also works when the stack is boxed (type-erased) + #[test] + fn stream_adapter_with_boxed_stack() { + let _svc = ServiceBuilder::new() + .layer_fn(|service| LogService { inner: service }) .layer_fn(StreamAdapter::from) .service_fn( |_req: Request| async move { http::Response::builder().status(StatusCode::OK).body(Body::Empty) }, From e3e8de79abb082269ec6d094cbe523542556b12f Mon Sep 17 00:00:00 2001 From: Nick Angelou Date: Wed, 20 Aug 2025 10:57:52 +0200 Subject: [PATCH 18/19] update docs --- examples/http-axum-streaming-otel/README.md | 5 +---- examples/http-axum-streaming-otel/src/main.rs | 3 --- examples/http-axum-streaming/README.md | 5 +---- examples/http-axum-streaming/src/main.rs | 3 --- 4 files changed, 2 insertions(+), 14 deletions(-) diff --git a/examples/http-axum-streaming-otel/README.md b/examples/http-axum-streaming-otel/README.md index 5289ccdd..194fe4e4 100644 --- a/examples/http-axum-streaming-otel/README.md +++ b/examples/http-axum-streaming-otel/README.md @@ -14,10 +14,7 @@ event attributes, which include the precise local timestamps of those events. [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation) 2. Build the function with `cargo lambda build --release` 3. Deploy the function to AWS Lambda with: - - `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE --env-var -USE_NUMBERS=0` to stream words - - `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE --env-var -USE_NUMBERS=1` to stream numbers. + - `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE` to stream words 4. Enable Lambda streaming response on Lambda console: change the function url's invoke mode to `RESPONSE_STREAM` 5. Verify the function works: `curl -N `. The results should be diff --git a/examples/http-axum-streaming-otel/src/main.rs b/examples/http-axum-streaming-otel/src/main.rs index a7b8c68f..64f4e49e 100644 --- a/examples/http-axum-streaming-otel/src/main.rs +++ b/examples/http-axum-streaming-otel/src/main.rs @@ -4,9 +4,6 @@ //! running in AWS Lambda using a **custom** `lambda_runtime::Runtime` with //! OpenTelemetry (OTel) support. //! -//! - Streams numbers if `USE_NUMBERS` is set, otherwise streams words. -//! - Uses `BoxService` to erase the router's concrete type so different routers -//! can be selected at runtime. //! - Runs with a custom `Runtime` + `StreamAdapter`, which convert Axum //! responses into streaming bodies delivered as data is produced (unlike the //! default `run_with_streaming_response` helper). diff --git a/examples/http-axum-streaming/README.md b/examples/http-axum-streaming/README.md index fa37e31b..fe7e573d 100644 --- a/examples/http-axum-streaming/README.md +++ b/examples/http-axum-streaming/README.md @@ -9,10 +9,7 @@ deployed on AWS Lambda using a custom runtime. [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation) 2. Build the function with `cargo lambda build --release` 3. Deploy the function to AWS Lambda with: - - `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE --env-var -USE_NUMBERS=0` to stream words - - `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE --env-var -USE_NUMBERS=1` to stream numbers. + - `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE` to stream words 4. Enable Lambda streaming response on Lambda console: change the function url's invoke mode to `RESPONSE_STREAM` 5. Verify the function works: `curl -N `. The results should be diff --git a/examples/http-axum-streaming/src/main.rs b/examples/http-axum-streaming/src/main.rs index 1aef6919..1812f879 100644 --- a/examples/http-axum-streaming/src/main.rs +++ b/examples/http-axum-streaming/src/main.rs @@ -3,9 +3,6 @@ //! Demonstrates serving **incremental streaming responses** from Axum handlers //! running in AWS Lambda. //! -//! - Streams numbers if `USE_NUMBERS` is set, otherwise streams words. -//! - Uses `BoxService` to erase the router's concrete type so different routers -//! can be selected at runtime. //! - Runs with `run_with_streaming_response`, which uses the **default Lambda //! runtime** to convert Axum responses into streaming bodies delivered as //! data is produced (unlike the OTel example, which used a custom `Runtime` + From 45f01e739a56b8a3952c11173ab567509bde5889 Mon Sep 17 00:00:00 2001 From: Nick Angelou Date: Wed, 20 Aug 2025 11:02:20 +0200 Subject: [PATCH 19/19] update --- lambda-http/src/streaming.rs | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/lambda-http/src/streaming.rs b/lambda-http/src/streaming.rs index 3df2b558..6dd17230 100644 --- a/lambda-http/src/streaming.rs +++ b/lambda-http/src/streaming.rs @@ -203,26 +203,16 @@ mod test_stream_adapter { } } - /// Works with a concrete service stack (no boxing) #[test] - fn stream_adapter_with_concrete_stack() { - let _svc = ServiceBuilder::new() + fn stream_adapter_is_boxable() { + // Works with a concrete service stack (no boxing) + let svc = ServiceBuilder::new() .layer_fn(|service| LogService { inner: service }) .layer_fn(StreamAdapter::from) .service_fn( |_req: Request| async move { http::Response::builder().status(StatusCode::OK).body(Body::Empty) }, ); - } - - /// Also works when the stack is boxed (type-erased) - #[test] - fn stream_adapter_with_boxed_stack() { - let _svc = ServiceBuilder::new() - .layer_fn(|service| LogService { inner: service }) - .layer_fn(StreamAdapter::from) - .service_fn( - |_req: Request| async move { http::Response::builder().status(StatusCode::OK).body(Body::Empty) }, - ) - .boxed(); + // Also works when the stack is boxed (type-erased) + let _boxed_svc = svc.boxed(); } }