Skip to content

feat(hermes): add staleness metrics #2978

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/hermes/server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/hermes/server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hermes"
version = "0.10.4"
version = "0.10.5"
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
edition = "2021"

Expand Down
18 changes: 18 additions & 0 deletions apps/hermes/server/src/api/metrics_middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use {
/// Prometheus metric handles owned by the API layer.
///
/// When an `ApiMetrics` is built in this file, each handle is cloned and the
/// clones are registered through `Metrics::register` from a spawned tokio task.
pub struct ApiMetrics {
/// Counter family keyed by `Labels`.
/// NOTE(review): the registration name/help text is outside the visible hunk —
/// presumably a per-request counter; confirm against the `register` call.
pub requests: Family<Labels, Counter>,
/// Histogram family keyed by `Labels`.
/// NOTE(review): presumably request latency; confirm units and buckets against
/// the constructor, which is only partially visible here.
pub latencies: Family<Labels, Histogram>,
/// Unlabelled histogram registered as `sse_broadcast_latency_seconds`
/// ("Latency from Hermes receive_time to SSE send in seconds").
/// Buckets range from 0.001 to 20.0.
pub sse_broadcast_latency: Histogram,
}

impl ApiMetrics {
Expand All @@ -36,11 +37,18 @@ impl ApiMetrics {
.into_iter(),
)
}),
sse_broadcast_latency: Histogram::new(
[
0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0,
]
.into_iter(),
),
};

{
let requests = new.requests.clone();
let latencies = new.latencies.clone();
let sse_broadcast_latency = new.sse_broadcast_latency.clone();

tokio::spawn(async move {
Metrics::register(
Expand All @@ -58,6 +66,16 @@ impl ApiMetrics {
),
)
.await;

Metrics::register(
&*state,
(
"sse_broadcast_latency_seconds",
"Latency from Hermes receive_time to SSE send in seconds",
sse_broadcast_latency,
),
)
.await;
});
}

Expand Down
14 changes: 14 additions & 0 deletions apps/hermes/server/src/api/rest/v2/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,20 @@ where
return Ok(None);
}

let now_secs = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs_f64())
.unwrap_or(0.0);
for pu in &parsed_price_updates {
if let Some(receive_time) = pu.metadata.proof_available_time {
let latency = now_secs - (receive_time as f64);
state
.metrics
.sse_broadcast_latency
.observe(latency.max(0.0));
}
}

let price_update_data = price_feeds_with_update_data.update_data;
let encoded_data: Vec<String> = price_update_data
.into_iter()
Expand Down
40 changes: 40 additions & 0 deletions apps/hermes/server/src/api/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pub struct Labels {

/// Prometheus metric handles for the websocket API.
///
/// When a `WsMetrics` is built in this file, each handle is cloned and the
/// clones are registered through `Metrics::register` from a spawned tokio task.
pub struct WsMetrics {
/// Counter family keyed by `Labels`.
/// NOTE(review): the registration name/help text is outside the visible hunk —
/// presumably counts websocket interactions; confirm against the `register` call.
pub interactions: Family<Labels, Counter>,
/// Unlabelled histogram registered as `ws_broadcast_latency_seconds`
/// ("Latency from Hermes receive_time to WS send in seconds").
/// Buckets range from 0.001 to 20.0.
pub broadcast_latency: prometheus_client::metrics::histogram::Histogram,
}

impl WsMetrics {
Expand All @@ -95,10 +96,17 @@ impl WsMetrics {
{
let new = Self {
interactions: Family::default(),
broadcast_latency: prometheus_client::metrics::histogram::Histogram::new(
[
0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0,
]
.into_iter(),
),
};

{
let interactions = new.interactions.clone();
let ws_broadcast_latency = new.broadcast_latency.clone();

tokio::spawn(async move {
Metrics::register(
Expand All @@ -110,6 +118,16 @@ impl WsMetrics {
),
)
.await;

Metrics::register(
&*state,
(
"ws_broadcast_latency_seconds",
"Latency from Hermes receive_time to WS send in seconds",
ws_broadcast_latency,
),
)
.await;
});
}

Expand Down Expand Up @@ -401,6 +419,13 @@ where
}
};

// Capture the minimum receive_time from the updates batch
let min_received_at = updates
.price_feeds
.iter()
.filter_map(|update| update.received_at)
.min();

for update in updates.price_feeds {
let config = self
.price_feeds_with_config
Expand Down Expand Up @@ -480,6 +505,21 @@ where
}

self.sender.flush().await?;

// Record latency from receive to ws send after flushing
if let Some(min_received_at) = min_received_at {
let now_secs = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs_f64())
.unwrap_or(0.0);
// Histogram only accepts f64. The conversion is safe (never panics), but very large values lose precision.
let latency = now_secs - (min_received_at as f64);
self.ws_state
.metrics
.broadcast_latency
.observe(latency.max(0.0));
}

Ok(())
}

