diff --git a/client/build.gradle b/client/build.gradle index bab0b18..e4233e7 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -17,8 +17,18 @@ archivesBaseName = 'durabletask-client' def grpcVersion = '1.69.0' def protocVersion = '3.25.5' def jacksonVersion = '2.15.3' + +def otelVersion = '1.51.0' +def micrometerVersion = '1.5.1' + +// When build on local, you need to set this value to your local jdk11 directory. +// Java11 is used to compile and run all the tests. +// Example for Windows: C:/Program Files/Java/openjdk-11.0.12_7/ +def PATH_TO_TEST_JAVA_RUNTIME = System.env.JDK_11 ?: System.getProperty("java.home") + // Java 11 is now the minimum required version for both compilation and testing + dependencies { // https://github.com/grpc/grpc-java#download @@ -27,6 +37,9 @@ dependencies { implementation 'com.google.protobuf:protobuf-java:3.25.5' runtimeOnly "io.grpc:grpc-netty-shaded:${grpcVersion}" + implementation "io.opentelemetry:opentelemetry-api:${otelVersion}" + implementation "io.opentelemetry:opentelemetry-context:${otelVersion}" + compileOnly "org.apache.tomcat:annotations-api:6.0.53" // https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core diff --git a/client/src/main/java/io/dapr/durabletask/DurableTaskClient.java b/client/src/main/java/io/dapr/durabletask/DurableTaskClient.java index 3136d80..a69c8da 100644 --- a/client/src/main/java/io/dapr/durabletask/DurableTaskClient.java +++ b/client/src/main/java/io/dapr/durabletask/DurableTaskClient.java @@ -2,6 +2,7 @@ // Licensed under the MIT License. package io.dapr.durabletask; +import io.opentelemetry.context.Context; import javax.annotation.Nullable; import java.time.Duration; import java.util.concurrent.TimeoutException; @@ -38,7 +39,7 @@ public void close() { * @return the randomly-generated instance ID of the scheduled orchestration instance */ public String scheduleNewOrchestrationInstance(String orchestratorName) { - return this.scheduleNewOrchestrationInstance(orchestratorName, null, null); + return this.scheduleNewOrchestrationInstance(orchestratorName, null, null, null); } /** @@ -67,6 +68,19 @@ public String scheduleNewOrchestrationInstance(String orchestratorName, Object i return this.scheduleNewOrchestrationInstance(orchestratorName, options); } + public String scheduleNewOrchestrationInstance( + String orchestratorName, + Object input, String instanceId, Context context){ + NewOrchestrationInstanceOptions options = new NewOrchestrationInstanceOptions() + .setInput(input) + .setInstanceId(instanceId); + return this.scheduleNewOrchestrationInstance(orchestratorName, options, context); + } + + public abstract String scheduleNewOrchestrationInstance( + String orchestratorName, + NewOrchestrationInstanceOptions options, Context context); + /** * Schedules a new orchestration instance with a specified set of options for execution. * @@ -98,6 +112,24 @@ public void raiseEvent(String instanceId, String eventName) { this.raiseEvent(instanceId, eventName, null); } + /** + * Sends an event notification message to a waiting orchestration instance. + *

