From 452145db16d65c6627d281964a7a285b5f36717a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 22 Sep 2025 12:49:07 -0300 Subject: [PATCH 01/37] test: add reorg testing framework --- .gitignore | 2 + Cargo.lock | 21 ++ Cargo.toml | 1 + cmd/ethrex/cli.rs | 2 +- crates/networking/rpc/types/payload.rs | 2 +- tooling/reorgs/Cargo.toml | 22 ++ tooling/reorgs/src/main.rs | 469 +++++++++++++++++++++++++ 7 files changed, 517 insertions(+), 2 deletions(-) create mode 100644 tooling/reorgs/Cargo.toml create mode 100644 tooling/reorgs/src/main.rs diff --git a/.gitignore b/.gitignore index 094df84e1f..f81e8f7dd6 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,8 @@ tooling/ef_tests/state/vectors tooling/ef_tests/state/runner_v2/failure_report.txt tooling/ef_tests/state/runner_v2/success_report.txt +tooling/reorgs/data + # Repos checked out by make target /hive/ ethereum-package/ diff --git a/Cargo.lock b/Cargo.lock index e626ea7d10..7bafe601fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9446,6 +9446,27 @@ dependencies = [ "bytecheck", ] +[[package]] +name = "reorgs" +version = "0.1.0" +dependencies = [ + "ethrex", + "ethrex-blockchain", + "ethrex-common", + "ethrex-config", + "ethrex-l2-common", + "ethrex-l2-rpc", + "ethrex-rpc", + "hex", + "nix", + "rand 0.8.5", + "secp256k1", + "sha2", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "reqwest" version = "0.11.27" diff --git a/Cargo.toml b/Cargo.toml index def93b7ce0..4db26d2c51 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ members = [ "tooling/archive_sync", "tooling/replayer", "crates/common/config", + "tooling/reorgs", ] resolver = "2" diff --git a/cmd/ethrex/cli.rs b/cmd/ethrex/cli.rs index 066fe5dd81..f7048c8ba0 100644 --- a/cmd/ethrex/cli.rs +++ b/cmd/ethrex/cli.rs @@ -34,7 +34,7 @@ pub struct CLI { pub command: Option, } -#[derive(ClapParser, Debug)] +#[derive(ClapParser, Debug, Clone)] pub struct Options { #[arg( long = "network", diff --git a/crates/networking/rpc/types/payload.rs b/crates/networking/rpc/types/payload.rs index 5d0960bd9b..526180c03a 100644 --- a/crates/networking/rpc/types/payload.rs +++ b/crates/networking/rpc/types/payload.rs @@ -174,7 +174,7 @@ pub struct PayloadStatus { pub validation_error: Option, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] #[serde(rename_all = "UPPERCASE")] pub enum PayloadValidationStatus { Valid, diff --git a/tooling/reorgs/Cargo.toml b/tooling/reorgs/Cargo.toml new file mode 100644 index 0000000000..7fbc7f941d --- /dev/null +++ b/tooling/reorgs/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "reorgs" +version.workspace = true +edition.workspace = true + +[dependencies] +ethrex.workspace = true +ethrex-common.workspace = true +ethrex-blockchain.workspace = true +ethrex-rpc.workspace = true +ethrex-config.workspace = true +ethrex-l2-common.workspace = true +ethrex-l2-rpc.workspace = true + +tokio.workspace = true +tokio-util.workspace = true +tracing.workspace = true +rand.workspace = true +sha2.workspace = true +hex.workspace = true +nix = { version = "0.30", features = ["signal"] } +secp256k1.workspace = true diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs new file mode 100644 index 0000000000..d9ff3bf83c --- /dev/null +++ b/tooling/reorgs/src/main.rs @@ -0,0 +1,469 @@ +use std::{fs::File, path::PathBuf, process::Stdio, sync::Arc, time::Duration}; + +use ethrex::{ + cli::Options, + initializers::{get_network, 
init_tracing}, +}; +use ethrex_common::{ + Bytes, H160, H256, + types::{ + Block, EIP1559Transaction, Genesis, Transaction, TxKind, requests::compute_requests_hash, + }, +}; +use ethrex_config::networks::Network; +use ethrex_l2_rpc::signer::{LocalSigner, Signable, Signer}; +use ethrex_rpc::{ + EngineClient, EthClient, + types::{ + block_identifier::{BlockIdentifier, BlockTag}, + fork_choice::{ForkChoiceState, PayloadAttributesV3}, + payload::{ExecutionPayload, PayloadValidationStatus}, + }, +}; +use nix::sys::signal::{self, Signal}; +use nix::unistd::Pid; +use sha2::{Digest, Sha256}; +use tokio::{process::Command, sync::Mutex}; +use tokio_util::sync::CancellationToken; +use tracing::info; + +#[tokio::main] +async fn main() { + let cmd_path = std::env::args() + .nth(1) + .map(|o| o.parse().unwrap()) + .unwrap_or_else(|| { + println!("No binary path provided, using default"); + "../../target/debug/ethrex".parse().unwrap() + }); + + let simulator = Arc::new(Mutex::new(Simulator::new(cmd_path))); + simulator.lock().await.init_tracing(); + + // Run in another task to clean up properly on panic + let result = tokio::spawn(run_test(simulator.clone())).await; + + simulator.lock_owned().await.stop(); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + if result.is_err() { + eprintln!("Test panicked"); + std::process::exit(1); + } +} + +async fn run_test(simulator: Arc>) { + let mut simulator = simulator.lock().await; + let signer: Signer = LocalSigner::new( + "941e103320615d394a55708be13e45994c7d93b932b064dbcb2b511fe3254e2e" + .parse() + .unwrap(), + ) + .into(); + + let node0 = simulator.start_node().await; + let node1 = simulator.start_node().await; + + let network = get_network(&simulator.base_opts); + let genesis = network.get_genesis().unwrap(); + + // Create a chain with a few empty blocks + let mut base_chain = Chain::new(genesis); + for _ in 0..10 { + let extended_base_chain = node0.build_payload(base_chain).await; + node0.notify_new_payload(&extended_base_chain).await; + node0.update_forkchoice(&extended_base_chain).await; + + node1.notify_new_payload(&extended_base_chain).await; + node1.update_forkchoice(&extended_base_chain).await; + base_chain = extended_base_chain; + } + + // Fork the chain + let side_chain = base_chain.fork(); + + // Mine a new block in the base chain + let base_chain = node0.build_payload(base_chain).await; + node0.notify_new_payload(&base_chain).await; + node0.update_forkchoice(&base_chain).await; + + // Mine a new block in the base chain (but don't announce it yet) + let extended_base_chain = node0.build_payload(base_chain).await; + + // In parallel, mine a block in the side chain, with an ETH transfer + let recipient = "941e103320615d394a55708be13e45994c7d93b0".parse().unwrap(); + node1.send_eth_transfer(&signer, recipient, 1000000).await; + + let side_chain = node1.build_payload(side_chain).await; + node1.notify_new_payload(&side_chain).await; + node1.update_forkchoice(&side_chain).await; + + // Notify the first node of the side chain block, it should reorg + node0.notify_new_payload(&side_chain).await; + node0.update_forkchoice(&side_chain).await; + + // Finally, move to the extended base chain, it should reorg back + node0.notify_new_payload(&extended_base_chain).await; + node0.update_forkchoice(&extended_base_chain).await; +} + +struct Simulator { + cmd_path: PathBuf, + base_opts: Options, + jwt_secret: Bytes, + genesis_path: PathBuf, + configs: Vec, + cancellation_tokens: Vec, +} + +impl Simulator { + fn new(cmd_path: PathBuf) -> Self { + let 
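// These options are a shared template: start_node() clones them for each
// node and only overrides the ports and the datadir, so every node in a
// test runs the same genesis file and the same JWT secret.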
mut opts = Options::default_l1(); + let jwt_secret = generate_jwt_secret(); + std::fs::write("jwt.hex", hex::encode(&jwt_secret)).unwrap(); + + let genesis_path = std::path::absolute("../../fixtures/genesis/l1-dev.json") + .unwrap() + .canonicalize() + .unwrap(); + + opts.authrpc_jwtsecret = "jwt.hex".to_string(); + opts.dev = false; + opts.http_addr = "localhost".to_string(); + opts.authrpc_addr = "localhost".to_string(); + opts.network = Some(Network::GenesisPath(genesis_path.clone())); + Self { + cmd_path, + base_opts: opts, + genesis_path, + jwt_secret, + configs: vec![], + cancellation_tokens: vec![], + } + } + + fn init_tracing(&self) { + init_tracing(&self.base_opts); + } + + async fn start_node(&mut self) -> Node { + let n = self.configs.len(); + let mut opts = self.base_opts.clone(); + opts.http_port = (8545 + n * 2).to_string(); + opts.authrpc_port = (8545 + n * 2 + 1).to_string(); + opts.p2p_port = (30303 + n).to_string(); + opts.discovery_port = (30303 + n).to_string(); + opts.datadir = format!("data/node{n}").into(); + + let _ = std::fs::remove_dir_all(&opts.datadir); + std::fs::create_dir_all(&opts.datadir).expect("Failed to create data directory"); + + let logs_file = + File::create(format!("data/node{n}.log")).expect("Failed to create logs file"); + + let cancel = CancellationToken::new(); + + self.configs.push(opts.clone()); + self.cancellation_tokens.push(cancel.clone()); + + let mut cmd = Command::new(&self.cmd_path); + cmd.args([ + format!("--http.addr={}", opts.http_addr), + format!("--http.port={}", opts.http_port), + format!("--authrpc.addr={}", opts.authrpc_addr), + format!("--authrpc.port={}", opts.authrpc_port), + format!("--p2p.port={}", opts.p2p_port), + format!("--discovery.port={}", opts.discovery_port), + format!("--datadir={}", opts.datadir.display()), + format!("--network={}", self.genesis_path.display()), + "--force".to_string(), + ]) + .stdin(Stdio::null()) + .stdout(logs_file.try_clone().expect("Failed to clone logs file")) + .stderr(logs_file); + + let child = cmd.spawn().expect("Failed to start ethrex process"); + + tokio::spawn(async move { + let mut child = child; + tokio::select! 
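// Wait on two outcomes: either the test harness cancels us, in which case
// the child is sent SIGTERM (rather than a hard kill) so it can close its
// sockets cleanly, or the child exits by itself, in which case it must
// have exited with success (anything else fails the test).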
{ + _ = cancel.cancelled() => { + if let Some(pid) = child.id() { + signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM).unwrap(); + } + } + res = child.wait() => { + assert!(res.unwrap().success()); + } + } + }); + + info!( + "Started node {n} at http://{}:{}", + opts.http_addr, opts.http_port + ); + + tokio::time::sleep(Duration::from_millis(200)).await; + + self.get_node(n) + } + + fn stop(&self) { + for token in &self.cancellation_tokens { + token.cancel(); + } + } + + fn get_http_url(&self, index: usize) -> String { + let opts = &self.configs[index]; + format!("http://{}:{}", opts.http_addr, opts.http_port) + } + + fn get_auth_url(&self, index: usize) -> String { + let opts = &self.configs[index]; + format!("http://{}:{}", opts.authrpc_addr, opts.authrpc_port) + } + + fn get_node(&self, index: usize) -> Node { + let auth_url = self.get_auth_url(index); + let engine_client = EngineClient::new(&auth_url, self.jwt_secret.clone()); + + let http_url = self.get_http_url(index); + let rpc_client = EthClient::new(&http_url).unwrap(); + + Node { + index, + engine_client, + rpc_client, + } + } +} + +struct Node { + index: usize, + engine_client: EngineClient, + rpc_client: EthClient, +} + +impl Node { + async fn update_forkchoice(&self, chain: &Chain) { + let fork_choice_state = chain.get_fork_choice_state(); + info!( + node = self.index, + head = %fork_choice_state.head_block_hash, + "Updating fork choice" + ); + + let fork_choice_response = self + .engine_client + .engine_forkchoice_updated_v3(fork_choice_state, None) + .await + .unwrap(); + + assert_eq!( + fork_choice_response.payload_status.status, + PayloadValidationStatus::Valid, + "Validation failed with error: {:?}", + fork_choice_response.payload_status.validation_error + ); + assert!(fork_choice_response.payload_id.is_none()); + } + + async fn build_payload(&self, mut chain: Chain) -> Chain { + let fork_choice_state = chain.get_fork_choice_state(); + let payload_attributes = chain.get_next_payload_attributes(); + let head = fork_choice_state.head_block_hash; + + let parent_beacon_block_root = payload_attributes.parent_beacon_block_root; + + info!( + node = self.index, + %head, + "Starting payload build" + ); + + let fork_choice_response = self + .engine_client + .engine_forkchoice_updated_v3(fork_choice_state, Some(payload_attributes)) + .await + .unwrap(); + + assert_eq!( + fork_choice_response.payload_status.status, + PayloadValidationStatus::Valid, + "Validation failed with error: {:?}", + fork_choice_response.payload_status.validation_error + ); + let payload_id = fork_choice_response.payload_id.unwrap(); + + let payload_response = self + .engine_client + .engine_get_payload_v4(payload_id) + .await + .unwrap(); + + let requests_hash = compute_requests_hash(&payload_response.execution_requests.unwrap()); + let block = payload_response + .execution_payload + .into_block(parent_beacon_block_root, Some(requests_hash)) + .unwrap(); + + info!( + node = self.index, + %head, + block = %block.hash(), + "#txs"=%block.body.transactions.len(), + "Built payload" + ); + chain.append_block(block); + chain + } + + async fn notify_new_payload(&self, chain: &Chain) { + let head = chain.blocks.last().unwrap(); + let execution_payload = ExecutionPayload::from_block(head.clone()); + // Support blobs + // let commitments = execution_payload_response + // .blobs_bundle + // .unwrap_or_default() + // .commitments + // .iter() + // .map(|commitment| { + // let mut hash = keccak256(commitment).0; + // // https://eips.ethereum.org/EIPS/eip-4844 -> 
kzg_to_versioned_hash + // hash[0] = 0x01; + // H256::from_slice(&hash) + // }) + // .collect(); + let commitments = vec![]; + let parent_beacon_block_root = head.header.parent_beacon_block_root.unwrap(); + let payload_status = self + .engine_client + .engine_new_payload_v4(execution_payload, commitments, parent_beacon_block_root) + .await + .unwrap(); + + assert_eq!( + payload_status.status, + PayloadValidationStatus::Valid, + "Validation failed with error: {:?}", + payload_status.validation_error + ); + } + + async fn send_eth_transfer(&self, signer: &Signer, recipient: H160, amount: u64) { + info!(node = self.index, sender=%signer.address(), %recipient, amount, "Sending ETH transfer tx"); + let chain_id = self + .rpc_client + .get_chain_id() + .await + .unwrap() + .try_into() + .unwrap(); + let sender_address = signer.address(); + let nonce = self + .rpc_client + .get_nonce(sender_address, BlockIdentifier::Tag(BlockTag::Latest)) + .await + .unwrap(); + let tx = EIP1559Transaction { + chain_id, + nonce, + max_priority_fee_per_gas: 0, + max_fee_per_gas: 1_000_000_000, + gas_limit: 50_000, + to: TxKind::Call(recipient), + value: amount.into(), + ..Default::default() + }; + let mut tx = Transaction::EIP1559Transaction(tx); + tx.sign_inplace(signer).await.unwrap(); + let encoded_tx = tx.encode_canonical_to_vec(); + self.rpc_client + .send_raw_transaction(&encoded_tx) + .await + .unwrap(); + } +} + +struct Chain { + block_hashes: Vec, + blocks: Vec, + safe_height: usize, +} + +impl Chain { + fn new(genesis: Genesis) -> Self { + let genesis_block = genesis.get_block(); + Self { + block_hashes: vec![genesis_block.hash()], + blocks: vec![genesis_block], + safe_height: 0, + } + } + + fn append_block(&mut self, block: Block) { + self.block_hashes.push(block.hash()); + self.blocks.push(block); + } + + fn fork(&self) -> Self { + Self { + block_hashes: self.block_hashes.clone(), + blocks: self.blocks.clone(), + safe_height: self.safe_height, + } + } + + fn get_fork_choice_state(&self) -> ForkChoiceState { + let head_block_hash = *self.block_hashes.last().unwrap(); + let finalized_block_hash = self.block_hashes[self.safe_height]; + ForkChoiceState { + head_block_hash, + safe_block_hash: finalized_block_hash, + finalized_block_hash, + } + } + + fn get_next_payload_attributes(&self) -> PayloadAttributesV3 { + let timestamp = self.blocks.last().unwrap().header.timestamp + 12; + let head_hash = self.get_fork_choice_state().head_block_hash; + // Generate dummy values by hashing multiple times + let parent_beacon_block_root = keccak256(&head_hash.0); + let prev_randao = keccak256(&parent_beacon_block_root.0); + // Address of 0x941e103320615d394a55708be13e45994c7d93b932b064dbcb2b511fe3254e2e, a rich account + let suggested_fee_recipient = H160( + hex::decode("4417092B70a3E5f10Dc504d0947DD256B965fc62") + .unwrap() + .try_into() + .unwrap(), + ); + // TODO: add withdrawals + let withdrawals = vec![]; + PayloadAttributesV3 { + timestamp, + prev_randao, + suggested_fee_recipient, + parent_beacon_block_root: Some(parent_beacon_block_root), + withdrawals: Some(withdrawals), + } + } +} + +fn generate_jwt_secret() -> Bytes { + use rand::Rng; + let mut rng = rand::thread_rng(); + let mut secret = [0u8; 32]; + rng.fill(&mut secret); + Bytes::from(secret.to_vec()) +} + +fn keccak256(data: &[u8]) -> H256 { + H256( + Sha256::new_with_prefix(data) + .finalize() + .as_slice() + .try_into() + .unwrap(), + ) +} From 547fff846614d64677aeff2b40a12cb2d5a1bb84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= 
<47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 23 Sep 2025 16:17:31 -0300 Subject: [PATCH 02/37] test: add additional checks --- tooling/reorgs/src/main.rs | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs index d9ff3bf83c..cfd31e2bc4 100644 --- a/tooling/reorgs/src/main.rs +++ b/tooling/reorgs/src/main.rs @@ -5,7 +5,7 @@ use ethrex::{ initializers::{get_network, init_tracing}, }; use ethrex_common::{ - Bytes, H160, H256, + Bytes, H160, H256, U256, types::{ Block, EIP1559Transaction, Genesis, Transaction, TxKind, requests::compute_requests_hash, }, @@ -60,6 +60,9 @@ async fn run_test(simulator: Arc>) { .unwrap(), ) .into(); + // Some random address + let recipient = "941e103320615d394a55708be13e45994c7d93b0".parse().unwrap(); + let transfer_amount = 1000000; let node0 = simulator.start_node().await; let node1 = simulator.start_node().await; @@ -79,6 +82,8 @@ async fn run_test(simulator: Arc>) { base_chain = extended_base_chain; } + let initial_balance = node0.get_balance(recipient).await; + // Fork the chain let side_chain = base_chain.fork(); @@ -91,20 +96,33 @@ async fn run_test(simulator: Arc>) { let extended_base_chain = node0.build_payload(base_chain).await; // In parallel, mine a block in the side chain, with an ETH transfer - let recipient = "941e103320615d394a55708be13e45994c7d93b0".parse().unwrap(); - node1.send_eth_transfer(&signer, recipient, 1000000).await; + node1 + .send_eth_transfer(&signer, recipient, transfer_amount) + .await; let side_chain = node1.build_payload(side_chain).await; node1.notify_new_payload(&side_chain).await; node1.update_forkchoice(&side_chain).await; + // Sanity check: balance hasn't changed + let same_balance = node0.get_balance(recipient).await; + assert_eq!(same_balance, initial_balance); + // Notify the first node of the side chain block, it should reorg node0.notify_new_payload(&side_chain).await; node0.update_forkchoice(&side_chain).await; + // Check the transfer has been processed + let new_balance = node0.get_balance(recipient).await; + assert_eq!(new_balance, initial_balance + transfer_amount); + // Finally, move to the extended base chain, it should reorg back node0.notify_new_payload(&extended_base_chain).await; node0.update_forkchoice(&extended_base_chain).await; + + // Check the transfer has been reverted + let new_balance = node0.get_balance(recipient).await; + assert_eq!(new_balance, initial_balance); } struct Simulator { @@ -384,6 +402,13 @@ impl Node { .await .unwrap(); } + + async fn get_balance(&self, address: H160) -> U256 { + self.rpc_client + .get_balance(address, Default::default()) + .await + .unwrap() + } } struct Chain { From 89f5548f27c692062c9f80929e8dba05eec6e343 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 23 Sep 2025 16:41:36 -0300 Subject: [PATCH 03/37] refactor: split into two files --- tooling/reorgs/src/main.rs | 433 +++----------------------------- tooling/reorgs/src/simulator.rs | 395 +++++++++++++++++++++++++++++ 2 files changed, 424 insertions(+), 404 deletions(-) create mode 100644 tooling/reorgs/src/simulator.rs diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs index cfd31e2bc4..9233771c3d 100644 --- a/tooling/reorgs/src/main.rs +++ b/tooling/reorgs/src/main.rs @@ -1,35 +1,24 @@ -use std::{fs::File, path::PathBuf, process::Stdio, sync::Arc, time::Duration}; - -use ethrex::{ - cli::Options, - 
initializers::{get_network, init_tracing}, -}; -use ethrex_common::{ - Bytes, H160, H256, U256, - types::{ - Block, EIP1559Transaction, Genesis, Transaction, TxKind, requests::compute_requests_hash, - }, -}; -use ethrex_config::networks::Network; -use ethrex_l2_rpc::signer::{LocalSigner, Signable, Signer}; -use ethrex_rpc::{ - EngineClient, EthClient, - types::{ - block_identifier::{BlockIdentifier, BlockTag}, - fork_choice::{ForkChoiceState, PayloadAttributesV3}, - payload::{ExecutionPayload, PayloadValidationStatus}, - }, +use std::{ + path::{Path, PathBuf}, + sync::Arc, }; -use nix::sys::signal::{self, Signal}; -use nix::unistd::Pid; -use sha2::{Digest, Sha256}; -use tokio::{process::Command, sync::Mutex}; -use tokio_util::sync::CancellationToken; -use tracing::info; + +use ethrex::{cli::Options, initializers::init_tracing}; +use ethrex_l2_rpc::signer::{LocalSigner, Signer}; +use tokio::sync::Mutex; + +use crate::simulator::Simulator; + +mod simulator; #[tokio::main] async fn main() { - let cmd_path = std::env::args() + // Setup logging + init_tracing(&Options::default_l1()); + + // Fetch the path to the ethrex binary from the command line arguments + // If not provided, use the default path + let cmd_path: PathBuf = std::env::args() .nth(1) .map(|o| o.parse().unwrap()) .unwrap_or_else(|| { @@ -37,11 +26,18 @@ async fn main() { "../../target/debug/ethrex".parse().unwrap() }); - let simulator = Arc::new(Mutex::new(Simulator::new(cmd_path))); - simulator.lock().await.init_tracing(); + run_test(&cmd_path, test_simple_reorg_and_back).await; +} + +async fn run_test(cmd_path: &Path, test_fn: F) +where + F: Fn(Arc>) -> Fut, + Fut: Future + Send + 'static, +{ + let simulator = Arc::new(Mutex::new(Simulator::new(cmd_path.to_path_buf()))); // Run in another task to clean up properly on panic - let result = tokio::spawn(run_test(simulator.clone())).await; + let result = tokio::spawn(test_fn(simulator.clone())).await; simulator.lock_owned().await.stop(); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; @@ -52,7 +48,7 @@ async fn main() { } } -async fn run_test(simulator: Arc>) { +async fn test_simple_reorg_and_back(simulator: Arc>) { let mut simulator = simulator.lock().await; let signer: Signer = LocalSigner::new( "941e103320615d394a55708be13e45994c7d93b932b064dbcb2b511fe3254e2e" @@ -67,11 +63,8 @@ async fn run_test(simulator: Arc>) { let node0 = simulator.start_node().await; let node1 = simulator.start_node().await; - let network = get_network(&simulator.base_opts); - let genesis = network.get_genesis().unwrap(); - // Create a chain with a few empty blocks - let mut base_chain = Chain::new(genesis); + let mut base_chain = simulator.get_base_chain(); for _ in 0..10 { let extended_base_chain = node0.build_payload(base_chain).await; node0.notify_new_payload(&extended_base_chain).await; @@ -124,371 +117,3 @@ async fn run_test(simulator: Arc>) { let new_balance = node0.get_balance(recipient).await; assert_eq!(new_balance, initial_balance); } - -struct Simulator { - cmd_path: PathBuf, - base_opts: Options, - jwt_secret: Bytes, - genesis_path: PathBuf, - configs: Vec, - cancellation_tokens: Vec, -} - -impl Simulator { - fn new(cmd_path: PathBuf) -> Self { - let mut opts = Options::default_l1(); - let jwt_secret = generate_jwt_secret(); - std::fs::write("jwt.hex", hex::encode(&jwt_secret)).unwrap(); - - let genesis_path = std::path::absolute("../../fixtures/genesis/l1-dev.json") - .unwrap() - .canonicalize() - .unwrap(); - - opts.authrpc_jwtsecret = "jwt.hex".to_string(); - opts.dev = 
false; - opts.http_addr = "localhost".to_string(); - opts.authrpc_addr = "localhost".to_string(); - opts.network = Some(Network::GenesisPath(genesis_path.clone())); - Self { - cmd_path, - base_opts: opts, - genesis_path, - jwt_secret, - configs: vec![], - cancellation_tokens: vec![], - } - } - - fn init_tracing(&self) { - init_tracing(&self.base_opts); - } - - async fn start_node(&mut self) -> Node { - let n = self.configs.len(); - let mut opts = self.base_opts.clone(); - opts.http_port = (8545 + n * 2).to_string(); - opts.authrpc_port = (8545 + n * 2 + 1).to_string(); - opts.p2p_port = (30303 + n).to_string(); - opts.discovery_port = (30303 + n).to_string(); - opts.datadir = format!("data/node{n}").into(); - - let _ = std::fs::remove_dir_all(&opts.datadir); - std::fs::create_dir_all(&opts.datadir).expect("Failed to create data directory"); - - let logs_file = - File::create(format!("data/node{n}.log")).expect("Failed to create logs file"); - - let cancel = CancellationToken::new(); - - self.configs.push(opts.clone()); - self.cancellation_tokens.push(cancel.clone()); - - let mut cmd = Command::new(&self.cmd_path); - cmd.args([ - format!("--http.addr={}", opts.http_addr), - format!("--http.port={}", opts.http_port), - format!("--authrpc.addr={}", opts.authrpc_addr), - format!("--authrpc.port={}", opts.authrpc_port), - format!("--p2p.port={}", opts.p2p_port), - format!("--discovery.port={}", opts.discovery_port), - format!("--datadir={}", opts.datadir.display()), - format!("--network={}", self.genesis_path.display()), - "--force".to_string(), - ]) - .stdin(Stdio::null()) - .stdout(logs_file.try_clone().expect("Failed to clone logs file")) - .stderr(logs_file); - - let child = cmd.spawn().expect("Failed to start ethrex process"); - - tokio::spawn(async move { - let mut child = child; - tokio::select! 
{ - _ = cancel.cancelled() => { - if let Some(pid) = child.id() { - signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM).unwrap(); - } - } - res = child.wait() => { - assert!(res.unwrap().success()); - } - } - }); - - info!( - "Started node {n} at http://{}:{}", - opts.http_addr, opts.http_port - ); - - tokio::time::sleep(Duration::from_millis(200)).await; - - self.get_node(n) - } - - fn stop(&self) { - for token in &self.cancellation_tokens { - token.cancel(); - } - } - - fn get_http_url(&self, index: usize) -> String { - let opts = &self.configs[index]; - format!("http://{}:{}", opts.http_addr, opts.http_port) - } - - fn get_auth_url(&self, index: usize) -> String { - let opts = &self.configs[index]; - format!("http://{}:{}", opts.authrpc_addr, opts.authrpc_port) - } - - fn get_node(&self, index: usize) -> Node { - let auth_url = self.get_auth_url(index); - let engine_client = EngineClient::new(&auth_url, self.jwt_secret.clone()); - - let http_url = self.get_http_url(index); - let rpc_client = EthClient::new(&http_url).unwrap(); - - Node { - index, - engine_client, - rpc_client, - } - } -} - -struct Node { - index: usize, - engine_client: EngineClient, - rpc_client: EthClient, -} - -impl Node { - async fn update_forkchoice(&self, chain: &Chain) { - let fork_choice_state = chain.get_fork_choice_state(); - info!( - node = self.index, - head = %fork_choice_state.head_block_hash, - "Updating fork choice" - ); - - let fork_choice_response = self - .engine_client - .engine_forkchoice_updated_v3(fork_choice_state, None) - .await - .unwrap(); - - assert_eq!( - fork_choice_response.payload_status.status, - PayloadValidationStatus::Valid, - "Validation failed with error: {:?}", - fork_choice_response.payload_status.validation_error - ); - assert!(fork_choice_response.payload_id.is_none()); - } - - async fn build_payload(&self, mut chain: Chain) -> Chain { - let fork_choice_state = chain.get_fork_choice_state(); - let payload_attributes = chain.get_next_payload_attributes(); - let head = fork_choice_state.head_block_hash; - - let parent_beacon_block_root = payload_attributes.parent_beacon_block_root; - - info!( - node = self.index, - %head, - "Starting payload build" - ); - - let fork_choice_response = self - .engine_client - .engine_forkchoice_updated_v3(fork_choice_state, Some(payload_attributes)) - .await - .unwrap(); - - assert_eq!( - fork_choice_response.payload_status.status, - PayloadValidationStatus::Valid, - "Validation failed with error: {:?}", - fork_choice_response.payload_status.validation_error - ); - let payload_id = fork_choice_response.payload_id.unwrap(); - - let payload_response = self - .engine_client - .engine_get_payload_v4(payload_id) - .await - .unwrap(); - - let requests_hash = compute_requests_hash(&payload_response.execution_requests.unwrap()); - let block = payload_response - .execution_payload - .into_block(parent_beacon_block_root, Some(requests_hash)) - .unwrap(); - - info!( - node = self.index, - %head, - block = %block.hash(), - "#txs"=%block.body.transactions.len(), - "Built payload" - ); - chain.append_block(block); - chain - } - - async fn notify_new_payload(&self, chain: &Chain) { - let head = chain.blocks.last().unwrap(); - let execution_payload = ExecutionPayload::from_block(head.clone()); - // Support blobs - // let commitments = execution_payload_response - // .blobs_bundle - // .unwrap_or_default() - // .commitments - // .iter() - // .map(|commitment| { - // let mut hash = keccak256(commitment).0; - // // https://eips.ethereum.org/EIPS/eip-4844 -> 
kzg_to_versioned_hash - // hash[0] = 0x01; - // H256::from_slice(&hash) - // }) - // .collect(); - let commitments = vec![]; - let parent_beacon_block_root = head.header.parent_beacon_block_root.unwrap(); - let payload_status = self - .engine_client - .engine_new_payload_v4(execution_payload, commitments, parent_beacon_block_root) - .await - .unwrap(); - - assert_eq!( - payload_status.status, - PayloadValidationStatus::Valid, - "Validation failed with error: {:?}", - payload_status.validation_error - ); - } - - async fn send_eth_transfer(&self, signer: &Signer, recipient: H160, amount: u64) { - info!(node = self.index, sender=%signer.address(), %recipient, amount, "Sending ETH transfer tx"); - let chain_id = self - .rpc_client - .get_chain_id() - .await - .unwrap() - .try_into() - .unwrap(); - let sender_address = signer.address(); - let nonce = self - .rpc_client - .get_nonce(sender_address, BlockIdentifier::Tag(BlockTag::Latest)) - .await - .unwrap(); - let tx = EIP1559Transaction { - chain_id, - nonce, - max_priority_fee_per_gas: 0, - max_fee_per_gas: 1_000_000_000, - gas_limit: 50_000, - to: TxKind::Call(recipient), - value: amount.into(), - ..Default::default() - }; - let mut tx = Transaction::EIP1559Transaction(tx); - tx.sign_inplace(signer).await.unwrap(); - let encoded_tx = tx.encode_canonical_to_vec(); - self.rpc_client - .send_raw_transaction(&encoded_tx) - .await - .unwrap(); - } - - async fn get_balance(&self, address: H160) -> U256 { - self.rpc_client - .get_balance(address, Default::default()) - .await - .unwrap() - } -} - -struct Chain { - block_hashes: Vec, - blocks: Vec, - safe_height: usize, -} - -impl Chain { - fn new(genesis: Genesis) -> Self { - let genesis_block = genesis.get_block(); - Self { - block_hashes: vec![genesis_block.hash()], - blocks: vec![genesis_block], - safe_height: 0, - } - } - - fn append_block(&mut self, block: Block) { - self.block_hashes.push(block.hash()); - self.blocks.push(block); - } - - fn fork(&self) -> Self { - Self { - block_hashes: self.block_hashes.clone(), - blocks: self.blocks.clone(), - safe_height: self.safe_height, - } - } - - fn get_fork_choice_state(&self) -> ForkChoiceState { - let head_block_hash = *self.block_hashes.last().unwrap(); - let finalized_block_hash = self.block_hashes[self.safe_height]; - ForkChoiceState { - head_block_hash, - safe_block_hash: finalized_block_hash, - finalized_block_hash, - } - } - - fn get_next_payload_attributes(&self) -> PayloadAttributesV3 { - let timestamp = self.blocks.last().unwrap().header.timestamp + 12; - let head_hash = self.get_fork_choice_state().head_block_hash; - // Generate dummy values by hashing multiple times - let parent_beacon_block_root = keccak256(&head_hash.0); - let prev_randao = keccak256(&parent_beacon_block_root.0); - // Address of 0x941e103320615d394a55708be13e45994c7d93b932b064dbcb2b511fe3254e2e, a rich account - let suggested_fee_recipient = H160( - hex::decode("4417092B70a3E5f10Dc504d0947DD256B965fc62") - .unwrap() - .try_into() - .unwrap(), - ); - // TODO: add withdrawals - let withdrawals = vec![]; - PayloadAttributesV3 { - timestamp, - prev_randao, - suggested_fee_recipient, - parent_beacon_block_root: Some(parent_beacon_block_root), - withdrawals: Some(withdrawals), - } - } -} - -fn generate_jwt_secret() -> Bytes { - use rand::Rng; - let mut rng = rand::thread_rng(); - let mut secret = [0u8; 32]; - rng.fill(&mut secret); - Bytes::from(secret.to_vec()) -} - -fn keccak256(data: &[u8]) -> H256 { - H256( - Sha256::new_with_prefix(data) - .finalize() - .as_slice() - 
.try_into() - .unwrap(), - ) -} diff --git a/tooling/reorgs/src/simulator.rs b/tooling/reorgs/src/simulator.rs new file mode 100644 index 0000000000..60da5ccbd6 --- /dev/null +++ b/tooling/reorgs/src/simulator.rs @@ -0,0 +1,395 @@ +use std::{fs::File, path::PathBuf, process::Stdio, time::Duration}; + +use ethrex::{cli::Options, initializers::get_network}; +use ethrex_common::{ + Bytes, H160, H256, U256, + types::{ + Block, EIP1559Transaction, Genesis, Transaction, TxKind, requests::compute_requests_hash, + }, +}; +use ethrex_config::networks::Network; +use ethrex_l2_rpc::signer::{Signable, Signer}; +use ethrex_rpc::{ + EngineClient, EthClient, + types::{ + block_identifier::{BlockIdentifier, BlockTag}, + fork_choice::{ForkChoiceState, PayloadAttributesV3}, + payload::{ExecutionPayload, PayloadValidationStatus}, + }, +}; +use nix::sys::signal::{self, Signal}; +use nix::unistd::Pid; +use sha2::{Digest, Sha256}; +use tokio::process::Command; +use tokio_util::sync::CancellationToken; +use tracing::info; + +pub struct Simulator { + cmd_path: PathBuf, + base_opts: Options, + jwt_secret: Bytes, + genesis_path: PathBuf, + configs: Vec, + cancellation_tokens: Vec, +} + +impl Simulator { + pub fn new(cmd_path: PathBuf) -> Self { + let mut opts = Options::default_l1(); + let jwt_secret = generate_jwt_secret(); + std::fs::write("jwt.hex", hex::encode(&jwt_secret)).unwrap(); + + let genesis_path = std::path::absolute("../../fixtures/genesis/l1-dev.json") + .unwrap() + .canonicalize() + .unwrap(); + + opts.authrpc_jwtsecret = "jwt.hex".to_string(); + opts.dev = false; + opts.http_addr = "localhost".to_string(); + opts.authrpc_addr = "localhost".to_string(); + opts.network = Some(Network::GenesisPath(genesis_path.clone())); + Self { + cmd_path, + base_opts: opts, + genesis_path, + jwt_secret, + configs: vec![], + cancellation_tokens: vec![], + } + } + + pub fn get_base_chain(&self) -> Chain { + let network = get_network(&self.base_opts); + let genesis = network.get_genesis().unwrap(); + Chain::new(genesis) + } + + pub async fn start_node(&mut self) -> Node { + let n = self.configs.len(); + let mut opts = self.base_opts.clone(); + opts.http_port = (8545 + n * 2).to_string(); + opts.authrpc_port = (8545 + n * 2 + 1).to_string(); + opts.p2p_port = (30303 + n).to_string(); + opts.discovery_port = (30303 + n).to_string(); + opts.datadir = format!("data/node{n}").into(); + + let _ = std::fs::remove_dir_all(&opts.datadir); + std::fs::create_dir_all(&opts.datadir).expect("Failed to create data directory"); + + let logs_file = + File::create(format!("data/node{n}.log")).expect("Failed to create logs file"); + + let cancel = CancellationToken::new(); + + self.configs.push(opts.clone()); + self.cancellation_tokens.push(cancel.clone()); + + let mut cmd = Command::new(&self.cmd_path); + cmd.args([ + format!("--http.addr={}", opts.http_addr), + format!("--http.port={}", opts.http_port), + format!("--authrpc.addr={}", opts.authrpc_addr), + format!("--authrpc.port={}", opts.authrpc_port), + format!("--p2p.port={}", opts.p2p_port), + format!("--discovery.port={}", opts.discovery_port), + format!("--datadir={}", opts.datadir.display()), + format!("--network={}", self.genesis_path.display()), + "--force".to_string(), + ]) + .stdin(Stdio::null()) + .stdout(logs_file.try_clone().expect("Failed to clone logs file")) + .stderr(logs_file); + + let child = cmd.spawn().expect("Failed to start ethrex process"); + + tokio::spawn(async move { + let mut child = child; + tokio::select! 
{ + _ = cancel.cancelled() => { + if let Some(pid) = child.id() { + signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM).unwrap(); + } + } + res = child.wait() => { + assert!(res.unwrap().success()); + } + } + }); + + info!( + "Started node {n} at http://{}:{}", + opts.http_addr, opts.http_port + ); + + tokio::time::sleep(Duration::from_millis(200)).await; + + self.get_node(n) + } + + pub fn stop(&self) { + for token in &self.cancellation_tokens { + token.cancel(); + } + } + + fn get_http_url(&self, index: usize) -> String { + let opts = &self.configs[index]; + format!("http://{}:{}", opts.http_addr, opts.http_port) + } + + fn get_auth_url(&self, index: usize) -> String { + let opts = &self.configs[index]; + format!("http://{}:{}", opts.authrpc_addr, opts.authrpc_port) + } + + fn get_node(&self, index: usize) -> Node { + let auth_url = self.get_auth_url(index); + let engine_client = EngineClient::new(&auth_url, self.jwt_secret.clone()); + + let http_url = self.get_http_url(index); + let rpc_client = EthClient::new(&http_url).unwrap(); + + Node { + index, + engine_client, + rpc_client, + } + } +} + +pub struct Node { + index: usize, + engine_client: EngineClient, + rpc_client: EthClient, +} + +impl Node { + pub async fn update_forkchoice(&self, chain: &Chain) { + let fork_choice_state = chain.get_fork_choice_state(); + info!( + node = self.index, + head = %fork_choice_state.head_block_hash, + "Updating fork choice" + ); + + let fork_choice_response = self + .engine_client + .engine_forkchoice_updated_v3(fork_choice_state, None) + .await + .unwrap(); + + assert_eq!( + fork_choice_response.payload_status.status, + PayloadValidationStatus::Valid, + "Validation failed with error: {:?}", + fork_choice_response.payload_status.validation_error + ); + assert!(fork_choice_response.payload_id.is_none()); + } + + pub async fn build_payload(&self, mut chain: Chain) -> Chain { + let fork_choice_state = chain.get_fork_choice_state(); + let payload_attributes = chain.get_next_payload_attributes(); + let head = fork_choice_state.head_block_hash; + + let parent_beacon_block_root = payload_attributes.parent_beacon_block_root; + + info!( + node = self.index, + %head, + "Starting payload build" + ); + + let fork_choice_response = self + .engine_client + .engine_forkchoice_updated_v3(fork_choice_state, Some(payload_attributes)) + .await + .unwrap(); + + assert_eq!( + fork_choice_response.payload_status.status, + PayloadValidationStatus::Valid, + "Validation failed with error: {:?}", + fork_choice_response.payload_status.validation_error + ); + let payload_id = fork_choice_response.payload_id.unwrap(); + + let payload_response = self + .engine_client + .engine_get_payload_v4(payload_id) + .await + .unwrap(); + + let requests_hash = compute_requests_hash(&payload_response.execution_requests.unwrap()); + let block = payload_response + .execution_payload + .into_block(parent_beacon_block_root, Some(requests_hash)) + .unwrap(); + + info!( + node = self.index, + %head, + block = %block.hash(), + "#txs"=%block.body.transactions.len(), + "Built payload" + ); + chain.append_block(block); + chain + } + + pub async fn notify_new_payload(&self, chain: &Chain) { + let head = chain.blocks.last().unwrap(); + let execution_payload = ExecutionPayload::from_block(head.clone()); + // Support blobs + // let commitments = execution_payload_response + // .blobs_bundle + // .unwrap_or_default() + // .commitments + // .iter() + // .map(|commitment| { + // let mut hash = keccak256(commitment).0; + // // 
https://eips.ethereum.org/EIPS/eip-4844 -> kzg_to_versioned_hash + // hash[0] = 0x01; + // H256::from_slice(&hash) + // }) + // .collect(); + let commitments = vec![]; + let parent_beacon_block_root = head.header.parent_beacon_block_root.unwrap(); + let payload_status = self + .engine_client + .engine_new_payload_v4(execution_payload, commitments, parent_beacon_block_root) + .await + .unwrap(); + + assert_eq!( + payload_status.status, + PayloadValidationStatus::Valid, + "Validation failed with error: {:?}", + payload_status.validation_error + ); + } + + pub async fn send_eth_transfer(&self, signer: &Signer, recipient: H160, amount: u64) { + info!(node = self.index, sender=%signer.address(), %recipient, amount, "Sending ETH transfer tx"); + let chain_id = self + .rpc_client + .get_chain_id() + .await + .unwrap() + .try_into() + .unwrap(); + let sender_address = signer.address(); + let nonce = self + .rpc_client + .get_nonce(sender_address, BlockIdentifier::Tag(BlockTag::Latest)) + .await + .unwrap(); + let tx = EIP1559Transaction { + chain_id, + nonce, + max_priority_fee_per_gas: 0, + max_fee_per_gas: 1_000_000_000, + gas_limit: 50_000, + to: TxKind::Call(recipient), + value: amount.into(), + ..Default::default() + }; + let mut tx = Transaction::EIP1559Transaction(tx); + tx.sign_inplace(signer).await.unwrap(); + let encoded_tx = tx.encode_canonical_to_vec(); + self.rpc_client + .send_raw_transaction(&encoded_tx) + .await + .unwrap(); + } + + pub async fn get_balance(&self, address: H160) -> U256 { + self.rpc_client + .get_balance(address, Default::default()) + .await + .unwrap() + } +} + +pub struct Chain { + block_hashes: Vec, + blocks: Vec, + safe_height: usize, +} + +impl Chain { + fn new(genesis: Genesis) -> Self { + let genesis_block = genesis.get_block(); + Self { + block_hashes: vec![genesis_block.hash()], + blocks: vec![genesis_block], + safe_height: 0, + } + } + + fn append_block(&mut self, block: Block) { + self.block_hashes.push(block.hash()); + self.blocks.push(block); + } + + pub fn fork(&self) -> Self { + Self { + block_hashes: self.block_hashes.clone(), + blocks: self.blocks.clone(), + safe_height: self.safe_height, + } + } + + fn get_fork_choice_state(&self) -> ForkChoiceState { + let head_block_hash = *self.block_hashes.last().unwrap(); + let finalized_block_hash = self.block_hashes[self.safe_height]; + ForkChoiceState { + head_block_hash, + safe_block_hash: finalized_block_hash, + finalized_block_hash, + } + } + + fn get_next_payload_attributes(&self) -> PayloadAttributesV3 { + let timestamp = self.blocks.last().unwrap().header.timestamp + 12; + let head_hash = self.get_fork_choice_state().head_block_hash; + // Generate dummy values by hashing multiple times + let parent_beacon_block_root = keccak256(&head_hash.0); + let prev_randao = keccak256(&parent_beacon_block_root.0); + // Address of 0x941e103320615d394a55708be13e45994c7d93b932b064dbcb2b511fe3254e2e, a rich account + let suggested_fee_recipient = H160( + hex::decode("4417092B70a3E5f10Dc504d0947DD256B965fc62") + .unwrap() + .try_into() + .unwrap(), + ); + // TODO: add withdrawals + let withdrawals = vec![]; + PayloadAttributesV3 { + timestamp, + prev_randao, + suggested_fee_recipient, + parent_beacon_block_root: Some(parent_beacon_block_root), + withdrawals: Some(withdrawals), + } + } +} + +fn generate_jwt_secret() -> Bytes { + use rand::Rng; + let mut rng = rand::thread_rng(); + let mut secret = [0u8; 32]; + rng.fill(&mut secret); + Bytes::from(secret.to_vec()) +} + +fn keccak256(data: &[u8]) -> H256 { + H256( + 
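// Despite its name, this helper hashes with SHA-256, not Keccak-256. It is
// only used to derive deterministic dummy values (prev_randao and
// parent_beacon_block_root in get_next_payload_attributes), so the choice
// of hash function does not matter here.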
Sha256::new_with_prefix(data) + .finalize() + .as_slice() + .try_into() + .unwrap(), + ) +} From 496bfaf534b88088cc5950bc00474855ea06887d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 23 Sep 2025 17:48:34 -0300 Subject: [PATCH 04/37] feat: connect peers via P2P --- tooling/reorgs/src/simulator.rs | 50 +++++++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/tooling/reorgs/src/simulator.rs b/tooling/reorgs/src/simulator.rs index 60da5ccbd6..c4f2a2632c 100644 --- a/tooling/reorgs/src/simulator.rs +++ b/tooling/reorgs/src/simulator.rs @@ -1,4 +1,4 @@ -use std::{fs::File, path::PathBuf, process::Stdio, time::Duration}; +use std::{fs::File, io::Read, path::PathBuf, process::Stdio, time::Duration}; use ethrex::{cli::Options, initializers::get_network}; use ethrex_common::{ @@ -30,6 +30,7 @@ pub struct Simulator { jwt_secret: Bytes, genesis_path: PathBuf, configs: Vec, + enodes: Vec, cancellation_tokens: Vec, } @@ -56,6 +57,7 @@ impl Simulator { jwt_secret, configs: vec![], cancellation_tokens: vec![], + enodes: vec![], } } @@ -77,8 +79,8 @@ impl Simulator { let _ = std::fs::remove_dir_all(&opts.datadir); std::fs::create_dir_all(&opts.datadir).expect("Failed to create data directory"); - let logs_file = - File::create(format!("data/node{n}.log")).expect("Failed to create logs file"); + let logs_file_path = format!("data/node{n}.log"); + let logs_file = File::create(&logs_file_path).expect("Failed to create logs file"); let cancel = CancellationToken::new(); @@ -98,11 +100,22 @@ impl Simulator { "--force".to_string(), ]) .stdin(Stdio::null()) - .stdout(logs_file.try_clone().expect("Failed to clone logs file")) + .stdout(logs_file.try_clone().unwrap()) .stderr(logs_file); + if !self.enodes.is_empty() { + cmd.arg(format!("--bootnodes={}", self.enodes.join(","))); + } + let child = cmd.spawn().expect("Failed to start ethrex process"); + let logs_file = File::open(&logs_file_path).expect("Failed to open logs file"); + let enode = + tokio::time::timeout(Duration::from_secs(1), wait_for_initialization(logs_file)) + .await + .expect("node initialization timed out"); + self.enodes.push(enode); + tokio::spawn(async move { let mut child = child; tokio::select! { @@ -122,8 +135,6 @@ impl Simulator { opts.http_addr, opts.http_port ); - tokio::time::sleep(Duration::from_millis(200)).await; - self.get_node(n) } @@ -158,6 +169,33 @@ impl Simulator { } } +/// Waits until the node is initialized by reading its logs. +/// Returns the enode URL of the node. 
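// Rather than computing the enode URL up front, the simulator tails the
// node's log file until the "Starting Auth-RPC server at" line appears,
// then scrapes the advertised "enode://<node-id>@<host>:<port>" URL from
// the "Local node initialized" line; start_node feeds it to subsequently
// started nodes via --bootnodes.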
+async fn wait_for_initialization(mut logs_file: File) -> String { + const NODE_STARTED_LOG: &str = "Starting Auth-RPC server at"; + + let mut file_contents = String::new(); + + // Wait a bit until the node starts + loop { + tokio::time::sleep(Duration::from_millis(100)).await; + + logs_file.read_to_string(&mut file_contents).unwrap(); + + if file_contents.contains(NODE_STARTED_LOG) { + break; + } + } + let node_enode_log = file_contents + .lines() + .find(|line| line.contains("Local node initialized")) + .unwrap(); + // Look for the "enode://node_id@host:port" part + let prefix = "enode://"; + let node_enode = node_enode_log.split_once(prefix).unwrap().1; + format!("{prefix}{}", node_enode.trim_end()) +} + pub struct Node { index: usize, engine_client: EngineClient, From e685093e89cf0cac7968106aabb84be18c781bd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 23 Sep 2025 18:00:59 -0300 Subject: [PATCH 05/37] test: add second test --- tooling/reorgs/src/main.rs | 82 +++++++++++++++++++++++++++++++++++++- 1 file changed, 80 insertions(+), 2 deletions(-) diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs index 9233771c3d..27a8b05a0b 100644 --- a/tooling/reorgs/src/main.rs +++ b/tooling/reorgs/src/main.rs @@ -26,7 +26,8 @@ async fn main() { "../../target/debug/ethrex".parse().unwrap() }); - run_test(&cmd_path, test_simple_reorg_and_back).await; + // run_test(&cmd_path, test_one_block_reorg_and_back).await; + run_test(&cmd_path, test_many_blocks_reorg).await; } async fn run_test(cmd_path: &Path, test_fn: F) @@ -48,7 +49,7 @@ where } } -async fn test_simple_reorg_and_back(simulator: Arc>) { +async fn test_one_block_reorg_and_back(simulator: Arc>) { let mut simulator = simulator.lock().await; let signer: Signer = LocalSigner::new( "941e103320615d394a55708be13e45994c7d93b932b064dbcb2b511fe3254e2e" @@ -117,3 +118,80 @@ async fn test_simple_reorg_and_back(simulator: Arc>) { let new_balance = node0.get_balance(recipient).await; assert_eq!(new_balance, initial_balance); } + +async fn test_many_blocks_reorg(simulator: Arc>) { + let mut simulator = simulator.lock().await; + let signer: Signer = LocalSigner::new( + "941e103320615d394a55708be13e45994c7d93b932b064dbcb2b511fe3254e2e" + .parse() + .unwrap(), + ) + .into(); + // Some random address + let recipient = "941e103320615d394a55708be13e45994c7d93b0".parse().unwrap(); + let transfer_amount = 1000000; + + let node0 = simulator.start_node().await; + let node1 = simulator.start_node().await; + + // Create a chain with a few empty blocks + let mut base_chain = simulator.get_base_chain(); + for _ in 0..10 { + let extended_base_chain = node0.build_payload(base_chain).await; + node0.notify_new_payload(&extended_base_chain).await; + node0.update_forkchoice(&extended_base_chain).await; + + node1.notify_new_payload(&extended_base_chain).await; + node1.update_forkchoice(&extended_base_chain).await; + base_chain = extended_base_chain; + } + + let initial_balance = node0.get_balance(recipient).await; + + // Fork the chain + let mut side_chain = base_chain.fork(); + + // Create a side chain with multiple blocks only known to node0 + for _ in 0..10 { + side_chain = node0.build_payload(side_chain).await; + node0.notify_new_payload(&side_chain).await; + node0.update_forkchoice(&side_chain).await; + } + + // Sanity check: balance hasn't changed + let same_balance = node0.get_balance(recipient).await; + assert_eq!(same_balance, initial_balance); + + // Make an ETH transfer 
in the base chain
+    // NOTE: we do this to ensure both blocks are different
+    node1
+        .send_eth_transfer(&signer, recipient, transfer_amount)
+        .await;
+
+    // Advance the base chain with multiple blocks only known to node1
+    for _ in 0..10 {
+        base_chain = node1.build_payload(base_chain).await;
+        node1.notify_new_payload(&base_chain).await;
+        node1.update_forkchoice(&base_chain).await;
+    }
+
+    // Sanity check: balance hasn't changed
+    let same_balance = node0.get_balance(recipient).await;
+    assert_eq!(same_balance, initial_balance);
+
+    // Advance the base chain with one more block and another ETH transfer
+    node1
+        .send_eth_transfer(&signer, recipient, transfer_amount)
+        .await;
+    base_chain = node1.build_payload(base_chain).await;
+    node1.notify_new_payload(&base_chain).await;
+    node1.update_forkchoice(&base_chain).await;
+
+    // Bring node0 again to the base chain, it should reorg
+    node0.notify_new_payload(&base_chain).await;
+    node0.update_forkchoice(&base_chain).await;
+
+    // Check the transfer has been processed
+    let new_balance = node0.get_balance(recipient).await;
+    assert_eq!(new_balance, initial_balance + transfer_amount);
+}

From 05bf17d345123b917b082508852377b4f02931ed Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com>
Date: Tue, 23 Sep 2025 18:09:19 -0300
Subject: [PATCH 06/37] fix: handle syncing status

---
 crates/networking/rpc/types/fork_choice.rs |  2 +-
 tooling/reorgs/src/simulator.rs            | 38 +++++++++++-----------
 2 files changed, 20 insertions(+), 20 deletions(-)

diff --git a/crates/networking/rpc/types/fork_choice.rs b/crates/networking/rpc/types/fork_choice.rs
index a010cdc681..1411958582 100644
--- a/crates/networking/rpc/types/fork_choice.rs
+++ b/crates/networking/rpc/types/fork_choice.rs
@@ -2,7 +2,7 @@ use super::payload::PayloadStatus;
 use ethrex_common::{Address, H256, serde_utils, types::Withdrawal};
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Deserialize, Serialize)]
+#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct ForkChoiceState {
     #[allow(unused)]
diff --git a/tooling/reorgs/src/simulator.rs b/tooling/reorgs/src/simulator.rs
index c4f2a2632c..76adb76da2 100644
--- a/tooling/reorgs/src/simulator.rs
+++ b/tooling/reorgs/src/simulator.rs
@@ -1,6 +1,7 @@
 use std::{fs::File, io::Read, path::PathBuf, process::Stdio, time::Duration};
 
 use ethrex::{cli::Options, initializers::get_network};
+use ethrex_blockchain::fork_choice;
 use ethrex_common::{
     Bytes, H160, H256, U256,
     types::{
         Block, EIP1559Transaction, Genesis, Transaction, TxKind, requests::compute_requests_hash,
     },
@@ -210,20 +211,11 @@ impl Node {
             head = %fork_choice_state.head_block_hash,
             "Updating fork choice"
         );
+        let syncing_fut = wait_until_synced(&self.engine_client, fork_choice_state);
 
-        let fork_choice_response = self
-            .engine_client
-            .engine_forkchoice_updated_v3(fork_choice_state, None)
+        tokio::time::timeout(Duration::from_secs(5), syncing_fut)
             .await
             .unwrap();
-
-        assert_eq!(
-            fork_choice_response.payload_status.status,
-            PayloadValidationStatus::Valid,
-            "Validation failed with error: {:?}",
-            fork_choice_response.payload_status.validation_error
-        );
-        assert!(fork_choice_response.payload_id.is_none());
     }
 
     pub async fn build_payload(&self, mut chain: Chain) -> Chain {
@@ -294,18 +286,11 @@ impl Node {
         // .collect();
         let commitments = vec![];
         let parent_beacon_block_root = head.header.parent_beacon_block_root.unwrap();
-        let payload_status = self
+        let _payload_status = self
             .engine_client
             .engine_new_payload_v4(execution_payload, commitments, 
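// The payload status is intentionally not asserted here: a node that is
// still syncing may answer SYNCING or ACCEPTED instead of VALID.
// Validity is instead checked by update_forkchoice, which now polls
// forkchoiceUpdated until the head is reported VALID (wait_until_synced).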
parent_beacon_block_root) .await .unwrap(); - - assert_eq!( - payload_status.status, - PayloadValidationStatus::Valid, - "Validation failed with error: {:?}", - payload_status.validation_error - ); } pub async fn send_eth_transfer(&self, signer: &Signer, recipient: H160, amount: u64) { @@ -431,3 +416,18 @@ fn keccak256(data: &[u8]) -> H256 { .unwrap(), ) } + +async fn wait_until_synced(engine_client: &EngineClient, fork_choice_state: ForkChoiceState) { + loop { + let fork_choice_response = engine_client + .engine_forkchoice_updated_v3(fork_choice_state, None) + .await + .unwrap(); + + let status = fork_choice_response.payload_status.status; + if status == PayloadValidationStatus::Valid { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } +} From db158b487383d5ef11af0eb960649b9f6d3808cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 23 Sep 2025 18:09:57 -0300 Subject: [PATCH 07/37] chore: remove unused import --- tooling/reorgs/src/simulator.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tooling/reorgs/src/simulator.rs b/tooling/reorgs/src/simulator.rs index 76adb76da2..936abb621a 100644 --- a/tooling/reorgs/src/simulator.rs +++ b/tooling/reorgs/src/simulator.rs @@ -1,7 +1,6 @@ use std::{fs::File, io::Read, path::PathBuf, process::Stdio, time::Duration}; use ethrex::{cli::Options, initializers::get_network}; -use ethrex_blockchain::fork_choice; use ethrex_common::{ Bytes, H160, H256, U256, types::{ From 8ed7a6caed425903a8fff101d7da32e55a7758ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 23 Sep 2025 18:28:16 -0300 Subject: [PATCH 08/37] feat: improve error log --- tooling/reorgs/src/main.rs | 3 ++- tooling/reorgs/src/simulator.rs | 10 +++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs index 27a8b05a0b..7092a17b4f 100644 --- a/tooling/reorgs/src/main.rs +++ b/tooling/reorgs/src/main.rs @@ -6,6 +6,7 @@ use std::{ use ethrex::{cli::Options, initializers::init_tracing}; use ethrex_l2_rpc::signer::{LocalSigner, Signer}; use tokio::sync::Mutex; +use tracing::warn; use crate::simulator::Simulator; @@ -22,7 +23,7 @@ async fn main() { .nth(1) .map(|o| o.parse().unwrap()) .unwrap_or_else(|| { - println!("No binary path provided, using default"); + warn!("No binary path provided, using default"); "../../target/debug/ethrex".parse().unwrap() }); diff --git a/tooling/reorgs/src/simulator.rs b/tooling/reorgs/src/simulator.rs index 936abb621a..37a8dd74a7 100644 --- a/tooling/reorgs/src/simulator.rs +++ b/tooling/reorgs/src/simulator.rs @@ -22,7 +22,7 @@ use nix::unistd::Pid; use sha2::{Digest, Sha256}; use tokio::process::Command; use tokio_util::sync::CancellationToken; -use tracing::info; +use tracing::{error, info}; pub struct Simulator { cmd_path: PathBuf, @@ -69,6 +69,7 @@ impl Simulator { pub async fn start_node(&mut self) -> Node { let n = self.configs.len(); + info!(node = n, "Starting node"); let mut opts = self.base_opts.clone(); opts.http_port = (8545 + n * 2).to_string(); opts.authrpc_port = (8545 + n * 2 + 1).to_string(); @@ -111,7 +112,7 @@ impl Simulator { let logs_file = File::open(&logs_file_path).expect("Failed to open logs file"); let enode = - tokio::time::timeout(Duration::from_secs(1), wait_for_initialization(logs_file)) + tokio::time::timeout(Duration::from_secs(5), wait_for_initialization(logs_file)) .await 
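// Raised from 1s to 5s: wait_for_initialization polls the log file in
// 100ms steps, and a freshly started node can plausibly need more than a
// second to bind its sockets and log its enode.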
.expect("node initialization timed out"); self.enodes.push(enode); @@ -214,7 +215,10 @@ impl Node { tokio::time::timeout(Duration::from_secs(5), syncing_fut) .await - .unwrap(); + .inspect_err(|_| { + error!(node = self.index, "Timed out waiting for node to sync"); + }) + .expect("timed out waiting for node to sync"); } pub async fn build_payload(&self, mut chain: Chain) -> Chain { From f3fe84567ea740278e667c90b9d1c07a1c0507b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 23 Sep 2025 18:28:32 -0300 Subject: [PATCH 09/37] docs: add readme with instructions to run it --- tooling/reorgs/README.md | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 tooling/reorgs/README.md diff --git a/tooling/reorgs/README.md b/tooling/reorgs/README.md new file mode 100644 index 0000000000..37688a9540 --- /dev/null +++ b/tooling/reorgs/README.md @@ -0,0 +1,23 @@ +# Reorg integration tests + +This directory contains tests for chain reorganization. + +## How to run + +First, compile the `ethrex` binary if you haven't already: + +```bash +cargo build --workspace --bin ethrex +``` + +Then, run the reorg tests using: + +```bash +cargo run +``` + +You can run a custom binary by specifying the path: + +```bash +cargo run -- /path/to/your/binary +``` From ffd27bdc1003af9fed1e839ad7df026db4b4b00c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 23 Sep 2025 18:39:32 -0300 Subject: [PATCH 10/37] fix: improve multi-test runs --- tooling/reorgs/src/main.rs | 30 +++++++++++++++++++++--------- tooling/reorgs/src/simulator.rs | 1 + 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs index 7092a17b4f..688ac55bf0 100644 --- a/tooling/reorgs/src/main.rs +++ b/tooling/reorgs/src/main.rs @@ -6,7 +6,7 @@ use std::{ use ethrex::{cli::Options, initializers::init_tracing}; use ethrex_l2_rpc::signer::{LocalSigner, Signer}; use tokio::sync::Mutex; -use tracing::warn; +use tracing::{error, info, warn}; use crate::simulator::Simulator; @@ -22,12 +22,12 @@ async fn main() { let cmd_path: PathBuf = std::env::args() .nth(1) .map(|o| o.parse().unwrap()) - .unwrap_or_else(|| { - warn!("No binary path provided, using default"); - "../../target/debug/ethrex".parse().unwrap() - }); + .unwrap_or_else(|| "../../target/debug/ethrex".parse().unwrap()); - // run_test(&cmd_path, test_one_block_reorg_and_back).await; + info!(binary = %cmd_path.display(), "Starting test run"); + info!(""); + + run_test(&cmd_path, test_one_block_reorg_and_back).await; run_test(&cmd_path, test_many_blocks_reorg).await; } @@ -36,6 +36,10 @@ where F: Fn(Arc>) -> Fut, Fut: Future + Send + 'static, { + let test_name = std::any::type_name::(); + let start = std::time::Instant::now(); + + info!(test=%test_name, "Running test"); let simulator = Arc::new(Mutex::new(Simulator::new(cmd_path.to_path_buf()))); // Run in another task to clean up properly on panic @@ -44,10 +48,18 @@ where simulator.lock_owned().await.stop(); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - if result.is_err() { - eprintln!("Test panicked"); - std::process::exit(1); + match result { + Ok(_) => info!(test=%test_name, elapsed=?start.elapsed(), "test completed successfully"), + Err(err) if err.is_panic() => { + error!(test=%test_name, %err, "test panicked"); + std::process::exit(1); + } + Err(err) => { + warn!(test=%test_name, %err, "test 
task was cancelled"); + } } + // Add a blank line after each test for readability + info!(""); } async fn test_one_block_reorg_and_back(simulator: Arc>) { diff --git a/tooling/reorgs/src/simulator.rs b/tooling/reorgs/src/simulator.rs index 37a8dd74a7..c3c04b4eab 100644 --- a/tooling/reorgs/src/simulator.rs +++ b/tooling/reorgs/src/simulator.rs @@ -122,6 +122,7 @@ impl Simulator { tokio::select! { _ = cancel.cancelled() => { if let Some(pid) = child.id() { + // NOTE: we use SIGTERM instead of child.kill() so sockets are closed signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM).unwrap(); } } From 07411725a7c243beb36708a7e775d6f3fb9ffb28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 23 Sep 2025 18:46:29 -0300 Subject: [PATCH 11/37] ci: run reorg tests in CI --- .github/workflows/pr-main_l1.yaml | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/.github/workflows/pr-main_l1.yaml b/.github/workflows/pr-main_l1.yaml index 30d4138f45..7a6ef233cc 100644 --- a/.github/workflows/pr-main_l1.yaml +++ b/.github/workflows/pr-main_l1.yaml @@ -217,7 +217,7 @@ jobs: # The purpose of this job is to add it as a required check in GitHub so that we don't have to add every individual job as a required check all-tests: - # "Integration Test" is a required check, don't change the name + # "Integration Test" is a required check, don't change the name name: Integration Test runs-on: ubuntu-latest needs: [run-assertoor, run-hive] @@ -235,3 +235,20 @@ jobs: echo "Job Hive failed" exit 1 fi + + reorg-tests: + name: Reorg Tests + runs-on: ubuntu-latest + if: ${{ github.event_name != 'merge_group' }} + steps: + - name: Checkout sources + uses: actions/checkout@v4 + + - name: Setup Rust Environment + uses: ./.github/actions/setup-rust + + - name: Compile ethrex binary + run: cargo build --bin ethrex + + - name: Run reorg tests + run: cd tooling/reorgs && cargo run From de50cc8236f4037d66ea41a4308d3daa3947ddce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 23 Sep 2025 19:10:25 -0300 Subject: [PATCH 12/37] feat: print ethrex version when running tests --- tooling/reorgs/src/main.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs index 688ac55bf0..ce1090417b 100644 --- a/tooling/reorgs/src/main.rs +++ b/tooling/reorgs/src/main.rs @@ -1,5 +1,6 @@ use std::{ path::{Path, PathBuf}, + process::Command, sync::Arc, }; @@ -24,13 +25,24 @@ async fn main() { .map(|o| o.parse().unwrap()) .unwrap_or_else(|| "../../target/debug/ethrex".parse().unwrap()); - info!(binary = %cmd_path.display(), "Starting test run"); + let version = get_ethrex_version(&cmd_path).await; + + info!(%version, binary_path = %cmd_path.display(), "Fetched ethrex binary version"); + info!("Starting test run"); info!(""); run_test(&cmd_path, test_one_block_reorg_and_back).await; run_test(&cmd_path, test_many_blocks_reorg).await; } +async fn get_ethrex_version(cmd_path: &Path) -> String { + let version_output = Command::new(&cmd_path) + .arg("--version") + .output() + .expect("failed to get ethrex version"); + String::from_utf8(version_output.stdout).expect("failed to parse version output") +} + async fn run_test(cmd_path: &Path, test_fn: F) where F: Fn(Arc>) -> Fut, From 172649406205981ef3de233bae47c50d16880f5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= 
<47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 23 Sep 2025 19:11:10 -0300 Subject: [PATCH 13/37] chore: comment failing test --- tooling/reorgs/src/main.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs index ce1090417b..8e3fe00562 100644 --- a/tooling/reorgs/src/main.rs +++ b/tooling/reorgs/src/main.rs @@ -32,7 +32,9 @@ async fn main() { info!(""); run_test(&cmd_path, test_one_block_reorg_and_back).await; - run_test(&cmd_path, test_many_blocks_reorg).await; + + // TODO: this test is failing + // run_test(&cmd_path, test_many_blocks_reorg).await; } async fn get_ethrex_version(cmd_path: &Path) -> String { @@ -144,6 +146,7 @@ async fn test_one_block_reorg_and_back(simulator: Arc>) { assert_eq!(new_balance, initial_balance); } +#[expect(unused)] async fn test_many_blocks_reorg(simulator: Arc>) { let mut simulator = simulator.lock().await; let signer: Signer = LocalSigner::new( From 9c6774d0139674e58cb5854003424b8a247effcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Wed, 24 Sep 2025 11:30:46 -0300 Subject: [PATCH 14/37] chore: fix clippy lint --- tooling/reorgs/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs index 8e3fe00562..17c50c7b42 100644 --- a/tooling/reorgs/src/main.rs +++ b/tooling/reorgs/src/main.rs @@ -38,7 +38,7 @@ async fn main() { } async fn get_ethrex_version(cmd_path: &Path) -> String { - let version_output = Command::new(&cmd_path) + let version_output = Command::new(cmd_path) .arg("--version") .output() .expect("failed to get ethrex version"); From 9279bf058f4af5be1fba704a8cef565721f38834 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 25 Sep 2025 15:20:23 -0300 Subject: [PATCH 15/37] fix: differentiate blocks according to builder --- tooling/reorgs/src/main.rs | 12 +++--------- tooling/reorgs/src/simulator.rs | 12 ++++-------- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs index 17c50c7b42..d79c3638dd 100644 --- a/tooling/reorgs/src/main.rs +++ b/tooling/reorgs/src/main.rs @@ -34,7 +34,7 @@ async fn main() { run_test(&cmd_path, test_one_block_reorg_and_back).await; // TODO: this test is failing - // run_test(&cmd_path, test_many_blocks_reorg).await; + run_test(&cmd_path, test_many_blocks_reorg).await; } async fn get_ethrex_version(cmd_path: &Path) -> String { @@ -146,7 +146,7 @@ async fn test_one_block_reorg_and_back(simulator: Arc>) { assert_eq!(new_balance, initial_balance); } -#[expect(unused)] +// #[expect(unused)] async fn test_many_blocks_reorg(simulator: Arc>) { let mut simulator = simulator.lock().await; let signer: Signer = LocalSigner::new( @@ -190,12 +190,6 @@ async fn test_many_blocks_reorg(simulator: Arc>) { let same_balance = node0.get_balance(recipient).await; assert_eq!(same_balance, initial_balance); - // Make an ETH transfer in the base chain - // NOTE: we do this to ensure both blocks are different - node1 - .send_eth_transfer(&signer, recipient, transfer_amount) - .await; - // Advance the base chain with multiple blocks only known to node1 for _ in 0..10 { base_chain = node1.build_payload(base_chain).await; @@ -207,7 +201,7 @@ async fn test_many_blocks_reorg(simulator: Arc>) { let same_balance = node0.get_balance(recipient).await; assert_eq!(same_balance, 
initial_balance); - // Advance the side chain with one more block and another ETH transfer + // Advance the side chain with one more block and an ETH transfer node1 .send_eth_transfer(&signer, recipient, transfer_amount) .await; diff --git a/tooling/reorgs/src/simulator.rs b/tooling/reorgs/src/simulator.rs index c3c04b4eab..b70617f2a0 100644 --- a/tooling/reorgs/src/simulator.rs +++ b/tooling/reorgs/src/simulator.rs @@ -224,7 +224,9 @@ impl Node { pub async fn build_payload(&self, mut chain: Chain) -> Chain { let fork_choice_state = chain.get_fork_choice_state(); - let payload_attributes = chain.get_next_payload_attributes(); + let mut payload_attributes = chain.get_next_payload_attributes(); + // Set index as fee recipient to differentiate between nodes + payload_attributes.suggested_fee_recipient = H160::from_low_u64_be(self.index as u64); let head = fork_choice_state.head_block_hash; let parent_beacon_block_root = payload_attributes.parent_beacon_block_root; @@ -384,13 +386,7 @@ impl Chain { // Generate dummy values by hashing multiple times let parent_beacon_block_root = keccak256(&head_hash.0); let prev_randao = keccak256(&parent_beacon_block_root.0); - // Address of 0x941e103320615d394a55708be13e45994c7d93b932b064dbcb2b511fe3254e2e, a rich account - let suggested_fee_recipient = H160( - hex::decode("4417092B70a3E5f10Dc504d0947DD256B965fc62") - .unwrap() - .try_into() - .unwrap(), - ); + let suggested_fee_recipient = Default::default(); // TODO: add withdrawals let withdrawals = vec![]; PayloadAttributesV3 { From e8d65cae0d33a54efb9e9452ade1cc5603fcc0d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 25 Sep 2025 15:30:07 -0300 Subject: [PATCH 16/37] chore: comment failing test again --- tooling/reorgs/src/main.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs index d79c3638dd..71bba40309 100644 --- a/tooling/reorgs/src/main.rs +++ b/tooling/reorgs/src/main.rs @@ -34,7 +34,7 @@ async fn main() { run_test(&cmd_path, test_one_block_reorg_and_back).await; // TODO: this test is failing - run_test(&cmd_path, test_many_blocks_reorg).await; + // run_test(&cmd_path, test_many_blocks_reorg).await; } async fn get_ethrex_version(cmd_path: &Path) -> String { @@ -146,7 +146,7 @@ async fn test_one_block_reorg_and_back(simulator: Arc>) { assert_eq!(new_balance, initial_balance); } -// #[expect(unused)] +#[expect(unused)] async fn test_many_blocks_reorg(simulator: Arc>) { let mut simulator = simulator.lock().await; let signer: Signer = LocalSigner::new( From a8e2e220d0a85eb0f5aee8fe682b5a46d454bdb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Wed, 24 Sep 2025 18:01:18 -0300 Subject: [PATCH 17/37] test(l1): add storage reorg test --- tooling/reorgs/src/main.rs | 96 +++++++++++++++++++++++++++++++++ tooling/reorgs/src/simulator.rs | 84 ++++++++++++++++++++++++++++- 2 files changed, 179 insertions(+), 1 deletion(-) diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs index 71bba40309..018c0a7795 100644 --- a/tooling/reorgs/src/main.rs +++ b/tooling/reorgs/src/main.rs @@ -5,6 +5,7 @@ use std::{ }; use ethrex::{cli::Options, initializers::init_tracing}; +use ethrex_common::U256; use ethrex_l2_rpc::signer::{LocalSigner, Signer}; use tokio::sync::Mutex; use tracing::{error, info, warn}; @@ -32,6 +33,7 @@ async fn main() { info!(""); run_test(&cmd_path, 
test_one_block_reorg_and_back).await; + run_test(&cmd_path, test_storage_slots_reorg).await; // TODO: this test is failing // run_test(&cmd_path, test_many_blocks_reorg).await; @@ -217,3 +219,97 @@ async fn test_many_blocks_reorg(simulator: Arc>) { let new_balance = node0.get_balance(recipient).await; assert_eq!(new_balance, initial_balance + transfer_amount); } + +async fn test_storage_slots_reorg(simulator: Arc>) { + let mut simulator = simulator.lock().await; + // Initcode for deploying a contract that receives two `bytes32` parameters and sets `storage[param0] = param1` + let contract_deploy_bytecode = hex::decode("656020355f35555f526006601af3").unwrap().into(); + let signer: Signer = LocalSigner::new( + "941e103320615d394a55708be13e45994c7d93b932b064dbcb2b511fe3254e2e" + .parse() + .unwrap(), + ) + .into(); + + let slot_key0 = U256::from(42); + let slot_value0 = U256::from(1163); + let slot_key1 = U256::from(25); + let slot_value1 = U256::from(7474); + + let node0 = simulator.start_node().await; + let node1 = simulator.start_node().await; + + // Create a chain with a few empty blocks + let mut base_chain = simulator.get_base_chain(); + + // Send a deploy tx for a contract which receives: `(bytes32 key, bytes32 value)` as parameters + let contract_address = node0 + .send_contract_deploy(&signer, contract_deploy_bytecode) + .await; + + for _ in 0..10 { + let extended_base_chain = node0.build_payload(base_chain).await; + node0.notify_new_payload(&extended_base_chain).await; + node0.update_forkchoice(&extended_base_chain).await; + + node1.notify_new_payload(&extended_base_chain).await; + node1.update_forkchoice(&extended_base_chain).await; + base_chain = extended_base_chain; + } + + // Sanity check: storage slots are initially empty + let initial_value = node0.get_storage_at(contract_address, slot_key0).await; + assert_eq!(initial_value, U256::zero()); + let initial_value = node0.get_storage_at(contract_address, slot_key1).await; + assert_eq!(initial_value, U256::zero()); + + // Fork the chain + let mut side_chain = base_chain.fork(); + + // Create a side chain with multiple blocks only known to node0 + for _ in 0..10 { + side_chain = node0.build_payload(side_chain).await; + node0.notify_new_payload(&side_chain).await; + node0.update_forkchoice(&side_chain).await; + } + + // Advance the base chain with multiple blocks only known to node1 + for _ in 0..10 { + base_chain = node1.build_payload(base_chain).await; + node1.notify_new_payload(&base_chain).await; + node1.update_forkchoice(&base_chain).await; + } + + // Set a storage slot in the contract in node0 + let calldata0 = [slot_key0.to_big_endian(), slot_value0.to_big_endian()] + .concat() + .into(); + node0.send_call(&signer, contract_address, calldata0).await; + + // Set another storage slot in the contract in node1 + let calldata1 = [slot_key1.to_big_endian(), slot_value1.to_big_endian()] + .concat() + .into(); + node1.send_call(&signer, contract_address, calldata1).await; + + // Build a block in the side chain + side_chain = node0.build_payload(side_chain).await; + node0.notify_new_payload(&side_chain).await; + node0.update_forkchoice(&side_chain).await; + + // Build a block in the base chain + base_chain = node1.build_payload(base_chain).await; + node1.notify_new_payload(&base_chain).await; + node1.update_forkchoice(&base_chain).await; + + // Assert the storage slots are as expected in both forks + let value_slot0 = node0.get_storage_at(contract_address, slot_key0).await; + assert_eq!(value_slot0, slot_value0); + let value_slot1 = 
node0.get_storage_at(contract_address, slot_key1).await; + assert_eq!(value_slot1, U256::zero()); + + let value_slot0 = node1.get_storage_at(contract_address, slot_key0).await; + assert_eq!(value_slot0, U256::zero()); + let value_slot1 = node1.get_storage_at(contract_address, slot_key1).await; + assert_eq!(value_slot1, slot_value1); +} diff --git a/tooling/reorgs/src/simulator.rs b/tooling/reorgs/src/simulator.rs index b70617f2a0..910d79fdd3 100644 --- a/tooling/reorgs/src/simulator.rs +++ b/tooling/reorgs/src/simulator.rs @@ -2,7 +2,8 @@ use std::{fs::File, io::Read, path::PathBuf, process::Stdio, time::Duration}; use ethrex::{cli::Options, initializers::get_network}; use ethrex_common::{ - Bytes, H160, H256, U256, + Address, Bytes, H160, H256, U256, + evm::calculate_create_address, types::{ Block, EIP1559Transaction, Genesis, Transaction, TxKind, requests::compute_requests_hash, }, @@ -333,12 +334,93 @@ impl Node { .unwrap(); } + pub async fn send_call(&self, signer: &Signer, contract: H160, data: Bytes) { + info!(node = self.index, sender=%signer.address(), %contract, "Sending contract call"); + let chain_id = self + .rpc_client + .get_chain_id() + .await + .unwrap() + .try_into() + .unwrap(); + let sender_address = signer.address(); + let nonce = self + .rpc_client + .get_nonce(sender_address, BlockIdentifier::Tag(BlockTag::Latest)) + .await + .unwrap(); + let tx = EIP1559Transaction { + chain_id, + nonce, + max_priority_fee_per_gas: 0, + max_fee_per_gas: 1_000_000_000, + gas_limit: 50_000, + to: TxKind::Call(contract), + data, + ..Default::default() + }; + let mut tx = Transaction::EIP1559Transaction(tx); + tx.sign_inplace(signer).await.unwrap(); + let encoded_tx = tx.encode_canonical_to_vec(); + self.rpc_client + .send_raw_transaction(&encoded_tx) + .await + .unwrap(); + } + + pub async fn send_contract_deploy( + &self, + signer: &Signer, + contract_deploy_bytecode: Bytes, + ) -> Address { + info!(node = self.index, sender=%signer.address(), "Deploying contract"); + let chain_id = self + .rpc_client + .get_chain_id() + .await + .unwrap() + .try_into() + .unwrap(); + let sender_address = signer.address(); + let nonce = self + .rpc_client + .get_nonce(sender_address, BlockIdentifier::Tag(BlockTag::Latest)) + .await + .unwrap(); + let tx = EIP1559Transaction { + chain_id, + nonce, + max_priority_fee_per_gas: 0, + max_fee_per_gas: 1_000_000_000, + gas_limit: 100_000, + to: TxKind::Create, + data: contract_deploy_bytecode, + ..Default::default() + }; + let mut tx = Transaction::EIP1559Transaction(tx); + tx.sign_inplace(signer).await.unwrap(); + let encoded_tx = tx.encode_canonical_to_vec(); + self.rpc_client + .send_raw_transaction(&encoded_tx) + .await + .unwrap(); + + calculate_create_address(sender_address, nonce) + } + pub async fn get_balance(&self, address: H160) -> U256 { self.rpc_client .get_balance(address, Default::default()) .await .unwrap() } + + pub async fn get_storage_at(&self, address: H160, key: U256) -> U256 { + self.rpc_client + .get_storage_at(address, key, Default::default()) + .await + .unwrap() + } } pub struct Chain { From 2a75471b15652961b86dcc0e15c5292027f1f2c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 25 Sep 2025 19:04:05 -0300 Subject: [PATCH 18/37] refactor: use one datadir and logs file per test --- tooling/reorgs/src/main.rs | 5 ++++- tooling/reorgs/src/simulator.rs | 10 +++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/tooling/reorgs/src/main.rs 
b/tooling/reorgs/src/main.rs index 018c0a7795..9be89103d3 100644 --- a/tooling/reorgs/src/main.rs +++ b/tooling/reorgs/src/main.rs @@ -56,7 +56,10 @@ where let start = std::time::Instant::now(); info!(test=%test_name, "Running test"); - let simulator = Arc::new(Mutex::new(Simulator::new(cmd_path.to_path_buf()))); + let simulator = Arc::new(Mutex::new(Simulator::new( + cmd_path.to_path_buf(), + test_name.to_string(), + ))); // Run in another task to clean up properly on panic let result = tokio::spawn(test_fn(simulator.clone())).await; diff --git a/tooling/reorgs/src/simulator.rs b/tooling/reorgs/src/simulator.rs index 910d79fdd3..c36019813f 100644 --- a/tooling/reorgs/src/simulator.rs +++ b/tooling/reorgs/src/simulator.rs @@ -27,6 +27,8 @@ use tracing::{error, info}; pub struct Simulator { cmd_path: PathBuf, + test_name: String, + base_opts: Options, jwt_secret: Bytes, genesis_path: PathBuf, @@ -36,7 +38,7 @@ pub struct Simulator { } impl Simulator { - pub fn new(cmd_path: PathBuf) -> Self { + pub fn new(cmd_path: PathBuf, test_name: String) -> Self { let mut opts = Options::default_l1(); let jwt_secret = generate_jwt_secret(); std::fs::write("jwt.hex", hex::encode(&jwt_secret)).unwrap(); @@ -53,6 +55,7 @@ impl Simulator { opts.network = Some(Network::GenesisPath(genesis_path.clone())); Self { cmd_path, + test_name, base_opts: opts, genesis_path, jwt_secret, @@ -70,18 +73,19 @@ impl Simulator { pub async fn start_node(&mut self) -> Node { let n = self.configs.len(); + let test_name = &self.test_name; info!(node = n, "Starting node"); let mut opts = self.base_opts.clone(); opts.http_port = (8545 + n * 2).to_string(); opts.authrpc_port = (8545 + n * 2 + 1).to_string(); opts.p2p_port = (30303 + n).to_string(); opts.discovery_port = (30303 + n).to_string(); - opts.datadir = format!("data/node{n}").into(); + opts.datadir = format!("data/{test_name}/node{n}").into(); let _ = std::fs::remove_dir_all(&opts.datadir); std::fs::create_dir_all(&opts.datadir).expect("Failed to create data directory"); - let logs_file_path = format!("data/node{n}.log"); + let logs_file_path = format!("data/{test_name}/node{n}.log"); let logs_file = File::create(&logs_file_path).expect("Failed to create logs file"); let cancel = CancellationToken::new(); From 51e4b97d640aa0c886a87b0c105020e8a18513ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 26 Sep 2025 17:35:12 -0300 Subject: [PATCH 19/37] test: add additional check to test --- tooling/reorgs/src/main.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs index 9be89103d3..62861c405a 100644 --- a/tooling/reorgs/src/main.rs +++ b/tooling/reorgs/src/main.rs @@ -315,4 +315,11 @@ async fn test_storage_slots_reorg(simulator: Arc>) { assert_eq!(value_slot0, U256::zero()); let value_slot1 = node1.get_storage_at(contract_address, slot_key1).await; assert_eq!(value_slot1, slot_value1); + + node0.update_forkchoice(&base_chain).await; + + let value_slot0 = node0.get_storage_at(contract_address, slot_key0).await; + assert_eq!(value_slot0, U256::zero()); + let value_slot1 = node0.get_storage_at(contract_address, slot_key1).await; + assert_eq!(value_slot1, slot_value1); } From 36d57ee6b78d6983700f981ecf2219b6388e491a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 26 Sep 2025 17:46:06 -0300 Subject: [PATCH 20/37] docs: add comment --- tooling/reorgs/src/main.rs | 
2 ++ 1 file changed, 2 insertions(+) diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs index 62861c405a..f3a00d6dfe 100644 --- a/tooling/reorgs/src/main.rs +++ b/tooling/reorgs/src/main.rs @@ -316,8 +316,10 @@ async fn test_storage_slots_reorg(simulator: Arc>) { let value_slot1 = node1.get_storage_at(contract_address, slot_key1).await; assert_eq!(value_slot1, slot_value1); + // Reorg the node0 to the base chain node0.update_forkchoice(&base_chain).await; + // Check the storage slots are as expected after the reorg let value_slot0 = node0.get_storage_at(contract_address, slot_key0).await; assert_eq!(value_slot0, U256::zero()); let value_slot1 = node0.get_storage_at(contract_address, slot_key1).await; From fbaa687966aa83e31d87d8dd663bde08aaab58fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 26 Sep 2025 18:13:39 -0300 Subject: [PATCH 21/37] feat: use unique ports across each run --- tooling/reorgs/src/main.rs | 1 + tooling/reorgs/src/simulator.rs | 22 +++++++++++++++++----- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs index f3a00d6dfe..5b417dedc5 100644 --- a/tooling/reorgs/src/main.rs +++ b/tooling/reorgs/src/main.rs @@ -317,6 +317,7 @@ async fn test_storage_slots_reorg(simulator: Arc>) { assert_eq!(value_slot1, slot_value1); // Reorg the node0 to the base chain + node0.notify_new_payload(&base_chain).await; node0.update_forkchoice(&base_chain).await; // Check the storage slots are as expected after the reorg diff --git a/tooling/reorgs/src/simulator.rs b/tooling/reorgs/src/simulator.rs index c36019813f..227db3e2c7 100644 --- a/tooling/reorgs/src/simulator.rs +++ b/tooling/reorgs/src/simulator.rs @@ -1,4 +1,6 @@ -use std::{fs::File, io::Read, path::PathBuf, process::Stdio, time::Duration}; +use std::{ + fs::File, io::Read, path::PathBuf, process::Stdio, sync::atomic::AtomicU16, time::Duration, +}; use ethrex::{cli::Options, initializers::get_network}; use ethrex_common::{ @@ -76,12 +78,16 @@ impl Simulator { let test_name = &self.test_name; info!(node = n, "Starting node"); let mut opts = self.base_opts.clone(); - opts.http_port = (8545 + n * 2).to_string(); - opts.authrpc_port = (8545 + n * 2 + 1).to_string(); - opts.p2p_port = (30303 + n).to_string(); - opts.discovery_port = (30303 + n).to_string(); opts.datadir = format!("data/{test_name}/node{n}").into(); + opts.http_port = get_next_port().to_string(); + opts.authrpc_port = get_next_port().to_string(); + + // These are one TCP and one UDP + let p2p_port = get_next_port(); + opts.p2p_port = p2p_port.to_string(); + opts.discovery_port = p2p_port.to_string(); + let _ = std::fs::remove_dir_all(&opts.datadir); std::fs::create_dir_all(&opts.datadir).expect("Failed to create data directory"); @@ -427,6 +433,7 @@ impl Node { } } +#[derive(Debug)] pub struct Chain { block_hashes: Vec, blocks: Vec, @@ -517,3 +524,8 @@ async fn wait_until_synced(engine_client: &EngineClient, fork_choice_state: Fork tokio::time::sleep(Duration::from_millis(100)).await; } } + +fn get_next_port() -> u16 { + static NEXT_PORT: AtomicU16 = AtomicU16::new(8560); + NEXT_PORT.fetch_add(1, std::sync::atomic::Ordering::Relaxed) +} From 1b90a6d6dc54151cb4f76f294a9f49b906a318d2 Mon Sep 17 00:00:00 2001 From: Camila Di Ielsi Date: Fri, 26 Sep 2025 18:56:11 -0300 Subject: [PATCH 22/37] include requesting headers from newToOld --- crates/networking/p2p/sync.rs | 40 ++++++++++++++++++++++++++++++----- 1 file 
changed, 35 insertions(+), 5 deletions(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 27e7e01229..ff162ec9c3 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -348,14 +348,44 @@ impl Syncer { debug!("Requesting Block Headers from {current_head}"); - let Some(mut block_headers) = self + let mut block_headers = match self .peers .request_block_headers_from_hash(current_head, BlockRequestOrder::OldToNew) .await - else { - warn!("Sync failed to find target block header, aborting"); - debug!("Sync Log 8: Sync failed to find target block header, aborting"); - return Ok(()); + { + Some(block_headers) => block_headers, + None => { + let mut all_block_headers = vec![]; + let mut found_common_ancestor = false; + loop { + let Some(mut block_headers) = self + .peers + .request_block_headers_from_hash(sync_head, BlockRequestOrder::NewToOld) + .await + else { + warn!("Sync failed to find target block header, aborting"); + debug!("Sync Log 8: Sync failed to find target block header, aborting"); + return Ok(()); + }; + for i in 0..block_headers.len() { + if store + .get_block_by_hash(block_headers[i].hash()) + .await? + .is_some() + { + block_headers.drain(..i); + found_common_ancestor = true; + break; + } + } + all_block_headers.extend(block_headers); + if found_common_ancestor { + break; + } + } + all_block_headers.reverse(); + all_block_headers + } }; debug!("Sync Log 9: Received {} block headers", block_headers.len()); From 357fd680e365e91b1ff340422e995b159ebffa61 Mon Sep 17 00:00:00 2001 From: Camila Di Ielsi Date: Fri, 26 Sep 2025 19:02:06 -0300 Subject: [PATCH 23/37] update ci test pattern --- .github/workflows/pr-main_l1.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pr-main_l1.yaml b/.github/workflows/pr-main_l1.yaml index bce166f543..dbbf98001f 100644 --- a/.github/workflows/pr-main_l1.yaml +++ b/.github/workflows/pr-main_l1.yaml @@ -191,7 +191,7 @@ jobs: ethrex_flags: "" - name: "Engine withdrawal tests" simulation: ethereum/engine - test_pattern: "engine-withdrawals/Corrupted Block Hash Payload|Empty Withdrawals|engine-withdrawals test loader|GetPayloadBodies|GetPayloadV2 Block Value|Max Initcode Size|Sync after 2 blocks - Withdrawals on Genesis|Withdraw many accounts|Withdraw to a single account|Withdraw to two accounts|Withdraw zero amount|Withdraw many accounts|Withdrawals Fork on Block 1 - 1 Block Re-Org|Withdrawals Fork on Block 1 - 8 Block Re-Org NewPayload|Withdrawals Fork on Block 2|Withdrawals Fork on Block 3|Withdrawals Fork on Block 8 - 10 Block Re-Org NewPayload|Withdrawals Fork on Canonical Block 8 / Side Block 7 - 10 Block Re-Org [^S]|Withdrawals Fork on Canonical Block 8 / Side Block 9 - 10 Block Re-Org [^S]" + test_pattern: "engine-withdrawals/engine-withdrawals/Corrupted Block Hash Payload|Empty Withdrawals|engine-withdrawals test loader|GetPayloadBodies|GetPayloadV2 Block Value|Max Initcode Size|Sync after 2 blocks - Withdrawals on Genesis|Withdraw many accounts|Withdraw to a single account|Withdraw to two accounts|Withdraw zero amount|Withdraw many accounts|Withdrawals Fork on Block 1 - 1 Block Re-Org|Withdrawals Fork on Block 1 - 8 Block Re-Org |Withdrawals Fork on Block 2|Withdrawals Fork on Block 3|Withdrawals Fork on Block 8 - 10 Block Re-Org |Withdrawals Fork on Canonical Block 8 / Side Block 7 - 10 Block Re-Org |Withdrawals Fork on Canonical Block 8 / Side Block 9 - 10 Block Re-Org" - name: "Sync full" simulation: ethereum/sync test_pattern: "" From 
4b8d362e86a4825787f5278a97635500efc1f590 Mon Sep 17 00:00:00 2001 From: Camila Di Ielsi Date: Fri, 26 Sep 2025 19:07:25 -0300 Subject: [PATCH 24/37] fix pattern on ci --- .github/workflows/pr-main_l1.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pr-main_l1.yaml b/.github/workflows/pr-main_l1.yaml index dbbf98001f..e8c1da3d24 100644 --- a/.github/workflows/pr-main_l1.yaml +++ b/.github/workflows/pr-main_l1.yaml @@ -191,7 +191,7 @@ jobs: ethrex_flags: "" - name: "Engine withdrawal tests" simulation: ethereum/engine - test_pattern: "engine-withdrawals/engine-withdrawals/Corrupted Block Hash Payload|Empty Withdrawals|engine-withdrawals test loader|GetPayloadBodies|GetPayloadV2 Block Value|Max Initcode Size|Sync after 2 blocks - Withdrawals on Genesis|Withdraw many accounts|Withdraw to a single account|Withdraw to two accounts|Withdraw zero amount|Withdraw many accounts|Withdrawals Fork on Block 1 - 1 Block Re-Org|Withdrawals Fork on Block 1 - 8 Block Re-Org |Withdrawals Fork on Block 2|Withdrawals Fork on Block 3|Withdrawals Fork on Block 8 - 10 Block Re-Org |Withdrawals Fork on Canonical Block 8 / Side Block 7 - 10 Block Re-Org |Withdrawals Fork on Canonical Block 8 / Side Block 9 - 10 Block Re-Org" + test_pattern: "engine-withdrawals/Corrupted Block Hash Payload|Empty Withdrawals|engine-withdrawals test loader|GetPayloadBodies|GetPayloadV2 Block Value|Max Initcode Size|Sync after 2 blocks - Withdrawals on Genesis|Withdraw many accounts|Withdraw to a single account|Withdraw to two accounts|Withdraw zero amount|Withdraw many accounts|Withdrawals Fork on Block 1 - 1 Block Re-Org|Withdrawals Fork on Block 1 - 8 Block Re-Org |Withdrawals Fork on Block 2|Withdrawals Fork on Block 3|Withdrawals Fork on Block 8 - 10 Block Re-Org |Withdrawals Fork on Canonical Block 8 / Side Block 7 - 10 Block Re-Org |Withdrawals Fork on Canonical Block 8 / Side Block 9 - 10 Block Re-Org" - name: "Sync full" simulation: ethereum/sync test_pattern: "" From fd976b822392a68ee6aef399a5a6a77a84b7a475 Mon Sep 17 00:00:00 2001 From: Camila Di Ielsi Date: Tue, 30 Sep 2025 14:48:11 -0300 Subject: [PATCH 25/37] refactor previous solution --- crates/networking/p2p/sync.rs | 245 +++++++++++++++++++--------------- 1 file changed, 139 insertions(+), 106 deletions(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 144dd29c93..588d0b930e 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -336,126 +336,159 @@ impl Syncer { current_head, sync_head ); - loop { - debug!("Sync Log 1: In Full Sync"); - debug!( - "Sync Log 3: State current headers len {}", - block_sync_state.current_headers.len() - ); - debug!( - "Sync Log 4: State current blocks len {}", - block_sync_state.current_blocks.len() - ); + info!("Sync Log 1: In Full Sync"); + debug!( + "Sync Log 3: State current headers len {}", + block_sync_state.current_headers.len() + ); + debug!( + "Sync Log 4: State current blocks len {}", + block_sync_state.current_blocks.len() + ); - debug!("Requesting Block Headers from {current_head}"); + debug!("Requesting Block Headers from NewToOld from sync_head {sync_head}"); - let mut block_headers = match self - .peers - .request_block_headers_from_hash(current_head, BlockRequestOrder::OldToNew) - .await - { - Some(block_headers) => block_headers, - None => { - let mut all_block_headers = vec![]; - let mut found_common_ancestor = false; - loop { - let Some(mut block_headers) = self - .peers - .request_block_headers_from_hash(sync_head, 
BlockRequestOrder::NewToOld) - .await - else { - warn!("Sync failed to find target block header, aborting"); - debug!("Sync Log 8: Sync failed to find target block header, aborting"); - return Ok(()); - }; - for i in 0..block_headers.len() { - if store - .get_block_by_hash(block_headers[i].hash()) - .await? - .is_some() - { - block_headers.drain(..i); - found_common_ancestor = true; - break; - } - } - all_block_headers.extend(block_headers); - if found_common_ancestor { - break; - } - } - all_block_headers.reverse(); - all_block_headers - } }; + let requested_header = + if let Some(sync_head_block) = store.get_pending_block(sync_head).await? { + sync_head_block.header.parent_hash + } else { + sync_head }; + let Some(mut block_headers) = self + .peers + .request_block_headers_from_hash(requested_header, BlockRequestOrder::NewToOld) + .await + else { + // sync_head or sync_head parent was not found + warn!("Sync failed to find target block header, aborting"); + debug!("Sync Log 8: Sync failed to find target block header, aborting"); + return Ok(()); + }; - debug!("Sync Log 9: Received {} block headers", block_headers.len()); - let (first_block_hash, first_block_number, first_block_parent_hash) = - match block_headers.first() { - Some(header) => (header.hash(), header.number, header.parent_hash), - None => continue, - }; - let (last_block_hash, last_block_number) = match block_headers.last() { - Some(header) => (header.hash(), header.number), - None => continue, - }; - // TODO(#2126): This is just a temporary solution to avoid a bug where the sync would get stuck - // on a loop when the target head is not found, i.e. on a reorg with a side-chain. - if first_block_hash == last_block_hash - && first_block_hash == current_head - && current_head != sync_head + debug!("Sync Log 9: Received {} block headers", block_headers.len()); + + let mut found_common_ancestor = false; + for i in 0..block_headers.len() { + if store + .get_block_by_hash(block_headers[i].hash()) + .await? + .is_some() { - // There is no path to the sync head; this goes back until it finds a common ancestor - warn!("Sync failed to find target block header, going back to the previous parent"); - current_head = first_block_parent_hash; - continue; + block_headers.drain(i..); + found_common_ancestor = true; + break; } + } + if found_common_ancestor { + block_headers.reverse(); + block_sync_state + .process_incoming_headers( + block_headers, + sync_head, + true, // sync_head_found is true because of the NewToOld headers request + self.blockchain.clone(), + self.peers.clone(), + self.cancel_token.clone(), + ) + .await?; + return Ok(()); + } else { + // if found_common_ancestor is false that means we are more than 1024 blocks behind so, for now, we go back to syncing as follows.
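As a quick illustration of the hunk above: the backward-sync idea reduces to one scan over a NewToOld batch. Below is a minimal, self-contained sketch of that scan, using plain `u64` values as stand-ins for block hashes and a `HashSet` in place of ethrex's store; none of these names are the project's real API.

```rust
use std::collections::HashSet;

/// Toy stand-ins: block "hashes" are u64s and `store` is the set of hashes we
/// already have locally. Headers arrive newest-first (NewToOld).
fn find_connecting_segment(mut headers: Vec<u64>, store: &HashSet<u64>) -> Option<Vec<u64>> {
    // Scan newest-to-oldest for the first header the store already knows.
    let known_at = headers.iter().position(|h| store.contains(h))?;
    // Everything from the known header onwards is already stored; drop it.
    headers.drain(known_at..);
    // Reverse so the remaining segment is oldest-first, ready for execution.
    headers.reverse();
    Some(headers)
}

fn main() {
    let store: HashSet<u64> = [1, 2, 3].into_iter().collect();
    // A peer returned headers 7,6,5,4,3 (new to old); 3 is the common ancestor.
    let segment = find_connecting_segment(vec![7, 6, 5, 4, 3], &store);
    assert_eq!(segment, Some(vec![4, 5, 6, 7]));
}
```

The drain-then-reverse step mirrors what the hunk feeds into `process_incoming_headers`: an oldest-first batch that connects to the local chain.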
+ // TODO: Have full syncing always be from NewToOld + loop { + info!("Sync Log 1: In Full Sync"); + debug!( + "Sync Log 3: State current headers len {}", + block_sync_state.current_headers.len() + ); + debug!( + "Sync Log 4: State current blocks len {}", + block_sync_state.current_blocks.len() + ); - debug!( - "Received {} block headers| First Number: {} Last Number: {}", - block_headers.len(), - first_block_number, - last_block_number - ); + debug!("Requesting Block Headers from OldToNew from current_head {current_head}"); - // Filter out everything after the sync_head - let mut sync_head_found = false; - if let Some(index) = block_headers - .iter() - .position(|header| header.hash() == sync_head) - { - sync_head_found = true; - block_headers.drain(index + 1..); - } + let Some(mut block_headers) = self + .peers + .request_block_headers_from_hash(current_head, BlockRequestOrder::OldToNew) + .await + else { + warn!("Sync failed to find target block header, aborting"); + debug!("Sync Log 8: Sync failed to find target block header, aborting"); + return Ok(()); + }; - // Update current fetch head - current_head = last_block_hash; + debug!("Sync Log 9: Received {} block headers", block_headers.len()); - // Discard the first header as we already have it - block_headers.remove(0); - if !block_headers.is_empty() { - let mut finished = false; - while !finished { - (finished, sync_head_found) = block_sync_state - .process_incoming_headers( - block_headers.clone(), - sync_head, - sync_head_found, - self.blockchain.clone(), - self.peers.clone(), - self.cancel_token.clone(), - ) - .await?; - block_headers.clear(); + let (first_block_hash, first_block_number, first_block_parent_hash) = + match block_headers.first() { + Some(header) => (header.hash(), header.number, header.parent_hash), + None => continue, + }; + let (last_block_hash, last_block_number) = match block_headers.last() { + Some(header) => (header.hash(), header.number), + None => continue, + }; + // TODO(#2126): This is just a temporary solution to avoid a bug where the sync would get stuck + // on a loop when the target head is not found, i.e. on a reorg with a side-chain. 
+ if first_block_hash == last_block_hash + && first_block_hash == current_head + && current_head != sync_head + { + // There is no path to the sync head; this goes back until it finds a common ancestor + warn!( + "Sync failed to find target block header, going back to the previous parent" + ); + current_head = first_block_parent_hash; + continue; + } + + debug!( + "Received {} block headers| First Number: {} Last Number: {}", + block_headers.len(), + first_block_number, + last_block_number + ); + + // Filter out everything after the sync_head + let mut sync_head_found = false; + if let Some(index) = block_headers + .iter() + .position(|header| header.hash() == sync_head) + { + sync_head_found = true; + block_headers.drain(index + 1..); + } + + // Update current fetch head + current_head = last_block_hash; + + // Discard the first header as we already have it + block_headers.remove(0); + if !block_headers.is_empty() { + let mut finished = false; + while !finished { + (finished, sync_head_found) = block_sync_state + .process_incoming_headers( + block_headers.clone(), + sync_head, + sync_head_found, + self.blockchain.clone(), + self.peers.clone(), + self.cancel_token.clone(), + ) + .await?; + block_headers.clear(); + } + } + + if sync_head_found { + break; + }; + } + Ok(()) + } - Ok(()) } /// Executes the given blocks and stores them From cb010f9879ced6aca0a3260cd34508d2379d9806 Mon Sep 17 00:00:00 2001 From: Camila Di Ielsi Date: Tue, 30 Sep 2025 15:03:28 -0300 Subject: [PATCH 26/37] update current_head and run clippy --- crates/networking/p2p/sync.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 588d0b930e..b1530bde6e 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -380,6 +380,10 @@ impl Syncer { break; } } + + // Update current fetch head + current_head = sync_head; + if found_common_ancestor { block_headers.reverse(); block_sync_state @@ -392,7 +396,7 @@ impl Syncer { self.cancel_token.clone(), ) .await?; - return Ok(()); + Ok(()) } else { // if found_common_ancestor is false that means we are more than 1024 blocks behind so, for now, we go back to syncing as follows.
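The sync-head filter in the loop above can be isolated the same way. A small sketch, again with toy `u64` values in place of real header hashes, purely illustrative:

```rust
/// Sketch of the sync-head filter: keep headers only up to and including the
/// sync head, and report whether it was found in this batch.
fn truncate_at_sync_head(headers: &mut Vec<u64>, sync_head: u64) -> bool {
    if let Some(index) = headers.iter().position(|h| *h == sync_head) {
        headers.drain(index + 1..);
        return true; // sync head found in this batch
    }
    false
}

fn main() {
    let mut headers = vec![4, 5, 6, 7, 8];
    assert!(truncate_at_sync_head(&mut headers, 6));
    assert_eq!(headers, vec![4, 5, 6]);
}
```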
// TODO: Have full syncing always be from NewToOld From 80be0b24c677fdd0f3855a7eaf9f79c36b51cf60 Mon Sep 17 00:00:00 2001 From: Camila Di Ielsi Date: Tue, 30 Sep 2025 16:19:07 -0300 Subject: [PATCH 27/37] drop current_head update since it's not needed --- crates/networking/p2p/sync.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index b1530bde6e..c18c7f034f 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -381,9 +381,6 @@ impl Syncer { } } - - // Update current fetch head - current_head = sync_head; - if found_common_ancestor { block_headers.reverse(); block_sync_state From cdf5644cc5d81cf64a2a34d90dd02be6f7547e9c Mon Sep 17 00:00:00 2001 From: Camila Di Ielsi Date: Wed, 1 Oct 2025 14:29:24 -0300 Subject: [PATCH 28/37] have newToOld syncing be a separate function --- .github/workflows/pr-main_l1.yaml | 2 +- crates/networking/p2p/sync.rs | 237 +++++++++++++++++------------- 2 files changed, 134 insertions(+), 105 deletions(-) diff --git a/.github/workflows/pr-main_l1.yaml b/.github/workflows/pr-main_l1.yaml index e8c1da3d24..52527e0978 100644 --- a/.github/workflows/pr-main_l1.yaml +++ b/.github/workflows/pr-main_l1.yaml @@ -191,7 +191,7 @@ jobs: ethrex_flags: "" - name: "Engine withdrawal tests" simulation: ethereum/engine - test_pattern: "engine-withdrawals/Corrupted Block Hash Payload|Empty Withdrawals|engine-withdrawals test loader|GetPayloadBodies|GetPayloadV2 Block Value|Max Initcode Size|Sync after 2 blocks - Withdrawals on Genesis|Withdraw many accounts|Withdraw to a single account|Withdraw to two accounts|Withdraw zero amount|Withdraw many accounts|Withdrawals Fork on Block 1 - 1 Block Re-Org|Withdrawals Fork on Block 1 - 8 Block Re-Org |Withdrawals Fork on Block 2|Withdrawals Fork on Block 3|Withdrawals Fork on Block 8 - 10 Block Re-Org |Withdrawals Fork on Canonical Block 8 / Side Block 7 - 10 Block Re-Org |Withdrawals Fork on Canonical Block 8 / Side Block 9 - 10 Block Re-Org" + test_pattern: "engine-withdrawals" - name: "Sync full" simulation: ethereum/sync test_pattern: "" diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index c18c7f034f..c57b925f10 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -336,7 +336,131 @@ impl Syncer { current_head, sync_head ); + match self + .synced_new_to_old(&mut block_sync_state, sync_head, store) + .await? + { + // synced_new_to_old returns (sync_finished, sync_head_found) + (_, false) | (true, true) => Ok(()), + (false, true) => { + // if sync_finished is false that means we are more than 1024 blocks behind so, for now, we go back to syncing as follows.
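The match that patch 28 introduces encodes a two-phase strategy: try a single backward (NewToOld) batch first, and only fall back to the old forward loop when that batch did not connect. A hedged sketch of just this control flow, with stub closures standing in for the real sync passes; only the (sync_finished, sync_head_found) contract is taken from the diff:

```rust
/// Stub closures stand in for the real passes; only the
/// (sync_finished, sync_head_found) result contract comes from the patch.
fn sync(backward_pass: impl Fn() -> (bool, bool), forward_pass: impl Fn()) {
    match backward_pass() {
        // Sync head not found (abort), or backward sync finished the job.
        (_, false) | (true, true) => {}
        // Head exists but the single NewToOld batch did not connect to our
        // chain: we are more than one batch behind, so sync forward instead.
        (false, true) => forward_pass(),
    }
}

fn main() {
    sync(|| (false, true), || println!("falling back to the OldToNew loop"));
}
```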
+ // TODO: Have full syncing always be from NewToOld, issue: https://github.com/lambdaclass/ethrex/issues/4717 + loop { + debug!("Sync Log 1: In Full Sync"); + debug!( + "Sync Log 3: State current headers len {}", + block_sync_state.current_headers.len() + ); + debug!( + "Sync Log 4: State current blocks len {}", + block_sync_state.current_blocks.len() + ); + + debug!( + "Requesting Block Headers from OldToNew from current_head {current_head}" + ); + + let Some(mut block_headers) = self + .peers + .request_block_headers_from_hash(current_head, BlockRequestOrder::OldToNew) + .await + else { + warn!("Sync failed to find target block header, aborting"); + debug!("Sync Log 8: Sync failed to find target block header, aborting"); + return Ok(()); + }; + + debug!("Sync Log 9: Received {} block headers", block_headers.len()); + + let (first_block_hash, first_block_number, first_block_parent_hash) = + match block_headers.first() { + Some(header) => (header.hash(), header.number, header.parent_hash), + None => continue, + }; + let (last_block_hash, last_block_number) = match block_headers.last() { + Some(header) => (header.hash(), header.number), + None => continue, + }; + // TODO(#2126): This is just a temporary solution to avoid a bug where the sync would get stuck + // on a loop when the target head is not found, i.e. on a reorg with a side-chain. + if first_block_hash == last_block_hash + && first_block_hash == current_head + && current_head != sync_head + { + // There is no path to the sync head; this goes back until it finds a common ancestor + warn!( + "Sync failed to find target block header, going back to the previous parent" + ); + current_head = first_block_parent_hash; + continue; + } + + debug!( + "Received {} block headers| First Number: {} Last Number: {}", + block_headers.len(), + first_block_number, + last_block_number + ); + + // Filter out everything after the sync_head + let mut sync_head_found = false; + if let Some(index) = block_headers + .iter() + .position(|header| header.hash() == sync_head) + { + sync_head_found = true; + block_headers.drain(index + 1..); + } + + // Update current fetch head + current_head = last_block_hash; + + // Discard the first header as we already have it + block_headers.remove(0); + if !block_headers.is_empty() { + let mut finished = false; + while !finished { + (finished, sync_head_found) = block_sync_state + .process_incoming_headers( + block_headers.clone(), + sync_head, + sync_head_found, + self.blockchain.clone(), + self.peers.clone(), + self.cancel_token.clone(), + ) + .await?; + block_headers.clear(); + } + } + + if sync_head_found { + break; + }; + } + Ok(()) + } + } + + /// Tries to perform syncing going backwards from the sync_head with one batch of requested headers. + /// This is to cover the case where we are on a sidechain and the peer doesn't have our current_head + /// so when requesting headers from our current_head on we get None and we never get to finish syncing. + /// For more context go to the PR https://github.com/lambdaclass/ethrex/pull/4676 + /// + /// # Returns + /// + /// Returns an error if the sync fails at any given step and aborts all active processes; + /// otherwise returns two bools (sync_finished, sync_head_found) + /// sync_finished could be false if we can't find a common ancestor within the requested headers, which means + /// the chain is more than 1024 blocks behind.
+ async fn synced_new_to_old( + &mut self, + block_sync_state: &mut FullBlockSyncState, + sync_head: H256, + store: Store, + ) -> Result<(bool, bool), SyncError> { + debug!("Sync Log 1: In Full Sync"); debug!( "Sync Log 3: State current headers len {}", block_sync_state.current_headers.len() @@ -348,12 +472,11 @@ impl Syncer { debug!("Requesting Block Headers from NewToOld from sync_head {sync_head}"); - let requested_header = - if let Some(sync_head_block) = store.get_pending_block(sync_head).await? { - sync_head_block.header.parent_hash - } else { - sync_head - }; + // Get oldest pending block to use in the request for headers + let mut requested_header = sync_head; + while let Some(block) = store.get_pending_block(requested_header).await? { + requested_header = block.header.parent_hash; + } let Some(mut block_headers) = self .peers @@ -363,7 +486,7 @@ impl Syncer { // sync_head or sync_head parent was not found warn!("Sync failed to find target block header, aborting"); debug!("Sync Log 8: Sync failed to find target block header, aborting"); - return Ok(()); + return Ok((false, false)); }; debug!("Sync Log 9: Received {} block headers", block_headers.len()); @@ -393,103 +516,9 @@ impl Syncer { self.cancel_token.clone(), ) .await?; - Ok(()) - } else { - // if found_common_ancestor is false that means we are more than 1024 blocks behind so, for now, we go back to syncing as it follows. - // TODO: Have full syncing always be from NewToOld - loop { - info!("Sync Log 1: In Full Sync"); - debug!( - "Sync Log 3: State current headers len {}", - block_sync_state.current_headers.len() - ); - debug!( - "Sync Log 4: State current blocks len {}", - block_sync_state.current_blocks.len() - ); - - debug!("Requesting Block Headers from OldToNew from current_head {current_head}"); - - let Some(mut block_headers) = self - .peers - .request_block_headers_from_hash(current_head, BlockRequestOrder::OldToNew) - .await - else { - warn!("Sync failed to find target block header, aborting"); - debug!("Sync Log 8: Sync failed to find target block header, aborting"); - return Ok(()); - }; - - debug!("Sync Log 9: Received {} block headers", block_headers.len()); - - let (first_block_hash, first_block_number, first_block_parent_hash) = - match block_headers.first() { - Some(header) => (header.hash(), header.number, header.parent_hash), - None => continue, - }; - let (last_block_hash, last_block_number) = match block_headers.last() { - Some(header) => (header.hash(), header.number), - None => continue, - }; - // TODO(#2126): This is just a temporary solution to avoid a bug where the sync would get stuck - // on a loop when the target head is not found, i.e. on a reorg with a side-chain. 
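The `requested_header` loop above walks pending blocks back to the oldest one already queued before asking peers for headers. A toy version of that walk, assuming pending blocks form a simple hash-to-parent map rather than ethrex's `Store` API:

```rust
use std::collections::HashMap;

/// Toy version of "get the oldest pending block to use in the header request":
/// follow parent hashes through the pending set until a block is missing and
/// return that missing parent. u64 hashes, HashMap instead of the store.
fn oldest_pending_ancestor(sync_head: u64, pending: &HashMap<u64, u64>) -> u64 {
    let mut requested = sync_head;
    while let Some(&parent) = pending.get(&requested) {
        requested = parent;
    }
    requested
}

fn main() {
    // Blocks 5 <- 6 <- 7 are pending (7 is the sync head); 5's parent 4 is not.
    let pending = HashMap::from([(7u64, 6u64), (6, 5), (5, 4)]);
    assert_eq!(oldest_pending_ancestor(7, &pending), 4);
}
```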
- if first_block_hash == last_block_hash - && first_block_hash == current_head - && current_head != sync_head - { - // There is no path to the sync head this goes back until it find a common ancerstor - warn!( - "Sync failed to find target block header, going back to the previous parent" - ); - current_head = first_block_parent_hash; - continue; - } - - debug!( - "Received {} block headers| First Number: {} Last Number: {}", - block_headers.len(), - first_block_number, - last_block_number - ); - - // Filter out everything after the sync_head - let mut sync_head_found = false; - if let Some(index) = block_headers - .iter() - .position(|header| header.hash() == sync_head) - { - sync_head_found = true; - block_headers.drain(index + 1..); - } - - // Update current fetch head - current_head = last_block_hash; - - // Discard the first header as we already have it - block_headers.remove(0); - if !block_headers.is_empty() { - let mut finished = false; - while !finished { - (finished, sync_head_found) = block_sync_state - .process_incoming_headers( - block_headers.clone(), - sync_head, - sync_head_found, - self.blockchain.clone(), - self.peers.clone(), - self.cancel_token.clone(), - ) - .await?; - block_headers.clear(); - } - } - - if sync_head_found { - break; - }; - } - Ok(()) + return Ok((true, true)); } + Ok((false, true)) } /// Executes the given blocks and stores them From 40a2d04c18ebdcfa6820d0a950b5eae90f4b79cd Mon Sep 17 00:00:00 2001 From: Camila Di Ielsi Date: Wed, 1 Oct 2025 16:54:49 -0300 Subject: [PATCH 29/37] fix issue with including pending blocks for syncing --- crates/networking/p2p/sync.rs | 214 +++++++++++++++++----------------- 1 file changed, 110 insertions(+), 104 deletions(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index c57b925f10..9a28e0ef62 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -336,111 +336,106 @@ impl Syncer { current_head, sync_head ); - match self + if !self .synced_new_to_old(&mut block_sync_state, sync_head, store) .await? { - // synced_new_to_old returns (sync_finished, sync_head_found) - (_, false) | (true, true) => Ok(()), - (false, true) => { - // if sync_finished is false that means we are more than 1024 blocks behind so, for now, we go back to syncing as it follows. - // TODO: Have full syncing always be from NewToOld, issue: https://github.com/lambdaclass/ethrex/issues/4717 - loop { - debug!("Sync Log 1: In Full Sync"); - debug!( - "Sync Log 3: State current headers len {}", - block_sync_state.current_headers.len() - ); - debug!( - "Sync Log 4: State current blocks len {}", - block_sync_state.current_blocks.len() - ); + // synced_new_to_old returns true in case syncing was finished from NewToOld or if the requested headers were None + // if sync_finished is false that means we are more than 1024 blocks behind so, for now, we go back to syncing as it follows. 
+ // TODO: Have full syncing always be from NewToOld, issue: https://github.com/lambdaclass/ethrex/issues/4717 + loop { + debug!("Sync Log 1: In Full Sync"); + debug!( + "Sync Log 3: State current headers len {}", + block_sync_state.current_headers.len() + ); + debug!( + "Sync Log 4: State current blocks len {}", + block_sync_state.current_blocks.len() + ); - debug!( - "Requesting Block Headers from OldToNew from current_head {current_head}" - ); + debug!("Requesting Block Headers from OldToNew from current_head {current_head}"); - let Some(mut block_headers) = self - .peers - .request_block_headers_from_hash(current_head, BlockRequestOrder::OldToNew) - .await - else { - warn!("Sync failed to find target block header, aborting"); - debug!("Sync Log 8: Sync failed to find target block header, aborting"); - return Ok(()); - }; + let Some(mut block_headers) = self + .peers + .request_block_headers_from_hash(current_head, BlockRequestOrder::OldToNew) + .await + else { + warn!("Sync failed to find target block header, aborting"); + debug!("Sync Log 8: Sync failed to find target block header, aborting"); + return Ok(()); + }; - debug!("Sync Log 9: Received {} block headers", block_headers.len()); + debug!("Sync Log 9: Received {} block headers", block_headers.len()); - let (first_block_hash, first_block_number, first_block_parent_hash) = - match block_headers.first() { - Some(header) => (header.hash(), header.number, header.parent_hash), - None => continue, - }; - let (last_block_hash, last_block_number) = match block_headers.last() { - Some(header) => (header.hash(), header.number), + let (first_block_hash, first_block_number, first_block_parent_hash) = + match block_headers.first() { + Some(header) => (header.hash(), header.number, header.parent_hash), None => continue, }; - // TODO(#2126): This is just a temporary solution to avoid a bug where the sync would get stuck - // on a loop when the target head is not found, i.e. on a reorg with a side-chain. - if first_block_hash == last_block_hash - && first_block_hash == current_head - && current_head != sync_head - { - // There is no path to the sync head this goes back until it find a common ancerstor - warn!( - "Sync failed to find target block header, going back to the previous parent" - ); - current_head = first_block_parent_hash; - continue; - } - - debug!( - "Received {} block headers| First Number: {} Last Number: {}", - block_headers.len(), - first_block_number, - last_block_number + let (last_block_hash, last_block_number) = match block_headers.last() { + Some(header) => (header.hash(), header.number), + None => continue, + }; + // TODO(#2126): This is just a temporary solution to avoid a bug where the sync would get stuck + // on a loop when the target head is not found, i.e. on a reorg with a side-chain. 
+ if first_block_hash == last_block_hash + && first_block_hash == current_head + && current_head != sync_head + { + // There is no path to the sync head; this goes back until it finds a common ancestor warn!( "Sync failed to find target block header, going back to the previous parent" ); current_head = first_block_parent_hash; continue; } debug!( "Received {} block headers| First Number: {} Last Number: {}", block_headers.len(), first_block_number, last_block_number ); // Filter out everything after the sync_head let mut sync_head_found = false; if let Some(index) = block_headers .iter() .position(|header| header.hash() == sync_head) { sync_head_found = true; block_headers.drain(index + 1..); } // Update current fetch head current_head = last_block_hash; // Discard the first header as we already have it block_headers.remove(0); if !block_headers.is_empty() { let mut finished = false; while !finished { (finished, sync_head_found) = block_sync_state .process_incoming_headers( block_headers.clone(), sync_head, sync_head_found, self.blockchain.clone(), self.peers.clone(), self.cancel_token.clone(), ) .await?; block_headers.clear(); } } if sync_head_found { break; }; } } Ok(()) } /// Tries to perform syncing going backwards from the sync_head with one batch of requested headers. /// This is to cover the case where we are on a sidechain and the peer doesn't have our current_head /// so when requesting headers from our current_head on we get None and we never get to finish syncing. /// For more context go to the PR https://github.com/lambdaclass/ethrex/pull/4676 /// /// # Returns /// /// Returns an error if the sync fails at any given step and aborts all active processes; - /// otherwise returns two bools (sync_finished, sync_head_found) - /// sync_finished could be false if we can't find a common ancestor within the requested headers, which means - /// the chain is more than 1024 blocks behind. + /// otherwise returns true if syncing was finished or the request of headers returned None, + /// and false if we couldn't find a common ancestor within the requested headers, + /// which in turn means the chain is more than 1024 blocks behind.
async fn synced_new_to_old( &mut self, block_sync_state: &mut FullBlockSyncState, sync_head: H256, store: Store, - ) -> Result<(bool, bool), SyncError> { + ) -> Result { debug!("Sync Log 1: In Full Sync"); debug!( "Sync Log 3: State current headers len {}", @@ -486,7 +481,7 @@ impl Syncer { // sync_head or sync_head parent was not found warn!("Sync failed to find target block header, aborting"); debug!("Sync Log 8: Sync failed to find target block header, aborting"); - return Ok((false, false)); + return Ok(true); }; debug!("Sync Log 9: Received {} block headers", block_headers.len()); @@ -516,9 +511,9 @@ impl Syncer { self.cancel_token.clone(), ) .await?; - return Ok((true, true)); + return Ok(true); } - Ok((false, true)) + Ok(false) } /// Executes the given blocks and stores them @@ -713,15 +708,26 @@ impl FullBlockSyncState { self.current_blocks.extend(blocks); // } - // If we have the sync_head as a pending block from a new_payload request and its parent_hash matches the hash of the latest received header - // we set the sync_head as found. Then we add it in current_blocks for execution. - if let Some(block) = self.store.get_pending_block(sync_head).await? { - if let Some(last_block) = self.current_blocks.last() { - if last_block.hash() == block.header.parent_hash { - self.current_blocks.push(block); - sync_head_found = true; - } + // We check if we have pending blocks we didn't request that are needed for syncing + // Then we add it in current_blocks for execution. + let mut pending_block_to_sync = vec![]; + let mut last_header_to_sync = sync_head; + while let Some(block) = self.store.get_pending_block(last_header_to_sync).await? { + pending_block_to_sync.push(block.clone()); + if self + .current_blocks + .last() + .is_some_and(|block| block.hash() == block.header.parent_hash) + { + sync_head_found = true; + break; } + last_header_to_sync = block.header.parent_hash; + } + + if sync_head_found { + pending_block_to_sync.reverse(); + self.current_blocks.extend(pending_block_to_sync); } // Execute full blocks // while self.current_blocks.len() >= *EXECUTE_BATCH_SIZE From 8cf5f1b512c45ed3848a1d240ed1a7a361323757 Mon Sep 17 00:00:00 2001 From: Camila Di Ielsi Date: Wed, 1 Oct 2025 17:34:22 -0300 Subject: [PATCH 30/37] fix in using pending blocks --- crates/networking/p2p/sync.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 9a28e0ef62..352c47c635 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -713,16 +713,18 @@ impl FullBlockSyncState { let mut pending_block_to_sync = vec![]; let mut last_header_to_sync = sync_head; while let Some(block) = self.store.get_pending_block(last_header_to_sync).await? 
{
- pending_block_to_sync.push(block.clone());
+ let block_parent = block.header.parent_hash;
 if self
 .current_blocks
 .last()
- .is_some_and(|block| block.hash() == block.header.parent_hash)
+ .is_some_and(|block| block.hash() == block_parent)
+ || sync_head_found
 {
 sync_head_found = true;
 break;
 }
- last_header_to_sync = block.header.parent_hash;
+ pending_block_to_sync.push(block);
+ last_header_to_sync = block_parent;
 }

 if sync_head_found {

From b076181cbfbd4ca4b1cab2979cc462424de3a4fc Mon Sep 17 00:00:00 2001
From: Camila Di Ielsi
Date: Thu, 2 Oct 2025 11:31:17 -0300
Subject: [PATCH 31/37] fix sync tests

---
 crates/networking/p2p/sync.rs | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs
index 352c47c635..495430a1b0 100644
--- a/crates/networking/p2p/sync.rs
+++ b/crates/networking/p2p/sync.rs
@@ -718,19 +718,18 @@ impl FullBlockSyncState {
 .current_blocks
 .last()
 .is_some_and(|block| block.hash() == block_parent)
- || sync_head_found
+ && !self.current_blocks.contains(&block)
 {
+ pending_block_to_sync.push(block);
 sync_head_found = true;
+ pending_block_to_sync.reverse();
+ self.current_blocks.extend(pending_block_to_sync);
 break;
 }
 pending_block_to_sync.push(block);
 last_header_to_sync = block_parent;
 }

- if sync_head_found {
- pending_block_to_sync.reverse();
- self.current_blocks.extend(pending_block_to_sync);
- }
 // Execute full blocks
 // while self.current_blocks.len() >= *EXECUTE_BATCH_SIZE
 // || (!self.current_blocks.is_empty() && sync_head_found)

From 5ce915c13e419ab4c1d6f956aa92bd1e5e7c616b Mon Sep 17 00:00:00 2001
From: Camila Di Ielsi
Date: Thu, 2 Oct 2025 16:47:11 -0300
Subject: [PATCH 32/37] add pending blocks retrieval limit

---
 crates/networking/p2p/sync.rs | 36 ++++++++++++++++-------------------
 1 file changed, 16 insertions(+), 20 deletions(-)

diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs
index 495430a1b0..aa10672ab9 100644
--- a/crates/networking/p2p/sync.rs
+++ b/crates/networking/p2p/sync.rs
@@ -56,6 +56,12 @@ const BYTECODE_CHUNK_SIZE: usize = 50_000;
 /// that are unlikely to be re-orged.
 const MISSING_SLOTS_PERCENTAGE: f64 = 0.8;

+/// Searching for pending blocks to include during syncing can slow down block processing when there is a large chain of pending blocks.
+/// For example, when syncing from genesis it might take a while until this chain connects, but the search for pending blocks runs
+/// each time we process a batch of blocks and can take a long time, so we limit this check to 32 blocks.
+/// Eventually we will implement some in-memory structure to make this check cheap.
+const PENDING_BLOCKS_RETRIEVAL_LIMIT: usize = 32;
+
 #[cfg(feature = "sync-test")]
 lazy_static::lazy_static!
{
 static ref EXECUTE_BATCH_SIZE: usize = std::env::var("EXECUTE_BATCH_SIZE").map(|var| var.parse().expect("Execute batch size environmental variable is not a number")).unwrap_or(EXECUTE_BATCH_SIZE_DEFAULT);
@@ -368,28 +374,14 @@ impl Syncer {

 debug!("Sync Log 9: Received {} block headers", block_headers.len());

- let (first_block_hash, first_block_number, first_block_parent_hash) =
- match block_headers.first() {
- Some(header) => (header.hash(), header.number, header.parent_hash),
- None => continue,
- };
+ let first_block_number = match block_headers.first() {
+ Some(header) => header.number,
+ None => continue,
+ };
 let (last_block_hash, last_block_number) = match block_headers.last() {
 Some(header) => (header.hash(), header.number),
 None => continue,
 };
- // TODO(#2126): This is just a temporary solution to avoid a bug where the sync would get stuck
- // on a loop when the target head is not found, i.e. on a reorg with a side-chain.
- if first_block_hash == last_block_hash
- && first_block_hash == current_head
- && current_head != sync_head
- {
- // There is no path to the sync head; this goes back until it finds a common ancestor
- warn!(
- "Sync failed to find target block header, going back to the previous parent"
- );
- current_head = first_block_parent_hash;
- continue;
- }

 debug!(
 "Received {} block headers| First Number: {} Last Number: {}",
@@ -431,7 +423,7 @@ impl Syncer {
 }

 if sync_head_found {
- break;
+ return Ok(());
 };
 }
 }
@@ -712,13 +704,13 @@ impl FullBlockSyncState {
 // Then we add them to current_blocks for execution.
 let mut pending_block_to_sync = vec![];
 let mut last_header_to_sync = sync_head;
+ let mut pending_blocks_retrieved = 0;
 while let Some(block) = self.store.get_pending_block(last_header_to_sync).await?
 {
 let block_parent = block.header.parent_hash;
 if self
 .current_blocks
 .last()
 .is_some_and(|block| block.hash() == block_parent)
- && !self.current_blocks.contains(&block)
 {
 pending_block_to_sync.push(block);
 sync_head_found = true;
@@ -728,6 +720,10 @@ impl FullBlockSyncState {
 }
 pending_block_to_sync.push(block);
 last_header_to_sync = block_parent;
+ pending_blocks_retrieved += 1;
+ if pending_blocks_retrieved >= PENDING_BLOCKS_RETRIEVAL_LIMIT {
+ break;
+ }
 }

 // Execute full blocks

From 3fb6179bdab9a356ca4fae6933ce770bd767b400 Mon Sep 17 00:00:00 2001
From: Camila Di Ielsi
Date: Thu, 2 Oct 2025 17:28:49 -0300
Subject: [PATCH 33/37] restore TODO 2126 if since it could cover some edge
 case

---
 crates/networking/p2p/sync.rs | 23 +++++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)

diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs
index aa10672ab9..edb33dde74 100644
--- a/crates/networking/p2p/sync.rs
+++ b/crates/networking/p2p/sync.rs
@@ -374,15 +374,30 @@ impl Syncer {
 debug!("Sync Log 9: Received {} block headers", block_headers.len());

- let first_block_number = match block_headers.first() {
- Some(header) => header.number,
- None => continue,
- };
+ let (first_block_hash, first_block_number, first_block_parent_hash) =
+ match block_headers.first() {
+ Some(header) => (header.hash(), header.number, header.parent_hash),
+ None => continue,
+ };
 let (last_block_hash, last_block_number) = match block_headers.last() {
 Some(header) => (header.hash(), header.number),
 None => continue,
 };
+ // TODO(#2126): This is just a temporary solution to avoid a bug where the sync would get stuck
+ // on a loop when the target head is not found, i.e. on a reorg with a side-chain.
+ if first_block_hash == last_block_hash
+ && first_block_hash == current_head
+ && current_head != sync_head
+ {
+ // There is no path to the sync head; this goes back until it finds a common ancestor
+ warn!(
+ "Sync failed to find target block header, going back to the previous parent"
+ );
+ current_head = first_block_parent_hash;
+ continue;
+ }
+
 debug!(
 "Received {} block headers| First Number: {} Last Number: {}",
 block_headers.len(),

From 11ec6081ee29ff115a14113163beb5246b931c21 Mon Sep 17 00:00:00 2001
From: Camila Di Ielsi
Date: Fri, 3 Oct 2025 14:32:09 -0300
Subject: [PATCH 34/37] uncomment tooling/reorgs test

---
 tooling/reorgs/src/main.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs
index 71bba40309..1fc83816df 100644
--- a/tooling/reorgs/src/main.rs
+++ b/tooling/reorgs/src/main.rs
@@ -33,8 +33,7 @@ async fn main() {

 run_test(&cmd_path, test_one_block_reorg_and_back).await;

- // TODO: this test is failing
- // run_test(&cmd_path, test_many_blocks_reorg).await;
+ run_test(&cmd_path, test_many_blocks_reorg).await;
 }

 async fn get_ethrex_version(cmd_path: &Path) -> String {

From 420365461ab06af37904421a3798e2c8cc91bf53 Mon Sep 17 00:00:00 2001
From: Camila Di Ielsi
Date: Fri, 3 Oct 2025 14:55:45 -0300
Subject: [PATCH 35/37] fix error in merge

---
 crates/networking/p2p/sync.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs
index 19e576bf9f..48cdd619fe 100644
--- a/crates/networking/p2p/sync.rs
+++ b/crates/networking/p2p/sync.rs
@@ -484,7 +484,7 @@ impl Syncer {
 let Some(mut block_headers) = self
 .peers
 .request_block_headers_from_hash(requested_header, BlockRequestOrder::NewToOld)
- .await
+ .await?
 else {
 // sync_head or sync_head parent was not found
 warn!("Sync failed to find target block header, aborting");

From e77f174d9cebd91f6d9e754c2b3719ae686bf2cf Mon Sep 17 00:00:00 2001
From: Camila Di Ielsi
Date: Fri, 3 Oct 2025 14:56:46 -0300
Subject: [PATCH 36/37] fix clippy

---
 tooling/reorgs/src/main.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs
index 1fc83816df..3c28924520 100644
--- a/tooling/reorgs/src/main.rs
+++ b/tooling/reorgs/src/main.rs
@@ -145,7 +145,6 @@ async fn test_one_block_reorg_and_back(simulator: Arc<Mutex<Simulator>>) {
 assert_eq!(new_balance, initial_balance);
 }

-#[expect(unused)]
 async fn test_many_blocks_reorg(simulator: Arc<Mutex<Simulator>>) {
 let mut simulator = simulator.lock().await;
 let signer: Signer = LocalSigner::new(

From a69456d55990d8bde88eae5e259d05f7cd9de3c1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com>
Date: Fri, 3 Oct 2025 19:22:09 -0300
Subject: [PATCH 37/37] refactor: do an early return

---
 crates/networking/p2p/sync.rs | 167 +++++++++++++++++-----------------
 tooling/reorgs/src/main.rs | 1 -
 2 files changed, 84 insertions(+), 84 deletions(-)

diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs
index 48cdd619fe..ff7ed226de 100644
--- a/crates/networking/p2p/sync.rs
+++ b/crates/networking/p2p/sync.rs
@@ -343,106 +343,107 @@ impl Syncer {
 current_head, sync_head
 );

- if !self
+ // Try syncing backwards from the sync_head to find a common ancestor
+ if self
 .synced_new_to_old(&mut block_sync_state, sync_head, store)
 .await?
{
- // synced_new_to_old returns true in case syncing was finished from NewToOld or if the requested headers were None
- // if sync_finished is false that means we are more than 1024 blocks behind so, for now, we go back to syncing as it follows.
- // TODO: Have full syncing always be from NewToOld, issue: https://github.com/lambdaclass/ethrex/issues/4717
- loop {
- debug!("Sync Log 1: In Full Sync");
- debug!(
- "Sync Log 3: State current headers len {}",
- block_sync_state.current_headers.len()
- );
- debug!(
- "Sync Log 4: State current blocks len {}",
- block_sync_state.current_blocks.len()
- );
+ return Ok(());
+ }
+ // synced_new_to_old returns true if syncing finished from NewToOld or if the requested headers were None.
+ // If it returns false, we are more than 1024 blocks behind, so, for now, we fall back to syncing as follows.
+ // TODO: Have full syncing always be from NewToOld, issue: https://github.com/lambdaclass/ethrex/issues/4717
+ loop {
+ debug!("Sync Log 1: In Full Sync");
+ debug!(
+ "Sync Log 3: State current headers len {}",
+ block_sync_state.current_headers.len()
+ );
+ debug!(
+ "Sync Log 4: State current blocks len {}",
+ block_sync_state.current_blocks.len()
+ );

- debug!("Requesting Block Headers from OldToNew from current_head {current_head}");
+ debug!("Requesting Block Headers from OldToNew from current_head {current_head}");

- let Some(mut block_headers) = self
- .peers
- .request_block_headers_from_hash(current_head, BlockRequestOrder::OldToNew)
- .await?
- else {
- warn!("Sync failed to find target block header, aborting");
- debug!("Sync Log 8: Sync failed to find target block header, aborting");
- return Ok(());
- };
+ let Some(mut block_headers) = self
+ .peers
+ .request_block_headers_from_hash(current_head, BlockRequestOrder::OldToNew)
+ .await?
+ else {
+ warn!("Sync failed to find target block header, aborting");
+ debug!("Sync Log 8: Sync failed to find target block header, aborting");
+ return Ok(());
+ };

- debug!("Sync Log 9: Received {} block headers", block_headers.len());
+ debug!("Sync Log 9: Received {} block headers", block_headers.len());

- let (first_block_hash, first_block_number, first_block_parent_hash) =
- match block_headers.first() {
- Some(header) => (header.hash(), header.number, header.parent_hash),
- None => continue,
- };
- let (last_block_hash, last_block_number) = match block_headers.last() {
- Some(header) => (header.hash(), header.number),
- None => continue,
- };
+ let (first_block_hash, first_block_number, first_block_parent_hash) =
+ match block_headers.first() {
+ Some(header) => (header.hash(), header.number, header.parent_hash),
+ None => continue,
+ };
+ let (last_block_hash, last_block_number) = match block_headers.last() {
+ Some(header) => (header.hash(), header.number),
+ None => continue,
+ };

- // TODO(#2126): This is just a temporary solution to avoid a bug where the sync would get stuck
- // on a loop when the target head is not found, i.e. on a reorg with a side-chain.
+ if first_block_hash == last_block_hash
+ && first_block_hash == current_head
+ && current_head != sync_head
+ {
+ // There is no path to the sync head; this goes back until it finds a common ancestor
+ warn!("Sync failed to find target block header, going back to the previous parent");
+ current_head = first_block_parent_hash;
+ continue;
+ }

+ debug!(
+ "Received {} block headers| First Number: {} Last Number: {}",
+ block_headers.len(),
+ first_block_number,
+ last_block_number
+ );

- // Filter out everything after the sync_head
- let mut sync_head_found = false;
- if let Some(index) = block_headers
- .iter()
- .position(|header| header.hash() == sync_head)
- {
- sync_head_found = true;
- block_headers.drain(index + 1..);
- }
+ // Filter out everything after the sync_head
+ let mut sync_head_found = false;
+ if let Some(index) = block_headers
+ .iter()
+ .position(|header| header.hash() == sync_head)
+ {
+ sync_head_found = true;
+ block_headers.drain(index + 1..);
+ }

- // Update current fetch head
- current_head = last_block_hash;
+ // Update current fetch head
+ current_head = last_block_hash;

- // Discard the first header as we already have it
- block_headers.remove(0);
- if !block_headers.is_empty() {
- let mut finished = false;
- while !finished {
- (finished, sync_head_found) = block_sync_state
- .process_incoming_headers(
- block_headers.clone(),
- sync_head,
- sync_head_found,
- self.blockchain.clone(),
- self.peers.clone(),
- self.cancel_token.clone(),
- )
- .await?;
- block_headers.clear();
- }
+ // Discard the first header as we already have it
+ block_headers.remove(0);
+ if !block_headers.is_empty() {
+ let mut finished = false;
+ while !finished {
+ (finished, sync_head_found) = block_sync_state
+ .process_incoming_headers(
+ block_headers.clone(),
+ sync_head,
+ sync_head_found,
+ self.blockchain.clone(),
+ self.peers.clone(),
+ self.cancel_token.clone(),
+ )
+ .await?;
+ block_headers.clear();
 }
-
- if sync_head_found {
- return Ok(());
- };
 }
+
+ if sync_head_found {
+ break;
+ };
 }
+ Ok(())
 }

diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs
index 3c28924520..ace37efe6a 100644
--- a/tooling/reorgs/src/main.rs
+++ b/tooling/reorgs/src/main.rs
@@ -32,7 +32,6 @@ async fn main() {
 info!("");

 run_test(&cmd_path, test_one_block_reorg_and_back).await;
-
 run_test(&cmd_path, test_many_blocks_reorg).await;
 }
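
A note on the fix in PATCH 30/37: in the predicate it removes, the closure parameter `block` shadowed the pending block bound by the enclosing `while let`, so the code compared each stored block's hash against that same block's parent hash, which never matches. The following minimal sketch reproduces the pitfall and the fix; the `Block` struct with plain `u64` hashes is a simplification assumed here for illustration, not the real ethrex type.

#[derive(Debug)]
struct Block {
    hash: u64,
    parent_hash: u64,
}

fn main() {
    // `current` ends in block 20; the pending block 30 claims 20 as its parent.
    let current: Vec<Block> = vec![Block { hash: 20, parent_hash: 10 }];
    let pending = Block { hash: 30, parent_hash: 20 };

    // Buggy form: the closure parameter `block` shadows the pending block, so
    // this compares the stored block's hash with its *own* parent hash.
    let buggy = current
        .last()
        .is_some_and(|block| block.hash == block.parent_hash);

    // Fixed form, as in the patch: bind the pending block's parent hash first.
    let block_parent = pending.parent_hash;
    let fixed = current.last().is_some_and(|block| block.hash == block_parent);

    assert!(!buggy); // 20 != 10: the chains never appear to connect
    assert!(fixed); // 20 == 20: the pending block links to the current tip
}

Binding `block_parent` before the closure both removes the shadowing and keeps the closure from touching the pending block at all, which is why the patch can also move the `push` after the check.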
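The retrieval limit added in PATCH 32/37 bounds how far the pending-block walk can go before handing control back to regular syncing. Below is a standalone sketch of that walk, assuming simplified types: `u64` hashes instead of `H256`, struct fields instead of accessor methods, and a `HashMap` standing in for the async `Store::get_pending_block` lookup.

use std::collections::HashMap;

const PENDING_BLOCKS_RETRIEVAL_LIMIT: usize = 32;

#[derive(Clone, Debug)]
struct Block {
    hash: u64,
    parent_hash: u64,
}

/// Walks backwards from `sync_head` through the pending blocks, collecting at
/// most PENDING_BLOCKS_RETRIEVAL_LIMIT of them. If the walk reaches a block
/// whose parent is the tip of `current_blocks`, the collected blocks are
/// appended in execution (parent-to-child) order and the sync head is found.
fn link_pending_blocks(
    pending: &HashMap<u64, Block>,
    current_blocks: &mut Vec<Block>,
    sync_head: u64,
) -> bool {
    let mut to_sync = Vec::new();
    let mut cursor = sync_head;
    let mut retrieved = 0;
    while let Some(block) = pending.get(&cursor) {
        let parent = block.parent_hash;
        if current_blocks.last().is_some_and(|b| b.hash == parent) {
            // The chains connect: schedule everything collected so far.
            to_sync.push(block.clone());
            to_sync.reverse(); // collected child-first, execute parent-first
            current_blocks.extend(to_sync);
            return true;
        }
        to_sync.push(block.clone());
        cursor = parent;
        retrieved += 1;
        if retrieved >= PENDING_BLOCKS_RETRIEVAL_LIMIT {
            break; // long detached chains are left to regular syncing
        }
    }
    false
}

fn main() {
    // Current chain: 1 <- 2. Pending: 3 (parent 2) <- 4 (parent 3).
    let mut current = vec![
        Block { hash: 1, parent_hash: 0 },
        Block { hash: 2, parent_hash: 1 },
    ];
    let pending = HashMap::from([
        (4, Block { hash: 4, parent_hash: 3 }),
        (3, Block { hash: 3, parent_hash: 2 }),
    ]);
    assert!(link_pending_blocks(&pending, &mut current, 4));
    assert_eq!(current.last().unwrap().hash, 4);
}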
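Finally, the TODO(#2126) guard that PATCH 33/37 restores can be read as a pure decision function: if a peer's response collapses to exactly the header we asked from, and that header is not the sync head, there is no forward path, so the fetch head steps back one parent per retry until a common ancestor with the side-chain is reached. A sketch under the same simplified-type assumptions; `next_fetch_head` is a hypothetical helper, not a function in the codebase.

#[derive(Clone, Copy)]
struct Header {
    hash: u64,
    parent_hash: u64,
}

/// Decides where the next header request should start from.
fn next_fetch_head(headers: &[Header], current_head: u64, sync_head: u64) -> Option<u64> {
    let first = headers.first()?;
    let last = headers.last()?;
    if first.hash == last.hash && first.hash == current_head && current_head != sync_head {
        // No path to the sync head from here: walk one parent back and retry.
        return Some(first.parent_hash);
    }
    // Otherwise keep fetching forward from the last received header.
    Some(last.hash)
}

fn main() {
    // Stuck: the peer only returned the header we already have.
    let stuck = [Header { hash: 7, parent_hash: 6 }];
    assert_eq!(next_fetch_head(&stuck, 7, 9), Some(6));

    // Progress: new headers arrived, so the fetch head advances.
    let progress = [
        Header { hash: 7, parent_hash: 6 },
        Header { hash: 8, parent_hash: 7 },
    ];
    assert_eq!(next_fetch_head(&progress, 7, 9), Some(8));
}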