diff --git a/crates/common/rlp/decode.rs b/crates/common/rlp/decode.rs index 8c03e81dce..a68ec34de8 100644 --- a/crates/common/rlp/decode.rs +++ b/crates/common/rlp/decode.rs @@ -513,7 +513,7 @@ pub fn decode_bytes(data: &[u8]) -> Result<(&[u8], &[u8]), RLPDecodeError> { /// Pads a slice of bytes with zeros on the left to make it a fixed size slice. /// The size of the data must be less than or equal to the size of the output array. #[inline] -pub fn static_left_pad(data: &[u8]) -> Result<[u8; N], RLPDecodeError> { +pub(crate) fn static_left_pad(data: &[u8]) -> Result<[u8; N], RLPDecodeError> { let mut result = [0; N]; if data.is_empty() { diff --git a/crates/common/rlp/encode.rs b/crates/common/rlp/encode.rs index 1de929a8b9..29819a7a8c 100644 --- a/crates/common/rlp/encode.rs +++ b/crates/common/rlp/encode.rs @@ -188,7 +188,7 @@ impl RLPEncode for Vec { } } -pub fn encode_length(total_len: usize, buf: &mut dyn BufMut) { +pub(crate) fn encode_length(total_len: usize, buf: &mut dyn BufMut) { if total_len < 56 { buf.put_u8(0xc0 + total_len as u8); } else { diff --git a/crates/common/rlp/error.rs b/crates/common/rlp/error.rs index 91d4961db2..17f3a222b2 100644 --- a/crates/common/rlp/error.rs +++ b/crates/common/rlp/error.rs @@ -26,6 +26,10 @@ pub enum RLPDecodeError { pub enum RLPEncodeError { #[error("InvalidCompression")] InvalidCompression(#[from] snap::Error), + #[error("IncompatibleProtocol")] + IncompatibleProtocol, + #[error("MalformedData")] + MalformedData, #[error("{0}")] Custom(String), } diff --git a/crates/networking/p2p/discv4/messages.rs b/crates/networking/p2p/discv4/messages.rs index ea479eb260..8186b32a0e 100644 --- a/crates/networking/p2p/discv4/messages.rs +++ b/crates/networking/p2p/discv4/messages.rs @@ -229,8 +229,6 @@ impl PingMessage { } } - // TODO: remove when used - #[allow(unused)] pub fn with_enr_seq(self, enr_seq: u64) -> Self { Self { enr_seq: Some(enr_seq), diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index be73be6f2f..5df3457e63 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -329,9 +329,15 @@ impl PeerHandler { if let Some(receipts) = tokio::time::timeout(PEER_REPLY_TIMEOUT, async move { loop { match receiver.recv().await { - Some(RLPxMessage::Receipts(receipts)) => { - if receipts.get_id() == request_id { - return Some(receipts.get_receipts()); + Some(RLPxMessage::Receipts68(receipts)) => { + if receipts.id == request_id { + return Some(receipts.receipts); + } + return None; + } + Some(RLPxMessage::Receipts69(receipts)) => { + if receipts.id == request_id { + return Some(receipts.receipts); } return None; } diff --git a/crates/networking/p2p/rlpx/connection/codec.rs b/crates/networking/p2p/rlpx/connection/codec.rs index 24525ba42c..50bda33e51 100644 --- a/crates/networking/p2p/rlpx/connection/codec.rs +++ b/crates/networking/p2p/rlpx/connection/codec.rs @@ -1,4 +1,7 @@ +use crate::rlpx::connection::server::Capabilities; use crate::rlpx::{error::RLPxError, message as rlpx, utils::ecdh_xchng}; +use std::sync::Arc; +use tokio::sync::Mutex; use super::handshake::{LocalState, RemoteState}; use aes::{ @@ -24,6 +27,7 @@ pub(crate) struct RLPxCodec { pub(crate) egress_mac: Keccak256, pub(crate) ingress_aes: Aes256Ctr64BE, pub(crate) egress_aes: Aes256Ctr64BE, + pub capabilities: Arc>, } impl RLPxCodec { @@ -31,6 +35,7 @@ impl RLPxCodec { local_state: &LocalState, remote_state: &RemoteState, hashed_nonces: [u8; 32], + capabilities: Arc>, ) -> Result { let ephemeral_key_secret = ecdh_xchng( &local_state.ephemeral_key, @@ -67,8 +72,49 @@ impl RLPxCodec { egress_mac, ingress_aes, egress_aes, + capabilities, }) } + + // pub fn set_p2p_protocol(&mut self, cap: &Capability) -> Result<(), RLPxError> { + // if !cap.is_p2p() { + // return Err(RLPxError::InternalError( + // "The protocol should be p2p".into(), + // )); + // } + // self.p2p_protocol = Some(cap.clone()); + // Ok(()) + // } + + // pub fn set_eth_protocol(&mut self, cap: &Capability) -> Result<(), RLPxError> { + // if !cap.is_eth() { + // return Err(RLPxError::InternalError( + // "The protocol should be eth".into(), + // )); + // } + // if self.p2p_protocol.is_none() { + // return Err(RLPxError::InternalError( + // "p2p protocol should be established first".into(), + // )); + // } + // self.eth_protocol = Some(cap.clone()); + // Ok(()) + // } + + // pub fn set_snap_protocol(&mut self, cap: &Capability) -> Result<(), RLPxError> { + // if !cap.is_snap() { + // return Err(RLPxError::InternalError( + // "The protocol should be snap".into(), + // )); + // } + // if self.eth_protocol.is_none() { + // return Err(RLPxError::InternalError( + // "Eth protocol should be established first".into(), + // )); + // } + // self.snap_protocol = Some(cap.clone()); + // Ok(()) + // } } impl Decoder for RLPxCodec { @@ -193,7 +239,18 @@ impl Decoder for RLPxCodec { let (frame_data, _padding) = frame_ciphertext.split_at(frame_size); let (msg_id, msg_data): (u8, _) = RLPDecode::decode_unfinished(frame_data)?; - Ok(Some(rlpx::Message::decode(msg_id, msg_data)?)) + + // NOTE: this crashes since it is not possible to create a runtime within an async context. + // Find the right way to lock the capabilities mutex + let rt = tokio::runtime::Runtime::new().unwrap(); + let capabilities = rt.block_on(async { self.capabilities.lock().await }); + Ok(Some(rlpx::Message::decode( + msg_id, + msg_data, + &capabilities.p2p, + &capabilities.eth, + &capabilities.snap, + )?)) } fn decode_eof(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { @@ -222,7 +279,17 @@ impl Encoder for RLPxCodec { fn encode(&mut self, message: rlpx::Message, buffer: &mut BytesMut) -> Result<(), Self::Error> { let mut frame_data = vec![]; - message.encode(&mut frame_data)?; + + // NOTE: this crashes since it is not possible to create a runtime within an async context. + // Find the right way to lock the capabilities mutex + let rt = tokio::runtime::Runtime::new().unwrap(); + let capabilities = rt.block_on(async { self.capabilities.lock().await }); + message.encode( + &mut frame_data, + &capabilities.p2p, + &capabilities.eth, + &capabilities.snap, + )?; let mac_aes_cipher = Aes256Enc::new_from_slice(&self.mac_key.0)?; diff --git a/crates/networking/p2p/rlpx/connection/handshake.rs b/crates/networking/p2p/rlpx/connection/handshake.rs index 7764e0cdff..6089b41b97 100644 --- a/crates/networking/p2p/rlpx/connection/handshake.rs +++ b/crates/networking/p2p/rlpx/connection/handshake.rs @@ -6,7 +6,7 @@ use std::{ use crate::{ rlpx::{ - connection::server::{Established, InnerState}, + connection::server::{Capabilities, Established, InnerState}, error::RLPxError, utils::{ compress_pubkey, decompress_pubkey, ecdh_xchng, kdf, log_peer_debug, sha256, @@ -64,6 +64,7 @@ pub(crate) struct LocalState { pub(crate) async fn perform( state: InnerState, ) -> Result<(Established, SplitStream>), RLPxError> { + let capabilities = Arc::new(Mutex::new(Capabilities::default())); let (context, node, framed, inbound) = match state { InnerState::Initiator(Initiator { context, node }) => { let addr = SocketAddr::new(node.ip, node.tcp_port); @@ -81,7 +82,12 @@ pub(crate) async fn perform( // keccak256(nonce || initiator-nonce) let hashed_nonces: [u8; 32] = Keccak256::digest([remote_state.nonce.0, local_state.nonce.0].concat()).into(); - let codec = RLPxCodec::new(&local_state, &remote_state, hashed_nonces)?; + let codec = RLPxCodec::new( + &local_state, + &remote_state, + hashed_nonces, + capabilities.clone(), + )?; log_peer_debug(&node, "Completed handshake as initiator"); (context, node, Framed::new(stream, codec), false) } @@ -99,7 +105,12 @@ pub(crate) async fn perform( // keccak256(nonce || initiator-nonce) let hashed_nonces: [u8; 32] = Keccak256::digest([local_state.nonce.0, remote_state.nonce.0].concat()).into(); - let codec = RLPxCodec::new(&local_state, &remote_state, hashed_nonces)?; + let codec = RLPxCodec::new( + &local_state, + &remote_state, + hashed_nonces, + capabilities.clone(), + )?; let node = Node::new( peer_addr.ip(), peer_addr.port(), @@ -122,8 +133,6 @@ pub(crate) async fn perform( storage: context.storage.clone(), blockchain: context.blockchain.clone(), capabilities: vec![], - negotiated_eth_capability: None, - negotiated_snap_capability: None, last_block_range_update_block: 0, broadcasted_txs: HashSet::new(), requested_pooled_txs: HashMap::new(), @@ -132,6 +141,7 @@ pub(crate) async fn perform( table: context.table.clone(), backend_channel: None, inbound, + negotiated_capabilities: capabilities, }, stream, )) diff --git a/crates/networking/p2p/rlpx/connection/server.rs b/crates/networking/p2p/rlpx/connection/server.rs index 51bfd6f066..7ceeb26d83 100644 --- a/crates/networking/p2p/rlpx/connection/server.rs +++ b/crates/networking/p2p/rlpx/connection/server.rs @@ -37,8 +37,8 @@ use crate::{ eth::{ backend, blocks::{BlockBodies, BlockHeaders}, - receipts::{GetReceipts, Receipts}, - status::StatusMessage, + receipts::{GetReceipts, Receipts68, Receipts69}, + status::{Status68Message, Status69Message}, transactions::{GetPooledTransactions, NewPooledTransactionHashes, Transactions}, update::BlockRangeUpdate, }, @@ -91,8 +91,7 @@ pub struct Established { pub(crate) storage: Store, pub(crate) blockchain: Arc, pub(crate) capabilities: Vec, - pub(crate) negotiated_eth_capability: Option, - pub(crate) negotiated_snap_capability: Option, + pub(crate) negotiated_capabilities: Arc>, pub(crate) last_block_range_update_block: u64, pub(crate) broadcasted_txs: HashSet, pub(crate) requested_pooled_txs: HashMap, @@ -113,6 +112,13 @@ pub struct Established { pub(crate) inbound: bool, } +#[derive(Default, Clone)] +pub struct Capabilities { + pub p2p: Option, + pub eth: Option, + pub snap: Option, +} + #[derive(Clone)] pub enum InnerState { Initiator(Initiator), @@ -323,7 +329,8 @@ where spawn_listener(handle.clone(), &state.node, stream); - spawn_broadcast_listener(handle.clone(), state); + let eth_capability = { &state.negotiated_capabilities.lock().await.eth }; + spawn_broadcast_listener(handle.clone(), state, eth_capability); Ok(()) } @@ -366,8 +373,9 @@ async fn send_new_pooled_tx_hashes(state: &mut Established) -> Result<(), RLPxEr } async fn send_block_range_update(state: &mut Established) -> Result<(), RLPxError> { + let eth_capability = { &state.negotiated_capabilities.lock().await.eth }; // BlockRangeUpdate was introduced in eth/69 - if let Some(eth) = &state.negotiated_eth_capability { + if let Some(eth) = eth_capability { if eth.version >= 69 { log_peer_debug(&state.node, "Sending BlockRangeUpdate"); let update = BlockRangeUpdate::new(&state.storage).await?; @@ -393,11 +401,16 @@ async fn init_capabilities(state: &mut Established, stream: &mut S) -> Result where S: Unpin + Stream>, { + let eth_capability = { &state.negotiated_capabilities.lock().await.eth }; // Sending eth Status if peer supports it - if let Some(eth) = state.negotiated_eth_capability.clone() { - let status = StatusMessage::new(&state.storage, ð).await?; + if let Some(eth) = eth_capability { + let status = match eth.version { + 68 => Message::Status68(Status68Message::new(&state.storage, ð).await?), + 69 => Message::Status69(Status69Message::new(&state.storage, ð).await?), + _ => return Err(RLPxError::IncompatibleProtocol), + }; log_peer_debug(&state.node, "Sending status"); - send(state, Message::Status(status)).await?; + send(state, status).await?; // The next immediate message in the ETH protocol is the // status, reference here: // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#status-0x00 @@ -406,9 +419,13 @@ where None => return Err(RLPxError::Disconnected()), }; match msg { - Message::Status(msg_data) => { + Message::Status68(msg_data) => { log_peer_debug(&state.node, "Received Status"); - backend::validate_status(msg_data, &state.storage, ð).await? + backend::validate_status(Box::new(msg_data), &state.storage, ð).await? + } + Message::Status69(msg_data) => { + log_peer_debug(&state.node, "Received Status"); + backend::validate_status(Box::new(msg_data), &state.storage, ð).await? } Message::Disconnect(disconnect) => { return Err(RLPxError::HandshakeError(format!( @@ -559,11 +576,13 @@ where return Err(RLPxError::NoMatchingCapabilities()); } debug!("Negotatied eth version: eth/{}", negotiated_eth_version); - state.negotiated_eth_capability = Some(Capability::eth(negotiated_eth_version)); - - if negotiated_snap_version != 0 { - debug!("Negotatied snap version: snap/{}", negotiated_snap_version); - state.negotiated_snap_capability = Some(Capability::snap(negotiated_snap_version)); + { + let mut capabilities = state.negotiated_capabilities.lock().await; + capabilities.eth = Some(Capability::eth(negotiated_eth_version)); + if negotiated_snap_version != 0 { + debug!("Negotatied snap version: snap/{}", negotiated_snap_version); + capabilities.snap = Some(Capability::snap(negotiated_snap_version)); + } } state.node.version = Some(hello_message.client_id); @@ -578,7 +597,7 @@ where } } -async fn send(state: &mut Established, message: Message) -> Result<(), RLPxError> { +async fn send(state: &Established, message: Message) -> Result<(), RLPxError> { state.sink.lock().await.send(message).await } @@ -631,14 +650,18 @@ where // See https://github.com/lambdaclass/ethrex/issues/3387 and // https://github.com/lambdaclass/spawned/issues/17 and // https://github.com/lambdaclass/ethrex/issues/3388 -fn spawn_broadcast_listener(mut handle: RLPxConnectionHandle, state: &mut Established) { +fn spawn_broadcast_listener( + mut handle: RLPxConnectionHandle, + state: &Established, + eth_capability: &Option, +) { // Subscribe this connection to the broadcasting channel. // TODO currently spawning a listener task that will suscribe to a broadcast channel and // create RLPxConnection Broadcast messages to send the Genserver // We have to improve this mechanism to avoid manual creation of channels and subscriptions // (That is, we should have a spawned-based broadcaster or maybe the backend should handle the // transactions propagation) - if state.negotiated_eth_capability.is_some() { + if eth_capability.is_some() { let mut receiver = state.connection_broadcast_send.subscribe(); spawned_rt::tasks::spawn(async move { loop { @@ -651,7 +674,7 @@ fn spawn_broadcast_listener(mut handle: RLPxConnectionHandle, state: &mut Establ } async fn handle_peer_message(state: &mut Established, message: Message) -> Result<(), RLPxError> { - let peer_supports_eth = state.negotiated_eth_capability.is_some(); + let peer_supports_eth = { state.negotiated_capabilities.lock().await.eth.is_some() }; match message { Message::Disconnect(msg_data) => { log_peer_debug( @@ -668,9 +691,16 @@ async fn handle_peer_message(state: &mut Established, message: Message) -> Resul Message::Pong(_) => { // We ignore received Pong messages } - Message::Status(msg_data) => { - if let Some(eth) = &state.negotiated_eth_capability { - backend::validate_status(msg_data, &state.storage, eth).await? + Message::Status68(msg_data) => { + let eth_capability = { &state.negotiated_capabilities.lock().await.eth }; + if let Some(eth) = eth_capability { + backend::validate_status(Box::new(msg_data), &state.storage, eth).await? + }; + } + Message::Status69(msg_data) => { + let eth_capability = { &state.negotiated_capabilities.lock().await.eth }; + if let Some(eth) = eth_capability { + backend::validate_status(Box::new(msg_data), &state.storage, eth).await? }; } Message::GetAccountRange(req) => { @@ -707,13 +737,23 @@ async fn handle_peer_message(state: &mut Established, message: Message) -> Resul send(state, Message::BlockBodies(response)).await?; } Message::GetReceipts(GetReceipts { id, block_hashes }) if peer_supports_eth => { - if let Some(eth) = &state.negotiated_eth_capability { + let eth_capability = { &state.negotiated_capabilities.lock().await.eth }; + if let Some(eth) = eth_capability { let mut receipts = Vec::new(); for hash in block_hashes.iter() { receipts.push(state.storage.get_receipts_for_block(hash)?); } - let response = Receipts::new(id, receipts, eth)?; - send(state, Message::Receipts(response)).await?; + match eth.version { + 68 => { + let response = Message::Receipts68(Receipts68::new(id, receipts)); + send(state, response).await?; + } + 69 => { + let response = Message::Receipts69(Receipts69::new(id, receipts)); + send(state, response).await? + } + _ => return Err(RLPxError::IncompatibleProtocol), + }; } } Message::BlockRangeUpdate(update) => { @@ -779,7 +819,8 @@ async fn handle_peer_message(state: &mut Established, message: Message) -> Resul | message @ Message::TrieNodes(_) | message @ Message::BlockBodies(_) | message @ Message::BlockHeaders(_) - | message @ Message::Receipts(_) => { + | message @ Message::Receipts68(_) + | message @ Message::Receipts69(_) => { state .backend_channel .as_mut() diff --git a/crates/networking/p2p/rlpx/error.rs b/crates/networking/p2p/rlpx/error.rs index 0bf1ad7a16..e61dc12888 100644 --- a/crates/networking/p2p/rlpx/error.rs +++ b/crates/networking/p2p/rlpx/error.rs @@ -67,6 +67,8 @@ pub enum RLPxError { IncompatibleProtocol, #[error("Invalid block range")] InvalidBlockRange, + #[error("Internal Error: {0}")] + InternalError(String), } // tokio::sync::mpsc::error::SendError is too large to be part of the RLPxError enum directly diff --git a/crates/networking/p2p/rlpx/eth/backend.rs b/crates/networking/p2p/rlpx/eth/backend.rs index 6e22858a26..4cfaafdcee 100644 --- a/crates/networking/p2p/rlpx/eth/backend.rs +++ b/crates/networking/p2p/rlpx/eth/backend.rs @@ -1,12 +1,10 @@ use ethrex_common::types::ForkId; use ethrex_storage::Store; -use crate::rlpx::{error::RLPxError, p2p::Capability}; - -use super::status::StatusMessage; +use crate::rlpx::{error::RLPxError, eth::status::StatusMessage, p2p::Capability}; pub async fn validate_status( - msg_data: StatusMessage, + msg_data: Box, storage: &Store, eth_capability: &Capability, ) -> Result<(), RLPxError> { @@ -29,26 +27,26 @@ pub async fn validate_status( ); //Check networkID - if msg_data.get_network_id() != chain_config.chain_id { + if msg_data.network_id() != chain_config.chain_id { return Err(RLPxError::HandshakeError( "Network Id does not match".to_string(), )); } //Check Protocol Version - if msg_data.get_eth_version() != eth_capability.version { + if msg_data.eth_version() != eth_capability.version { return Err(RLPxError::HandshakeError( "Eth protocol version does not match".to_string(), )); } //Check Genesis - if msg_data.get_genesis() != genesis_hash { + if msg_data.genesis() != genesis_hash { return Err(RLPxError::HandshakeError( "Genesis does not match".to_string(), )); } // Check ForkID if !fork_id.is_valid( - msg_data.get_fork_id(), + msg_data.fork_id(), latest_block_number, latest_block_header.timestamp, chain_config, @@ -63,14 +61,12 @@ pub async fn validate_status( #[cfg(test)] mod tests { use super::validate_status; - use crate::rlpx::eth::eth68::status::StatusMessage68; - use crate::rlpx::eth::status::StatusMessage; + use crate::rlpx::eth::status::Status68Message; use crate::rlpx::p2p::Capability; use ethrex_common::{ H256, U256, types::{ForkId, Genesis}, }; - use ethrex_storage::{EngineType, Store}; use std::{fs::File, io::BufReader}; @@ -86,6 +82,7 @@ mod tests { let reader = BufReader::new(file); let genesis: Genesis = serde_json::from_reader(reader).expect("Failed to deserialize genesis file"); + storage .add_initial_state(genesis.clone()) .await @@ -97,14 +94,15 @@ mod tests { let fork_id = ForkId::new(config, genesis_header, 2707305664, 123); let eth = Capability::eth(68); - let message = StatusMessage::StatusMessage68(StatusMessage68 { + let message = Box::new(Status68Message { eth_version: eth.version, network_id: 3503995874084926, total_difficulty, - block_hash: H256::random(), genesis: genesis_hash, fork_id, + latest_block_hash: H256::random(), }); + let result = validate_status(message, &storage, ð).await; assert!(result.is_ok()); } diff --git a/crates/networking/p2p/rlpx/eth/eth68/mod.rs b/crates/networking/p2p/rlpx/eth/eth68/mod.rs deleted file mode 100644 index 8abfab1019..0000000000 --- a/crates/networking/p2p/rlpx/eth/eth68/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod receipts; -pub mod status; diff --git a/crates/networking/p2p/rlpx/eth/eth69/mod.rs b/crates/networking/p2p/rlpx/eth/eth69/mod.rs deleted file mode 100644 index 8abfab1019..0000000000 --- a/crates/networking/p2p/rlpx/eth/eth69/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod receipts; -pub mod status; diff --git a/crates/networking/p2p/rlpx/eth/mod.rs b/crates/networking/p2p/rlpx/eth/mod.rs index 3d57bd6630..a9c599e517 100644 --- a/crates/networking/p2p/rlpx/eth/mod.rs +++ b/crates/networking/p2p/rlpx/eth/mod.rs @@ -1,7 +1,5 @@ pub(crate) mod backend; pub(crate) mod blocks; -mod eth68; -mod eth69; pub(crate) mod receipts; pub(crate) mod status; pub(crate) mod transactions; diff --git a/crates/networking/p2p/rlpx/eth/receipts.rs b/crates/networking/p2p/rlpx/eth/receipts.rs index 1ce76f052c..3ccf044c5c 100644 --- a/crates/networking/p2p/rlpx/eth/receipts.rs +++ b/crates/networking/p2p/rlpx/eth/receipts.rs @@ -1,17 +1,13 @@ -use super::eth68::receipts::Receipts68; -use super::eth69::receipts::Receipts69; +use std::fmt::Debug; + use crate::rlpx::{ - error::RLPxError, message::RLPxMessage, - p2p::Capability, utils::{snappy_compress, snappy_decompress}, }; -use ethereum_types::Bloom; use bytes::BufMut; -use ethrex_common::types::{BlockHash, Receipt}; +use ethrex_common::types::{BlockHash, Receipt, ReceiptWithBloom}; use ethrex_rlp::{ - decode::static_left_pad, error::{RLPDecodeError, RLPEncodeError}, structs::{Decoder, Encoder}, }; @@ -57,126 +53,101 @@ impl RLPxMessage for GetReceipts { // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#receipts-0x10 #[derive(Debug, Clone)] -pub(crate) enum Receipts { - Receipts68(Receipts68), - Receipts69(Receipts69), +pub(crate) struct Receipts68 { + pub id: u64, + pub receipts: Vec>, } -impl Receipts { - pub fn new(id: u64, receipts: Vec>, eth: &Capability) -> Result { - match eth.version { - 68 => Ok(Receipts::Receipts68(Receipts68::new(id, receipts))), - 69 => Ok(Receipts::Receipts69(Receipts69::new(id, receipts))), - _ => Err(RLPxError::IncompatibleProtocol), - } - } - - pub fn get_receipts(&self) -> Vec> { - match self { - Receipts::Receipts68(msg) => msg.get_receipts(), - Receipts::Receipts69(msg) => msg.receipts.clone(), - } - } - - pub fn get_id(&self) -> u64 { - match self { - Receipts::Receipts68(msg) => msg.id, - Receipts::Receipts69(msg) => msg.id, - } +impl Receipts68 { + pub fn new(id: u64, receipts: Vec>) -> Self { + Self { id, receipts } } } -impl RLPxMessage for Receipts { +impl RLPxMessage for Receipts68 { const CODE: u8 = 0x10; fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { - match self { - Receipts::Receipts68(msg) => msg.encode(buf), - Receipts::Receipts69(msg) => msg.encode(buf), - } + let mut encoded_data = vec![]; + + // Map nested Receipts to ReceiptWithBloom + let receipts_with_bloom: Vec> = self + .receipts + .iter() + .map(|receipt_list| receipt_list.iter().map(|receipt| receipt.into()).collect()) + .collect(); + + Encoder::new(&mut encoded_data) + .encode_field(&self.id) + .encode_field(&receipts_with_bloom) + .finish(); + + let msg_data = snappy_compress(encoded_data)?; + buf.put_slice(&msg_data); + Ok(()) } fn decode(msg_data: &[u8]) -> Result { - if has_bloom(msg_data)? { - Ok(Receipts::Receipts68(Receipts68::decode(msg_data)?)) - } else { - Ok(Receipts::Receipts69(Receipts69::decode(msg_data)?)) - } + let decompressed_data = snappy_decompress(msg_data)?; + let decoder = Decoder::new(&decompressed_data)?; + let (id, decoder): (u64, _) = decoder.decode_field("request-id")?; + let (receipts_with_bloom, _): (Vec>, _) = + decoder.decode_field("receipts")?; + + // Map nested ReceiptWithBloom to Receipts + let receipts: Vec> = receipts_with_bloom + .iter() + .map(|receipt_list| receipt_list.iter().map(|receipt| receipt.into()).collect()) + .collect(); + + Ok(Self::new(id, receipts)) } } -// We should receive something like this: -// [request-id, [[r1], [r2], [r3],... ]] -// in this fn, we're checking if r1 has a bloom field inside -fn has_bloom(msg_data: &[u8]) -> Result { - let decompressed_data = snappy_decompress(msg_data)?; - let decoder = Decoder::new(&decompressed_data)?; - let (_, decoder): (u64, _) = decoder.decode_field("request-id")?; - - //a list should be received - let (data, _) = decoder.get_encoded_item()?; - let decoder = Decoder::new(&data)?; - //check if the list is empty - if decoder.is_done() { - return Ok(false); +#[derive(Debug, Clone)] +pub(crate) struct Receipts69 { + pub id: u64, + pub receipts: Vec>, +} + +impl Receipts69 { + pub fn new(id: u64, receipts: Vec>) -> Self { + Self { id, receipts } } +} + +impl RLPxMessage for Receipts69 { + const CODE: u8 = 0x10; + + fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { + let mut encoded_data = vec![]; + Encoder::new(&mut encoded_data) + .encode_field(&self.id) + .encode_field(&self.receipts) + .finish(); - // inner list - let (data, _) = decoder.get_encoded_item()?; - let decoder = Decoder::new(&data)?; - if decoder.is_done() { - return Ok(false); + let msg_data = snappy_compress(encoded_data)?; + buf.put_slice(&msg_data); + Ok(()) } - // we only need one element - // all elements should be the same - let (data, _) = decoder.get_encoded_item()?; - let data = match data[0] { - 0x80..=0xB7 => { - let length = (data[0] - 0x80) as usize; - if data.len() < length + 1 { - return Err(RLPDecodeError::InvalidLength); - } - &data[1..length + 1] - } - 0xB8..=0xBF => { - let length_of_length = (data[0] - 0xB7) as usize; - if data.len() < length_of_length + 1 { - return Err(RLPDecodeError::InvalidLength); - } - let length_bytes = &data[1..length_of_length + 1]; - let length = usize::from_be_bytes(static_left_pad(length_bytes)?); - if data.len() < length_of_length + length + 1 { - return Err(RLPDecodeError::InvalidLength); - } - &data[length_of_length + 1..length_of_length + length + 1] - } - _ => return Ok(false), - }; - let data = match data[0] { - tx_type if tx_type < 0x7f => &data[1..], - _ => &data[0..], - }; - let decoder = Decoder::new(data)?; - let (_, decoder): (bool, _) = decoder.decode_field("succeeded")?; - let (_, decoder): (u64, _) = decoder.decode_field("cumulative_gas_used")?; - // try to decode the bloom field - match decoder.decode_field::("bloom") { - Ok(_) => Ok(true), - Err(_) => Ok(false), + fn decode(msg_data: &[u8]) -> Result { + let decompressed_data = snappy_decompress(msg_data)?; + let decoder = Decoder::new(&decompressed_data)?; + let (id, decoder): (u64, _) = decoder.decode_field("request-id")?; + let (receipts, _): (Vec>, _) = decoder.decode_field("receipts")?; + + Ok(Self::new(id, receipts)) } } #[cfg(test)] mod tests { - use crate::rlpx::eth::receipts::has_bloom; use crate::rlpx::{ - eth::receipts::{GetReceipts, Receipts}, + eth::receipts::{GetReceipts, Receipts68, Receipts69}, message::RLPxMessage, - p2p::Capability, }; - use ethrex_common::types::transaction::TxType; - use ethrex_common::types::{BlockHash, Receipt}; + use ethrex_common::types::{BlockHash, Receipt, TxType}; #[test] fn get_receipts_empty_message() { @@ -209,36 +180,66 @@ mod tests { } #[test] - fn receipts_empty_message() { + fn receipts68_empty_message() { let receipts = vec![]; - let receipts = Receipts::new(1, receipts, &Capability::eth(68)).unwrap(); + let receipts = Receipts68::new(1, receipts); let mut buf = Vec::new(); receipts.encode(&mut buf).unwrap(); - let decoded = Receipts::decode(&buf).unwrap(); + let decoded = Receipts68::decode(&buf).unwrap(); - assert_eq!(decoded.get_id(), 1); - assert_eq!(decoded.get_receipts(), Vec::>::new()); + assert_eq!(decoded.id, 1); + assert_eq!(decoded.receipts, Vec::>::new()); } #[test] - fn receipts_check_bloom() { + fn receipts68_not_empty_message() { let receipts = vec![vec![ Receipt::new(TxType::EIP7702, true, 210000, vec![]), Receipt::new(TxType::EIP7702, true, 210000, vec![]), Receipt::new(TxType::EIP7702, true, 210000, vec![]), Receipt::new(TxType::EIP7702, true, 210000, vec![]), ]]; - let receipts68 = Receipts::new(255, receipts.clone(), &Capability::eth(68)).unwrap(); - let receipts69 = Receipts::new(255, receipts, &Capability::eth(69)).unwrap(); + let id = 255; + let receipts68 = Receipts68::new(id, receipts.clone()); let mut buf = Vec::new(); receipts68.encode(&mut buf).unwrap(); - assert!(has_bloom(&buf).unwrap()); + let receipts68_decoded = Receipts68::decode(&buf).unwrap(); + assert_eq!(receipts68_decoded.id, id); + assert_eq!(receipts68_decoded.receipts, receipts); + } + + #[test] + fn receipts69_empty_message() { + let receipts = vec![]; + let receipts = Receipts69::new(1, receipts); + + let mut buf = Vec::new(); + receipts.encode(&mut buf).unwrap(); + + let decoded = Receipts69::decode(&buf).unwrap(); + + assert_eq!(decoded.id, 1); + assert_eq!(decoded.receipts, Vec::>::new()); + } + + #[test] + fn receipts69_not_empty_message() { + let receipts = vec![vec![ + Receipt::new(TxType::EIP7702, true, 210000, vec![]), + Receipt::new(TxType::EIP7702, true, 210000, vec![]), + Receipt::new(TxType::EIP7702, true, 210000, vec![]), + Receipt::new(TxType::EIP7702, true, 210000, vec![]), + ]]; + let id = 255; + let receipts69 = Receipts69::new(id, receipts.clone()); let mut buf = Vec::new(); receipts69.encode(&mut buf).unwrap(); - assert!(!has_bloom(&buf).unwrap()); + let receipts69_decoded = Receipts69::decode(&buf).unwrap(); + assert_eq!(receipts69_decoded.id, id); + assert_eq!(receipts69_decoded.receipts, receipts); } } diff --git a/crates/networking/p2p/rlpx/eth/status.rs b/crates/networking/p2p/rlpx/eth/status.rs index c6660bb9ec..0c233693a3 100644 --- a/crates/networking/p2p/rlpx/eth/status.rs +++ b/crates/networking/p2p/rlpx/eth/status.rs @@ -1,48 +1,50 @@ -use super::eth68::status::StatusMessage68; -use super::eth69::status::StatusMessage69; +use std::fmt::Debug; + use crate::rlpx::message::RLPxMessage; -use crate::rlpx::utils::snappy_decompress; +use crate::rlpx::utils::{snappy_compress, snappy_decompress}; use crate::rlpx::{error::RLPxError, p2p::Capability}; use bytes::BufMut; use ethrex_common::U256; use ethrex_common::types::{BlockHash, ForkId}; -use ethrex_rlp::error::{RLPDecodeError, RLPEncodeError}; -use ethrex_rlp::structs::Decoder; +use ethrex_rlp::{ + error::{RLPDecodeError, RLPEncodeError}, + structs::{Decoder, Encoder}, +}; use ethrex_storage::Store; +pub trait StatusMessage { + fn eth_version(&self) -> u8; + fn network_id(&self) -> u64; + fn genesis(&self) -> BlockHash; + fn fork_id(&self) -> ForkId; +} + #[derive(Debug, Clone)] -pub enum StatusMessage { - StatusMessage68(StatusMessage68), - StatusMessage69(StatusMessage69), +pub struct Status68Message { + pub(crate) eth_version: u8, + pub(crate) network_id: u64, + pub(crate) total_difficulty: U256, + pub(crate) genesis: BlockHash, + pub(crate) fork_id: ForkId, + pub(crate) latest_block_hash: BlockHash, } -impl RLPxMessage for StatusMessage { - const CODE: u8 = 0x00; - fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { - match self { - StatusMessage::StatusMessage68(msg) => msg.encode(buf), - StatusMessage::StatusMessage69(msg) => msg.encode(buf), - } +impl StatusMessage for Status68Message { + fn eth_version(&self) -> u8 { + self.eth_version } - - fn decode(msg_data: &[u8]) -> Result { - let decompressed_data = snappy_decompress(msg_data)?; - let decoder = Decoder::new(&decompressed_data)?; - let (eth_version, _): (u32, _) = decoder.decode_field("protocolVersion")?; - - match eth_version { - 68 => Ok(StatusMessage::StatusMessage68(StatusMessage68::decode( - msg_data, - )?)), - 69 => Ok(StatusMessage::StatusMessage69(StatusMessage69::decode( - msg_data, - )?)), - _ => Err(RLPDecodeError::IncompatibleProtocol), - } + fn network_id(&self) -> u64 { + self.network_id + } + fn genesis(&self) -> BlockHash { + self.genesis + } + fn fork_id(&self) -> ForkId { + self.fork_id.clone() } } -impl StatusMessage { +impl Status68Message { pub async fn new(storage: &Store, eth: &Capability) -> Result { let chain_config = storage.get_chain_config()?; let total_difficulty = @@ -53,67 +55,186 @@ impl StatusMessage { let genesis_header = storage .get_block_header(0)? .ok_or(RLPxError::NotFound("Genesis Block".to_string()))?; - let lastest_block = storage.get_latest_block_number().await?; + let latest_block = storage.get_latest_block_number().await?; let block_header = storage - .get_block_header(lastest_block)? - .ok_or(RLPxError::NotFound(format!("Block {lastest_block}")))?; + .get_block_header(latest_block)? + .ok_or(RLPxError::NotFound(format!("Block {latest_block}")))?; let genesis = genesis_header.hash(); - let lastest_block_hash = block_header.hash(); + let latest_block_hash = block_header.hash(); let fork_id = ForkId::new( chain_config, genesis_header, block_header.timestamp, - lastest_block, + latest_block, ); - match eth.version { - 68 => Ok(StatusMessage::StatusMessage68(StatusMessage68 { - eth_version: eth.version, - network_id, - total_difficulty, - block_hash: lastest_block_hash, - genesis, - fork_id, - })), - 69 => Ok(StatusMessage::StatusMessage69(StatusMessage69 { - eth_version: eth.version, - network_id, - genesis, - fork_id, - earliest_block: 0, - lastest_block, - lastest_block_hash, - })), - _ => Err(RLPxError::IncompatibleProtocol), - } + Ok(Self { + eth_version: eth.version, + network_id, + total_difficulty, + genesis, + fork_id, + latest_block_hash, + }) } +} - pub fn get_network_id(&self) -> u64 { - match self { - StatusMessage::StatusMessage68(msg) => msg.network_id, - StatusMessage::StatusMessage69(msg) => msg.network_id, - } +#[derive(Debug, Clone)] +pub struct Status69Message { + pub(crate) eth_version: u8, + pub(crate) network_id: u64, + pub(crate) genesis: BlockHash, + pub(crate) fork_id: ForkId, + pub(crate) earliest_block: u64, + pub(crate) latest_block: u64, + pub(crate) latest_block_hash: BlockHash, +} + +impl StatusMessage for Status69Message { + fn eth_version(&self) -> u8 { + self.eth_version + } + fn network_id(&self) -> u64 { + self.network_id + } + fn genesis(&self) -> BlockHash { + self.genesis + } + fn fork_id(&self) -> ForkId { + self.fork_id.clone() } +} - pub fn get_eth_version(&self) -> u8 { - match self { - StatusMessage::StatusMessage68(msg) => msg.eth_version, - StatusMessage::StatusMessage69(msg) => msg.eth_version, - } +impl Status69Message { + pub async fn new(storage: &Store, eth: &Capability) -> Result { + let chain_config = storage.get_chain_config()?; + let network_id = chain_config.chain_id; + + // These blocks must always be available + let genesis_header = storage + .get_block_header(0)? + .ok_or(RLPxError::NotFound("Genesis Block".to_string()))?; + let latest_block = storage.get_latest_block_number().await?; + let block_header = storage + .get_block_header(latest_block)? + .ok_or(RLPxError::NotFound(format!("Block {latest_block}")))?; + + let genesis = genesis_header.hash(); + let latest_block_hash = block_header.hash(); + let fork_id = ForkId::new( + chain_config, + genesis_header, + block_header.timestamp, + latest_block, + ); + + Ok(Self { + eth_version: eth.version, + network_id, + genesis, + fork_id, + earliest_block: 0, + latest_block, + latest_block_hash, + }) } +} + +impl RLPxMessage for Status68Message { + const CODE: u8 = 0x00; + + fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { + let mut encoded_data = vec![]; + Encoder::new(&mut encoded_data) + .encode_field(&self.eth_version) + .encode_field(&self.network_id) + .encode_field(&self.total_difficulty) + .encode_field(&self.latest_block_hash) + .encode_field(&self.genesis) + .encode_field(&self.fork_id) + .finish(); + + let msg_data = snappy_compress(encoded_data)?; + buf.put_slice(&msg_data); + Ok(()) + } + + fn decode(msg_data: &[u8]) -> Result { + let decompressed_data = snappy_decompress(msg_data)?; + let decoder = Decoder::new(&decompressed_data)?; + let (eth_version, decoder): (u32, _) = decoder.decode_field("protocolVersion")?; - pub fn get_fork_id(&self) -> ForkId { - match self { - StatusMessage::StatusMessage68(msg) => msg.fork_id.clone(), - StatusMessage::StatusMessage69(msg) => msg.fork_id.clone(), + if eth_version != 68 { + return Err(RLPDecodeError::MalformedData); } + + let (network_id, decoder): (u64, _) = decoder.decode_field("networkId")?; + let (total_difficulty, decoder): (U256, _) = decoder.decode_field("totalDifficulty")?; + let (latest_block_hash, decoder): (BlockHash, _) = decoder.decode_field("blockHash")?; + let (genesis, decoder): (BlockHash, _) = decoder.decode_field("genesis")?; + let (fork_id, decoder): (ForkId, _) = decoder.decode_field("forkId")?; + + // Implementations must ignore any additional list elements + let _padding = decoder.finish_unchecked(); + + Ok(Self { + eth_version: eth_version as u8, + network_id, + total_difficulty, + genesis, + fork_id, + latest_block_hash, + }) } +} + +impl RLPxMessage for Status69Message { + const CODE: u8 = 0x00; + + fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { + let mut encoded_data = vec![]; + Encoder::new(&mut encoded_data) + .encode_field(&self.eth_version) + .encode_field(&self.network_id) + .encode_field(&self.genesis) + .encode_field(&self.fork_id) + .encode_field(&self.earliest_block) + .encode_field(&self.latest_block) + .encode_field(&self.latest_block_hash) + .finish(); - pub fn get_genesis(&self) -> BlockHash { - match self { - StatusMessage::StatusMessage68(msg) => msg.genesis, - StatusMessage::StatusMessage69(msg) => msg.genesis, + let msg_data = snappy_compress(encoded_data)?; + buf.put_slice(&msg_data); + Ok(()) + } + + fn decode(msg_data: &[u8]) -> Result { + let decompressed_data = snappy_decompress(msg_data)?; + let decoder = Decoder::new(&decompressed_data)?; + let (eth_version, decoder): (u32, _) = decoder.decode_field("protocolVersion")?; + + if eth_version != 69 { + return Err(RLPDecodeError::MalformedData); } + + let (network_id, decoder): (u64, _) = decoder.decode_field("networkId")?; + let (genesis, decoder): (BlockHash, _) = decoder.decode_field("genesis")?; + let (fork_id, decoder): (ForkId, _) = decoder.decode_field("forkId")?; + let (earliest_block, decoder): (u64, _) = decoder.decode_field("earliestBlock")?; + let (latest_block, decoder): (u64, _) = decoder.decode_field("lastestBlock")?; + let (latest_block_hash, decoder): (BlockHash, _) = decoder.decode_field("latestHash")?; + // Implementations must ignore any additional list elements + let _padding = decoder.finish_unchecked(); + + Ok(Self { + eth_version: eth_version as u8, + network_id, + genesis, + fork_id, + earliest_block, + latest_block, + latest_block_hash, + }) } } diff --git a/crates/networking/p2p/rlpx/message.rs b/crates/networking/p2p/rlpx/message.rs index 0b2814bf20..55262af516 100644 --- a/crates/networking/p2p/rlpx/message.rs +++ b/crates/networking/p2p/rlpx/message.rs @@ -2,23 +2,28 @@ use bytes::BufMut; use ethrex_rlp::error::{RLPDecodeError, RLPEncodeError}; use std::fmt::Display; +use crate::rlpx::eth::receipts::{Receipts68, Receipts69}; +use crate::rlpx::eth::status::{Status68Message, Status69Message}; + use super::eth::blocks::{BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders}; -use super::eth::receipts::{GetReceipts, Receipts}; -use super::eth::status::StatusMessage; +use super::eth::receipts::GetReceipts; use super::eth::transactions::{ GetPooledTransactions, NewPooledTransactionHashes, PooledTransactions, Transactions, }; use super::eth::update::BlockRangeUpdate; -use super::p2p::{DisconnectMessage, HelloMessage, PingMessage, PongMessage}; +use super::p2p::{Capability, DisconnectMessage, HelloMessage, PingMessage, PongMessage}; use super::snap::{ AccountRange, ByteCodes, GetAccountRange, GetByteCodes, GetStorageRanges, GetTrieNodes, StorageRanges, TrieNodes, }; - use ethrex_rlp::encode::RLPEncode; -const ETH_CAPABILITY_OFFSET: u8 = 0x10; -const SNAP_CAPABILITY_OFFSET: u8 = 0x21; +#[allow(clippy::upper_case_acronyms)] +enum MessageProtocol { + P2P, + ETH, + SNAP, +} pub trait RLPxMessage: Sized { const CODE: u8; @@ -29,13 +34,16 @@ pub trait RLPxMessage: Sized { } #[derive(Debug, Clone)] pub(crate) enum Message { + // p2p capability + // https://github.com/ethereum/devp2p/blob/master/rlpx.md Hello(HelloMessage), Disconnect(DisconnectMessage), Ping(PingMessage), Pong(PongMessage), - Status(StatusMessage), // eth capability // https://github.com/ethereum/devp2p/blob/master/caps/eth.md + Status68(Status68Message), + Status69(Status69Message), GetBlockHeaders(GetBlockHeaders), BlockHeaders(BlockHeaders), Transactions(Transactions), @@ -45,7 +53,8 @@ pub(crate) enum Message { GetPooledTransactions(GetPooledTransactions), PooledTransactions(PooledTransactions), GetReceipts(GetReceipts), - Receipts(Receipts), + Receipts68(Receipts68), + Receipts69(Receipts69), BlockRangeUpdate(BlockRangeUpdate), // snap capability // https://github.com/ethereum/devp2p/blob/master/caps/snap.md @@ -60,44 +69,128 @@ pub(crate) enum Message { } impl Message { - pub const fn code(&self) -> u8 { + fn protocol(&self) -> MessageProtocol { + match self { + // p2p capability + Message::Hello(_) => MessageProtocol::P2P, + Message::Disconnect(_) => MessageProtocol::P2P, + Message::Ping(_) => MessageProtocol::P2P, + Message::Pong(_) => MessageProtocol::P2P, + + // eth capability + Message::Status68(_) => MessageProtocol::ETH, + Message::Status69(_) => MessageProtocol::ETH, + Message::Transactions(_) => MessageProtocol::ETH, + Message::GetBlockHeaders(_) => MessageProtocol::ETH, + Message::BlockHeaders(_) => MessageProtocol::ETH, + Message::GetBlockBodies(_) => MessageProtocol::ETH, + Message::BlockBodies(_) => MessageProtocol::ETH, + Message::NewPooledTransactionHashes(_) => MessageProtocol::ETH, + Message::GetPooledTransactions(_) => MessageProtocol::ETH, + Message::PooledTransactions(_) => MessageProtocol::ETH, + Message::GetReceipts(_) => MessageProtocol::ETH, + Message::Receipts68(_) => MessageProtocol::ETH, + Message::Receipts69(_) => MessageProtocol::ETH, + Message::BlockRangeUpdate(_) => MessageProtocol::ETH, + + // snap capability + Message::GetAccountRange(_) => MessageProtocol::SNAP, + Message::AccountRange(_) => MessageProtocol::SNAP, + Message::GetStorageRanges(_) => MessageProtocol::SNAP, + Message::StorageRanges(_) => MessageProtocol::SNAP, + Message::GetByteCodes(_) => MessageProtocol::SNAP, + Message::ByteCodes(_) => MessageProtocol::SNAP, + Message::GetTrieNodes(_) => MessageProtocol::SNAP, + Message::TrieNodes(_) => MessageProtocol::SNAP, + } + } + + pub fn code(&self) -> u8 { match self { + // p2p capability Message::Hello(_) => HelloMessage::CODE, Message::Disconnect(_) => DisconnectMessage::CODE, Message::Ping(_) => PingMessage::CODE, Message::Pong(_) => PongMessage::CODE, // eth capability - Message::Status(_) => ETH_CAPABILITY_OFFSET + StatusMessage::CODE, - Message::Transactions(_) => ETH_CAPABILITY_OFFSET + Transactions::CODE, - Message::GetBlockHeaders(_) => ETH_CAPABILITY_OFFSET + GetBlockHeaders::CODE, - Message::BlockHeaders(_) => ETH_CAPABILITY_OFFSET + BlockHeaders::CODE, - Message::GetBlockBodies(_) => ETH_CAPABILITY_OFFSET + GetBlockBodies::CODE, - Message::BlockBodies(_) => ETH_CAPABILITY_OFFSET + BlockBodies::CODE, - Message::NewPooledTransactionHashes(_) => { - ETH_CAPABILITY_OFFSET + NewPooledTransactionHashes::CODE + Message::Status68(_) => Status68Message::CODE, + Message::Status69(_) => Status69Message::CODE, + Message::Transactions(_) => Transactions::CODE, + Message::GetBlockHeaders(_) => GetBlockHeaders::CODE, + Message::BlockHeaders(_) => BlockHeaders::CODE, + Message::GetBlockBodies(_) => GetBlockBodies::CODE, + Message::BlockBodies(_) => BlockBodies::CODE, + Message::NewPooledTransactionHashes(_) => NewPooledTransactionHashes::CODE, + Message::GetPooledTransactions(_) => GetPooledTransactions::CODE, + Message::PooledTransactions(_) => PooledTransactions::CODE, + Message::GetReceipts(_) => GetReceipts::CODE, + Message::Receipts68(_) => Receipts68::CODE, + Message::Receipts69(_) => Receipts69::CODE, + Message::BlockRangeUpdate(_) => BlockRangeUpdate::CODE, + + // snap capability + Message::GetAccountRange(_) => GetAccountRange::CODE, + Message::AccountRange(_) => AccountRange::CODE, + Message::GetStorageRanges(_) => GetStorageRanges::CODE, + Message::StorageRanges(_) => StorageRanges::CODE, + Message::GetByteCodes(_) => GetByteCodes::CODE, + Message::ByteCodes(_) => ByteCodes::CODE, + Message::GetTrieNodes(_) => GetTrieNodes::CODE, + Message::TrieNodes(_) => TrieNodes::CODE, + } + } + + pub fn offset( + &self, + p2p_capability: &Option, + eth_capability: &Option, + snap_capability: &Option, + ) -> Result { + match self.protocol() { + MessageProtocol::P2P => { + if let Some(p2p_capability) = p2p_capability { + if self.code() < p2p_capability.length() { + return Ok(0); + } + } + } + MessageProtocol::ETH => { + if let (Some(p2p_capability), Some(eth_capability)) = + (p2p_capability, eth_capability) + { + if self.code() < eth_capability.length() { + return Ok(p2p_capability.length()); + } + } } - Message::GetPooledTransactions(_) => { - ETH_CAPABILITY_OFFSET + GetPooledTransactions::CODE + MessageProtocol::SNAP => { + if let (Some(p2p_capability), Some(eth_capability), Some(snap_capability)) = + (p2p_capability, eth_capability, snap_capability) + { + if self.code() < snap_capability.length() { + return Ok(p2p_capability.length() + eth_capability.length()); + } + } } - Message::PooledTransactions(_) => ETH_CAPABILITY_OFFSET + PooledTransactions::CODE, - Message::GetReceipts(_) => ETH_CAPABILITY_OFFSET + GetReceipts::CODE, - Message::Receipts(_) => ETH_CAPABILITY_OFFSET + Receipts::CODE, - Message::BlockRangeUpdate(_) => ETH_CAPABILITY_OFFSET + BlockRangeUpdate::CODE, - // snap capability - Message::GetAccountRange(_) => SNAP_CAPABILITY_OFFSET + GetAccountRange::CODE, - Message::AccountRange(_) => SNAP_CAPABILITY_OFFSET + AccountRange::CODE, - Message::GetStorageRanges(_) => SNAP_CAPABILITY_OFFSET + GetStorageRanges::CODE, - Message::StorageRanges(_) => SNAP_CAPABILITY_OFFSET + StorageRanges::CODE, - Message::GetByteCodes(_) => SNAP_CAPABILITY_OFFSET + GetByteCodes::CODE, - Message::ByteCodes(_) => SNAP_CAPABILITY_OFFSET + ByteCodes::CODE, - Message::GetTrieNodes(_) => SNAP_CAPABILITY_OFFSET + GetTrieNodes::CODE, - Message::TrieNodes(_) => SNAP_CAPABILITY_OFFSET + TrieNodes::CODE, } + Err(RLPEncodeError::IncompatibleProtocol) } - pub fn decode(msg_id: u8, data: &[u8]) -> Result { - if msg_id < ETH_CAPABILITY_OFFSET { - match msg_id { + + pub fn decode( + msg_id: u8, + data: &[u8], + p2p_capability: &Option, + eth_capability: &Option, + snap_capability: &Option, + ) -> Result { + let p2p_capability = p2p_capability + .as_ref() + .ok_or(RLPDecodeError::IncompatibleProtocol)?; + + // P2P protocol + if msg_id < p2p_capability.length() { + return match msg_id { HelloMessage::CODE => Ok(Message::Hello(HelloMessage::decode(data)?)), DisconnectMessage::CODE => { Ok(Message::Disconnect(DisconnectMessage::decode(data)?)) @@ -105,11 +198,24 @@ impl Message { PingMessage::CODE => Ok(Message::Ping(PingMessage::decode(data)?)), PongMessage::CODE => Ok(Message::Pong(PongMessage::decode(data)?)), _ => Err(RLPDecodeError::MalformedData), - } - } else if msg_id < SNAP_CAPABILITY_OFFSET { - // eth capability - match msg_id - ETH_CAPABILITY_OFFSET { - StatusMessage::CODE => Ok(Message::Status(StatusMessage::decode(data)?)), + }; + } + + let eth_msg_id = msg_id - p2p_capability.length(); + + let eth_capability = eth_capability + .as_ref() + .ok_or(RLPDecodeError::MalformedData)?; + + // eth (wire) protocol + + if eth_msg_id < eth_capability.length() { + return match eth_msg_id { + Status68Message::CODE => match eth_capability.version { + 68 => Ok(Message::Status68(Status68Message::decode(data)?)), + 69 => Ok(Message::Status69(Status69Message::decode(data)?)), + _ => Err(RLPDecodeError::IncompatibleProtocol), + }, Transactions::CODE => Ok(Message::Transactions(Transactions::decode(data)?)), GetBlockHeaders::CODE => { Ok(Message::GetBlockHeaders(GetBlockHeaders::decode(data)?)) @@ -127,40 +233,75 @@ impl Message { PooledTransactions::decode(data)?, )), GetReceipts::CODE => Ok(Message::GetReceipts(GetReceipts::decode(data)?)), - Receipts::CODE => Ok(Message::Receipts(Receipts::decode(data)?)), - BlockRangeUpdate::CODE => { - Ok(Message::BlockRangeUpdate(BlockRangeUpdate::decode(data)?)) - } + Receipts68::CODE => match eth_capability.version { + 68 => Ok(Message::Receipts68(Receipts68::decode(data)?)), + 69 => Ok(Message::Receipts69(Receipts69::decode(data)?)), + _ => Err(RLPDecodeError::IncompatibleProtocol), + }, + BlockRangeUpdate::CODE => match eth_capability.version { + 69 => Ok(Message::BlockRangeUpdate(BlockRangeUpdate::decode(data)?)), + _ => Err(RLPDecodeError::IncompatibleProtocol), + }, _ => Err(RLPDecodeError::MalformedData), - } + }; } else { - // snap capability - match msg_id - SNAP_CAPABILITY_OFFSET { - GetAccountRange::CODE => { - return Ok(Message::GetAccountRange(GetAccountRange::decode(data)?)); - } - AccountRange::CODE => Ok(Message::AccountRange(AccountRange::decode(data)?)), - GetStorageRanges::CODE => { - return Ok(Message::GetStorageRanges(GetStorageRanges::decode(data)?)); - } - StorageRanges::CODE => Ok(Message::StorageRanges(StorageRanges::decode(data)?)), - GetByteCodes::CODE => Ok(Message::GetByteCodes(GetByteCodes::decode(data)?)), - ByteCodes::CODE => Ok(Message::ByteCodes(ByteCodes::decode(data)?)), - GetTrieNodes::CODE => Ok(Message::GetTrieNodes(GetTrieNodes::decode(data)?)), - TrieNodes::CODE => Ok(Message::TrieNodes(TrieNodes::decode(data)?)), - _ => Err(RLPDecodeError::MalformedData), + let snap_msg_id = eth_msg_id - eth_capability.length(); + let snap_capability = snap_capability + .as_ref() + .ok_or(RLPDecodeError::MalformedData)?; + + if snap_msg_id < snap_capability.length() { + return match snap_msg_id { + GetAccountRange::CODE => { + Ok(Message::GetAccountRange(GetAccountRange::decode(data)?)) + } + AccountRange::CODE => Ok(Message::AccountRange(AccountRange::decode(data)?)), + GetStorageRanges::CODE => { + Ok(Message::GetStorageRanges(GetStorageRanges::decode(data)?)) + } + StorageRanges::CODE => Ok(Message::StorageRanges(StorageRanges::decode(data)?)), + GetByteCodes::CODE => Ok(Message::GetByteCodes(GetByteCodes::decode(data)?)), + ByteCodes::CODE => Ok(Message::ByteCodes(ByteCodes::decode(data)?)), + GetTrieNodes::CODE => Ok(Message::GetTrieNodes(GetTrieNodes::decode(data)?)), + TrieNodes::CODE => Ok(Message::TrieNodes(TrieNodes::decode(data)?)), + _ => Err(RLPDecodeError::MalformedData), + }; } } + Err(RLPDecodeError::MalformedData) } - pub fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { - self.code().encode(buf); + pub fn encode( + &self, + buf: &mut dyn BufMut, + p2p_capability: &Option, + eth_capability: &Option, + snap_capability: &Option, + ) -> Result<(), RLPEncodeError> { + (self.code() + self.offset(p2p_capability, eth_capability, snap_capability)?).encode(buf); match self { Message::Hello(msg) => msg.encode(buf), Message::Disconnect(msg) => msg.encode(buf), Message::Ping(msg) => msg.encode(buf), Message::Pong(msg) => msg.encode(buf), - Message::Status(msg) => msg.encode(buf), + Message::Status68(msg) => { + let eth_capability = eth_capability + .as_ref() + .ok_or(RLPEncodeError::IncompatibleProtocol)?; + match eth_capability.version { + 68 => msg.encode(buf), + _ => Err(RLPEncodeError::IncompatibleProtocol), + } + } + Message::Status69(msg) => { + let eth_capability = eth_capability + .as_ref() + .ok_or(RLPEncodeError::IncompatibleProtocol)?; + match eth_capability.version { + 69 => msg.encode(buf), + _ => Err(RLPEncodeError::IncompatibleProtocol), + } + } Message::Transactions(msg) => msg.encode(buf), Message::GetBlockHeaders(msg) => msg.encode(buf), Message::BlockHeaders(msg) => msg.encode(buf), @@ -170,8 +311,33 @@ impl Message { Message::GetPooledTransactions(msg) => msg.encode(buf), Message::PooledTransactions(msg) => msg.encode(buf), Message::GetReceipts(msg) => msg.encode(buf), - Message::Receipts(msg) => msg.encode(buf), - Message::BlockRangeUpdate(msg) => msg.encode(buf), + Message::Receipts68(msg) => { + let eth_capability = eth_capability + .as_ref() + .ok_or(RLPEncodeError::IncompatibleProtocol)?; + match eth_capability.version { + 68 => msg.encode(buf), + _ => Err(RLPEncodeError::IncompatibleProtocol), + } + } + Message::Receipts69(msg) => { + let eth_capability = eth_capability + .as_ref() + .ok_or(RLPEncodeError::IncompatibleProtocol)?; + match eth_capability.version { + 69 => msg.encode(buf), + _ => Err(RLPEncodeError::IncompatibleProtocol), + } + } + Message::BlockRangeUpdate(msg) => { + let eth_capability = eth_capability + .as_ref() + .ok_or(RLPEncodeError::IncompatibleProtocol)?; + match eth_capability.version { + 69 => msg.encode(buf), + _ => Err(RLPEncodeError::IncompatibleProtocol), + } + } Message::GetAccountRange(msg) => msg.encode(buf), Message::AccountRange(msg) => msg.encode(buf), Message::GetStorageRanges(msg) => msg.encode(buf), @@ -191,7 +357,8 @@ impl Display for Message { Message::Disconnect(_) => "p2p:Disconnect".fmt(f), Message::Ping(_) => "p2p:Ping".fmt(f), Message::Pong(_) => "p2p:Pong".fmt(f), - Message::Status(_) => "eth:Status".fmt(f), + Message::Status68(_) => "eth:Status".fmt(f), + Message::Status69(_) => "eth:Status".fmt(f), Message::GetBlockHeaders(_) => "eth:getBlockHeaders".fmt(f), Message::BlockHeaders(_) => "eth:BlockHeaders".fmt(f), Message::BlockBodies(_) => "eth:BlockBodies".fmt(f), @@ -201,7 +368,8 @@ impl Display for Message { Message::Transactions(_) => "eth:TransactionsMessage".fmt(f), Message::GetBlockBodies(_) => "eth:GetBlockBodies".fmt(f), Message::GetReceipts(_) => "eth:GetReceipts".fmt(f), - Message::Receipts(_) => "eth:Receipts".fmt(f), + Message::Receipts68(_) => "eth:Receipts".fmt(f), + Message::Receipts69(_) => "eth:Receipts".fmt(f), Message::BlockRangeUpdate(_) => "eth:BlockRangeUpdate".fmt(f), Message::GetAccountRange(_) => "snap:GetAccountRange".fmt(f), Message::AccountRange(_) => "snap:AccountRange".fmt(f), diff --git a/crates/networking/p2p/rlpx/p2p.rs b/crates/networking/p2p/rlpx/p2p.rs index f7b754bb80..97cf755de2 100644 --- a/crates/networking/p2p/rlpx/p2p.rs +++ b/crates/networking/p2p/rlpx/p2p.rs @@ -14,9 +14,17 @@ use ethrex_rlp::{ use k256::PublicKey; use serde::Serialize; -pub const SUPPORTED_ETH_CAPABILITIES: [Capability; 1] = [Capability::eth(68)]; +pub const DEFAULT_P2P_PROTOCOL_VERSION: u8 = 5; + +pub const SUPPORTED_ETH_CAPABILITIES: [Capability; 2] = [Capability::eth(68), Capability::eth(69)]; pub const SUPPORTED_SNAP_CAPABILITIES: [Capability; 1] = [Capability::snap(1)]; -pub const SUPPORTED_P2P_CAPABILITIES: [Capability; 1] = [Capability::p2p(5)]; +pub const SUPPORTED_P2P_CAPABILITIES: [Capability; 1] = + [Capability::p2p(DEFAULT_P2P_PROTOCOL_VERSION)]; + +pub const PROTOCOL_P2P_5_LENGTH: u8 = 16; +pub const PROTOCOL_ETH_68_LENGTH: u8 = 17; +pub const PROTOCOL_ETH_69_LENGTH: u8 = 18; +pub const PROTOCOL_SNAP_1_LENGTH: u8 = 8; #[derive(Debug, Clone, PartialEq)] pub struct Capability { @@ -45,6 +53,31 @@ impl Capability { version, } } + + pub fn length(&self) -> u8 { + match self.protocol { + "p2p" => PROTOCOL_P2P_5_LENGTH, + "snap" => PROTOCOL_SNAP_1_LENGTH, + "eth" => match self.version { + 68 => PROTOCOL_ETH_68_LENGTH, + 69 => PROTOCOL_ETH_69_LENGTH, + _ => 0, + }, + _ => 0, + } + } + + pub fn is_p2p(&self) -> bool { + self.protocol == "p2p" + } + + pub fn is_eth(&self) -> bool { + self.protocol == "eth" + } + + pub fn is_snap(&self) -> bool { + self.protocol == "snap" + } } impl RLPEncode for Capability {