Skip to content

Feat: implement signature publisher decorators in signer #2468

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
3 changes: 3 additions & 0 deletions docs/website/root/manual/develop/nodes/mithril-signer.md
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,6 @@ Here is a list of the available parameters:
| `transactions_import_block_chunk_size` | - | - | `TRANSACTIONS_IMPORT_BLOCK_CHUNK_SIZE` | Chunk size for importing transactions, combined with transaction pruning it reduces the storage footprint of the signer by reducing the number of transactions stored on disk at any given time. | `1500` | - | - |
| `cardano_transactions_block_streamer_max_roll_forwards_per_poll` | - | - | `CARDANO_TRANSACTIONS_BLOCK_STREAMER_MAX_ROLL_FORWARDS_PER_POLL` | The maximum number of roll forwards during a poll of the block streamer when importing transactions. | `1000` | - | - |
| `preloading_refresh_interval_in_seconds` | `--preloading-refresh-interval-in-seconds` | - | `PRELOADING_REFRESH_INTERVAL_IN_SECONDS` | The preloading refresh interval in seconds. | `7200` | - | - |
| `signature_publisher_retry_attempts` | `--signature-publisher-retry-attempts` | - | `SIGNATURE_PUBLISHER_RETRY_ATTEMPTS` | Number of retry attempts when publishing the signature. | `3` | - | - |
| `signature_publisher_retry_delay_ms` | `--signature-publisher-retry-delay-ms` | - | `SIGNATURE_PUBLISHER_RETRY_DELAY_MS` | Delay (in milliseconds) between two retry attempts when publishing the signature. | `2000` | - | - |
| `signature_publisher_delayer_delay_ms` | `--signature-publisher-delayer-delay-ms` | - | `SIGNATURE_PUBLISHER_DELAYER_DELAY_MS` | Delay (in milliseconds) between two separate publications done by the delayer signature publisher. | `10000` | - | - |
12 changes: 12 additions & 0 deletions mithril-signer/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ pub struct Configuration {

/// Preloading refresh interval in seconds
pub preloading_refresh_interval_in_seconds: u64,

/// Number of retry attempts when publishing the signature
pub signature_publisher_retry_attempts: u8,

/// Delay (in milliseconds) between two retry attempts when publishing the signature
pub signature_publisher_retry_delay_ms: u64,

/// Delay (in milliseconds) between two separate publications done by the delayer signature publisher
pub signature_publisher_delayer_delay_ms: u64,
}

impl Configuration {
Expand Down Expand Up @@ -159,6 +168,9 @@ impl Configuration {
transactions_import_block_chunk_size: BlockNumber(1000),
cardano_transactions_block_streamer_max_roll_forwards_per_poll: 1000,
preloading_refresh_interval_in_seconds: 60,
signature_publisher_retry_attempts: 1,
signature_publisher_retry_delay_ms: 1,
signature_publisher_delayer_delay_ms: 1,
}
}

Expand Down
28 changes: 26 additions & 2 deletions mithril-signer/src/dependency_injection/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ use crate::dependency_injection::SignerDependencyContainer;
use crate::services::{
AggregatorHTTPClient, CardanoTransactionsImporter,
CardanoTransactionsPreloaderActivationSigner, MithrilEpochService, MithrilSingleSigner,
SignaturePublisher, SignerCertifierService, SignerSignableSeedBuilder,
SignaturePublishRetryPolicy, SignaturePublisherDelayer, SignaturePublisherNoop,
SignaturePublisherRetrier, SignerCertifierService, SignerSignableSeedBuilder,
SignerSignedEntityConfigProvider, SignerUpkeepService, TransactionsImporterByChunk,
TransactionsImporterWithPruner, TransactionsImporterWithVacuum,
};
Expand Down Expand Up @@ -386,7 +387,30 @@ impl<'a> DependenciesBuilder<'a> {
self.root_logger(),
));

let signature_publisher: Arc<dyn SignaturePublisher> = aggregator_client.clone();
let signature_publisher = {
// Temporary no-op publisher before a DMQ-based implementation is available.
let first_publisher = SignaturePublisherRetrier::new(
Arc::new(SignaturePublisherNoop {}),
SignaturePublishRetryPolicy::never(),
);

let second_publisher = SignaturePublisherRetrier::new(
aggregator_client.clone(),
SignaturePublishRetryPolicy {
attempts: self.config.signature_publisher_retry_attempts,
delay_between_attempts: Duration::from_millis(
self.config.signature_publisher_retry_delay_ms,
),
},
);

Arc::new(SignaturePublisherDelayer::new(
Arc::new(first_publisher),
Arc::new(second_publisher),
Duration::from_millis(self.config.signature_publisher_delayer_delay_ms),
self.root_logger(),
))
};

