diff --git a/.github/workflows/pr-main_l1.yaml b/.github/workflows/pr-main_l1.yaml
index 5d824fd179..bce166f543 100644
--- a/.github/workflows/pr-main_l1.yaml
+++ b/.github/workflows/pr-main_l1.yaml
@@ -223,7 +223,7 @@ jobs:
 
   # The purpose of this job is to add it as a required check in GitHub so that we don't have to add every individual job as a required check
   all-tests:
-    # "Integration Test" is a required check, don't change the name
+    # "Integration Test" is a required check, don't change the name
     name: Integration Test
    runs-on: ubuntu-latest
    needs: [run-assertoor, run-hive]
@@ -241,3 +241,20 @@
            echo "Job Hive failed"
            exit 1
          fi
+
+  reorg-tests:
+    name: Reorg Tests
+    runs-on: ubuntu-latest
+    if: ${{ github.event_name != 'merge_group' }}
+    steps:
+      - name: Checkout sources
+        uses: actions/checkout@v4
+
+      - name: Setup Rust Environment
+        uses: ./.github/actions/setup-rust
+
+      - name: Compile ethrex binary
+        run: cargo build --bin ethrex
+
+      - name: Run reorg tests
+        run: cd tooling/reorgs && cargo run
diff --git a/.gitignore b/.gitignore
index 094df84e1f..f81e8f7dd6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,6 +15,8 @@ tooling/ef_tests/state/vectors
 tooling/ef_tests/state/runner_v2/failure_report.txt
 tooling/ef_tests/state/runner_v2/success_report.txt
 
+tooling/reorgs/data
+
 # Repos checked out by make target
 /hive/
 ethereum-package/
diff --git a/Cargo.lock b/Cargo.lock
index d9e69e1555..4d5a19a666 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9446,6 +9446,27 @@ dependencies = [
  "bytecheck",
 ]
 
+[[package]]
+name = "reorgs"
+version = "0.1.0"
+dependencies = [
+ "ethrex",
+ "ethrex-blockchain",
+ "ethrex-common",
+ "ethrex-config",
+ "ethrex-l2-common",
+ "ethrex-l2-rpc",
+ "ethrex-rpc",
+ "hex",
+ "nix",
+ "rand 0.8.5",
+ "secp256k1",
+ "sha2",
+ "tokio",
+ "tokio-util",
+ "tracing",
+]
+
 [[package]]
 name = "reqwest"
 version = "0.11.27"
diff --git a/Cargo.toml b/Cargo.toml
index 2ffaeaa8ce..cf60b63528 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,6 +33,7 @@ members = [
     "tooling/archive_sync",
     "tooling/replayer",
     "crates/common/config",
+    "tooling/reorgs",
 ]
 resolver = "2"
 
diff --git a/cmd/ethrex/cli.rs b/cmd/ethrex/cli.rs
index 066fe5dd81..f7048c8ba0 100644
--- a/cmd/ethrex/cli.rs
+++ b/cmd/ethrex/cli.rs
@@ -34,7 +34,7 @@ pub struct CLI {
     pub command: Option<Subcommand>,
 }
 
-#[derive(ClapParser, Debug)]
+#[derive(ClapParser, Debug, Clone)]
 pub struct Options {
     #[arg(
         long = "network",
diff --git a/crates/networking/rpc/types/fork_choice.rs b/crates/networking/rpc/types/fork_choice.rs
index a010cdc681..1411958582 100644
--- a/crates/networking/rpc/types/fork_choice.rs
+++ b/crates/networking/rpc/types/fork_choice.rs
@@ -2,7 +2,7 @@ use super::payload::PayloadStatus;
 use ethrex_common::{Address, H256, serde_utils, types::Withdrawal};
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Deserialize, Serialize)]
+#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct ForkChoiceState {
     #[allow(unused)]
diff --git a/crates/networking/rpc/types/payload.rs b/crates/networking/rpc/types/payload.rs
index 5d0960bd9b..526180c03a 100644
--- a/crates/networking/rpc/types/payload.rs
+++ b/crates/networking/rpc/types/payload.rs
@@ -174,7 +174,7 @@ pub struct PayloadStatus {
     pub validation_error: Option<String>,
 }
 
-#[derive(Debug, Deserialize, Serialize)]
+#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
 #[serde(rename_all = "UPPERCASE")]
 pub enum PayloadValidationStatus {
     Valid,
diff --git a/tooling/reorgs/Cargo.toml b/tooling/reorgs/Cargo.toml
new file mode 100644
index 0000000000..7fbc7f941d
--- /dev/null
+++ b/tooling/reorgs/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+name = "reorgs"
+version.workspace = true
+edition.workspace = true
+
+[dependencies]
+ethrex.workspace = true
+ethrex-common.workspace = true
+ethrex-blockchain.workspace = true
+ethrex-rpc.workspace = true
+ethrex-config.workspace = true
+ethrex-l2-common.workspace = true
+ethrex-l2-rpc.workspace = true
+
+tokio.workspace = true
+tokio-util.workspace = true
+tracing.workspace = true
+rand.workspace = true
+sha2.workspace = true
+hex.workspace = true
+nix = { version = "0.30", features = ["signal"] }
+secp256k1.workspace = true
diff --git a/tooling/reorgs/README.md b/tooling/reorgs/README.md
new file mode 100644
index 0000000000..37688a9540
--- /dev/null
+++ b/tooling/reorgs/README.md
@@ -0,0 +1,23 @@
+# Reorg integration tests
+
+This directory contains integration tests for chain reorganizations (reorgs).
+
+## How to run
+
+First, compile the `ethrex` binary if you haven't already:
+
+```bash
+cargo build --workspace --bin ethrex
+```
+
+Then, run the reorg tests from this directory:
+
+```bash
+cargo run
+```
+
+You can run the tests against a custom `ethrex` binary by specifying its path:
+
+```bash
+cargo run -- /path/to/your/binary
+```
diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs
new file mode 100644
index 0000000000..71bba40309
--- /dev/null
+++ b/tooling/reorgs/src/main.rs
@@ -0,0 +1,219 @@
+use std::{
+    path::{Path, PathBuf},
+    process::Command,
+    sync::Arc,
+};
+
+use ethrex::{cli::Options, initializers::init_tracing};
+use ethrex_l2_rpc::signer::{LocalSigner, Signer};
+use tokio::sync::Mutex;
+use tracing::{error, info, warn};
+
+use crate::simulator::Simulator;
+
+mod simulator;
+
+#[tokio::main]
+async fn main() {
+    // Setup logging
+    init_tracing(&Options::default_l1());
+
+    // Fetch the path to the ethrex binary from the command line arguments
+    // If not provided, use the default path
+    let cmd_path: PathBuf = std::env::args()
+        .nth(1)
+        .map(|o| o.parse().unwrap())
+        .unwrap_or_else(|| "../../target/debug/ethrex".parse().unwrap());
+
+    let version = get_ethrex_version(&cmd_path).await;
+
+    info!(%version, binary_path = %cmd_path.display(), "Fetched ethrex binary version");
+    info!("Starting test run");
+    info!("");
+
+    run_test(&cmd_path, test_one_block_reorg_and_back).await;
+
+    // TODO: this test is failing
+    // run_test(&cmd_path, test_many_blocks_reorg).await;
+}
+
+async fn get_ethrex_version(cmd_path: &Path) -> String {
+    let version_output = Command::new(cmd_path)
+        .arg("--version")
+        .output()
+        .expect("failed to get ethrex version");
+    String::from_utf8(version_output.stdout).expect("failed to parse version output")
+}
+
+async fn run_test<F, Fut>(cmd_path: &Path, test_fn: F)
+where
+    F: Fn(Arc<Mutex<Simulator>>) -> Fut,
+    Fut: Future<Output = ()> + Send + 'static,
+{
+    let test_name = std::any::type_name::<F>();
+    let start = std::time::Instant::now();
+
+    info!(test=%test_name, "Running test");
+    let simulator = Arc::new(Mutex::new(Simulator::new(cmd_path.to_path_buf())));
+
+    // Run in another task to clean up properly on panic
+    let result = tokio::spawn(test_fn(simulator.clone())).await;
+
+    simulator.lock_owned().await.stop();
+    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+
+    match result {
+        Ok(_) => info!(test=%test_name, elapsed=?start.elapsed(), "test completed successfully"),
+        Err(err) if err.is_panic() => {
+            error!(test=%test_name, %err, "test panicked");
+            std::process::exit(1);
+        }
+        Err(err) => {
+            warn!(test=%test_name, %err, "test task was cancelled");
+        }
+    }
+    // Add a blank line after each test for readability
+    info!("");
+}
+
+async fn test_one_block_reorg_and_back(simulator: Arc<Mutex<Simulator>>) {
+    let mut simulator = simulator.lock().await;
+    let signer: Signer = LocalSigner::new(
+        "941e103320615d394a55708be13e45994c7d93b932b064dbcb2b511fe3254e2e"
+            .parse()
+            .unwrap(),
+    )
+    .into();
+    // Some random address
+    let recipient = "941e103320615d394a55708be13e45994c7d93b0".parse().unwrap();
+    let transfer_amount = 1000000;
+
+    let node0 = simulator.start_node().await;
+    let node1 = simulator.start_node().await;
+
+    // Create a chain with a few empty blocks
+    let mut base_chain = simulator.get_base_chain();
+    for _ in 0..10 {
+        let extended_base_chain = node0.build_payload(base_chain).await;
+        node0.notify_new_payload(&extended_base_chain).await;
+        node0.update_forkchoice(&extended_base_chain).await;
+
+        node1.notify_new_payload(&extended_base_chain).await;
+        node1.update_forkchoice(&extended_base_chain).await;
+        base_chain = extended_base_chain;
+    }
+
+    let initial_balance = node0.get_balance(recipient).await;
+
+    // Fork the chain
+    let side_chain = base_chain.fork();
+
+    // Mine a new block in the base chain
+    let base_chain = node0.build_payload(base_chain).await;
+    node0.notify_new_payload(&base_chain).await;
+    node0.update_forkchoice(&base_chain).await;
+
+    // Mine a new block in the base chain (but don't announce it yet)
+    let extended_base_chain = node0.build_payload(base_chain).await;
+
+    // In parallel, mine a block in the side chain, with an ETH transfer
+    node1
+        .send_eth_transfer(&signer, recipient, transfer_amount)
+        .await;
+
+    let side_chain = node1.build_payload(side_chain).await;
+    node1.notify_new_payload(&side_chain).await;
+    node1.update_forkchoice(&side_chain).await;
+
+    // Sanity check: balance hasn't changed
+    let same_balance = node0.get_balance(recipient).await;
+    assert_eq!(same_balance, initial_balance);
+
+    // Notify the first node of the side chain block, it should reorg
+    node0.notify_new_payload(&side_chain).await;
+    node0.update_forkchoice(&side_chain).await;
+
+    // Check the transfer has been processed
+    let new_balance = node0.get_balance(recipient).await;
+    assert_eq!(new_balance, initial_balance + transfer_amount);
+
+    // Finally, move to the extended base chain, it should reorg back
+    node0.notify_new_payload(&extended_base_chain).await;
+    node0.update_forkchoice(&extended_base_chain).await;
+
+    // Check the transfer has been reverted
+    let new_balance = node0.get_balance(recipient).await;
+    assert_eq!(new_balance, initial_balance);
+}
+
+#[expect(unused)]
+async fn test_many_blocks_reorg(simulator: Arc<Mutex<Simulator>>) {
+    let mut simulator = simulator.lock().await;
+    let signer: Signer = LocalSigner::new(
+        "941e103320615d394a55708be13e45994c7d93b932b064dbcb2b511fe3254e2e"
+            .parse()
+            .unwrap(),
+    )
+    .into();
+    // Some random address
+    let recipient = "941e103320615d394a55708be13e45994c7d93b0".parse().unwrap();
+    let transfer_amount = 1000000;
+
+    let node0 = simulator.start_node().await;
+    let node1 = simulator.start_node().await;
+
+    // Create a chain with a few empty blocks
+    let mut base_chain = simulator.get_base_chain();
+    for _ in 0..10 {
+        let extended_base_chain = node0.build_payload(base_chain).await;
+        node0.notify_new_payload(&extended_base_chain).await;
+        node0.update_forkchoice(&extended_base_chain).await;
+
+        node1.notify_new_payload(&extended_base_chain).await;
+        node1.update_forkchoice(&extended_base_chain).await;
+        base_chain = extended_base_chain;
+    }
+
+    let initial_balance = node0.get_balance(recipient).await;
+
+    // Fork the chain
+    let mut side_chain = base_chain.fork();
+
+    // Create a side chain with multiple blocks only known to node0
+    for _ in 0..10 {
+        side_chain = node0.build_payload(side_chain).await;
+        node0.notify_new_payload(&side_chain).await;
+        node0.update_forkchoice(&side_chain).await;
+    }
+
+    // Sanity check: balance hasn't changed
+    let same_balance = node0.get_balance(recipient).await;
+    assert_eq!(same_balance, initial_balance);
+
+    // Advance the base chain with multiple blocks only known to node1
+    for _ in 0..10 {
+        base_chain = node1.build_payload(base_chain).await;
+        node1.notify_new_payload(&base_chain).await;
+        node1.update_forkchoice(&base_chain).await;
+    }
+
+    // Sanity check: balance hasn't changed
+    let same_balance = node0.get_balance(recipient).await;
+    assert_eq!(same_balance, initial_balance);
+
+    // Advance the base chain with one more block and an ETH transfer
+    node1
+        .send_eth_transfer(&signer, recipient, transfer_amount)
+        .await;
+    base_chain = node1.build_payload(base_chain).await;
+    node1.notify_new_payload(&base_chain).await;
+    node1.update_forkchoice(&base_chain).await;
+
+    // Bring node0 again to the base chain, it should reorg
+    node0.notify_new_payload(&base_chain).await;
+    node0.update_forkchoice(&base_chain).await;
+
+    // Check the transfer has been processed
+    let new_balance = node0.get_balance(recipient).await;
+    assert_eq!(new_balance, initial_balance + transfer_amount);
+}
diff --git a/tooling/reorgs/src/simulator.rs b/tooling/reorgs/src/simulator.rs
new file mode 100644
index 0000000000..b70617f2a0
--- /dev/null
+++ b/tooling/reorgs/src/simulator.rs
@@ -0,0 +1,433 @@
+use std::{fs::File, io::Read, path::PathBuf, process::Stdio, time::Duration};
+
+use ethrex::{cli::Options, initializers::get_network};
+use ethrex_common::{
+    Bytes, H160, H256, U256,
+    types::{
+        Block, EIP1559Transaction, Genesis, Transaction, TxKind, requests::compute_requests_hash,
+    },
+};
+use ethrex_config::networks::Network;
+use ethrex_l2_rpc::signer::{Signable, Signer};
+use ethrex_rpc::{
+    EngineClient, EthClient,
+    types::{
+        block_identifier::{BlockIdentifier, BlockTag},
+        fork_choice::{ForkChoiceState, PayloadAttributesV3},
+        payload::{ExecutionPayload, PayloadValidationStatus},
+    },
+};
+use nix::sys::signal::{self, Signal};
+use nix::unistd::Pid;
+use sha2::{Digest, Sha256};
+use tokio::process::Command;
+use tokio_util::sync::CancellationToken;
+use tracing::{error, info};
+
+pub struct Simulator {
+    cmd_path: PathBuf,
+    base_opts: Options,
+    jwt_secret: Bytes,
+    genesis_path: PathBuf,
+    configs: Vec<Options>,
+    enodes: Vec<String>,
+    cancellation_tokens: Vec<CancellationToken>,
+}
+
+impl Simulator {
+    pub fn new(cmd_path: PathBuf) -> Self {
+        let mut opts = Options::default_l1();
+        let jwt_secret = generate_jwt_secret();
+        std::fs::write("jwt.hex", hex::encode(&jwt_secret)).unwrap();
+
+        let genesis_path = std::path::absolute("../../fixtures/genesis/l1-dev.json")
+            .unwrap()
+            .canonicalize()
+            .unwrap();
+
+        opts.authrpc_jwtsecret = "jwt.hex".to_string();
+        opts.dev = false;
+        opts.http_addr = "localhost".to_string();
+        opts.authrpc_addr = "localhost".to_string();
+        opts.network = Some(Network::GenesisPath(genesis_path.clone()));
+        Self {
+            cmd_path,
+            base_opts: opts,
+            genesis_path,
+            jwt_secret,
+            configs: vec![],
+            cancellation_tokens: vec![],
+            enodes: vec![],
+        }
+    }
+
+    pub fn get_base_chain(&self) -> Chain {
+        let network = get_network(&self.base_opts);
+        let genesis = network.get_genesis().unwrap();
+        Chain::new(genesis)
+    }
+
+    pub async fn start_node(&mut self) -> Node {
+        let n = self.configs.len();
+        info!(node = n, "Starting node");
+        let mut opts = self.base_opts.clone();
+        opts.http_port = (8545 + n * 2).to_string();
+        opts.authrpc_port = (8545 + n * 2 + 1).to_string();
+        opts.p2p_port = (30303 + n).to_string();
+        opts.discovery_port = (30303 + n).to_string();
+        opts.datadir = format!("data/node{n}").into();
+
+        let _ = std::fs::remove_dir_all(&opts.datadir);
+        std::fs::create_dir_all(&opts.datadir).expect("Failed to create data directory");
+
+        let logs_file_path = format!("data/node{n}.log");
+        let logs_file = File::create(&logs_file_path).expect("Failed to create logs file");
+
+        let cancel = CancellationToken::new();
+
+        self.configs.push(opts.clone());
+        self.cancellation_tokens.push(cancel.clone());
+
+        let mut cmd = Command::new(&self.cmd_path);
+        cmd.args([
+            format!("--http.addr={}", opts.http_addr),
+            format!("--http.port={}", opts.http_port),
+            format!("--authrpc.addr={}", opts.authrpc_addr),
+            format!("--authrpc.port={}", opts.authrpc_port),
+            format!("--p2p.port={}", opts.p2p_port),
+            format!("--discovery.port={}", opts.discovery_port),
+            format!("--datadir={}", opts.datadir.display()),
+            format!("--network={}", self.genesis_path.display()),
+            "--force".to_string(),
+        ])
+        .stdin(Stdio::null())
+        .stdout(logs_file.try_clone().unwrap())
+        .stderr(logs_file);
+
+        if !self.enodes.is_empty() {
+            cmd.arg(format!("--bootnodes={}", self.enodes.join(",")));
+        }
+
+        let child = cmd.spawn().expect("Failed to start ethrex process");
+
+        let logs_file = File::open(&logs_file_path).expect("Failed to open logs file");
+        let enode =
+            tokio::time::timeout(Duration::from_secs(5), wait_for_initialization(logs_file))
+                .await
+                .expect("node initialization timed out");
+        self.enodes.push(enode);
+
+        tokio::spawn(async move {
+            let mut child = child;
+            tokio::select! {
+                _ = cancel.cancelled() => {
+                    if let Some(pid) = child.id() {
+                        // NOTE: we use SIGTERM instead of child.kill() so sockets are closed
+                        signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM).unwrap();
+                    }
+                }
+                res = child.wait() => {
+                    assert!(res.unwrap().success());
+                }
+            }
+        });
+
+        info!(
+            "Started node {n} at http://{}:{}",
+            opts.http_addr, opts.http_port
+        );
+
+        self.get_node(n)
+    }
+
+    pub fn stop(&self) {
+        for token in &self.cancellation_tokens {
+            token.cancel();
+        }
+    }
+
+    fn get_http_url(&self, index: usize) -> String {
+        let opts = &self.configs[index];
+        format!("http://{}:{}", opts.http_addr, opts.http_port)
+    }
+
+    fn get_auth_url(&self, index: usize) -> String {
+        let opts = &self.configs[index];
+        format!("http://{}:{}", opts.authrpc_addr, opts.authrpc_port)
+    }
+
+    fn get_node(&self, index: usize) -> Node {
+        let auth_url = self.get_auth_url(index);
+        let engine_client = EngineClient::new(&auth_url, self.jwt_secret.clone());
+
+        let http_url = self.get_http_url(index);
+        let rpc_client = EthClient::new(&http_url).unwrap();
+
+        Node {
+            index,
+            engine_client,
+            rpc_client,
+        }
+    }
+}
+
+/// Waits until the node is initialized by reading its logs.
+/// Returns the enode URL of the node.
+async fn wait_for_initialization(mut logs_file: File) -> String {
+    const NODE_STARTED_LOG: &str = "Starting Auth-RPC server at";
+
+    let mut file_contents = String::new();
+
+    // Wait a bit until the node starts
+    loop {
+        tokio::time::sleep(Duration::from_millis(100)).await;
+
+        logs_file.read_to_string(&mut file_contents).unwrap();
+
+        if file_contents.contains(NODE_STARTED_LOG) {
+            break;
+        }
+    }
+    let node_enode_log = file_contents
+        .lines()
+        .find(|line| line.contains("Local node initialized"))
+        .unwrap();
+    // Look for the "enode://node_id@host:port" part
+    let prefix = "enode://";
+    let node_enode = node_enode_log.split_once(prefix).unwrap().1;
+    format!("{prefix}{}", node_enode.trim_end())
+}
+
+pub struct Node {
+    index: usize,
+    engine_client: EngineClient,
+    rpc_client: EthClient,
+}
+
+impl Node {
+    pub async fn update_forkchoice(&self, chain: &Chain) {
+        let fork_choice_state = chain.get_fork_choice_state();
+        info!(
+            node = self.index,
+            head = %fork_choice_state.head_block_hash,
+            "Updating fork choice"
+        );
+        let syncing_fut = wait_until_synced(&self.engine_client, fork_choice_state);
+
+        tokio::time::timeout(Duration::from_secs(5), syncing_fut)
+            .await
+            .inspect_err(|_| {
+                error!(node = self.index, "Timed out waiting for node to sync");
+            })
+            .expect("timed out waiting for node to sync");
+    }
+
+    pub async fn build_payload(&self, mut chain: Chain) -> Chain {
+        let fork_choice_state = chain.get_fork_choice_state();
+        let mut payload_attributes = chain.get_next_payload_attributes();
+        // Set index as fee recipient to differentiate between nodes
+        payload_attributes.suggested_fee_recipient = H160::from_low_u64_be(self.index as u64);
+        let head = fork_choice_state.head_block_hash;
+
+        let parent_beacon_block_root = payload_attributes.parent_beacon_block_root;
+
+        info!(
+            node = self.index,
+            %head,
+            "Starting payload build"
+        );
+
+        let fork_choice_response = self
+            .engine_client
+            .engine_forkchoice_updated_v3(fork_choice_state, Some(payload_attributes))
+            .await
+            .unwrap();
+
+        assert_eq!(
+            fork_choice_response.payload_status.status,
+            PayloadValidationStatus::Valid,
+            "Validation failed with error: {:?}",
+            fork_choice_response.payload_status.validation_error
+        );
+        let payload_id = fork_choice_response.payload_id.unwrap();
+
+        let payload_response = self
+            .engine_client
+            .engine_get_payload_v4(payload_id)
+            .await
+            .unwrap();
+
+        let requests_hash = compute_requests_hash(&payload_response.execution_requests.unwrap());
+        let block = payload_response
+            .execution_payload
+            .into_block(parent_beacon_block_root, Some(requests_hash))
+            .unwrap();
+
+        info!(
+            node = self.index,
+            %head,
+            block = %block.hash(),
+            "#txs"=%block.body.transactions.len(),
+            "Built payload"
+        );
+        chain.append_block(block);
+        chain
+    }
+
+    pub async fn notify_new_payload(&self, chain: &Chain) {
+        let head = chain.blocks.last().unwrap();
+        let execution_payload = ExecutionPayload::from_block(head.clone());
+        // Support blobs
+        // let commitments = execution_payload_response
+        //     .blobs_bundle
+        //     .unwrap_or_default()
+        //     .commitments
+        //     .iter()
+        //     .map(|commitment| {
+        //         let mut hash = keccak256(commitment).0;
+        //         // https://eips.ethereum.org/EIPS/eip-4844 -> kzg_to_versioned_hash
+        //         hash[0] = 0x01;
+        //         H256::from_slice(&hash)
+        //     })
+        //     .collect();
+        let commitments = vec![];
+        let parent_beacon_block_root = head.header.parent_beacon_block_root.unwrap();
+        let _payload_status = self
+            .engine_client
+            .engine_new_payload_v4(execution_payload, commitments, parent_beacon_block_root)
+            .await
+            .unwrap();
+    }
+
+    pub async fn send_eth_transfer(&self, signer: &Signer, recipient: H160, amount: u64) {
+        info!(node = self.index, sender=%signer.address(), %recipient, amount, "Sending ETH transfer tx");
+        let chain_id = self
+            .rpc_client
+            .get_chain_id()
+            .await
+            .unwrap()
+            .try_into()
+            .unwrap();
+        let sender_address = signer.address();
+        let nonce = self
+            .rpc_client
+            .get_nonce(sender_address, BlockIdentifier::Tag(BlockTag::Latest))
+            .await
+            .unwrap();
+        let tx = EIP1559Transaction {
+            chain_id,
+            nonce,
+            max_priority_fee_per_gas: 0,
+            max_fee_per_gas: 1_000_000_000,
+            gas_limit: 50_000,
+            to: TxKind::Call(recipient),
+            value: amount.into(),
+            ..Default::default()
+        };
+        let mut tx = Transaction::EIP1559Transaction(tx);
+        tx.sign_inplace(signer).await.unwrap();
+        let encoded_tx = tx.encode_canonical_to_vec();
+        self.rpc_client
+            .send_raw_transaction(&encoded_tx)
+            .await
+            .unwrap();
+    }
+
+    pub async fn get_balance(&self, address: H160) -> U256 {
+        self.rpc_client
+            .get_balance(address, Default::default())
+            .await
+            .unwrap()
+    }
+}
+
+pub struct Chain {
+    block_hashes: Vec<H256>,
+    blocks: Vec<Block>,
+    safe_height: usize,
+}
+
+impl Chain {
+    fn new(genesis: Genesis) -> Self {
+        let genesis_block = genesis.get_block();
+        Self {
+            block_hashes: vec![genesis_block.hash()],
+            blocks: vec![genesis_block],
+            safe_height: 0,
+        }
+    }
+
+    fn append_block(&mut self, block: Block) {
+        self.block_hashes.push(block.hash());
+        self.blocks.push(block);
+    }
+
+    pub fn fork(&self) -> Self {
+        Self {
+            block_hashes: self.block_hashes.clone(),
+            blocks: self.blocks.clone(),
+            safe_height: self.safe_height,
+        }
+    }
+
+    fn get_fork_choice_state(&self) -> ForkChoiceState {
+        let head_block_hash = *self.block_hashes.last().unwrap();
+        let finalized_block_hash = self.block_hashes[self.safe_height];
+        ForkChoiceState {
+            head_block_hash,
+            safe_block_hash: finalized_block_hash,
+            finalized_block_hash,
+        }
+    }
+
+    fn get_next_payload_attributes(&self) -> PayloadAttributesV3 {
+        let timestamp = self.blocks.last().unwrap().header.timestamp + 12;
+        let head_hash = self.get_fork_choice_state().head_block_hash;
+        // Generate dummy values by hashing multiple times
+        let parent_beacon_block_root = keccak256(&head_hash.0);
+        let prev_randao = keccak256(&parent_beacon_block_root.0);
+        let suggested_fee_recipient = Default::default();
+        // TODO: add withdrawals
+        let withdrawals = vec![];
+        PayloadAttributesV3 {
+            timestamp,
+            prev_randao,
+            suggested_fee_recipient,
+            parent_beacon_block_root: Some(parent_beacon_block_root),
+            withdrawals: Some(withdrawals),
+        }
+    }
+}
+
+fn generate_jwt_secret() -> Bytes {
+    use rand::Rng;
+    let mut rng = rand::thread_rng();
+    let mut secret = [0u8; 32];
+    rng.fill(&mut secret);
+    Bytes::from(secret.to_vec())
+}
+
+fn keccak256(data: &[u8]) -> H256 {
+    H256(
+        Sha256::new_with_prefix(data)
+            .finalize()
+            .as_slice()
+            .try_into()
+            .unwrap(),
+    )
+}
+
+async fn wait_until_synced(engine_client: &EngineClient, fork_choice_state: ForkChoiceState) {
+    loop {
+        let fork_choice_response = engine_client
+            .engine_forkchoice_updated_v3(fork_choice_state, None)
+            .await
+            .unwrap();
+
+        let status = fork_choice_response.payload_status.status;
+        if status == PayloadValidationStatus::Valid {
+            break;
+        }
+        tokio::time::sleep(Duration::from_millis(100)).await;
+    }
+}
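For reviewers who want to extend the harness, below is a minimal sketch of how another scenario could be wired in. It only uses the `Simulator`, `Node` and `Chain` API introduced in this diff (`start_node`, `get_base_chain`, `build_payload`, `notify_new_payload`, `update_forkchoice`) together with `run_test`; the test name `test_follow_canonical_chain` and its registration line are illustrative additions, not part of the patch, and the snippet assumes the existing imports of `tooling/reorgs/src/main.rs`.

```rust
// Hypothetical addition to tooling/reorgs/src/main.rs.
// Register it in main() next to the existing tests:
//     run_test(&cmd_path, test_follow_canonical_chain).await;

async fn test_follow_canonical_chain(simulator: Arc<Mutex<Simulator>>) {
    let mut simulator = simulator.lock().await;

    // Two fresh nodes sharing the same genesis
    let node0 = simulator.start_node().await;
    let node1 = simulator.start_node().await;

    // node0 produces a few empty blocks; node1 is fed every payload and must
    // accept it (update_forkchoice loops on the Engine API until the payload
    // status is VALID, panicking on timeout).
    let mut chain = simulator.get_base_chain();
    for _ in 0..5 {
        let extended = node0.build_payload(chain).await;
        node0.notify_new_payload(&extended).await;
        node0.update_forkchoice(&extended).await;

        node1.notify_new_payload(&extended).await;
        node1.update_forkchoice(&extended).await;
        chain = extended;
    }
}
```

As with the existing tests, `run_test` spawns the scenario on its own task and shuts the nodes down afterwards, so no extra cleanup code is needed.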