diff --git a/client/src/lib.rs b/client/src/lib.rs
index 41cd2b79f..579bc6bc8 100644
--- a/client/src/lib.rs
+++ b/client/src/lib.rs
@@ -32,7 +32,9 @@ pub use temporal_sdk_core_protos::temporal::api::{
     },
 };
 pub use tonic;
-pub use worker_registry::{Slot, SlotManager, SlotProvider, WorkerKey};
+pub use worker_registry::{
+    ClientWorkerSet, SharedNamespaceWorkerTrait, Slot, SlotProvider, WorkerKey,
+};
 pub use workflow_handle::{
    GetWorkflowResultOpts, WorkflowExecutionInfo, WorkflowExecutionResult, WorkflowHandle,
 };
@@ -388,7 +390,7 @@ pub struct ConfiguredClient<C> {
     headers: Arc<RwLock<HashMap<String, String>>>,
     /// Capabilities as read from the `get_system_info` RPC call made on client connection
     capabilities: Option<get_system_info_response::Capabilities>,
-    workers: Arc<SlotManager>,
+    workers: Arc<ClientWorkerSet>,
 }
 
 impl<C> ConfiguredClient<C> {
@@ -438,7 +440,7 @@ impl<C> ConfiguredClient<C> {
     }
 
     /// Returns a cloned reference to a registry with workers using this client instance
-    pub fn workers(&self) -> Arc<SlotManager> {
+    pub fn workers(&self) -> Arc<ClientWorkerSet> {
         self.workers.clone()
     }
 }
@@ -497,9 +499,10 @@ impl ClientOptions {
         &self,
         namespace: impl Into<String>,
         metrics_meter: Option<TemporalMeter>,
+        process_key: Uuid,
     ) -> Result<RetryClient<Client>, ClientInitError> {
         let client = self.connect_no_namespace(metrics_meter).await?.into_inner();
-        let client = Client::new(client, namespace.into());
+        let client = Client::new(client, namespace.into(), process_key);
         let retry_client = RetryClient::new(client, self.retry_config.clone());
         Ok(retry_client)
     }
@@ -584,7 +587,7 @@ impl ClientOptions {
             client: TemporalServiceClient::new(svc),
             options: Arc::new(self.clone()),
             capabilities: None,
-            workers: Arc::new(SlotManager::new()),
+            workers: Arc::new(ClientWorkerSet::new()),
         };
         if !self.skip_get_system_info {
             match client
@@ -848,6 +851,8 @@ pub struct Client {
     inner: ConfiguredClient<TemporalServiceClientWithMetrics>,
     /// The namespace this client interacts with
     namespace: String,
+    /// Process-wide key, used for worker heartbeating
+    process_key: Uuid,
 }
 
 impl Client {
@@ -855,10 +860,12 @@ impl Client {
     pub fn new(
         client: ConfiguredClient<TemporalServiceClientWithMetrics>,
         namespace: String,
+        process_key: Uuid,
     ) -> Self {
         Client {
             inner: client,
             namespace,
+            process_key,
         }
     }
@@ -901,6 +908,11 @@ impl Client {
     pub fn into_inner(self) -> ConfiguredClient<TemporalServiceClientWithMetrics> {
         self.inner
     }
+
+    /// Returns the process-wide key
+    pub fn process_key(&self) -> Uuid {
+        self.process_key
+    }
 }
 
 impl NamespacedClient for Client {
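With the change above, `ClientOptions::connect` now threads a process-wide key into every `Client`. A minimal sketch of the resulting call sequence, mirroring the `sdk/src/lib.rs` doc example later in this diff (server options and error handling elided):

    let runtime_options = RuntimeOptionsBuilder::default()
        .telemetry_options(TelemetryOptionsBuilder::default().build()?)
        .build()?;
    let runtime = CoreRuntime::new_assume_tokio(runtime_options)?;
    // One key per CoreRuntime; all clients and workers created from this
    // runtime share it for worker heartbeating.
    let client = server_options
        .connect("default", None, runtime.process_key())
        .await?;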
diff --git a/client/src/raw.rs b/client/src/raw.rs
index aa5500ee1..92e6a7956 100644
--- a/client/src/raw.rs
+++ b/client/src/raw.rs
@@ -7,7 +7,7 @@ use crate::{
     TEMPORAL_NAMESPACE_HEADER_KEY, TemporalServiceClient,
     metrics::{namespace_kv, task_queue_kv},
     raw::sealed::RawClientLike,
-    worker_registry::{Slot, SlotManager},
+    worker_registry::{ClientWorkerSet, Slot},
 };
 use futures_util::{FutureExt, TryFutureExt, future::BoxFuture};
 use std::sync::Arc;
@@ -68,7 +68,7 @@ pub(super) mod sealed {
         fn health_client_mut(&mut self) -> &mut HealthClient;
 
         /// Return a registry with workers using this client instance
-        fn get_workers_info(&self) -> Option<Arc<SlotManager>>;
+        fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>>;
 
         async fn call(
             &mut self,
@@ -134,7 +134,7 @@ where
         self.get_client_mut().health_client_mut()
     }
 
-    fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
+    fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
         self.get_client().get_workers_info()
     }
 
@@ -213,7 +213,7 @@ where
         self.health_svc_mut()
     }
 
-    fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
+    fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
         None
     }
 }
@@ -268,7 +268,7 @@ where
         self.client.health_client_mut()
     }
 
-    fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
+    fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
         Some(self.workers())
     }
 }
@@ -316,7 +316,7 @@ impl RawClientLike for Client {
         self.inner.health_client_mut()
     }
 
-    fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
+    fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
         self.inner.get_workers_info()
     }
 }
diff --git a/client/src/worker_registry/mod.rs b/client/src/worker_registry/mod.rs
index 90882a718..3f943341b 100644
--- a/client/src/worker_registry/mod.rs
+++ b/client/src/worker_registry/mod.rs
@@ -2,10 +2,10 @@
 //! This is needed to implement Eager Workflow Start, a latency optimization in which the client,
 //! after reserving a slot, directly forwards a WFT to a local worker.
 
-use parking_lot::RwLock;
+use parking_lot::{Mutex, RwLock};
 use slotmap::SlotMap;
 use std::collections::{HashMap, hash_map::Entry::Vacant};
-
+use temporal_sdk_core_protos::temporal::api::worker::v1::WorkerHeartbeat;
 use temporal_sdk_core_protos::temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse;
 
 slotmap::new_key_type! {
@@ -49,7 +49,7 @@ impl SlotKey {
     }
 }
 
-/// This is an inner class for [SlotManager] needed to hide the mutex.
+/// This is an inner class for [ClientWorkerSet] needed to hide the mutex.
 #[derive(Default, Debug)]
 struct SlotManagerImpl {
     /// Maps keys, i.e., namespace#task_queue, to provider.
@@ -109,19 +109,42 @@ impl SlotManagerImpl {
     }
 }
 
+/// This trait represents a shared namespace worker that sends worker heartbeats and processes
+/// worker commands.
+pub trait SharedNamespaceWorkerTrait: std::fmt::Debug {
+    /// Namespace that the shared namespace worker is connected to.
+    fn namespace(&self) -> String;
+
+    /// Unregisters a heartbeat callback. Returns the removed callback, along with a bool that is
+    /// true when no callbacks remain in the SharedNamespaceWorker, meaning the shared worker
+    /// itself can be shut down.
+    fn unregister_callback(
+        &self,
+        worker_instance_key: String,
+    ) -> (Option<Box<dyn Fn() -> WorkerHeartbeat + Send + Sync>>, bool);
+
+    /// Registers a heartbeat callback.
+    fn register_callback(
+        &self,
+        worker_instance_key: String,
+        heartbeat_callback: Box<dyn Fn() -> WorkerHeartbeat + Send + Sync>,
+    );
+}
+
 /// Enables local workers to make themselves visible to a shared client instance.
 /// There can only be one worker registered per namespace+queue_name+client, others will get ignored.
 /// It also provides a convenient method to find compatible slots within the collection.
 #[derive(Default, Debug)]
-pub struct SlotManager {
-    manager: RwLock<SlotManagerImpl>,
+pub struct ClientWorkerSet {
+    slot_manager: RwLock<SlotManagerImpl>,
+    heartbeat_manager: Mutex<HashMap<String, Box<dyn SharedNamespaceWorkerTrait + Send + Sync>>>,
 }
 
-impl SlotManager {
+impl ClientWorkerSet {
     /// Factory method.
     pub fn new() -> Self {
         Self {
-            manager: RwLock::new(SlotManagerImpl::new()),
+            slot_manager: RwLock::new(SlotManagerImpl::new()),
+            heartbeat_manager: Mutex::new(HashMap::new()),
         }
     }
 
@@ -131,26 +154,65 @@ impl SlotManager {
         namespace: String,
         task_queue: String,
     ) -> Option<Box<dyn Slot + Send>> {
-        self.manager
+        self.slot_manager
             .read()
             .try_reserve_wft_slot(namespace, task_queue)
     }
 
     /// Register a local worker that can provide WFT processing slots.
-    pub fn register(&self, provider: Box<dyn SlotProvider + Send + Sync>) -> Option<WorkerKey> {
-        self.manager.write().register(provider)
+    pub fn register_slot(
+        &self,
+        provider: Box<dyn SlotProvider + Send + Sync>,
+    ) -> Option<WorkerKey> {
+        self.slot_manager.write().register(provider)
     }
 
     /// Unregister a provider, typically when its worker starts shutdown.
-    pub fn unregister(&self, id: WorkerKey) -> Option<Box<dyn SlotProvider + Send + Sync>> {
-        self.manager.write().unregister(id)
+    pub fn unregister_slot(&self, id: WorkerKey) -> Option<Box<dyn SlotProvider + Send + Sync>> {
+        self.slot_manager.write().unregister(id)
+    }
+
+    /// Register a worker with the worker heartbeat manager.
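+    ///
+    /// A minimal sketch of the intended call pattern (the callback bodies below are
+    /// illustrative placeholders, not part of this API):
+    /// ```ignore
+    /// workers.register_heartbeat_worker(
+    ///     "default".to_string(),
+    ///     worker_instance_key.to_string(),
+    ///     // Invoked at each heartbeat tick to snapshot this worker's state.
+    ///     Box::new(move || heartbeat_snapshot()),
+    ///     // Invoked only when no shared worker exists yet for this namespace.
+    ///     || new_shared_namespace_worker(),
+    /// );
+    /// ```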
+    pub fn register_heartbeat_worker(
+        &self,
+        namespace: String,
+        worker_instance_key: String,
+        heartbeat_callback: Box<dyn Fn() -> WorkerHeartbeat + Send + Sync>,
+        shared_worker_callback: impl Fn() -> Box<dyn SharedNamespaceWorkerTrait + Send + Sync>,
+    ) {
+        let mut shared_namespace_map = self.heartbeat_manager.lock();
+        let worker = shared_namespace_map
+            .entry(namespace)
+            .or_insert_with(|| shared_worker_callback());
+        worker.register_callback(worker_instance_key, heartbeat_callback)
+    }
+
+    /// Unregister a worker with the worker heartbeat manager.
+    pub fn unregister_heartbeat_worker(
+        &self,
+        namespace: String,
+        worker_instance_key: String,
+    ) -> Option<Box<dyn Fn() -> WorkerHeartbeat + Send + Sync>> {
+        let mut heartbeat_manager = self.heartbeat_manager.lock();
+        if let Some(shared_worker) = heartbeat_manager.get(&namespace) {
+            let (callback, is_empty) = shared_worker.unregister_callback(worker_instance_key);
+            if is_empty {
+                heartbeat_manager.remove(&namespace);
+            }
+            callback
+        } else {
+            warn!(
+                "Namespace {namespace} isn't registered to client worker heartbeat, ignoring unregister."
+            );
+            None
+        }
     }
 
     #[cfg(test)]
     /// Returns (num_providers, num_buckets), where a bucket key is namespace+task_queue.
     /// There is only one provider per bucket so `num_providers` should be equal to `num_buckets`.
     pub fn num_providers(&self) -> (usize, usize) {
-        self.manager.read().num_providers()
+        self.slot_manager.read().num_providers()
     }
 }
 
@@ -197,9 +259,9 @@ mod tests {
             new_mock_provider("foo".to_string(), "bar_q".to_string(), false, false);
         let mock_provider2 = new_mock_provider("foo".to_string(), "bar_q".to_string(), false, true);
 
-        let manager = SlotManager::new();
-        let some_slots = manager.register(Box::new(mock_provider1));
-        let no_slots = manager.register(Box::new(mock_provider2));
+        let manager = ClientWorkerSet::new();
+        let some_slots = manager.register_slot(Box::new(mock_provider1));
+        let no_slots = manager.register_slot(Box::new(mock_provider2));
         assert!(no_slots.is_none());
 
         let mut found = 0;
@@ -214,15 +276,15 @@
         assert_eq!(found, 10);
         assert_eq!((1, 1), manager.num_providers());
 
-        manager.unregister(some_slots.unwrap());
+        manager.unregister_slot(some_slots.unwrap());
         assert_eq!((0, 0), manager.num_providers());
 
         let mock_provider1 =
             new_mock_provider("foo".to_string(), "bar_q".to_string(), false, false);
         let mock_provider2 = new_mock_provider("foo".to_string(), "bar_q".to_string(), false, true);
 
-        let no_slots = manager.register(Box::new(mock_provider2));
-        let some_slots = manager.register(Box::new(mock_provider1));
+        let no_slots = manager.register_slot(Box::new(mock_provider2));
+        let some_slots = manager.register_slot(Box::new(mock_provider1));
         assert!(some_slots.is_none());
 
         let mut not_found = 0;
@@ -236,18 +298,18 @@
         }
         assert_eq!(not_found, 10);
         assert_eq!((1, 1), manager.num_providers());
-        manager.unregister(no_slots.unwrap());
+        manager.unregister_slot(no_slots.unwrap());
         assert_eq!((0, 0), manager.num_providers());
     }
 
     #[test]
     fn registry_keeps_one_provider_per_namespace() {
-        let manager = SlotManager::new();
+        let manager = ClientWorkerSet::new();
         let mut worker_keys = vec![];
         for i in 0..10 {
             let namespace = format!("myId{}", i % 3);
             let mock_provider = new_mock_provider(namespace, "bar_q".to_string(), false, false);
-            worker_keys.push(manager.register(Box::new(mock_provider)));
+            worker_keys.push(manager.register_slot(Box::new(mock_provider)));
         }
         assert_eq!((3, 3), manager.num_providers());
 
@@ -255,9 +317,9 @@
             .iter()
             .filter(|key| key.is_some())
             .fold(0, |count, key| {
-                manager.unregister(key.unwrap());
+                manager.unregister_slot(key.unwrap());
                 // Should be idempotent
-                manager.unregister(key.unwrap());
+                manager.unregister_slot(key.unwrap());
                 count + 1
             });
         assert_eq!(3, count);
diff --git a/core-api/src/worker.rs b/core-api/src/worker.rs
index b7304c5c9..d92efeec0 100644
--- a/core-api/src/worker.rs
+++ b/core-api/src/worker.rs
@@ -161,12 +161,6 @@ pub struct WorkerConfig {
 
     /// A versioning strategy for this worker.
     pub versioning_strategy: WorkerVersioningStrategy,
-
-    /// The interval within which the worker will send a heartbeat.
-    /// The timer is reset on each existing RPC call that also happens to send this data, like
-    /// `PollWorkflowTaskQueueRequest`.
-    #[builder(default)]
-    pub heartbeat_interval: Option<Duration>,
 }
 
 impl WorkerConfig {
diff --git a/core-c-bridge/include/temporal-sdk-core-c-bridge.h b/core-c-bridge/include/temporal-sdk-core-c-bridge.h
index 803879f7d..6a5b82e71 100644
--- a/core-c-bridge/include/temporal-sdk-core-c-bridge.h
+++ b/core-c-bridge/include/temporal-sdk-core-c-bridge.h
@@ -397,6 +397,7 @@ typedef struct TemporalCoreTelemetryOptions {
 
 typedef struct TemporalCoreRuntimeOptions {
   const struct TemporalCoreTelemetryOptions *telemetry;
+  uint64_t worker_heartbeat_duration_millis;
 } TemporalCoreRuntimeOptions;
 
 typedef struct TemporalCoreTestServerOptions {
diff --git a/core-c-bridge/src/runtime.rs b/core-c-bridge/src/runtime.rs
index 94fe46929..5a330e268 100644
--- a/core-c-bridge/src/runtime.rs
+++ b/core-c-bridge/src/runtime.rs
@@ -16,7 +16,8 @@ use std::{
     time::{Duration, UNIX_EPOCH},
 };
 use temporal_sdk_core::{
-    CoreRuntime, TokioRuntimeBuilder,
+    CoreRuntime, RuntimeOptions as CoreRuntimeOptions,
+    RuntimeOptionsBuilder as CoreRuntimeOptionsBuilder, TokioRuntimeBuilder,
     telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter},
 };
 use temporal_sdk_core_api::telemetry::{
@@ -30,6 +31,7 @@ use url::Url;
 #[repr(C)]
 pub struct RuntimeOptions {
     pub telemetry: *const TelemetryOptions,
+    pub worker_heartbeat_duration_millis: u64,
 }
 
 #[repr(C)]
@@ -142,7 +144,7 @@ pub extern "C" fn temporal_core_runtime_new(options: *const RuntimeOptions) -> RuntimeOrFail {
     let mut runtime = Runtime {
         core: Arc::new(
             CoreRuntime::new(
-                CoreTelemetryOptions::default(),
+                CoreRuntimeOptions::default(),
                 TokioRuntimeBuilder::default(),
             )
             .unwrap(),
@@ -238,8 +240,21 @@ impl Runtime {
             CoreTelemetryOptions::default()
         };
 
+        let heartbeat_interval = if options.worker_heartbeat_duration_millis == 0 {
+            None
+        } else {
+            Some(Duration::from_millis(
+                options.worker_heartbeat_duration_millis,
+            ))
+        };
+
+        let core_runtime_options = CoreRuntimeOptionsBuilder::default()
+            .telemetry_options(telemetry_options)
+            .heartbeat_interval(heartbeat_interval)
+            .build()?;
+
         // Build core runtime
-        let mut core = CoreRuntime::new(telemetry_options, TokioRuntimeBuilder::default())?;
+        let mut core = CoreRuntime::new(core_runtime_options, TokioRuntimeBuilder::default())?;
 
         // We late-bind the metrics after core runtime is created since it needs
         // the Tokio handle
diff --git a/core-c-bridge/src/tests/context.rs b/core-c-bridge/src/tests/context.rs
index 5f889d442..7a5767177 100644
--- a/core-c-bridge/src/tests/context.rs
+++ b/core-c-bridge/src/tests/context.rs
@@ -153,6 +153,7 @@ impl Context {
 
         let RuntimeOrFail { runtime, fail } = temporal_core_runtime_new(&RuntimeOptions {
             telemetry: std::ptr::null(),
+            worker_heartbeat_duration_millis: 0,
         });
 
         if let Some(fail) = byte_array_to_string(runtime, fail) {
diff --git a/core/src/lib.rs b/core/src/lib.rs
index db8f80da4..f724979a0
100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -61,7 +61,8 @@ use crate::{ }; use anyhow::bail; use futures_util::Stream; -use std::sync::{Arc, OnceLock}; +use std::sync::Arc; +use std::time::Duration; use temporal_client::{ConfiguredClient, NamespacedClient, TemporalServiceClientWithMetrics}; use temporal_sdk_core_api::{ Worker as WorkerTrait, @@ -69,6 +70,7 @@ use temporal_sdk_core_api::{ telemetry::TelemetryOptions, }; use temporal_sdk_core_protos::coresdk::ActivityHeartbeat; +use uuid::Uuid; /// Initialize a worker bound to a task queue. /// @@ -89,39 +91,43 @@ pub fn init_worker( where CT: Into, { - let client = init_worker_client(&worker_config, *client.into().into_inner()); - if client.namespace() != worker_config.namespace { + let client_inner = *client.into().into_inner(); + let client = init_worker_client( + worker_config.namespace.clone(), + worker_config.client_identity_override.clone(), + client_inner, + runtime.process_key(), + ); + let namespace = worker_config.namespace.clone(); + if client.namespace() != namespace { bail!("Passed in client is not bound to the same namespace as the worker"); } if client.namespace() == "" { bail!("Client namespace cannot be empty"); } let client_ident = client.get_identity().to_owned(); - let sticky_q = sticky_q_name_for_worker(&client_ident, &worker_config); + let sticky_q = sticky_q_name_for_worker(&client_ident, worker_config.max_cached_workflows); if client_ident.is_empty() { bail!("Client identity cannot be empty. Either lang or user should be setting this value"); } - let heartbeat_fn = worker_config - .heartbeat_interval - .map(|_| Arc::new(OnceLock::new())); - let client_bag = Arc::new(WorkerClientBag::new( client, - worker_config.namespace.clone(), - client_ident, + namespace.clone(), + client_ident.clone(), worker_config.versioning_strategy.clone(), - heartbeat_fn.clone(), )); - Ok(Worker::new( - worker_config, + let worker = Worker::new( + worker_config.clone(), sticky_q, - client_bag, + client_bag.clone(), Some(&runtime.telemetry), - heartbeat_fn, - )) + runtime.heartbeat_interval, + ); + + Ok(worker) } /// Create a worker for replaying one or more existing histories. It will auto-shutdown as soon as @@ -142,11 +148,13 @@ where } pub(crate) fn init_worker_client( - config: &WorkerConfig, + namespace: String, + client_identity_override: Option, client: ConfiguredClient, + process_key: Uuid, ) -> RetryClient { - let mut client = Client::new(client, config.namespace.clone()); - if let Some(ref id_override) = config.client_identity_override { + let mut client = Client::new(client, namespace, process_key); + if let Some(ref id_override) = client_identity_override { client.options_mut().identity.clone_from(id_override); } RetryClient::new(client, RetryConfig::default()) @@ -156,9 +164,9 @@ pub(crate) fn init_worker_client( /// workflows. pub(crate) fn sticky_q_name_for_worker( process_identity: &str, - config: &WorkerConfig, + max_cached_workflows: usize, ) -> Option { - if config.max_cached_workflows > 0 { + if max_cached_workflows > 0 { Some(format!( "{}-{}", &process_identity, @@ -220,6 +228,22 @@ pub struct CoreRuntime { telemetry: TelemetryInstance, runtime: Option, runtime_handle: tokio::runtime::Handle, + process_key: Uuid, + heartbeat_interval: Option, +} + +/// Holds telemetry options, as well as worker heartbeat_interval. Construct with [RuntimeOptionsBuilder] +#[derive(derive_builder::Builder)] +#[non_exhaustive] +#[derive(Default)] +pub struct RuntimeOptions { + /// Telemetry configuration options. 
+ #[builder(default)] + telemetry_options: TelemetryOptions, + /// Optional worker heartbeat interval - This configures the heartbeat setting of all + /// workers created using this runtime. + #[builder(default = Some(Duration::from_secs(60)))] + heartbeat_interval: Option, } /// Wraps a [tokio::runtime::Builder] to allow layering multiple on_thread_start functions @@ -254,13 +278,13 @@ impl CoreRuntime { /// If a tokio runtime has already been initialized. To re-use an existing runtime, call /// [CoreRuntime::new_assume_tokio]. pub fn new( - telemetry_options: TelemetryOptions, + runtime_options: RuntimeOptions, mut tokio_builder: TokioRuntimeBuilder, ) -> Result where F: Fn() + Send + Sync + 'static, { - let telemetry = telemetry_init(telemetry_options)?; + let telemetry = telemetry_init(runtime_options.telemetry_options)?; let subscriber = telemetry.trace_subscriber(); let runtime = tokio_builder .inner @@ -275,7 +299,8 @@ impl CoreRuntime { }) .build()?; let _rg = runtime.enter(); - let mut me = Self::new_assume_tokio_initialized_telem(telemetry); + let mut me = + Self::new_assume_tokio_initialized_telem(telemetry, runtime_options.heartbeat_interval); me.runtime = Some(runtime); Ok(me) } @@ -285,9 +310,12 @@ impl CoreRuntime { /// /// # Panics /// If there is no currently active Tokio runtime - pub fn new_assume_tokio(telemetry_options: TelemetryOptions) -> Result { - let telemetry = telemetry_init(telemetry_options)?; - Ok(Self::new_assume_tokio_initialized_telem(telemetry)) + pub fn new_assume_tokio(runtime_options: RuntimeOptions) -> Result { + let telemetry = telemetry_init(runtime_options.telemetry_options)?; + Ok(Self::new_assume_tokio_initialized_telem( + telemetry, + runtime_options.heartbeat_interval, + )) } /// Construct a runtime from an already-initialized telemetry instance, assuming a tokio runtime @@ -295,7 +323,10 @@ impl CoreRuntime { /// /// # Panics /// If there is no currently active Tokio runtime - pub fn new_assume_tokio_initialized_telem(telemetry: TelemetryInstance) -> Self { + pub fn new_assume_tokio_initialized_telem( + telemetry: TelemetryInstance, + heartbeat_interval: Option, + ) -> Self { let runtime_handle = tokio::runtime::Handle::current(); if let Some(sub) = telemetry.trace_subscriber() { set_trace_subscriber_for_current_thread(sub); @@ -304,6 +335,8 @@ impl CoreRuntime { telemetry, runtime: None, runtime_handle, + process_key: Uuid::new_v4(), + heartbeat_interval, } } @@ -317,6 +350,11 @@ impl CoreRuntime { &self.telemetry } + /// Return a process-wide unique key + pub fn process_key(&self) -> Uuid { + self.process_key + } + /// Return a mutable reference to the owned [TelemetryInstance] pub fn telemetry_mut(&mut self) -> &mut TelemetryInstance { &mut self.telemetry diff --git a/core/src/pollers/poll_buffer.rs b/core/src/pollers/poll_buffer.rs index 72f2e8a41..7bc4311fb 100644 --- a/core/src/pollers/poll_buffer.rs +++ b/core/src/pollers/poll_buffer.rs @@ -203,6 +203,7 @@ impl LongPollBuffer { permit_dealer: MeteredPermitDealer, shutdown: CancellationToken, num_pollers_handler: Option, + send_heartbeat: bool, ) -> Self { let no_retry = if matches!(poller_behavior, PollerBehavior::Autoscaling { .. 
}) { Some(NoRetryOnMatching { @@ -216,11 +217,14 @@ impl LongPollBuffer { let task_queue = task_queue.clone(); async move { client - .poll_nexus_task(PollOptions { - task_queue, - no_retry, - timeout_override, - }) + .poll_nexus_task( + PollOptions { + task_queue, + no_retry, + timeout_override, + }, + send_heartbeat, + ) .await } }; diff --git a/core/src/telemetry/metrics.rs b/core/src/telemetry/metrics.rs index 28bf18845..fe6aec8e1 100644 --- a/core/src/telemetry/metrics.rs +++ b/core/src/telemetry/metrics.rs @@ -1,4 +1,6 @@ -use crate::{abstractions::dbg_panic, telemetry::TelemetryInstance}; +#[cfg(test)] +use crate::TelemetryInstance; +use crate::abstractions::dbg_panic; use std::{ fmt::{Debug, Display}, @@ -11,7 +13,7 @@ use temporal_sdk_core_api::telemetry::metrics::{ GaugeF64, GaugeF64Base, Histogram, HistogramBase, HistogramDuration, HistogramDurationBase, HistogramF64, HistogramF64Base, LazyBufferInstrument, MetricAttributable, MetricAttributes, MetricCallBufferer, MetricEvent, MetricKeyValue, MetricKind, MetricParameters, MetricUpdateVal, - NewAttributes, NoOpCoreMeter, + NewAttributes, NoOpCoreMeter, TemporalMeter, }; use temporal_sdk_core_protos::temporal::api::{ enums::v1::WorkflowTaskFailedCause, failure::v1::Failure, @@ -76,8 +78,17 @@ impl MetricsContext { } } + #[cfg(test)] pub(crate) fn top_level(namespace: String, tq: String, telemetry: &TelemetryInstance) -> Self { - if let Some(mut meter) = telemetry.get_temporal_metric_meter() { + MetricsContext::top_level_with_meter(namespace, tq, telemetry.get_temporal_metric_meter()) + } + + pub(crate) fn top_level_with_meter( + namespace: String, + tq: String, + temporal_meter: Option, + ) -> Self { + if let Some(mut meter) = temporal_meter { meter .default_attribs .attributes diff --git a/core/src/test_help/integ_helpers.rs b/core/src/test_help/integ_helpers.rs index 2c3417dc6..3d7529a28 100644 --- a/core/src/test_help/integ_helpers.rs +++ b/core/src/test_help/integ_helpers.rs @@ -185,7 +185,7 @@ pub fn build_fake_worker( } pub fn mock_worker(mocks: MocksHolder) -> Worker { - let sticky_q = sticky_q_name_for_worker("unit-test", &mocks.inputs.config); + let sticky_q = sticky_q_name_for_worker("unit-test", mocks.inputs.config.max_cached_workflows); let act_poller = if mocks.inputs.config.no_remote_activities { None } else { diff --git a/core/src/worker/client.rs b/core/src/worker/client.rs index 519994fc3..38c6909b5 100644 --- a/core/src/worker/client.rs +++ b/core/src/worker/client.rs @@ -1,17 +1,12 @@ //! 
Worker-specific client needs pub(crate) mod mocks; -use crate::{ - abstractions::dbg_panic, protosext::legacy_query_failure, worker::heartbeat::HeartbeatFn, -}; +use crate::protosext::legacy_query_failure; use parking_lot::RwLock; -use std::{ - sync::{Arc, OnceLock}, - time::Duration, -}; +use std::{sync::Arc, time::Duration}; use temporal_client::{ - Client, IsWorkerTaskLongPoll, Namespace, NamespacedClient, NoRetryOnMatching, RetryClient, - SlotManager, WorkflowService, + Client, ClientWorkerSet, IsWorkerTaskLongPoll, Namespace, NamespacedClient, NoRetryOnMatching, + RetryClient, WorkflowService, }; use temporal_sdk_core_api::worker::WorkerVersioningStrategy; use temporal_sdk_core_protos::{ @@ -38,6 +33,7 @@ use temporal_sdk_core_protos::{ }, }; use tonic::IntoRequest; +use uuid::Uuid; type Result = std::result::Result; @@ -52,7 +48,6 @@ pub(crate) struct WorkerClientBag { namespace: String, identity: String, worker_versioning_strategy: WorkerVersioningStrategy, - heartbeat_data: Option>>, } impl WorkerClientBag { @@ -61,14 +56,12 @@ impl WorkerClientBag { namespace: String, identity: String, worker_versioning_strategy: WorkerVersioningStrategy, - heartbeat_data: Option>>, ) -> Self { Self { replaceable_client: RwLock::new(client), namespace, identity, worker_versioning_strategy, - heartbeat_data, } } @@ -129,19 +122,6 @@ impl WorkerClientBag { None } } - - fn capture_heartbeat(&self) -> Option { - if let Some(heartbeat_data) = self.heartbeat_data.as_ref() { - if let Some(hb) = heartbeat_data.get() { - hb() - } else { - dbg_panic!("Heartbeat function never set"); - None - } - } else { - None - } - } } /// This trait contains everything workers need to interact with Temporal, and hence provides a @@ -165,6 +145,7 @@ pub trait WorkerClient: Sync + Send { async fn poll_nexus_task( &self, poll_options: PollOptions, + send_heartbeat: bool, ) -> Result; /// Complete a workflow task async fn complete_workflow_task( @@ -234,7 +215,8 @@ pub trait WorkerClient: Sync + Send { /// Record a worker heartbeat async fn record_worker_heartbeat( &self, - heartbeat: WorkerHeartbeat, + namespace: String, + worker_heartbeat: Vec, ) -> Result; /// Replace the underlying client @@ -242,13 +224,15 @@ pub trait WorkerClient: Sync + Send { /// Return server capabilities fn capabilities(&self) -> Option; /// Return workers using this client - fn workers(&self) -> Arc; + fn workers(&self) -> Arc; /// Indicates if this is a mock client fn is_mock(&self) -> bool; /// Return name and version of the SDK fn sdk_name_and_version(&self) -> (String, String); /// Get worker identity fn get_identity(&self) -> String; + /// Get process key + fn get_process_key(&self) -> Uuid; } /// Configuration options shared by workflow, activity, and Nexus polling calls @@ -360,6 +344,7 @@ impl WorkerClient for WorkerClientBag { async fn poll_nexus_task( &self, poll_options: PollOptions, + _send_heartbeat: bool, ) -> Result { #[allow(deprecated)] // want to list all fields explicitly let mut request = PollNexusTaskQueueRequest { @@ -372,7 +357,7 @@ impl WorkerClient for WorkerClientBag { identity: self.identity.clone(), worker_version_capabilities: self.worker_version_capabilities(), deployment_options: self.deployment_options(), - worker_heartbeat: self.capture_heartbeat().into_iter().collect(), + worker_heartbeat: Vec::new(), } .into_request(); request.extensions_mut().insert(IsWorkerTaskLongPoll); @@ -661,7 +646,7 @@ impl WorkerClient for WorkerClientBag { identity: self.identity.clone(), sticky_task_queue, reason: "graceful 
shutdown".to_string(), - worker_heartbeat: self.capture_heartbeat(), + worker_heartbeat: None, }; Ok( @@ -671,32 +656,34 @@ impl WorkerClient for WorkerClientBag { ) } - fn replace_client(&self, new_client: RetryClient) { - let mut replaceable_client = self.replaceable_client.write(); - *replaceable_client = new_client; - } - async fn record_worker_heartbeat( &self, - heartbeat: WorkerHeartbeat, + namespace: String, + worker_heartbeat: Vec, ) -> Result { + let request = RecordWorkerHeartbeatRequest { + namespace, + identity: self.identity.clone(), + worker_heartbeat, + }; Ok(self .cloned_client() - .record_worker_heartbeat(RecordWorkerHeartbeatRequest { - namespace: self.namespace.clone(), - identity: self.identity.clone(), - worker_heartbeat: vec![heartbeat], - }) + .record_worker_heartbeat(request) .await? .into_inner()) } + fn replace_client(&self, new_client: RetryClient) { + let mut replaceable_client = self.replaceable_client.write(); + *replaceable_client = new_client; + } + fn capabilities(&self) -> Option { let client = self.replaceable_client.read(); client.get_client().inner().capabilities().cloned() } - fn workers(&self) -> Arc { + fn workers(&self) -> Arc { let client = self.replaceable_client.read(); client.get_client().inner().workers() } @@ -714,6 +701,10 @@ impl WorkerClient for WorkerClientBag { fn get_identity(&self) -> String { self.identity.clone() } + + fn get_process_key(&self) -> Uuid { + self.replaceable_client.read().get_client().process_key() + } } impl NamespacedClient for WorkerClientBag { diff --git a/core/src/worker/client/mocks.rs b/core/src/worker/client/mocks.rs index f6407f2a4..933f3aef5 100644 --- a/core/src/worker/client/mocks.rs +++ b/core/src/worker/client/mocks.rs @@ -1,10 +1,10 @@ use super::*; use futures_util::Future; use std::sync::{Arc, LazyLock}; -use temporal_client::SlotManager; +use temporal_client::ClientWorkerSet; -pub(crate) static DEFAULT_WORKERS_REGISTRY: LazyLock> = - LazyLock::new(|| Arc::new(SlotManager::new())); +pub(crate) static DEFAULT_WORKERS_REGISTRY: LazyLock> = + LazyLock::new(|| Arc::new(ClientWorkerSet::new())); pub(crate) static DEFAULT_TEST_CAPABILITIES: &Capabilities = &Capabilities { signal_and_query_header: true, @@ -35,6 +35,7 @@ pub fn mock_worker_client() -> MockWorkerClient { .returning(|| ("test-core".to_string(), "0.0.0".to_string())); r.expect_get_identity() .returning(|| "test-identity".to_string()); + r.expect_get_process_key().returning(Uuid::new_v4); r } @@ -68,7 +69,7 @@ mockall::mock! { -> impl Future> + Send + 'b where 'a: 'b, Self: 'b; - fn poll_nexus_task<'a, 'b>(&self, poll_options: PollOptions) + fn poll_nexus_task<'a, 'b>(&self, poll_options: PollOptions, send_heartbeat: bool) -> impl Future> + Send + 'b where 'a: 'b, Self: 'b; @@ -139,7 +140,7 @@ mockall::mock! { fn respond_legacy_query<'a, 'b>( &self, task_token: TaskToken, - query_result: LegacyQueryResult, + query_result: LegacyQueryResult, ) -> impl Future> + Send + 'b where 'a: 'b, Self: 'b; @@ -150,13 +151,18 @@ mockall::mock! 
{ fn shutdown_worker<'a, 'b>(&self, sticky_task_queue: String) -> impl Future> + Send + 'b where 'a: 'b, Self: 'b; - fn record_worker_heartbeat<'a, 'b>(&self, heartbeat: WorkerHeartbeat) -> impl Future> + Send + 'b where 'a: 'b, Self: 'b; + fn record_worker_heartbeat<'a, 'b>( + &self, + namespace: String, + heartbeat: Vec + ) -> impl Future> + Send + 'b where 'a: 'b, Self: 'b; fn replace_client(&self, new_client: RetryClient); fn capabilities(&self) -> Option; - fn workers(&self) -> Arc; + fn workers(&self) -> Arc; fn is_mock(&self) -> bool; fn sdk_name_and_version(&self) -> (String, String); fn get_identity(&self) -> String; + fn get_process_key(&self) -> Uuid; } } diff --git a/core/src/worker/heartbeat.rs b/core/src/worker/heartbeat.rs index f7c8d5694..22a4791d0 100644 --- a/core/src/worker/heartbeat.rs +++ b/core/src/worker/heartbeat.rs @@ -1,54 +1,111 @@ -use crate::{WorkerClient, abstractions::dbg_panic}; -use gethostname::gethostname; +use crate::WorkerClient; +use crate::worker::{TaskPollers, WorkerTelemetry}; use parking_lot::Mutex; use prost_types::Duration as PbDuration; +use std::collections::HashMap; use std::{ - sync::{Arc, OnceLock}, + fmt, + sync::Arc, time::{Duration, SystemTime}, }; -use temporal_sdk_core_api::worker::WorkerConfig; -use temporal_sdk_core_protos::temporal::api::worker::v1::{WorkerHeartbeat, WorkerHostInfo}; -use tokio::{sync::Notify, task::JoinHandle, time::MissedTickBehavior}; -use uuid::Uuid; - -pub(crate) type HeartbeatFn = Box Option + Send + Sync>; - -pub(crate) struct WorkerHeartbeatManager { - heartbeat_handle: JoinHandle<()>, +use temporal_client::SharedNamespaceWorkerTrait; +use temporal_sdk_core_api::worker::{WorkerConfigBuilder, WorkerVersioningStrategy}; +use temporal_sdk_core_protos::temporal::api::worker::v1::WorkerHeartbeat; +use tokio::sync::Notify; + +/// Callback used to collect heartbeat data from each worker at the time of heartbeat +pub(crate) type HeartbeatFn = Box WorkerHeartbeat + Send + Sync>; + +/// SharedNamespaceWorker is responsible for polling nexus-delivered worker commands and sending +/// worker heartbeats to the server. This communicates with all workers in the same process that +/// share the same namespace. 
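+///
+/// A SharedNamespaceWorker is created lazily, at most one per namespace per client: the first
+/// local worker to register a heartbeat callback constructs it, and the heartbeat task shuts
+/// the shared worker down once its last callback has been unregistered.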
+pub(crate) struct SharedNamespaceWorker {
+    heartbeat_map: Arc<Mutex<HashMap<String, HeartbeatFn>>>,
+    namespace: String,
 }
 
-impl WorkerHeartbeatManager {
+impl SharedNamespaceWorker {
     pub(crate) fn new(
-        config: WorkerConfig,
-        identity: String,
-        heartbeat_fn: Arc<OnceLock<HeartbeatFn>>,
         client: Arc<dyn WorkerClient>,
+        namespace: String,
+        heartbeat_interval: Duration,
+        telemetry: Option<WorkerTelemetry>,
     ) -> Self {
-        let sdk_name_and_ver = client.sdk_name_and_version();
-        let reset_notify = Arc::new(Notify::new());
-        let data = Arc::new(Mutex::new(WorkerHeartbeatData::new(
+        let config = WorkerConfigBuilder::default()
+            .namespace(namespace.clone())
+            .task_queue(format!(
+                "temporal-sys/worker-commands/{namespace}/{}",
+                client.get_process_key()
+            ))
+            .no_remote_activities(true)
+            .max_outstanding_nexus_tasks(5_usize)
+            .versioning_strategy(WorkerVersioningStrategy::None {
+                build_id: "1.0".to_owned(),
+            })
+            .build()
+            .expect("all required fields should be set");
+        let worker = crate::worker::Worker::new_with_pollers_inner(
             config,
-            identity,
-            sdk_name_and_ver,
-            reset_notify.clone(),
-        )));
-        let data_clone = data.clone();
-
-        let heartbeat_handle = tokio::spawn(async move {
-            let mut ticker = tokio::time::interval(data_clone.lock().heartbeat_interval);
-            ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
+            None,
+            client.clone(),
+            TaskPollers::Real,
+            telemetry,
+            None,
+        );
+
+        let last_heartbeat_time_map = Mutex::new(HashMap::new());
+
+        // heartbeat task
+        let reset_notify = Arc::new(Notify::new());
+        let client_clone = client;
+        let namespace_clone = namespace.clone();
+
+        let heartbeat_map = Arc::new(Mutex::new(HashMap::<String, HeartbeatFn>::new()));
+        let heartbeat_map_clone = heartbeat_map.clone();
+
+        tokio::spawn(async move {
+            let mut ticker = tokio::time::interval(heartbeat_interval);
+            ticker.tick().await;
             loop {
+                // TODO: Race condition here, can technically shut down before anything is
+                // ever initialized
+                if heartbeat_map_clone.lock().is_empty() {
+                    worker.shutdown().await;
+                    return;
+                }
+
                 tokio::select! {
                     _ = ticker.tick() => {
-                        let heartbeat = if let Some(heartbeat) = data_clone.lock().capture_heartbeat_if_needed() {
-                            heartbeat
-                        } else {
-                            continue
-                        };
-                        if let Err(e) = client.clone().record_worker_heartbeat(heartbeat).await {
+                        let mut hb_to_send = Vec::new();
+                        for (instance_key, heartbeat_callback) in heartbeat_map_clone.lock().iter() {
+                            let mut heartbeat = heartbeat_callback();
+                            let mut last_heartbeat_time_map = last_heartbeat_time_map.lock();
+                            let heartbeat_time = last_heartbeat_time_map.get(instance_key).cloned();
+
+                            let now = SystemTime::now();
+                            let elapsed_since_last_heartbeat = if let Some(heartbeat_time) = heartbeat_time {
+                                let dur = now.duration_since(heartbeat_time).unwrap_or(Duration::ZERO);
+                                Some(PbDuration {
+                                    seconds: dur.as_secs() as i64,
+                                    nanos: dur.subsec_nanos() as i32,
+                                })
+                            } else {
+                                None
+                            };
+
+                            heartbeat.elapsed_since_last_heartbeat = elapsed_since_last_heartbeat;
+                            heartbeat.heartbeat_time = Some(now.into());
+
+                            hb_to_send.push(heartbeat);
+
+                            last_heartbeat_time_map.insert(instance_key.clone(), now);
+                        }
+                        if let Err(e) = client_clone.record_worker_heartbeat(namespace_clone.clone(), hb_to_send).await {
                             if matches!(
-                            e.code(),
-                            tonic::Code::Unimplemented
+                                e.code(),
+                                tonic::Code::Unimplemented
                             ) {
                                 return;
                             }
@@ -62,127 +119,83 @@
             }
         });
 
-        let data_clone = data.clone();
-        if heartbeat_fn
-            .set(Box::new(move || {
-                data_clone.lock().capture_heartbeat_if_needed()
-            }))
-            .is_err()
-        {
-            dbg_panic!(
-                "Failed to set heartbeat_fn, heartbeat_fn should only be set once, when a singular WorkerHeartbeatInfo is created"
-            );
+        Self {
+            heartbeat_map,
+            namespace,
         }
-
-        Self { heartbeat_handle }
-    }
-
-    pub(crate) fn shutdown(&self) {
-        self.heartbeat_handle.abort()
     }
 }
 
-#[derive(Debug, Clone)]
-struct WorkerHeartbeatData {
-    worker_instance_key: String,
-    worker_identity: String,
-    host_info: WorkerHostInfo,
-    // Time of the last heartbeat.
This is used to both for heartbeat_time and last_heartbeat_time - heartbeat_time: Option, - task_queue: String, - /// SDK name - sdk_name: String, - /// SDK version - sdk_version: String, - /// Worker start time - start_time: SystemTime, - heartbeat_interval: Duration, - reset_notify: Arc, -} - -impl WorkerHeartbeatData { - fn new( - worker_config: WorkerConfig, - worker_identity: String, - sdk_name_and_ver: (String, String), - reset_notify: Arc, - ) -> Self { - Self { - worker_identity, - host_info: WorkerHostInfo { - host_name: gethostname().to_string_lossy().to_string(), - process_id: std::process::id().to_string(), - ..Default::default() - }, - sdk_name: sdk_name_and_ver.0, - sdk_version: sdk_name_and_ver.1, - task_queue: worker_config.task_queue.clone(), - start_time: SystemTime::now(), - heartbeat_time: None, - worker_instance_key: Uuid::new_v4().to_string(), - heartbeat_interval: worker_config - .heartbeat_interval - .expect("WorkerHeartbeatData is only called when heartbeat_interval is Some"), - reset_notify, - } +impl SharedNamespaceWorkerTrait for SharedNamespaceWorker { + fn namespace(&self) -> String { + self.namespace.clone() } - fn capture_heartbeat_if_needed(&mut self) -> Option { - let now = SystemTime::now(); - let elapsed_since_last_heartbeat = if let Some(heartbeat_time) = self.heartbeat_time { - let dur = now.duration_since(heartbeat_time).unwrap_or(Duration::ZERO); + fn unregister_callback( + &self, + worker_instance_key: String, + ) -> (Option WorkerHeartbeat + Send + Sync>>, bool) { + let mut heartbeat_map = self.heartbeat_map.lock(); + ( + heartbeat_map.remove(&worker_instance_key), + heartbeat_map.is_empty(), + ) + } + fn register_callback( + &self, + worker_instance_key: String, + heartbeat_callback: Box WorkerHeartbeat + Send + Sync>, + ) { + self.heartbeat_map + .lock() + .insert(worker_instance_key, heartbeat_callback); + } +} - // Only send poll data if it's nearly been a full interval since this data has been sent - // In this case, "nearly" is 90% of the interval - if dur.as_secs_f64() < 0.9 * self.heartbeat_interval.as_secs_f64() { - return None; +impl fmt::Debug for SharedNamespaceWorker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.heartbeat_map.try_lock() { + Some(guard) => { + let keys: Vec<_> = guard.keys().cloned().collect(); + f.debug_struct("SharedNamespaceWorker") + .field("namespace", &self.namespace) + .field("heartbeat_keys", &keys) + .finish() } - Some(PbDuration { - seconds: dur.as_secs() as i64, - nanos: dur.subsec_nanos() as i32, - }) - } else { - None - }; - - self.heartbeat_time = Some(now); - - self.reset_notify.notify_one(); - - Some(WorkerHeartbeat { - worker_instance_key: self.worker_instance_key.clone(), - worker_identity: self.worker_identity.clone(), - host_info: Some(self.host_info.clone()), - task_queue: self.task_queue.clone(), - sdk_name: self.sdk_name.clone(), - sdk_version: self.sdk_version.clone(), - status: 0, - start_time: Some(self.start_time.into()), - heartbeat_time: Some(SystemTime::now().into()), - elapsed_since_last_heartbeat, - ..Default::default() - }) + None => f + .debug_struct("SharedNamespaceWorker") + .field("namespace", &self.namespace) + .field("heartbeat_map", &"") + .finish(), + } } } - #[cfg(test)] mod tests { - use super::*; use crate::{ test_help::{WorkerExt, test_worker_cfg}, worker, worker::client::mocks::mock_worker_client, }; - use std::{sync::Arc, time::Duration}; + use std::{ + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, + time::Duration, + }; use 
temporal_sdk_core_api::worker::PollerBehavior; use temporal_sdk_core_protos::temporal::api::workflowservice::v1::RecordWorkerHeartbeatResponse; #[tokio::test] - async fn worker_heartbeat() { + async fn worker_heartbeat_basic() { let mut mock = mock_worker_client(); - mock.expect_record_worker_heartbeat() - .times(2) - .returning(move |heartbeat| { + let heartbeat_count = Arc::new(AtomicUsize::new(0)); + let heartbeat_count_clone = heartbeat_count.clone(); + mock.expect_record_worker_heartbeat().times(2).returning( + move |_namespace, worker_heartbeat| { + assert_eq!(1, worker_heartbeat.len()); + let heartbeat = worker_heartbeat[0].clone(); let host_info = heartbeat.host_info.clone().unwrap(); assert_eq!("test-identity", heartbeat.worker_identity); assert!(!heartbeat.worker_instance_key.is_empty()); @@ -197,34 +210,31 @@ mod tests { assert!(heartbeat.heartbeat_time.is_some()); assert!(heartbeat.start_time.is_some()); + heartbeat_count_clone.fetch_add(1, Ordering::Relaxed); + Ok(RecordWorkerHeartbeatResponse {}) - }); + }, + ); let config = test_worker_cfg() .activity_task_poller_behavior(PollerBehavior::SimpleMaximum(1_usize)) .max_outstanding_activities(1_usize) - .heartbeat_interval(Duration::from_millis(200)) .build() - .unwrap(); + .unwrap() + .into(); - let heartbeat_fn = Arc::new(OnceLock::new()); let client = Arc::new(mock); - let worker = worker::Worker::new(config, None, client, None, Some(heartbeat_fn.clone())); - heartbeat_fn.get().unwrap()(); - - // heartbeat timer fires once - advance_time(Duration::from_millis(300)).await; - // it hasn't been >90% of the interval since the last heartbeat, so no data should be returned here - assert_eq!(None, heartbeat_fn.get().unwrap()()); - // heartbeat timer fires once - advance_time(Duration::from_millis(300)).await; + let worker = worker::Worker::new( + config, + None, + client.clone(), + None, + Some(Duration::from_millis(100)), + ); + tokio::time::sleep(Duration::from_millis(250)).await; worker.drain_activity_poller_and_shutdown().await; - } - async fn advance_time(dur: Duration) { - tokio::time::pause(); - tokio::time::advance(dur).await; - tokio::time::resume(); + assert_eq!(2, heartbeat_count.load(Ordering::Relaxed)); } } diff --git a/core/src/worker/mod.rs b/core/src/worker/mod.rs index 5e57a26e8..b34cfa796 100644 --- a/core/src/worker/mod.rs +++ b/core/src/worker/mod.rs @@ -20,11 +20,12 @@ pub(crate) use activities::{ pub(crate) use wft_poller::WFTPollerShared; pub use workflow::LEGACY_QUERY_ID; +use crate::worker::heartbeat::{HeartbeatFn, SharedNamespaceWorker}; use crate::{ ActivityHeartbeat, CompleteActivityError, PollError, WorkerTrait, abstractions::{MeteredPermitDealer, PermitDealerContextData, dbg_panic}, errors::CompleteWfError, - pollers::{ActivityTaskOptions, BoxedActPoller, BoxedNexusPoller, LongPollBuffer}, + pollers::{BoxedActPoller, BoxedNexusPoller}, protosext::validate_activity_completion, telemetry::{ TelemetryInstance, @@ -36,32 +37,39 @@ use crate::{ worker::{ activities::{LACompleteAction, LocalActivityManager, NextPendingLAAction}, client::WorkerClient, - heartbeat::{HeartbeatFn, WorkerHeartbeatManager}, nexus::NexusManager, workflow::{ - LAReqSink, LocalResolution, WorkflowBasics, Workflows, wft_poller, - wft_poller::make_wft_poller, + LAReqSink, LocalResolution, WorkflowBasics, Workflows, wft_poller::make_wft_poller, }, }, }; +use crate::{ + pollers::{ActivityTaskOptions, LongPollBuffer}, + worker::workflow::wft_poller, +}; use activities::WorkerActivityTasks; use futures_util::{StreamExt, stream}; +use 
gethostname::gethostname; use parking_lot::Mutex; use slot_provider::SlotProvider; use std::{ convert::TryInto, future, sync::{ - Arc, OnceLock, + Arc, atomic::{AtomicBool, Ordering}, }, time::Duration, }; -use temporal_client::{ConfiguredClient, TemporalServiceClientWithMetrics, WorkerKey}; +use temporal_client::{ + ConfiguredClient, SharedNamespaceWorkerTrait, TemporalServiceClientWithMetrics, WorkerKey, +}; +use temporal_sdk_core_api::telemetry::metrics::TemporalMeter; use temporal_sdk_core_api::{ errors::{CompleteNexusError, WorkerValidationError}, worker::PollerBehavior, }; +use temporal_sdk_core_protos::temporal::api::worker::v1::WorkerHostInfo; use temporal_sdk_core_protos::{ TaskToken, coresdk::{ @@ -80,7 +88,8 @@ use temporal_sdk_core_protos::{ use tokio::sync::{mpsc::unbounded_channel, watch}; use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_util::sync::CancellationToken; - +use tracing::Subscriber; +use uuid::Uuid; #[cfg(any(feature = "test-utilities", test))] use { crate::{ @@ -107,6 +116,8 @@ pub struct Worker { local_act_mgr: Arc, /// Manages Nexus tasks nexus_mgr: NexusManager, + /// Manages worker heartbeat. Will be None for [SharedNamespaceWorker] workers + worker_heartbeat_mgr: Option, /// Has shutdown been called? shutdown_token: CancellationToken, /// Will be called at the end of each activation completion @@ -118,8 +129,6 @@ pub struct Worker { local_activities_complete: Arc, /// Used to track all permits have been released all_permits_tracker: tokio::sync::Mutex, - /// Used to shutdown the worker heartbeat task - worker_heartbeat: Option, } struct AllPermitsTracker { @@ -136,6 +145,35 @@ impl AllPermitsTracker { } } +#[derive(Clone)] +pub(crate) struct WorkerTelemetry { + metric_meter: Option, + temporal_metric_meter: Option, + trace_subscriber: Option>, +} + +impl WorkerTelemetry { + fn metric_meter(&self) -> Option { + self.metric_meter.as_ref().cloned() + } + + fn temporal_metric_meter(&self) -> Option { + self.temporal_metric_meter.as_ref().cloned() + } + fn trace_subscriber(&self) -> Option> { + self.trace_subscriber.as_ref().cloned() + } +} + +struct WorkerHeartbeatManager { + /// Instance key used to identify this worker in worker heartbeating + worker_instance_key: Uuid, + /// Heartbeat interval, defaults to 60s + heartbeat_interval: Duration, + /// Telemetry instance, needed to initialize [SharedNamespaceWorker] when replacing client + telemetry: Option, +} + #[async_trait::async_trait] impl WorkerTrait for Worker { async fn validate(&self) -> Result<(), WorkerValidationError> { @@ -233,7 +271,7 @@ impl WorkerTrait for Worker { self.shutdown_token.cancel(); // First, disable Eager Workflow Start if let Some(key) = *self.worker_key.lock() { - self.client.workers().unregister(key); + self.client.workers().unregister_slot(key); } // Second, we want to stop polling of both activity and workflow tasks if let Some(atm) = self.at_task_mgr.as_ref() { @@ -272,7 +310,7 @@ impl Worker { sticky_queue_name: Option, client: Arc, telem_instance: Option<&TelemetryInstance>, - heartbeat_fn: Option>>, + worker_heartbeat_interval: Option, ) -> Self { info!(task_queue=%config.task_queue, namespace=%config.namespace, "Initializing worker"); @@ -282,7 +320,7 @@ impl Worker { client, TaskPollers::Real, telem_instance, - heartbeat_fn, + worker_heartbeat_interval, ) } @@ -291,11 +329,42 @@ impl Worker { pub fn replace_client(&self, new_client: ConfiguredClient) { // Unregister worker from current client, register in new client at the end let mut worker_key = 
self.worker_key.lock(); - let slot_provider = (*worker_key).and_then(|k| self.client.workers().unregister(k)); - self.client - .replace_client(super::init_worker_client(&self.config, new_client)); - *worker_key = - slot_provider.and_then(|slot_provider| self.client.workers().register(slot_provider)); + let heartbeat_fn = if let Some(worker_heartbeat_mgr) = &self.worker_heartbeat_mgr { + self.client.workers().unregister_heartbeat_worker( + self.config.namespace.clone(), + worker_heartbeat_mgr.worker_instance_key.to_string(), + ) + } else { + None + }; + let slot_provider = (*worker_key).and_then(|k| self.client.workers().unregister_slot(k)); + let new_worker_client = super::init_worker_client( + self.config.namespace.clone(), + self.config.client_identity_override.clone(), + new_client, + self.client.get_process_key(), + ); + + self.client.replace_client(new_worker_client); + + *worker_key = slot_provider + .and_then(|slot_provider| self.client.workers().register_slot(slot_provider)); + if let Some(worker_heartbeat_mgr) = &self.worker_heartbeat_mgr + && let Some(heartbeat_fn) = heartbeat_fn + { + let shared_worker_fn = construct_shared_worker_fn( + self.client.clone(), + self.config.namespace.clone(), + worker_heartbeat_mgr.heartbeat_interval, + worker_heartbeat_mgr.telemetry.clone(), + ); + self.client.workers().register_heartbeat_worker( + self.config.namespace.clone(), + worker_heartbeat_mgr.worker_instance_key.to_string(), + heartbeat_fn, + shared_worker_fn, + ); + } } #[cfg(test)] @@ -309,16 +378,46 @@ impl Worker { client: Arc, task_pollers: TaskPollers, telem_instance: Option<&TelemetryInstance>, - heartbeat_fn: Option>>, + worker_heartbeat_interval: Option, ) -> Self { - let (metrics, meter) = if let Some(ti) = telem_instance { + let worker_telemetry = telem_instance.map(|telem| WorkerTelemetry { + metric_meter: telem.get_metric_meter(), + temporal_metric_meter: telem.get_temporal_metric_meter(), + trace_subscriber: telem.trace_subscriber(), + }); + + Worker::new_with_pollers_inner( + config, + sticky_queue_name, + client, + task_pollers, + worker_telemetry, + worker_heartbeat_interval, + ) + } + + pub(crate) fn new_with_pollers_inner( + config: WorkerConfig, + sticky_queue_name: Option, + client: Arc, + task_pollers: TaskPollers, + worker_telemetry: Option, + worker_heartbeat_interval: Option, + ) -> Self { + let shared_namespace_worker = worker_heartbeat_interval.is_none(); + let (metrics, meter) = if let Some(wt) = worker_telemetry.as_ref() { ( - MetricsContext::top_level(config.namespace.clone(), config.task_queue.clone(), ti), - ti.get_metric_meter(), + MetricsContext::top_level_with_meter( + config.namespace.clone(), + config.task_queue.clone(), + wt.temporal_metric_meter(), + ), + wt.metric_meter(), ) } else { (MetricsContext::no_op(), None) }; + let tuner = config .tuner .as_ref() @@ -408,6 +507,7 @@ impl Worker { nexus_slots.clone(), shutdown_token.child_token(), Some(move |np| np_metrics.record_num_pollers(np)), + shared_namespace_worker, )) as BoxedNexusPoller; #[cfg(any(feature = "test-utilities", test))] @@ -486,17 +586,68 @@ impl Worker { wft_slots.clone(), external_wft_tx, ); - let worker_key = Mutex::new(client.workers().register(Box::new(provider))); + let worker_key = Mutex::new(client.workers().register_slot(Box::new(provider))); let sdk_name_and_ver = client.sdk_name_and_version(); - let worker_heartbeat = heartbeat_fn.map(|heartbeat_fn| { - WorkerHeartbeatManager::new( - config.clone(), - client.get_identity(), - heartbeat_fn, - client.clone(), - ) - }); + let 
worker_heartbeat = if let Some(heartbeat_interval) = worker_heartbeat_interval { + let worker_instance_key = Uuid::new_v4(); + let worker_instance_key_clone = worker_instance_key.to_string(); + let worker_identity = client.get_identity(); + let task_queue = config.task_queue.clone(); + let sdk_name_and_ver = sdk_name_and_ver.clone(); + + // TODO: requires the metrics changes to get the rest of these fields + let worker_heartbeat_callback: HeartbeatFn = Box::new(move || { + temporal_sdk_core_protos::temporal::api::worker::v1::WorkerHeartbeat { + worker_instance_key: worker_instance_key_clone.clone(), + worker_identity: worker_identity.clone(), + host_info: Some(WorkerHostInfo { + host_name: gethostname().to_string_lossy().to_string(), + process_id: std::process::id().to_string(), + ..Default::default() + }), + task_queue: task_queue.clone(), + deployment_version: None, + sdk_name: sdk_name_and_ver.clone().0, + sdk_version: sdk_name_and_ver.clone().1, + status: 0, + start_time: Some(std::time::SystemTime::now().into()), + workflow_task_slots_info: None, + activity_task_slots_info: None, + nexus_task_slots_info: None, + local_activity_slots_info: None, + workflow_poller_info: None, + workflow_sticky_poller_info: None, + activity_poller_info: None, + nexus_poller_info: None, + total_sticky_cache_hit: 0, + total_sticky_cache_miss: 0, + current_sticky_cache_size: 0, + plugins: vec![], + ..Default::default() + } + }); + + client.workers().register_heartbeat_worker( + config.namespace.clone(), + worker_instance_key.to_string(), + worker_heartbeat_callback, + construct_shared_worker_fn( + client.clone(), + config.namespace.clone(), + heartbeat_interval, + worker_telemetry.clone(), + ), + ); + + Some(WorkerHeartbeatManager { + worker_instance_key, + heartbeat_interval, + telemetry: worker_telemetry.clone(), + }) + } else { + None + }; Self { worker_key, @@ -538,7 +689,9 @@ impl Worker { _ => Some(mgr.get_handle_for_workflows()), } }), - telem_instance, + worker_telemetry + .as_ref() + .and_then(|telem| telem.trace_subscriber()), ), at_task_mgr, local_act_mgr, @@ -554,7 +707,7 @@ impl Worker { la_permits, }), nexus_mgr, - worker_heartbeat, + worker_heartbeat_mgr: worker_heartbeat, } } @@ -599,8 +752,11 @@ impl Worker { dbg_panic!("Waiting for all slot permits to release took too long!"); } } - if let Some(heartbeat) = self.worker_heartbeat.as_ref() { - heartbeat.shutdown(); + if let Some(worker_heartbeat) = self.worker_heartbeat_mgr.as_ref() { + self.client.workers().unregister_heartbeat_worker( + self.config.namespace.clone(), + worker_heartbeat.worker_instance_key.to_string(), + ); } } @@ -895,6 +1051,22 @@ fn wft_poller_behavior(config: &WorkerConfig, is_sticky: bool) -> PollerBehavior } } +fn construct_shared_worker_fn( + client: Arc, + namespace: String, + heartbeat_interval: Duration, + telemetry: Option, +) -> impl Fn() -> Box { + move || { + Box::new(SharedNamespaceWorker::new( + client.clone(), + namespace.clone(), + heartbeat_interval, + telemetry.clone(), + )) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/src/worker/workflow/mod.rs b/core/src/worker/workflow/mod.rs index 33b9c6dac..24aa59f11 100644 --- a/core/src/worker/workflow/mod.rs +++ b/core/src/worker/workflow/mod.rs @@ -23,7 +23,7 @@ use crate::{ internal_flags::InternalFlags, pollers::TrackedPermittedTqResp, protosext::{ValidPollWFTQResponse, protocol_messages::IncomingProtocolMessage}, - telemetry::{TelemetryInstance, VecDisplayer, set_trace_subscriber_for_current_thread}, + telemetry::{VecDisplayer, 
set_trace_subscriber_for_current_thread}, worker::{ LocalActRequest, LocalActivityExecutionResult, LocalActivityResolution, PostActivateHookData, @@ -94,7 +94,7 @@ use tokio::{ }; use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_util::sync::CancellationToken; -use tracing::Span; +use tracing::{Span, Subscriber}; /// Id used by server for "legacy" queries. IE: Queries that come in the `query` rather than /// `queries` field of a WFT, and are responded to on the separate `respond_query_task_completed` @@ -166,7 +166,7 @@ impl Workflows { local_act_mgr: Arc, heartbeat_timeout_rx: UnboundedReceiver, activity_tasks_handle: Option, - telem_instance: Option<&TelemetryInstance>, + tracing_sub: Option>, ) -> Self { let (local_tx, local_rx) = unbounded_channel(); let (fetch_tx, fetch_rx) = unbounded_channel(); @@ -187,7 +187,6 @@ impl Workflows { let (start_polling_tx, start_polling_rx) = oneshot::channel(); // We must spawn a task to constantly poll the activation stream, because otherwise // activation completions would not cause anything to happen until the next poll. - let tracing_sub = telem_instance.and_then(|ti| ti.trace_subscriber()); let processing_task = thread::Builder::new() .name("workflow-processing".to_string()) .spawn(move || { diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index 90115715e..51f1e1627 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -18,12 +18,14 @@ //! //! #[tokio::main] //! async fn main() -> Result<(), Box> { +//! use temporal_sdk_core::RuntimeOptionsBuilder; //! let server_options = sdk_client_options(Url::from_str("http://localhost:7233")?).build()?; //! -//! let client = server_options.connect("default", None).await?; -//! //! let telemetry_options = TelemetryOptionsBuilder::default().build()?; -//! let runtime = CoreRuntime::new_assume_tokio(telemetry_options)?; +//! let runtime_options = RuntimeOptionsBuilder::default().telemetry_options(telemetry_options).build().unwrap(); +//! let runtime = CoreRuntime::new_assume_tokio(runtime_options)?; +//! +//! let client = server_options.connect("default", None, runtime.process_key()).await?; //! //! let worker_config = WorkerConfigBuilder::default() //! 
.namespace("default") diff --git a/tests/common/mod.rs b/tests/common/mod.rs index b17d07d31..a6ad8a673 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -39,8 +39,8 @@ use temporal_sdk::{ }, }; use temporal_sdk_core::{ - ClientOptions, ClientOptionsBuilder, CoreRuntime, WorkerConfigBuilder, init_replay_worker, - init_worker, + ClientOptions, ClientOptionsBuilder, CoreRuntime, RuntimeOptionsBuilder, WorkerConfigBuilder, + init_replay_worker, init_worker, replay::{HistoryForReplay, ReplayWorkerInput}, telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter}, }; @@ -67,6 +67,7 @@ use temporal_sdk_core_protos::{ use tokio::{sync::OnceCell, task::AbortHandle}; use tracing::{debug, warn}; use url::Url; +use uuid::Uuid; pub(crate) use temporal_sdk_core::test_help::NAMESPACE; /// The env var used to specify where the integ tests should point @@ -164,8 +165,12 @@ pub(crate) fn init_integ_telem() -> Option<&'static CoreRuntime> { } Some(INTEG_TESTS_RT.get_or_init(|| { let telemetry_options = get_integ_telem_options(); + let runtime_options = RuntimeOptionsBuilder::default() + .telemetry_options(telemetry_options) + .build() + .expect("Runtime options build cleanly"); let rt = - CoreRuntime::new_assume_tokio(telemetry_options).expect("Core runtime inits cleanly"); + CoreRuntime::new_assume_tokio(runtime_options).expect("Core runtime inits cleanly"); if let Some(sub) = rt.telemetry().trace_subscriber() { let _ = tracing::subscriber::set_global_default(sub); } @@ -199,6 +204,7 @@ pub(crate) async fn get_cloud_client() -> RetryClient { sgo.connect( env::var("TEMPORAL_NAMESPACE").expect("TEMPORAL_NAMESPACE must be set"), None, + Uuid::new_v4(), ) .await .unwrap() @@ -450,6 +456,7 @@ impl CoreWfStarter { .connect( cfg.namespace.clone(), rt.telemetry().get_temporal_metric_meter(), + rt.process_key(), ) .await .expect("Must connect"), diff --git a/tests/global_metric_tests.rs b/tests/global_metric_tests.rs index 14e799195..4da926801 100644 --- a/tests/global_metric_tests.rs +++ b/tests/global_metric_tests.rs @@ -6,7 +6,7 @@ use common::CoreWfStarter; use parking_lot::Mutex; use std::{sync::Arc, time::Duration}; use temporal_sdk_core::{ - CoreRuntime, + CoreRuntime, RuntimeOptionsBuilder, telemetry::{build_otlp_metric_exporter, construct_filter_string, telemetry_init_global}, }; use temporal_sdk_core_api::telemetry::{ @@ -71,18 +71,20 @@ async fn otel_errors_logged_as_errors() { .unwrap(), ) .unwrap(); + let telemopts = TelemetryOptionsBuilder::default() + .metrics(Arc::new(exporter) as Arc) + // Importantly, _not_ using subscriber override, is using console. + .logging(Logger::Console { + filter: construct_filter_string(Level::INFO, Level::WARN), + }) + .build() + .unwrap(); + let runtimeopts = RuntimeOptionsBuilder::default() + .telemetry_options(telemopts) + .build() + .unwrap(); - let rt = CoreRuntime::new_assume_tokio( - TelemetryOptionsBuilder::default() - .metrics(Arc::new(exporter) as Arc) - // Importantly, _not_ using subscriber override, is using console. 
diff --git a/tests/global_metric_tests.rs b/tests/global_metric_tests.rs
index 14e799195..4da926801 100644
--- a/tests/global_metric_tests.rs
+++ b/tests/global_metric_tests.rs
@@ -6,7 +6,7 @@ use common::CoreWfStarter;
 use parking_lot::Mutex;
 use std::{sync::Arc, time::Duration};
 use temporal_sdk_core::{
-    CoreRuntime,
+    CoreRuntime, RuntimeOptionsBuilder,
     telemetry::{build_otlp_metric_exporter, construct_filter_string, telemetry_init_global},
 };
 use temporal_sdk_core_api::telemetry::{
@@ -71,18 +71,20 @@ async fn otel_errors_logged_as_errors() {
             .unwrap(),
     )
     .unwrap();
+    let telemopts = TelemetryOptionsBuilder::default()
+        .metrics(Arc::new(exporter) as Arc<dyn CoreMeter>)
+        // Importantly, _not_ using subscriber override, is using console.
+        .logging(Logger::Console {
+            filter: construct_filter_string(Level::INFO, Level::WARN),
+        })
+        .build()
+        .unwrap();
+    let runtimeopts = RuntimeOptionsBuilder::default()
+        .telemetry_options(telemopts)
+        .build()
+        .unwrap();
-    let rt = CoreRuntime::new_assume_tokio(
-        TelemetryOptionsBuilder::default()
-            .metrics(Arc::new(exporter) as Arc<dyn CoreMeter>)
-            // Importantly, _not_ using subscriber override, is using console.
-            .logging(Logger::Console {
-                filter: construct_filter_string(Level::INFO, Level::WARN),
-            })
-            .build()
-            .unwrap(),
-    )
-    .unwrap();
+    let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
 
     let mut starter = CoreWfStarter::new_with_runtime("otel_errors_logged_as_errors", rt);
     let _worker = starter.get_worker().await;
diff --git a/tests/heavy_tests.rs b/tests/heavy_tests.rs
index f5dc9018d..7259f6cc3 100644
--- a/tests/heavy_tests.rs
+++ b/tests/heavy_tests.rs
@@ -19,7 +19,9 @@ use std::{
 };
 use temporal_client::{GetWorkflowResultOpts, WfClientExt, WorkflowClientTrait, WorkflowOptions};
 use temporal_sdk::{ActContext, ActivityOptions, WfContext, WorkflowResult};
-use temporal_sdk_core::{CoreRuntime, ResourceBasedTuner, ResourceSlotOptions};
+use temporal_sdk_core::{
+    CoreRuntime, ResourceBasedTuner, ResourceSlotOptions, RuntimeOptionsBuilder,
+};
 use temporal_sdk_core_api::worker::PollerBehavior;
 use temporal_sdk_core_protos::{
     coresdk::{AsJsonPayloadExt, workflow_commands::ActivityCancellationType},
@@ -194,7 +196,11 @@ async fn workflow_load() {
     // cause us to encounter the tracing span drop bug
     telemopts.logging = None;
     init_integ_telem();
-    let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
+    let runtimeopts = RuntimeOptionsBuilder::default()
+        .telemetry_options(telemopts)
+        .build()
+        .unwrap();
+    let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
     let mut starter = CoreWfStarter::new_with_runtime("workflow_load", rt);
     starter
         .worker_config
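
The same four-line `RuntimeOptionsBuilder` preamble now precedes nearly every `CoreRuntime` construction in the tests below. A sketch of a local helper that could capture it once (`runtime_from_telem` is hypothetical; it assumes only APIs already used in this diff):

    use temporal_sdk_core::{CoreRuntime, RuntimeOptionsBuilder};
    use temporal_sdk_core_api::telemetry::TelemetryOptions;

    // Build RuntimeOptions around the given telemetry options and stand up a
    // runtime on the current tokio runtime, mirroring the updated call sites.
    fn runtime_from_telem(telemopts: TelemetryOptions) -> CoreRuntime {
        let runtimeopts = RuntimeOptionsBuilder::default()
            .telemetry_options(telemopts)
            .build()
            .expect("runtime options build cleanly");
        CoreRuntime::new_assume_tokio(runtimeopts).expect("core runtime inits cleanly")
    }
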
diff --git a/tests/integ_tests/client_tests.rs b/tests/integ_tests/client_tests.rs
index 9a4473b35..3323efa76 100644
--- a/tests/integ_tests/client_tests.rs
+++ b/tests/integ_tests/client_tests.rs
@@ -39,6 +39,7 @@ use tonic::{
     transport::Server,
 };
 use tracing::info;
+use uuid::Uuid;
 
 #[tokio::test]
 async fn can_use_retry_client() {
@@ -279,7 +280,7 @@ async fn non_retryable_errors() {
         .unwrap();
     opts.target_url = uri;
     opts.skip_get_system_info = true;
-    let client = opts.connect("ns", None).await.unwrap();
+    let client = opts.connect("ns", None, Uuid::new_v4()).await.unwrap();
 
     let result = client.cancel_activity_task(vec![1].into(), None).await;
 
@@ -319,7 +320,7 @@ async fn retryable_errors() {
         .unwrap();
     opts.target_url = uri;
     opts.skip_get_system_info = true;
-    let client = opts.connect("ns", None).await.unwrap();
+    let client = opts.connect("ns", None, Uuid::new_v4()).await.unwrap();
 
     let result = client.cancel_activity_task(vec![1].into(), None).await;
 
@@ -365,7 +366,7 @@ async fn namespace_header_attached_to_relevant_calls() {
     opts.retry_config = RetryConfig::no_retries();
     let namespace = "namespace";
-    let client = opts.connect(namespace, None).await.unwrap();
+    let client = opts.connect(namespace, None, Uuid::new_v4()).await.unwrap();
 
     let _ = client
         .get_workflow_execution_history("hi".to_string(), None, vec![])
@@ -445,7 +446,10 @@ async fn http_proxy() {
     opts.target_url = format!("http://127.0.0.1:{}", server.addr.port())
         .parse()
         .unwrap();
-    let client = opts.connect("my-namespace", None).await.unwrap();
+    let client = opts
+        .connect("my-namespace", None, Uuid::new_v4())
+        .await
+        .unwrap();
     let _ = client.list_namespaces().await;
     assert!(call_count.load(Ordering::SeqCst) == 1);
     assert!(tcp_proxy.hit_count() == 0);
@@ -455,7 +459,10 @@ async fn http_proxy() {
         target_addr: tcp_proxy_addr.to_string(),
         basic_auth: None,
     });
-    let proxied_client = opts.connect("my-namespace", None).await.unwrap();
+    let proxied_client = opts
+        .connect("my-namespace", None, Uuid::new_v4())
+        .await
+        .unwrap();
     let _ = proxied_client.list_namespaces().await;
     assert!(call_count.load(Ordering::SeqCst) == 2);
     assert!(tcp_proxy.hit_count() == 1);
@@ -477,7 +484,10 @@ async fn http_proxy() {
         target_addr: format!("unix:{}", sock_path.to_str().unwrap()),
         basic_auth: None,
     });
-    let proxied_client = opts.connect("my-namespace", None).await.unwrap();
+    let proxied_client = opts
+        .connect("my-namespace", None, Uuid::new_v4())
+        .await
+        .unwrap();
     let _ = proxied_client.list_namespaces().await;
     assert!(call_count.load(Ordering::SeqCst) == 3);
     assert!(unix_proxy.hit_count() == 1);
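
None of the client tests above have a `CoreRuntime` in scope, so each standalone client mints its own process key with `Uuid::new_v4()`. A self-contained sketch of that shape (URL, identity, and version strings are placeholders):

    use temporal_sdk_core::ClientOptionsBuilder;
    use url::Url;
    use uuid::Uuid;

    // A client built without a CoreRuntime supplies an explicit, freshly
    // minted process key.
    async fn standalone_client() -> Result<(), Box<dyn std::error::Error>> {
        let opts = ClientOptionsBuilder::default()
            .identity("standalone-tester".to_string())
            .target_url(Url::parse("http://localhost:7233")?)
            .client_name("temporal-core".to_string())
            .client_version("0.1.0".to_string())
            .build()?;
        let _client = opts.connect("default", None, Uuid::new_v4()).await?;
        Ok(())
    }
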
diff --git a/tests/integ_tests/metrics_tests.rs b/tests/integ_tests/metrics_tests.rs
index dd4068ac4..e8734662a 100644
--- a/tests/integ_tests/metrics_tests.rs
+++ b/tests/integ_tests/metrics_tests.rs
@@ -22,7 +22,8 @@ use temporal_sdk::{
     NexusOperationOptions, WfContext,
 };
 use temporal_sdk_core::{
-    CoreRuntime, FixedSizeSlotSupplier, TokioRuntimeBuilder, TunerBuilder, init_worker,
+    CoreRuntime, FixedSizeSlotSupplier, RuntimeOptionsBuilder, TokioRuntimeBuilder, TunerBuilder,
+    init_worker,
     telemetry::{WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME, build_otlp_metric_exporter},
 };
 use temporal_sdk_core_api::{
@@ -97,7 +98,11 @@ async fn prometheus_metrics_exported(
         });
     }
     let (telemopts, addr, _aborter) = prom_metrics(Some(opts_builder.build().unwrap()));
-    let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
+    let runtimeopts = RuntimeOptionsBuilder::default()
+        .telemetry_options(telemopts)
+        .build()
+        .unwrap();
+    let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
     let opts = get_integ_server_options();
     let mut raw_client = opts
         .connect_no_namespace(rt.telemetry().get_temporal_metric_meter())
         .await
@@ -148,7 +153,11 @@ async fn prometheus_metrics_exported(
 async fn one_slot_worker_reports_available_slot() {
     let (telemopts, addr, _aborter) = prom_metrics(None);
     let tq = "one_slot_worker_tq";
-    let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
+    let runtimeopts = RuntimeOptionsBuilder::default()
+        .telemetry_options(telemopts)
+        .build()
+        .unwrap();
+    let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
 
     let worker_cfg = WorkerConfigBuilder::default()
         .namespace(NAMESPACE)
@@ -168,7 +177,7 @@ async fn one_slot_worker_reports_available_slot() {
 
     let client = Arc::new(
         get_integ_server_options()
-            .connect(worker_cfg.namespace.clone(), None)
+            .connect(worker_cfg.namespace.clone(), None, rt.process_key())
            .await
            .expect("Must connect"),
     );
@@ -401,7 +410,11 @@ async fn query_of_closed_workflow_doesnt_tick_terminal_metric(
     completion: workflow_command::Variant,
 ) {
     let (telemopts, addr, _aborter) = prom_metrics(None);
-    let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
+    let runtimeopts = RuntimeOptionsBuilder::default()
+        .telemetry_options(telemopts)
+        .build()
+        .unwrap();
+    let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
     let mut starter =
         CoreWfStarter::new_with_runtime("query_of_closed_workflow_doesnt_tick_terminal_metric", rt);
     // Disable cache to ensure replay happens completely
@@ -523,8 +536,11 @@ async fn query_of_closed_workflow_doesnt_tick_terminal_metric(
 
 #[test]
 fn runtime_new() {
-    let mut rt =
-        CoreRuntime::new(get_integ_telem_options(), TokioRuntimeBuilder::default()).unwrap();
+    let runtimeopts = RuntimeOptionsBuilder::default()
+        .telemetry_options(get_integ_telem_options())
+        .build()
+        .unwrap();
+    let mut rt = CoreRuntime::new(runtimeopts, TokioRuntimeBuilder::default()).unwrap();
     let handle = rt.tokio_handle();
     let _rt = handle.enter();
     let (telemopts, addr, _aborter) = prom_metrics(None);
@@ -570,7 +586,11 @@ async fn latency_metrics(
         .build()
         .unwrap(),
     ));
-    let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
+    let runtimeopts = RuntimeOptionsBuilder::default()
+        .telemetry_options(telemopts)
+        .build()
+        .unwrap();
+    let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
     let mut starter = CoreWfStarter::new_with_runtime("latency_metrics", rt);
     let worker = starter.get_worker().await;
     starter.start_wf().await;
@@ -624,10 +644,18 @@ async fn latency_metrics(
 #[tokio::test]
 async fn request_fail_codes() {
     let (telemopts, addr, _aborter) = prom_metrics(None);
-    let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
+    let runtimeopts = RuntimeOptionsBuilder::default()
+        .telemetry_options(telemopts)
+        .build()
+        .unwrap();
+    let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
     let opts = get_integ_server_options();
     let mut client = opts
-        .connect(NAMESPACE, rt.telemetry().get_temporal_metric_meter())
+        .connect(
+            NAMESPACE,
+            rt.telemetry().get_temporal_metric_meter(),
+            rt.process_key(),
+        )
         .await
         .unwrap();
@@ -667,11 +695,18 @@ async fn request_fail_codes_otel() {
     let mut telemopts = TelemetryOptionsBuilder::default();
     let exporter = Arc::new(exporter);
     telemopts.metrics(exporter as Arc<dyn CoreMeter>);
-
-    let rt = CoreRuntime::new_assume_tokio(telemopts.build().unwrap()).unwrap();
+    let runtimeopts = RuntimeOptionsBuilder::default()
+        .telemetry_options(telemopts.build().unwrap())
+        .build()
+        .unwrap();
+    let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
     let opts = get_integ_server_options();
     let mut client = opts
-        .connect(NAMESPACE, rt.telemetry().get_temporal_metric_meter())
+        .connect(
+            NAMESPACE,
+            rt.telemetry().get_temporal_metric_meter(),
+            rt.process_key(),
+        )
         .await
         .unwrap();
@@ -718,7 +753,11 @@ async fn docker_metrics_with_prometheus(
         .metric_prefix(test_uid.clone())
         .build()
         .unwrap();
-    let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
+    let runtimeopts = RuntimeOptionsBuilder::default()
+        .telemetry_options(telemopts)
+        .build()
+        .unwrap();
+    let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
     let test_name = "docker_metrics_with_prometheus";
     let mut starter = CoreWfStarter::new_with_runtime(test_name, rt);
     let worker = starter.get_worker().await;
@@ -772,7 +811,11 @@ async fn docker_metrics_with_prometheus(
 #[tokio::test]
 async fn activity_metrics() {
     let (telemopts, addr, _aborter) = prom_metrics(None);
-    let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
+    let runtimeopts = RuntimeOptionsBuilder::default()
+        .telemetry_options(telemopts)
+        .build()
+        .unwrap();
+    let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
     let wf_name = "activity_metrics";
     let mut starter = CoreWfStarter::new_with_runtime(wf_name, rt);
     starter
@@ -906,7 +949,11 @@ async fn activity_metrics() {
 #[tokio::test]
 async fn nexus_metrics() {
     let (telemopts, addr, _aborter) = prom_metrics(None);
-    let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
+    let runtimeopts = RuntimeOptionsBuilder::default()
+        .telemetry_options(telemopts)
+        .build()
+        .unwrap();
+    let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
     let wf_name = "nexus_metrics";
     let mut starter = CoreWfStarter::new_with_runtime(wf_name, rt);
     starter.worker_config.no_remote_activities(true);
@@ -1083,7 +1130,11 @@ async fn nexus_metrics() {
 #[tokio::test]
 async fn evict_on_complete_does_not_count_as_forced_eviction() {
     let (telemopts, addr, _aborter) = prom_metrics(None);
-    let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
+    let runtimeopts = RuntimeOptionsBuilder::default()
+        .telemetry_options(telemopts)
+        .build()
+        .unwrap();
+    let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
     let wf_name = "evict_on_complete_does_not_count_as_forced_eviction";
     let mut starter = CoreWfStarter::new_with_runtime(wf_name, rt);
     starter.worker_config.no_remote_activities(true);
@@ -1166,7 +1217,11 @@ where
 #[tokio::test]
 async fn metrics_available_from_custom_slot_supplier() {
     let (telemopts, addr, _aborter) = prom_metrics(None);
-    let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
+    let runtimeopts = RuntimeOptionsBuilder::default()
+        .telemetry_options(telemopts)
+        .build()
+        .unwrap();
+    let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
     let mut starter =
         CoreWfStarter::new_with_runtime("metrics_available_from_custom_slot_supplier", rt);
     starter.worker_config.no_remote_activities(true);
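
The metrics tests exercise both key sources: `rt.process_key()` ties a client to the runtime's process (grouping its workers for heartbeating), while `Uuid::new_v4()` keeps a client independent. A sketch of the distinction, assuming `RetryClient::get_client` exposes the wrapped `Client`:

    use temporal_client::ClientInitError;
    use temporal_sdk_core::{ClientOptions, CoreRuntime};
    use uuid::Uuid;

    async fn process_key_demo(rt: &CoreRuntime, opts: &ClientOptions) -> Result<(), ClientInitError> {
        // Clients created with the runtime's key report the same process key...
        let a = opts.connect("default", None, rt.process_key()).await?;
        let b = opts.connect("default", None, rt.process_key()).await?;
        assert_eq!(a.get_client().process_key(), b.get_client().process_key());
        // ...while an ad-hoc client mints its own and stands alone.
        let c = opts.connect("default", None, Uuid::new_v4()).await?;
        assert_ne!(a.get_client().process_key(), c.get_client().process_key());
        Ok(())
    }
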
diff --git a/tests/integ_tests/polling_tests.rs b/tests/integ_tests/polling_tests.rs
index a69006e39..56c24ff1e 100644
--- a/tests/integ_tests/polling_tests.rs
+++ b/tests/integ_tests/polling_tests.rs
@@ -28,6 +28,7 @@ use temporal_sdk_core_protos::{
 use tokio::time::timeout;
 use tracing::info;
 use url::Url;
+use uuid::Uuid;
 
 #[tokio::test]
 async fn out_of_order_completion_doesnt_hang() {
@@ -134,7 +135,7 @@ async fn switching_worker_client_changes_poll() {
         .target_url(Url::parse(&format!("http://{}", server1.target)).unwrap())
         .build()
         .unwrap()
-        .connect("default", None)
+        .connect("default", None, Uuid::new_v4())
         .await
         .unwrap();
     let client2 = client_common_config
@@ -142,7 +143,7 @@ async fn switching_worker_client_changes_poll() {
         .target_url(Url::parse(&format!("http://{}", server2.target)).unwrap())
         .build()
         .unwrap()
-        .connect("default", None)
+        .connect("default", None, Uuid::new_v4())
         .await
         .unwrap();
diff --git a/tests/integ_tests/visibility_tests.rs b/tests/integ_tests/visibility_tests.rs
index 7ffba0921..065045517 100644
--- a/tests/integ_tests/visibility_tests.rs
+++ b/tests/integ_tests/visibility_tests.rs
@@ -10,6 +10,7 @@ use temporal_sdk_core_protos::coresdk::workflow_activation::{
     WorkflowActivationJob, workflow_activation_job,
 };
 use tokio::time::sleep;
+use uuid::Uuid;
 
 #[tokio::test]
 async fn client_list_open_closed_workflow_executions() {
@@ -109,7 +110,7 @@ async fn client_create_namespace() {
     let client = Arc::new(
         get_integ_server_options()
-            .connect(NAMESPACE.to_owned(), None)
+            .connect(NAMESPACE.to_owned(), None, Uuid::new_v4())
             .await
             .expect("Must connect"),
     );
@@ -156,7 +157,7 @@ async fn client_describe_namespace() {
     let client = Arc::new(
         get_integ_server_options()
-            .connect(NAMESPACE.to_owned(), None)
+            .connect(NAMESPACE.to_owned(), None, Uuid::new_v4())
             .await
             .expect("Must connect"),
     );
diff --git a/tests/integ_tests/worker_tests.rs b/tests/integ_tests/worker_tests.rs
index 728ba3222..e755a9abf 100644
--- a/tests/integ_tests/worker_tests.rs
+++ b/tests/integ_tests/worker_tests.rs
@@ -15,7 +15,7 @@ use std::{
 use temporal_client::WorkflowOptions;
 use temporal_sdk::{ActivityOptions, WfContext, interceptors::WorkerInterceptor};
 use temporal_sdk_core::{
-    CoreRuntime, ResourceBasedTuner, ResourceSlotOptions, init_worker,
+    CoreRuntime, ResourceBasedTuner, ResourceSlotOptions, RuntimeOptionsBuilder, init_worker,
     test_help::{
         FakeWfResponses, MockPollCfg, ResponseType, TEST_Q, build_mock_pollers,
         drain_pollers_and_shutdown, hist_to_poll_resp, mock_worker, mock_worker_client,
@@ -61,7 +61,11 @@ use uuid::Uuid;
 #[tokio::test]
 async fn worker_validation_fails_on_nonexistent_namespace() {
     let opts = get_integ_server_options();
-    let runtime = CoreRuntime::new_assume_tokio(get_integ_telem_options()).unwrap();
+    let runtimeopts = RuntimeOptionsBuilder::default()
+        .telemetry_options(get_integ_telem_options())
+        .build()
+        .unwrap();
+    let runtime = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
     let retrying_client = opts
         .connect_no_namespace(runtime.telemetry().get_temporal_metric_meter())
         .await
diff --git a/tests/integ_tests/workflow_tests.rs b/tests/integ_tests/workflow_tests.rs
index 6d10fbb71..c11602900 100644
--- a/tests/integ_tests/workflow_tests.rs
+++ b/tests/integ_tests/workflow_tests.rs
@@ -35,7 +35,7 @@ use temporal_sdk::{
     ActivityOptions, LocalActivityOptions, TimerOptions, WfContext, interceptors::WorkerInterceptor,
 };
 use temporal_sdk_core::{
-    CoreRuntime,
+    CoreRuntime, RuntimeOptionsBuilder,
     replay::HistoryForReplay,
     test_help::{MockPollCfg, WorkerTestHelpers, drain_pollers_and_shutdown},
 };
@@ -764,7 +764,11 @@ async fn nondeterminism_errors_fail_workflow_when_configured_to(
     #[values(true, false)] whole_worker: bool,
 ) {
     let (telemopts, addr, _aborter) = prom_metrics(None);
-    let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
+    let runtimeopts = RuntimeOptionsBuilder::default()
+        .telemetry_options(telemopts)
+        .build()
+        .unwrap();
+    let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
     let wf_name = "nondeterminism_errors_fail_workflow_when_configured_to";
     let mut starter = CoreWfStarter::new_with_runtime(wf_name, rt);
     starter.worker_config.no_remote_activities(true);
diff --git a/tests/integ_tests/workflow_tests/eager.rs b/tests/integ_tests/workflow_tests/eager.rs
index 7271decd1..7ee238d09 100644
--- a/tests/integ_tests/workflow_tests/eager.rs
+++ b/tests/integ_tests/workflow_tests/eager.rs
@@ -2,6 +2,7 @@ use crate::common::{CoreWfStarter, NAMESPACE, get_integ_server_options};
 use std::time::Duration;
 use temporal_client::WorkflowClientTrait;
 use temporal_sdk::{WfContext, WorkflowResult};
+use uuid::Uuid;
 
 pub(crate) async fn eager_wf(_context: WfContext) -> WorkflowResult<()> {
     Ok(().into())
@@ -33,7 +34,7 @@ async fn eager_wf_start_different_clients() {
     worker.register_wf(wf_name.to_owned(), eager_wf);
 
     let client = get_integ_server_options()
-        .connect(NAMESPACE.to_string(), None)
+        .connect(NAMESPACE.to_string(), None, Uuid::new_v4())
         .await
         .expect("Should connect");
     let w = starter.get_worker().await;
diff --git a/tests/main.rs b/tests/main.rs
index bf05e2a1f..768aa04f5 100644
--- a/tests/main.rs
+++ b/tests/main.rs
@@ -31,7 +31,7 @@ mod integ_tests {
     };
     use std::time::Duration;
     use temporal_client::{NamespacedClient, WorkflowService};
-    use temporal_sdk_core::{CoreRuntime, init_worker};
+    use temporal_sdk_core::{CoreRuntime, RuntimeOptionsBuilder, init_worker};
     use temporal_sdk_core_api::worker::WorkerConfigBuilder;
     use temporal_sdk_core_protos::temporal::api::{
         nexus::v1::{EndpointSpec, EndpointTarget, endpoint_target},
@@ -44,7 +44,11 @@
     #[ignore] // Really a compile time check more than anything
     async fn lang_bridge_example() {
         let opts = get_integ_server_options();
-        let runtime = CoreRuntime::new_assume_tokio(get_integ_telem_options()).unwrap();
+        let runtimeopts = RuntimeOptionsBuilder::default()
+            .telemetry_options(get_integ_telem_options())
+            .build()
+            .unwrap();
+        let runtime = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
         let mut retrying_client = opts
             .connect_no_namespace(runtime.telemetry().get_temporal_metric_meter())
             .await
diff --git a/tests/manual_tests.rs b/tests/manual_tests.rs
index 8f5ef4c5b..749b574a2 100644
--- a/tests/manual_tests.rs
+++ b/tests/manual_tests.rs
@@ -20,7 +20,7 @@ use std::{
 };
 use temporal_client::{GetWorkflowResultOpts, WfClientExt, WorkflowClientTrait, WorkflowOptions};
 use temporal_sdk::{ActContext, ActivityOptions, WfContext};
-use temporal_sdk_core::CoreRuntime;
+use temporal_sdk_core::{CoreRuntime, RuntimeOptionsBuilder};
 use temporal_sdk_core_api::{telemetry::PrometheusExporterOptionsBuilder, worker::PollerBehavior};
 use temporal_sdk_core_protos::coresdk::AsJsonPayloadExt;
 use tracing::info;
@@ -41,7 +41,11 @@ async fn poller_load_spiky() {
     } else {
         prom_metrics(None)
     };
-    let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
+    let runtimeopts = RuntimeOptionsBuilder::default()
+        .telemetry_options(telemopts)
+        .build()
+        .unwrap();
+    let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
     let mut starter = CoreWfStarter::new_with_runtime("poller_load", rt);
     starter
         .worker_config
@@ -200,7 +204,11 @@ async fn poller_load_sustained() {
     } else {
         prom_metrics(None)
     };
-    let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
+    let runtimeopts = RuntimeOptionsBuilder::default()
+        .telemetry_options(telemopts)
+        .build()
+        .unwrap();
+    let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
     let mut starter = CoreWfStarter::new_with_runtime("poller_load", rt);
     starter
         .worker_config
@@ -291,7 +299,11 @@ async fn poller_load_spike_then_sustained() {
     } else {
         prom_metrics(None)
     };
-    let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
+    let runtimeopts = RuntimeOptionsBuilder::default()
+        .telemetry_options(telemopts)
+        .build()
+        .unwrap();
+    let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
     let mut starter = CoreWfStarter::new_with_runtime("poller_load", rt);
     starter
         .worker_config
diff --git a/tests/workflow_replay_bench.rs b/tests/workflow_replay_bench.rs
index 4200ebd3a..cc9131441 100644
--- a/tests/workflow_replay_bench.rs
+++ b/tests/workflow_replay_bench.rs
@@ -14,7 +14,9 @@ use std::{
     time::Duration,
 };
 use temporal_sdk::{WfContext, WorkflowFunction};
-use temporal_sdk_core::{CoreRuntime, replay::HistoryForReplay};
+use temporal_sdk_core::{
+    CoreRuntime, RuntimeOptions, RuntimeOptionsBuilder, replay::HistoryForReplay,
+};
 use temporal_sdk_core_api::telemetry::metrics::{
     MetricKeyValue, MetricParametersBuilder, NewAttributes,
 };
@@ -80,7 +82,11 @@ pub fn bench_metrics(c: &mut Criterion) {
     let _tokio = tokio_runtime.enter();
     let (mut telemopts, _addr, _aborter) = prom_metrics(None);
     telemopts.logging = None;
-    let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
+    let runtimeopts = RuntimeOptionsBuilder::default()
+        .telemetry_options(telemopts)
+        .build()
+        .unwrap();
+    let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
     let meter = rt.telemetry().get_metric_meter().unwrap();
 
     c.bench_function("Record with new attributes on each call", move |b| {