diff --git a/contrib/opentelemetry/tracing_interceptor.go b/contrib/opentelemetry/tracing_interceptor.go index 053b7565d..bb536e238 100644 --- a/contrib/opentelemetry/tracing_interceptor.go +++ b/contrib/opentelemetry/tracing_interceptor.go @@ -3,6 +3,7 @@ package opentelemetry import ( "context" + "errors" "fmt" "go.opentelemetry.io/otel" @@ -14,6 +15,7 @@ import ( "go.temporal.io/sdk/interceptor" "go.temporal.io/sdk/log" + "go.temporal.io/sdk/temporal" ) // DefaultTextMapPropagator is the default OpenTelemetry TextMapPropagator used @@ -196,8 +198,13 @@ func (t *tracer) StartSpan(opts *interceptor.TracerStartSpanOptions) (intercepto } } + spanKind := trace.SpanKindServer + if opts.Outbound { + spanKind = trace.SpanKindClient + } + // Create span - span := t.options.SpanStarter(ctx, t.options.Tracer, opts.Operation+":"+opts.Name, trace.WithTimestamp(opts.Time)) + span := t.options.SpanStarter(ctx, t.options.Tracer, opts.Operation+":"+opts.Name, trace.WithTimestamp(opts.Time), trace.WithSpanKind(spanKind)) // Set tags if len(opts.Tags) > 0 { @@ -241,12 +248,22 @@ type tracerSpan struct { } func (t *tracerSpan) Finish(opts *interceptor.TracerFinishSpanOptions) { - if opts.Error != nil { + if opts.Error != nil && !isBenignApplicationError(opts.Error) { t.SetStatus(codes.Error, opts.Error.Error()) } t.End() } +func isBenignApplicationError(err error) bool { + var appErr *temporal.ApplicationError + if temporal.IsApplicationError(err) { + if errors.As(err, &appErr) { + return appErr.Category() == temporal.ApplicationErrorCategoryBenign + } + } + return false +} + type textMapCarrier map[string]string func (t textMapCarrier) Get(key string) string { return t[key] } diff --git a/contrib/opentelemetry/tracing_interceptor_test.go b/contrib/opentelemetry/tracing_interceptor_test.go index 35d58c1d7..289db6fa0 100644 --- a/contrib/opentelemetry/tracing_interceptor_test.go +++ b/contrib/opentelemetry/tracing_interceptor_test.go @@ -2,8 +2,11 @@ package opentelemetry_test import ( "testing" + "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/codes" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" "go.opentelemetry.io/otel/trace" @@ -11,6 +14,7 @@ import ( "go.temporal.io/sdk/contrib/opentelemetry" "go.temporal.io/sdk/interceptor" "go.temporal.io/sdk/internal/interceptortest" + "go.temporal.io/sdk/temporal" ) func TestSpanPropagation(t *testing.T) { @@ -42,3 +46,105 @@ func spanChildren(spans []sdktrace.ReadOnlySpan, parentID trace.SpanID) (ret []* } return } + +func TestSpanKind(t *testing.T) { + tests := []struct { + operation string + outbound bool + expectedKind trace.SpanKind + }{ + { + operation: "StartWorkflow", + outbound: true, + expectedKind: trace.SpanKindClient, + }, + { + operation: "RunWorkflow", + outbound: false, + expectedKind: trace.SpanKindServer, + }, + } + + for _, tt := range tests { + t.Run(tt.operation, func(t *testing.T) { + rec := tracetest.NewSpanRecorder() + tracer, err := opentelemetry.NewTracer(opentelemetry.TracerOptions{ + Tracer: sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(rec)).Tracer(""), + }) + require.NoError(t, err) + + span, err := tracer.StartSpan(&interceptor.TracerStartSpanOptions{ + Operation: tt.operation, + Name: "test-span", + Outbound: tt.outbound, + }) + require.NoError(t, err) + + span.Finish(&interceptor.TracerFinishSpanOptions{}) + + spans := rec.Ended() + require.Equal(t, len(spans), 1) + + foundSpan := spans[0] + assert.Equal(t, tt.expectedKind, foundSpan.SpanKind(), + "Expected span kind %v but got %v for operation %s (outbound=%v)", + tt.expectedKind, foundSpan.SpanKind(), tt.operation, tt.outbound) + }) + } +} + +func TestBenignErrorSpanStatus(t *testing.T) { + tests := []struct { + name string + err error + expectError bool + expectStatus codes.Code + }{ + { + name: "benign application error should not set error status", + err: temporal.NewApplicationErrorWithOptions("benign error", "TestType", temporal.ApplicationErrorOptions{Category: temporal.ApplicationErrorCategoryBenign}), + expectError: false, + expectStatus: codes.Unset, + }, + { + name: "regular application error should set error status", + err: temporal.NewApplicationError("regular error", "TestType"), + expectError: true, + expectStatus: codes.Error, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rec := tracetest.NewSpanRecorder() + tracer, err := opentelemetry.NewTracer(opentelemetry.TracerOptions{ + Tracer: sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(rec)).Tracer(""), + }) + require.NoError(t, err) + + span, err := tracer.StartSpan(&interceptor.TracerStartSpanOptions{ + Operation: "TestOperation", + Name: "TestSpan", + Time: time.Now(), + }) + require.NoError(t, err) + + span.Finish(&interceptor.TracerFinishSpanOptions{ + Error: tt.err, + }) + + // Check recorded spans + spans := rec.Ended() + require.Len(t, spans, 1) + + recordedSpan := spans[0] + assert.Equal(t, tt.expectStatus, recordedSpan.Status().Code) + + if tt.expectError { + assert.NotEmpty(t, recordedSpan.Status().Description) + } else { + assert.Empty(t, recordedSpan.Status().Description) + } + }) + } +} diff --git a/contrib/opentracing/interceptor.go b/contrib/opentracing/interceptor.go index fe804e4cc..4109198ce 100644 --- a/contrib/opentracing/interceptor.go +++ b/contrib/opentracing/interceptor.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "go.temporal.io/sdk/temporal" "github.com/opentracing/opentracing-go" @@ -164,9 +165,19 @@ type tracerSpanRef struct{ opentracing.SpanContext } type tracerSpan struct{ opentracing.Span } func (t *tracerSpan) Finish(opts *interceptor.TracerFinishSpanOptions) { - if opts.Error != nil { + if opts.Error != nil && !isBenignApplicationError(opts.Error) { // Standard tag that can be bridged to OpenTelemetry t.SetTag("error", "true") } t.Span.Finish() } + +func isBenignApplicationError(err error) bool { + var appErr *temporal.ApplicationError + if temporal.IsApplicationError(err) { + if errors.As(err, &appErr) { + return appErr.Category() == temporal.ApplicationErrorCategoryBenign + } + } + return false +} diff --git a/interceptor/tracing_interceptor.go b/interceptor/tracing_interceptor.go index 91fa489c0..8b5df04d4 100644 --- a/interceptor/tracing_interceptor.go +++ b/interceptor/tracing_interceptor.go @@ -126,14 +126,6 @@ type TracerStartSpanOptions struct { // Tags are a set of span tags. Tags map[string]string - // FromHeader is used internally, not by tracer implementations, to determine - // whether the parent span can be retrieved from the Temporal header. - FromHeader bool - - // ToHeader is used internally, not by tracer implementations, to determine - // whether the span should be placed on the Temporal header. - ToHeader bool - // IdempotencyKey may optionally be used by tracing implementations to generate // deterministic span IDs. // @@ -147,6 +139,14 @@ type TracerStartSpanOptions struct { // IdempotencyKey should be treated as opaque data by Tracer implementations. // Do not attempt to parse it, as the format is subject to change. IdempotencyKey string + + // Outbound is true if the span is outbound and false if it is inbound. + // + // This is used internally by the interceptor to determine whether to read + // from the header or write to the header, where outbound means the span should + // be placed on the Temporal header, and inbound means the parent span can be + // retrieved from the Temporal header. + Outbound bool } // TracerSpanRef represents a span reference such as a parent. @@ -230,8 +230,8 @@ func (t *tracingClientOutboundInterceptor) CreateSchedule(ctx context.Context, i span, ctx, err := t.root.startSpanFromContext(ctx, &TracerStartSpanOptions{ Operation: "CreateSchedule", Name: in.Options.ID, - ToHeader: true, Time: time.Now(), + Outbound: true, }, t.root.headerReader(ctx), t.root.headerWriter(ctx)) if err != nil { return nil, err @@ -253,8 +253,8 @@ func (t *tracingClientOutboundInterceptor) ExecuteWorkflow( Operation: "StartWorkflow", Name: in.WorkflowType, Tags: map[string]string{workflowIDTagKey: in.Options.ID}, - ToHeader: true, Time: time.Now(), + Outbound: true, }, t.root.headerReader(ctx), t.root.headerWriter(ctx)) if err != nil { return nil, err @@ -277,8 +277,8 @@ func (t *tracingClientOutboundInterceptor) SignalWorkflow(ctx context.Context, i Operation: "SignalWorkflow", Name: in.SignalName, Tags: map[string]string{workflowIDTagKey: in.WorkflowID}, - ToHeader: true, Time: time.Now(), + Outbound: true, }, t.root.headerReader(ctx), t.root.headerWriter(ctx)) if err != nil { return err @@ -300,7 +300,7 @@ func (t *tracingClientOutboundInterceptor) SignalWithStartWorkflow( Operation: "SignalWithStartWorkflow", Name: in.WorkflowType, Tags: map[string]string{workflowIDTagKey: in.Options.ID}, - ToHeader: true, + Outbound: true, }, t.root.headerReader(ctx), t.root.headerWriter(ctx)) if err != nil { return nil, err @@ -326,8 +326,8 @@ func (t *tracingClientOutboundInterceptor) QueryWorkflow( Operation: "QueryWorkflow", Name: in.QueryType, Tags: map[string]string{workflowIDTagKey: in.WorkflowID}, - ToHeader: true, Time: time.Now(), + Outbound: true, }, t.root.headerReader(ctx), t.root.headerWriter(ctx)) if err != nil { return nil, err @@ -353,8 +353,8 @@ func (t *tracingClientOutboundInterceptor) UpdateWorkflow( Operation: "UpdateWorkflow", Name: in.UpdateName, Tags: map[string]string{workflowIDTagKey: in.WorkflowID}, - ToHeader: true, Time: time.Now(), + Outbound: true, }, t.root.headerReader(ctx), t.root.headerWriter(ctx)) if err != nil { return nil, err @@ -380,8 +380,8 @@ func (t *tracingClientOutboundInterceptor) UpdateWithStartWorkflow( Operation: "UpdateWithStartWorkflow", Name: in.UpdateOptions.UpdateName, Tags: map[string]string{workflowIDTagKey: in.UpdateOptions.WorkflowID, updateIDTagKey: in.UpdateOptions.UpdateID}, - ToHeader: true, Time: time.Now(), + Outbound: true, }, t.root.headerReader(ctx), t.root.headerWriter(ctx)) if err != nil { return nil, err @@ -432,8 +432,8 @@ func (t *tracingActivityInboundInterceptor) ExecuteActivity( runIDTagKey: info.WorkflowExecution.RunID, activityIDTagKey: info.ActivityID, }, - FromHeader: true, - Time: info.StartedTime, + Time: info.StartedTime, + Outbound: false, }, t.root.headerReader(ctx), t.root.headerWriter(ctx)) if err != nil { return nil, err @@ -482,9 +482,9 @@ func (t *tracingWorkflowInboundInterceptor) ExecuteWorkflow( workflowIDTagKey: t.info.WorkflowExecution.ID, runIDTagKey: t.info.WorkflowExecution.RunID, }, - FromHeader: true, Time: t.info.WorkflowStartTime, IdempotencyKey: t.newIdempotencyKey(), + Outbound: false, }, t.root.workflowHeaderReader(ctx), t.root.workflowHeaderWriter(ctx)) if err != nil { return nil, err @@ -511,9 +511,9 @@ func (t *tracingWorkflowInboundInterceptor) HandleSignal(ctx workflow.Context, i workflowIDTagKey: info.WorkflowExecution.ID, runIDTagKey: info.WorkflowExecution.RunID, }, - FromHeader: true, Time: time.Now(), IdempotencyKey: t.newIdempotencyKey(), + Outbound: false, }, t.root.workflowHeaderReader(ctx), t.root.workflowHeaderWriter(ctx)) if err != nil { return err @@ -543,8 +543,8 @@ func (t *tracingWorkflowInboundInterceptor) HandleQuery( workflowIDTagKey: info.WorkflowExecution.ID, runIDTagKey: info.WorkflowExecution.RunID, }, - FromHeader: true, - Time: time.Now(), + Time: time.Now(), + Outbound: false, // We intentionally do not set IdempotencyKey here because queries are not recorded in // workflow history. When the tracing interceptor's span counter is reset between workflow // replays, old queries will not be processed which could result in idempotency key @@ -580,8 +580,8 @@ func (t *tracingWorkflowInboundInterceptor) ValidateUpdate( runIDTagKey: info.WorkflowExecution.RunID, updateIDTagKey: currentUpdateInfo.ID, }, - FromHeader: true, - Time: time.Now(), + Time: time.Now(), + Outbound: false, // We intentionally do not set IdempotencyKey here because validation is not run on // replay. When the tracing interceptor's span counter is reset between workflow // replays, the validator will not be processed which could result in impotency key @@ -618,9 +618,9 @@ func (t *tracingWorkflowInboundInterceptor) ExecuteUpdate( runIDTagKey: info.WorkflowExecution.RunID, updateIDTagKey: currentUpdateInfo.ID, }, - FromHeader: true, Time: time.Now(), IdempotencyKey: t.newIdempotencyKey(), + Outbound: false, }, t.root.workflowHeaderReader(ctx), t.root.workflowHeaderWriter(ctx)) if err != nil { return nil, err @@ -794,8 +794,8 @@ func (t *tracingWorkflowOutboundInterceptor) startNonReplaySpan( workflowIDTagKey: info.WorkflowExecution.ID, runIDTagKey: info.WorkflowExecution.RunID, }, - ToHeader: true, Time: time.Now(), + Outbound: true, }, t.root.workflowHeaderReader(ctx), headerWriter) if err != nil { return nopSpan{}, ctx, workflowFutureFromErr(ctx, err) @@ -816,8 +816,8 @@ func (t *tracingNexusOperationInboundInterceptor) CancelOperation(ctx context.Co Operation: "RunCancelNexusOperationHandler", Name: info.Service + "/" + info.Operation, DependedOn: true, - FromHeader: true, Time: time.Now(), + Outbound: false, }, t.root.nexusHeaderReader(input.Options.Header), t.root.headerWriter(ctx)) if err != nil { return err @@ -838,8 +838,8 @@ func (t *tracingNexusOperationInboundInterceptor) StartOperation(ctx context.Con Operation: "RunStartNexusOperationHandler", Name: info.Service + "/" + info.Operation, DependedOn: true, - FromHeader: true, Time: time.Now(), + Outbound: false, }, t.root.nexusHeaderReader(input.Options.Header), t.root.headerWriter(ctx)) if err != nil { return nil, err @@ -888,7 +888,7 @@ func (t *tracingInterceptor) startSpan( headerWriter func(span TracerSpan) error, ) (TracerSpan, error) { // Get parent span from header if not already present and allowed - if options.Parent == nil && options.FromHeader { + if options.Parent == nil && !options.Outbound { if span, err := headerReader(); err != nil && !t.options.AllowInvalidParentSpans { return nil, err } else if span != nil { @@ -908,7 +908,7 @@ func (t *tracingInterceptor) startSpan( } // Put span in header if wanted - if options.ToHeader { + if options.Outbound { if err := headerWriter(span); err != nil { return nil, err }