Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2021"
anyhow = "1.0.98"
borsh = "0.9.3"
clap = { version = "4.5.39", features = ["derive", "env"] }
futures = "0.3.31"
hex = { version = "0.4.3", features = ["serde"] }
prost = "0.14.1"
reqwest = { version = "0.12.19", features = ["json"] }
Expand Down
11 changes: 9 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,16 @@ Make sure to set `RUST_LOG=INFO` to enable logs from tracing:
```bash
RUST_LOG=INFO cargo run -- run \
--pythnet-url wss://api2.pythnet.pyth.network \
--server-url https://quorum.pyth.network \
--server-url https://quorum-green.pyth.network \
--server-url https://quorum-yellow.pyth.network \
--server-url https://quorum-cyan.pyth.network \
--secret-key /path/to/secret.key \
--wormhole-pid H3fxXJ86ADW2PNuDDmZJg6mzTtPxkYCpNuQUTgmJ7AjU
```

You can specify multiple `--server-url` flags to broadcast observations to more than one server.


---

### 🌱 Environment Variables (Optional)
Expand All @@ -41,14 +46,16 @@ Instead of CLI flags, you can also set environment variables:

```bash
export PYTHNET_URL=wss://api2.pythnet.pyth.network
export SERVER_URL=https://quorum.pyth.network
export SERVER_URL=https://quorum-green.pyth.network,https://quorum-yellow.pyth.network,https://quorum-cyan.pyth.network
export SECRET_KEY=/path/to/secret.key
export WORMHOLE_PID=H3fxXJ86ADW2PNuDDmZJg6mzTtPxkYCpNuQUTgmJ7AjU
export RUST_LOG=INFO

cargo run
```

You can provide multiple server URLs in the `SERVER_URL` environment variable by separating them with commas.

---

### 🔑 Generate a Secret Key
Expand Down
4 changes: 4 additions & 0 deletions src/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ impl<P: Serialize> Observation<P> {
}

impl ApiClient {
pub fn get_base_url(&self) -> &Url {
&self.inner.base_url
}

pub fn try_new(
base_url: String,
config: Option<ApiClientConfig>,
Expand Down
4 changes: 2 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ pub struct RunOptions {
default_value = "H3fxXJ86ADW2PNuDDmZJg6mzTtPxkYCpNuQUTgmJ7AjU"
)]
pub wormhole_pid: String,
#[arg(long = "server-url", env = "SERVER_URL")]
pub server_url: String,
#[arg(long = "server-url", env = "SERVER_URL", value_delimiter = ',')]
pub server_urls: Vec<String>,
#[arg(long = "mode", env = "MODE", default_value = "production")]
pub mode: Mode,
}
Expand Down
32 changes: 22 additions & 10 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use {
api_client::{ApiClient, Observation},
borsh::BorshDeserialize,
clap::Parser,
futures::future::join_all,
posted_message::PostedMessageUnreliableData,
prost::Message,
secp256k1::{rand::rngs::OsRng, Secp256k1},
Expand Down Expand Up @@ -41,7 +42,7 @@ struct RunListenerInput<T: Signer> {
signer: T,
wormhole_pid: Pubkey,
accumulator_address: Pubkey,
api_client: ApiClient,
api_clients: Vec<ApiClient>,
}

fn find_message_pda(wormhole_pid: &Pubkey, slot: u64) -> Pubkey {
Expand Down Expand Up @@ -148,16 +149,22 @@ async fn run_listener<T: Signer + 'static>(
};

tokio::spawn({
let (api_client, signer) = (input.api_client.clone(), input.signer.clone());
let (api_clients, signer) = (input.api_clients.clone(), input.signer.clone());
async move {
let body = message_data_to_body(&unreliable_data);
match Observation::try_new(body.clone(), signer.clone()) {
Ok(observation) => {
if let Err(e) = api_client.post_observation(observation).await {
tracing::error!(error = ?e, "Failed to post observation");
} else {
tracing::info!("Observation posted successfully");
};
join_all(api_clients.iter().map(|api_client| {
let observation = observation.clone();
let api_client = api_client.clone();
async move {
if let Err(e) = api_client.post_observation(observation).await {
tracing::error!(url = api_client.get_base_url().to_string(), error = ?e, "Failed to post observation");
} else {
tracing::info!(url = api_client.get_base_url().to_string(), "Observation posted successfully");
}
}
})).await;
}
Err(e) => tracing::error!(error = ?e, "Failed to create observation"),
}
Expand All @@ -182,8 +189,13 @@ async fn run(run_options: config::RunOptions) {
.expect("Invalid accumulator address");
let wormhole_pid =
Pubkey::from_str(&run_options.wormhole_pid).expect("Invalid Wormhole program ID");
let api_client =
ApiClient::try_new(run_options.server_url, None).expect("Failed to create API client");
let api_clients: Vec<ApiClient> = run_options
.server_urls
.into_iter()
.map(|server_url| {
ApiClient::try_new(server_url, None).expect("Failed to create API client")
})
.collect();

let (pubkey, pubkey_evm) = signer.get_public_key().expect("Failed to get public key");
let evm_encded_public_key = format!("0x{}", hex::encode(pubkey_evm));
Expand All @@ -199,7 +211,7 @@ async fn run(run_options: config::RunOptions) {
signer: signer.clone(),
wormhole_pid,
accumulator_address,
api_client: api_client.clone(),
api_clients: api_clients.clone(),
})
.await
{
Expand Down
Loading