Skip to content

Runtime-wide worker heartbeat #983

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: worker-heartbeat
Choose a base branch
from

Conversation

yuandrew
Copy link
Contributor

NOTE: this targets a worker-heartbeat feature branch to merge into, that way I can make incremental progress and not hit main until the whole feature is ready for both server and SDK.

What was changed

  • Breaking change: Moved heartbeat_interval from a per-worker config setting into a per-runtime config setting.
  • WorkerHeartbeatData is now gone, HeartbeatFn callback is enough to grab heartbeat data whenever needed.
  • A new SharedNamespaceWorker is spawned with the first worker for each namespace, and will handle heartbeating for all workers in the same namespace (on the same process) via nexus.

Why?

Worker heartbeat. I separated WorkerHeartbeat out of #962. Some of the design for this is with Worker Commands in mind, but that will come at a later time.

Checklist

  1. Closes

  2. How was this tested:

fixed up worker_heartbeat unit test

  1. Any docs updates needed?

@yuandrew yuandrew requested a review from a team as a code owner August 18, 2025 22:34
last_heartbeat_time_map.insert(instance_key.clone(), now);
}
if let Err(e) = client_clone.record_worker_heartbeat(client_identity.namespace.clone(), client_identity.endpoint.clone(), hb_to_send
).await {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Heartbeat Identity Error

The record_worker_heartbeat call uses client_identity.endpoint for the identity parameter. This is incorrect; the method expects the worker's identity string, not the endpoint URL, leading to heartbeats being sent with an inaccurate client identity.

Fix in Cursor Fix in Web

// remove this worker from the Runtime map
shutdown_callback();
return;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Worker Premature Shutdown Due to Heartbeat Race Condition

The SharedNamespaceWorker may prematurely shut down. A race condition exists where the heartbeat loop checks for an empty heartbeat_map and exits, potentially before callbacks are registered or during concurrent registration.

Fix in Cursor Fix in Web

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, this caught a race condition, although it might have just seen my TODO comment right above this mentioning the race condition.

I was thinking I could use a channel to signal once the first heartbeatFn got added, is there a cleaner/rustier way to do this?

Copy link
Member

@cretz cretz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR titled "process-wide" but actually it's "runtime-wide" it seems. But IMO it should be at the client level, where you have the connection to actually make the invocation and is the thing workers share to communicate to server.

@@ -31,6 +32,7 @@ use url::Url;
#[repr(C)]
pub struct RuntimeOptions {
pub telemetry: *const TelemetryOptions,
pub heartbeat_duration_millis: u64,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub heartbeat_duration_millis: u64,
pub worker_heartbeat_duration_millis: u64,

Pedantic, but we have multiple forms of heartbeat, so need to quality if a bit (same for the CoreRuntime option IMO)

@@ -238,9 +240,13 @@ impl Runtime {
} else {
CoreTelemetryOptions::default()
};
let core_runtime_options = CoreRuntimeOptions::new(
telemetry_options,
Some(Duration::from_millis(options.heartbeat_duration_millis)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't 0 turn this off?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

let heartbeat_callback = worker
.get_heartbeat_callback()
.expect("Worker heartbeat data should exist for non-shared namespace worker");
let remove_worker_callback = runtime.register_heartbeat_callback(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not put the heartbeat stuff like registration and interval duration and such on the client instead of on runtime? What benefit are you getting by doing it at the runtime level? At least at the client level, you have the exact client to use. While it doesn't always seem like there's a use case for multiple clients to the same endpoint/namespace, there may be, especially as we get more fine-grained permissions (a client that can support a worker is not the same as a client that may be used to start a workflow, they may have different API keys on purpose).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a client that can support a worker is not the same as a client that may be used to start a workflow, they may have different API keys on purpose

Can you explain this more? Are you saying it's possible for a client to not have permissions to heartbeat to the server?

To me it feels like there isn't a need for client-level granularity of heartbeat settings. And even if there is a scenario where there's multiple clients on the same endpoint/namespace, wouldn't they be configured on different runtimes/processes? To me it feels easier to think, for this process I'm creating, i have this heartbeat duration

Copy link
Member

@cretz cretz Aug 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain this more? Are you saying it's possible for a client to not have permissions to heartbeat to the server?

I am saying you cannot know how a user configured their client for a worker, so you should not blindly assume that they will only have one client per endpoint/namespace. You should use the client that the worker is configured with to make the heartbeat, there is no reason that I can see to make this runtime level.

To me it feels like there isn't a need for client-level granularity of heartbeat settings

It's not just about granularity of settings, it's about clarity of lifetimes and associating workers to clients not to runtimes. If you want the worker heartbeat settings at a runtime level, ok (seems no good reason to though). But that is unrelated to where this shared heartbeater is, which should be the client. Is there a good reason to use runtime instead of client here for the shared namespace/heartbeating?

I can't see a reason to do anything with CoreRuntime at all for this feature that is very client and workers-on-client specific. But if we have to put these settings at the runtime level even though it's really client level, we can, though would rather not, but still the collection of workers to heartbeat can still be on the client.

if let Some(ref heartbeat_interval) = self.heartbeat_interval {
let mut shared_namespace_map = self.shared_namespace_map.lock();
let worker = shared_namespace_map
.entry(client_identity.clone())
Copy link
Member

@cretz cretz Aug 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not use this "client identity" mechanism IMO. Instead, the state of registered workers and the heartbeater and such should just live on the client. Is there a good reason why not and a good reason why to have a map at the runtime level and potentially not use the client the user configured the worker with?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this potentially use a client that the user configured the worker with? We use the same client as is used to start the worker

Copy link
Member

@cretz cretz Aug 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use the same client as is used to start the worker

Not true, you only use the first one that matches this "identity". And if I have two workers on separate clients that happen to share the same endpoint and namespace, but maybe I've given a different set of headers to one in a certain way for my proxy? The worker heartbeat should only ever be called on the literal client the worker was registered with. That's what should hold the group of workers.

Regardless of how rare it may be to have multiple clients on the runtime to the same endpoint/namespace, is there a good reason why not to make the worker heartbeat registration/collection stuff on client and leave runtime alone? It seems like such a better lifetime management approach to put the workers related to a client on a client. Is there any good reason to do this at the runtime level?

@yuandrew yuandrew changed the title Process-wide worker heartbeat Runtime-wide worker heartbeat Aug 19, 2025
Copy link
Member

@Sushisource Sushisource left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As for the client/runtime thing, I think this is pretty preferential and an internal detail that won't matter much to users.

The thing I think that does matter is clients don't "own" workers (and in fact can't, due to circular deps). They are used when creating a worker, but the relationship is more like a worker "has" a client, and more than one worker may have the same client.

That's why I don't really want the heartbeating duration to be a client property. Right now our client options in the lang layer don't directly reference anything worker specific (they do indirectly with interceptors/plugins), and I think it should really stay that way. Specifying a heartbeating duration for a client that is never used with a worker, for example, is weird.

So, that means either this option stays on a Runtime (seems perfectly fine to me) or needs to be passed in when initting the worker, but then there's some last-write-wins problem if users use the same client with different values when passing to workers. So, more reason to put it on the Runtime.

As for literally where this map lives or not - I don't have a huge preference either way. Changing the trait that was used to get slot suppliers for eager to be more generic & used for this I can see. I'm also fine with the current setup.

Comment on lines +119 to +120
/// Adds `WorkerHeartbeatData` to the `SharedNamespaceWorker` and adds a `HeartbeatCallback` to
/// the client that Worker
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstrings can use [TypeName] to make links, rather than it being in single quotes

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also this should describe what the return type is

mock.expect_record_worker_heartbeat()
.times(2)
.returning(move |heartbeat| {
let heartbeat_count = Arc::new(Mutex::new(0));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test so it doesn't matter much, but, an atomic usize would be easier than a mutex

Comment on lines +126 to +132
/// Instance key used to identify this worker in worker heartbeating
worker_instance_key: String,
/// Collects data to send to server via worker heartbeat
worker_heartbeat_callback: Option<HeartbeatFn>,
/// Used to remove this worker from the parent map used to track this worker for
/// worker heartbeat
worker_heartbeat_shutdown_callback: OnceCell<Arc<dyn Fn() + Send + Sync>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably would be nice to wrap these up in a little struct (can just be private to this file)

worker_heartbeat: Option<WorkerHeartbeatManager>,
/// Instance key used to identify this worker in worker heartbeating
worker_instance_key: String,
/// Collects data to send to server via worker heartbeat
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should say when this isn't set

let worker_instance_key = worker.worker_instance_key();
let heartbeat_callback = worker
.get_heartbeat_callback()
.expect("Worker heartbeat data should exist for non-shared namespace worker");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably make this a dbg panic that just skips registration if it's not set.

}

impl RuntimeOptions {
/// Creates new runtime options.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is non_exhaustive it should probably use a builder pattern rather than a new

@cretz
Copy link
Member

cretz commented Aug 20, 2025

The thing I think that does matter is clients don't "own" workers (and in fact can't, due to circular deps)

Right, they are associated with them. That's the point. Same for eager workflow start. A worker is associated with a single client, not some other client in the process/runtime.

@Sushisource - to confirm, you don't believe that we need to make sure a worker's heartbeat should occur on the worker's client? I think that is much simpler to understand for those that want to understand what calls use what clients. This is important because users control lifetimes of clients and handle explicit client replacement to update auth and are the ones that control which clients are for which workers.

I still can't really see a reason not use a worker's client to make calls relating to that worker (or set of workers that share the client).

@@ -295,15 +302,18 @@ impl Worker {
// Unregister worker from current client, register in new client at the end
let mut worker_key = self.worker_key.lock();
let slot_provider = (*worker_key).and_then(|k| self.client.workers().unregister(k));
self.client
.replace_client(super::init_worker_client(&self.config, new_client));
self.client.replace_client(super::init_worker_client(
Copy link
Member

@cretz cretz Aug 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you confirm if I replace a worker's client, it is this new client that will be used for its successive heartbeats? For most use cases of this call, the older client's auth/mTLS is invalid or will be at some point, and if you keep using it, you'll start hitting permission denials and failures. Also, you should not assume if someone replaces a single worker's client, that it can be the replacement for all worker's clients that happen to share the same endpoint and namespace.

This is another reason that workers should use their clients to make heartbeat calls.

@Sushisource
Copy link
Member

I still can't really see a reason not use a worker's client to make calls relating to that worker (or set of workers that share the client).

Sure, I think that's fair (though the actual scenario in the current design where that wouldn't happen seems to be exceptionally rare). I maintain the config option shouldn't live in the client options, though.

@cretz
Copy link
Member

cretz commented Aug 21, 2025

I maintain the config option shouldn't live in the client options, though.

Do you believe I as a user need to make a whole new Tokio thread pool if I want to disable worker heartbeating only for some workers? Not a rhetorical question, I don't mind if the answer is "yes, should be rare".

@Sushisource
Copy link
Member

I maintain the config option shouldn't live in the client options, though.

Do you believe I as a user need to make a whole new Tokio thread pool if I want to disable worker heartbeating only for some workers? Not a rhetorical question, I don't mind if the answer is "yes, should be rare".

Technically you wouldn't even have to, right now (at least how this is exposed in Core - you might have to depending on how lang does it). You can make a new CoreRuntime without constructing a new Tokio runtime, but rather re-using an existing one.

So, we can support that. But, yeah even if we don't I'm not hugely concerned.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants