diff --git a/Cargo.lock b/Cargo.lock index 039bc23e58..25fc1df504 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -701,12 +701,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "assert_float_eq" -version = "1.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10d2119f741b79fe9907f5396d19bffcb46568cfcc315e78677d731972ac7085" - [[package]] name = "async-channel" version = "1.9.0" @@ -5675,7 +5669,7 @@ dependencies = [ [[package]] name = "pyth-lazer-client" -version = "3.0.0" +version = "2.0.1" dependencies = [ "alloy-primitives 0.8.25", "anyhow", @@ -5688,7 +5682,7 @@ dependencies = [ "futures-util", "hex", "libsecp256k1 0.7.2", - "pyth-lazer-protocol 0.11.0", + "pyth-lazer-protocol 0.10.2", "serde", "serde_json", "tokio", @@ -5721,11 +5715,10 @@ dependencies = [ [[package]] name = "pyth-lazer-protocol" -version = "0.11.0" +version = "0.10.2" dependencies = [ "alloy-primitives 0.8.25", "anyhow", - "assert_float_eq", "bincode 1.3.3", "bs58", "byteorder", @@ -5742,7 +5735,6 @@ dependencies = [ "rust_decimal", "serde", "serde_json", - "thiserror 2.0.12", ] [[package]] @@ -5761,13 +5753,13 @@ dependencies = [ [[package]] name = "pyth-lazer-publisher-sdk" -version = "0.6.0" +version = "0.5.0" dependencies = [ "anyhow", "fs-err", "protobuf", "protobuf-codegen", - "pyth-lazer-protocol 0.11.0", + "pyth-lazer-protocol 0.10.2", "serde_json", ] diff --git a/lazer/contracts/solana/programs/pyth-lazer-solana-contract/Cargo.toml b/lazer/contracts/solana/programs/pyth-lazer-solana-contract/Cargo.toml index cb2de74e6d..f0dc8d37f5 100644 --- a/lazer/contracts/solana/programs/pyth-lazer-solana-contract/Cargo.toml +++ b/lazer/contracts/solana/programs/pyth-lazer-solana-contract/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-lazer-solana-contract" -version = "0.6.0" +version = "0.5.0" edition = "2021" description = "Pyth Lazer Solana contract and SDK." license = "Apache-2.0" @@ -19,7 +19,7 @@ no-log-ix-name = [] idl-build = ["anchor-lang/idl-build"] [dependencies] -pyth-lazer-protocol = { path = "../../../../sdk/rust/protocol", version = "0.11.0" } +pyth-lazer-protocol = { path = "../../../../sdk/rust/protocol", version = "0.10.1" } anchor-lang = "0.31.1" bytemuck = { version = "1.20.0", features = ["derive"] } diff --git a/lazer/publisher_sdk/rust/Cargo.toml b/lazer/publisher_sdk/rust/Cargo.toml index be22223403..656736ed9b 100644 --- a/lazer/publisher_sdk/rust/Cargo.toml +++ b/lazer/publisher_sdk/rust/Cargo.toml @@ -1,13 +1,13 @@ [package] name = "pyth-lazer-publisher-sdk" -version = "0.6.0" +version = "0.5.0" edition = "2021" description = "Pyth Lazer Publisher SDK types." license = "Apache-2.0" repository = "https://github.com/pyth-network/pyth-crosschain" [dependencies] -pyth-lazer-protocol = { version = "0.11.0", path = "../../sdk/rust/protocol" } +pyth-lazer-protocol = { version = "0.10.2", path = "../../sdk/rust/protocol" } anyhow = "1.0.98" protobuf = "3.7.2" serde_json = "1.0.140" diff --git a/lazer/publisher_sdk/rust/src/lib.rs b/lazer/publisher_sdk/rust/src/lib.rs index 9600c61f64..ae98a79c39 100644 --- a/lazer/publisher_sdk/rust/src/lib.rs +++ b/lazer/publisher_sdk/rust/src/lib.rs @@ -3,8 +3,8 @@ use crate::publisher_update::{FeedUpdate, FundingRateUpdate, PriceUpdate}; use crate::state::FeedState; use ::protobuf::MessageField; use pyth_lazer_protocol::jrpc::{FeedUpdateParams, UpdateParams}; +use pyth_lazer_protocol::symbol_state::SymbolState; use pyth_lazer_protocol::FeedKind; -use pyth_lazer_protocol::SymbolState; pub mod transaction_envelope { pub use crate::protobuf::transaction_envelope::*; @@ -56,9 +56,9 @@ impl From for Update { best_bid_price, best_ask_price, } => Update::PriceUpdate(PriceUpdate { - price: Some(price.mantissa_i64()), - best_bid_price: best_bid_price.map(|p| p.mantissa_i64()), - best_ask_price: best_ask_price.map(|p| p.mantissa_i64()), + price: Some(price.0.into()), + best_bid_price: best_bid_price.map(|p| p.0.into()), + best_ask_price: best_ask_price.map(|p| p.0.into()), special_fields: Default::default(), }), UpdateParams::FundingRateUpdate { @@ -66,8 +66,8 @@ impl From for Update { rate, funding_rate_interval, } => Update::FundingRateUpdate(FundingRateUpdate { - price: price.map(|p| p.mantissa_i64()), - rate: Some(rate.mantissa()), + price: price.map(|p| p.0.into()), + rate: Some(rate.0), funding_rate_interval: MessageField::from_option( funding_rate_interval.map(|i| i.into()), ), diff --git a/lazer/sdk/rust/client/Cargo.toml b/lazer/sdk/rust/client/Cargo.toml index 44d595e0c5..36927a49ed 100644 --- a/lazer/sdk/rust/client/Cargo.toml +++ b/lazer/sdk/rust/client/Cargo.toml @@ -1,12 +1,12 @@ [package] name = "pyth-lazer-client" -version = "3.0.0" +version = "2.0.1" edition = "2021" description = "A Rust client for Pyth Lazer" license = "Apache-2.0" [dependencies] -pyth-lazer-protocol = { path = "../protocol", version = "0.11.0" } +pyth-lazer-protocol = { path = "../protocol", version = "0.10.2" } tokio = { version = "1", features = ["full"] } tokio-tungstenite = { version = "0.20", features = ["native-tls"] } futures-util = "0.3" diff --git a/lazer/sdk/rust/client/examples/subscribe_price_feeds.rs b/lazer/sdk/rust/client/examples/subscribe_price_feeds.rs index 9087450f2b..e39c4dc41b 100644 --- a/lazer/sdk/rust/client/examples/subscribe_price_feeds.rs +++ b/lazer/sdk/rust/client/examples/subscribe_price_feeds.rs @@ -4,16 +4,15 @@ use base64::Engine; use pyth_lazer_client::backoff::PythLazerExponentialBackoffBuilder; use pyth_lazer_client::client::PythLazerClientBuilder; use pyth_lazer_client::ws_connection::AnyResponse; -use pyth_lazer_protocol::api::{ - Channel, DeliveryFormat, Format, JsonBinaryEncoding, SubscriptionParams, SubscriptionParamsRepr, -}; -use pyth_lazer_protocol::api::{SubscribeRequest, SubscriptionId, WsResponse}; use pyth_lazer_protocol::message::{ EvmMessage, LeEcdsaMessage, LeUnsignedMessage, Message, SolanaMessage, }; use pyth_lazer_protocol::payload::PayloadData; -use pyth_lazer_protocol::time::FixedRate; -use pyth_lazer_protocol::{PriceFeedId, PriceFeedProperty}; +use pyth_lazer_protocol::router::{ + Channel, DeliveryFormat, FixedRate, Format, JsonBinaryEncoding, PriceFeedId, PriceFeedProperty, + SubscriptionParams, SubscriptionParamsRepr, +}; +use pyth_lazer_protocol::subscription::{Response, SubscribeRequest, SubscriptionId}; use tokio::pin; use tracing::level_filters::LevelFilter; use tracing_subscriber::EnvFilter; @@ -110,7 +109,7 @@ async fn main() -> anyhow::Result<()> { // The stream gives us base64-encoded binary messages. We need to decode, parse, and verify them. match msg { AnyResponse::Json(msg) => match msg { - WsResponse::StreamUpdated(update) => { + Response::StreamUpdated(update) => { println!("Received a JSON update for {:?}", update.subscription_id); if let Some(evm_data) = update.payload.evm { // Decode binary data diff --git a/lazer/sdk/rust/client/src/client.rs b/lazer/sdk/rust/client/src/client.rs index 15755e9483..ad19052df9 100644 --- a/lazer/sdk/rust/client/src/client.rs +++ b/lazer/sdk/rust/client/src/client.rs @@ -51,7 +51,7 @@ use crate::{ }; use anyhow::{bail, Result}; use backoff::ExponentialBackoff; -use pyth_lazer_protocol::api::{SubscribeRequest, SubscriptionId}; +use pyth_lazer_protocol::subscription::{SubscribeRequest, SubscriptionId}; use tokio::sync::mpsc::{self, error::TrySendError}; use tracing::{error, warn}; use ttl_cache::TtlCache; diff --git a/lazer/sdk/rust/client/src/resilient_ws_connection.rs b/lazer/sdk/rust/client/src/resilient_ws_connection.rs index 96ee51ec0a..70385d5946 100644 --- a/lazer/sdk/rust/client/src/resilient_ws_connection.rs +++ b/lazer/sdk/rust/client/src/resilient_ws_connection.rs @@ -2,7 +2,9 @@ use std::time::Duration; use backoff::{backoff::Backoff, ExponentialBackoff}; use futures_util::StreamExt; -use pyth_lazer_protocol::api::{SubscribeRequest, SubscriptionId, UnsubscribeRequest, WsRequest}; +use pyth_lazer_protocol::subscription::{ + Request, SubscribeRequest, SubscriptionId, UnsubscribeRequest, +}; use tokio::{pin, select, sync::mpsc, time::Instant}; use tracing::{error, info, warn}; use url::Url; @@ -16,7 +18,7 @@ use anyhow::{bail, Context, Result}; const BACKOFF_RESET_DURATION: Duration = Duration::from_secs(10); pub struct PythLazerResilientWSConnection { - request_sender: mpsc::Sender, + request_sender: mpsc::Sender, } impl PythLazerResilientWSConnection { @@ -51,7 +53,7 @@ impl PythLazerResilientWSConnection { pub async fn subscribe(&mut self, request: SubscribeRequest) -> Result<()> { self.request_sender - .send(WsRequest::Subscribe(request)) + .send(Request::Subscribe(request)) .await .context("Failed to send subscribe request")?; Ok(()) @@ -59,9 +61,7 @@ impl PythLazerResilientWSConnection { pub async fn unsubscribe(&mut self, subscription_id: SubscriptionId) -> Result<()> { self.request_sender - .send(WsRequest::Unsubscribe(UnsubscribeRequest { - subscription_id, - })) + .send(Request::Unsubscribe(UnsubscribeRequest { subscription_id })) .await .context("Failed to send unsubscribe request")?; Ok(()) @@ -95,7 +95,7 @@ impl PythLazerResilientWSConnectionTask { pub async fn run( &mut self, response_sender: mpsc::Sender, - request_receiver: &mut mpsc::Receiver, + request_receiver: &mut mpsc::Receiver, ) -> Result<()> { loop { let start_time = Instant::now(); @@ -128,7 +128,7 @@ impl PythLazerResilientWSConnectionTask { pub async fn start( &mut self, sender: mpsc::Sender, - request_receiver: &mut mpsc::Receiver, + request_receiver: &mut mpsc::Receiver, ) -> Result<()> { let mut ws_connection = PythLazerWSConnection::new(self.endpoint.clone(), self.access_token.clone())?; @@ -137,7 +137,7 @@ impl PythLazerResilientWSConnectionTask { for subscription in self.subscriptions.clone() { ws_connection - .send_request(WsRequest::Subscribe(subscription)) + .send_request(Request::Subscribe(subscription)) .await?; } loop { @@ -167,10 +167,10 @@ impl PythLazerResilientWSConnectionTask { } Some(request) = request_receiver.recv() => { match request { - WsRequest::Subscribe(request) => { + Request::Subscribe(request) => { self.subscribe(&mut ws_connection, request).await?; } - WsRequest::Unsubscribe(request) => { + Request::Unsubscribe(request) => { self.unsubscribe(&mut ws_connection, request).await?; } } diff --git a/lazer/sdk/rust/client/src/ws_connection.rs b/lazer/sdk/rust/client/src/ws_connection.rs index 3efbcbd656..385bd2efd7 100644 --- a/lazer/sdk/rust/client/src/ws_connection.rs +++ b/lazer/sdk/rust/client/src/ws_connection.rs @@ -4,8 +4,8 @@ use anyhow::Result; use derive_more::From; use futures_util::{SinkExt, StreamExt, TryStreamExt}; use pyth_lazer_protocol::{ - api::{ErrorResponse, SubscribeRequest, UnsubscribeRequest, WsRequest, WsResponse}, binary_update::BinaryWsUpdate, + subscription::{ErrorResponse, Request, Response, SubscribeRequest, UnsubscribeRequest}, }; use tokio_tungstenite::{connect_async, tungstenite::Message}; use url::Url; @@ -32,7 +32,7 @@ pub struct PythLazerWSConnection { #[derive(Debug, Clone, PartialEq, Eq, Hash, From)] pub enum AnyResponse { - Json(WsResponse), + Json(Response), Binary(BinaryWsUpdate), } @@ -84,13 +84,13 @@ impl PythLazerWSConnection { .try_filter_map(|msg| async { let r: Result> = match msg { Message::Text(text) => { - Ok(Some(serde_json::from_str::(&text)?.into())) + Ok(Some(serde_json::from_str::(&text)?.into())) } Message::Binary(data) => { Ok(Some(BinaryWsUpdate::deserialize_slice(&data)?.into())) } Message::Close(_) => Ok(Some( - WsResponse::Error(ErrorResponse { + Response::Error(ErrorResponse { error: "WebSocket connection closed".to_string(), }) .into(), @@ -103,7 +103,7 @@ impl PythLazerWSConnection { Ok(response_stream) } - pub async fn send_request(&mut self, request: WsRequest) -> Result<()> { + pub async fn send_request(&mut self, request: Request) -> Result<()> { if let Some(sender) = &mut self.ws_sender { let msg = serde_json::to_string(&request)?; sender.send(Message::Text(msg)).await?; @@ -118,7 +118,7 @@ impl PythLazerWSConnection { /// # Arguments /// * `request` - A subscription request containing feed IDs and parameters pub async fn subscribe(&mut self, request: SubscribeRequest) -> Result<()> { - let request = WsRequest::Subscribe(request); + let request = Request::Subscribe(request); self.send_request(request).await } @@ -127,7 +127,7 @@ impl PythLazerWSConnection { /// # Arguments /// * `subscription_id` - The ID of the subscription to cancel pub async fn unsubscribe(&mut self, request: UnsubscribeRequest) -> Result<()> { - let request = WsRequest::Unsubscribe(request); + let request = Request::Unsubscribe(request); self.send_request(request).await } diff --git a/lazer/sdk/rust/protocol/Cargo.toml b/lazer/sdk/rust/protocol/Cargo.toml index e058344eea..5865eb27da 100644 --- a/lazer/sdk/rust/protocol/Cargo.toml +++ b/lazer/sdk/rust/protocol/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-lazer-protocol" -version = "0.11.0" +version = "0.10.2" edition = "2021" description = "Pyth Lazer SDK - protocol types." license = "Apache-2.0" @@ -20,7 +20,6 @@ mry = { version = "0.13.0", features = ["serde"], optional = true } chrono = "0.4.41" humantime = "2.2.0" hex = "0.4.3" -thiserror = "2.0.12" [dev-dependencies] bincode = "1.3.3" @@ -29,4 +28,3 @@ hex = "0.4.3" libsecp256k1 = "0.7.1" bs58 = "0.5.1" alloy-primitives = "0.8.19" -assert_float_eq = "1.1.4" diff --git a/lazer/sdk/rust/protocol/src/api.rs b/lazer/sdk/rust/protocol/src/api.rs index 8423c96cb6..4a059ca255 100644 --- a/lazer/sdk/rust/protocol/src/api.rs +++ b/lazer/sdk/rust/protocol/src/api.rs @@ -1,16 +1,8 @@ -use std::{ - fmt::Display, - ops::{Deref, DerefMut}, -}; - -use derive_more::derive::From; -use itertools::Itertools as _; -use serde::{de::Error, Deserialize, Serialize}; +use serde::{Deserialize, Serialize}; use crate::{ - payload::AggregatedPriceFeedData, - time::{DurationUs, FixedRate, TimestampUs}, - ChannelId, Price, PriceFeedId, PriceFeedProperty, Rate, + router::{Channel, Format, JsonBinaryEncoding, JsonUpdate, PriceFeedId, PriceFeedProperty}, + time::TimestampUs, }; #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -60,417 +52,3 @@ pub type PriceResponse = JsonUpdate; pub fn default_parsed() -> bool { true } - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum DeliveryFormat { - /// Deliver stream updates as JSON text messages. - #[default] - Json, - /// Deliver stream updates as binary messages. - Binary, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum Format { - Evm, - Solana, - LeEcdsa, - LeUnsigned, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum JsonBinaryEncoding { - #[default] - Base64, - Hex, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, From)] -pub enum Channel { - FixedRate(FixedRate), -} - -impl Serialize for Channel { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - match self { - Channel::FixedRate(fixed_rate) => { - if *fixed_rate == FixedRate::MIN { - return serializer.serialize_str("real_time"); - } - serializer.serialize_str(&format!( - "fixed_rate@{}ms", - fixed_rate.duration().as_millis() - )) - } - } - } -} - -impl Display for Channel { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Channel::FixedRate(fixed_rate) => match *fixed_rate { - FixedRate::MIN => write!(f, "real_time"), - rate => write!(f, "fixed_rate@{}ms", rate.duration().as_millis()), - }, - } - } -} - -impl Channel { - pub fn id(&self) -> ChannelId { - match self { - Channel::FixedRate(fixed_rate) => match fixed_rate.duration().as_millis() { - 1 => ChannelId::FIXED_RATE_1, - 50 => ChannelId::FIXED_RATE_50, - 200 => ChannelId::FIXED_RATE_200, - _ => panic!("unknown channel: {self:?}"), - }, - } - } -} - -#[test] -fn id_supports_all_fixed_rates() { - for rate in FixedRate::ALL { - Channel::FixedRate(rate).id(); - } -} - -fn parse_channel(value: &str) -> Option { - if value == "real_time" { - Some(Channel::FixedRate(FixedRate::MIN)) - } else if let Some(rest) = value.strip_prefix("fixed_rate@") { - let ms_value = rest.strip_suffix("ms")?; - Some(Channel::FixedRate(FixedRate::from_millis( - ms_value.parse().ok()?, - )?)) - } else { - None - } -} - -impl<'de> Deserialize<'de> for Channel { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - let value = ::deserialize(deserializer)?; - parse_channel(&value).ok_or_else(|| Error::custom("unknown channel")) - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct SubscriptionParamsRepr { - pub price_feed_ids: Vec, - pub properties: Vec, - // "chains" was renamed to "formats". "chains" is still supported for compatibility. - #[serde(alias = "chains")] - pub formats: Vec, - #[serde(default)] - pub delivery_format: DeliveryFormat, - #[serde(default)] - pub json_binary_encoding: JsonBinaryEncoding, - /// If `true`, the stream update will contain a `parsed` JSON field containing - /// all data of the update. - #[serde(default = "default_parsed")] - pub parsed: bool, - pub channel: Channel, - #[serde(default)] - pub ignore_invalid_feed_ids: bool, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct SubscriptionParams(SubscriptionParamsRepr); - -impl<'de> Deserialize<'de> for SubscriptionParams { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - let value = SubscriptionParamsRepr::deserialize(deserializer)?; - Self::new(value).map_err(Error::custom) - } -} - -impl SubscriptionParams { - pub fn new(value: SubscriptionParamsRepr) -> Result { - if value.price_feed_ids.is_empty() { - return Err("no price feed ids specified"); - } - if !value.price_feed_ids.iter().all_unique() { - return Err("duplicate price feed ids specified"); - } - if !value.formats.iter().all_unique() { - return Err("duplicate formats or chains specified"); - } - if value.properties.is_empty() { - return Err("no properties specified"); - } - if !value.properties.iter().all_unique() { - return Err("duplicate properties specified"); - } - Ok(Self(value)) - } -} - -impl Deref for SubscriptionParams { - type Target = SubscriptionParamsRepr; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} -impl DerefMut for SubscriptionParams { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct JsonBinaryData { - pub encoding: JsonBinaryEncoding, - pub data: String, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct JsonUpdate { - /// Present unless `parsed = false` is specified in subscription params. - #[serde(skip_serializing_if = "Option::is_none")] - pub parsed: Option, - /// Only present if `Evm` is present in `formats` in subscription params. - #[serde(skip_serializing_if = "Option::is_none")] - pub evm: Option, - /// Only present if `Solana` is present in `formats` in subscription params. - #[serde(skip_serializing_if = "Option::is_none")] - pub solana: Option, - /// Only present if `LeEcdsa` is present in `formats` in subscription params. - #[serde(skip_serializing_if = "Option::is_none")] - pub le_ecdsa: Option, - /// Only present if `LeUnsigned` is present in `formats` in subscription params. - #[serde(skip_serializing_if = "Option::is_none")] - pub le_unsigned: Option, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ParsedPayload { - #[serde(with = "crate::serde_str::timestamp")] - pub timestamp_us: TimestampUs, - pub price_feeds: Vec, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ParsedFeedPayload { - pub price_feed_id: PriceFeedId, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(with = "crate::serde_str::option_price")] - #[serde(default)] - pub price: Option, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(with = "crate::serde_str::option_price")] - #[serde(default)] - pub best_bid_price: Option, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(with = "crate::serde_str::option_price")] - #[serde(default)] - pub best_ask_price: Option, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(default)] - pub publisher_count: Option, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(default)] - pub exponent: Option, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(default)] - pub confidence: Option, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(default)] - pub funding_rate: Option, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(default)] - pub funding_timestamp: Option, - // More fields may be added later. - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(default)] - pub funding_rate_interval: Option, -} - -impl ParsedFeedPayload { - pub fn new( - price_feed_id: PriceFeedId, - exponent: Option, - data: &AggregatedPriceFeedData, - properties: &[PriceFeedProperty], - ) -> Self { - let mut output = Self { - price_feed_id, - price: None, - best_bid_price: None, - best_ask_price: None, - publisher_count: None, - exponent: None, - confidence: None, - funding_rate: None, - funding_timestamp: None, - funding_rate_interval: None, - }; - for &property in properties { - match property { - PriceFeedProperty::Price => { - output.price = data.price; - } - PriceFeedProperty::BestBidPrice => { - output.best_bid_price = data.best_bid_price; - } - PriceFeedProperty::BestAskPrice => { - output.best_ask_price = data.best_ask_price; - } - PriceFeedProperty::PublisherCount => { - output.publisher_count = Some(data.publisher_count); - } - PriceFeedProperty::Exponent => { - output.exponent = exponent; - } - PriceFeedProperty::Confidence => { - output.confidence = data.confidence; - } - PriceFeedProperty::FundingRate => { - output.funding_rate = data.funding_rate; - } - PriceFeedProperty::FundingTimestamp => { - output.funding_timestamp = data.funding_timestamp; - } - PriceFeedProperty::FundingRateInterval => { - output.funding_rate_interval = data.funding_rate_interval; - } - } - } - output - } - - pub fn new_full( - price_feed_id: PriceFeedId, - exponent: Option, - data: &AggregatedPriceFeedData, - ) -> Self { - Self { - price_feed_id, - price: data.price, - best_bid_price: data.best_bid_price, - best_ask_price: data.best_ask_price, - publisher_count: Some(data.publisher_count), - exponent, - confidence: data.confidence, - funding_rate: data.funding_rate, - funding_timestamp: data.funding_timestamp, - funding_rate_interval: data.funding_rate_interval, - } - } -} - -/// A request sent from the client to the server. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(tag = "type")] -#[serde(rename_all = "camelCase")] -pub enum WsRequest { - Subscribe(SubscribeRequest), - Unsubscribe(UnsubscribeRequest), -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] -pub struct SubscriptionId(pub u64); - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct SubscribeRequest { - pub subscription_id: SubscriptionId, - #[serde(flatten)] - pub params: SubscriptionParams, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct UnsubscribeRequest { - pub subscription_id: SubscriptionId, -} - -/// A JSON response sent from the server to the client. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, From)] -#[serde(tag = "type")] -#[serde(rename_all = "camelCase")] -pub enum WsResponse { - Error(ErrorResponse), - Subscribed(SubscribedResponse), - SubscribedWithInvalidFeedIdsIgnored(SubscribedWithInvalidFeedIdsIgnoredResponse), - Unsubscribed(UnsubscribedResponse), - SubscriptionError(SubscriptionErrorResponse), - StreamUpdated(StreamUpdatedResponse), -} - -/// Sent from the server after a successul subscription. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct SubscribedResponse { - pub subscription_id: SubscriptionId, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct InvalidFeedSubscriptionDetails { - pub unknown_ids: Vec, - pub unsupported_channels: Vec, - pub unstable: Vec, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct SubscribedWithInvalidFeedIdsIgnoredResponse { - pub subscription_id: SubscriptionId, - pub subscribed_feed_ids: Vec, - pub ignored_invalid_feed_ids: InvalidFeedSubscriptionDetails, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct UnsubscribedResponse { - pub subscription_id: SubscriptionId, -} - -/// Sent from the server if the requested subscription or unsubscription request -/// could not be fulfilled. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct SubscriptionErrorResponse { - pub subscription_id: SubscriptionId, - pub error: String, -} - -/// Sent from the server if an internal error occured while serving data for an existing subscription, -/// or a client request sent a bad request. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ErrorResponse { - pub error: String, -} - -/// Sent from the server when new data is available for an existing subscription -/// (only if `delivery_format == Json`). -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct StreamUpdatedResponse { - pub subscription_id: SubscriptionId, - #[serde(flatten)] - pub payload: JsonUpdate, -} diff --git a/lazer/sdk/rust/protocol/src/binary_update.rs b/lazer/sdk/rust/protocol/src/binary_update.rs index 9c030ed667..aab2b5b139 100644 --- a/lazer/sdk/rust/protocol/src/binary_update.rs +++ b/lazer/sdk/rust/protocol/src/binary_update.rs @@ -1,5 +1,5 @@ use { - crate::{api::SubscriptionId, message::Message}, + crate::{message::Message, subscription::SubscriptionId}, anyhow::{bail, Context}, byteorder::{WriteBytesExt, BE, LE}, }; diff --git a/lazer/sdk/rust/protocol/src/jrpc.rs b/lazer/sdk/rust/protocol/src/jrpc.rs index ed5e67870d..ceb7704f22 100644 --- a/lazer/sdk/rust/protocol/src/jrpc.rs +++ b/lazer/sdk/rust/protocol/src/jrpc.rs @@ -1,8 +1,6 @@ -use crate::rate::Rate; +use crate::router::{Channel, Price, PriceFeedId, Rate}; use crate::symbol_state::SymbolState; use crate::time::TimestampUs; -use crate::PriceFeedId; -use crate::{api::Channel, price::Price}; use serde::{Deserialize, Serialize}; use std::time::Duration; diff --git a/lazer/sdk/rust/protocol/src/lib.rs b/lazer/sdk/rust/protocol/src/lib.rs index 361539e7ba..21e94b4a4b 100644 --- a/lazer/sdk/rust/protocol/src/lib.rs +++ b/lazer/sdk/rust/protocol/src/lib.rs @@ -1,93 +1,21 @@ -//! Lazer type definitions and utilities. +//! Protocol types. -/// Types describing Lazer HTTP and WebSocket APIs. pub mod api; -/// Binary delivery format for WebSocket. pub mod binary_update; mod dynamic_value; mod feed_kind; -/// Lazer Agent JSON-RPC API. pub mod jrpc; -/// Types describing Lazer's verifiable messages containing signature and payload. pub mod message; -/// Types describing Lazer's message payload. pub mod payload; -mod price; -/// Legacy Websocket API for publishers. pub mod publisher; -mod rate; +pub mod router; mod serde_price_as_i64; mod serde_str; -mod symbol_state; -/// Lazer's types for time representation. +pub mod subscription; +pub mod symbol_state; pub mod time; -use derive_more::derive::{From, Into}; -use serde::{Deserialize, Serialize}; - -pub use crate::{ - dynamic_value::DynamicValue, - feed_kind::FeedKind, - price::{Price, PriceError}, - rate::{Rate, RateError}, - symbol_state::SymbolState, -}; - -#[derive( - Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into, -)] -pub struct PublisherId(pub u16); - -#[derive( - Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into, -)] -pub struct PriceFeedId(pub u32); - -#[derive( - Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into, -)] -pub struct ChannelId(pub u8); - -impl ChannelId { - pub const FIXED_RATE_1: ChannelId = ChannelId(1); - pub const FIXED_RATE_50: ChannelId = ChannelId(2); - pub const FIXED_RATE_200: ChannelId = ChannelId(3); -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum PriceFeedProperty { - Price, - BestBidPrice, - BestAskPrice, - PublisherCount, - Exponent, - Confidence, - FundingRate, - FundingTimestamp, - FundingRateInterval, - // More fields may be added later. -} - -// Operation and coefficient for converting value to mantissa. -enum ExponentFactor { - // mantissa = value * factor - Mul(i64), - // mantissa = value / factor - Div(i64), -} - -impl ExponentFactor { - fn get(exponent: i16) -> Option { - if exponent >= 0 { - let exponent: u32 = exponent.try_into().ok()?; - Some(ExponentFactor::Div(10_i64.checked_pow(exponent)?)) - } else { - let minus_exponent: u32 = exponent.checked_neg()?.try_into().ok()?; - Some(ExponentFactor::Mul(10_i64.checked_pow(minus_exponent)?)) - } - } -} +pub use crate::{dynamic_value::DynamicValue, feed_kind::FeedKind}; #[test] fn magics_in_big_endian() { diff --git a/lazer/sdk/rust/protocol/src/message.rs b/lazer/sdk/rust/protocol/src/message.rs index cfb7832e4e..199e081b56 100644 --- a/lazer/sdk/rust/protocol/src/message.rs +++ b/lazer/sdk/rust/protocol/src/message.rs @@ -1,6 +1,6 @@ use { self::format_magics_le::{EVM_FORMAT_MAGIC, SOLANA_FORMAT_MAGIC}, - crate::api::ParsedPayload, + crate::router::ParsedPayload, anyhow::{bail, Context}, byteorder::{ByteOrder, ReadBytesExt, WriteBytesExt, BE, LE}, derive_more::From, diff --git a/lazer/sdk/rust/protocol/src/payload.rs b/lazer/sdk/rust/protocol/src/payload.rs index 3fab9673f7..144c566e5f 100644 --- a/lazer/sdk/rust/protocol/src/payload.rs +++ b/lazer/sdk/rust/protocol/src/payload.rs @@ -1,10 +1,12 @@ -use crate::{ - price::Price, - rate::Rate, - time::{DurationUs, TimestampUs}, - ChannelId, PriceFeedId, PriceFeedProperty, -}; +//! Types representing binary encoding of signable payloads and signature envelopes. + +use crate::time::DurationUs; use { + super::router::{PriceFeedId, PriceFeedProperty}, + crate::{ + router::{ChannelId, Price, Rate}, + time::TimestampUs, + }, anyhow::bail, byteorder::{ByteOrder, ReadBytesExt, WriteBytesExt, BE, LE}, serde::{Deserialize, Serialize}, @@ -229,12 +231,12 @@ fn write_option_price( mut writer: impl Write, value: Option, ) -> std::io::Result<()> { - writer.write_i64::(value.map_or(0, |v| v.mantissa_i64())) + writer.write_i64::(value.map_or(0, |v| v.0.get())) } fn read_option_price(mut reader: impl Read) -> std::io::Result> { let value = NonZeroI64::new(reader.read_i64::()?); - Ok(value.map(Price::from_nonzero_mantissa)) + Ok(value.map(Price)) } fn write_option_rate( @@ -244,7 +246,7 @@ fn write_option_rate( match value { Some(value) => { writer.write_u8(1)?; - writer.write_i64::(value.mantissa()) + writer.write_i64::(value.0) } None => { writer.write_u8(0)?; @@ -256,7 +258,7 @@ fn write_option_rate( fn read_option_rate(mut reader: impl Read) -> std::io::Result> { let present = reader.read_u8()? != 0; if present { - Ok(Some(Rate::from_mantissa(reader.read_i64::()?))) + Ok(Some(Rate(reader.read_i64::()?))) } else { Ok(None) } diff --git a/lazer/sdk/rust/protocol/src/price.rs b/lazer/sdk/rust/protocol/src/price.rs deleted file mode 100644 index cbc5a8200e..0000000000 --- a/lazer/sdk/rust/protocol/src/price.rs +++ /dev/null @@ -1,155 +0,0 @@ -#[cfg(test)] -mod tests; - -use { - crate::ExponentFactor, - rust_decimal::{prelude::FromPrimitive, Decimal}, - serde::{Deserialize, Serialize}, - std::num::NonZeroI64, - thiserror::Error, -}; - -#[derive(Debug, Error)] -pub enum PriceError { - #[error("decimal parse error: {0}")] - DecimalParse(#[from] rust_decimal::Error), - #[error("price value is more precise than available exponent")] - TooPrecise, - #[error("zero price is unsupported")] - ZeroPriceUnsupported, - #[error("overflow")] - Overflow, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] -#[repr(transparent)] -pub struct Price(NonZeroI64); - -impl Price { - pub fn from_integer(value: i64, exponent: i16) -> Result { - let value = match ExponentFactor::get(exponent).ok_or(PriceError::Overflow)? { - ExponentFactor::Mul(coef) => value.checked_mul(coef).ok_or(PriceError::Overflow)?, - ExponentFactor::Div(coef) => value.checked_div(coef).ok_or(PriceError::Overflow)?, - }; - let value = NonZeroI64::new(value).ok_or(PriceError::ZeroPriceUnsupported)?; - Ok(Self(value)) - } - - pub fn parse_str(value: &str, exponent: i16) -> Result { - let value: Decimal = value.parse()?; - let value = match ExponentFactor::get(exponent).ok_or(PriceError::Overflow)? { - ExponentFactor::Mul(coef) => value - .checked_mul(Decimal::from_i64(coef).ok_or(PriceError::Overflow)?) - .ok_or(PriceError::Overflow)?, - ExponentFactor::Div(coef) => value - .checked_div(Decimal::from_i64(coef).ok_or(PriceError::Overflow)?) - .ok_or(PriceError::Overflow)?, - }; - if !value.is_integer() { - return Err(PriceError::TooPrecise); - } - let value: i64 = value.try_into().map_err(|_| PriceError::Overflow)?; - let value = NonZeroI64::new(value).ok_or(PriceError::Overflow)?; - Ok(Self(value)) - } - - pub const fn from_nonzero_mantissa(mantissa: NonZeroI64) -> Self { - Self(mantissa) - } - - pub const fn from_mantissa(mantissa: i64) -> Result { - if let Some(value) = NonZeroI64::new(mantissa) { - Ok(Self(value)) - } else { - Err(PriceError::ZeroPriceUnsupported) - } - } - - pub fn mantissa(self) -> NonZeroI64 { - self.0 - } - - pub fn mantissa_i64(self) -> i64 { - self.0.get() - } - - pub fn to_f64(self, exponent: i16) -> Result { - match ExponentFactor::get(exponent).ok_or(PriceError::Overflow)? { - // Mul/div is reversed for this conversion - ExponentFactor::Mul(coef) => Ok(self.0.get() as f64 / coef as f64), - ExponentFactor::Div(coef) => Ok(self.0.get() as f64 * coef as f64), - } - } - - pub fn from_f64(value: f64, exponent: i16) -> Result { - let value = Decimal::from_f64(value).ok_or(PriceError::Overflow)?; - let value = match ExponentFactor::get(exponent).ok_or(PriceError::Overflow)? { - ExponentFactor::Mul(coef) => value - .checked_mul(Decimal::from_i64(coef).ok_or(PriceError::Overflow)?) - .ok_or(PriceError::Overflow)?, - ExponentFactor::Div(coef) => value - .checked_div(Decimal::from_i64(coef).ok_or(PriceError::Overflow)?) - .ok_or(PriceError::Overflow)?, - }; - let value: i64 = value.try_into().map_err(|_| PriceError::Overflow)?; - Ok(Self( - NonZeroI64::new(value).ok_or(PriceError::ZeroPriceUnsupported)?, - )) - } - - pub fn add_with_same_mantissa(self, other: Price) -> Result { - let value = self - .0 - .get() - .checked_add(other.0.get()) - .ok_or(PriceError::Overflow)?; - Self::from_mantissa(value).map_err(|_| PriceError::ZeroPriceUnsupported) - } - - pub fn sub_with_same_mantissa(self, other: Price) -> Result { - let value = self - .0 - .get() - .checked_sub(other.0.get()) - .ok_or(PriceError::Overflow)?; - Self::from_mantissa(value).map_err(|_| PriceError::ZeroPriceUnsupported) - } - - pub fn mul_integer(self, factor: i64) -> Result { - let value = self - .0 - .get() - .checked_mul(factor) - .ok_or(PriceError::Overflow)?; - Self::from_mantissa(value).map_err(|_| PriceError::ZeroPriceUnsupported) - } - - pub fn div_integer(self, factor: i64) -> Result { - let value = self - .0 - .get() - .checked_div(factor) - .ok_or(PriceError::Overflow)?; - Self::from_mantissa(value).map_err(|_| PriceError::ZeroPriceUnsupported) - } - - pub fn mul_decimal(self, mantissa: i64, rhs_exponent: i16) -> Result { - let left_value = i128::from(self.0.get()); - let right_value = i128::from(mantissa); - - let value = left_value - .checked_mul(right_value) - .ok_or(PriceError::Overflow)?; - - let value = match ExponentFactor::get(rhs_exponent).ok_or(PriceError::Overflow)? { - ExponentFactor::Mul(coef) => { - value.checked_div(coef.into()).ok_or(PriceError::Overflow)? - } - ExponentFactor::Div(coef) => { - value.checked_mul(coef.into()).ok_or(PriceError::Overflow)? - } - }; - let value: i64 = value.try_into().map_err(|_| PriceError::Overflow)?; - Self::from_mantissa(value).map_err(|_| PriceError::ZeroPriceUnsupported) - } -} diff --git a/lazer/sdk/rust/protocol/src/price/tests.rs b/lazer/sdk/rust/protocol/src/price/tests.rs deleted file mode 100644 index 0abe2dd542..0000000000 --- a/lazer/sdk/rust/protocol/src/price/tests.rs +++ /dev/null @@ -1,149 +0,0 @@ -use {super::Price, assert_float_eq::assert_float_absolute_eq}; - -#[test] -fn price_constructs() { - let price = Price::parse_str("42.68", -8).unwrap(); - assert_eq!(price.0.get(), 4_268_000_000); - assert_float_absolute_eq!(price.to_f64(-8).unwrap(), 42.68); - - let price2 = Price::from_integer(2, -8).unwrap(); - assert_eq!(price2.0.get(), 200_000_000); - assert_float_absolute_eq!(price2.to_f64(-8).unwrap(), 2.); - - let price3 = Price::from_mantissa(123_456).unwrap(); - assert_eq!(price3.0.get(), 123_456); - assert_float_absolute_eq!(price3.to_f64(-8).unwrap(), 0.001_234_56); - - let price4 = Price::from_f64(42.68, -8).unwrap(); - assert_eq!(price4.0.get(), 4_268_000_000); - assert_float_absolute_eq!(price4.to_f64(-8).unwrap(), 42.68); -} - -#[test] -fn price_constructs_with_negative_mantissa() { - let price = Price::parse_str("-42.68", -8).unwrap(); - assert_eq!(price.0.get(), -4_268_000_000); - assert_float_absolute_eq!(price.to_f64(-8).unwrap(), -42.68); - - let price2 = Price::from_integer(-2, -8).unwrap(); - assert_eq!(price2.0.get(), -200_000_000); - assert_float_absolute_eq!(price2.to_f64(-8).unwrap(), -2.); - - let price3 = Price::from_mantissa(-123_456).unwrap(); - assert_eq!(price3.0.get(), -123_456); - assert_float_absolute_eq!(price3.to_f64(-8).unwrap(), -0.001_234_56); - - let price4 = Price::from_f64(-42.68, -8).unwrap(); - assert_eq!(price4.0.get(), -4_268_000_000); - assert_float_absolute_eq!(price4.to_f64(-8).unwrap(), -42.68); -} - -#[test] -fn price_constructs_with_zero_exponent() { - let price = Price::parse_str("42", 0).unwrap(); - assert_eq!(price.0.get(), 42); - assert_float_absolute_eq!(price.to_f64(0).unwrap(), 42.); - - let price2 = Price::from_integer(2, 0).unwrap(); - assert_eq!(price2.0.get(), 2); - assert_float_absolute_eq!(price2.to_f64(0).unwrap(), 2.); - - let price3 = Price::from_mantissa(123_456).unwrap(); - assert_eq!(price3.0.get(), 123_456); - assert_float_absolute_eq!(price3.to_f64(0).unwrap(), 123_456.); - - let price4 = Price::from_f64(42., 0).unwrap(); - assert_eq!(price4.0.get(), 42); - assert_float_absolute_eq!(price4.to_f64(0).unwrap(), 42.); -} - -#[test] -fn price_constructs_with_positive_exponent() { - let price = Price::parse_str("42_680_000", 3).unwrap(); - assert_eq!(price.0.get(), 42_680); - assert_float_absolute_eq!(price.to_f64(3).unwrap(), 42_680_000.); - - let price2 = Price::from_integer(200_000, 3).unwrap(); - assert_eq!(price2.0.get(), 200); - assert_float_absolute_eq!(price2.to_f64(3).unwrap(), 200_000.); - - let price3 = Price::from_mantissa(123_456).unwrap(); - assert_eq!(price3.0.get(), 123_456); - assert_float_absolute_eq!(price3.to_f64(3).unwrap(), 123_456_000.); - - let price4 = Price::from_f64(42_680_000., 3).unwrap(); - assert_eq!(price4.0.get(), 42_680); - assert_float_absolute_eq!(price4.to_f64(3).unwrap(), 42_680_000.); -} - -#[test] -fn price_rejects_zero_mantissa() { - Price::parse_str("0.0", -8).unwrap_err(); - Price::from_integer(0, -8).unwrap_err(); - Price::from_mantissa(0).unwrap_err(); - Price::from_f64(-0.0, -8).unwrap_err(); - - Price::parse_str("0.0", 8).unwrap_err(); - Price::from_integer(0, 8).unwrap_err(); - Price::from_f64(-0.0, 8).unwrap_err(); -} - -#[test] -fn price_rejects_too_precise() { - Price::parse_str("42.68", 0).unwrap_err(); - Price::parse_str("42.68", -1).unwrap_err(); - Price::parse_str("42.68", -2).unwrap(); - - Price::parse_str("42_680", 3).unwrap_err(); - Price::parse_str("42_600", 3).unwrap_err(); - Price::parse_str("42_000", 3).unwrap(); -} - -#[test] -fn price_ops() { - let price1 = Price::parse_str("12.34", -8).unwrap(); - let price2 = Price::parse_str("23.45", -8).unwrap(); - assert_float_absolute_eq!( - price1 - .add_with_same_mantissa(price2) - .unwrap() - .to_f64(-8) - .unwrap(), - 12.34 + 23.45 - ); - assert_float_absolute_eq!( - price1 - .sub_with_same_mantissa(price2) - .unwrap() - .to_f64(-8) - .unwrap(), - 12.34 - 23.45 - ); - assert_float_absolute_eq!( - price1.mul_integer(2).unwrap().to_f64(-8).unwrap(), - 12.34 * 2. - ); - assert_float_absolute_eq!( - price1.div_integer(2).unwrap().to_f64(-8).unwrap(), - 12.34 / 2. - ); - - assert_float_absolute_eq!( - price1.mul_decimal(3456, -2).unwrap().to_f64(-8).unwrap(), - 12.34 * 34.56 - ); - - let price2 = Price::parse_str("42_000", 3).unwrap(); - assert_float_absolute_eq!( - price2.mul_integer(2).unwrap().to_f64(3).unwrap(), - 42_000. * 2. - ); - assert_float_absolute_eq!( - price2.div_integer(2).unwrap().to_f64(3).unwrap(), - 42_000. / 2. - ); - assert_float_absolute_eq!( - price2.mul_decimal(3456, -2).unwrap().to_f64(3).unwrap(), - (42_000_f64 * 34.56 / 1000.).floor() * 1000. - ); -} diff --git a/lazer/sdk/rust/protocol/src/publisher.rs b/lazer/sdk/rust/protocol/src/publisher.rs index 74dc90cbd7..ebcee2c89c 100644 --- a/lazer/sdk/rust/protocol/src/publisher.rs +++ b/lazer/sdk/rust/protocol/src/publisher.rs @@ -1,5 +1,10 @@ +//! WebSocket JSON protocol types for API the publisher provides to the router. +//! Publisher data sourcing may also be implemented in the router process, +//! eliminating WebSocket overhead. + use { - crate::{price::Price, rate::Rate, time::TimestampUs, PriceFeedId}, + super::router::{Price, PriceFeedId, Rate}, + crate::time::TimestampUs, derive_more::derive::From, serde::{Deserialize, Serialize}, }; @@ -99,11 +104,9 @@ fn price_feed_data_v1_serde() { price_feed_id: PriceFeedId(1), source_timestamp_us: TimestampUs::from_micros(2), publisher_timestamp_us: TimestampUs::from_micros(3), - price: Some(Price::from_nonzero_mantissa(4.try_into().unwrap())), - best_bid_price: Some(Price::from_nonzero_mantissa(5.try_into().unwrap())), - best_ask_price: Some(Price::from_nonzero_mantissa( - (2 * 256 + 6).try_into().unwrap(), - )), + price: Some(Price(4.try_into().unwrap())), + best_bid_price: Some(Price(5.try_into().unwrap())), + best_ask_price: Some(Price((2 * 256 + 6).try_into().unwrap())), }; assert_eq!( bincode::deserialize::(&data).unwrap(), @@ -123,7 +126,7 @@ fn price_feed_data_v1_serde() { price_feed_id: PriceFeedId(1), source_timestamp_us: TimestampUs::from_micros(2), publisher_timestamp_us: TimestampUs::from_micros(3), - price: Some(Price::from_nonzero_mantissa(4.try_into().unwrap())), + price: Some(Price(4.try_into().unwrap())), best_bid_price: None, best_ask_price: None, }; @@ -150,11 +153,9 @@ fn price_feed_data_v2_serde() { price_feed_id: PriceFeedId(1), source_timestamp_us: TimestampUs::from_micros(2), publisher_timestamp_us: TimestampUs::from_micros(3), - price: Some(Price::from_nonzero_mantissa(4.try_into().unwrap())), - best_bid_price: Some(Price::from_nonzero_mantissa(5.try_into().unwrap())), - best_ask_price: Some(Price::from_nonzero_mantissa( - (2 * 256 + 6).try_into().unwrap(), - )), + price: Some(Price(4.try_into().unwrap())), + best_bid_price: Some(Price(5.try_into().unwrap())), + best_ask_price: Some(Price((2 * 256 + 6).try_into().unwrap())), funding_rate: None, }; assert_eq!( @@ -176,10 +177,10 @@ fn price_feed_data_v2_serde() { price_feed_id: PriceFeedId(1), source_timestamp_us: TimestampUs::from_micros(2), publisher_timestamp_us: TimestampUs::from_micros(3), - price: Some(Price::from_nonzero_mantissa(4.try_into().unwrap())), + price: Some(Price(4.try_into().unwrap())), best_bid_price: None, best_ask_price: None, - funding_rate: Some(Rate::from_mantissa(3 * 256 + 7)), + funding_rate: Some(Rate(3 * 256 + 7)), }; assert_eq!( bincode::deserialize::(&data2).unwrap(), diff --git a/lazer/sdk/rust/protocol/src/rate.rs b/lazer/sdk/rust/protocol/src/rate.rs deleted file mode 100644 index 43af27a1c7..0000000000 --- a/lazer/sdk/rust/protocol/src/rate.rs +++ /dev/null @@ -1,80 +0,0 @@ -#[cfg(test)] -mod tests; - -use { - crate::ExponentFactor, - rust_decimal::{prelude::FromPrimitive, Decimal}, - serde::{Deserialize, Serialize}, - thiserror::Error, -}; - -#[derive(Debug, Error)] -pub enum RateError { - #[error("decimal parse error: {0}")] - DecimalParse(#[from] rust_decimal::Error), - #[error("price value is more precise than available exponent")] - TooPrecise, - #[error("overflow")] - Overflow, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] -#[repr(transparent)] -pub struct Rate(i64); - -impl Rate { - pub fn from_integer(value: i64, exponent: i16) -> Result { - let value = match ExponentFactor::get(exponent).ok_or(RateError::Overflow)? { - ExponentFactor::Mul(coef) => value.checked_mul(coef).ok_or(RateError::Overflow)?, - ExponentFactor::Div(coef) => value.checked_div(coef).ok_or(RateError::Overflow)?, - }; - Ok(Self(value)) - } - - pub fn parse_str(value: &str, exponent: i16) -> Result { - let value: Decimal = value.parse()?; - let value = match ExponentFactor::get(exponent).ok_or(RateError::Overflow)? { - ExponentFactor::Mul(coef) => value - .checked_mul(Decimal::from_i64(coef).ok_or(RateError::Overflow)?) - .ok_or(RateError::Overflow)?, - ExponentFactor::Div(coef) => value - .checked_div(Decimal::from_i64(coef).ok_or(RateError::Overflow)?) - .ok_or(RateError::Overflow)?, - }; - if !value.is_integer() { - return Err(RateError::TooPrecise); - } - let value: i64 = value.try_into().map_err(|_| RateError::Overflow)?; - Ok(Self(value)) - } - - pub const fn from_mantissa(mantissa: i64) -> Self { - Self(mantissa) - } - - pub fn from_f64(value: f64, exponent: i16) -> Result { - let value = Decimal::from_f64(value).ok_or(RateError::Overflow)?; - let value = match ExponentFactor::get(exponent).ok_or(RateError::Overflow)? { - ExponentFactor::Mul(coef) => value - .checked_mul(Decimal::from_i64(coef).ok_or(RateError::Overflow)?) - .ok_or(RateError::Overflow)?, - ExponentFactor::Div(coef) => value - .checked_div(Decimal::from_i64(coef).ok_or(RateError::Overflow)?) - .ok_or(RateError::Overflow)?, - }; - let value: i64 = value.try_into().map_err(|_| RateError::Overflow)?; - Ok(Self(value)) - } - - pub fn mantissa(self) -> i64 { - self.0 - } - - pub fn to_f64(self, exponent: i16) -> Result { - match ExponentFactor::get(exponent).ok_or(RateError::Overflow)? { - // Mul/div is reversed for this conversion - ExponentFactor::Mul(coef) => Ok(self.0 as f64 / coef as f64), - ExponentFactor::Div(coef) => Ok(self.0 as f64 * coef as f64), - } - } -} diff --git a/lazer/sdk/rust/protocol/src/rate/tests.rs b/lazer/sdk/rust/protocol/src/rate/tests.rs deleted file mode 100644 index 26b49528ec..0000000000 --- a/lazer/sdk/rust/protocol/src/rate/tests.rs +++ /dev/null @@ -1,107 +0,0 @@ -use {crate::rate::Rate, assert_float_eq::assert_float_absolute_eq}; - -#[test] -fn rate_constructs() { - let rate = Rate::parse_str("42.68", -8).unwrap(); - assert_eq!(rate.0, 4_268_000_000); - assert_float_absolute_eq!(rate.to_f64(-8).unwrap(), 42.68); - - let rate2 = Rate::from_integer(2, -8).unwrap(); - assert_eq!(rate2.0, 200_000_000); - assert_float_absolute_eq!(rate2.to_f64(-8).unwrap(), 2.); - - let rate3 = Rate::from_mantissa(123_456); - assert_eq!(rate3.0, 123_456); - assert_float_absolute_eq!(rate3.to_f64(-8).unwrap(), 0.001_234_56); - - let rate4 = Rate::from_f64(42.68, -8).unwrap(); - assert_eq!(rate4.0, 4_268_000_000); - assert_float_absolute_eq!(rate4.to_f64(-8).unwrap(), 42.68); -} - -#[test] -fn rate_constructs_with_negative_mantissa() { - let rate = Rate::parse_str("-42.68", -8).unwrap(); - assert_eq!(rate.0, -4_268_000_000); - assert_float_absolute_eq!(rate.to_f64(-8).unwrap(), -42.68); - - let rate2 = Rate::from_integer(-2, -8).unwrap(); - assert_eq!(rate2.0, -200_000_000); - assert_float_absolute_eq!(rate2.to_f64(-8).unwrap(), -2.); - - let rate3 = Rate::from_mantissa(-123_456); - assert_eq!(rate3.0, -123_456); - assert_float_absolute_eq!(rate3.to_f64(-8).unwrap(), -0.001_234_56); - - let rate4 = Rate::from_f64(-42.68, -8).unwrap(); - assert_eq!(rate4.0, -4_268_000_000); - assert_float_absolute_eq!(rate4.to_f64(-8).unwrap(), -42.68); -} - -#[test] -fn rate_constructs_with_zero_exponent() { - let rate = Rate::parse_str("42", 0).unwrap(); - assert_eq!(rate.0, 42); - assert_float_absolute_eq!(rate.to_f64(0).unwrap(), 42.); - - let rate2 = Rate::from_integer(2, 0).unwrap(); - assert_eq!(rate2.0, 2); - assert_float_absolute_eq!(rate2.to_f64(0).unwrap(), 2.); - - let rate3 = Rate::from_mantissa(123_456); - assert_eq!(rate3.0, 123_456); - assert_float_absolute_eq!(rate3.to_f64(0).unwrap(), 123_456.); - - let rate4 = Rate::from_f64(42., 0).unwrap(); - assert_eq!(rate4.0, 42); - assert_float_absolute_eq!(rate4.to_f64(0).unwrap(), 42.); -} - -#[test] -fn rate_constructs_with_zero_mantissa() { - let rate1 = Rate::parse_str("0.0", -8).unwrap(); - assert_eq!(rate1.0, 0); - let rate2 = Rate::from_integer(0, -8).unwrap(); - assert_eq!(rate2.0, 0); - let rate3 = Rate::from_mantissa(0); - assert_eq!(rate3.0, 0); - let rate4 = Rate::from_f64(-0.0, -8).unwrap(); - assert_eq!(rate4.0, 0); - - let rate1 = Rate::parse_str("0.0", 8).unwrap(); - assert_eq!(rate1.0, 0); - let rate2 = Rate::from_integer(0, 8).unwrap(); - assert_eq!(rate2.0, 0); - let rate4 = Rate::from_f64(-0.0, 8).unwrap(); - assert_eq!(rate4.0, 0); -} - -#[test] -fn rate_constructs_with_positive_exponent() { - let rate = Rate::parse_str("42_680_000", 3).unwrap(); - assert_eq!(rate.0, 42_680); - assert_float_absolute_eq!(rate.to_f64(3).unwrap(), 42_680_000.); - - let rate2 = Rate::from_integer(200_000, 3).unwrap(); - assert_eq!(rate2.0, 200); - assert_float_absolute_eq!(rate2.to_f64(3).unwrap(), 200_000.); - - let rate3 = Rate::from_mantissa(123_456); - assert_eq!(rate3.0, 123_456); - assert_float_absolute_eq!(rate3.to_f64(3).unwrap(), 123_456_000.); - - let rate4 = Rate::from_f64(42_680_000., 3).unwrap(); - assert_eq!(rate4.0, 42_680); - assert_float_absolute_eq!(rate4.to_f64(3).unwrap(), 42_680_000.); -} - -#[test] -fn rate_rejects_too_precise() { - Rate::parse_str("42.68", 0).unwrap_err(); - Rate::parse_str("42.68", -1).unwrap_err(); - Rate::parse_str("42.68", -2).unwrap(); - - Rate::parse_str("42_680", 3).unwrap_err(); - Rate::parse_str("42_600", 3).unwrap_err(); - Rate::parse_str("42_000", 3).unwrap(); -} diff --git a/lazer/sdk/rust/protocol/src/router.rs b/lazer/sdk/rust/protocol/src/router.rs new file mode 100644 index 0000000000..e995380e8b --- /dev/null +++ b/lazer/sdk/rust/protocol/src/router.rs @@ -0,0 +1,605 @@ +//! WebSocket JSON protocol types for the API the router provides to consumers and publishers. + +use { + crate::{ + payload::AggregatedPriceFeedData, + time::{DurationUs, TimestampUs}, + }, + anyhow::{bail, Context}, + derive_more::derive::{From, Into}, + itertools::Itertools, + protobuf::well_known_types::duration::Duration as ProtobufDuration, + rust_decimal::{prelude::FromPrimitive, Decimal}, + serde::{de::Error, Deserialize, Serialize}, + std::{ + fmt::Display, + num::NonZeroI64, + ops::{Add, Deref, DerefMut, Div, Sub}, + }, +}; + +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into, +)] +pub struct PublisherId(pub u16); + +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into, +)] +pub struct PriceFeedId(pub u32); + +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into, +)] +pub struct ChannelId(pub u8); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +#[repr(transparent)] +pub struct Rate(pub i64); + +impl Rate { + pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result { + let value: Decimal = value.parse()?; + let coef = 10i64.checked_pow(exponent).context("overflow")?; + let coef = Decimal::from_i64(coef).context("overflow")?; + let value = value.checked_mul(coef).context("overflow")?; + if !value.is_integer() { + bail!("price value is more precise than available exponent"); + } + let value: i64 = value.try_into().context("overflow")?; + Ok(Self(value)) + } + + pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result { + let value = Decimal::from_f64(value).context("overflow")?; + let coef = 10i64.checked_pow(exponent).context("overflow")?; + let coef = Decimal::from_i64(coef).context("overflow")?; + let value = value.checked_mul(coef).context("overflow")?; + let value: i64 = value.try_into().context("overflow")?; + Ok(Self(value)) + } + + pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result { + let coef = 10i64.checked_pow(exponent).context("overflow")?; + let value = value.checked_mul(coef).context("overflow")?; + Ok(Self(value)) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +#[repr(transparent)] +pub struct Price(pub NonZeroI64); + +impl Price { + pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result { + let coef = 10i64.checked_pow(exponent).context("overflow")?; + let value = value.checked_mul(coef).context("overflow")?; + let value = NonZeroI64::new(value).context("zero price is unsupported")?; + Ok(Self(value)) + } + + pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result { + let value: Decimal = value.parse()?; + let coef = 10i64.checked_pow(exponent).context("overflow")?; + let coef = Decimal::from_i64(coef).context("overflow")?; + let value = value.checked_mul(coef).context("overflow")?; + if !value.is_integer() { + bail!("price value is more precise than available exponent"); + } + let value: i64 = value.try_into().context("overflow")?; + let value = NonZeroI64::new(value).context("zero price is unsupported")?; + Ok(Self(value)) + } + + pub fn new(value: i64) -> anyhow::Result { + let value = NonZeroI64::new(value).context("zero price is unsupported")?; + Ok(Self(value)) + } + + pub fn into_inner(self) -> NonZeroI64 { + self.0 + } + + pub fn to_f64(self, exponent: u32) -> anyhow::Result { + Ok(self.0.get() as f64 / 10i64.checked_pow(exponent).context("overflow")? as f64) + } + + pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result { + let value = (value * 10f64.powi(exponent as i32)) as i64; + let value = NonZeroI64::new(value).context("zero price is unsupported")?; + Ok(Self(value)) + } + + pub fn mul(self, rhs: Price, rhs_exponent: u32) -> anyhow::Result { + let left_value = i128::from(self.0.get()); + let right_value = i128::from(rhs.0.get()); + + let value = left_value * right_value / 10i128.pow(rhs_exponent); + let value = value.try_into()?; + NonZeroI64::new(value) + .context("zero price is unsupported") + .map(Self) + } +} + +impl Sub for Price { + type Output = Option; + + fn sub(self, rhs: i64) -> Self::Output { + let value = self.0.get().saturating_sub(rhs); + NonZeroI64::new(value).map(Self) + } +} + +impl Add for Price { + type Output = Option; + + fn add(self, rhs: i64) -> Self::Output { + let value = self.0.get().saturating_add(rhs); + NonZeroI64::new(value).map(Self) + } +} + +impl Add for Price { + type Output = Option; + fn add(self, rhs: Price) -> Self::Output { + let value = self.0.get().saturating_add(rhs.0.get()); + NonZeroI64::new(value).map(Self) + } +} + +impl Sub for Price { + type Output = Option; + fn sub(self, rhs: Price) -> Self::Output { + let value = self.0.get().saturating_sub(rhs.0.get()); + NonZeroI64::new(value).map(Self) + } +} + +impl Div for Price { + type Output = Option; + fn div(self, rhs: i64) -> Self::Output { + let value = self.0.get().saturating_div(rhs); + NonZeroI64::new(value).map(Self) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum PriceFeedProperty { + Price, + BestBidPrice, + BestAskPrice, + PublisherCount, + Exponent, + Confidence, + FundingRate, + FundingTimestamp, + FundingRateInterval, + // More fields may be added later. +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum DeliveryFormat { + /// Deliver stream updates as JSON text messages. + #[default] + Json, + /// Deliver stream updates as binary messages. + Binary, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum Format { + Evm, + Solana, + LeEcdsa, + LeUnsigned, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum JsonBinaryEncoding { + #[default] + Base64, + Hex, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, From)] +pub enum Channel { + FixedRate(FixedRate), +} + +impl Serialize for Channel { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + match self { + Channel::FixedRate(fixed_rate) => { + if *fixed_rate == FixedRate::MIN { + return serializer.serialize_str("real_time"); + } + serializer.serialize_str(&format!( + "fixed_rate@{}ms", + fixed_rate.duration().as_millis() + )) + } + } + } +} + +pub mod channel_ids { + use super::ChannelId; + + pub const FIXED_RATE_1: ChannelId = ChannelId(1); + pub const FIXED_RATE_50: ChannelId = ChannelId(2); + pub const FIXED_RATE_200: ChannelId = ChannelId(3); +} + +impl Display for Channel { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Channel::FixedRate(fixed_rate) => match *fixed_rate { + FixedRate::MIN => write!(f, "real_time"), + rate => write!(f, "fixed_rate@{}ms", rate.duration().as_millis()), + }, + } + } +} + +impl Channel { + pub fn id(&self) -> ChannelId { + match self { + Channel::FixedRate(fixed_rate) => match fixed_rate.duration().as_millis() { + 1 => channel_ids::FIXED_RATE_1, + 50 => channel_ids::FIXED_RATE_50, + 200 => channel_ids::FIXED_RATE_200, + _ => panic!("unknown channel: {self:?}"), + }, + } + } +} + +#[test] +fn id_supports_all_fixed_rates() { + for rate in FixedRate::ALL { + Channel::FixedRate(rate).id(); + } +} + +fn parse_channel(value: &str) -> Option { + if value == "real_time" { + Some(Channel::FixedRate(FixedRate::MIN)) + } else if let Some(rest) = value.strip_prefix("fixed_rate@") { + let ms_value = rest.strip_suffix("ms")?; + Some(Channel::FixedRate(FixedRate::from_millis( + ms_value.parse().ok()?, + )?)) + } else { + None + } +} + +impl<'de> Deserialize<'de> for Channel { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let value = ::deserialize(deserializer)?; + parse_channel(&value).ok_or_else(|| Error::custom("unknown channel")) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct FixedRate { + rate: DurationUs, +} + +impl FixedRate { + pub const RATE_1_MS: Self = Self { + rate: DurationUs::from_millis_u32(1), + }; + pub const RATE_50_MS: Self = Self { + rate: DurationUs::from_millis_u32(50), + }; + pub const RATE_200_MS: Self = Self { + rate: DurationUs::from_millis_u32(200), + }; + + // Assumptions (tested below): + // - Values are sorted. + // - 1 second contains a whole number of each interval. + // - all intervals are divisable by the smallest interval. + pub const ALL: [Self; 3] = [Self::RATE_1_MS, Self::RATE_50_MS, Self::RATE_200_MS]; + pub const MIN: Self = Self::ALL[0]; + + pub fn from_millis(millis: u32) -> Option { + Self::ALL + .into_iter() + .find(|v| v.rate.as_millis() == u64::from(millis)) + } + + pub fn duration(self) -> DurationUs { + self.rate + } +} + +impl TryFrom for FixedRate { + type Error = anyhow::Error; + + fn try_from(value: DurationUs) -> Result { + Self::ALL + .into_iter() + .find(|v| v.rate == value) + .with_context(|| format!("unsupported rate: {value:?}")) + } +} + +impl TryFrom<&ProtobufDuration> for FixedRate { + type Error = anyhow::Error; + + fn try_from(value: &ProtobufDuration) -> Result { + let duration = DurationUs::try_from(value)?; + Self::try_from(duration) + } +} + +impl TryFrom for FixedRate { + type Error = anyhow::Error; + + fn try_from(duration: ProtobufDuration) -> anyhow::Result { + TryFrom::<&ProtobufDuration>::try_from(&duration) + } +} + +impl From for DurationUs { + fn from(value: FixedRate) -> Self { + value.rate + } +} + +impl From for ProtobufDuration { + fn from(value: FixedRate) -> Self { + value.rate.into() + } +} + +#[test] +fn fixed_rate_values() { + assert!( + FixedRate::ALL.windows(2).all(|w| w[0] < w[1]), + "values must be unique and sorted" + ); + for value in FixedRate::ALL { + assert_eq!( + 1_000_000 % value.duration().as_micros(), + 0, + "1 s must contain whole number of intervals" + ); + assert_eq!( + value.duration().as_micros() % FixedRate::MIN.duration().as_micros(), + 0, + "the interval's borders must be a subset of the minimal interval's borders" + ); + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SubscriptionParamsRepr { + pub price_feed_ids: Vec, + pub properties: Vec, + // "chains" was renamed to "formats". "chains" is still supported for compatibility. + #[serde(alias = "chains")] + pub formats: Vec, + #[serde(default)] + pub delivery_format: DeliveryFormat, + #[serde(default)] + pub json_binary_encoding: JsonBinaryEncoding, + /// If `true`, the stream update will contain a `parsed` JSON field containing + /// all data of the update. + #[serde(default = "default_parsed")] + pub parsed: bool, + pub channel: Channel, + #[serde(default)] + pub ignore_invalid_feed_ids: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct SubscriptionParams(SubscriptionParamsRepr); + +impl<'de> Deserialize<'de> for SubscriptionParams { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let value = SubscriptionParamsRepr::deserialize(deserializer)?; + Self::new(value).map_err(Error::custom) + } +} + +impl SubscriptionParams { + pub fn new(value: SubscriptionParamsRepr) -> Result { + if value.price_feed_ids.is_empty() { + return Err("no price feed ids specified"); + } + if !value.price_feed_ids.iter().all_unique() { + return Err("duplicate price feed ids specified"); + } + if !value.formats.iter().all_unique() { + return Err("duplicate formats or chains specified"); + } + if value.properties.is_empty() { + return Err("no properties specified"); + } + if !value.properties.iter().all_unique() { + return Err("duplicate properties specified"); + } + Ok(Self(value)) + } +} + +impl Deref for SubscriptionParams { + type Target = SubscriptionParamsRepr; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl DerefMut for SubscriptionParams { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +pub fn default_parsed() -> bool { + true +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct JsonBinaryData { + pub encoding: JsonBinaryEncoding, + pub data: String, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct JsonUpdate { + /// Present unless `parsed = false` is specified in subscription params. + #[serde(skip_serializing_if = "Option::is_none")] + pub parsed: Option, + /// Only present if `Evm` is present in `formats` in subscription params. + #[serde(skip_serializing_if = "Option::is_none")] + pub evm: Option, + /// Only present if `Solana` is present in `formats` in subscription params. + #[serde(skip_serializing_if = "Option::is_none")] + pub solana: Option, + /// Only present if `LeEcdsa` is present in `formats` in subscription params. + #[serde(skip_serializing_if = "Option::is_none")] + pub le_ecdsa: Option, + /// Only present if `LeUnsigned` is present in `formats` in subscription params. + #[serde(skip_serializing_if = "Option::is_none")] + pub le_unsigned: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ParsedPayload { + #[serde(with = "crate::serde_str::timestamp")] + pub timestamp_us: TimestampUs, + pub price_feeds: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ParsedFeedPayload { + pub price_feed_id: PriceFeedId, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "crate::serde_str::option_price")] + #[serde(default)] + pub price: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "crate::serde_str::option_price")] + #[serde(default)] + pub best_bid_price: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "crate::serde_str::option_price")] + #[serde(default)] + pub best_ask_price: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub publisher_count: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub exponent: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub confidence: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub funding_rate: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub funding_timestamp: Option, + // More fields may be added later. + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub funding_rate_interval: Option, +} + +impl ParsedFeedPayload { + pub fn new( + price_feed_id: PriceFeedId, + exponent: Option, + data: &AggregatedPriceFeedData, + properties: &[PriceFeedProperty], + ) -> Self { + let mut output = Self { + price_feed_id, + price: None, + best_bid_price: None, + best_ask_price: None, + publisher_count: None, + exponent: None, + confidence: None, + funding_rate: None, + funding_timestamp: None, + funding_rate_interval: None, + }; + for &property in properties { + match property { + PriceFeedProperty::Price => { + output.price = data.price; + } + PriceFeedProperty::BestBidPrice => { + output.best_bid_price = data.best_bid_price; + } + PriceFeedProperty::BestAskPrice => { + output.best_ask_price = data.best_ask_price; + } + PriceFeedProperty::PublisherCount => { + output.publisher_count = Some(data.publisher_count); + } + PriceFeedProperty::Exponent => { + output.exponent = exponent; + } + PriceFeedProperty::Confidence => { + output.confidence = data.confidence; + } + PriceFeedProperty::FundingRate => { + output.funding_rate = data.funding_rate; + } + PriceFeedProperty::FundingTimestamp => { + output.funding_timestamp = data.funding_timestamp; + } + PriceFeedProperty::FundingRateInterval => { + output.funding_rate_interval = data.funding_rate_interval; + } + } + } + output + } + + pub fn new_full( + price_feed_id: PriceFeedId, + exponent: Option, + data: &AggregatedPriceFeedData, + ) -> Self { + Self { + price_feed_id, + price: data.price, + best_bid_price: data.best_bid_price, + best_ask_price: data.best_ask_price, + publisher_count: Some(data.publisher_count), + exponent, + confidence: data.confidence, + funding_rate: data.funding_rate, + funding_timestamp: data.funding_timestamp, + funding_rate_interval: data.funding_rate_interval, + } + } +} diff --git a/lazer/sdk/rust/protocol/src/serde_price_as_i64.rs b/lazer/sdk/rust/protocol/src/serde_price_as_i64.rs index 1fea60cc8a..b2830c6d9f 100644 --- a/lazer/sdk/rust/protocol/src/serde_price_as_i64.rs +++ b/lazer/sdk/rust/protocol/src/serde_price_as_i64.rs @@ -1,5 +1,5 @@ use { - crate::price::Price, + crate::router::Price, serde::{Deserialize, Deserializer, Serialize, Serializer}, std::num::NonZeroI64, }; @@ -9,7 +9,7 @@ where S: Serializer, { value - .map_or(0i64, |price| price.mantissa_i64()) + .map_or(0i64, |price| price.0.get()) .serialize(serializer) } @@ -18,5 +18,5 @@ where D: Deserializer<'de>, { let value = i64::deserialize(deserializer)?; - Ok(NonZeroI64::new(value).map(Price::from_nonzero_mantissa)) + Ok(NonZeroI64::new(value).map(Price)) } diff --git a/lazer/sdk/rust/protocol/src/serde_str.rs b/lazer/sdk/rust/protocol/src/serde_str.rs index cb3b27af52..1446fb4332 100644 --- a/lazer/sdk/rust/protocol/src/serde_str.rs +++ b/lazer/sdk/rust/protocol/src/serde_str.rs @@ -1,6 +1,6 @@ pub mod option_price { use { - crate::price::Price, + crate::router::Price, serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}, std::num::NonZeroI64, }; @@ -10,7 +10,7 @@ pub mod option_price { S: Serializer, { value - .map(|price| price.mantissa_i64().to_string()) + .map(|price| price.0.get().to_string()) .serialize(serializer) } @@ -22,7 +22,7 @@ pub mod option_price { if let Some(value) = value { let value: i64 = value.parse().map_err(D::Error::custom)?; let value = NonZeroI64::new(value).ok_or_else(|| D::Error::custom("zero price"))?; - Ok(Some(Price::from_nonzero_mantissa(value))) + Ok(Some(Price(value))) } else { Ok(None) } diff --git a/lazer/sdk/rust/protocol/src/subscription.rs b/lazer/sdk/rust/protocol/src/subscription.rs new file mode 100644 index 0000000000..c773283839 --- /dev/null +++ b/lazer/sdk/rust/protocol/src/subscription.rs @@ -0,0 +1,103 @@ +//! Types descibing general WebSocket subscription/unsubscription JSON messages +//! used across publishers, agents and routers. + +use { + crate::router::{JsonUpdate, PriceFeedId, SubscriptionParams}, + derive_more::From, + serde::{Deserialize, Serialize}, +}; + +/// A request sent from the client to the server. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(tag = "type")] +#[serde(rename_all = "camelCase")] +pub enum Request { + Subscribe(SubscribeRequest), + Unsubscribe(UnsubscribeRequest), +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +pub struct SubscriptionId(pub u64); + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SubscribeRequest { + pub subscription_id: SubscriptionId, + #[serde(flatten)] + pub params: SubscriptionParams, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UnsubscribeRequest { + pub subscription_id: SubscriptionId, +} + +/// A JSON response sent from the server to the client. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, From)] +#[serde(tag = "type")] +#[serde(rename_all = "camelCase")] +pub enum Response { + Error(ErrorResponse), + Subscribed(SubscribedResponse), + SubscribedWithInvalidFeedIdsIgnored(SubscribedWithInvalidFeedIdsIgnoredResponse), + Unsubscribed(UnsubscribedResponse), + SubscriptionError(SubscriptionErrorResponse), + StreamUpdated(StreamUpdatedResponse), +} + +/// Sent from the server after a successul subscription. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SubscribedResponse { + pub subscription_id: SubscriptionId, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct InvalidFeedSubscriptionDetails { + pub unknown_ids: Vec, + pub unsupported_channels: Vec, + pub unstable: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SubscribedWithInvalidFeedIdsIgnoredResponse { + pub subscription_id: SubscriptionId, + pub subscribed_feed_ids: Vec, + pub ignored_invalid_feed_ids: InvalidFeedSubscriptionDetails, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UnsubscribedResponse { + pub subscription_id: SubscriptionId, +} + +/// Sent from the server if the requested subscription or unsubscription request +/// could not be fulfilled. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SubscriptionErrorResponse { + pub subscription_id: SubscriptionId, + pub error: String, +} + +/// Sent from the server if an internal error occured while serving data for an existing subscription, +/// or a client request sent a bad request. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ErrorResponse { + pub error: String, +} + +/// Sent from the server when new data is available for an existing subscription +/// (only if `delivery_format == Json`). +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct StreamUpdatedResponse { + pub subscription_id: SubscriptionId, + #[serde(flatten)] + pub payload: JsonUpdate, +} diff --git a/lazer/sdk/rust/protocol/src/time.rs b/lazer/sdk/rust/protocol/src/time.rs index 8362d5a6c9..5fbc6956c4 100644 --- a/lazer/sdk/rust/protocol/src/time.rs +++ b/lazer/sdk/rust/protocol/src/time.rs @@ -486,97 +486,3 @@ pub mod duration_us_serde_humantime { value.into_inner().try_into().map_err(D::Error::custom) } } - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub struct FixedRate { - rate: DurationUs, -} - -impl FixedRate { - pub const RATE_1_MS: Self = Self { - rate: DurationUs::from_millis_u32(1), - }; - pub const RATE_50_MS: Self = Self { - rate: DurationUs::from_millis_u32(50), - }; - pub const RATE_200_MS: Self = Self { - rate: DurationUs::from_millis_u32(200), - }; - - // Assumptions (tested below): - // - Values are sorted. - // - 1 second contains a whole number of each interval. - // - all intervals are divisable by the smallest interval. - pub const ALL: [Self; 3] = [Self::RATE_1_MS, Self::RATE_50_MS, Self::RATE_200_MS]; - pub const MIN: Self = Self::ALL[0]; - - pub fn from_millis(millis: u32) -> Option { - Self::ALL - .into_iter() - .find(|v| v.rate.as_millis() == u64::from(millis)) - } - - pub fn duration(self) -> DurationUs { - self.rate - } -} - -impl TryFrom for FixedRate { - type Error = anyhow::Error; - - fn try_from(value: DurationUs) -> Result { - Self::ALL - .into_iter() - .find(|v| v.rate == value) - .with_context(|| format!("unsupported rate: {value:?}")) - } -} - -impl TryFrom<&ProtobufDuration> for FixedRate { - type Error = anyhow::Error; - - fn try_from(value: &ProtobufDuration) -> Result { - let duration = DurationUs::try_from(value)?; - Self::try_from(duration) - } -} - -impl TryFrom for FixedRate { - type Error = anyhow::Error; - - fn try_from(duration: ProtobufDuration) -> anyhow::Result { - TryFrom::<&ProtobufDuration>::try_from(&duration) - } -} - -impl From for DurationUs { - fn from(value: FixedRate) -> Self { - value.rate - } -} - -impl From for ProtobufDuration { - fn from(value: FixedRate) -> Self { - value.rate.into() - } -} - -#[test] -fn fixed_rate_values() { - assert!( - FixedRate::ALL.windows(2).all(|w| w[0] < w[1]), - "values must be unique and sorted" - ); - for value in FixedRate::ALL { - assert_eq!( - 1_000_000 % value.duration().as_micros(), - 0, - "1 s must contain whole number of intervals" - ); - assert_eq!( - value.duration().as_micros() % FixedRate::MIN.duration().as_micros(), - 0, - "the interval's borders must be a subset of the minimal interval's borders" - ); - } -}