let certifier = Arc::new(SignerCertifierService::new(
signed_beacon_repository,
Expand Down
35 changes: 35 additions & 0 deletions mithril-signer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,26 @@ pub struct Args {
default_value_t = 43200
)]
preloading_refresh_interval_in_seconds: u64,

/// Number of retry attempts when publishing the signature
#[clap(long, env = "SIGNATURE_PUBLISHER_RETRY_ATTEMPTS", default_value_t = 3)]
signature_publisher_retry_attempts: u64,

/// Delay (in milliseconds) between two retry attempts when publishing the signature
#[clap(
long,
env = "SIGNATURE_PUBLISHER_RETRY_DELAY_MS",
default_value_t = 2_000
)]
signature_publisher_retry_delay_ms: u64,

/// Delay (in milliseconds) between two separate publications done by the delayer signature publisher
#[clap(
long,
env = "SIGNATURE_PUBLISHER_DELAYER_DELAY_MS",
default_value_t = 10_000
)]
signature_publisher_delayer_delay_ms: u64,
}

impl Args {
Expand Down Expand Up @@ -175,6 +195,21 @@ async fn main() -> StdResult<()> {
.with_context(|| {
"configuration error: could not set `preloading_refresh_interval_in_seconds`"
})?
.set_default(
"signature_publisher_retry_attempts",
args.signature_publisher_retry_attempts,
)
.with_context(|| "configuration error: could not set `signature_publisher_retry_attempts`")?
.set_default(
"signature_publisher_retry_delay_ms",
args.signature_publisher_retry_delay_ms,
)
.with_context(|| "configuration error: could not set `signature_publisher_retry_delay_ms`")?
.set_default(
"signature_publisher_delay_ms",
args.signature_publisher_delayer_delay_ms,
)
.with_context(|| "configuration error: could not set `signature_publisher_delay_ms`")?
.add_source(DefaultConfiguration::default())
.add_source(
config::File::with_name(&format!(
Expand Down
6 changes: 6 additions & 0 deletions mithril-signer/src/services/signature_publisher/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
mod http;
mod interface;
mod signature_publisher_delayer;
mod signature_publisher_noop;
mod signature_publisher_retrier;

pub use interface::*;
pub use signature_publisher_delayer::*;
pub use signature_publisher_noop::*;
pub use signature_publisher_retrier::*;
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
use std::{sync::Arc, time::Duration};

use mithril_common::{
entities::{ProtocolMessage, SignedEntityType, SingleSignature},
logging::LoggerExtensions,
StdResult,
};
use slog::{error, Logger};

use super::SignaturePublisher;

/// A decorator of [SignaturePublisher] that publishes right away on a first publisher
/// and with a delay on the second publisher.
pub struct SignaturePublisherDelayer {
first_publisher: Arc<dyn SignaturePublisher>,
second_publisher: Arc<dyn SignaturePublisher>,
delay_between_publish: Duration,
logger: Logger,
}

impl SignaturePublisherDelayer {
/// Creates a new [SignaturePublisherDelayer]
pub fn new(
first_publisher: Arc<dyn SignaturePublisher>,
second_publisher: Arc<dyn SignaturePublisher>,
delay_between_publish: Duration,
logger: Logger,
) -> Self {
Self {
first_publisher,
second_publisher,
delay_between_publish,
logger: logger.new_with_component_name::<Self>(),
}
}
}

#[async_trait::async_trait]
impl SignaturePublisher for SignaturePublisherDelayer {
async fn publish(
&self,
signed_entity_type: &SignedEntityType,
signature: &SingleSignature,
protocol_message: &ProtocolMessage,
) -> StdResult<()> {
if let Err(e) = self
.first_publisher
.publish(signed_entity_type, signature, protocol_message)
.await
{
error!(
self.logger,
"Delayer failed to publish first signature: {e}"
);
}

tokio::time::sleep(self.delay_between_publish).await;

if let Err(e) = self
.second_publisher
.publish(signed_entity_type, signature, protocol_message)
.await
{
error!(
self.logger,
"Delayer failed to publish second signature: {e}"
);
return Err(e);
}

Ok(())
}
}

#[cfg(test)]
mod tests {
use mithril_common::{entities::Epoch, test_utils::fake_data};

use crate::{services::MockSignaturePublisher, test_tools::TestLogger};

use super::*;

#[tokio::test]
async fn should_call_both_publishers_when_first_succeeds() {
let mut first_publisher = MockSignaturePublisher::new();
first_publisher
.expect_publish()
.once()
.returning(|_, _, _| Ok(()));

let mut second_publisher = MockSignaturePublisher::new();
second_publisher
.expect_publish()
.once()
.returning(|_, _, _| Ok(()));

let delayer = SignaturePublisherDelayer::new(
Arc::new(first_publisher),
Arc::new(second_publisher),
Duration::from_millis(0),
TestLogger::stdout(),
);

delayer
.publish(
&SignedEntityType::MithrilStakeDistribution(Epoch(1)),
&fake_data::single_signature(vec![1]),
&ProtocolMessage::default(),
)
.await
.unwrap();
}

#[tokio::test]
async fn should_call_second_publisher_even_if_first_fails_and_log_error() {
let (logger, log_inspector) = TestLogger::memory();
let mut first_publisher = MockSignaturePublisher::new();
first_publisher
.expect_publish()
.once()
.returning(|_, _, _| Err(anyhow::anyhow!("first publisher failure")));

let mut second_publisher = MockSignaturePublisher::new();
second_publisher
.expect_publish()
.once()
.returning(|_, _, _| Ok(()));

let delayer = SignaturePublisherDelayer::new(
Arc::new(first_publisher),
Arc::new(second_publisher),
Duration::from_millis(0),
logger,
);

delayer
.publish(
&SignedEntityType::MithrilStakeDistribution(Epoch(1)),
&fake_data::single_signature(vec![1]),
&ProtocolMessage::default(),
)
.await
.unwrap();

assert!(log_inspector
.contains_log("Delayer failed to publish first signature: first publisher failure"));
}

#[tokio::test]
async fn should_return_and_log_error_if_second_publisher_fails() {
let (logger, log_inspector) = TestLogger::memory();
let mut first_publisher = MockSignaturePublisher::new();
first_publisher
.expect_publish()
.once()
.returning(|_, _, _| Ok(()));

let mut second_publisher = MockSignaturePublisher::new();
second_publisher
.expect_publish()
.once()
.returning(|_, _, _| Err(anyhow::anyhow!("second publisher failure")));

let delayer = SignaturePublisherDelayer::new(
Arc::new(first_publisher),
Arc::new(second_publisher),
Duration::from_millis(0),
logger,
);

delayer
.publish(
&SignedEntityType::MithrilStakeDistribution(Epoch(1)),
&fake_data::single_signature(vec![1]),
&ProtocolMessage::default(),
)
.await
.expect_err("Expected error when delayed publisher failed");

assert!(log_inspector
.contains_log("Delayer failed to publish second signature: second publisher failure"));
}

#[tokio::test]
async fn should_wait_before_calling_second_publisher() {
let mut first_publisher = MockSignaturePublisher::new();
first_publisher
.expect_publish()
.once()
.returning(|_, _, _| Ok(()));

let mut second_publisher = MockSignaturePublisher::new();
second_publisher
.expect_publish()
.once()
.returning(|_, _, _| Ok(()));

let delay = Duration::from_millis(50);
let delayer = SignaturePublisherDelayer::new(
Arc::new(first_publisher),
Arc::new(second_publisher),
delay,
TestLogger::stdout(),
);

let start_time = std::time::Instant::now();
delayer
.publish(
&SignedEntityType::MithrilStakeDistribution(Epoch(1)),
&fake_data::single_signature(vec![1]),
&ProtocolMessage::default(),
)
.await
.unwrap();

let elapsed_time = start_time.elapsed();
assert!(
elapsed_time >= delay,
"Expected at least {:?} time elapsed, but got {:?}",
delay,
elapsed_time
);
assert!(
elapsed_time < delay * 2,
"Expected less than {:?} time elapsed, but got {:?}",
delay * 2,
elapsed_time
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use mithril_common::{
entities::{ProtocolMessage, SignedEntityType, SingleSignature},
StdResult,
};

use super::SignaturePublisher;

/// A no-op implementation of the [SignaturePublisher] trait.
/// This implementation performs no action when a signature is published.
pub struct SignaturePublisherNoop;

#[async_trait::async_trait]
impl SignaturePublisher for SignaturePublisherNoop {
async fn publish(
&self,
_signed_entity_type: &SignedEntityType,
_signature: &SingleSignature,
_protocol_message: &ProtocolMessage,
) -> StdResult<()> {
Ok(())
}
}
Loading