|
3 | 3 | import asyncio
|
4 | 4 | import logging
|
5 | 5 | import uuid
|
| 6 | +from concurrent.futures import ThreadPoolExecutor |
6 | 7 | from dataclasses import dataclass
|
7 | 8 | from datetime import timedelta
|
8 | 9 | from typing import Iterable, List, Optional
|
9 | 10 |
|
10 | 11 | from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
|
11 | 12 | from opentelemetry.sdk.trace.export import SimpleSpanProcessor
|
12 | 13 | from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
|
13 |
| -from opentelemetry.trace import get_tracer |
| 14 | +from opentelemetry.trace import StatusCode, get_tracer |
14 | 15 |
|
15 | 16 | from temporalio import activity, workflow
|
16 | 17 | from temporalio.client import Client
|
17 | 18 | from temporalio.common import RetryPolicy
|
18 | 19 | from temporalio.contrib.opentelemetry import TracingInterceptor
|
19 | 20 | from temporalio.contrib.opentelemetry import workflow as otel_workflow
|
| 21 | +from temporalio.exceptions import ApplicationError, ApplicationErrorCategory |
20 | 22 | from temporalio.testing import WorkflowEnvironment
|
21 | 23 | from temporalio.worker import UnsandboxedWorkflowRunner, Worker
|
22 | 24 |
|
@@ -386,6 +388,60 @@ async def test_opentelemetry_always_create_workflow_spans(client: Client):
|
386 | 388 | assert spans[0].name == "RunWorkflow:SimpleWorkflow"
|
387 | 389 |
|
388 | 390 |
|
| 391 | +attempted = False |
| 392 | + |
| 393 | + |
| 394 | +@activity.defn |
| 395 | +def benign_activity() -> str: |
| 396 | + global attempted |
| 397 | + if attempted: |
| 398 | + return "done" |
| 399 | + attempted = True |
| 400 | + raise ApplicationError( |
| 401 | + category=ApplicationErrorCategory.BENIGN, message="Benign Error" |
| 402 | + ) |
| 403 | + |
| 404 | + |
| 405 | +@workflow.defn |
| 406 | +class BenignWorkflow: |
| 407 | + @workflow.run |
| 408 | + async def run(self) -> str: |
| 409 | + return await workflow.execute_activity( |
| 410 | + benign_activity, schedule_to_close_timeout=timedelta(seconds=1) |
| 411 | + ) |
| 412 | + |
| 413 | + |
| 414 | +async def test_opentelemetry_benign_exception(client: Client): |
| 415 | + # Create a tracer that has an in-memory exporter |
| 416 | + exporter = InMemorySpanExporter() |
| 417 | + provider = TracerProvider() |
| 418 | + provider.add_span_processor(SimpleSpanProcessor(exporter)) |
| 419 | + tracer = get_tracer(__name__, tracer_provider=provider) |
| 420 | + |
| 421 | + # Create new client with tracer interceptor |
| 422 | + client_config = client.config() |
| 423 | + client_config["interceptors"] = [TracingInterceptor(tracer)] |
| 424 | + client = Client(**client_config) |
| 425 | + |
| 426 | + async with Worker( |
| 427 | + client, |
| 428 | + task_queue=f"task_queue_{uuid.uuid4()}", |
| 429 | + workflows=[BenignWorkflow], |
| 430 | + activities=[benign_activity], |
| 431 | + activity_executor=ThreadPoolExecutor(max_workers=1), |
| 432 | + ) as worker: |
| 433 | + assert "done" == await client.execute_workflow( |
| 434 | + BenignWorkflow.run, |
| 435 | + id=f"workflow_{uuid.uuid4()}", |
| 436 | + task_queue=worker.task_queue, |
| 437 | + retry_policy=RetryPolicy( |
| 438 | + maximum_attempts=2, initial_interval=timedelta(milliseconds=10) |
| 439 | + ), |
| 440 | + ) |
| 441 | + spans = exporter.get_finished_spans() |
| 442 | + assert all(span.status.status_code == StatusCode.UNSET for span in spans) |
| 443 | + |
| 444 | + |
389 | 445 | # TODO(cretz): Additional tests to write
|
390 | 446 | # * query without interceptor (no headers)
|
391 | 447 | # * workflow without interceptor (no headers) but query with interceptor (headers)
|
|
0 commit comments