
Runtime-wide worker heartbeat #983


Open · wants to merge 1 commit into base: worker-heartbeat

6 changes: 0 additions & 6 deletions core-api/src/worker.rs
Expand Up @@ -161,12 +161,6 @@ pub struct WorkerConfig {

/// A versioning strategy for this worker.
pub versioning_strategy: WorkerVersioningStrategy,

/// The interval within which the worker will send a heartbeat.
/// The timer is reset on each existing RPC call that also happens to send this data, like
/// `PollWorkflowTaskQueueRequest`.
#[builder(default)]
pub heartbeat_interval: Option<Duration>,
}

impl WorkerConfig {
1 change: 1 addition & 0 deletions core-c-bridge/include/temporal-sdk-core-c-bridge.h
@@ -329,6 +329,7 @@ typedef struct TemporalCoreTelemetryOptions {

typedef struct TemporalCoreRuntimeOptions {
const struct TemporalCoreTelemetryOptions *telemetry;
uint64_t heartbeat_duration_millis;
} TemporalCoreRuntimeOptions;

typedef struct TemporalCoreTestServerOptions {
10 changes: 8 additions & 2 deletions core-c-bridge/src/runtime.rs
@@ -15,6 +15,7 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::UNIX_EPOCH;
use temporal_sdk_core::CoreRuntime;
use temporal_sdk_core::RuntimeOptions as CoreRuntimeOptions;
use temporal_sdk_core::TokioRuntimeBuilder;
use temporal_sdk_core::telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter};
use temporal_sdk_core_api::telemetry::HistogramBucketOverrides;
@@ -31,6 +32,7 @@ use url::Url;
#[repr(C)]
pub struct RuntimeOptions {
pub telemetry: *const TelemetryOptions,
pub heartbeat_duration_millis: u64,

Member:
Suggested change:
-    pub heartbeat_duration_millis: u64,
+    pub worker_heartbeat_duration_millis: u64,

Pedantic, but we have multiple forms of heartbeat, so we need to qualify it a bit (same for the CoreRuntime option, IMO).

}

#[repr(C)]
@@ -143,7 +145,7 @@ pub extern "C" fn temporal_core_runtime_new(options: *const RuntimeOptions) -> R
let mut runtime = Runtime {
core: Arc::new(
CoreRuntime::new(
CoreTelemetryOptions::default(),
CoreRuntimeOptions::default(),
TokioRuntimeBuilder::default(),
)
.unwrap(),
@@ -238,9 +240,13 @@ impl Runtime {
} else {
CoreTelemetryOptions::default()
};
let core_runtime_options = CoreRuntimeOptions::new(
telemetry_options,
Some(Duration::from_millis(options.heartbeat_duration_millis)),

Member:
Shouldn't 0 turn this off?


Contributor Author:
good catch

);

// Build core runtime
let mut core = CoreRuntime::new(telemetry_options, TokioRuntimeBuilder::default())?;
let mut core = CoreRuntime::new(core_runtime_options, TokioRuntimeBuilder::default())?;

// We late-bind the metrics after core runtime is created since it needs
// the Tokio handle
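As a side note on the review exchange above, one way the bridge could treat a zero interval as "heartbeat disabled" is sketched below. This is a hypothetical follow-up, not part of the diff; the helper name is invented, while the field and types come from this file.

```rust
use std::time::Duration;

/// Hypothetical helper for the C bridge: map the millisecond field from
/// `TemporalCoreRuntimeOptions` to an optional interval, treating 0 as "disabled".
fn heartbeat_interval_from_millis(millis: u64) -> Option<Duration> {
    if millis == 0 {
        None
    } else {
        Some(Duration::from_millis(millis))
    }
}
```

`temporal_core_runtime_new` would then pass `heartbeat_interval_from_millis(options.heartbeat_duration_millis)` to `CoreRuntimeOptions::new` instead of always wrapping the value in `Some`.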
1 change: 1 addition & 0 deletions core-c-bridge/src/tests/context.rs
@@ -147,6 +147,7 @@ impl Context {

let RuntimeOrFail { runtime, fail } = temporal_core_runtime_new(&RuntimeOptions {
telemetry: std::ptr::null(),
heartbeat_duration_millis: 0,
});

if let Some(fail) = byte_array_to_string(runtime, fail) {
5 changes: 3 additions & 2 deletions core/benches/workflow_replay.rs
@@ -6,7 +6,7 @@ use std::{
time::Duration,
};
use temporal_sdk::{WfContext, WorkflowFunction};
use temporal_sdk_core::{CoreRuntime, replay::HistoryForReplay};
use temporal_sdk_core::{CoreRuntime, RuntimeOptions, replay::HistoryForReplay};
use temporal_sdk_core_api::telemetry::metrics::{
MetricKeyValue, MetricParametersBuilder, NewAttributes,
};
@@ -75,7 +75,8 @@ pub fn bench_metrics(c: &mut Criterion) {
let _tokio = tokio_runtime.enter();
let (mut telemopts, addr, _aborter) = prom_metrics(None);
telemopts.logging = None;
let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
let runtime_opts = RuntimeOptions::new(telemopts, None);
let rt = CoreRuntime::new_assume_tokio(runtime_opts).unwrap();
let meter = rt.telemetry().get_metric_meter().unwrap();

c.bench_function("Record with new attributes on each call", move |b| {
2 changes: 1 addition & 1 deletion core/src/core_tests/workflow_tasks.rs
@@ -3258,7 +3258,7 @@ async fn both_normal_and_sticky_pollers_poll_concurrently() {
Some("stickytq".to_string()),
Arc::new(mock_client),
None,
None,
false,
);

for _ in 1..50 {
168 changes: 142 additions & 26 deletions core/src/lib.rs
@@ -30,6 +30,7 @@ mod core_tests;
#[macro_use]
mod test_help;

use std::collections::HashMap;
pub(crate) use temporal_sdk_core_api::errors;

pub use pollers::{
@@ -47,10 +48,13 @@ pub use worker::{
WorkerConfigBuilder,
};

use crate::abstractions::dbg_panic;
use crate::worker::client::WorkerClientWithHeartbeat;
/// Expose [WorkerClient] symbols
pub use crate::worker::client::{
PollActivityOptions, PollOptions, PollWorkflowOptions, WorkerClient, WorkflowTaskCompletion,
};
use crate::worker::heartbeat::{ClientIdentity, HeartbeatFn, SharedNamespaceWorker};
use crate::{
replay::{HistoryForReplay, ReplayWorkerInput},
telemetry::{
@@ -61,14 +65,17 @@ use crate::{
};
use anyhow::bail;
use futures_util::Stream;
use std::sync::{Arc, OnceLock};
use parking_lot::Mutex;
use std::sync::Arc;
use std::time::Duration;
use temporal_client::{ConfiguredClient, NamespacedClient, TemporalServiceClientWithMetrics};
use temporal_sdk_core_api::{
Worker as WorkerTrait,
errors::{CompleteActivityError, PollError},
telemetry::TelemetryOptions,
};
use temporal_sdk_core_protos::coresdk::ActivityHeartbeat;
use uuid::Uuid;

/// Initialize a worker bound to a task queue.
///
@@ -89,39 +96,65 @@ pub fn init_worker<CT>(
where
CT: Into<sealed::AnyClient>,
{
let client = init_worker_client(&worker_config, *client.into().into_inner());
if client.namespace() != worker_config.namespace {
let client_inner = *client.into().into_inner();
let endpoint = client_inner.options().clone().target_url;
let client = init_worker_client(
worker_config.namespace.clone(),
worker_config.client_identity_override.clone(),
client_inner,
);
let namespace = worker_config.namespace.clone();
if client.namespace() != namespace {
bail!("Passed in client is not bound to the same namespace as the worker");
}
if client.namespace() == "" {
bail!("Client namespace cannot be empty");
}
let client_ident = client.get_identity().to_owned();
let sticky_q = sticky_q_name_for_worker(&client_ident, &worker_config);
let sticky_q = sticky_q_name_for_worker(&client_ident, worker_config.max_cached_workflows);

if client_ident.is_empty() {
bail!("Client identity cannot be empty. Either lang or user should be setting this value");
}

let heartbeat_fn = worker_config
.heartbeat_interval
.map(|_| Arc::new(OnceLock::new()));

let client_bag = Arc::new(WorkerClientBag::new(
client,
worker_config.namespace.clone(),
namespace.clone(),
client_ident,
worker_config.versioning_strategy.clone(),
heartbeat_fn.clone(),
));

Ok(Worker::new(
let mut worker = Worker::new(
worker_config,
sticky_q,
client_bag,
client_bag.clone(),
Some(&runtime.telemetry),
heartbeat_fn,
))
false,
);

if let Some(_) = runtime.heartbeat_interval {
let worker_instance_key = worker.worker_instance_key();
let heartbeat_callback = worker
.get_heartbeat_callback()
.expect("Worker heartbeat data should exist for non-shared namespace worker");

Member:
I'd probably make this a dbg panic that just skips registration if it's not set.

let remove_worker_callback = runtime.register_heartbeat_callback(

Member:
Why not put the heartbeat pieces (registration, interval duration, and so on) on the client instead of on the runtime? What benefit do you get from doing it at the runtime level? At the client level, at least, you have the exact client to use. While it doesn't always seem like there's a use case for multiple clients to the same endpoint/namespace, there may be, especially as we get more fine-grained permissions (a client that can support a worker is not the same as a client that may be used to start a workflow; they may have different API keys on purpose).


Contributor Author:
> a client that can support a worker is not the same as a client that may be used to start a workflow, they may have different API keys on purpose

Can you explain this more? Are you saying it's possible for a client to not have permissions to heartbeat to the server?

To me it feels like there isn't a need for client-level granularity of heartbeat settings. And even if there is a scenario where there are multiple clients on the same endpoint/namespace, wouldn't they be configured on different runtimes/processes? To me it feels easier to think: for this process I'm creating, I have this heartbeat duration.


Member (@cretz, Aug 19, 2025):
> Can you explain this more? Are you saying it's possible for a client to not have permissions to heartbeat to the server?

I am saying you cannot know how a user configured their client for a worker, so you should not blindly assume that they will only have one client per endpoint/namespace. You should use the client that the worker is configured with to make the heartbeat; there is no reason that I can see to make this runtime-level.

> To me it feels like there isn't a need for client-level granularity of heartbeat settings

It's not just about granularity of settings; it's about clarity of lifetimes and associating workers with clients, not with runtimes. If you want the worker heartbeat settings at the runtime level, OK (though there seems to be no good reason to). But that is unrelated to where this shared heartbeater lives, which should be the client. Is there a good reason to use the runtime instead of the client here for the shared namespace/heartbeating?

I can't see a reason to do anything with CoreRuntime at all for this feature, which is very client- and workers-on-client-specific. But if we have to put these settings at the runtime level even though they are really client-level, we can (though I would rather not); the collection of workers to heartbeat can still live on the client.

worker_instance_key,
ClientIdentity {
endpoint: endpoint.to_string(),
namespace: namespace.clone(),
task_queue: format!(
"temporal-sys/worker-commands/{}/{}",
namespace,
runtime.task_queue_key()
),
},
heartbeat_callback,
client_bag,
);
worker.register_heartbeat_shutdown_callback(remove_worker_callback)
}

Ok(worker)
}
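Following the reviewer's preference for a debug panic over `expect`, the registration guard in `init_worker` might look roughly like this. It is a sketch only, reusing `dbg_panic!` and the names from the diff above, and the "skip registration" behavior is an assumption about the intended follow-up.

```rust
// Hypothetical variant: in release builds, skip heartbeat registration rather than panic.
let heartbeat_callback = match worker.get_heartbeat_callback() {
    Some(callback) => callback,
    None => {
        dbg_panic!("Worker heartbeat data should exist for non-shared namespace worker");
        return Ok(worker);
    }
};
```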

/// Create a worker for replaying one or more existing histories. It will auto-shutdown as soon as
@@ -142,11 +175,12 @@
}

pub(crate) fn init_worker_client(
config: &WorkerConfig,
namespace: String,
client_identity_override: Option<String>,
client: ConfiguredClient<TemporalServiceClientWithMetrics>,
) -> RetryClient<Client> {
let mut client = Client::new(client, config.namespace.clone());
if let Some(ref id_override) = config.client_identity_override {
let mut client = Client::new(client, namespace);
if let Some(ref id_override) = client_identity_override {
client.options_mut().identity.clone_from(id_override);
}
RetryClient::new(client, RetryConfig::default())
Expand All @@ -156,9 +190,9 @@ pub(crate) fn init_worker_client(
/// workflows.
pub(crate) fn sticky_q_name_for_worker(
process_identity: &str,
config: &WorkerConfig,
max_cached_workflows: usize,
) -> Option<String> {
if config.max_cached_workflows > 0 {
if max_cached_workflows > 0 {
Some(format!(
"{}-{}",
&process_identity,
Expand Down Expand Up @@ -220,6 +254,35 @@ pub struct CoreRuntime {
telemetry: TelemetryInstance,
runtime: Option<tokio::runtime::Runtime>,
runtime_handle: tokio::runtime::Handle,
shared_namespace_map: Arc<Mutex<HashMap<ClientIdentity, SharedNamespaceWorker>>>,
task_queue_key: Uuid,
heartbeat_interval: Option<Duration>,
}

/// Holds telemetry options, as well as worker heartbeat_interval.
#[non_exhaustive]
pub struct RuntimeOptions {
telemetry_options: TelemetryOptions,
heartbeat_interval: Option<Duration>,
}

impl RuntimeOptions {
/// Creates new runtime options.

Member:
Since this is non_exhaustive, it should probably use a builder pattern rather than a `new` constructor.

pub fn new(telemetry_options: TelemetryOptions, heartbeat_interval: Option<Duration>) -> Self {
Self {
telemetry_options,
heartbeat_interval,
}
}
}

impl Default for RuntimeOptions {
fn default() -> Self {
Self {
telemetry_options: Default::default(),
heartbeat_interval: None,
}
}
}
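To make the builder suggestion above concrete, here is one possible shape. It is an assumption about how such an API could look, not something this PR defines; `RuntimeOptionsBuilder` is an invented name, while the fields mirror the struct above.

```rust
/// Hypothetical builder for `RuntimeOptions`, compatible with `#[non_exhaustive]`.
#[derive(Default)]
pub struct RuntimeOptionsBuilder {
    telemetry_options: TelemetryOptions,
    heartbeat_interval: Option<Duration>,
}

impl RuntimeOptionsBuilder {
    /// Sets the telemetry options used by the runtime.
    pub fn telemetry_options(mut self, telemetry_options: TelemetryOptions) -> Self {
        self.telemetry_options = telemetry_options;
        self
    }

    /// Sets the runtime-wide worker heartbeat interval; `None` leaves heartbeating off.
    pub fn heartbeat_interval(mut self, heartbeat_interval: Option<Duration>) -> Self {
        self.heartbeat_interval = heartbeat_interval;
        self
    }

    /// Builds the options (the struct literal is allowed here because the builder
    /// would live in the defining crate).
    pub fn build(self) -> RuntimeOptions {
        RuntimeOptions {
            telemetry_options: self.telemetry_options,
            heartbeat_interval: self.heartbeat_interval,
        }
    }
}
```

Callers such as the benchmark change above would then read `RuntimeOptionsBuilder::default().telemetry_options(telemopts).build()` instead of `RuntimeOptions::new(telemopts, None)`.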

/// Wraps a [tokio::runtime::Builder] to allow layering multiple on_thread_start functions
Expand Down Expand Up @@ -254,13 +317,13 @@ impl CoreRuntime {
/// If a tokio runtime has already been initialized. To re-use an existing runtime, call
/// [CoreRuntime::new_assume_tokio].
pub fn new<F>(
telemetry_options: TelemetryOptions,
runtime_options: RuntimeOptions,
mut tokio_builder: TokioRuntimeBuilder<F>,
) -> Result<Self, anyhow::Error>
where
F: Fn() + Send + Sync + 'static,
{
let telemetry = telemetry_init(telemetry_options)?;
let telemetry = telemetry_init(runtime_options.telemetry_options)?;
let subscriber = telemetry.trace_subscriber();
let runtime = tokio_builder
.inner
Expand All @@ -275,7 +338,8 @@ impl CoreRuntime {
})
.build()?;
let _rg = runtime.enter();
let mut me = Self::new_assume_tokio_initialized_telem(telemetry);
let mut me =
Self::new_assume_tokio_initialized_telem(telemetry, runtime_options.heartbeat_interval);
me.runtime = Some(runtime);
Ok(me)
}
Expand All @@ -285,17 +349,23 @@ impl CoreRuntime {
///
/// # Panics
/// If there is no currently active Tokio runtime
pub fn new_assume_tokio(telemetry_options: TelemetryOptions) -> Result<Self, anyhow::Error> {
let telemetry = telemetry_init(telemetry_options)?;
Ok(Self::new_assume_tokio_initialized_telem(telemetry))
pub fn new_assume_tokio(runtime_options: RuntimeOptions) -> Result<Self, anyhow::Error> {
let telemetry = telemetry_init(runtime_options.telemetry_options)?;
Ok(Self::new_assume_tokio_initialized_telem(
telemetry,
runtime_options.heartbeat_interval,
))
}

/// Construct a runtime from an already-initialized telemetry instance, assuming a tokio runtime
/// is already active and this call exists in its context. See [Self::new] for more.
///
/// # Panics
/// If there is no currently active Tokio runtime
pub fn new_assume_tokio_initialized_telem(telemetry: TelemetryInstance) -> Self {
pub fn new_assume_tokio_initialized_telem(
telemetry: TelemetryInstance,
heartbeat_interval: Option<Duration>,
) -> Self {
let runtime_handle = tokio::runtime::Handle::current();
if let Some(sub) = telemetry.trace_subscriber() {
set_trace_subscriber_for_current_thread(sub);
Expand All @@ -304,6 +374,9 @@ impl CoreRuntime {
telemetry,
runtime: None,
runtime_handle,
shared_namespace_map: Arc::new(Mutex::new(HashMap::new())),
task_queue_key: Uuid::new_v4(),
heartbeat_interval,
}
}

Expand All @@ -321,6 +394,49 @@ impl CoreRuntime {
pub fn telemetry_mut(&mut self) -> &mut TelemetryInstance {
&mut self.telemetry
}

fn register_heartbeat_callback(
&self,
worker_instance_key: String,
client_identity: ClientIdentity,
heartbeat_callback: HeartbeatFn,
client: Arc<dyn WorkerClientWithHeartbeat>,
) -> Arc<dyn Fn() + Send + Sync> {
if let Some(ref heartbeat_interval) = self.heartbeat_interval {
let mut shared_namespace_map = self.shared_namespace_map.lock();
let worker = shared_namespace_map
.entry(client_identity.clone())

Member (@cretz, Aug 19, 2025):
We should not use this "client identity" mechanism, IMO. Instead, the state of registered workers, the heartbeater, and so on should just live on the client. Is there a good reason not to do that, and a good reason to have a map at the runtime level that potentially doesn't use the client the user configured the worker with?


Contributor Author:
How would this potentially not use the client that the user configured the worker with? We use the same client that is used to start the worker.


Member (@cretz, Aug 19, 2025):
> We use the same client that is used to start the worker

Not true; you only use the first one that matches this "identity". And what if I have two workers on separate clients that happen to share the same endpoint and namespace, but I've given one a different set of headers for my proxy? The worker heartbeat should only ever be called on the literal client the worker was registered with. That's what should hold the group of workers.

Regardless of how rare it may be to have multiple clients on the runtime to the same endpoint/namespace, is there a good reason not to put the worker heartbeat registration/collection on the client and leave the runtime alone? Keeping the workers related to a client on that client seems like a much better lifetime-management approach. Is there any good reason to do this at the runtime level?

.or_insert_with(|| {
let namespace_map = self.shared_namespace_map.clone();
let client_identity_clone = client_identity.clone();
let remove_namespace_worker_callback = Arc::new(move || {
namespace_map.lock().remove(&client_identity_clone);
});

// The first client of a ClientIdentity is the client used for the
// SharedNamespaceWorker, so we need that client's heartbeat_map to store
// heartbeat callbacks
let heartbeat_map = client.get_heartbeat_map();

SharedNamespaceWorker::new(
client,
client_identity,
heartbeat_interval.clone(),
Some(&self.telemetry),
remove_namespace_worker_callback,
heartbeat_map,
)
});
worker.register_callback(worker_instance_key, heartbeat_callback)
} else {
dbg_panic!("Worker heartbeat disabled for this runtime");
Arc::new(|| {})
}
}
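For comparison with the thread above, here is a self-contained sketch of the client-owned alternative the reviewer describes. `ClientHeartbeatRegistry` and `HeartbeatCallback` are illustrative names only and do not exist in this PR; the point is that the map of callbacks hangs off the exact client a worker was built with rather than off a runtime-level map keyed by endpoint/namespace.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::Mutex;

/// Illustrative alias for a worker's heartbeat callback.
type HeartbeatCallback = Arc<dyn Fn() + Send + Sync>;

/// Hypothetical registry owned by a single client instance.
#[derive(Default)]
struct ClientHeartbeatRegistry {
    callbacks: Arc<Mutex<HashMap<String, HeartbeatCallback>>>,
}

impl ClientHeartbeatRegistry {
    /// Registers a worker's callback and returns a closure that unregisters it,
    /// suitable for the worker's shutdown path.
    fn register(
        &self,
        worker_instance_key: String,
        callback: HeartbeatCallback,
    ) -> impl Fn() + Send + Sync {
        self.callbacks
            .lock()
            .insert(worker_instance_key.clone(), callback);
        let callbacks = self.callbacks.clone();
        move || {
            callbacks.lock().remove(&worker_instance_key);
        }
    }
}
```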

fn task_queue_key(&self) -> Uuid {
self.task_queue_key.clone()
}
}

impl Drop for CoreRuntime {
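Taken together, the new surface in this PR would be used roughly as follows. This is an illustrative sketch based on the signatures shown in the diff; the 30-second interval is an arbitrary choice, and passing `None` keeps worker heartbeating disabled.

```rust
use std::time::Duration;
use temporal_sdk_core::{CoreRuntime, RuntimeOptions, TokioRuntimeBuilder};
use temporal_sdk_core_api::telemetry::TelemetryOptions;

fn build_runtime() -> Result<CoreRuntime, anyhow::Error> {
    // Enable runtime-wide worker heartbeating with a 30s interval.
    let options = RuntimeOptions::new(TelemetryOptions::default(), Some(Duration::from_secs(30)));
    CoreRuntime::new(options, TokioRuntimeBuilder::default())
}
```

Workers created with `init_worker` on such a runtime then register themselves with the shared per-namespace heartbeat worker, and the shutdown callback returned by `register_heartbeat_callback` removes them again.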