diff --git a/apps/hermes/server/Cargo.lock b/apps/hermes/server/Cargo.lock index 00414c68f8..d2b8507cd5 100644 --- a/apps/hermes/server/Cargo.lock +++ b/apps/hermes/server/Cargo.lock @@ -1880,7 +1880,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermes" -version = "0.10.4" +version = "0.10.5" dependencies = [ "anyhow", "async-trait", diff --git a/apps/hermes/server/Cargo.toml b/apps/hermes/server/Cargo.toml index a21455ec94..ae002fa959 100644 --- a/apps/hermes/server/Cargo.toml +++ b/apps/hermes/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hermes" -version = "0.10.4" +version = "0.10.5" description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle." edition = "2021" diff --git a/apps/hermes/server/src/api/metrics_middleware.rs b/apps/hermes/server/src/api/metrics_middleware.rs index aef971e48c..9d63901607 100644 --- a/apps/hermes/server/src/api/metrics_middleware.rs +++ b/apps/hermes/server/src/api/metrics_middleware.rs @@ -18,6 +18,7 @@ use { pub struct ApiMetrics { pub requests: Family, pub latencies: Family, + pub sse_broadcast_latency: Histogram, } impl ApiMetrics { @@ -36,11 +37,18 @@ impl ApiMetrics { .into_iter(), ) }), + sse_broadcast_latency: Histogram::new( + [ + 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, + ] + .into_iter(), + ), }; { let requests = new.requests.clone(); let latencies = new.latencies.clone(); + let sse_broadcast_latency = new.sse_broadcast_latency.clone(); tokio::spawn(async move { Metrics::register( @@ -58,6 +66,16 @@ impl ApiMetrics { ), ) .await; + + Metrics::register( + &*state, + ( + "sse_broadcast_latency_seconds", + "Latency from Hermes receive_time to SSE send in seconds", + sse_broadcast_latency, + ), + ) + .await; }); } diff --git a/apps/hermes/server/src/api/rest/v2/sse.rs b/apps/hermes/server/src/api/rest/v2/sse.rs index e67f7b1825..5be1e902d4 100644 --- a/apps/hermes/server/src/api/rest/v2/sse.rs +++ b/apps/hermes/server/src/api/rest/v2/sse.rs @@ -206,6 +206,20 @@ where return Ok(None); } + let now_secs = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs_f64()) + .unwrap_or(0.0); + for pu in &parsed_price_updates { + if let Some(receive_time) = pu.metadata.proof_available_time { + let latency = now_secs - (receive_time as f64); + state + .metrics + .sse_broadcast_latency + .observe(latency.max(0.0)); + } + } + let price_update_data = price_feeds_with_update_data.update_data; let encoded_data: Vec = price_update_data .into_iter() diff --git a/apps/hermes/server/src/api/ws.rs b/apps/hermes/server/src/api/ws.rs index 2d1098f6c0..6f4c9bae86 100644 --- a/apps/hermes/server/src/api/ws.rs +++ b/apps/hermes/server/src/api/ws.rs @@ -85,6 +85,7 @@ pub struct Labels { pub struct WsMetrics { pub interactions: Family, + pub broadcast_latency: prometheus_client::metrics::histogram::Histogram, } impl WsMetrics { @@ -95,10 +96,17 @@ impl WsMetrics { { let new = Self { interactions: Family::default(), + broadcast_latency: prometheus_client::metrics::histogram::Histogram::new( + [ + 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, + ] + .into_iter(), + ), }; { let interactions = new.interactions.clone(); + let ws_broadcast_latency = new.broadcast_latency.clone(); tokio::spawn(async move { Metrics::register( @@ -110,6 +118,16 @@ impl WsMetrics { ), ) .await; + + Metrics::register( + &*state, + ( + "ws_broadcast_latency_seconds", + "Latency from Hermes receive_time to WS send in seconds", + ws_broadcast_latency, + ), + ) + .await; }); } @@ -401,6 +419,13 @@ where } }; + // Capture the minimum receive_time from the updates batch + let min_received_at = updates + .price_feeds + .iter() + .filter_map(|update| update.received_at) + .min(); + for update in updates.price_feeds { let config = self .price_feeds_with_config @@ -480,6 +505,21 @@ where } self.sender.flush().await?; + + // Record latency from receive to ws send after flushing + if let Some(min_received_at) = min_received_at { + let now_secs = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs_f64()) + .unwrap_or(0.0); + // Histogram only accepts f64. The conversion is safe (never panics), but very large values lose precision. + let latency = now_secs - (min_received_at as f64); + self.ws_state + .metrics + .broadcast_latency + .observe(latency.max(0.0)); + } + Ok(()) } diff --git a/apps/hermes/server/src/state/aggregate.rs b/apps/hermes/server/src/state/aggregate.rs index 01e0f1e36d..314b0e6864 100644 --- a/apps/hermes/server/src/state/aggregate.rs +++ b/apps/hermes/server/src/state/aggregate.rs @@ -367,6 +367,16 @@ where // we can build the message states let message_states = build_message_states(accumulator_messages, wormhole_merkle_state)?; + { + let mut data = self.into().data.write().await; + for ms in &message_states { + let publish = ms.message.publish_time(); + let receive = ms.received_at; + let latency = receive - publish; + data.metrics.observe_publish_to_receive(latency); + } + } + let message_state_keys = message_states .iter() .map(|message_state| message_state.key()) diff --git a/apps/hermes/server/src/state/aggregate/metrics.rs b/apps/hermes/server/src/state/aggregate/metrics.rs index 77009f7ecd..d1c3b05af0 100644 --- a/apps/hermes/server/src/state/aggregate/metrics.rs +++ b/apps/hermes/server/src/state/aggregate/metrics.rs @@ -34,6 +34,7 @@ struct ObservedSlotLabels { pub struct Metrics { observed_slot: Family, observed_slot_latency: Family, + publish_to_receive_latency: Histogram, first_observed_time_of_slot: BTreeMap, newest_observed_slot: HashMap, } @@ -50,6 +51,12 @@ impl Metrics { .into_iter(), ) }), + publish_to_receive_latency: Histogram::new( + [ + 0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0, + ] + .into_iter(), + ), first_observed_time_of_slot: BTreeMap::new(), newest_observed_slot: HashMap::new(), }; @@ -69,11 +76,25 @@ impl Metrics { "Latency of observed slots in seconds", observed_slot_latency, ); + + metrics_registry.register( + "publish_to_receive_latency_seconds", + "Latency from message publish_time to Hermes receive_time in seconds", + new.publish_to_receive_latency.clone(), + ); } new } + pub fn observe_publish_to_receive(&mut self, latency_secs: i64) { + // Histogram only accepts f64. The conversion is safe (never panics), but very large values lose precision. + let latency_secs = latency_secs as f64; + if latency_secs.is_finite() && latency_secs >= 0.0 { + self.publish_to_receive_latency.observe(latency_secs); + } + } + /// Observe a slot and event. An event at a slot should be observed only once. pub fn observe(&mut self, slot: Slot, event: Event) { let order = if self