diff --git a/Cargo.lock b/Cargo.lock index f41a51eba4..0cae4f2afb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5653,7 +5653,7 @@ dependencies = [ [[package]] name = "pyth-lazer-client" -version = "1.0.0" +version = "2.0.0" dependencies = [ "alloy-primitives 0.8.25", "anyhow", diff --git a/lazer/sdk/rust/client/Cargo.toml b/lazer/sdk/rust/client/Cargo.toml index ad62dc359f..a4c0e700fc 100644 --- a/lazer/sdk/rust/client/Cargo.toml +++ b/lazer/sdk/rust/client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-lazer-client" -version = "1.0.0" +version = "2.0.0" edition = "2021" description = "A Rust client for Pyth Lazer" license = "Apache-2.0" diff --git a/lazer/sdk/rust/client/src/backoff.rs b/lazer/sdk/rust/client/src/backoff.rs index 263a09b57f..b3218b7abe 100644 --- a/lazer/sdk/rust/client/src/backoff.rs +++ b/lazer/sdk/rust/client/src/backoff.rs @@ -1,3 +1,8 @@ +//! Exponential backoff implementation for Pyth Lazer client. +//! +//! This module provides a wrapper around the [`backoff`] crate's exponential backoff functionality, +//! offering a simplified interface tailored for Pyth Lazer client operations. + use std::time::Duration; use backoff::{ @@ -5,6 +10,42 @@ use backoff::{ ExponentialBackoff, ExponentialBackoffBuilder, }; +/// A wrapper around the backoff crate's exponential backoff configuration. +/// +/// This struct encapsulates the parameters needed to configure exponential backoff +/// behavior and can be converted into the backoff crate's [`ExponentialBackoff`] type. +#[derive(Debug)] +pub struct PythLazerExponentialBackoff { + /// The initial retry interval. + initial_interval: Duration, + /// The randomization factor to use for creating a range around the retry interval. + /// + /// A randomization factor of 0.5 results in a random period ranging between 50% below and 50% + /// above the retry interval. + randomization_factor: f64, + /// The value to multiply the current interval with for each retry attempt. + multiplier: f64, + /// The maximum value of the back off period. Once the retry interval reaches this + /// value it stops increasing. + max_interval: Duration, +} + +impl From for ExponentialBackoff { + fn from(val: PythLazerExponentialBackoff) -> Self { + ExponentialBackoffBuilder::default() + .with_initial_interval(val.initial_interval) + .with_randomization_factor(val.randomization_factor) + .with_multiplier(val.multiplier) + .with_max_interval(val.max_interval) + .with_max_elapsed_time(None) + .build() + } +} + +/// Builder for [`PythLazerExponentialBackoff`]. +/// +/// Provides a fluent interface for configuring exponential backoff parameters +/// with sensible defaults from the backoff crate. #[derive(Debug)] pub struct PythLazerExponentialBackoffBuilder { initial_interval: Duration, @@ -25,45 +66,53 @@ impl Default for PythLazerExponentialBackoffBuilder { } impl PythLazerExponentialBackoffBuilder { + /// Creates a new builder with default values. pub fn new() -> Self { Default::default() } - /// The initial retry interval. + /// Sets the initial retry interval. + /// + /// This is the starting interval for the first retry attempt. pub fn with_initial_interval(&mut self, initial_interval: Duration) -> &mut Self { self.initial_interval = initial_interval; self } - /// The randomization factor to use for creating a range around the retry interval. + /// Sets the randomization factor to use for creating a range around the retry interval. /// /// A randomization factor of 0.5 results in a random period ranging between 50% below and 50% - /// above the retry interval. + /// above the retry interval. This helps avoid the "thundering herd" problem when multiple + /// clients retry at the same time. pub fn with_randomization_factor(&mut self, randomization_factor: f64) -> &mut Self { self.randomization_factor = randomization_factor; self } - /// The value to multiply the current interval with for each retry attempt. + /// Sets the value to multiply the current interval with for each retry attempt. + /// + /// A multiplier of 2.0 means each retry interval will be double the previous one. pub fn with_multiplier(&mut self, multiplier: f64) -> &mut Self { self.multiplier = multiplier; self } - /// The maximum value of the back off period. Once the retry interval reaches this - /// value it stops increasing. + /// Sets the maximum value of the back off period. + /// + /// Once the retry interval reaches this value it stops increasing, providing + /// an upper bound on the wait time between retries. pub fn with_max_interval(&mut self, max_interval: Duration) -> &mut Self { self.max_interval = max_interval; self } - pub fn build(&self) -> ExponentialBackoff { - ExponentialBackoffBuilder::default() - .with_initial_interval(self.initial_interval) - .with_randomization_factor(self.randomization_factor) - .with_multiplier(self.multiplier) - .with_max_interval(self.max_interval) - .with_max_elapsed_time(None) - .build() + /// Builds the [`PythLazerExponentialBackoff`] configuration. + pub fn build(&self) -> PythLazerExponentialBackoff { + PythLazerExponentialBackoff { + initial_interval: self.initial_interval, + randomization_factor: self.randomization_factor, + multiplier: self.multiplier, + max_interval: self.max_interval, + } } } diff --git a/lazer/sdk/rust/client/src/client.rs b/lazer/sdk/rust/client/src/client.rs index 3b3e38bf48..ad19052df9 100644 --- a/lazer/sdk/rust/client/src/client.rs +++ b/lazer/sdk/rust/client/src/client.rs @@ -1,7 +1,52 @@ +//! # Pyth Lazer Client +//! +//! This module provides a high-level client for connecting to Pyth Lazer data streams. +//! The client maintains multiple WebSocket connections for redundancy and provides +//! automatic deduplication of messages. +//! +//! ## Features +//! +//! - Multiple redundant WebSocket connections +//! - Automatic message deduplication +//! - Exponential backoff for reconnections +//! - Configurable timeouts and channel capacities +//! - Builder pattern for easy configuration +//! +//! ## Basic Usage +//! +//! ```rust,ignore +//! use pyth_lazer_client::PythLazerClientBuilder; +//! use pyth_lazer_protocol::subscription::SubscribeRequest; +//! +//! #[tokio::main] +//! async fn main() -> anyhow::Result<()> { +//! let mut client = PythLazerClientBuilder::new("your_access_token".to_string()) +//! .with_num_connections(2) +//! .build()?; +//! +//! let mut receiver = client.start().await?; +//! +//! // Subscribe to price feeds +//! let subscribe_request = SubscribeRequest { +//! // ... configure subscription +//! }; +//! client.subscribe(subscribe_request).await?; +//! +//! // Process incoming messages +//! while let Some(response) = receiver.recv().await { +//! println!("Received: {:?}", response); +//! } +//! +//! Ok(()) +//! } +//! ``` + use std::time::Duration; use crate::{ - resilient_ws_connection::PythLazerResilientWSConnection, ws_connection::AnyResponse, + backoff::{PythLazerExponentialBackoff, PythLazerExponentialBackoffBuilder}, + resilient_ws_connection::PythLazerResilientWSConnection, + ws_connection::AnyResponse, CHANNEL_CAPACITY, }; use anyhow::{bail, Result}; @@ -22,6 +67,18 @@ const DEFAULT_ENDPOINTS: [&str; 2] = [ const DEFAULT_NUM_CONNECTIONS: usize = 4; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5); +/// A high-performance client for connecting to Pyth Lazer data streams. +/// +/// The `PythLazerClient` maintains multiple WebSocket connections to Pyth Lazer endpoints +/// for redundancy. It automatically handles connection management, +/// message deduplication, and provides a unified stream of price updates. +/// +/// ## Architecture +/// +/// - Maintains multiple WebSocket connections to different endpoints +/// - Uses a TTL cache for deduplicating messages across connections +/// - Provides a single channel for consuming deduplicated messages +/// - Handles connection failures with exponential backoff pub struct PythLazerClient { endpoints: Vec, access_token: String, @@ -33,23 +90,37 @@ pub struct PythLazerClient { } impl PythLazerClient { - /// Creates a new client instance + /// Creates a new Pyth Lazer client instance. + /// + /// This is a low-level constructor. Consider using [`PythLazerClientBuilder`] for a more + /// convenient way to create clients with sensible defaults. /// /// # Arguments - /// * `endpoints` - A vector of endpoint URLs - /// * `access_token` - The access token for authentication - /// * `num_connections` - The number of WebSocket connections to maintain + /// + /// * `endpoints` - A vector of WebSocket endpoint URLs to connect to. Must not be empty. + /// * `access_token` - The authentication token for accessing Pyth Lazer services + /// * `num_connections` - The number of WebSocket connections to maintain for redundancy + /// * `backoff` - The exponential backoff configuration for connection retries + /// * `timeout` - The timeout duration for WebSocket operations + /// * `channel_capacity` - The capacity of the message channel + /// + /// # Returns + /// + /// Returns `Ok(PythLazerClient)` on success, or an error if the configuration is invalid. + /// + /// # Errors + /// + /// Returns an error if: + /// - The `endpoints` vector is empty + /// pub fn new( endpoints: Vec, access_token: String, num_connections: usize, - backoff: ExponentialBackoff, + backoff: PythLazerExponentialBackoff, timeout: Duration, channel_capacity: usize, ) -> Result { - if backoff.max_elapsed_time.is_some() { - bail!("max_elapsed_time is not supported in Pyth Lazer client"); - } if endpoints.is_empty() { bail!("At least one endpoint must be provided"); } @@ -58,12 +129,35 @@ impl PythLazerClient { access_token, num_connections, ws_connections: Vec::with_capacity(num_connections), - backoff, + backoff: backoff.into(), timeout, channel_capacity, }) } + /// Starts the client and begins establishing WebSocket connections. + /// + /// This method initializes all WebSocket connections and starts the message processing + /// loop. It returns a receiver channel that will yield deduplicated messages from + /// all connections. + /// + /// # Returns + /// + /// Returns a `Receiver` that yields deduplicated messages from all + /// WebSocket connections. The receiver will continue to yield messages until + /// all connections are closed or the client is dropped. + /// + /// # Errors + /// + /// This method itself doesn't return errors, but individual connection failures + /// are handled internally with automatic reconnection using the configured backoff + /// strategy. + /// + /// # Message Deduplication + /// + /// Messages are deduplicated using a TTL cache with a 10-second window. This ensures + /// that identical messages received from multiple connections are only delivered once. + /// pub async fn start(&mut self) -> Result> { let (sender, receiver) = mpsc::channel::(self.channel_capacity); let (ws_connection_sender, mut ws_connection_receiver) = @@ -109,6 +203,21 @@ impl PythLazerClient { Ok(receiver) } + /// Subscribes to data streams across all WebSocket connections. + /// + /// This method sends the subscription request to all active WebSocket connections, + /// ensuring redundancy. If any connection fails to subscribe, + /// an error is returned, but other connections may still be subscribed. + /// + /// # Arguments + /// + /// * `subscribe_request` - The subscription request specifying which data streams to subscribe to + /// + /// # Returns + /// + /// Returns `Ok(())` if the subscription was successfully sent to all connections, + /// or an error if any connection failed to process the subscription. + /// pub async fn subscribe(&mut self, subscribe_request: SubscribeRequest) -> Result<()> { for connection in &mut self.ws_connections { connection.subscribe(subscribe_request.clone()).await?; @@ -116,6 +225,20 @@ impl PythLazerClient { Ok(()) } + /// Unsubscribes from a specific data stream across all WebSocket connections. + /// + /// This method sends an unsubscribe request for the specified subscription ID + /// to all active WebSocket connections. + /// + /// # Arguments + /// + /// * `subscription_id` - The ID of the subscription to cancel + /// + /// # Returns + /// + /// Returns `Ok(())` if the unsubscribe request was successfully sent to all connections, + /// or an error if any connection failed to process the request. + /// pub async fn unsubscribe(&mut self, subscription_id: SubscriptionId) -> Result<()> { for connection in &mut self.ws_connections { connection.unsubscribe(subscription_id).await?; @@ -124,16 +247,41 @@ impl PythLazerClient { } } +/// A builder for creating [`PythLazerClient`] instances with customizable configuration. +/// +/// The builder provides a convenient way to configure a Pyth Lazer client with sensible +/// defaults while allowing customization of all parameters. It follows the builder pattern +/// for a fluent API. +/// +/// ## Default Configuration +/// +/// - **Endpoints**: Uses Pyth Lazer's default production endpoints +/// - **Connections**: 4 concurrent WebSocket connections +/// - **Timeout**: 5 seconds for WebSocket operations +/// - **Backoff**: Exponential backoff with default settings +/// - **Channel Capacity**: Uses the default 1000 +/// pub struct PythLazerClientBuilder { endpoints: Vec, access_token: String, num_connections: usize, - backoff: ExponentialBackoff, + backoff: PythLazerExponentialBackoff, timeout: Duration, channel_capacity: usize, } impl PythLazerClientBuilder { + /// Creates a new builder with default configuration. + /// + /// This initializes the builder with sensible defaults for production use: + /// - Default Pyth Lazer endpoints + /// - 4 WebSocket connections + /// - 5-second timeout + /// + /// # Arguments + /// + /// * `access_token` - The authentication token for accessing Pyth Lazer services + /// pub fn new(access_token: String) -> Self { Self { endpoints: DEFAULT_ENDPOINTS @@ -142,37 +290,101 @@ impl PythLazerClientBuilder { .collect(), access_token, num_connections: DEFAULT_NUM_CONNECTIONS, - backoff: ExponentialBackoff::default(), + backoff: PythLazerExponentialBackoffBuilder::default().build(), timeout: DEFAULT_TIMEOUT, channel_capacity: CHANNEL_CAPACITY, } } + /// Sets custom WebSocket endpoints for the client. + /// + /// By default, the client uses Pyth Lazer's production endpoints. Use this method + /// to connect to different environments (staging, local development) or to use + /// custom endpoint configurations. + /// + /// # Arguments + /// + /// * `endpoints` - A vector of WebSocket endpoint URLs. Must not be empty. + /// pub fn with_endpoints(mut self, endpoints: Vec) -> Self { self.endpoints = endpoints; self } + /// Sets the number of concurrent WebSocket connections to maintain. + /// + /// More connections provide better redundancy and can improve throughput, + /// but also consume more resources. + /// + /// # Arguments + /// + /// * `num_connections` - The number of WebSocket connections (must be > 0) + /// pub fn with_num_connections(mut self, num_connections: usize) -> Self { self.num_connections = num_connections; self } - pub fn with_backoff(mut self, backoff: ExponentialBackoff) -> Self { + /// Sets the exponential backoff configuration for connection retries. + /// + /// The backoff strategy determines how the client handles connection failures + /// and retries. + /// + /// # Arguments + /// + /// * `backoff` - The exponential backoff configuration + /// + pub fn with_backoff(mut self, backoff: PythLazerExponentialBackoff) -> Self { self.backoff = backoff; self } + /// Sets the timeout duration for WebSocket operations. + /// + /// This timeout applies to each WebSocket connection, + /// if no response is received within this duration, + /// the connection will be considered failed and retried. + /// + /// # Arguments + /// + /// * `timeout` - The timeout duration for each WebSocket + /// pub fn with_timeout(mut self, timeout: Duration) -> Self { self.timeout = timeout; self } + /// Sets the capacity of the internal message channel. + /// + /// This determines how many messages can be buffered internally before + /// the client starts applying backpressure. + /// + /// # Arguments + /// + /// * `channel_capacity` - The channel capacity (number of messages) + /// pub fn with_channel_capacity(mut self, channel_capacity: usize) -> Self { self.channel_capacity = channel_capacity; self } + /// Builds the configured [`PythLazerClient`] instance. + /// + /// This consumes the builder and creates a new client with the specified + /// configuration. The client is ready to use but connections are not + /// established until [`PythLazerClient::start`] is called. + /// + /// # Returns + /// + /// Returns `Ok(PythLazerClient)` on success, or an error if the configuration + /// is invalid. + /// + /// # Errors + /// + /// Returns an error if: + /// - No endpoints are configured + /// - Any configuration parameter is invalid + /// pub fn build(self) -> Result { PythLazerClient::new( self.endpoints,