Skip to content

Add Request/Response sample with activity-based responses (fixes #6) #166

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions reqrespactivity/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@

# Request/Response Sample with Activity-Based Responses

This sample demonstrates how to send a request and get a response from a Temporal workflow via a response activity.

In this example, the workflow accepts requests (signals) to uppercase a string and then provides the response via a callback response activity. Because the response is delivered by an activity execution, the requester must have its own worker running.

## Running

Follow these steps to run the sample:

1. **Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use):**

2. **Run the Worker:**
In one terminal, run the worker that executes the workflow and activity:
```bash
python worker.py
```

3. **Start the Workflow:**
In another terminal, start the workflow instance:
```bash
python starter.py
```

4. **Run the Requester:**
In a third terminal, run the requester that sends a request every second:
```bash
python requester_run.py
```
This will send requests like `foo0`, `foo1`, etc., to be uppercased by the workflow. You should see output similar to:
```
Requested uppercase for 'foo0', got: 'FOO0'
Requested uppercase for 'foo1', got: 'FOO1'
...
```

Multiple instances of these processes can be run in separate terminals to confirm that they work independently.

## Comparison with Query-Based Responses

In the [reqrespquery](../reqrespquery) sample, responses are fetched by periodically polling the workflow using queries. This sample, however, uses activity-based responses, which has the following pros and cons:

**Pros:**

* Activity-based responses are often faster due to pushing rather than polling.
* The workflow does not need to explicitly store the response state.
* The workflow can detect whether a response was actually received.

**Cons:**

* Activity-based responses require a worker on the caller (requester) side.
* They record the response in history as an activity execution.
* They can only occur while the workflow is running.

## Explanation of Continue-As-New

Workflows have a limit on history size. When the event count grows too large, a workflow can return a `ContinueAsNew` error to atomically start a new workflow execution. To prevent data loss, signals must be drained and any pending futures completed before a new execution starts.

In this sample, which is designed to run long-term, a `ContinueAsNew` is performed once the request count reaches a specified limit, provided there are no in-flight signal requests or executing activities. (If signals are processed too quickly or activities take too long, the workflow might never idle long enough for a `ContinueAsNew` to be triggered.) Careful tuning of signal and activity handling (including setting appropriate retry policies) is essential to ensure that the workflow can transition smoothly to a new execution when needed.

## License

This sample is released under the MIT License.
77 changes: 77 additions & 0 deletions reqrespactivity/requester.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# requester.py
import asyncio
import uuid
from dataclasses import dataclass
from temporalio import activity
from temporalio.client import Client
from temporalio.worker import Worker

# Global variable to hold the current Requester instance.
global_requester_instance = None

@dataclass
class Request:
id: str
input: str
response_activity: str
response_task_queue: str

@dataclass
class Response:
id: str
output: str
error: str = ""

# Define the response activity as a top-level function with the decorator.
@activity.defn
async def response_activity(response: Response):
global global_requester_instance
if global_requester_instance:
fut = global_requester_instance.pending.pop(response.id, None)
if fut:
fut.set_result(response)
else:
raise Exception("No requester instance available")

class Requester:
def __init__(self, client: Client, target_workflow_id: str):
self.client = client
self.target_workflow_id = target_workflow_id
self.task_queue = "requester-" + str(uuid.uuid4())
self.pending = {} # Maps request IDs to asyncio.Future objects

async def start_worker(self):
global global_requester_instance
global_requester_instance = self # Set the global reference
self.worker = Worker(
self.client,
task_queue=self.task_queue,
activities=[response_activity],
)
# Run the worker in the background.
asyncio.create_task(self.worker.run())

async def close(self):
await self.worker.shutdown()

async def request_uppercase(self, text: str) -> str:
req_id = str(uuid.uuid4())
req = Request(
id=req_id,
input=text,
response_activity="response_activity", # Must match the name of the decorated function
response_task_queue=self.task_queue,
)
loop = asyncio.get_running_loop()
fut = loop.create_future()
self.pending[req_id] = fut

# Get a handle to the workflow and send the signal.
handle = self.client.get_workflow_handle(self.target_workflow_id)
await handle.signal("request", req)

# Wait for the callback to return the response.
response: Response = await fut
if response.error:
raise Exception(response.error)
return response.output
23 changes: 23 additions & 0 deletions reqrespactivity/requester_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# requester_run.py
import asyncio
from temporalio.client import Client
from requester import Requester

async def main():
client = await Client.connect("localhost:7233")
workflow_id = "reqrespactivity_workflow"
requester = Requester(client, workflow_id)
await requester.start_worker()
try:
i = 0
while True:
text = f"foo{i}" # Create request similar to the Go sample: foo0, foo1, etc.
result = await requester.request_uppercase(text)
print(f"Requested uppercase for '{text}', got: '{result}'")
await asyncio.sleep(1)
i += 1
finally:
await requester.close()

if __name__ == "__main__":
asyncio.run(main())
17 changes: 17 additions & 0 deletions reqrespactivity/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# starter.py
import asyncio
from temporalio.client import Client
from workflow import UppercaseWorkflow

async def main():
client = await Client.connect("localhost:7233")
workflow_id = "reqrespactivity_workflow"
handle = await client.start_workflow(
UppercaseWorkflow.run,
id=workflow_id,
task_queue="reqrespactivity",
)
print(f"Started workflow with ID: {handle.id}")

if __name__ == "__main__":
asyncio.run(main())
19 changes: 19 additions & 0 deletions reqrespactivity/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# worker.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import UppercaseWorkflow, uppercase_activity

async def main():
client = await Client.connect("localhost:7233")
worker = Worker(
client,
task_queue="reqrespactivity",
workflows=[UppercaseWorkflow],
activities=[uppercase_activity],
)
print("Worker started on task queue 'reqrespactivity'")
await worker.run()

if __name__ == "__main__":
asyncio.run(main())
60 changes: 60 additions & 0 deletions reqrespactivity/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# workflow.py
import asyncio
from datetime import timedelta
from dataclasses import dataclass
from temporalio import workflow, activity

# Define data models similar to the Go structs.
@dataclass
class Request:
id: str
input: str
response_activity: str
response_task_queue: str

@dataclass
class Response:
id: str
output: str
error: str = ""

# Activity to convert text to uppercase.
@activity.defn
async def uppercase_activity(text: str) -> str:
return text.upper()

# Workflow that listens for "request" signals.
@workflow.defn
class UppercaseWorkflow:
def __init__(self):
self.requests = [] # Buffer for incoming requests

@workflow.signal
def request(self, req: Request):
self.requests.append(req)

@workflow.run
async def run(self):
# Continuously process incoming requests.
while True:
if self.requests:
req = self.requests.pop(0)
try:
# Execute the uppercase activity.
result = await workflow.execute_activity(
uppercase_activity,
req.input,
start_to_close_timeout=timedelta(seconds=5),
)
resp = Response(id=req.id, output=result)
except Exception as e:
resp = Response(id=req.id, output="", error=str(e))
# Call back the requester via the designated response activity.
await workflow.execute_activity(
req.response_activity,
resp,
task_queue=req.response_task_queue,
start_to_close_timeout=timedelta(seconds=10),
)
else:
await workflow.sleep(1)