diff --git a/crates/op-rbuilder/Cargo.toml b/crates/op-rbuilder/Cargo.toml
index dbefc829..0fd9bd7b 100644
--- a/crates/op-rbuilder/Cargo.toml
+++ b/crates/op-rbuilder/Cargo.toml
@@ -101,7 +101,6 @@
 tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
 shellexpand = "3.1"
 serde_yaml = { version = "0.9" }
-
 # `msozin/flashblocks-v1.4.1` branch based on `flashblocks-rebase`
 rollup-boost = { git = "http://github.com/flashbots/rollup-boost", rev = "8506dfb7d84c65746f7c88d250983658438f59e8" }
 
diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs
index 136e58ad..df6bed65 100644
--- a/crates/op-rbuilder/src/args/op.rs
+++ b/crates/op-rbuilder/src/args/op.rs
@@ -31,6 +31,14 @@ pub struct OpRbuilderArgs {
     #[arg(long = "builder.log-pool-transactions", default_value = "false")]
     pub log_pool_transactions: bool,
 
+    /// Signals whether to enable the txpool monitor
+    #[arg(long = "builder.enable-txpool-monitor", default_value = "false")]
+    pub enable_txpool_monitor: bool,
+
+    /// The buffer size for the txpool events
+    #[arg(long = "builder.txpool-monitor-buffer-size", default_value = "1000")]
+    pub txpool_monitor_buffer_size: usize,
+
     /// How much time extra to wait for the block building job to complete and not get garbage collected
     #[arg(long = "builder.extra-block-deadline-secs", default_value = "20")]
     pub extra_block_deadline_secs: u64,
diff --git a/crates/op-rbuilder/src/main.rs b/crates/op-rbuilder/src/main.rs
index f378f961..89fde075 100644
--- a/crates/op-rbuilder/src/main.rs
+++ b/crates/op-rbuilder/src/main.rs
@@ -5,7 +5,6 @@ use reth_optimism_node::{
     node::{OpAddOnsBuilder, OpPoolBuilder},
     OpNode,
 };
-use reth_transaction_pool::TransactionPool;
 
 /// CLI argument parsing.
 pub mod args;
@@ -22,7 +21,7 @@ use metrics::{
     VersionInfo, BUILD_PROFILE_NAME, CARGO_PKG_VERSION, VERGEN_BUILD_TIMESTAMP,
     VERGEN_CARGO_FEATURES, VERGEN_CARGO_TARGET_TRIPLE, VERGEN_GIT_SHA,
 };
-use monitor_tx_pool::monitor_tx_pool;
+use monitor_tx_pool::{TransactionPoolMonitor, TxpoolExtApiServer};
 use revert_protection::{EthApiOverrideServer, RevertProtectionExt};
 use tx::FBPooledTransaction;
 
@@ -99,33 +98,44 @@
                 .build(),
         )
         .extend_rpc_modules(move |ctx| {
+            let pool = ctx.pool().clone();
+
             if builder_args.enable_revert_protection {
                 tracing::info!("Revert protection enabled");
-                let pool = ctx.pool().clone();
                 let provider = ctx.provider().clone();
-                let revert_protection_ext = RevertProtectionExt::new(pool, provider);
+                let revert_protection_ext = RevertProtectionExt::new(pool.clone(), provider);
                 ctx.modules
                     .merge_configured(revert_protection_ext.into_rpc())?;
             }
 
-            Ok(())
-        })
-        .on_node_started(move |ctx| {
-            VERSION.register_version_metrics();
-            if builder_args.log_pool_transactions {
+            if builder_args.log_pool_transactions || builder_args.enable_txpool_monitor {
                 tracing::info!("Logging pool transactions");
-                ctx.task_executor.spawn_critical(
+
+                let tx_monitor = TransactionPoolMonitor::new(
+                    pool,
+                    builder_args.log_pool_transactions,
+                    builder_args.enable_txpool_monitor,
+                    builder_args.txpool_monitor_buffer_size,
+                );
+                ctx.modules.merge_configured(tx_monitor.rpc().into_rpc())?;
+
+                ctx.node().task_executor.spawn_critical(
                     "txlogging",
                     Box::pin(async move {
-                        monitor_tx_pool(ctx.pool.all_transactions_event_listener()).await;
+                        tx_monitor.run().await;
                     }),
                 );
             }
 
             Ok(())
         })
+        .on_node_started(move |_ctx| {
+            VERSION.register_version_metrics();
+
+            Ok(())
+        })
         .launch()
         .await?;
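Note on the new flags: `--builder.txpool-monitor-buffer-size` sizes a bounded `tokio::sync::broadcast` channel. Once the buffer fills, the oldest events are evicted and slow subscribers observe a lag error instead of back-pressuring the pool listener. A standalone sketch of that behavior (illustrative only, not part of the diff; the buffer size and values are made up):

```rust
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // buffer_size = 4, as --builder.txpool-monitor-buffer-size would set it
    let (tx, mut rx) = broadcast::channel::<u32>(4);

    // `send` never blocks: once the buffer is full, the oldest value is evicted.
    for i in 0..8u32 {
        let _ = tx.send(i);
    }

    // The receiver fell behind; its next recv reports the lag...
    assert!(matches!(
        rx.recv().await,
        Err(broadcast::error::RecvError::Lagged(_))
    ));
    // ...and reception then resumes at the oldest value still buffered.
    assert_eq!(rx.recv().await.unwrap(), 4);
}
```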
diff --git a/crates/op-rbuilder/src/monitor_tx_pool.rs b/crates/op-rbuilder/src/monitor_tx_pool.rs
index 5bb3a4ea..a8d93888 100644
--- a/crates/op-rbuilder/src/monitor_tx_pool.rs
+++ b/crates/op-rbuilder/src/monitor_tx_pool.rs
@@ -1,68 +1,198 @@
 use crate::tx::FBPooledTransaction;
+use alloy_primitives::TxHash;
 use futures_util::StreamExt;
-use reth_transaction_pool::{AllTransactionsEvents, FullTransactionEvent};
+use jsonrpsee::{
+    core::{async_trait, SubscriptionResult},
+    proc_macros::rpc,
+    PendingSubscriptionSink, SubscriptionMessage,
+};
+use reth_transaction_pool::{FullTransactionEvent, TransactionEvent, TransactionPool};
+use serde::Serialize;
+use tokio::sync::broadcast;
 use tracing::info;
 
-pub async fn monitor_tx_pool(mut new_transactions: AllTransactionsEvents<FBPooledTransaction>) {
-    while let Some(event) = new_transactions.next().await {
-        transaction_event_log(event);
+#[rpc(server, namespace = "txpool")]
+pub trait TxpoolExtApi {
+    /// Creates a subscription that returns the txpool events.
+    #[subscription(name = "subscribeEvents", item = TransactionEventData)]
+    fn subscribe_events(&self) -> SubscriptionResult;
+}
+
+pub struct TransactionPoolMonitor<Pool> {
+    pool: Pool,
+    log_events: bool,
+    txpool_monitor: bool,
+    event_sender: broadcast::Sender<TransactionEventData>,
+    // Keep a receiver alive so the channel stays open and keeps buffering events
+    _event_receiver: broadcast::Receiver<TransactionEventData>,
+}
+
+impl<Pool> TransactionPoolMonitor<Pool> {
+    pub fn new(pool: Pool, log_events: bool, txpool_monitor: bool, buffer_size: usize) -> Self {
+        let (event_sender, _event_receiver) = broadcast::channel(buffer_size);
+
+        if log_events {
+            info!("Logging pool transactions");
+        }
+        if txpool_monitor {
+            info!("Monitoring txpool enabled");
+        }
+
+        Self {
+            pool,
+            log_events,
+            txpool_monitor,
+            event_sender,
+            _event_receiver,
+        }
     }
 }
 
-fn transaction_event_log(event: FullTransactionEvent<FBPooledTransaction>) {
-    match event {
-        FullTransactionEvent::Pending(hash) => {
-            info!(
-                target = "monitoring",
-                tx_hash = hash.to_string(),
-                kind = "pending",
-                "Transaction event received"
-            )
-        }
-        FullTransactionEvent::Queued(hash) => {
-            info!(
-                target = "monitoring",
-                tx_hash = hash.to_string(),
-                kind = "queued",
-                "Transaction event received"
-            )
+impl<Pool> TransactionPoolMonitor<Pool>
+where
+    Pool: TransactionPool + Clone + 'static,
+{
+    pub fn rpc(&self) -> TransactionPoolMonitorRpc {
+        TransactionPoolMonitorRpc {
+            event_sender: self.event_sender.clone(),
         }
-        FullTransactionEvent::Mined {
-            tx_hash,
-            block_hash,
-        } => info!(
-            target = "monitoring",
-            tx_hash = tx_hash.to_string(),
-            kind = "mined",
-            block_hash = block_hash.to_string(),
-            "Transaction event received"
-        ),
-        FullTransactionEvent::Replaced {
-            transaction,
-            replaced_by,
-        } => info!(
-            target = "monitoring",
-            tx_hash = transaction.hash().to_string(),
-            kind = "replaced",
-            replaced_by = replaced_by.to_string(),
-            "Transaction event received"
-        ),
-        FullTransactionEvent::Discarded(hash) => {
-            info!(
-                target = "monitoring",
-                tx_hash = hash.to_string(),
-                kind = "discarded",
-                "Transaction event received"
-            )
+    }
+
+    pub async fn run(self) {
+        let mut new_transactions = self.pool.all_transactions_event_listener();
+
+        while let Some(event) = new_transactions.next().await {
+            // Push the event to the buffer
+            let event_data = TransactionEventData::from(event);
+            if self.log_events {
+                info!(
+                    target = "monitoring",
+                    tx_hash = event_data.hash.to_string(),
+                    kind = event_data.kind(),
+                    "Transaction event received"
+                )
+            }
+
+            if self.txpool_monitor {
+                tracing::debug!(?event_data, "Broadcasting txpool event");
+                let _ = self.event_sender.send(event_data);
+            }
         }
+    }
+}
+
+pub struct TransactionPoolMonitorRpc {
+    event_sender: broadcast::Sender<TransactionEventData>,
+}
+
+#[async_trait]
+impl TxpoolExtApiServer for TransactionPoolMonitorRpc {
+    fn subscribe_events(
+        &self,
+        pending_subscription_sink: PendingSubscriptionSink,
+    ) -> SubscriptionResult {
+        tracing::debug!("Subscribing to txpool events");
+        let mut event_receiver = self.event_sender.subscribe();
+
+        tokio::spawn(async move {
+            let sink = match pending_subscription_sink.accept().await {
+                Ok(sink) => sink,
+                Err(e) => {
+                    tracing::warn!("failed to accept subscription: {e}");
+                    return;
+                }
+            };
+
+            tracing::debug!("Subscribed to txpool events");
+
+            loop {
+                match event_receiver.recv().await {
+                    Ok(event) => {
+                        tracing::debug!(?event, "Forwarding txpool event");
+
+                        let msg = SubscriptionMessage::from(
+                            serde_json::value::to_raw_value(&event)
+                                .expect("Failed to serialize event"),
+                        );
+
+                        if sink.send(msg).await.is_err() {
+                            tracing::debug!("Subscription closed");
+                            break;
+                        }
+                    }
+                    Err(broadcast::error::RecvError::Lagged(_)) => {
+                        tracing::warn!("Subscription lagged, some events were dropped");
+                        continue;
+                    }
+                    Err(broadcast::error::RecvError::Closed) => {
+                        tracing::debug!("Event channel closed");
+                        break;
+                    }
+                }
+            }
+        });
+
+        Ok(())
+    }
+}
+
+#[derive(Clone, Debug, Serialize)]
+struct TransactionEventData {
+    hash: TxHash,
+    transaction_event: TransactionEvent,
+}
+
+impl TransactionEventData {
+    pub fn kind(&self) -> &str {
+        match self.transaction_event {
+            TransactionEvent::Pending => "pending",
+            TransactionEvent::Queued => "queued",
+            TransactionEvent::Mined(_) => "mined",
+            TransactionEvent::Replaced(_) => "replaced",
+            TransactionEvent::Discarded => "discarded",
+            TransactionEvent::Invalid => "invalid",
+            TransactionEvent::Propagated(_) => "propagated",
         }
-        FullTransactionEvent::Invalid(hash) => {
-            info!(
-                target = "monitoring",
-                tx_hash = hash.to_string(),
-                kind = "invalid",
-                "Transaction event received"
-            )
+    }
+}
+
+impl From<FullTransactionEvent<FBPooledTransaction>> for TransactionEventData {
+    fn from(event: FullTransactionEvent<FBPooledTransaction>) -> Self {
+        match event {
+            FullTransactionEvent::Pending(hash) => Self {
+                hash,
+                transaction_event: TransactionEvent::Pending,
+            },
+            FullTransactionEvent::Queued(hash) => Self {
+                hash,
+                transaction_event: TransactionEvent::Queued,
+            },
+            FullTransactionEvent::Mined {
+                tx_hash,
+                block_hash,
+            } => Self {
+                hash: tx_hash,
+                transaction_event: TransactionEvent::Mined(block_hash),
+            },
+            FullTransactionEvent::Replaced {
+                transaction,
+                replaced_by,
+            } => Self {
+                hash: *transaction.hash(),
+                transaction_event: TransactionEvent::Replaced(replaced_by),
+            },
+            FullTransactionEvent::Discarded(hash) => Self {
+                hash,
+                transaction_event: TransactionEvent::Discarded,
+            },
+            FullTransactionEvent::Invalid(hash) => Self {
+                hash,
+                transaction_event: TransactionEvent::Invalid,
+            },
+            FullTransactionEvent::Propagated(kind) => Self {
+                // Propagated events carry no tx hash; use the zero hash as a placeholder
+                hash: TxHash::default(),
+                transaction_event: TransactionEvent::Propagated(kind),
+            },
         }
-        FullTransactionEvent::Propagated(_propagated) => {}
     }
 }
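For consumers that don't want to replicate the `#[rpc(client)]` trait defined in the test below, the subscription can also be reached with jsonrpsee's generic subscription client. A minimal sketch (not part of the diff): the port is illustrative, and `txpool_unsubscribeEvents` assumes jsonrpsee's default unsubscribe-method naming for `name = "subscribeEvents"` under `namespace = "txpool"`.

```rust
use jsonrpsee::{
    core::client::{Subscription, SubscriptionClientT},
    rpc_params,
    ws_client::WsClientBuilder,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the builder's WebSocket endpoint (--ws --ws.port 8546).
    let client = WsClientBuilder::default()
        .build("ws://127.0.0.1:8546")
        .await?;

    // Decode into `serde_json::Value` to stay agnostic about the exact
    // wire shape of `TransactionEventData`.
    let mut sub: Subscription<serde_json::Value> = client
        .subscribe(
            "txpool_subscribeEvents",
            rpc_params![],
            "txpool_unsubscribeEvents",
        )
        .await?;

    while let Some(event) = sub.next().await {
        println!("txpool event: {}", event?);
    }
    Ok(())
}
```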
diff --git a/crates/op-rbuilder/src/tests/framework/harness.rs b/crates/op-rbuilder/src/tests/framework/harness.rs
index 30fb084e..eb630b14 100644
--- a/crates/op-rbuilder/src/tests/framework/harness.rs
+++ b/crates/op-rbuilder/src/tests/framework/harness.rs
@@ -86,12 +86,14 @@
         let builder_data_dir: PathBuf = std::env::temp_dir().join(Uuid::new_v4().to_string());
         let builder_auth_rpc_port = get_available_port();
         let builder_http_port = get_available_port();
+        let builder_ws_port = get_available_port();
 
         let mut op_rbuilder_config = OpRbuilderConfig::new()
             .chain_config_path(genesis_path.clone())
             .data_dir(builder_data_dir)
             .auth_rpc_port(builder_auth_rpc_port)
             .network_port(get_available_port())
             .http_port(builder_http_port)
+            .ws_port(builder_ws_port)
             .with_builder_private_key(BUILDER_PRIVATE_KEY)
             .with_revert_protection(self.use_revert_protection)
             .with_namespaces(self.namespaces)
@@ -131,6 +133,7 @@
             framework: framework,
             builder_auth_rpc_port,
             builder_http_port,
+            builder_ws_port,
             validator_auth_rpc_port,
             builder_log_path,
         })
@@ -141,6 +144,7 @@ pub struct TestHarness {
     framework: IntegrationFramework,
     builder_auth_rpc_port: u16,
     builder_http_port: u16,
+    pub builder_ws_port: u16,
     validator_auth_rpc_port: u16,
     builder_log_path: PathBuf,
 }
diff --git a/crates/op-rbuilder/src/tests/framework/op.rs b/crates/op-rbuilder/src/tests/framework/op.rs
index 30aae7a8..e2a30b89 100644
--- a/crates/op-rbuilder/src/tests/framework/op.rs
+++ b/crates/op-rbuilder/src/tests/framework/op.rs
@@ -21,6 +21,7 @@ pub struct OpRbuilderConfig {
    chain_config_path: Option<PathBuf>,
    data_dir: Option<PathBuf>,
    http_port: Option<u16>,
+    ws_port: Option<u16>,
    network_port: Option<u16>,
    builder_private_key: Option<String>,
    flashblocks_port: Option<u16>,
@@ -61,6 +62,11 @@
         self
     }
 
+    pub fn ws_port(mut self, port: u16) -> Self {
+        self.ws_port = Some(port);
+        self
+    }
+
     pub fn with_builder_private_key(mut self, private_key: &str) -> Self {
         self.builder_private_key = Some(private_key.to_string());
         self
     }
@@ -130,6 +136,7 @@
             .arg("--color")
             .arg("never")
             .arg("--builder.log-pool-transactions")
+            .arg("--builder.enable-txpool-monitor")
             .arg("--port")
             .arg(self.network_port.expect("network_port not set").to_string())
             .arg("--ipcdisable")
@@ -152,6 +159,10 @@
                 .arg(http_port.to_string());
         }
 
+        if let Some(ws_port) = self.ws_port {
+            cmd.arg("--ws").arg("--ws.port").arg(ws_port.to_string());
+        }
+
         if let Some(flashblocks_port) = &self.flashblocks_port {
             cmd.arg("--flashblocks.enabled").arg("true");
             cmd.arg("--flashblocks.addr").arg("127.0.0.1");
diff --git a/crates/op-rbuilder/src/tests/vanilla/txpool.rs b/crates/op-rbuilder/src/tests/vanilla/txpool.rs
index cbf15512..1c2a607d 100644
--- a/crates/op-rbuilder/src/tests/vanilla/txpool.rs
+++ b/crates/op-rbuilder/src/tests/vanilla/txpool.rs
@@ -1,5 +1,7 @@
 use crate::tests::{framework::TestHarnessBuilder, ONE_ETH};
 use alloy_provider::ext::TxPoolApi;
+use jsonrpsee::proc_macros::rpc;
+use jsonrpsee::ws_client::WsClientBuilder;
 
 /// This test ensures that pending pool custom limit is respected and priority tx would be included even when pool if full.
 #[tokio::test]
@@ -67,3 +69,38 @@
 
     Ok(())
 }
+
+#[rpc(client, namespace = "txpool")]
+pub trait TxpoolExtApi {
+    /// Creates a subscription that returns the txpool events.
+    #[subscription(name = "subscribeEvents", item = serde_json::Value)]
+    fn subscribe_events(&self) -> SubscriptionResult;
+}
+
+/// This test ensures that if we enable the txpool monitor, there is a websocket
+/// on which we can subscribe and receive txpool events.
+#[tokio::test]
+async fn txpool_monitor() -> eyre::Result<()> {
+    let harness = TestHarnessBuilder::new("txpool_monitor")
+        .with_namespaces("txpool,eth,debug,admin")
+        .build()
+        .await?;
+
+    let ws_url = format!("ws://127.0.0.1:{}", harness.builder_ws_port);
+    let client = WsClientBuilder::default().build(&ws_url).await?;
+
+    // Subscribe first: broadcast subscribers only observe events sent after `subscribe()`.
+    let mut sub = TxpoolExtApiClient::subscribe_events(&client)
+        .await
+        .expect("failed to subscribe");
+
+    // send 10 transactions
+    for _ in 0..10 {
+        let tx = harness.create_transaction().send().await?;
+        println!("tx: {:?}", tx);
+    }
+
+    // Each transaction should now produce an event; inspect the first one
+    println!("sub: {:?}", sub.next().await);
+
+    Ok(())
+}
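A possible follow-up for the test (sketch only; `drain_events` is not in the diff): instead of printing a single `sub.next().await`, drain a fixed number of events under a timeout, so the test can assert that all ten transactions produced an event without risking an indefinite hang.

```rust
use std::time::Duration;
use jsonrpsee::core::client::Subscription;

/// Collect `n` subscription events, failing instead of hanging if an
/// event does not arrive within 5 seconds.
async fn drain_events(
    sub: &mut Subscription<serde_json::Value>,
    n: usize,
) -> eyre::Result<Vec<serde_json::Value>> {
    let mut events = Vec::with_capacity(n);
    while events.len() < n {
        let item = tokio::time::timeout(Duration::from_secs(5), sub.next())
            .await? // timeout elapsed
            .ok_or_else(|| eyre::eyre!("subscription closed early"))?; // stream ended
        events.push(item?); // deserialization / transport error
    }
    Ok(events)
}
```

The test could then assert `drain_events(&mut sub, 10).await?.len() == 10` after submitting the transactions.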