+ * In order to handle the event, the target orchestration instance must be waiting for an event named + * eventName using the {@link TaskOrchestrationContext#waitForExternalEvent(String)} method. + * If the target orchestration instance is not yet waiting for an event named eventName, + * then the event will be saved in the orchestration instance state and dispatched immediately when the + * orchestrator calls {@link TaskOrchestrationContext#waitForExternalEvent(String)}. This event saving occurs even + * if the orchestrator has canceled its wait operation before the event was received. + *

+ * Raised events for a completed or non-existent orchestration instance will be silently discarded. + * + * @param instanceId the ID of the orchestration instance that will handle the event + * @param eventName the case-insensitive name of the event + * @param context Otel context for trace propagation. + */ + public abstract void raiseEvent(String instanceId, String eventName, @Nullable Object eventPayload, Context context); + /** * Sends an event notification message with a payload to a waiting orchestration instance. *

diff --git a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java index 0ec0291..b174805 100644 --- a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java @@ -8,10 +8,14 @@ import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc; import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*; + +import io.dapr.durabletask.interceptors.DaprWorkflowClientGrpcInterceptors; import io.grpc.*; import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.opentelemetry.context.Context; + import java.io.FileInputStream; import java.io.InputStream; @@ -38,6 +42,7 @@ public final class DurableTaskGrpcClient extends DurableTaskClient { private final DataConverter dataConverter; private final ManagedChannel managedSidecarChannel; private final TaskHubSidecarServiceBlockingStub sidecarClient; + private final DaprWorkflowClientGrpcInterceptors interceptors; DurableTaskGrpcClient(DurableTaskGrpcClientBuilder builder) { this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); @@ -112,6 +117,7 @@ public final class DurableTaskGrpcClient extends DurableTaskClient { } this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel); + this.interceptors = builder.interceptors; } /** @@ -133,6 +139,53 @@ public void close() { } } + /* + * @salaboy TODO: refactor to avoid duplicated code + */ + @Override + public String scheduleNewOrchestrationInstance( + String orchestratorName, + NewOrchestrationInstanceOptions options, Context context) { + if (orchestratorName == null || orchestratorName.length() == 0) { + throw new IllegalArgumentException("A non-empty orchestrator name must be specified."); + } + + Helpers.throwIfArgumentNull(options, "options"); + + CreateInstanceRequest.Builder builder = CreateInstanceRequest.newBuilder(); + builder.setName(orchestratorName); + + String instanceId = options.getInstanceId(); + if (instanceId == null) { + instanceId = UUID.randomUUID().toString(); + } + builder.setInstanceId(instanceId); + + String version = options.getVersion(); + if (version != null) { + builder.setVersion(StringValue.of(version)); + } + + Object input = options.getInput(); + if (input != null) { + String serializedInput = this.dataConverter.serialize(input); + builder.setInput(StringValue.of(serializedInput)); + } + + Instant startTime = options.getStartTime(); + if (startTime != null) { + Timestamp ts = DataConverter.getTimestampFromInstant(startTime); + builder.setScheduledStartTimestamp(ts); + } + + CreateInstanceRequest request = builder.build(); + + CreateInstanceResponse response = interceptors.intercept(this.sidecarClient, context) + .startInstance(request); + return response.getInstanceId(); + + } + @Override public String scheduleNewOrchestrationInstance( String orchestratorName, @@ -170,12 +223,25 @@ public String scheduleNewOrchestrationInstance( } CreateInstanceRequest request = builder.build(); - CreateInstanceResponse response = this.sidecarClient.startInstance(request); + + CreateInstanceResponse response = interceptors.intercept(this.sidecarClient) + .startInstance(request); return response.getInstanceId(); } @Override public void raiseEvent(String instanceId, String eventName, Object eventPayload) { + RaiseEventRequest request = raiseEventRequest(instanceId, eventName, eventPayload); + this.sidecarClient.raiseEvent(request); + } + + @Override + public void raiseEvent(String instanceId, String eventName, Object eventPayload, Context context) { + RaiseEventRequest request = raiseEventRequest(instanceId, eventName, eventPayload); + interceptors.intercept(this.sidecarClient, context).raiseEvent(request); + } + + private RaiseEventRequest raiseEventRequest(String instanceId, String eventName, Object eventPayload){ Helpers.throwIfArgumentNull(instanceId, "instanceId"); Helpers.throwIfArgumentNull(eventName, "eventName"); @@ -186,11 +252,13 @@ public void raiseEvent(String instanceId, String eventName, Object eventPayload) String serializedPayload = this.dataConverter.serialize(eventPayload); builder.setInput(StringValue.of(serializedPayload)); } - - RaiseEventRequest request = builder.build(); - this.sidecarClient.raiseEvent(request); + return builder.build(); } + + + + @Override public OrchestrationMetadata getInstanceMetadata(String instanceId, boolean getInputsAndOutputs) { GetInstanceRequest request = GetInstanceRequest.newBuilder() @@ -295,7 +363,7 @@ private OrchestrationStatusQueryResult toQueryResult(QueryInstancesResponse quer @Override public void createTaskHub(boolean recreateIfExists) { - this.sidecarClient.createTaskHub(CreateTaskHubRequest.newBuilder().setRecreateIfExists(recreateIfExists).build()); + interceptors.intercept(this.sidecarClient).createTaskHub(CreateTaskHubRequest.newBuilder().setRecreateIfExists(recreateIfExists).build()); } @Override diff --git a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClientBuilder.java b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClientBuilder.java index bac1784..b2cca5f 100644 --- a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClientBuilder.java +++ b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClientBuilder.java @@ -2,7 +2,12 @@ // Licensed under the MIT License. package io.dapr.durabletask; +import io.dapr.durabletask.interceptors.DaprWorkflowClientGrpcInterceptors; import io.grpc.Channel; +import io.grpc.ClientInterceptor; + +import java.util.Arrays; +import java.util.List; /** * Builder class for constructing new {@link DurableTaskClient} objects that communicate with a sidecar process @@ -16,6 +21,7 @@ public final class DurableTaskGrpcClientBuilder { String tlsCertPath; String tlsKeyPath; boolean insecure; + DaprWorkflowClientGrpcInterceptors interceptors; /** * Sets the {@link DataConverter} to use for converting serializable data payloads. @@ -106,6 +112,11 @@ public DurableTaskGrpcClientBuilder insecure(boolean insecure) { return this; } + public DurableTaskGrpcClientBuilder interceptor(DaprWorkflowClientGrpcInterceptors interceptors){ + this.interceptors = interceptors; + return this; + } + /** * Initializes a new {@link DurableTaskClient} object with the settings specified in the current builder object. * @return a new {@link DurableTaskClient} object diff --git a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index d9e3bc6..f261b71 100644 --- a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -9,8 +9,14 @@ import io.dapr.durabletask.implementation.protobuf.OrchestratorService.WorkItem.RequestCase; import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*; + +import io.dapr.durabletask.interceptors.DaprWorkflowClientGrpcInterceptors; import io.grpc.*; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.ContextKey; + +import javax.annotation.Nullable; import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutorService; @@ -19,6 +25,7 @@ import java.util.logging.Level; import java.util.logging.Logger; + /** * Task hub worker that connects to a sidecar process over gRPC to execute * orchestrator and activity events. @@ -42,6 +49,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final boolean isExecutorServiceManaged; private volatile boolean isNormalShutdown = false; private Thread workerThread; + private DaprWorkflowClientGrpcInterceptors interceptors; DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) { this.orchestrationFactories.putAll(builder.orchestrationFactories); @@ -69,10 +77,11 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { } this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel); + this.interceptors = builder.interceptors; this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL; - this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool(); + this.workerPool = Context.taskWrapping(builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool()); this.isExecutorServiceManaged = builder.executorService == null; } @@ -140,7 +149,7 @@ public void startAndBlock() { while (true) { try { GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build(); - Iterator workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest); + Iterator workItemStream = interceptors.intercept(this.sidecarClient, Context.root()).getWorkItems(getWorkItemsRequest); while (workItemStream.hasNext()) { WorkItem workItem = workItemStream.next(); RequestCase requestType = workItem.getRequestCase(); @@ -164,10 +173,13 @@ public void startAndBlock() { .build(); try { + interceptors.intercept(this.sidecarClient, Context.root()).completeOrchestratorTask(response); + this.sidecarClient.completeOrchestratorTask(response); logger.log(Level.FINEST, "Completed orchestrator request for instance: {0}", orchestratorRequest.getInstanceId()); + } catch (StatusRuntimeException e) { if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { logger.log(Level.WARNING, @@ -191,7 +203,9 @@ public void startAndBlock() { activityRequest.getName(), activityRequest.getOrchestrationInstance().getInstanceId())); + // TODO: Error handling + this.workerPool.submit(() -> { String output = null; TaskFailureDetails failureDetails = null; @@ -200,7 +214,8 @@ public void startAndBlock() { activityRequest.getName(), activityRequest.getInput().getValue(), activityRequest.getTaskExecutionId(), - activityRequest.getTaskId()); + activityRequest.getTaskId(), + activityRequest.getParentTraceId()); } catch (Throwable e) { failureDetails = TaskFailureDetails.newBuilder() .setErrorType(e.getClass().getName()) @@ -223,7 +238,11 @@ public void startAndBlock() { } try { - this.sidecarClient.completeActivityTask(responseBuilder.build()); + System.out.println(activityRequest); + + + Context activityContext = Context.current().with(ContextKey.named("traceparent"), activityRequest.getParentTraceContext().getTraceParent()); + interceptors.intercept(this.sidecarClient, activityContext).completeActivityTask(responseBuilder.build()); } catch (StatusRuntimeException e) { if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { logger.log(Level.WARNING, diff --git a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java index 8ef1286..6b1ebb6 100644 --- a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java +++ b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java @@ -1,153 +1,161 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. -package io.dapr.durabletask; - -import io.grpc.Channel; - -import java.time.Duration; -import java.util.HashMap; -import java.util.concurrent.ExecutorService; - - -/** - * Builder object for constructing customized {@link DurableTaskGrpcWorker} instances. - */ -public final class DurableTaskGrpcWorkerBuilder { - final HashMap orchestrationFactories = new HashMap<>(); - final HashMap activityFactories = new HashMap<>(); - int port; - Channel channel; - DataConverter dataConverter; - Duration maximumTimerInterval; - ExecutorService executorService; - String appId; // App ID for cross-app routing - - /** - * Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}. - * - * @param factory an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker} - * @return this builder object - */ - public DurableTaskGrpcWorkerBuilder addOrchestration(TaskOrchestrationFactory factory) { - String key = factory.getName(); - if (key == null || key.length() == 0) { - throw new IllegalArgumentException("A non-empty task orchestration name is required."); - } - - if (this.orchestrationFactories.containsKey(key)) { - throw new IllegalArgumentException( - String.format("A task orchestration factory named %s is already registered.", key)); - } - - this.orchestrationFactories.put(key, factory); - return this; - } - - /** - * Adds an activity factory to be used by the constructed {@link DurableTaskGrpcWorker}. - * - * @param factory an activity factory to be used by the constructed {@link DurableTaskGrpcWorker} - * @return this builder object - */ - public DurableTaskGrpcWorkerBuilder addActivity(TaskActivityFactory factory) { - // TODO: Input validation - String key = factory.getName(); - if (key == null || key.length() == 0) { - throw new IllegalArgumentException("A non-empty task activity name is required."); - } - - if (this.activityFactories.containsKey(key)) { - throw new IllegalArgumentException( - String.format("A task activity factory named %s is already registered.", key)); - } - - this.activityFactories.put(key, factory); - return this; - } - - /** - * Sets the gRPC channel to use for communicating with the sidecar process. - *

- * This builder method allows you to provide your own gRPC channel for communicating with the Durable Task sidecar - * endpoint. Channels provided using this method won't be closed when the worker is closed. - * Rather, the caller remains responsible for shutting down the channel after disposing the worker. - *

- * If not specified, a gRPC channel will be created automatically for each constructed - * {@link DurableTaskGrpcWorker}. - * - * @param channel the gRPC channel to use - * @return this builder object - */ - public DurableTaskGrpcWorkerBuilder grpcChannel(Channel channel) { - this.channel = channel; - return this; - } - - /** - * Sets the gRPC endpoint port to connect to. If not specified, the default Durable Task port number will be used. - * - * @param port the gRPC endpoint port to connect to - * @return this builder object - */ - public DurableTaskGrpcWorkerBuilder port(int port) { - this.port = port; - return this; - } - - /** - * Sets the {@link DataConverter} to use for converting serializable data payloads. - * - * @param dataConverter the {@link DataConverter} to use for converting serializable data payloads - * @return this builder object - */ - public DurableTaskGrpcWorkerBuilder dataConverter(DataConverter dataConverter) { - this.dataConverter = dataConverter; - return this; - } - - /** - * Sets the maximum timer interval. If not specified, the default maximum timer interval duration will be used. - * The default maximum timer interval duration is 3 days. - * - * @param maximumTimerInterval the maximum timer interval - * @return this builder object - */ - public DurableTaskGrpcWorkerBuilder maximumTimerInterval(Duration maximumTimerInterval) { - this.maximumTimerInterval = maximumTimerInterval; - return this; - } - - /** - * Sets the executor service that will be used to execute threads. - * - * @param executorService {@link ExecutorService}. - * @return this builder object. - */ - public DurableTaskGrpcWorkerBuilder withExecutorService(ExecutorService executorService) { - this.executorService = executorService; - return this; - } - - /** - * Sets the app ID for cross-app workflow routing. - *

- * This app ID is used to identify this worker in cross-app routing scenarios. - * It should match the app ID configured in the Dapr sidecar. - *

- * - * @param appId the app ID for this worker - * @return this builder object - */ - public DurableTaskGrpcWorkerBuilder appId(String appId) { - this.appId = appId; - return this; - } - - /** - * Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object. - * @return a new {@link DurableTaskGrpcWorker} object - */ - public DurableTaskGrpcWorker build() { - return new DurableTaskGrpcWorker(this); - } -} +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package io.dapr.durabletask; + +import io.dapr.durabletask.interceptors.DaprWorkflowClientGrpcInterceptors; +import io.grpc.Channel; +import io.opentelemetry.context.Context; + +import java.time.Duration; +import java.util.HashMap; +import java.util.concurrent.ExecutorService; + + +/** + * Builder object for constructing customized {@link DurableTaskGrpcWorker} instances. + */ +public final class DurableTaskGrpcWorkerBuilder { + final HashMap orchestrationFactories = new HashMap<>(); + final HashMap activityFactories = new HashMap<>(); + int port; + Channel channel; + DataConverter dataConverter; + Duration maximumTimerInterval; + ExecutorService executorService; + String appId; // App ID for cross-app routing + DaprWorkflowClientGrpcInterceptors interceptors; + + /** + * Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}. + * + * @param factory an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker} + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder addOrchestration(TaskOrchestrationFactory factory) { + String key = factory.getName(); + if (key == null || key.length() == 0) { + throw new IllegalArgumentException("A non-empty task orchestration name is required."); + } + + if (this.orchestrationFactories.containsKey(key)) { + throw new IllegalArgumentException( + String.format("A task orchestration factory named %s is already registered.", key)); + } + + this.orchestrationFactories.put(key, factory); + return this; + } + + /** + * Adds an activity factory to be used by the constructed {@link DurableTaskGrpcWorker}. + * + * @param factory an activity factory to be used by the constructed {@link DurableTaskGrpcWorker} + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder addActivity(TaskActivityFactory factory) { + // TODO: Input validation + String key = factory.getName(); + if (key == null || key.length() == 0) { + throw new IllegalArgumentException("A non-empty task activity name is required."); + } + + if (this.activityFactories.containsKey(key)) { + throw new IllegalArgumentException( + String.format("A task activity factory named %s is already registered.", key)); + } + + this.activityFactories.put(key, factory); + return this; + } + + /** + * Sets the gRPC channel to use for communicating with the sidecar process. + *

+ * This builder method allows you to provide your own gRPC channel for communicating with the Durable Task sidecar + * endpoint. Channels provided using this method won't be closed when the worker is closed. + * Rather, the caller remains responsible for shutting down the channel after disposing the worker. + *

+ * If not specified, a gRPC channel will be created automatically for each constructed + * {@link DurableTaskGrpcWorker}. + * + * @param channel the gRPC channel to use + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder grpcChannel(Channel channel) { + this.channel = channel; + return this; + } + + /** + * Sets the gRPC endpoint port to connect to. If not specified, the default Durable Task port number will be used. + * + * @param port the gRPC endpoint port to connect to + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder port(int port) { + this.port = port; + return this; + } + + /** + * Sets the {@link DataConverter} to use for converting serializable data payloads. + * + * @param dataConverter the {@link DataConverter} to use for converting serializable data payloads + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder dataConverter(DataConverter dataConverter) { + this.dataConverter = dataConverter; + return this; + } + + /** + * Sets the maximum timer interval. If not specified, the default maximum timer interval duration will be used. + * The default maximum timer interval duration is 3 days. + * + * @param maximumTimerInterval the maximum timer interval + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder maximumTimerInterval(Duration maximumTimerInterval) { + this.maximumTimerInterval = maximumTimerInterval; + return this; + } + + /** + * Sets the executor service that will be used to execute threads. + * + * @param executorService {@link ExecutorService}. + * @return this builder object. + */ + public DurableTaskGrpcWorkerBuilder withExecutorService(ExecutorService executorService) { + this.executorService = executorService; + return this; + } + + /** + * Sets the app ID for cross-app workflow routing. + *

+ * This app ID is used to identify this worker in cross-app routing scenarios. + * It should match the app ID configured in the Dapr sidecar. + *

+ * + * @param appId the app ID for this worker + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder appId(String appId) { + this.appId = appId; + return this; + } + + public DurableTaskGrpcWorkerBuilder interceptors(DaprWorkflowClientGrpcInterceptors interceptors){ + this.interceptors = interceptors; + return this; + } + + /** + * Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object. + * @return a new {@link DurableTaskGrpcWorker} object + */ + public DurableTaskGrpcWorker build() { + return new DurableTaskGrpcWorker(this); + } +} diff --git a/client/src/main/java/io/dapr/durabletask/TaskActivityContext.java b/client/src/main/java/io/dapr/durabletask/TaskActivityContext.java index 316fb52..f741877 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskActivityContext.java +++ b/client/src/main/java/io/dapr/durabletask/TaskActivityContext.java @@ -34,4 +34,11 @@ public interface TaskActivityContext { * @return the task id of the current task activity */ int getTaskId(); + + /** + * Get the task parent trace id for Otel trace propagation. + * @return the task parent traceId + * + */ + String getParentTraceId(); } diff --git a/client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java b/client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java index a751319..541b20f 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java +++ b/client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java @@ -19,7 +19,7 @@ public TaskActivityExecutor( this.logger = logger; } - public String execute(String taskName, String input, String taskExecutionId, int taskId) throws Throwable { + public String execute(String taskName, String input, String taskExecutionId, int taskId, String parentTraceId) throws Throwable { TaskActivityFactory factory = this.activityFactories.get(taskName); if (factory == null) { throw new IllegalStateException( @@ -32,7 +32,7 @@ public String execute(String taskName, String input, String taskExecutionId, int String.format("The task factory '%s' returned a null TaskActivity object.", taskName)); } - TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input, taskExecutionId, taskId); + TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input, taskExecutionId, taskId, parentTraceId); // Unhandled exceptions are allowed to escape Object output = activity.run(context); @@ -48,14 +48,16 @@ private class TaskActivityContextImpl implements TaskActivityContext { private final String rawInput; private final String taskExecutionId; private final int taskId; + private final String parentTraceId; private final DataConverter dataConverter = TaskActivityExecutor.this.dataConverter; - public TaskActivityContextImpl(String activityName, String rawInput, String taskExecutionId, int taskId) { + public TaskActivityContextImpl(String activityName, String rawInput, String taskExecutionId, int taskId, String parentTraceId) { this.name = activityName; this.rawInput = rawInput; this.taskExecutionId = taskExecutionId; this.taskId = taskId; + this.parentTraceId = parentTraceId; } @Override @@ -81,5 +83,10 @@ public String getTaskExecutionId() { public int getTaskId() { return this.taskId; } + + @Override + public String getParentTraceId() { + return this.parentTraceId; + } } } diff --git a/client/src/main/java/io/dapr/durabletask/interceptors/BigendianEncoding.java b/client/src/main/java/io/dapr/durabletask/interceptors/BigendianEncoding.java new file mode 100644 index 0000000..ddb3ec5 --- /dev/null +++ b/client/src/main/java/io/dapr/durabletask/interceptors/BigendianEncoding.java @@ -0,0 +1,164 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask.interceptors; + +import java.util.Arrays; + +/** + * Code originally from https://github.com/census-instrumentation/opencensus-java/blob/ + * 446e9bde9b1f6c0317e3f310644997e5d6d5eab2/api/src/main/java/io/opencensus/trace/BigendianEncoding.java + */ +final class BigendianEncoding { + + static final int LONG_BYTES = Long.SIZE / Byte.SIZE; + + static final int BYTE_BASE16 = 2; + + static final int LONG_BASE16 = BYTE_BASE16 * LONG_BYTES; + + private static final String ALPHABET = "0123456789abcdef"; + + private static final int ASCII_CHARACTERS = 128; + + private static final char[] ENCODING = buildEncodingArray(); + + private static final byte[] DECODING = buildDecodingArray(); + + private static char[] buildEncodingArray() { + char[] encoding = new char[512]; + for (int i = 0; i < 256; ++i) { + encoding[i] = ALPHABET.charAt(i >>> 4); + encoding[i | 0x100] = ALPHABET.charAt(i & 0xF); + } + return encoding; + } + + private static byte[] buildDecodingArray() { + byte[] decoding = new byte[ASCII_CHARACTERS]; + Arrays.fill(decoding, (byte) -1); + for (int i = 0; i < ALPHABET.length(); i++) { + char c = ALPHABET.charAt(i); + decoding[c] = (byte) i; + } + return decoding; + } + + /** + * Returns the {@code long} value whose big-endian representation is stored in the first 8 bytes + * of {@code bytes} starting from the {@code offset}. + * + * @param bytes the byte array representation of the {@code long}. + * @param offset the starting offset in the byte array. + * @return the {@code long} value whose big-endian representation is given. + * @throws IllegalArgumentException if {@code bytes} has fewer than 8 elements. + */ + static long longFromByteArray(byte[] bytes, int offset) { + Utils.checkArgument(bytes.length >= offset + LONG_BYTES, "array too small"); + return (bytes[offset] & 0xFFL) << 56 + | (bytes[offset + 1] & 0xFFL) << 48 + | (bytes[offset + 2] & 0xFFL) << 40 + | (bytes[offset + 3] & 0xFFL) << 32 + | (bytes[offset + 4] & 0xFFL) << 24 + | (bytes[offset + 5] & 0xFFL) << 16 + | (bytes[offset + 6] & 0xFFL) << 8 + | (bytes[offset + 7] & 0xFFL); + } + + /** + * Stores the big-endian representation of {@code value} in the {@code dest} starting from the + * {@code destOffset}. + * + * @param value the value to be converted. + * @param dest the destination byte array. + * @param destOffset the starting offset in the destination byte array. + */ + static void longToByteArray(long value, byte[] dest, int destOffset) { + Utils.checkArgument(dest.length >= destOffset + LONG_BYTES, "array too small"); + dest[destOffset + 7] = (byte) (value & 0xFFL); + dest[destOffset + 6] = (byte) (value >> 8 & 0xFFL); + dest[destOffset + 5] = (byte) (value >> 16 & 0xFFL); + dest[destOffset + 4] = (byte) (value >> 24 & 0xFFL); + dest[destOffset + 3] = (byte) (value >> 32 & 0xFFL); + dest[destOffset + 2] = (byte) (value >> 40 & 0xFFL); + dest[destOffset + 1] = (byte) (value >> 48 & 0xFFL); + dest[destOffset] = (byte) (value >> 56 & 0xFFL); + } + + /** + * Returns the {@code long} value whose base16 representation is stored in the first 16 chars of + * {@code chars} starting from the {@code offset}. + * + * @param chars the base16 representation of the {@code long}. + * @param offset the starting offset in the {@code CharSequence}. + */ + static long longFromBase16String(CharSequence chars, int offset) { + Utils.checkArgument(chars.length() >= offset + LONG_BASE16, "chars too small"); + return (decodeByte(chars.charAt(offset), chars.charAt(offset + 1)) & 0xFFL) << 56 + | (decodeByte(chars.charAt(offset + 2), chars.charAt(offset + 3)) & 0xFFL) << 48 + | (decodeByte(chars.charAt(offset + 4), chars.charAt(offset + 5)) & 0xFFL) << 40 + | (decodeByte(chars.charAt(offset + 6), chars.charAt(offset + 7)) & 0xFFL) << 32 + | (decodeByte(chars.charAt(offset + 8), chars.charAt(offset + 9)) & 0xFFL) << 24 + | (decodeByte(chars.charAt(offset + 10), chars.charAt(offset + 11)) & 0xFFL) << 16 + | (decodeByte(chars.charAt(offset + 12), chars.charAt(offset + 13)) & 0xFFL) << 8 + | (decodeByte(chars.charAt(offset + 14), chars.charAt(offset + 15)) & 0xFFL); + } + + /** + * Appends the base16 encoding of the specified {@code value} to the {@code dest}. + * + * @param value the value to be converted. + * @param dest the destination char array. + * @param destOffset the starting offset in the destination char array. + */ + static void longToBase16String(long value, char[] dest, int destOffset) { + byteToBase16((byte) (value >> 56 & 0xFFL), dest, destOffset); + byteToBase16((byte) (value >> 48 & 0xFFL), dest, destOffset + BYTE_BASE16); + byteToBase16((byte) (value >> 40 & 0xFFL), dest, destOffset + 2 * BYTE_BASE16); + byteToBase16((byte) (value >> 32 & 0xFFL), dest, destOffset + 3 * BYTE_BASE16); + byteToBase16((byte) (value >> 24 & 0xFFL), dest, destOffset + 4 * BYTE_BASE16); + byteToBase16((byte) (value >> 16 & 0xFFL), dest, destOffset + 5 * BYTE_BASE16); + byteToBase16((byte) (value >> 8 & 0xFFL), dest, destOffset + 6 * BYTE_BASE16); + byteToBase16((byte) (value & 0xFFL), dest, destOffset + 7 * BYTE_BASE16); + } + + /** + * Decodes the specified two character sequence, and returns the resulting {@code byte}. + * + * @param chars the character sequence to be decoded. + * @param offset the starting offset in the {@code CharSequence}. + * @return the resulting {@code byte} + * @throws IllegalArgumentException if the input is not a valid encoded string according to this + * encoding. + */ + static byte byteFromBase16String(CharSequence chars, int offset) { + Utils.checkArgument(chars.length() >= offset + 2, "chars too small"); + return decodeByte(chars.charAt(offset), chars.charAt(offset + 1)); + } + + private static byte decodeByte(char hi, char lo) { + Utils.checkArgument(lo < ASCII_CHARACTERS && DECODING[lo] != -1, "invalid character " + lo); + Utils.checkArgument(hi < ASCII_CHARACTERS && DECODING[hi] != -1, "invalid character " + hi); + int decoded = DECODING[hi] << 4 | DECODING[lo]; + return (byte) decoded; + } + + private static void byteToBase16(byte value, char[] dest, int destOffset) { + int b = value & 0xFF; + dest[destOffset] = ENCODING[b]; + dest[destOffset + 1] = ENCODING[b | 0x100]; + } + + private BigendianEncoding() { + } +} diff --git a/client/src/main/java/io/dapr/durabletask/interceptors/BinaryFormatImpl.java b/client/src/main/java/io/dapr/durabletask/interceptors/BinaryFormatImpl.java new file mode 100644 index 0000000..eea265b --- /dev/null +++ b/client/src/main/java/io/dapr/durabletask/interceptors/BinaryFormatImpl.java @@ -0,0 +1,81 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask.interceptors; + + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Code originally from https://github.com/census-instrumentation/opencensus-java/blob/ + * 446e9bde9b1f6c0317e3f310644997e5d6d5eab2/impl_core/src/main/java/io/opencensus/ + * implcore/trace/propagation/BinaryFormatImpl.java + */ +final class BinaryFormatImpl { + + private static final byte VERSION_ID = 0; + + private static final int VERSION_ID_OFFSET = 0; + + // The version_id/field_id size in bytes. + private static final byte ID_SIZE = 1; + + private static final byte TRACE_ID_FIELD_ID = 0; + + // TODO: clarify if offsets are correct here. While the specification suggests you should stop + // parsing when you hit an unknown field, it does not suggest that fields must be declared in + // ID order. Rather it only groups by data type order, in this case Trace Context + // https://github.com/census-instrumentation/opencensus-specs/blob/master/encodings/BinaryEncoding + // .md#deserialization-rules + private static final int TRACE_ID_FIELD_ID_OFFSET = VERSION_ID_OFFSET + ID_SIZE; + + private static final int TRACE_ID_OFFSET = TRACE_ID_FIELD_ID_OFFSET + ID_SIZE; + + private static final byte SPAN_ID_FIELD_ID = 1; + + private static final int SPAN_ID_FIELD_ID_OFFSET = TRACE_ID_OFFSET + TraceId.SIZE; + + private static final int SPAN_ID_OFFSET = SPAN_ID_FIELD_ID_OFFSET + ID_SIZE; + + private static final byte TRACE_OPTION_FIELD_ID = 2; + + private static final int TRACE_OPTION_FIELD_ID_OFFSET = SPAN_ID_OFFSET + SpanId.SIZE; + + private static final int TRACE_OPTIONS_OFFSET = TRACE_OPTION_FIELD_ID_OFFSET + ID_SIZE; + + /** + * Version, Trace and Span IDs are required fields. + */ + private static final int REQUIRED_FORMAT_LENGTH = 3 * ID_SIZE + TraceId.SIZE + SpanId.SIZE; + + private static final int ALL_FORMAT_LENGTH = REQUIRED_FORMAT_LENGTH + ID_SIZE + TraceOptions.SIZE; + + /** + * Generates the byte array for a span context. + * @param spanContext OpenCensus' span context. + * @return byte array for span context. + */ + byte[] toByteArray(SpanContext spanContext) { + checkNotNull(spanContext, "spanContext"); + byte[] bytes = new byte[ALL_FORMAT_LENGTH]; + bytes[VERSION_ID_OFFSET] = VERSION_ID; + bytes[TRACE_ID_FIELD_ID_OFFSET] = TRACE_ID_FIELD_ID; + spanContext.getTraceId().copyBytesTo(bytes, TRACE_ID_OFFSET); + bytes[SPAN_ID_FIELD_ID_OFFSET] = SPAN_ID_FIELD_ID; + spanContext.getSpanId().copyBytesTo(bytes, SPAN_ID_OFFSET); + bytes[TRACE_OPTION_FIELD_ID_OFFSET] = TRACE_OPTION_FIELD_ID; + spanContext.getTraceOptions().copyBytesTo(bytes, TRACE_OPTIONS_OFFSET); + return bytes; + } + +} diff --git a/client/src/main/java/io/dapr/durabletask/interceptors/DaprWorkflowClientGrpcInterceptors.java b/client/src/main/java/io/dapr/durabletask/interceptors/DaprWorkflowClientGrpcInterceptors.java new file mode 100644 index 0000000..4a13f80 --- /dev/null +++ b/client/src/main/java/io/dapr/durabletask/interceptors/DaprWorkflowClientGrpcInterceptors.java @@ -0,0 +1,64 @@ +/* + * Copyright 2024 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask.interceptors; + +import io.grpc.stub.AbstractStub; +import io.opentelemetry.context.Context; + +/** + * Class to be used as part of your service's client stub interceptor. + * Usage: myClientStub = DaprWorkflowClientGrpcInterceptors.intercept(myClientStub); + */ +public class DaprWorkflowClientGrpcInterceptors { + + + /** + * Instantiates a holder of all gRPC interceptors. + */ + public DaprWorkflowClientGrpcInterceptors() { + + } + + + + /** + * Adds all Dapr interceptors to a gRPC async stub. + * @param client gRPC client + * @param async client type + * @return async client instance with interceptors + */ + public > T intercept(final T client) { + return intercept(client, null); + } + + + /** + * Adds all Dapr interceptors to a gRPC async stub. + * @param client gRPC client + * @param context Reactor context for tracing + * @param async client type + * @return async client instance with interceptors + */ + public > T intercept( + final T client, + final Context context) { + if (client == null) { + throw new IllegalArgumentException("client cannot be null"); + } + + return client.withInterceptors( + new DaprWorkflowTracingInterceptor(context)); + } + +} diff --git a/client/src/main/java/io/dapr/durabletask/interceptors/DaprWorkflowTracingInterceptor.java b/client/src/main/java/io/dapr/durabletask/interceptors/DaprWorkflowTracingInterceptor.java new file mode 100644 index 0000000..1be8dae --- /dev/null +++ b/client/src/main/java/io/dapr/durabletask/interceptors/DaprWorkflowTracingInterceptor.java @@ -0,0 +1,51 @@ +/* + * Copyright 2024 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask.interceptors; + +import io.grpc.*; +import io.opentelemetry.context.Context; + +/** + * Injects tracing headers to gRPC metadata. + */ +public class DaprWorkflowTracingInterceptor implements ClientInterceptor { + + private final Context context; + + /** + * Creates an instance of the injector for gRPC context from Reactor's context. + * @param context Reactor's context + */ + public DaprWorkflowTracingInterceptor(Context context) { + this.context = context; + } + + @Override + public ClientCall interceptCall( + MethodDescriptor methodDescriptor, + CallOptions callOptions, + Channel channel) { + ClientCall clientCall = channel.newCall(methodDescriptor, callOptions); + return new ForwardingClientCall.SimpleForwardingClientCall(clientCall) { + @Override + public void start(final Listener responseListener, final Metadata metadata) { + if (context != null) { + GrpcHelper.populateMetadata(context, metadata); + } + super.start(responseListener, metadata); + } + }; + } + +} diff --git a/client/src/main/java/io/dapr/durabletask/interceptors/GrpcHelper.java b/client/src/main/java/io/dapr/durabletask/interceptors/GrpcHelper.java new file mode 100644 index 0000000..f1697fa --- /dev/null +++ b/client/src/main/java/io/dapr/durabletask/interceptors/GrpcHelper.java @@ -0,0 +1,99 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask.interceptors; + +import io.grpc.Metadata; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.ContextKey; +import io.opentelemetry.context.propagation.TextMapSetter; + +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * Helper to extract tracing information for gRPC calls. + */ +public final class GrpcHelper { + + private static final Logger LOGGER = Logger.getLogger(GrpcHelper.class.getName()); + + /** + * Binary formatter to generate grpc-trace-bin. + */ + private static final BinaryFormatImpl OPENCENSUS_BINARY_FORMAT = new BinaryFormatImpl(); + + private static final Metadata.Key GRPC_TRACE_BIN_KEY = + Metadata.Key.of("grpc-trace-bin", Metadata.BINARY_BYTE_MARSHALLER); + + private static final Metadata.Key TRACEPARENT_KEY = + Metadata.Key.of("traceparent", Metadata.ASCII_STRING_MARSHALLER); + + private static final Metadata.Key TRACESTATE_KEY = + Metadata.Key.of("tracestate", Metadata.ASCII_STRING_MARSHALLER); + + private GrpcHelper() { + } + + /** + * Populates GRPC client's metadata with tracing headers. + * + * @param context Reactor's context. + * @param metadata GRPC client metadata to be populated. + */ + public static void populateMetadata(final Context context, final Metadata metadata) { + + Map map = new HashMap<>(); + TextMapSetter> setter = + (carrier, key, value) -> map.put(key, value); + + GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(context, map, setter); + + if (map.containsKey(TRACEPARENT_KEY.name())) { + String value = map.get(TRACEPARENT_KEY.name()); + metadata.put(TRACEPARENT_KEY, value); + } + if (map.containsKey(TRACESTATE_KEY.name())) { + String value = map.get(TRACESTATE_KEY.name()); + metadata.put(TRACESTATE_KEY, value); + } + + // Dapr only supports "grpc-trace-bin" for GRPC and OpenTelemetry SDK does not support that yet: + // https://github.com/open-telemetry/opentelemetry-specification/issues/639 + // This should be the only use of OpenCensus SDK: populate "grpc-trace-bin". + SpanContext opencensusSpanContext = extractOpenCensusSpanContext(metadata); + if (opencensusSpanContext != null) { + byte[] grpcTraceBin = OPENCENSUS_BINARY_FORMAT.toByteArray(opencensusSpanContext); + metadata.put(GRPC_TRACE_BIN_KEY, grpcTraceBin); + } + } + + private static SpanContext extractOpenCensusSpanContext(Metadata metadata) { + if (!metadata.keys().contains(TRACEPARENT_KEY.name())) { + // Trying to extract context without this key will throw an "expected" exception, so we avoid it here. + return null; + } + + try { + return TraceContextFormat.extract(metadata); + } catch (RuntimeException e) { + LOGGER.log(Level.FINE, "Could not extract span context.", e); + return null; + } + } +} diff --git a/client/src/main/java/io/dapr/durabletask/interceptors/SpanContext.java b/client/src/main/java/io/dapr/durabletask/interceptors/SpanContext.java new file mode 100644 index 0000000..e07c89d --- /dev/null +++ b/client/src/main/java/io/dapr/durabletask/interceptors/SpanContext.java @@ -0,0 +1,137 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask.interceptors; + +import javax.annotation.concurrent.Immutable; +import java.util.Arrays; + +/** + * A class that represents a span context. A span context contains the state that must propagate to + * child Spans and across process boundaries. It contains the identifiers (a {@link TraceId + * trace_id} and {@link SpanId span_id}) associated with the and a set of {@link + * TraceOptions options}. + * + *

Code originally from https://github.com/census-instrumentation/opencensus-java/blob/ + * 446e9bde9b1f6c0317e3f310644997e5d6d5eab2/api/src/main/java/io/opencensus/trace/SpanContext.java

+ * @since 0.5 + */ +@Immutable +final class SpanContext { + + private final TraceId traceId; + + private final SpanId spanId; + + private final TraceOptions traceOptions; + + private final Tracestate tracestate; + + /** + * Creates a new {@code SpanContext} with the given identifiers and options. + * + * @param traceId the trace identifier of the span context. + * @param spanId the span identifier of the span context. + * @param traceOptions the trace options for the span context. + * @param tracestate the trace state for the span context. + * @return a new {@code SpanContext} with the given identifiers and options. + * @since 0.16 + */ + static SpanContext create( + TraceId traceId, SpanId spanId, TraceOptions traceOptions, Tracestate tracestate) { + return new SpanContext(traceId, spanId, traceOptions, tracestate); + } + + /** + * Returns the trace identifier associated with this {@code SpanContext}. + * + * @return the trace identifier associated with this {@code SpanContext}. + * @since 0.5 + */ + TraceId getTraceId() { + return traceId; + } + + /** + * Returns the span identifier associated with this {@code SpanContext}. + * + * @return the span identifier associated with this {@code SpanContext}. + * @since 0.5 + */ + SpanId getSpanId() { + return spanId; + } + + /** + * Returns the {@code TraceOptions} associated with this {@code SpanContext}. + * + * @return the {@code TraceOptions} associated with this {@code SpanContext}. + * @since 0.5 + */ + TraceOptions getTraceOptions() { + return traceOptions; + } + + /** + * Returns the {@code Tracestate} associated with this {@code SpanContext}. + * + * @return the {@code Tracestate} associated with this {@code SpanContext}. + * @since 0.5 + */ + Tracestate getTracestate() { + return tracestate; + } + + /** + * Returns true if this {@code SpanContext} is valid. + * + * @return true if this {@code SpanContext} is valid. + * @since 0.5 + */ + boolean isValid() { + return traceId.isValid() && spanId.isValid(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (obj == this) { + return true; + } + + if (!(obj instanceof SpanContext)) { + return false; + } + + SpanContext that = (SpanContext) obj; + return traceId.equals(that.traceId) + && spanId.equals(that.spanId) + && traceOptions.equals(that.traceOptions); + } + + @Override + public int hashCode() { + return Arrays.hashCode(new Object[]{traceId, spanId, traceOptions}); + } + + private SpanContext( + TraceId traceId, SpanId spanId, TraceOptions traceOptions, Tracestate tracestate) { + this.traceId = traceId; + this.spanId = spanId; + this.traceOptions = traceOptions; + this.tracestate = tracestate; + } +} diff --git a/client/src/main/java/io/dapr/durabletask/interceptors/SpanId.java b/client/src/main/java/io/dapr/durabletask/interceptors/SpanId.java new file mode 100644 index 0000000..010a5b1 --- /dev/null +++ b/client/src/main/java/io/dapr/durabletask/interceptors/SpanId.java @@ -0,0 +1,162 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask.interceptors; + +import javax.annotation.concurrent.Immutable; + +/** + * A class that represents a span identifier. A valid span identifier is an 8-byte array with at + * least one non-zero byte. + * + *

Code originally from https://github.com/census-instrumentation/opencensus-java/blob/ + * 446e9bde9b1f6c0317e3f310644997e5d6d5eab2/api/src/main/java/io/opencensus/trace/TraceId.java

+ * @since 0.5 + */ +@Immutable +final class SpanId implements Comparable { + + /** + * The size in bytes of the {@code SpanId}. + * + * @since 0.5 + */ + public static final int SIZE = 8; + + private static final int BASE16_SIZE = 2 * SIZE; + + private static final long INVALID_ID = 0; + + // The internal representation of the SpanId. + private final long id; + + private SpanId(long id) { + this.id = id; + } + + /** + * Returns a {@code SpanId} built from a lowercase base16 representation. + * + * @param src the lowercase base16 representation. + * @param srcOffset the offset in the buffer where the representation of the {@code SpanId} + * begins. + * @return a {@code SpanId} built from a lowercase base16 representation. + * @throws NullPointerException if {@code src} is null. + * @throws IllegalArgumentException if not enough characters in the {@code src} from the {@code + * srcOffset}. + * @since 0.11 + */ + static SpanId fromLowerBase16(CharSequence src, int srcOffset) { + Utils.checkNotNull(src, "src"); + return new SpanId(BigendianEncoding.longFromBase16String(src, srcOffset)); + } + + /** + * Returns the byte representation of the {@code SpanId}. + * + * @return the byte representation of the {@code SpanId}. + * @since 0.5 + */ + byte[] getBytes() { + byte[] bytes = new byte[SIZE]; + BigendianEncoding.longToByteArray(id, bytes, 0); + return bytes; + } + + /** + * Copies the byte array representations of the {@code SpanId} into the {@code dest} beginning at + * the {@code destOffset} offset. + * + * @param dest the destination buffer. + * @param destOffset the starting offset in the destination buffer. + * @throws NullPointerException if {@code dest} is null. + * @throws IndexOutOfBoundsException if {@code destOffset+SpanId.SIZE} is greater than {@code + * dest.length}. + * @since 0.5 + */ + void copyBytesTo(byte[] dest, int destOffset) { + BigendianEncoding.longToByteArray(id, dest, destOffset); + } + + /** + * Copies the lowercase base16 representations of the {@code SpanId} into the {@code dest} + * beginning at the {@code destOffset} offset. + * + * @param dest the destination buffer. + * @param destOffset the starting offset in the destination buffer. + * @throws IndexOutOfBoundsException if {@code destOffset + 2 * SpanId.SIZE} is greater than + * {@code dest.length}. + * @since 0.18 + */ + void copyLowerBase16To(char[] dest, int destOffset) { + BigendianEncoding.longToBase16String(id, dest, destOffset); + } + + /** + * Returns whether the span identifier is valid. A valid span identifier is an 8-byte array with + * at least one non-zero byte. + * + * @return {@code true} if the span identifier is valid. + * @since 0.5 + */ + boolean isValid() { + return id != INVALID_ID; + } + + /** + * Returns the lowercase base16 encoding of this {@code SpanId}. + * + * @return the lowercase base16 encoding of this {@code SpanId}. + * @since 0.11 + */ + String toLowerBase16() { + char[] chars = new char[BASE16_SIZE]; + copyLowerBase16To(chars, 0); + return new String(chars); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (obj == this) { + return true; + } + + if (!(obj instanceof SpanId)) { + return false; + } + + SpanId that = (SpanId) obj; + return id == that.id; + } + + @Override + public int hashCode() { + // Copied from Long.hashCode in java8. + return (int) (id ^ (id >>> 32)); + } + + @Override + public String toString() { + return "SpanId{spanId=" + toLowerBase16() + "}"; + } + + @Override + public int compareTo(SpanId that) { + // Copied from Long.compare in java8. + return (id < that.id) ? -1 : ((id == that.id) ? 0 : 1); + } +} diff --git a/client/src/main/java/io/dapr/durabletask/interceptors/TraceContextFormat.java b/client/src/main/java/io/dapr/durabletask/interceptors/TraceContextFormat.java new file mode 100644 index 0000000..e6881cb --- /dev/null +++ b/client/src/main/java/io/dapr/durabletask/interceptors/TraceContextFormat.java @@ -0,0 +1,105 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask.interceptors; + +import com.google.common.base.Splitter; +import io.grpc.Metadata; + + +import java.util.List; +import java.util.regex.Pattern; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * Implementation of the TraceContext propagation protocol. See w3c/distributed-tracing. + * + *

Code originally from https://github.com/census-instrumentation/opencensus-java/blob/ + * 446e9bde9b1f6c0317e3f310644997e5d6d5eab2/impl_core/src/main/java/io/opencensus/implcore/ + * trace/propagation/TraceContextFormat.java

+ */ +class TraceContextFormat { + + private static final Tracestate TRACESTATE_DEFAULT = Tracestate.builder().build(); + private static final String TRACEPARENT = "traceparent"; + private static final String TRACESTATE = "tracestate"; + + private static final Metadata.Key TRACEPARENT_KEY = + Metadata.Key.of(TRACEPARENT, Metadata.ASCII_STRING_MARSHALLER); + private static final Metadata.Key TRACESTATE_KEY = + Metadata.Key.of(TRACESTATE, Metadata.ASCII_STRING_MARSHALLER); + + private static final int VERSION_SIZE = 2; + private static final char TRACEPARENT_DELIMITER = '-'; + private static final int TRACEPARENT_DELIMITER_SIZE = 1; + private static final int TRACE_ID_HEX_SIZE = 2 * TraceId.SIZE; + private static final int SPAN_ID_HEX_SIZE = 2 * SpanId.SIZE; + private static final int TRACE_OPTION_HEX_SIZE = 2 * TraceOptions.SIZE; + private static final int TRACE_ID_OFFSET = VERSION_SIZE + TRACEPARENT_DELIMITER_SIZE; + private static final int SPAN_ID_OFFSET = + TRACE_ID_OFFSET + TRACE_ID_HEX_SIZE + TRACEPARENT_DELIMITER_SIZE; + private static final int TRACE_OPTION_OFFSET = + SPAN_ID_OFFSET + SPAN_ID_HEX_SIZE + TRACEPARENT_DELIMITER_SIZE; + private static final int TRACEPARENT_HEADER_SIZE = TRACE_OPTION_OFFSET + TRACE_OPTION_HEX_SIZE; + private static final int TRACESTATE_MAX_MEMBERS = 32; + private static final char TRACESTATE_KEY_VALUE_DELIMITER = '='; + private static final char TRACESTATE_ENTRY_DELIMITER = ','; + private static final Splitter TRACESTATE_ENTRY_DELIMITER_SPLITTER = + Splitter.on(Pattern.compile("[ \t]*" + TRACESTATE_ENTRY_DELIMITER + "[ \t]*")); + + /** + * Extracts span context from gRPC's metadata. + * @param metadata gRPC's metadata. + * @return span context. + */ + static SpanContext extract(Metadata metadata) { + String traceparent = metadata.get(TRACEPARENT_KEY); + if (traceparent == null) { + throw new RuntimeException("Traceparent not present"); + } + + checkArgument( + traceparent.charAt(TRACE_OPTION_OFFSET - 1) == TRACEPARENT_DELIMITER + && (traceparent.length() == TRACEPARENT_HEADER_SIZE + || (traceparent.length() > TRACEPARENT_HEADER_SIZE + && traceparent.charAt(TRACEPARENT_HEADER_SIZE) == TRACEPARENT_DELIMITER)) + && traceparent.charAt(SPAN_ID_OFFSET - 1) == TRACEPARENT_DELIMITER + && traceparent.charAt(TRACE_OPTION_OFFSET - 1) == TRACEPARENT_DELIMITER, + "Missing or malformed TRACEPARENT."); + + TraceId traceId = TraceId.fromLowerBase16(traceparent, TRACE_ID_OFFSET); + SpanId spanId = SpanId.fromLowerBase16(traceparent, SPAN_ID_OFFSET); + TraceOptions traceOptions = TraceOptions.fromLowerBase16(traceparent, TRACE_OPTION_OFFSET); + + String tracestate = metadata.get(TRACESTATE_KEY); + if (tracestate == null || tracestate.isEmpty()) { + return SpanContext.create(traceId, spanId, traceOptions, TRACESTATE_DEFAULT); + } + Tracestate.Builder tracestateBuilder = Tracestate.builder(); + List listMembers = TRACESTATE_ENTRY_DELIMITER_SPLITTER.splitToList(tracestate); + checkArgument( + listMembers.size() <= TRACESTATE_MAX_MEMBERS, "Tracestate has too many elements."); + // Iterate in reverse order because when call builder set the elements is added in the + // front of the list. + for (int i = listMembers.size() - 1; i >= 0; i--) { + String listMember = listMembers.get(i); + int index = listMember.indexOf(TRACESTATE_KEY_VALUE_DELIMITER); + checkArgument(index != -1, "Invalid tracestate list-member format."); + tracestateBuilder.set( + listMember.substring(0, index), listMember.substring(index + 1, listMember.length())); + } + return SpanContext.create(traceId, spanId, traceOptions, tracestateBuilder.build()); + } +} diff --git a/client/src/main/java/io/dapr/durabletask/interceptors/TraceId.java b/client/src/main/java/io/dapr/durabletask/interceptors/TraceId.java new file mode 100644 index 0000000..35573a5 --- /dev/null +++ b/client/src/main/java/io/dapr/durabletask/interceptors/TraceId.java @@ -0,0 +1,161 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask.interceptors; + +import javax.annotation.concurrent.Immutable; + +/** + * A class that represents a trace identifier. A valid trace identifier is a 16-byte array with at + * least one non-zero byte. + * + *

Code originally from https://github.com/census-instrumentation/opencensus-java/blob/ + * 446e9bde9b1f6c0317e3f310644997e5d6d5eab2/api/src/main/java/io/opencensus/trace/TraceId.java.

+ * @since 0.5 + */ +@Immutable +final class TraceId implements Comparable { + /** + * The size in bytes of the {@code TraceId}. + * + * @since 0.5 + */ + static final int SIZE = 16; + private static final int BASE16_SIZE = 2 * BigendianEncoding.LONG_BASE16; + private static final long INVALID_ID = 0; + + // The internal representation of the TraceId. + private final long idHi; + private final long idLo; + + private TraceId(long idHi, long idLo) { + this.idHi = idHi; + this.idLo = idLo; + } + + /** + * Returns a {@code TraceId} built from a lowercase base16 representation. + * + * @param src the lowercase base16 representation. + * @param srcOffset the offset in the buffer where the representation of the {@code TraceId} + * begins. + * @return a {@code TraceId} built from a lowercase base16 representation. + * @throws NullPointerException if {@code src} is null. + * @throws IllegalArgumentException if not enough characters in the {@code src} from the {@code + * srcOffset}. + * @since 0.11 + */ + static TraceId fromLowerBase16(CharSequence src, int srcOffset) { + Utils.checkNotNull(src, "src"); + return new TraceId( + BigendianEncoding.longFromBase16String(src, srcOffset), + BigendianEncoding.longFromBase16String(src, srcOffset + BigendianEncoding.LONG_BASE16)); + } + + /** + * Copies the byte array representations of the {@code TraceId} into the {@code dest} beginning at + * the {@code destOffset} offset. + * + * @param dest the destination buffer. + * @param destOffset the starting offset in the destination buffer. + * @throws NullPointerException if {@code dest} is null. + * @throws IndexOutOfBoundsException if {@code destOffset+TraceId.SIZE} is greater than {@code + * dest.length}. + * @since 0.5 + */ + void copyBytesTo(byte[] dest, int destOffset) { + BigendianEncoding.longToByteArray(idHi, dest, destOffset); + BigendianEncoding.longToByteArray(idLo, dest, destOffset + BigendianEncoding.LONG_BYTES); + } + + /** + * Copies the lowercase base16 representations of the {@code TraceId} into the {@code dest} + * beginning at the {@code destOffset} offset. + * + * @param dest the destination buffer. + * @param destOffset the starting offset in the destination buffer. + * @throws IndexOutOfBoundsException if {@code destOffset + 2 * TraceId.SIZE} is greater than + * {@code dest.length}. + * @since 0.18 + */ + void copyLowerBase16To(char[] dest, int destOffset) { + BigendianEncoding.longToBase16String(idHi, dest, destOffset); + BigendianEncoding.longToBase16String(idLo, dest, destOffset + BASE16_SIZE / 2); + } + + /** + * Returns whether the {@code TraceId} is valid. A valid trace identifier is a 16-byte array with + * at least one non-zero byte. + * + * @return {@code true} if the {@code TraceId} is valid. + * @since 0.5 + */ + boolean isValid() { + return idHi != INVALID_ID || idLo != INVALID_ID; + } + + /** + * Returns the lowercase base16 encoding of this {@code TraceId}. + * + * @return the lowercase base16 encoding of this {@code TraceId}. + * @since 0.11 + */ + String toLowerBase16() { + char[] chars = new char[BASE16_SIZE]; + copyLowerBase16To(chars, 0); + return new String(chars); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (obj == this) { + return true; + } + + if (!(obj instanceof TraceId)) { + return false; + } + + TraceId that = (TraceId) obj; + return idHi == that.idHi && idLo == that.idLo; + } + + @Override + public int hashCode() { + // Copied from Arrays.hashCode(long[]) + int result = 1; + result = 31 * result + ((int) (idHi ^ (idHi >>> 32))); + result = 31 * result + ((int) (idLo ^ (idLo >>> 32))); + return result; + } + + @Override + public String toString() { + return "TraceId{traceId=" + toLowerBase16() + "}"; + } + + @Override + public int compareTo(TraceId that) { + if (idHi == that.idHi) { + if (idLo == that.idLo) { + return 0; + } + return idLo < that.idLo ? -1 : 1; + } + return idHi < that.idHi ? -1 : 1; + } +} diff --git a/client/src/main/java/io/dapr/durabletask/interceptors/TraceOptions.java b/client/src/main/java/io/dapr/durabletask/interceptors/TraceOptions.java new file mode 100644 index 0000000..5f05f4b --- /dev/null +++ b/client/src/main/java/io/dapr/durabletask/interceptors/TraceOptions.java @@ -0,0 +1,105 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask.interceptors; + +import javax.annotation.concurrent.Immutable; +import java.util.Arrays; + +/** + * A class that represents global trace options. These options are propagated to all child spans. + * These determine features such as whether a {@code Span} should be traced. It is implemented as a bitmask. + * + *

Code originally from https://github.com/census-instrumentation/opencensus-java/blob/ + * 446e9bde9b1f6c0317e3f310644997e5d6d5eab2/api/src/main/java/io/opencensus/trace/TraceOptions.java

+ * @since 0.5 + */ +@Immutable +final class TraceOptions { + /** + * The size in bytes of the {@code TraceOptions}. + * + * @since 0.5 + */ + static final int SIZE = 1; + + // The set of enabled features is determined by all the enabled bits. + private final byte options; + + // Creates a new {@code TraceOptions} with the given options. + private TraceOptions(byte options) { + this.options = options; + } + + /** + * Returns a {@code TraceOption} built from a lowercase base16 representation. + * + * @param src the lowercase base16 representation. + * @param srcOffset the offset in the buffer where the representation of the {@code TraceOptions} + * begins. + * @return a {@code TraceOption} built from a lowercase base16 representation. + * @throws NullPointerException if {@code src} is null. + * @throws IllegalArgumentException if {@code src.length} is not {@code 2 * TraceOption.SIZE} OR + * if the {@code str} has invalid characters. + * @since 0.18 + */ + static TraceOptions fromLowerBase16(CharSequence src, int srcOffset) { + return new TraceOptions(BigendianEncoding.byteFromBase16String(src, srcOffset)); + } + + /** + * Copies the byte representations of the {@code TraceOptions} into the {@code dest} beginning at + * the {@code destOffset} offset. + * + *

Equivalent with (but faster because it avoids any new allocations): + * + *

{@code
+   * System.arraycopy(getBytes(), 0, dest, destOffset, TraceOptions.SIZE);
+   * }
+ * + * @param dest the destination buffer. + * @param destOffset the starting offset in the destination buffer. + * @throws NullPointerException if {@code dest} is null. + * @throws IndexOutOfBoundsException if {@code destOffset+TraceOptions.SIZE} is greater than + * {@code dest.length}. + * @since 0.5 + */ + void copyBytesTo(byte[] dest, int destOffset) { + Utils.checkIndex(destOffset, dest.length); + dest[destOffset] = options; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (obj == this) { + return true; + } + + if (!(obj instanceof TraceOptions)) { + return false; + } + + TraceOptions that = (TraceOptions) obj; + return options == that.options; + } + + @Override + public int hashCode() { + return Arrays.hashCode(new byte[]{options}); + } + +} diff --git a/client/src/main/java/io/dapr/durabletask/interceptors/Tracestate.java b/client/src/main/java/io/dapr/durabletask/interceptors/Tracestate.java new file mode 100644 index 0000000..12548ba --- /dev/null +++ b/client/src/main/java/io/dapr/durabletask/interceptors/Tracestate.java @@ -0,0 +1,269 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask.interceptors; + + +import javax.annotation.concurrent.Immutable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Carries tracing-system specific context in a list of key-value pairs. TraceState allows different + * vendors propagate additional information and inter-operate with their legacy Id formats. + * + *

Implementation is optimized for a small list of key-value pairs. + * + *

Key is opaque string up to 256 characters printable. It MUST begin with a lowercase letter, + * and can only contain lowercase letters a-z, digits 0-9, underscores _, dashes -, asterisks *, and + * forward slashes /. + * + *

Value is opaque string up to 256 characters printable ASCII RFC0020 characters (i.e., the + * range 0x20 to 0x7E) except comma , and =. + * + *

Code originally from https://github.com/census-instrumentation/opencensus-java/blob/ + * 446e9bde9b1f6c0317e3f310644997e5d6d5eab2/api/src/main/java/io/opencensus/trace/Tracestate.java

+ * @since 0.16 + */ +@Immutable +class Tracestate { + private static final int KEY_MAX_SIZE = 256; + private static final int VALUE_MAX_SIZE = 256; + private static final int MAX_KEY_VALUE_PAIRS = 32; + + private final List entries; + + private Tracestate(List entries) { + this.entries = entries; + } + + /** + * Returns the value to which the specified key is mapped, or null if this map contains no mapping + * for the key. + * + * @param key with which the specified value is to be associated + * @return the value to which the specified key is mapped, or null if this map contains no mapping for the key. + * @since 0.16 + */ + @javax.annotation.Nullable + String get(String key) { + for (Entry entry : getEntries()) { + if (entry.getKey().equals(key)) { + return entry.getValue(); + } + } + return null; + } + + /** + * Returns a {@link List} view of the mappings contained in this {@code TraceState}. + * + * @return a {@link List} view of the mappings contained in this {@code TraceState}. + * @since 0.16 + */ + List getEntries() { + return this.entries; + } + + /** + * Returns a {@code Builder} based on an empty {@code Tracestate}. + * + * @return a {@code Builder} based on an empty {@code Tracestate}. + * @since 0.16 + */ + static Builder builder() { + return new Builder(Builder.EMPTY); + } + + /** + * Builder class. + * + * @since 0.16 + */ + static final class Builder { + private final Tracestate parent; + @javax.annotation.Nullable + private ArrayList entries; + + // Needs to be in this class to avoid initialization deadlock because super class depends on + // subclass (the auto-value generate class). + private static final Tracestate EMPTY = create(Collections.emptyList()); + + private Builder(Tracestate parent) { + Utils.checkNotNull(parent, "parent"); + this.parent = parent; + this.entries = null; + } + + /** + * Adds or updates the {@code Entry} that has the given {@code key} if it is present. The new + * {@code Entry} will always be added in the front of the list of entries. + * + * @param key the key for the {@code Entry} to be added. + * @param value the value for the {@code Entry} to be added. + * @return this. + * @since 0.16 + */ + @SuppressWarnings("nullness") + Builder set(String key, String value) { + // Initially create the Entry to validate input. + Entry entry = new Entry(key, value); + if (entries == null) { + // Copy entries from the parent. + entries = new ArrayList(parent.getEntries()); + } + for (int i = 0; i < entries.size(); i++) { + if (entries.get(i).getKey().equals(entry.getKey())) { + entries.remove(i); + // Exit now because the entries list cannot contain duplicates. + break; + } + } + // Inserts the element at the front of this list. + entries.add(0, entry); + return this; + } + + /** + * Removes the {@code Entry} that has the given {@code key} if it is present. + * + * @param key the key for the {@code Entry} to be removed. + * @return this. + * @since 0.16 + */ + @SuppressWarnings("nullness") + Builder remove(String key) { + Utils.checkNotNull(key, "key"); + if (entries == null) { + // Copy entries from the parent. + entries = new ArrayList(parent.getEntries()); + } + for (int i = 0; i < entries.size(); i++) { + if (entries.get(i).getKey().equals(key)) { + entries.remove(i); + // Exit now because the entries list cannot contain duplicates. + break; + } + } + return this; + } + + /** + * Builds a TraceState by adding the entries to the parent in front of the key-value pairs list + * and removing duplicate entries. + * + * @return a TraceState with the new entries. + * @since 0.16 + */ + Tracestate build() { + if (entries == null) { + return parent; + } + return Tracestate.create(entries); + } + } + + /** + * Immutable key-value pair for {@code Tracestate}. + * + * @since 0.16 + */ + @Immutable + static class Entry { + + private final String key; + + private final String value; + + /** + * Creates a new {@code Entry} for the {@code Tracestate}. + * + * @param key the Entry's key. + * @param value the Entry's value. + * @since 0.16 + */ + Entry(String key, String value) { + Utils.checkNotNull(key, "key"); + Utils.checkNotNull(value, "value"); + Utils.checkArgument(validateKey(key), "Invalid key %s", key); + Utils.checkArgument(validateValue(value), "Invalid value %s", value); + this.key = key; + this.value = value; + } + + /** + * Returns the key {@code String}. + * + * @return the key {@code String}. + * @since 0.16 + */ + String getKey() { + return this.key; + } + + /** + * Returns the value {@code String}. + * + * @return the value {@code String}. + * @since 0.16 + */ + String getValue() { + return this.value; + } + } + + // Key is opaque string up to 256 characters printable. It MUST begin with a lowercase letter, and + // can only contain lowercase letters a-z, digits 0-9, underscores _, dashes -, asterisks *, and + // forward slashes /. + static boolean validateKey(String key) { + if (key.length() > KEY_MAX_SIZE + || key.isEmpty() + || key.charAt(0) < 'a' + || key.charAt(0) > 'z') { + return false; + } + for (int i = 1; i < key.length(); i++) { + char c = key.charAt(i); + if (!(c >= 'a' && c <= 'z') + && !(c >= '0' && c <= '9') + && c != '_' + && c != '-' + && c != '*' + && c != '/') { + return false; + } + } + return true; + } + + // Value is opaque string up to 256 characters printable ASCII RFC0020 characters (i.e., the range + // 0x20 to 0x7E) except comma , and =. + static boolean validateValue(String value) { + if (value.length() > VALUE_MAX_SIZE || value.charAt(value.length() - 1) == ' ' /* '\u0020' */) { + return false; + } + for (int i = 0; i < value.length(); i++) { + char c = value.charAt(i); + if (c == ',' || c == '=' || c < ' ' /* '\u0020' */ || c > '~' /* '\u007E' */) { + return false; + } + } + return true; + } + + private static Tracestate create(List entries) { + Utils.checkState(entries.size() <= MAX_KEY_VALUE_PAIRS, "Invalid size"); + return new Tracestate(Collections.unmodifiableList(entries)); + } +} diff --git a/client/src/main/java/io/dapr/durabletask/interceptors/Utils.java b/client/src/main/java/io/dapr/durabletask/interceptors/Utils.java new file mode 100644 index 0000000..4154226 --- /dev/null +++ b/client/src/main/java/io/dapr/durabletask/interceptors/Utils.java @@ -0,0 +1,164 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask.interceptors; + +/** + * General internal utility methods. + * + *

Code originally from https://github.com/census-instrumentation/opencensus-java/blob/ + * 446e9bde9b1f6c0317e3f310644997e5d6d5eab2/api/src/main/java/io/opencensus/internal/Utils.java

+ */ +final class Utils { + + private Utils() { + } + + /** + * Throws an {@link IllegalArgumentException} if the argument is false. This method is similar to + * {@code Preconditions.checkArgument(boolean, Object)} from Guava. + * + * @param isValid whether the argument check passed. + * @param errorMessage the message to use for the exception. Will be converted to a string using + * {@link String#valueOf(Object)}. + */ + static void checkArgument( + boolean isValid, @javax.annotation.Nullable Object errorMessage) { + if (!isValid) { + throw new IllegalArgumentException(String.valueOf(errorMessage)); + } + } + + /** + * Throws an {@link IllegalArgumentException} if the argument is false. This method is similar to + * {@code Preconditions.checkArgument(boolean, Object)} from Guava. + * + * @param expression a boolean expression + * @param errorMessageTemplate a template for the exception message should the check fail. The + * message is formed by replacing each {@code %s} placeholder in the template with an + * argument. These are matched by position - the first {@code %s} gets {@code + * errorMessageArgs[0]}, etc. Unmatched arguments will be appended to the formatted + * message in + * square braces. Unmatched placeholders will be left as-is. + * @param errorMessageArgs the arguments to be substituted into the message template. Arguments + * are converted to strings using {@link String#valueOf(Object)}. + * @throws IllegalArgumentException if {@code expression} is false + * @throws NullPointerException if the check fails and either {@code errorMessageTemplate} or + * {@code errorMessageArgs} is null (don't let this happen) + */ + static void checkArgument( + boolean expression, + String errorMessageTemplate, + @javax.annotation.Nullable Object... errorMessageArgs) { + if (!expression) { + throw new IllegalArgumentException(format(errorMessageTemplate, errorMessageArgs)); + } + } + + /** + * Throws an {@link IllegalStateException} if the argument is false. This method is similar to + * {@code Preconditions.checkState(boolean, Object)} from Guava. + * + * @param isValid whether the state check passed. + * @param errorMessage the message to use for the exception. Will be converted to a string using + * {@link String#valueOf(Object)}. + */ + static void checkState(boolean isValid, @javax.annotation.Nullable Object errorMessage) { + if (!isValid) { + throw new IllegalStateException(String.valueOf(errorMessage)); + } + } + + /** + * Validates an index in an array or other container. This method throws an {@link + * IllegalArgumentException} if the size is negative and throws an {@link + * IndexOutOfBoundsException} if the index is negative or greater than or equal to the size. This + * method is similar to {@code Preconditions.checkElementIndex(int, int)} from Guava. + * + * @param index the index to validate. + * @param size the size of the array or container. + */ + static void checkIndex(int index, int size) { + if (size < 0) { + throw new IllegalArgumentException("Negative size: " + size); + } + if (index < 0 || index >= size) { + throw new IndexOutOfBoundsException("Index out of bounds: size=" + size + ", index=" + index); + } + } + + /** + * Throws a {@link NullPointerException} if the argument is null. This method is similar to {@code + * Preconditions.checkNotNull(Object, Object)} from Guava. + * + * @param arg the argument to check for null. + * @param errorMessage the message to use for the exception. Will be converted to a string using + * {@link String#valueOf(Object)}. + * @param Object checked. + * @return the argument, if it passes the null check. + */ + public static T checkNotNull(T arg, @javax.annotation.Nullable Object errorMessage) { + if (arg == null) { + throw new NullPointerException(String.valueOf(errorMessage)); + } + return arg; + } + + /** + * Substitutes each {@code %s} in {@code template} with an argument. These are matched by + * position: the first {@code %s} gets {@code args[0]}, etc. If there are more arguments than + * placeholders, the unmatched arguments will be appended to the end of the formatted message in + * square braces. + * + *

Copied from {@code Preconditions.format(String, Object...)} from Guava + * + * @param template a non-null string containing 0 or more {@code %s} placeholders. + * @param args the arguments to be substituted into the message template. Arguments are converted + * to strings using {@link String#valueOf(Object)}. Arguments can be null. + */ + // Note that this is somewhat-improperly used from Verify.java as well. + private static String format(String template, @javax.annotation.Nullable Object... args) { + // If no arguments return the template. + if (args == null) { + return template; + } + + // start substituting the arguments into the '%s' placeholders + StringBuilder builder = new StringBuilder(template.length() + 16 * args.length); + int templateStart = 0; + int i = 0; + while (i < args.length) { + int placeholderStart = template.indexOf("%s", templateStart); + if (placeholderStart == -1) { + break; + } + builder.append(template, templateStart, placeholderStart); + builder.append(args[i++]); + templateStart = placeholderStart + 2; + } + builder.append(template, templateStart, template.length()); + + // if we run out of placeholders, append the extra args in square braces + if (i < args.length) { + builder.append(" ["); + builder.append(args[i++]); + while (i < args.length) { + builder.append(", "); + builder.append(args[i++]); + } + builder.append(']'); + } + + return builder.toString(); + } +}