diff --git a/examples/extension-internal-flush/Cargo.toml b/examples/extension-internal-flush/Cargo.toml new file mode 100644 index 00000000..daadd0eb --- /dev/null +++ b/examples/extension-internal-flush/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "extension-internal-flush" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow = "1" +aws_lambda_events = { path = "../../lambda-events" } +lambda-extension = { path = "../../lambda-extension" } +lambda_runtime = { path = "../../lambda-runtime" } +serde = "1.0.136" +tokio = { version = "1", features = ["macros", "sync"] } diff --git a/examples/extension-internal-flush/README.md b/examples/extension-internal-flush/README.md new file mode 100644 index 00000000..553f7a3d --- /dev/null +++ b/examples/extension-internal-flush/README.md @@ -0,0 +1,30 @@ +# AWS Lambda runtime + internal extension example + +This example demonstrates how to build an AWS Lambda function that includes a +[Lambda internal extension](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-extensions-api.html). +Unlike external extensions that run as separate processes, an internal extension runs within the +main runtime process. + +One use case for internal extensions is to flush logs or telemetry data after the Lambda runtime +handler has finished processing an event but before the execution environment is frozen awaiting the +arrival of the next event. Without an explicit flush, telemetry data may never be sent since the +execution environment will remain frozen and eventually be terminated if no additional events arrive. + +Note that for +[synchronous](https://docs.aws.amazon.com/lambda/latest/dg/invocation-sync.html) Lambda invocations +(e.g., via +[Amazon API Gateway](https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-integrations.html)), +the Lambda service returns the response to the caller immediately. Extensions may continue to run +without introducing an observable delay. + +## Build & Deploy + +1. Install [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation) +2. Build the extension with `cargo lambda build --release` +3. Deploy the function to AWS Lambda with `cargo lambda deploy --iam-role YOUR_ROLE` + +The last command will give you an ARN for the extension layer that you can use in your functions. + +## Build for ARM 64 + +Build the extension with `cargo lambda build --release --arm64` diff --git a/examples/extension-internal-flush/src/main.rs b/examples/extension-internal-flush/src/main.rs new file mode 100644 index 00000000..3706809d --- /dev/null +++ b/examples/extension-internal-flush/src/main.rs @@ -0,0 +1,112 @@ +use anyhow::anyhow; +use aws_lambda_events::sqs::{SqsBatchResponse, SqsEventObj}; +use lambda_extension::{service_fn, Error, Extension, NextEvent}; +use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::Mutex; + +use std::sync::Arc; + +/// Implements an internal Lambda extension to flush logs/telemetry after each request. +struct FlushExtension { + request_done_receiver: Mutex>, +} + +impl FlushExtension { + pub fn new(request_done_receiver: UnboundedReceiver<()>) -> Self { + Self { + request_done_receiver: Mutex::new(request_done_receiver), + } + } + + pub async fn invoke(&self, event: lambda_extension::LambdaEvent) -> Result<(), Error> { + match event.next { + // NB: Internal extensions only support the INVOKE event. + NextEvent::Shutdown(shutdown) => { + return Err(anyhow!("extension received unexpected SHUTDOWN event: {:?}", shutdown).into()); + } + NextEvent::Invoke(_e) => {} + } + + eprintln!("[extension] waiting for event to be processed"); + + // Wait for runtime to finish processing event. + self.request_done_receiver + .lock() + .await + .recv() + .await + .ok_or_else(|| anyhow!("channel is closed"))?; + + eprintln!("[extension] flushing logs and telemetry"); + + // + + Ok(()) + } +} + +/// Object that you send to SQS and plan to process with the function. +#[derive(Debug, Deserialize, Serialize)] +struct Data { + a: String, + b: i64, +} + +/// Implements the main event handler for processing events from an SQS queue. +struct EventHandler { + request_done_sender: UnboundedSender<()>, +} + +impl EventHandler { + pub fn new(request_done_sender: UnboundedSender<()>) -> Self { + Self { request_done_sender } + } + + pub async fn invoke( + &self, + event: lambda_runtime::LambdaEvent>, + ) -> Result { + let data = &event.payload.records[0].body; + eprintln!("[runtime] received event {data:?}"); + + // + + // Notify the extension to flush traces. + self.request_done_sender.send(()).map_err(Box::new)?; + + Ok(SqsBatchResponse::default()) + } +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + let (request_done_sender, request_done_receiver) = unbounded_channel::<()>(); + + let flush_extension = Arc::new(FlushExtension::new(request_done_receiver)); + let extension = Extension::new() + // Internal extensions only support INVOKE events. + .with_events(&["INVOKE"]) + .with_events_processor(service_fn(|event| { + let flush_extension = flush_extension.clone(); + async move { flush_extension.invoke(event).await } + })) + // Internal extension names MUST be unique within a given Lambda function. + .with_extension_name("internal-flush") + // Extensions MUST be registered before calling lambda_runtime::run(), which ends the Init + // phase and begins the Invoke phase. + .register() + .await?; + + let handler = Arc::new(EventHandler::new(request_done_sender)); + + tokio::try_join!( + lambda_runtime::run(service_fn(|event| { + let handler = handler.clone(); + async move { handler.invoke(event).await } + })), + extension.run(), + )?; + + Ok(()) +} diff --git a/lambda-extension/src/extension.rs b/lambda-extension/src/extension.rs index 4747b041..d653e0dc 100644 --- a/lambda-extension/src/extension.rs +++ b/lambda-extension/src/extension.rs @@ -215,14 +215,21 @@ where } } - /// Execute the given extension - pub async fn run(self) -> Result<(), Error> { + /// Register the extension. + /// + /// Performs the + /// [init phase](https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtime-environment.html#runtimes-lifecycle-ib) + /// Lambda lifecycle operations to register the extension. When implementing an internal Lambda + /// extension, it is safe to call `lambda_runtime::run` once the future returned by this + /// function resolves. + pub async fn register(self) -> Result, Error> { let client = &Client::builder().build()?; let extension_id = register(client, self.extension_name, self.events).await?; let extension_id = extension_id.to_str()?; - let mut ep = self.events_processor; + // Logs API subscriptions must be requested during the Lambda init phase (see + // https://docs.aws.amazon.com/lambda/latest/dg/runtimes-logs-api.html#runtimes-logs-api-subscribing). if let Some(mut log_processor) = self.logs_processor { trace!("Log processor found"); @@ -262,6 +269,8 @@ where trace!("Registered extension with Logs API"); } + // Telemetry API subscriptions must be requested during the Lambda init phase (see + // https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html#telemetry-api-registration if let Some(mut telemetry_processor) = self.telemetry_processor { trace!("Telemetry processor found"); @@ -301,6 +310,42 @@ where trace!("Registered extension with Telemetry API"); } + Ok(RegisteredExtension { + extension_id: extension_id.to_string(), + events_processor: self.events_processor, + }) + } + + /// Execute the given extension. + pub async fn run(self) -> Result<(), Error> { + self.register().await?.run().await + } +} + +/// An extension registered by calling [`Extension::register`]. +pub struct RegisteredExtension { + extension_id: String, + events_processor: E, +} + +impl RegisteredExtension +where + E: Service, + E::Future: Future>, + E::Error: Into> + fmt::Display + fmt::Debug, +{ + /// Execute the extension's run loop. + /// + /// Performs the + /// [invoke](https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtime-environment.html#runtimes-lifecycle-invoke) + /// and, for external Lambda extensions registered to receive the `SHUTDOWN` event, the + /// [shutdown](https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtime-environment.html#runtimes-lifecycle-shutdown) + /// Lambda lifecycle phases. + pub async fn run(self) -> Result<(), Error> { + let client = &Client::builder().build()?; + let mut ep = self.events_processor; + let extension_id = &self.extension_id; + let incoming = async_stream::stream! { loop { trace!("Waiting for next event (incoming loop)"); @@ -351,6 +396,8 @@ where return Err(err.into()); } } + + // Unreachable. Ok(()) } }