diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs
index c898163a..99d59d3c 100644
--- a/crates/chain-orchestrator/src/lib.rs
+++ b/crates/chain-orchestrator/src/lib.rs
@@ -47,6 +47,9 @@ pub use error::ChainOrchestratorError;
mod metrics;
pub use metrics::{ChainOrchestratorItem, ChainOrchestratorMetrics};
+mod retry;
+pub use retry::Retry;
+
/// The mask used to mask the L1 message queue hash.
const L1_MESSAGE_QUEUE_HASH_MASK: B256 =
b256!("ffffffffffffffffffffffffffffffffffffffffffffffffffffffff00000000");
@@ -211,9 +214,14 @@ impl<
let database = ctx.database.clone();
let block_info: L2BlockInfoWithL1Messages = (&block_with_peer.block).into();
Self::do_handle_block_from_peer(ctx, block_with_peer).await?;
- let tx = database.tx_mut().await?;
- tx.update_l1_messages_with_l2_block(block_info.clone()).await?;
- tx.commit().await?;
+ Retry::default()
+ .retry("update_l1_messages_with_l2_block", || async {
+ let tx = database.tx_mut().await?;
+ tx.update_l1_messages_with_l2_block(block_info.clone()).await?;
+ tx.commit().await?;
+ Ok::<_, ChainOrchestratorError>(())
+ })
+ .await?;
Ok(ChainOrchestratorEvent::L2ChainCommitted(block_info, None, true))
}
@@ -253,9 +261,14 @@ impl<
tracing::trace!(target: "scroll::chain_orchestrator", number = ?(optimistic_headers.first().expect("chain can not be empty").number - 1), "fetching block");
let parent_hash =
optimistic_headers.first().expect("chain can not be empty").parent_hash;
- let header = network_client
- .get_header(BlockHashOrNumber::Hash(parent_hash))
- .await?
+ let header = Retry::default()
+ .retry("network_client_get_header", || async {
+ let header =
+ network_client.get_header(BlockHashOrNumber::Hash(parent_hash)).await?;
+ Ok::<_, ChainOrchestratorError>(header)
+ })
+ .await?;
+ let header = header
.into_data()
.ok_or(ChainOrchestratorError::MissingBlockHeader { hash: parent_hash })?;
optimistic_headers.push_front(header);
@@ -292,10 +305,14 @@ impl<
let mut received_chain_headers = VecDeque::from(vec![received_block.header.clone()]);
// We should never have a re-org that is deeper than the current safe head.
- let tx = database.tx().await?;
- let (latest_safe_block, _) =
- tx.get_latest_safe_l2_info().await?.expect("safe block must exist");
- drop(tx);
+ let (latest_safe_block, _) = Retry::default()
+ .retry("get_latest_safe_l2_info", || async {
+ let tx = database.tx().await?;
+ let (latest_safe_block, batch_info) =
+ tx.get_latest_safe_l2_info().await?.expect("safe block must exist");
+ Ok::<_, ChainOrchestratorError>((latest_safe_block, batch_info))
+ })
+ .await?;
// We search for the re-org index in the in-memory chain.
const BATCH_FETCH_SIZE: usize = 50;
@@ -383,12 +400,20 @@ impl<
}
tracing::trace!(target: "scroll::chain_orchestrator", number = ?(received_chain_headers.front().expect("chain can not be empty").number - 1), "fetching block");
- if let Some(header) = network_client
- .get_header(BlockHashOrNumber::Hash(
- received_chain_headers.front().expect("chain can not be empty").parent_hash,
- ))
+ if let Some(header) = Retry::default()
+ .retry("network_client_get_header", || async {
+ let header = network_client
+ .get_header(BlockHashOrNumber::Hash(
+ received_chain_headers
+ .front()
+ .expect("chain can not be empty")
+ .parent_hash,
+ ))
+ .await?
+ .into_data();
+ Ok::<_, ChainOrchestratorError>(header)
+ })
.await?
- .into_data()
{
received_chain_headers.push_front(header.clone());
} else {
@@ -455,11 +480,16 @@ impl<
ChainOrchestratorItem::InsertConsolidatedL2Blocks,
Box::pin(async move {
let head = block_infos.last().expect("block info must not be empty").clone();
- let tx = database.tx_mut().await?;
- for block in block_infos {
- tx.insert_block(block, batch_info).await?;
- }
- tx.commit().await?;
+ Retry::default()
+ .retry("insert_block", || async {
+ let tx = database.tx_mut().await?;
+ for block in block_infos.clone() {
+ tx.insert_block(block, batch_info).await?;
+ }
+ tx.commit().await?;
+ Ok::<_, ChainOrchestratorError>(())
+ })
+ .await?;
Result::<_, ChainOrchestratorError>::Ok(Some(
ChainOrchestratorEvent::L2ConsolidatedBlockCommitted(head),
))
@@ -503,9 +533,14 @@ impl<
// Insert the blocks into the database.
let head = block_info.last().expect("block info must not be empty").clone();
- let tx = database.tx_mut().await?;
- tx.update_l1_messages_from_l2_blocks(block_info).await?;
- tx.commit().await?;
+ Retry::default()
+ .retry("update_l1_messages_from_l2_blocks", || async {
+ let tx = database.tx_mut().await?;
+ tx.update_l1_messages_from_l2_blocks(block_info.clone()).await?;
+ tx.commit().await?;
+ Ok::<_, ChainOrchestratorError>(())
+ })
+ .await?;
Result::<_, ChainOrchestratorError>::Ok(Some(
ChainOrchestratorEvent::L2ChainCommitted(head, None, consolidated),
@@ -589,10 +624,25 @@ impl<
l2_client: Arc
,
current_chain: Arc>,
) -> Result