Expand Down
10 changes: 10 additions & 0 deletions apps/hermes/server/src/state/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,16 @@ where
// we can build the message states
let message_states = build_message_states(accumulator_messages, wormhole_merkle_state)?;

{
let mut data = self.into().data.write().await;
for ms in &message_states {
let publish = ms.message.publish_time();
let receive = ms.received_at;
let latency = receive - publish;
data.metrics.observe_publish_to_receive(latency);
}
}

let message_state_keys = message_states
.iter()
.map(|message_state| message_state.key())
Expand Down
21 changes: 21 additions & 0 deletions apps/hermes/server/src/state/aggregate/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct ObservedSlotLabels {
/// Metric handles and bookkeeping state for the aggregate module.
pub struct Metrics {
/// Counter family keyed by `ObservedSlotLabels`.
/// NOTE(review): registration name is outside the visible hunk — confirm.
observed_slot: Family<ObservedSlotLabels, Counter>,
/// Histogram family keyed by `ObservedSlotLabels`, registered with the help
/// text "Latency of observed slots in seconds".
observed_slot_latency: Family<ObservedSlotLabels, Histogram>,
/// Unlabelled histogram registered as `publish_to_receive_latency_seconds`
/// ("Latency from message publish_time to Hermes receive_time in seconds").
/// Fed by `observe_publish_to_receive`.
publish_to_receive_latency: Histogram,
/// Maps a slot to an `Instant`.
/// NOTE(review): by its name, the time the slot was first observed — the code
/// that writes this map is not visible here; confirm.
first_observed_time_of_slot: BTreeMap<Slot, Instant>,
/// Maps an event to a slot.
/// NOTE(review): by its name, the newest slot seen per event — confirm against
/// `observe`, whose body is not visible here.
newest_observed_slot: HashMap<Event, Slot>,
}
Expand All @@ -50,6 +51,12 @@ impl Metrics {
.into_iter(),
)
}),
publish_to_receive_latency: Histogram::new(
[
0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0,
]
.into_iter(),
),
first_observed_time_of_slot: BTreeMap::new(),
newest_observed_slot: HashMap::new(),
};
Expand All @@ -69,11 +76,25 @@ impl Metrics {
"Latency of observed slots in seconds",
observed_slot_latency,
);

metrics_registry.register(
"publish_to_receive_latency_seconds",
"Latency from message publish_time to Hermes receive_time in seconds",
new.publish_to_receive_latency.clone(),
);
}

new
}

/// Records one publish-to-receive latency sample, expressed in whole seconds.
///
/// Negative samples are silently dropped rather than recorded, so the
/// histogram only ever sees values `>= 0`.
pub fn observe_publish_to_receive(&mut self, latency_secs: i64) {
    // Every `i64` maps to a finite `f64`, so checking the sign of the integer
    // is the only filtering required before the sample is recorded.
    if latency_secs < 0 {
        return;
    }

    // The histogram API takes `f64`. The cast cannot panic; only magnitudes
    // far beyond any realistic latency would be rounded.
    self.publish_to_receive_latency
        .observe(latency_secs as f64);
}

/// Observe a slot and event. An event at a slot should be observed only once.
pub fn observe(&mut self, slot: Slot, event: Event) {
let order = if self
Expand Down
Loading