diff --git a/.github/workflows/pr-main_l1.yaml b/.github/workflows/pr-main_l1.yaml
index 4e53a85a00..abe0bd48fc 100644
--- a/.github/workflows/pr-main_l1.yaml
+++ b/.github/workflows/pr-main_l1.yaml
@@ -203,7 +203,7 @@ jobs:
           ethrex_flags: ""
         - name: "Engine withdrawal tests"
          simulation: ethereum/engine
-          test_pattern: "engine-withdrawals/Corrupted Block Hash Payload|Empty Withdrawals|engine-withdrawals test loader|GetPayloadBodies|GetPayloadV2 Block Value|Max Initcode Size|Sync after 2 blocks - Withdrawals on Genesis|Withdraw many accounts|Withdraw to a single account|Withdraw to two accounts|Withdraw zero amount|Withdraw many accounts|Withdrawals Fork on Block 1 - 1 Block Re-Org|Withdrawals Fork on Block 1 - 8 Block Re-Org NewPayload|Withdrawals Fork on Block 2|Withdrawals Fork on Block 3|Withdrawals Fork on Block 8 - 10 Block Re-Org NewPayload|Withdrawals Fork on Canonical Block 8 / Side Block 7 - 10 Block Re-Org [^S]|Withdrawals Fork on Canonical Block 8 / Side Block 9 - 10 Block Re-Org [^S]"
+          test_pattern: "engine-withdrawals"
         - name: "Sync full"
          simulation: ethereum/sync
          test_pattern: ""
diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs
index a26222b43c..ff7ed226de 100644
--- a/crates/networking/p2p/sync.rs
+++ b/crates/networking/p2p/sync.rs
@@ -57,6 +57,12 @@ const BYTECODE_CHUNK_SIZE: usize = 50_000;
 /// that are unlikely to be re-orged.
 const MISSING_SLOTS_PERCENTAGE: f64 = 0.8;
 
+/// Searching for pending blocks to include during syncing can slow down block processing when there is a long chain of pending blocks.
+/// For example, when syncing from genesis it can take a while for that chain to connect, yet the pending-block search runs
+/// every time we process a batch of blocks and can itself take long, so we cap the check at 32 blocks.
+/// Eventually we will add an in-memory structure to make this check cheap.
+const PENDING_BLOCKS_RETRIEVAL_LIMIT: usize = 32;
+
 #[cfg(feature = "sync-test")]
 lazy_static::lazy_static! {
     static ref EXECUTE_BATCH_SIZE: usize = std::env::var("EXECUTE_BATCH_SIZE").map(|var| var.parse().expect("Execute batch size environmental variable is not a number")).unwrap_or(EXECUTE_BATCH_SIZE_DEFAULT);
@@ -337,6 +343,16 @@ impl Syncer {
             current_head, sync_head
         );
 
+        // Try syncing backwards from the sync_head to find a common ancestor
+        if self
+            .synced_new_to_old(&mut block_sync_state, sync_head, store)
+            .await?
+        {
+            return Ok(());
+        }
+        // synced_new_to_old returns true if syncing finished NewToOld or if the requested headers were None.
+        // If it returns false, we are more than 1024 blocks behind, so, for now, we fall back to syncing as follows.
+        // TODO: Have full syncing always be from NewToOld, issue: https://github.com/lambdaclass/ethrex/issues/4717
         loop {
             debug!("Sync Log 1: In Full Sync");
             debug!(
@@ -348,7 +364,7 @@ impl Syncer {
                 block_sync_state.current_blocks.len()
            );
 
-            debug!("Requesting Block Headers from {current_head}");
+            debug!("Requesting Block Headers from OldToNew from current_head {current_head}");
 
            let Some(mut block_headers) = self
                .peers
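Reviewer note on the hunks above: full sync now makes one NewToOld attempt from the sync head before entering the pre-existing OldToNew loop. A minimal, self-contained sketch of the three possible outcomes; the types and names are stand-ins, not the ethrex API, and 1024 is the header batch size referenced in the comment above:

```rust
// Stand-in sketch of the new dispatch: one NewToOld batch from the sync
// head either finishes the sync, proves nothing (no peer has the head),
// or proves we are more than one batch behind.
fn full_sync_plan(batch_from_sync_head: Option<&[u64]>, is_known: impl Fn(u64) -> bool) -> &'static str {
    match batch_from_sync_head {
        // Request returned None: no peer knows the sync head, mirrors `return Ok(true)`.
        None => "abort",
        // Some header in the batch is already ours: that is the common ancestor.
        Some(batch) if batch.iter().any(|h| is_known(*h)) => "finish NewToOld",
        // No ancestor within one batch (1024 headers): fall back to the OldToNew loop.
        Some(_) => "fall back to OldToNew",
    }
}

fn main() {
    let is_known = |h: u64| h <= 100; // blocks we already executed, stand-in
    assert_eq!(full_sync_plan(Some(&[103, 102, 101]), is_known), "fall back to OldToNew");
    assert_eq!(full_sync_plan(Some(&[103, 102, 100]), is_known), "finish NewToOld");
}
```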
@@ -371,6 +387,7 @@
             Some(header) => (header.hash(), header.number),
             None => continue,
         };
+
         // TODO(#2126): This is just a temporary solution to avoid a bug where the sync would get stuck
         // on a loop when the target head is not found, i.e. on a reorg with a side-chain.
         if first_block_hash == last_block_hash
@@ -426,9 +443,88 @@ impl Syncer {
                 break;
             };
         }
+
         Ok(())
     }
 
+    /// Tries to perform syncing going backwards from the sync_head with one batch of requested headers.
+    /// This covers the case where we are on a sidechain and the peer doesn't have our current_head,
+    /// so requests for headers starting from our current_head return None and syncing never finishes.
+    /// For more context, see PR https://github.com/lambdaclass/ethrex/pull/4676
+    ///
+    /// # Returns
+    ///
+    /// Returns an error if the sync fails at any step, aborting all active processes.
+    /// Returns true if syncing finished, or if the request for headers returned None.
+    /// Returns false if no common ancestor was found within the requested headers,
+    /// which means the chain is more than 1024 blocks behind.
+    async fn synced_new_to_old(
+        &mut self,
+        block_sync_state: &mut FullBlockSyncState,
+        sync_head: H256,
+        store: Store,
+    ) -> Result<bool, SyncError> {
+        debug!("Sync Log 1: In Full Sync");
+        debug!(
+            "Sync Log 3: State current headers len {}",
+            block_sync_state.current_headers.len()
+        );
+        debug!(
+            "Sync Log 4: State current blocks len {}",
+            block_sync_state.current_blocks.len()
+        );
+
+        debug!("Requesting Block Headers from NewToOld from sync_head {sync_head}");
+
+        // Get the oldest pending block to use in the request for headers
+        let mut requested_header = sync_head;
+        while let Some(block) = store.get_pending_block(requested_header).await? {
+            requested_header = block.header.parent_hash;
+        }
+
+        let Some(mut block_headers) = self
+            .peers
+            .request_block_headers_from_hash(requested_header, BlockRequestOrder::NewToOld)
+            .await?
+        else {
+            // sync_head or sync_head parent was not found
+            warn!("Sync failed to find target block header, aborting");
+            debug!("Sync Log 8: Sync failed to find target block header, aborting");
+            return Ok(true);
+        };
+
+        debug!("Sync Log 9: Received {} block headers", block_headers.len());
+
+        let mut found_common_ancestor = false;
+        for i in 0..block_headers.len() {
+            if store
+                .get_block_by_hash(block_headers[i].hash())
+                .await?
+                .is_some()
+            {
+                block_headers.drain(i..);
+                found_common_ancestor = true;
+                break;
+            }
+        }
+
+        if found_common_ancestor {
+            block_headers.reverse();
+            block_sync_state
+                .process_incoming_headers(
+                    block_headers,
+                    sync_head,
+                    true, // sync_head_found is true because of the NewToOld headers request
+                    self.blockchain.clone(),
+                    self.peers.clone(),
+                    self.cancel_token.clone(),
+                )
+                .await?;
+            return Ok(true);
+        }
+        Ok(false)
+    }
+
     /// Executes the given blocks and stores them
     /// If sync_head_found is true, they will be executed one by one
     /// If sync_head_found is false, they will be executed in a single batch
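The common-ancestor scan above is the core of the NewToOld path: the first locally-known header in the newest-to-oldest batch marks the split point, everything from it onward is dropped via `drain(i..)`, and the remainder is reversed into execution order. A self-contained sketch of just that step, with `u64` standing in for `H256` and a closure for the store lookup:

```rust
// `headers` is one batch ordered newest-to-oldest, as a NewToOld request
// returns it; `is_known` stands in for the store lookup. On success the
// result is the unknown suffix of the chain, reordered oldest-to-newest
// and ready for execution, mirroring drain(i..) + reverse() above.
fn split_at_common_ancestor(
    mut headers: Vec<u64>, // header hashes, newest first
    is_known: impl Fn(u64) -> bool,
) -> Option<Vec<u64>> {
    let ancestor = headers.iter().position(|h| is_known(*h))?;
    headers.drain(ancestor..); // drop the ancestor and everything older
    headers.reverse(); // execute oldest-to-newest
    Some(headers)
}

fn main() {
    // Hashes 5 and below are known locally; the batch is newest-to-oldest.
    let batch = vec![9, 8, 7, 6, 5, 4];
    let to_execute = split_at_common_ancestor(batch, |h| h <= 5).unwrap();
    assert_eq!(to_execute, vec![6, 7, 8, 9]); // oldest-to-newest, ancestor dropped
}
```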
@@ -621,16 +717,32 @@ impl FullBlockSyncState {
             self.current_blocks.extend(blocks);
             // }
 
-        // If we have the sync_head as a pending block from a new_payload request and its parent_hash matches the hash of the latest received header
-        // we set the sync_head as found. Then we add it in current_blocks for execution.
-        if let Some(block) = self.store.get_pending_block(sync_head).await? {
-            if let Some(last_block) = self.current_blocks.last() {
-                if last_block.hash() == block.header.parent_hash {
-                    self.current_blocks.push(block);
-                    sync_head_found = true;
-                }
+        // We check whether we have pending blocks, which we didn't request, that are needed for syncing.
+        // Then we add them to current_blocks for execution.
+        let mut pending_block_to_sync = vec![];
+        let mut last_header_to_sync = sync_head;
+        let mut pending_blocks_retrieved = 0;
+        while let Some(block) = self.store.get_pending_block(last_header_to_sync).await? {
+            let block_parent = block.header.parent_hash;
+            if self
+                .current_blocks
+                .last()
+                .is_some_and(|block| block.hash() == block_parent)
+            {
+                pending_block_to_sync.push(block);
+                sync_head_found = true;
+                pending_block_to_sync.reverse();
+                self.current_blocks.extend(pending_block_to_sync);
+                break;
             }
+            pending_block_to_sync.push(block);
+            last_header_to_sync = block_parent;
+            if pending_blocks_retrieved > PENDING_BLOCKS_RETRIEVAL_LIMIT {
+                break;
+            }
+            pending_blocks_retrieved += 1;
         }
+
         // Execute full blocks
         // while self.current_blocks.len() >= *EXECUTE_BATCH_SIZE
         //     || (!self.current_blocks.is_empty() && sync_head_found)
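The bounded walk above replaces the old single-pending-block check: it follows parent hashes backwards from the sync head and splices the collected blocks onto the executed chain only if the walk connects to our current tip. A minimal sketch of that shape, with stand-in types (a `HashMap` instead of the store, `u64` hashes instead of `H256`):

```rust
use std::collections::HashMap;

const PENDING_BLOCKS_RETRIEVAL_LIMIT: usize = 32;

#[derive(Clone)]
struct PendingBlock {
    hash: u64,
    parent_hash: u64,
}

// Walk the pending-block chain backwards from the sync head; return the
// blocks in execution order if the walk reaches our current tip.
fn gather_pending(
    sync_head: u64,
    pending: &HashMap<u64, PendingBlock>, // stand-in for the store's pending blocks
    current_tip: u64,
) -> Option<Vec<PendingBlock>> {
    let mut collected = Vec::new();
    let mut cursor = sync_head;
    while let Some(block) = pending.get(&cursor) {
        let parent = block.parent_hash;
        collected.push(block.clone());
        if parent == current_tip {
            // Collected newest-to-oldest; reverse into execution order.
            collected.reverse();
            return Some(collected);
        }
        cursor = parent;
        // Bound the walk so a long disconnected pending chain cannot
        // stall every batch (mirrors PENDING_BLOCKS_RETRIEVAL_LIMIT).
        if collected.len() > PENDING_BLOCKS_RETRIEVAL_LIMIT {
            return None;
        }
    }
    None
}

fn main() {
    let mut pending = HashMap::new();
    pending.insert(3, PendingBlock { hash: 3, parent_hash: 2 });
    pending.insert(2, PendingBlock { hash: 2, parent_hash: 1 });
    let chain = gather_pending(3, &pending, 1).unwrap();
    assert_eq!(chain[0].hash, 2); // blocks 2 then 3, ready to execute
    assert_eq!(chain.len(), 2);
}
```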
diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs
index 71bba40309..82616858be 100644
--- a/tooling/reorgs/src/main.rs
+++ b/tooling/reorgs/src/main.rs
@@ -5,6 +5,7 @@ use std::{
 };
 
 use ethrex::{cli::Options, initializers::init_tracing};
+use ethrex_common::U256;
 use ethrex_l2_rpc::signer::{LocalSigner, Signer};
 use tokio::sync::Mutex;
 use tracing::{error, info, warn};
@@ -32,9 +33,8 @@ async fn main() {
     info!("");
 
     run_test(&cmd_path, test_one_block_reorg_and_back).await;
-
-    // TODO: this test is failing
-    // run_test(&cmd_path, test_many_blocks_reorg).await;
+    run_test(&cmd_path, test_storage_slots_reorg).await;
+    run_test(&cmd_path, test_many_blocks_reorg).await;
 }
 
 async fn get_ethrex_version(cmd_path: &Path) -> String {
@@ -54,7 +54,10 @@ where
     let start = std::time::Instant::now();
     info!(test=%test_name, "Running test");
 
-    let simulator = Arc::new(Mutex::new(Simulator::new(cmd_path.to_path_buf())));
+    let simulator = Arc::new(Mutex::new(Simulator::new(
+        cmd_path.to_path_buf(),
+        test_name.to_string(),
+    )));
 
     // Run in another task to clean up properly on panic
     let result = tokio::spawn(test_fn(simulator.clone())).await;
@@ -146,7 +149,6 @@ async fn test_one_block_reorg_and_back(simulator: Arc<Mutex<Simulator>>) {
     assert_eq!(new_balance, initial_balance);
 }
 
-#[expect(unused)]
 async fn test_many_blocks_reorg(simulator: Arc<Mutex<Simulator>>) {
     let mut simulator = simulator.lock().await;
     let signer: Signer = LocalSigner::new(
@@ -217,3 +219,107 @@ async fn test_many_blocks_reorg(simulator: Arc<Mutex<Simulator>>) {
     let new_balance = node0.get_balance(recipient).await;
     assert_eq!(new_balance, initial_balance + transfer_amount);
 }
+
+async fn test_storage_slots_reorg(simulator: Arc<Mutex<Simulator>>) {
+    let mut simulator = simulator.lock().await;
+    // Initcode for deploying a contract that receives two `bytes32` parameters and sets `storage[param0] = param1`
+    let contract_deploy_bytecode = hex::decode("656020355f35555f526006601af3").unwrap().into();
+    let signer: Signer = LocalSigner::new(
+        "941e103320615d394a55708be13e45994c7d93b932b064dbcb2b511fe3254e2e"
+            .parse()
+            .unwrap(),
+    )
+    .into();
+
+    let slot_key0 = U256::from(42);
+    let slot_value0 = U256::from(1163);
+    let slot_key1 = U256::from(25);
+    let slot_value1 = U256::from(7474);
+
+    let node0 = simulator.start_node().await;
+    let node1 = simulator.start_node().await;
+
+    // Create a chain with a few empty blocks
+    let mut base_chain = simulator.get_base_chain();
+
+    // Send a deploy tx for a contract which receives `(bytes32 key, bytes32 value)` as parameters
+    let contract_address = node0
+        .send_contract_deploy(&signer, contract_deploy_bytecode)
+        .await;
+
+    for _ in 0..10 {
+        let extended_base_chain = node0.build_payload(base_chain).await;
+        node0.notify_new_payload(&extended_base_chain).await;
+        node0.update_forkchoice(&extended_base_chain).await;
+
+        node1.notify_new_payload(&extended_base_chain).await;
+        node1.update_forkchoice(&extended_base_chain).await;
+        base_chain = extended_base_chain;
+    }
+
+    // Sanity check: storage slots are initially empty
+    let initial_value = node0.get_storage_at(contract_address, slot_key0).await;
+    assert_eq!(initial_value, U256::zero());
+    let initial_value = node0.get_storage_at(contract_address, slot_key1).await;
+    assert_eq!(initial_value, U256::zero());
+
+    // Fork the chain
+    let mut side_chain = base_chain.fork();
+
+    // Create a side chain with multiple blocks only known to node0
+    for _ in 0..10 {
+        side_chain = node0.build_payload(side_chain).await;
+        node0.notify_new_payload(&side_chain).await;
+        node0.update_forkchoice(&side_chain).await;
+    }
+
+    // Advance the base chain with multiple blocks only known to node1
+    for _ in 0..10 {
+        base_chain = node1.build_payload(base_chain).await;
+        node1.notify_new_payload(&base_chain).await;
+        node1.update_forkchoice(&base_chain).await;
+    }
+
+    // Set a storage slot in the contract on node0
+    let calldata0 = [slot_key0.to_big_endian(), slot_value0.to_big_endian()]
+        .concat()
+        .into();
+    node0.send_call(&signer, contract_address, calldata0).await;
+
+    // Set another storage slot in the contract on node1
+    let calldata1 = [slot_key1.to_big_endian(), slot_value1.to_big_endian()]
+        .concat()
+        .into();
+    node1.send_call(&signer, contract_address, calldata1).await;
+
+    // Build a block in the side chain
+    side_chain = node0.build_payload(side_chain).await;
+    node0.notify_new_payload(&side_chain).await;
+    node0.update_forkchoice(&side_chain).await;
+
+    // Build a block in the base chain
+    base_chain = node1.build_payload(base_chain).await;
+    node1.notify_new_payload(&base_chain).await;
+    node1.update_forkchoice(&base_chain).await;
+
+    // Assert the storage slots are as expected in both forks
+    let value_slot0 = node0.get_storage_at(contract_address, slot_key0).await;
+    assert_eq!(value_slot0, slot_value0);
+    let value_slot1 = node0.get_storage_at(contract_address, slot_key1).await;
+    assert_eq!(value_slot1, U256::zero());
+
+    let value_slot0 = node1.get_storage_at(contract_address, slot_key0).await;
+    assert_eq!(value_slot0, U256::zero());
+    let value_slot1 = node1.get_storage_at(contract_address, slot_key1).await;
+    assert_eq!(value_slot1, slot_value1);
+
+    // Reorg node0 to the base chain
+    node0.notify_new_payload(&base_chain).await;
+    node0.update_forkchoice(&base_chain).await;
+
+    // Check the storage slots are as expected after the reorg
+    let value_slot0 = node0.get_storage_at(contract_address, slot_key0).await;
+    assert_eq!(value_slot0, U256::zero());
+    let value_slot1 = node0.get_storage_at(contract_address, slot_key1).await;
+    assert_eq!(value_slot1, slot_value1);
+}
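Decoding the 14-byte initcode used in the test above makes it easier to review. The constructor stores the 6-byte runtime code in memory and returns it; the runtime reads two 32-byte words straight from calldata (there is no function selector) and does a single SSTORE:

```
// initcode: 65 6020355f3555 5f 52 6006 601a f3
PUSH6 0x6020355f3555   // the 6-byte runtime code
PUSH0                  // memory offset 0
MSTORE                 // right-aligned in the word: code lands at bytes 26..32
PUSH1 0x06             // length: 6
PUSH1 0x1a             // offset: 26
RETURN                 // deploy the runtime code

// runtime: 60 20 35 5f 35 55
PUSH1 0x20
CALLDATALOAD           // param1 (value), calldata bytes 32..64
PUSH0
CALLDATALOAD           // param0 (key), calldata bytes 0..32
SSTORE                 // storage[param0] = param1
```

This is why `send_call` can pass `[key.to_big_endian(), value.to_big_endian()].concat()` as raw calldata with no selector prefix.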
diff --git a/tooling/reorgs/src/simulator.rs b/tooling/reorgs/src/simulator.rs
index b70617f2a0..227db3e2c7 100644
--- a/tooling/reorgs/src/simulator.rs
+++ b/tooling/reorgs/src/simulator.rs
@@ -1,8 +1,11 @@
-use std::{fs::File, io::Read, path::PathBuf, process::Stdio, time::Duration};
+use std::{
+    fs::File, io::Read, path::PathBuf, process::Stdio, sync::atomic::AtomicU16, time::Duration,
+};
 
 use ethrex::{cli::Options, initializers::get_network};
 use ethrex_common::{
-    Bytes, H160, H256, U256,
+    Address, Bytes, H160, H256, U256,
+    evm::calculate_create_address,
     types::{
         Block, EIP1559Transaction, Genesis, Transaction, TxKind, requests::compute_requests_hash,
     },
@@ -26,6 +29,8 @@ use tracing::{error, info};
 
 pub struct Simulator {
     cmd_path: PathBuf,
+    test_name: String,
+
     base_opts: Options,
     jwt_secret: Bytes,
     genesis_path: PathBuf,
@@ -35,7 +40,7 @@ pub struct Simulator {
 }
 
 impl Simulator {
-    pub fn new(cmd_path: PathBuf) -> Self {
+    pub fn new(cmd_path: PathBuf, test_name: String) -> Self {
         let mut opts = Options::default_l1();
         let jwt_secret = generate_jwt_secret();
         std::fs::write("jwt.hex", hex::encode(&jwt_secret)).unwrap();
@@ -52,6 +57,7 @@ impl Simulator {
         opts.network = Some(Network::GenesisPath(genesis_path.clone()));
         Self {
             cmd_path,
+            test_name,
             base_opts: opts,
             genesis_path,
             jwt_secret,
@@ -69,18 +75,23 @@ impl Simulator {
     pub async fn start_node(&mut self) -> Node {
         let n = self.configs.len();
+        let test_name = &self.test_name;
         info!(node = n, "Starting node");
 
         let mut opts = self.base_opts.clone();
-        opts.http_port = (8545 + n * 2).to_string();
-        opts.authrpc_port = (8545 + n * 2 + 1).to_string();
-        opts.p2p_port = (30303 + n).to_string();
-        opts.discovery_port = (30303 + n).to_string();
-        opts.datadir = format!("data/node{n}").into();
+        opts.datadir = format!("data/{test_name}/node{n}").into();
+
+        opts.http_port = get_next_port().to_string();
+        opts.authrpc_port = get_next_port().to_string();
+
+        // These share one port number: p2p is TCP and discovery is UDP
+        let p2p_port = get_next_port();
+        opts.p2p_port = p2p_port.to_string();
+        opts.discovery_port = p2p_port.to_string();
 
         let _ = std::fs::remove_dir_all(&opts.datadir);
         std::fs::create_dir_all(&opts.datadir).expect("Failed to create data directory");
 
-        let logs_file_path = format!("data/node{n}.log");
+        let logs_file_path = format!("data/{test_name}/node{n}.log");
         let logs_file = File::create(&logs_file_path).expect("Failed to create logs file");
 
         let cancel = CancellationToken::new();
@@ -333,14 +344,96 @@ impl Node {
             .unwrap();
     }
 
+    pub async fn send_call(&self, signer: &Signer, contract: H160, data: Bytes) {
+        info!(node = self.index, sender=%signer.address(), %contract, "Sending contract call");
+        let chain_id = self
+            .rpc_client
+            .get_chain_id()
+            .await
+            .unwrap()
+            .try_into()
+            .unwrap();
+        let sender_address = signer.address();
+        let nonce = self
+            .rpc_client
+            .get_nonce(sender_address, BlockIdentifier::Tag(BlockTag::Latest))
+            .await
+            .unwrap();
+        let tx = EIP1559Transaction {
+            chain_id,
+            nonce,
+            max_priority_fee_per_gas: 0,
+            max_fee_per_gas: 1_000_000_000,
+            gas_limit: 50_000,
+            to: TxKind::Call(contract),
+            data,
+            ..Default::default()
+        };
+        let mut tx = Transaction::EIP1559Transaction(tx);
+        tx.sign_inplace(signer).await.unwrap();
+        let encoded_tx = tx.encode_canonical_to_vec();
+        self.rpc_client
+            .send_raw_transaction(&encoded_tx)
+            .await
+            .unwrap();
+    }
+
+    pub async fn send_contract_deploy(
+        &self,
+        signer: &Signer,
+        contract_deploy_bytecode: Bytes,
+    ) -> Address {
+        info!(node = self.index, sender=%signer.address(), "Deploying contract");
+        let chain_id = self
+            .rpc_client
+            .get_chain_id()
+            .await
+            .unwrap()
+            .try_into()
+            .unwrap();
+        let sender_address = signer.address();
+        let nonce = self
+            .rpc_client
+            .get_nonce(sender_address, BlockIdentifier::Tag(BlockTag::Latest))
+            .await
+            .unwrap();
+        let tx = EIP1559Transaction {
+            chain_id,
+            nonce,
+            max_priority_fee_per_gas: 0,
+            max_fee_per_gas: 1_000_000_000,
+            gas_limit: 100_000,
+            to: TxKind::Create,
+            data: contract_deploy_bytecode,
+            ..Default::default()
+        };
+        let mut tx = Transaction::EIP1559Transaction(tx);
+        tx.sign_inplace(signer).await.unwrap();
+        let encoded_tx = tx.encode_canonical_to_vec();
+        self.rpc_client
+            .send_raw_transaction(&encoded_tx)
+            .await
+            .unwrap();
+
+        calculate_create_address(sender_address, nonce)
+    }
+
     pub async fn get_balance(&self, address: H160) -> U256 {
         self.rpc_client
             .get_balance(address, Default::default())
             .await
             .unwrap()
     }
+
+    pub async fn get_storage_at(&self, address: H160, key: U256) -> U256 {
+        self.rpc_client
+            .get_storage_at(address, key, Default::default())
+            .await
+            .unwrap()
+    }
 }
 
+#[derive(Debug)]
 pub struct Chain {
     block_hashes: Vec<H256>,
     blocks: Vec<Block>,
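`send_contract_deploy` derives the contract address locally from `(sender, nonce)` instead of waiting for a receipt, which is what `calculate_create_address` is used for above. For reviewers unfamiliar with the scheme, here is a self-contained sketch of the standard CREATE derivation, `keccak256(rlp([sender, nonce]))[12..]`; the RLP encoding is hand-rolled for this one shape, and the `tiny-keccak` crate is an assumed dependency for the sketch, not necessarily what ethrex uses internally:

```rust
use tiny_keccak::{Hasher, Keccak};

fn create_address(sender: [u8; 20], nonce: u64) -> [u8; 20] {
    // RLP-encode the nonce: 0x80 for zero, a single byte if < 0x80,
    // otherwise length-prefixed big-endian bytes.
    let nonce_bytes: Vec<u8> = if nonce == 0 {
        vec![0x80]
    } else if nonce < 0x80 {
        vec![nonce as u8]
    } else {
        let trimmed: Vec<u8> = nonce.to_be_bytes().iter().copied().skip_while(|b| *b == 0).collect();
        let mut v = vec![0x80 + trimmed.len() as u8];
        v.extend(trimmed);
        v
    };
    // RLP list header + 20-byte string header + address + nonce.
    let payload_len = 1 + 20 + nonce_bytes.len();
    let mut rlp = vec![0xc0 + payload_len as u8, 0x80 + 20];
    rlp.extend_from_slice(&sender);
    rlp.extend(nonce_bytes);

    let mut hash = [0u8; 32];
    let mut keccak = Keccak::v256();
    keccak.update(&rlp);
    keccak.finalize(&mut hash);

    // The contract address is the low 20 bytes of the hash.
    let mut out = [0u8; 20];
    out.copy_from_slice(&hash[12..]);
    out
}
```

Note that the nonce is fetched right before signing, so the derived address is only correct as long as no other transaction from the same sender lands in between, which holds in these single-signer tests.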
@@ -431,3 +524,8 @@ async fn wait_until_synced(engine_client: &EngineClient, fork_choice_state: Fork
         tokio::time::sleep(Duration::from_millis(100)).await;
     }
 }
+
+fn get_next_port() -> u16 {
+    static NEXT_PORT: AtomicU16 = AtomicU16::new(8560);
+    NEXT_PORT.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
+}
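A note on `get_next_port`: `fetch_add` returns the previous value of a single atomic read-modify-write, so every caller gets a distinct port even when nodes start concurrently, and `Relaxed` ordering suffices because nothing else synchronizes through the counter. A self-contained sketch of that property:

```rust
use std::sync::atomic::{AtomicU16, Ordering};

fn get_next_port() -> u16 {
    static NEXT_PORT: AtomicU16 = AtomicU16::new(8560);
    // fetch_add returns the previous value, so two callers can never
    // observe the same port number.
    NEXT_PORT.fetch_add(1, Ordering::Relaxed)
}

fn main() {
    let handles: Vec<_> = (0..4).map(|_| std::thread::spawn(get_next_port)).collect();
    let mut ports: Vec<u16> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    ports.sort();
    ports.dedup();
    assert_eq!(ports.len(), 4); // all distinct
}
```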