Skip to content

Do not require heartbeating to be done in context #504

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions src/Temporalio/Worker/ActivityWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ await Task.WhenAll(tsk.Start.Input.Select(p =>
(ActivityInboundInterceptor)new InboundImpl(),
(v, impl) => impl.InterceptActivity(v));
// Initialize with outbound
inbound.Init(new OutboundImpl(this));
inbound.Init(new OutboundImpl(this, act.Context.TaskToken));

// Execute and put result on completed
var result = await inbound.ExecuteActivityAsync(new(
Expand Down Expand Up @@ -781,18 +781,23 @@ public override void Init(ActivityOutboundInterceptor outbound)
internal class OutboundImpl : ActivityOutboundInterceptor
{
private readonly ActivityWorker worker;
private readonly ByteString taskToken;

/// <summary>
/// Initializes a new instance of the <see cref="OutboundImpl"/> class.
/// </summary>
/// <param name="worker">Activity worker.</param>
public OutboundImpl(ActivityWorker worker) => this.worker = worker;
/// <param name="taskToken">Activity task token.</param>
public OutboundImpl(ActivityWorker worker, ByteString taskToken)
{
this.worker = worker;
this.taskToken = taskToken;
}

/// <inheritdoc />
public override void Heartbeat(HeartbeatInput input)
{
if (worker.runningActivities.TryGetValue(
ActivityExecutionContext.Current.TaskToken, out var act))
if (worker.runningActivities.TryGetValue(taskToken, out var act))
{
act.Heartbeat(worker.worker, input.Details);
}
Expand Down
35 changes: 35 additions & 0 deletions tests/Temporalio.Tests/Worker/ActivityWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,39 @@ public async Task ExecuteActivityAsync_UseTemporalClient_Succeeds()
await ExecuteAsyncActivityAsync(UseTemporalClientActivity));
}

[Fact]
public async Task ExecuteActivityAsync_BackgroundThreadHeartbeat_Received()
{
using var heartbeatStartEvent = new AutoResetEvent(false);
using var heartbeatDoneEvent = new AutoResetEvent(false);
ActivityExecutionContext? context = null;
var heartbeadThread = new Thread(() =>
{
heartbeatStartEvent.WaitOne();
Assert.False(ActivityExecutionContext.HasCurrent);
context!.Heartbeat("Heartbeat details");
heartbeatDoneEvent.Set();
});
heartbeadThread.Start();

[Activity]
async Task<string> BackgroundThreadHeartbeat()
{
context = ActivityExecutionContext.Current;
if (context.Info.Attempt == 1)
{
heartbeatStartEvent.Set();
heartbeatDoneEvent.WaitOne();
throw new InvalidOperationException("Failing first attempt");
}
return (string)context.PayloadConverter.ToValue(context.Info.HeartbeatDetails.Single(), typeof(string))!;
}

var result = await ExecuteActivityAsync(BackgroundThreadHeartbeat, maxAttempts: 2);
heartbeadThread.Join();
Assert.Equal("Heartbeat details", result);
}

internal async Task ExecuteActivityAsync(
Action activity)
{
Expand Down Expand Up @@ -881,13 +914,15 @@ internal Task<TResult> ExecuteActivityAsync<TResult>(
Func<WorkflowHandle, Task>? afterStarted = null,
bool waitForCancellation = false,
TimeSpan? heartbeatTimeout = null,
int? maxAttempts = null,
CancellationToken workerStoppingToken = default)
{
return ExecuteActivityInternalAsync<TResult>(
activity,
afterStarted: afterStarted,
workerStoppingToken: workerStoppingToken,
waitForCancellation: waitForCancellation,
maxAttempts: maxAttempts,
heartbeatTimeout: heartbeatTimeout);
}

Expand Down