-
Notifications
You must be signed in to change notification settings - Fork 75
Expense example #201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
jssmith
wants to merge
18
commits into
main
Choose a base branch
from
expense-example
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+2,631
−2
Open
Expense example #201
Changes from all commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
437f80e
ported expense report test from Go
jssmith 089127c
cleanup
jssmith a822d57
move specification documents
jssmith 392c9b8
fix UI specification
jssmith b008ff8
testing cleanup
jssmith 3a6a45b
test reorg
jssmith d939798
cleanup
jssmith af1d0e8
add top-level readme
jssmith 3f950d8
Merge remote-tracking branch 'origin/main' into expense-example
jssmith 8dbb91e
remove expense from pyproject defaults
jssmith 2f5d5d3
change exception logging
jssmith 231a8de
switch from async activity completion to signals
jssmith ff80141
ui cleanup
jssmith 71c11b3
add expense group to ci
jssmith 4bf6493
lint fixes
jssmith 8a0229a
remove unicode for windows compatibility
jssmith f15ec1c
fixing ci
jssmith a1297b1
test fix
jssmith File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
# Expense | ||
|
||
This sample workflow processes an expense request. It demonstrates human-in-the loop processing using Temporal's signal mechanism. | ||
|
||
## Overview | ||
|
||
This sample demonstrates the following workflow: | ||
|
||
1. **Create Expense**: The workflow executes the `create_expense_activity` to initialize a new expense report in the external system. | ||
|
||
2. **Register for Decision**: The workflow calls `register_for_decision_activity`, which registers the workflow with the external UI system so it can receive signals when decisions are made. | ||
|
||
3. **Wait for Signal**: The workflow uses `workflow.wait_condition()` to wait for an external signal containing the approval/rejection decision. | ||
|
||
4. **Signal-Based Completion**: When a human approves or rejects the expense, the external UI system sends a signal to the workflow using `workflow_handle.signal()`, providing the decision result. | ||
|
||
5. **Process Payment**: Once the workflow receives the approval decision via signal, it executes the `payment_activity` to complete the simulated expense processing. | ||
|
||
This pattern enables human-in-the-loop workflows where workflows can wait as long as necessary for external decisions using Temporal's durable signal mechanism. | ||
|
||
## Steps To Run Sample | ||
|
||
* You need a Temporal service running. See the main [README.md](../README.md) for more details. | ||
* Start the sample expense system UI: | ||
```bash | ||
uv run -m expense.ui | ||
``` | ||
* Start workflow and activity workers: | ||
```bash | ||
uv run -m expense.worker | ||
``` | ||
* Start expense workflow execution: | ||
```bash | ||
# Start workflow and return immediately (default) | ||
uv run -m expense.starter | ||
|
||
# Start workflow and wait for completion | ||
uv run -m expense.starter --wait | ||
|
||
# Start workflow with custom expense ID | ||
uv run -m expense.starter --expense-id "my-expense-123" | ||
|
||
# Start workflow with custom ID and wait for completion | ||
uv run -m expense.starter --wait --expense-id "my-expense-123" | ||
``` | ||
* When you see the console print out that the expense is created, go to [localhost:8099/list](http://localhost:8099/list) to approve the expense. | ||
* You should see the workflow complete after you approve the expense. You can also reject the expense. | ||
|
||
## Running Tests | ||
|
||
```bash | ||
# Run all expense tests | ||
uv run -m pytest tests/expense/ -v | ||
|
||
# Run specific test categories | ||
uv run -m pytest tests/expense/test_expense_workflow.py -v # Workflow tests | ||
uv run -m pytest tests/expense/test_expense_activities.py -v # Activity tests | ||
uv run -m pytest tests/expense/test_expense_integration.py -v # Integration tests | ||
uv run -m pytest tests/expense/test_ui.py -v # UI tests | ||
|
||
# Run a specific test | ||
uv run -m pytest tests/expense/test_expense_workflow.py::TestWorkflowPaths::test_workflow_approved_complete_flow -v | ||
``` | ||
|
||
## Key Concepts Demonstrated | ||
|
||
* **Human-in-the-Loop Workflows**: Long-running workflows that wait for human interaction | ||
* **Workflow Signals**: Using `workflow.signal()` and `workflow.wait_condition()` for external communication | ||
* **Signal-Based Completion**: External systems sending signals to workflows for asynchronous decision-making | ||
* **External System Integration**: Communication between workflows and external systems via web services and signals | ||
* **HTTP Client Lifecycle Management**: Proper resource management with worker-scoped HTTP clients | ||
|
||
## Troubleshooting | ||
|
||
If you see the workflow failed, the cause may be a port conflict. You can try to change to a different port number in `__init__.py`. Then rerun everything. | ||
|
||
## Files | ||
|
||
* `workflow.py` - The main expense processing workflow with signal handling | ||
* `activities.py` - Three activities: create expense, register for decision, process payment | ||
* `ui.py` - A demonstration expense approval system web UI with signal sending | ||
* `worker.py` - Worker to run workflows and activities with HTTP client lifecycle management | ||
* `starter.py` - Client to start workflow executions with optional completion waiting |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
EXPENSE_SERVER_HOST = "localhost" | ||
EXPENSE_SERVER_PORT = 8099 | ||
EXPENSE_SERVER_HOST_PORT = f"http://{EXPENSE_SERVER_HOST}:{EXPENSE_SERVER_PORT}" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
from typing import Optional | ||
|
||
import httpx | ||
from temporalio import activity | ||
from temporalio.exceptions import ApplicationError | ||
|
||
from expense import EXPENSE_SERVER_HOST_PORT | ||
|
||
# Module-level HTTP client, managed by worker lifecycle | ||
_http_client: Optional[httpx.AsyncClient] = None | ||
|
||
|
||
async def initialize_http_client() -> None: | ||
"""Initialize the global HTTP client. Called by worker setup.""" | ||
global _http_client | ||
if _http_client is None: | ||
_http_client = httpx.AsyncClient() | ||
|
||
|
||
async def cleanup_http_client() -> None: | ||
"""Cleanup the global HTTP client. Called by worker shutdown.""" | ||
global _http_client | ||
if _http_client is not None: | ||
await _http_client.aclose() | ||
_http_client = None | ||
|
||
|
||
def get_http_client() -> httpx.AsyncClient: | ||
"""Get the global HTTP client.""" | ||
if _http_client is None: | ||
raise RuntimeError( | ||
"HTTP client not initialized. Call initialize_http_client() first." | ||
) | ||
return _http_client | ||
|
||
|
||
@activity.defn | ||
async def create_expense_activity(expense_id: str) -> None: | ||
if not expense_id: | ||
raise ValueError("expense id is empty") | ||
|
||
client = get_http_client() | ||
try: | ||
response = await client.get( | ||
f"{EXPENSE_SERVER_HOST_PORT}/create", | ||
params={"is_api_call": "true", "id": expense_id}, | ||
) | ||
response.raise_for_status() | ||
except httpx.HTTPStatusError as e: | ||
if 400 <= e.response.status_code < 500: | ||
raise ApplicationError( | ||
f"Client error: {e.response.status_code} {e.response.text}", | ||
non_retryable=True, | ||
) from e | ||
raise | ||
|
||
body = response.text | ||
|
||
if body == "SUCCEED": | ||
activity.logger.info(f"Expense created. ExpenseID: {expense_id}") | ||
return | ||
|
||
raise Exception(body) | ||
|
||
|
||
@activity.defn | ||
async def register_for_decision_activity(expense_id: str) -> None: | ||
""" | ||
Register the expense for decision. This activity registers the workflow | ||
with the external system so it can receive signals when decisions are made. | ||
""" | ||
if not expense_id: | ||
raise ValueError("expense id is empty") | ||
|
||
logger = activity.logger | ||
http_client = get_http_client() | ||
|
||
# Get workflow info to register with the UI system | ||
activity_info = activity.info() | ||
workflow_id = activity_info.workflow_id | ||
|
||
# Register the workflow ID with the UI system so it can send signals | ||
try: | ||
response = await http_client.post( | ||
f"{EXPENSE_SERVER_HOST_PORT}/registerWorkflow", | ||
params={"id": expense_id}, | ||
data={"workflow_id": workflow_id}, | ||
) | ||
response.raise_for_status() | ||
logger.info(f"Registered expense for decision. ExpenseID: {expense_id}") | ||
except Exception as e: | ||
logger.error(f"Failed to register workflow with UI system: {e}") | ||
raise | ||
|
||
|
||
@activity.defn | ||
async def payment_activity(expense_id: str) -> None: | ||
if not expense_id: | ||
raise ValueError("expense id is empty") | ||
|
||
client = get_http_client() | ||
try: | ||
response = await client.post( | ||
f"{EXPENSE_SERVER_HOST_PORT}/action", | ||
data={"is_api_call": "true", "type": "payment", "id": expense_id}, | ||
) | ||
response.raise_for_status() | ||
except httpx.HTTPStatusError as e: | ||
if 400 <= e.response.status_code < 500: | ||
raise ApplicationError( | ||
f"Client error: {e.response.status_code} {e.response.text}", | ||
non_retryable=True, | ||
) from e | ||
raise | ||
|
||
body = response.text | ||
|
||
if body == "SUCCEED": | ||
activity.logger.info(f"payment_activity succeed ExpenseID: {expense_id}") | ||
return | ||
|
||
raise Exception(body) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
import argparse | ||
import asyncio | ||
import uuid | ||
|
||
from temporalio.client import Client | ||
|
||
from .workflow import SampleExpenseWorkflow | ||
|
||
|
||
async def main(): | ||
parser = argparse.ArgumentParser(description="Start an expense workflow") | ||
parser.add_argument( | ||
"--wait", | ||
action="store_true", | ||
help="Wait for workflow completion (default: start and return immediately)", | ||
) | ||
parser.add_argument( | ||
"--expense-id", | ||
type=str, | ||
help="Expense ID to use (default: generate random UUID)", | ||
) | ||
args = parser.parse_args() | ||
|
||
# The client is a heavyweight object that should be created once per process. | ||
client = await Client.connect("localhost:7233") | ||
|
||
expense_id = args.expense_id or str(uuid.uuid4()) | ||
workflow_id = f"expense_{expense_id}" | ||
|
||
# Start the workflow | ||
handle = await client.start_workflow( | ||
SampleExpenseWorkflow.run, | ||
expense_id, | ||
id=workflow_id, | ||
task_queue="expense", | ||
) | ||
|
||
print(f"Started workflow WorkflowID {handle.id} RunID {handle.result_run_id}") | ||
print(f"Workflow will register itself with UI system for expense {expense_id}") | ||
|
||
if args.wait: | ||
print("Waiting for workflow to complete...") | ||
result = await handle.result() | ||
print(f"Workflow completed with result: {result}") | ||
return result | ||
else: | ||
print("Workflow started. Use --wait flag to wait for completion.") | ||
return None | ||
|
||
|
||
if __name__ == "__main__": | ||
asyncio.run(main()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,194 @@ | ||
import asyncio | ||
from enum import Enum | ||
from typing import Dict, Optional | ||
|
||
import uvicorn | ||
from fastapi import FastAPI, Form, Query | ||
from fastapi.responses import HTMLResponse, PlainTextResponse, RedirectResponse | ||
from temporalio.client import Client | ||
|
||
from expense import EXPENSE_SERVER_HOST, EXPENSE_SERVER_PORT | ||
|
||
|
||
class ExpenseState(str, Enum): | ||
CREATED = "CREATED" | ||
APPROVED = "APPROVED" | ||
REJECTED = "REJECTED" | ||
COMPLETED = "COMPLETED" | ||
|
||
|
||
# Use memory store for this sample expense system | ||
all_expenses: Dict[str, ExpenseState] = {} | ||
workflow_map: Dict[str, str] = {} # Maps expense_id to workflow_id | ||
|
||
app = FastAPI() | ||
|
||
# Global client - will be initialized when starting the server | ||
workflow_client: Optional[Client] = None | ||
|
||
|
||
@app.get("/", response_class=HTMLResponse) | ||
@app.get("/list", response_class=HTMLResponse) | ||
async def list_handler(): | ||
html = """ | ||
<h1>SAMPLE EXPENSE SYSTEM</h1> | ||
<a href="/list">HOME</a> | ||
<h3>All expense requests:</h3> | ||
<table border=1> | ||
<tr><th>Expense ID</th><th>Status</th><th>Action</th></tr> | ||
""" | ||
|
||
# Sort keys for consistent display | ||
for expense_id in sorted(all_expenses.keys()): | ||
state = all_expenses[expense_id] | ||
action_link = "" | ||
if state == ExpenseState.CREATED: | ||
action_link = ( | ||
f'<form method="post" action="/action" style="display:inline;">' | ||
f'<input type="hidden" name="type" value="approve">' | ||
f'<input type="hidden" name="id" value="{expense_id}">' | ||
'<button type="submit" style="background-color:#4CAF50;">APPROVE</button>' | ||
"</form>" | ||
" " | ||
f'<form method="post" action="/action" style="display:inline;">' | ||
f'<input type="hidden" name="type" value="reject">' | ||
f'<input type="hidden" name="id" value="{expense_id}">' | ||
'<button type="submit" style="background-color:#f44336;">REJECT</button>' | ||
"</form>" | ||
) | ||
html += f"<tr><td>{expense_id}</td><td>{state}</td><td>{action_link}</td></tr>" | ||
|
||
html += "</table>" | ||
return html | ||
|
||
|
||
@app.post("/action") | ||
async def action_handler( | ||
type: str = Form(...), id: str = Form(...), is_api_call: str = Form("false") | ||
): | ||
if id not in all_expenses: | ||
if is_api_call == "true": | ||
return PlainTextResponse("ERROR:INVALID_ID") | ||
else: | ||
return PlainTextResponse("Invalid ID") | ||
|
||
old_state = all_expenses[id] | ||
|
||
if type == "approve": | ||
all_expenses[id] = ExpenseState.APPROVED | ||
elif type == "reject": | ||
all_expenses[id] = ExpenseState.REJECTED | ||
elif type == "payment": | ||
all_expenses[id] = ExpenseState.COMPLETED | ||
else: | ||
if is_api_call == "true": | ||
return PlainTextResponse("ERROR:INVALID_TYPE") | ||
else: | ||
return PlainTextResponse("Invalid action type") | ||
|
||
if is_api_call == "true" or type == "payment": | ||
# For API calls and payment, just return success | ||
if old_state == ExpenseState.CREATED and all_expenses[id] in [ | ||
ExpenseState.APPROVED, | ||
ExpenseState.REJECTED, | ||
]: | ||
# Report state change | ||
await notify_expense_state_change(id, all_expenses[id]) | ||
|
||
print(f"Set state for {id} from {old_state} to {all_expenses[id]}") | ||
return PlainTextResponse("SUCCEED") | ||
else: | ||
# For UI calls, notify and redirect to list | ||
if old_state == ExpenseState.CREATED and all_expenses[id] in [ | ||
ExpenseState.APPROVED, | ||
ExpenseState.REJECTED, | ||
]: | ||
await notify_expense_state_change(id, all_expenses[id]) | ||
|
||
print(f"Set state for {id} from {old_state} to {all_expenses[id]}") | ||
return RedirectResponse(url="/list", status_code=303) | ||
|
||
|
||
@app.get("/create") | ||
async def create_handler(id: str = Query(...), is_api_call: str = Query("false")): | ||
if id in all_expenses: | ||
if is_api_call == "true": | ||
return PlainTextResponse("ERROR:ID_ALREADY_EXISTS") | ||
else: | ||
return PlainTextResponse("ID already exists") | ||
|
||
all_expenses[id] = ExpenseState.CREATED | ||
|
||
if is_api_call == "true": | ||
print(f"Created new expense id: {id}") | ||
return PlainTextResponse("SUCCEED") | ||
else: | ||
print(f"Created new expense id: {id}") | ||
return await list_handler() | ||
|
||
|
||
@app.get("/status") | ||
async def status_handler(id: str = Query(...)): | ||
if id not in all_expenses: | ||
return PlainTextResponse("ERROR:INVALID_ID") | ||
|
||
state = all_expenses[id] | ||
print(f"Checking status for {id}: {state}") | ||
return PlainTextResponse(state.value) | ||
|
||
|
||
@app.post("/registerWorkflow") | ||
async def register_workflow_handler(id: str = Query(...), workflow_id: str = Form(...)): | ||
if id not in all_expenses: | ||
return PlainTextResponse("ERROR:INVALID_ID") | ||
|
||
curr_state = all_expenses[id] | ||
if curr_state != ExpenseState.CREATED: | ||
return PlainTextResponse("ERROR:INVALID_STATE") | ||
|
||
print(f"Registered workflow for ID={id}, workflow_id={workflow_id}") | ||
workflow_map[id] = workflow_id | ||
return PlainTextResponse("SUCCEED") | ||
|
||
|
||
async def notify_expense_state_change(expense_id: str, state: str): | ||
if expense_id not in workflow_map: | ||
print(f"Invalid id: {expense_id}") | ||
return | ||
|
||
if workflow_client is None: | ||
print("Workflow client not initialized") | ||
return | ||
|
||
workflow_id = workflow_map[expense_id] | ||
try: | ||
# Send signal to workflow | ||
handle = workflow_client.get_workflow_handle(workflow_id) | ||
await handle.signal("expense_decision_signal", state) | ||
print( | ||
f"Successfully sent signal to workflow: {workflow_id} with decision: {state}" | ||
) | ||
except Exception as err: | ||
print(f"Failed to send signal to workflow with error: {err}") | ||
|
||
|
||
async def main(): | ||
global workflow_client | ||
|
||
# Initialize the workflow client | ||
workflow_client = await Client.connect("localhost:7233") | ||
|
||
print( | ||
f"Expense system UI available at http://{EXPENSE_SERVER_HOST}:{EXPENSE_SERVER_PORT}" | ||
) | ||
|
||
# Start the FastAPI server | ||
config = uvicorn.Config( | ||
app, host="0.0.0.0", port=EXPENSE_SERVER_PORT, log_level="info" | ||
) | ||
server = uvicorn.Server(config) | ||
await server.serve() | ||
|
||
|
||
if __name__ == "__main__": | ||
asyncio.run(main()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
import asyncio | ||
|
||
from temporalio.client import Client | ||
from temporalio.worker import Worker | ||
|
||
from .activities import ( | ||
cleanup_http_client, | ||
create_expense_activity, | ||
initialize_http_client, | ||
payment_activity, | ||
register_for_decision_activity, | ||
) | ||
from .workflow import SampleExpenseWorkflow | ||
|
||
|
||
async def main(): | ||
# The client and worker are heavyweight objects that should be created once per process. | ||
client = await Client.connect("localhost:7233") | ||
|
||
# Initialize HTTP client before starting worker | ||
await initialize_http_client() | ||
|
||
try: | ||
# Run the worker | ||
worker = Worker( | ||
client, | ||
task_queue="expense", | ||
workflows=[SampleExpenseWorkflow], | ||
activities=[ | ||
create_expense_activity, | ||
register_for_decision_activity, | ||
payment_activity, | ||
], | ||
) | ||
|
||
print("Worker starting...") | ||
await worker.run() | ||
finally: | ||
# Cleanup HTTP client when worker shuts down | ||
await cleanup_http_client() | ||
|
||
|
||
if __name__ == "__main__": | ||
asyncio.run(main()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
from datetime import timedelta | ||
|
||
from temporalio import workflow | ||
from temporalio.common import RetryPolicy | ||
|
||
with workflow.unsafe.imports_passed_through(): | ||
from expense.activities import ( | ||
create_expense_activity, | ||
payment_activity, | ||
register_for_decision_activity, | ||
) | ||
|
||
|
||
@workflow.defn | ||
class SampleExpenseWorkflow: | ||
def __init__(self) -> None: | ||
self.expense_decision: str = "" | ||
|
||
@workflow.signal | ||
async def expense_decision_signal(self, decision: str) -> None: | ||
"""Signal handler for expense decision.""" | ||
self.expense_decision = decision | ||
|
||
@workflow.run | ||
async def run(self, expense_id: str) -> str: | ||
logger = workflow.logger | ||
|
||
# Step 1: Create new expense report | ||
try: | ||
await workflow.execute_activity( | ||
create_expense_activity, | ||
expense_id, | ||
start_to_close_timeout=timedelta(seconds=10), | ||
retry_policy=RetryPolicy(maximum_attempts=3), | ||
) | ||
except Exception as err: | ||
logger.exception(f"Failed to create expense report: {err}") | ||
raise | ||
|
||
# Step 2: Register for decision and wait for signal | ||
try: | ||
await workflow.execute_activity( | ||
register_for_decision_activity, | ||
expense_id, | ||
start_to_close_timeout=timedelta(seconds=10), | ||
) | ||
except Exception as err: | ||
logger.exception(f"Failed to register for decision: {err}") | ||
raise | ||
|
||
# Wait for the expense decision signal with a timeout | ||
logger.info(f"Waiting for expense decision signal for {expense_id}") | ||
await workflow.wait_condition( | ||
lambda: self.expense_decision != "", timeout=timedelta(minutes=10) | ||
) | ||
|
||
status = self.expense_decision | ||
if status != "APPROVED": | ||
logger.info(f"Workflow completed. ExpenseStatus: {status}") | ||
return "" | ||
|
||
# Step 3: Request payment for the expense | ||
try: | ||
await workflow.execute_activity( | ||
payment_activity, | ||
expense_id, | ||
start_to_close_timeout=timedelta(seconds=10), | ||
retry_policy=RetryPolicy(maximum_attempts=3), | ||
) | ||
except Exception as err: | ||
logger.info(f"Workflow completed with payment failed. Error: {err}") | ||
raise | ||
|
||
logger.info("Workflow completed with expense payment completed.") | ||
return "COMPLETED" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
# Expense System UI Specification | ||
|
||
## Overview | ||
The Expense System UI is a FastAPI-based web application that provides both a web interface and REST API for managing expense requests. It integrates with Temporal workflows through signal mechanisms. | ||
|
||
## System Components | ||
|
||
### Data Model | ||
- **ExpenseState Enum**: Defines expense lifecycle states | ||
- `CREATED`: Initial state when expense is first created | ||
- `APPROVED`: Expense has been approved for payment | ||
- `REJECTED`: Expense has been denied | ||
- `COMPLETED`: Payment has been processed | ||
|
||
### Storage | ||
- **all_expenses**: In-memory dictionary mapping expense IDs to their current state | ||
- **workflow_map**: Maps expense IDs to Temporal workflow IDs for signal sending | ||
|
||
## API Endpoints | ||
|
||
### Parameter Validation | ||
All endpoints use FastAPI's automatic parameter validation: | ||
- Missing required parameters return HTTP 422 (Unprocessable Entity) | ||
- Invalid parameter types return HTTP 422 (Unprocessable Entity) | ||
- This validation occurs before endpoint-specific business logic | ||
|
||
### 1. Home/List View (`GET /` or `GET /list`) | ||
**Purpose**: Display all expenses in an HTML table format | ||
|
||
**Response**: HTML page containing: | ||
- Page title "SAMPLE EXPENSE SYSTEM" | ||
- Navigation link to HOME | ||
- Table with columns: Expense ID, Status, Action | ||
- Action buttons for CREATED expenses (APPROVE/REJECT) | ||
- Sorted expense display by ID | ||
|
||
**Business Rules**: | ||
- Only CREATED expenses show action buttons | ||
- Expenses are displayed in sorted order by ID | ||
|
||
### 2. Action Handler (`POST /action`) | ||
**Purpose**: Process expense state changes (approve/reject/payment) | ||
|
||
**Parameters**: | ||
- `type` (required): Action type - "approve", "reject", or "payment" (form data) | ||
- `id` (required): Expense ID (form data) | ||
- `is_api_call` (optional): "true" for API calls, "false" for UI calls (form data) | ||
|
||
**Business Rules**: | ||
- `approve`: Changes CREATED → APPROVED | ||
- `reject`: Changes CREATED → REJECTED | ||
- `payment`: Changes APPROVED → COMPLETED | ||
- Invalid IDs return HTTP 200 with error message in response body | ||
- Invalid action types return HTTP 200 with error message in response body | ||
- State changes from CREATED to APPROVED/REJECTED trigger workflow notifications | ||
- API calls return "SUCCEED" on success | ||
- UI calls redirect to list view after success (HTTP 303 redirect) | ||
|
||
**Error Handling**: | ||
- API calls return HTTP 200 with "ERROR:INVALID_ID" or "ERROR:INVALID_TYPE" in response body | ||
- UI calls return HTTP 200 with descriptive messages like "Invalid ID" or "Invalid action type" in response body | ||
|
||
### 3. Create Expense (`GET /create`) | ||
**Purpose**: Create a new expense entry | ||
|
||
**Parameters**: | ||
- `id` (required): Unique expense ID | ||
- `is_api_call` (optional): "true" for API calls, "false" for UI calls | ||
|
||
**Business Rules**: | ||
- Expense ID must be unique | ||
- New expenses start in CREATED state | ||
- Duplicate IDs return HTTP 200 with error message in response body | ||
|
||
**Error Handling**: | ||
- API calls return HTTP 200 with "ERROR:ID_ALREADY_EXISTS" in response body | ||
- UI calls return HTTP 200 with descriptive message "ID already exists" in response body | ||
|
||
### 4. Status Check (`GET /status`) | ||
**Purpose**: Retrieve current expense state | ||
|
||
**Parameters**: | ||
- `id` (required): Expense ID | ||
|
||
**Response**: Current expense state as string | ||
**Error Handling**: Returns HTTP 200 with "ERROR:INVALID_ID" in response body for unknown IDs | ||
|
||
### 5. Workflow Registration (`POST /registerWorkflow`) | ||
**Purpose**: Register Temporal workflow ID for expense state change signals | ||
|
||
**Parameters**: | ||
- `id` (query): Expense ID | ||
- `workflow_id` (form): Temporal workflow ID | ||
|
||
**Business Rules**: | ||
- Expense must exist and be in CREATED state | ||
- Workflow ID is stored for later signal sending | ||
- Enables workflow signal notification on state changes | ||
|
||
**Error Handling**: | ||
- HTTP 200 with "ERROR:INVALID_ID" in response body for unknown expenses | ||
- HTTP 200 with "ERROR:INVALID_STATE" in response body for non-CREATED expenses | ||
|
||
## Workflow Integration | ||
|
||
### Signal Mechanism | ||
- When expenses transition from CREATED to APPROVED/REJECTED, registered workflows are signaled | ||
- Uses Temporal's workflow signal mechanism | ||
- Workflow IDs are stored and used to send signals to workflows | ||
|
||
### Error Handling | ||
- Failed signal sending is logged but doesn't affect UI operations | ||
- Invalid or non-existent workflow IDs are handled gracefully | ||
|
||
## User Interface | ||
|
||
### Web Interface Features | ||
- Clean HTML table display | ||
- Color-coded action buttons (green for APPROVE, red for REJECT) | ||
- Real-time state display | ||
- Navigation between views | ||
|
||
### API Interface Features | ||
- RESTful endpoints for programmatic access | ||
- Consistent error response format | ||
- Support for both sync and async operations | ||
|
||
## Non-Functional Requirements | ||
|
||
### Concurrency | ||
- Thread-safe in-memory storage operations | ||
- Handles concurrent API and UI requests | ||
|
||
### Error Recovery | ||
- Graceful handling of workflow signal failures | ||
- Input validation on all endpoints (422 for missing/invalid parameters, 200 with error messages for business logic errors) | ||
- Descriptive error messages in response body | ||
|
||
### Logging | ||
- State change operations are logged | ||
- Workflow registration and signal sending logged | ||
- Error conditions logged for debugging | ||
|
||
## Security Considerations | ||
- Input validation on all parameters | ||
- Protection against duplicate ID creation | ||
- Secure handling of Temporal workflow IDs | ||
|
||
## Scalability Notes | ||
- Current implementation uses in-memory storage | ||
- Designed for demonstration/development use | ||
- Production deployment would require persistent storage |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,251 @@ | ||
# Expense Workflow and Activities Specification | ||
|
||
## Overview | ||
The Expense Processing System demonstrates a human-in-the-loop workflow pattern using Temporal. It processes expense requests through a multi-step approval workflow with signal-based completion. | ||
|
||
## Business Process Flow | ||
|
||
### Workflow Steps | ||
1. **Create Expense Report**: Initialize a new expense in the external system | ||
2. **Register for Decision & Wait for Signal**: Register expense and wait for approval/rejection via external UI (signal-based completion) | ||
3. **Process Payment** (conditional): Execute payment if approved | ||
|
||
### Decision Logic | ||
- **APPROVED**: Continue to payment processing → Return "COMPLETED" | ||
- **Any other value**: Skip payment processing → Return empty string "" | ||
- This includes: "REJECTED", "DENIED", "PENDING", "CANCELLED", or any unknown value | ||
- **ERROR**: Propagate failure to workflow caller | ||
|
||
## Architecture Components | ||
|
||
### Core Entities | ||
- **Workflow**: `SampleExpenseWorkflow` - Main orchestration logic | ||
- **Activities**: Three distinct activities for each business step | ||
- **External System**: HTTP-based expense management UI | ||
- **Workflow Signals**: Enable workflow completion from external systems | ||
|
||
### External Integration | ||
- **Expense UI Server**: HTTP API at `localhost:8099` | ||
- **Signal Completion**: UI system sends signals to workflows via Temporal client | ||
- **Human Interaction**: Web-based approval/rejection interface | ||
|
||
## Implementation Specifications | ||
|
||
### Workflow Definition | ||
|
||
#### `SampleExpenseWorkflow` | ||
```python | ||
@workflow.defn | ||
class SampleExpenseWorkflow: | ||
def __init__(self) -> None: | ||
self.expense_decision: str = "" | ||
|
||
@workflow.signal | ||
async def expense_decision_signal(self, decision: str) -> None: | ||
self.expense_decision = decision | ||
|
||
@workflow.run | ||
async def run(self, expense_id: str) -> str | ||
``` | ||
|
||
**Input Parameters**: | ||
- `expense_id`: Unique identifier for the expense request | ||
|
||
**Return Values**: | ||
- Success (Approved): `"COMPLETED"` | ||
- Success (Rejected): `""` (empty string) | ||
- Failure: Exception/error propagated | ||
|
||
**Timeout Configuration**: | ||
- Step 1 (Create): 10 seconds | ||
- Step 2 (Wait): 10 minutes (human approval timeout) | ||
- Step 3 (Payment): 10 seconds | ||
|
||
### Activity Definitions | ||
|
||
#### 1. Create Expense Activity | ||
|
||
**Purpose**: Initialize expense record in external system | ||
|
||
**Function Signature**: `create_expense_activity(expense_id: str) -> None` | ||
|
||
**Business Rules**: | ||
- Validate expense_id is not empty | ||
- HTTP GET to `/create?is_api_call=true&id={expense_id}` | ||
- Success condition: Response body equals "SUCCEED" | ||
- Any other response triggers exception | ||
|
||
**Error Handling**: | ||
- Empty expense_id: `ValueError` with message "expense id is empty" | ||
- Whitespace-only expense_id: `ValueError` (same as empty) | ||
- HTTP errors: `httpx.HTTPStatusError` propagated to workflow | ||
- Server error responses: `Exception` with specific error message (e.g., "ERROR:ID_ALREADY_EXISTS") | ||
- Network failures: Connection timeouts and DNS resolution errors propagated | ||
|
||
#### 2. Register for Decision Activity | ||
|
||
**Purpose**: Register expense for human decision and return immediately | ||
|
||
**Function Signature**: `register_for_decision_activity(expense_id: str) -> None` | ||
|
||
**Signal-Based Pattern**: | ||
The activity demonstrates a signal-based human-in-the-loop pattern. It simply registers the expense for decision and completes immediately. The workflow then waits for a signal from an external system. This pattern enables human-in-the-loop workflows where workflows can wait as long as necessary for external decisions using Temporal's signal mechanism. | ||
|
||
**Business Logic**: | ||
1. Validate expense_id is not empty | ||
2. Log that the expense has been registered for decision | ||
3. Return immediately (no HTTP calls or external registration) | ||
4. The workflow then waits for a signal using `workflow.wait_condition()` | ||
5. When a human approves or rejects the expense, an external process sends a signal to the workflow using `workflow_handle.signal()` | ||
|
||
**Signal Integration**: | ||
- **Signal Name**: `expense_decision_signal` | ||
- **Signal Payload**: Decision string ("APPROVED", "REJECTED", etc.) | ||
- **Workflow Registration**: External system must know the workflow ID to send signals | ||
|
||
**Completion Values**: | ||
- `"APPROVED"`: Expense approved for payment | ||
- `"REJECTED"`: Expense denied | ||
- `"DENIED"`, `"PENDING"`, `"CANCELLED"`: Also treated as rejection | ||
- Any other value: Treated as rejection (workflow returns empty string) | ||
|
||
**Error Scenarios**: | ||
- Empty expense_id: Immediate validation error | ||
- Signal timeout: Temporal timeout handling (workflow-level timeout) | ||
- Invalid signal payload: Handled gracefully by workflow | ||
|
||
#### 3. Payment Activity | ||
|
||
**Purpose**: Process payment for approved expenses | ||
|
||
**Function Signature**: `payment_activity(expense_id: str) -> None` | ||
|
||
**Business Rules**: | ||
- Only called for approved expenses | ||
- Validate expense_id is not empty | ||
- HTTP POST to `/action` with form data: `is_api_call=true`, `type=payment`, `id={expense_id}` | ||
- Success condition: Response body equals "SUCCEED" | ||
|
||
**Error Handling**: | ||
- Empty expense_id: `ValueError` with message "expense id is empty" | ||
- HTTP errors: `httpx.HTTPStatusError` propagated to workflow | ||
- Payment failure: `Exception` with specific error message (e.g., "ERROR:INSUFFICIENT_FUNDS") | ||
- Network failures: Connection timeouts and DNS resolution errors propagated | ||
|
||
## State Management | ||
|
||
### Activity Completion Flow | ||
1. **Synchronous Activities**: Create, Register, and Payment activities complete immediately | ||
2. **Signal-Based Waiting**: Workflow waits for external signal after registration | ||
|
||
### Signal Lifecycle | ||
1. Workflow starts and registers expense for decision | ||
2. External system stores workflow ID to expense ID mapping | ||
3. Human makes decision via web UI | ||
4. UI system calls Temporal client to send signal to workflow | ||
5. Workflow receives signal and continues execution | ||
|
||
### External System Integration | ||
- **Storage**: In-memory expense state management | ||
- **Workflow Mapping**: Workflow ID to expense ID mapping | ||
- **Signal Completion**: Temporal client workflow signal sending | ||
- **Error Recovery**: Graceful handling of signal failures | ||
|
||
## Error Handling Patterns | ||
|
||
### Validation Errors | ||
- **Trigger**: Empty or invalid input parameters | ||
- **Behavior**: Immediate activity/workflow failure | ||
- **Retry**: Not applicable (validation errors are non-retryable) | ||
|
||
### HTTP Communication Errors | ||
- **Network Failures**: Connection timeouts, DNS resolution | ||
- **Server Errors**: 5xx responses from expense system | ||
- **Retry Behavior**: Follows Temporal's default retry policy | ||
- **Final Failure**: Propagated to workflow after retries exhausted | ||
|
||
### External System Errors | ||
- **Business Logic Errors**: Duplicate expense IDs, invalid states | ||
- **Response Format**: Error messages in HTTP response body (e.g., "ERROR:ID_ALREADY_EXISTS") | ||
- **Handling**: Converted to application errors with descriptive messages | ||
- **Tested Examples**: "ERROR:INVALID_ID", "ERROR:INSUFFICIENT_FUNDS", "ERROR:INVALID_STATE" | ||
|
||
### Async Completion Errors | ||
- **Registration Failure**: Activity fails immediately if callback registration fails | ||
- **Completion Timeout**: Temporal enforces activity timeout (10 minutes) | ||
- **Invalid Completion**: External system error handling for malformed completions | ||
|
||
## Timeout Configuration | ||
|
||
### Activity Timeouts | ||
- **Create Expense**: 10 seconds (fast operation) | ||
- **Wait for Decision**: 10 minutes (human approval window) | ||
- **Payment Processing**: 10 seconds (automated operation) | ||
|
||
### Timeout Behavior | ||
- **Exceeded**: Activity marked as failed by Temporal | ||
- **Retry**: Follows activity retry policy | ||
- **Workflow Impact**: Timeout failures propagate to workflow | ||
|
||
|
||
|
||
## Testing Patterns | ||
|
||
### Mock Testing Approach | ||
The system supports comprehensive testing with mocked activities: | ||
|
||
#### Test Patterns | ||
```python | ||
@activity.defn(name="create_expense_activity") | ||
async def create_expense_mock(expense_id: str) -> None: | ||
return None # Success mock | ||
|
||
@activity.defn(name="register_for_decision_activity") | ||
async def register_for_decision_mock(expense_id: str) -> None: | ||
return None # Registration mock | ||
|
||
# Testing signal-based behavior: | ||
# Activity completes immediately, no special exceptions | ||
result = await activity_env.run(register_for_decision_activity, "test-expense") | ||
assert result is None | ||
``` | ||
|
||
### Test Scenarios | ||
1. **Happy Path**: All activities succeed, expense approved | ||
2. **Rejection Path**: Expense rejected, payment skipped | ||
3. **Failure Scenarios**: Activity failures at each step | ||
4. **Mock Server Testing**: HTTP interactions with test server | ||
5. **Signal Testing**: Simulated workflow signal sending and receiving | ||
6. **Decision Value Testing**: All possible decision values (APPROVED, REJECTED, DENIED, PENDING, CANCELLED, UNKNOWN) | ||
7. **Retryable Failures**: Activities that fail temporarily and then succeed on retry | ||
8. **Parameter Validation**: Empty and whitespace-only expense IDs | ||
9. **Logging Behavior**: Verify activity logging works correctly | ||
10. **Server Error Responses**: Specific error formats like "ERROR:ID_ALREADY_EXISTS" | ||
|
||
### Mock Server Integration | ||
- **HTTP Mocking**: Uses test frameworks to mock HTTP server responses | ||
- **Delayed Completion**: Simulates human approval delays in tests | ||
|
||
### Edge Case Testing | ||
Tests include comprehensive coverage of edge cases and error scenarios: | ||
|
||
#### Retry Behavior Testing | ||
- **Transient Failures**: Activities that fail on first attempts but succeed after retries | ||
- **Retry Counting**: Verification that activities retry the expected number of times | ||
- **Mixed Scenarios**: Different activities failing and recovering independently | ||
|
||
#### Parameter Validation Testing | ||
- **Empty Strings**: Expense IDs that are completely empty (`""`) | ||
- **Whitespace-Only**: Expense IDs containing only spaces (`" "`) | ||
- **Non-Retryable Errors**: Validation failures that should not be retried | ||
|
||
#### Logging Verification | ||
- **Activity Logging**: Ensures activity.logger.info() calls work correctly | ||
- **Workflow Logging**: Verification of workflow-level logging behavior | ||
- **Log Content**: Checking that log messages contain expected information | ||
|
||
#### Server Error Response Testing | ||
- **Specific Error Codes**: Testing responses like "ERROR:ID_ALREADY_EXISTS" | ||
- **HTTP Status Errors**: Network-level HTTP errors vs application errors | ||
- **Error Message Propagation**: Ensuring error details reach the workflow caller | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
""" | ||
Tests for individual expense activities. | ||
Focuses on activity behavior, parameters, error handling, and HTTP interactions. | ||
""" | ||
|
||
from unittest.mock import AsyncMock, MagicMock, patch | ||
|
||
import httpx | ||
import pytest | ||
from temporalio.testing import ActivityEnvironment | ||
|
||
from expense import EXPENSE_SERVER_HOST_PORT | ||
from expense.activities import ( | ||
create_expense_activity, | ||
payment_activity, | ||
register_for_decision_activity, | ||
) | ||
|
||
|
||
class TestCreateExpenseActivity: | ||
"""Test create_expense_activity individual behavior""" | ||
|
||
@pytest.fixture | ||
def activity_env(self): | ||
return ActivityEnvironment() | ||
|
||
async def test_create_expense_activity_success(self, activity_env): | ||
"""Test successful expense creation""" | ||
with patch("expense.activities.get_http_client") as mock_get_client: | ||
# Mock successful HTTP response | ||
mock_response = AsyncMock() | ||
mock_response.text = "SUCCEED" | ||
mock_response.raise_for_status = AsyncMock() | ||
|
||
mock_client_instance = AsyncMock() | ||
mock_client_instance.get.return_value = mock_response | ||
mock_get_client.return_value = mock_client_instance | ||
|
||
# Execute activity | ||
result = await activity_env.run(create_expense_activity, "test-expense-123") | ||
|
||
# Verify HTTP call | ||
mock_client_instance.get.assert_called_once_with( | ||
f"{EXPENSE_SERVER_HOST_PORT}/create", | ||
params={"is_api_call": "true", "id": "test-expense-123"}, | ||
) | ||
mock_response.raise_for_status.assert_called_once() | ||
|
||
# Activity should return None on success | ||
assert result is None | ||
|
||
async def test_create_expense_activity_empty_id(self, activity_env): | ||
"""Test create expense activity with empty expense ID""" | ||
with pytest.raises(ValueError, match="expense id is empty"): | ||
await activity_env.run(create_expense_activity, "") | ||
|
||
async def test_create_expense_activity_http_error(self, activity_env): | ||
"""Test create expense activity with HTTP error""" | ||
with patch("expense.activities.get_http_client") as mock_get_client: | ||
# Mock HTTP error with proper response mock | ||
mock_response_obj = MagicMock() | ||
mock_response_obj.status_code = 500 | ||
mock_response_obj.text = "Server Error" | ||
|
||
mock_response = MagicMock() | ||
mock_response.raise_for_status.side_effect = httpx.HTTPStatusError( | ||
"Server Error", request=MagicMock(), response=mock_response_obj | ||
) | ||
|
||
mock_client_instance = AsyncMock() | ||
mock_client_instance.get.return_value = mock_response | ||
mock_get_client.return_value = mock_client_instance | ||
|
||
with pytest.raises(httpx.HTTPStatusError): | ||
await activity_env.run(create_expense_activity, "test-expense-123") | ||
|
||
async def test_create_expense_activity_server_error_response(self, activity_env): | ||
"""Test create expense activity with server error response""" | ||
with patch("expense.activities.get_http_client") as mock_get_client: | ||
# Mock error response | ||
mock_response = AsyncMock() | ||
mock_response.text = "ERROR:ID_ALREADY_EXISTS" | ||
mock_response.raise_for_status = AsyncMock() | ||
|
||
mock_client_instance = AsyncMock() | ||
mock_client_instance.get.return_value = mock_response | ||
mock_get_client.return_value = mock_client_instance | ||
|
||
with pytest.raises(Exception, match="ERROR:ID_ALREADY_EXISTS"): | ||
await activity_env.run(create_expense_activity, "test-expense-123") | ||
|
||
|
||
class TestRegisterForDecisionActivity: | ||
"""Test register_for_decision_activity individual behavior""" | ||
|
||
@pytest.fixture | ||
def activity_env(self): | ||
return ActivityEnvironment() | ||
|
||
async def test_register_for_decision_activity_empty_id(self, activity_env): | ||
"""Test register for decision activity with empty expense ID""" | ||
with pytest.raises(ValueError, match="expense id is empty"): | ||
await activity_env.run(register_for_decision_activity, "") | ||
|
||
async def test_register_for_decision_activity_success(self, activity_env): | ||
"""Test successful expense registration behavior""" | ||
# Mock the HTTP client and response | ||
mock_response = AsyncMock() | ||
mock_response.raise_for_status = AsyncMock() | ||
|
||
mock_http_client = AsyncMock() | ||
mock_http_client.post.return_value = mock_response | ||
|
||
# Mock the get_http_client function | ||
with patch("expense.activities.get_http_client", return_value=mock_http_client): | ||
result = await activity_env.run( | ||
register_for_decision_activity, "test-expense-123" | ||
) | ||
|
||
# Activity should return None on success | ||
assert result is None | ||
|
||
# Verify HTTP registration was called | ||
mock_http_client.post.assert_called_once() | ||
call_args = mock_http_client.post.call_args | ||
assert "/registerWorkflow" in call_args[0][0] | ||
assert call_args[1]["params"]["id"] == "test-expense-123" | ||
assert "workflow_id" in call_args[1]["data"] | ||
|
||
|
||
class TestPaymentActivity: | ||
"""Test payment_activity individual behavior""" | ||
|
||
@pytest.fixture | ||
def activity_env(self): | ||
return ActivityEnvironment() | ||
|
||
async def test_payment_activity_success(self, activity_env): | ||
"""Test successful payment processing""" | ||
with patch("expense.activities.get_http_client") as mock_get_client: | ||
# Mock successful payment response | ||
mock_response = AsyncMock() | ||
mock_response.text = "SUCCEED" | ||
mock_response.raise_for_status = AsyncMock() | ||
|
||
mock_client_instance = AsyncMock() | ||
mock_client_instance.post.return_value = mock_response | ||
mock_get_client.return_value = mock_client_instance | ||
|
||
# Execute activity | ||
result = await activity_env.run(payment_activity, "test-expense-123") | ||
|
||
# Verify HTTP call | ||
mock_client_instance.post.assert_called_once_with( | ||
f"{EXPENSE_SERVER_HOST_PORT}/action", | ||
data={ | ||
"is_api_call": "true", | ||
"type": "payment", | ||
"id": "test-expense-123", | ||
}, | ||
) | ||
|
||
# Activity should return None on success | ||
assert result is None | ||
|
||
async def test_payment_activity_empty_id(self, activity_env): | ||
"""Test payment activity with empty expense ID""" | ||
with pytest.raises(ValueError, match="expense id is empty"): | ||
await activity_env.run(payment_activity, "") | ||
|
||
async def test_payment_activity_payment_failure(self, activity_env): | ||
"""Test payment activity with payment failure""" | ||
with patch("expense.activities.get_http_client") as mock_get_client: | ||
# Mock payment failure response | ||
mock_response = AsyncMock() | ||
mock_response.text = "ERROR:INSUFFICIENT_FUNDS" | ||
mock_response.raise_for_status = AsyncMock() | ||
|
||
mock_client_instance = AsyncMock() | ||
mock_client_instance.post.return_value = mock_response | ||
mock_get_client.return_value = mock_client_instance | ||
|
||
with pytest.raises(Exception, match="ERROR:INSUFFICIENT_FUNDS"): | ||
await activity_env.run(payment_activity, "test-expense-123") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,242 @@ | ||
""" | ||
Edge case tests for expense workflow and activities. | ||
Tests parameter validation, retries, error scenarios, and boundary conditions. | ||
""" | ||
|
||
import asyncio | ||
import uuid | ||
|
||
import pytest | ||
from temporalio import activity | ||
from temporalio.client import Client, WorkflowFailureError | ||
from temporalio.exceptions import ApplicationError | ||
from temporalio.testing import WorkflowEnvironment | ||
from temporalio.worker import Worker | ||
|
||
from expense.workflow import SampleExpenseWorkflow | ||
|
||
|
||
class MockExpenseUI: | ||
"""Mock UI that simulates the expense approval system""" | ||
|
||
def __init__(self, client: Client): | ||
self.client = client | ||
self.workflow_map: dict[str, str] = {} | ||
self.scheduled_decisions: dict[str, str] = {} | ||
|
||
def register_workflow(self, expense_id: str, workflow_id: str): | ||
"""Register a workflow for an expense (simulates UI registration)""" | ||
self.workflow_map[expense_id] = workflow_id | ||
|
||
def schedule_decision(self, expense_id: str, decision: str): | ||
"""Schedule a decision to be made (simulates human decision)""" | ||
self.scheduled_decisions[expense_id] = decision | ||
|
||
async def send_decision(): | ||
try: | ||
if expense_id in self.workflow_map: | ||
workflow_id = self.workflow_map[expense_id] | ||
handle = self.client.get_workflow_handle(workflow_id) | ||
await handle.signal("expense_decision_signal", decision) | ||
except Exception: | ||
# Ignore errors in time-skipping mode where workflows may complete quickly | ||
pass | ||
|
||
asyncio.create_task(send_decision()) | ||
|
||
def create_register_activity(self): | ||
"""Create a register activity that works with this mock UI""" | ||
|
||
@activity.defn(name="register_for_decision_activity") | ||
async def register_decision_activity(expense_id: str) -> None: | ||
# In time-skipping mode, send the decision immediately | ||
if expense_id in self.scheduled_decisions: | ||
decision = self.scheduled_decisions[expense_id] | ||
if expense_id in self.workflow_map: | ||
workflow_id = self.workflow_map[expense_id] | ||
handle = self.client.get_workflow_handle(workflow_id) | ||
try: | ||
# Send signal immediately when registering | ||
await handle.signal("expense_decision_signal", decision) | ||
except Exception: | ||
# Ignore errors in time-skipping mode | ||
pass | ||
return None | ||
|
||
return register_decision_activity | ||
|
||
|
||
class TestWorkflowEdgeCases: | ||
"""Test edge cases in workflow behavior""" | ||
|
||
async def test_workflow_with_retryable_activity_failures( | ||
self, client: Client, env: WorkflowEnvironment | ||
): | ||
"""Test workflow behavior with retryable activity failures""" | ||
task_queue = f"test-retryable-failures-{uuid.uuid4()}" | ||
workflow_id = f"test-workflow-retryable-{uuid.uuid4()}" | ||
expense_id = "test-expense-retryable" | ||
create_call_count = 0 | ||
payment_call_count = 0 | ||
|
||
# Set up mock UI with APPROVED decision | ||
mock_ui = MockExpenseUI(client) | ||
mock_ui.register_workflow(expense_id, workflow_id) | ||
mock_ui.schedule_decision(expense_id, "APPROVED") | ||
|
||
@activity.defn(name="create_expense_activity") | ||
async def create_expense_retry(expense_id: str) -> None: | ||
nonlocal create_call_count | ||
create_call_count += 1 | ||
if create_call_count == 1: | ||
# First call fails, but retryable | ||
raise Exception("Transient failure in create expense") | ||
return None # Second call succeeds | ||
|
||
@activity.defn(name="payment_activity") | ||
async def payment_retry(expense_id: str) -> None: | ||
nonlocal payment_call_count | ||
payment_call_count += 1 | ||
if payment_call_count == 1: | ||
# First call fails, but retryable | ||
raise Exception("Transient failure in payment") | ||
return None # Second call succeeds | ||
|
||
async with Worker( | ||
client, | ||
task_queue=task_queue, | ||
workflows=[SampleExpenseWorkflow], | ||
activities=[ | ||
create_expense_retry, | ||
mock_ui.create_register_activity(), | ||
payment_retry, | ||
], | ||
): | ||
result = await client.execute_workflow( | ||
SampleExpenseWorkflow.run, | ||
expense_id, | ||
id=workflow_id, | ||
task_queue=task_queue, | ||
) | ||
|
||
# Should succeed after retries | ||
assert result == "COMPLETED" | ||
# Verify activities were retried | ||
assert create_call_count == 2 | ||
assert payment_call_count == 2 | ||
|
||
async def test_workflow_logging_behavior( | ||
self, client: Client, env: WorkflowEnvironment | ||
): | ||
"""Test that workflow logging works correctly""" | ||
task_queue = f"test-logging-{uuid.uuid4()}" | ||
workflow_id = f"test-workflow-logging-{uuid.uuid4()}" | ||
expense_id = "test-expense-logging" | ||
logged_messages = [] | ||
|
||
# Set up mock UI with APPROVED decision | ||
mock_ui = MockExpenseUI(client) | ||
mock_ui.register_workflow(expense_id, workflow_id) | ||
mock_ui.schedule_decision(expense_id, "APPROVED") | ||
|
||
@activity.defn(name="create_expense_activity") | ||
async def create_expense_mock(expense_id: str) -> None: | ||
# Mock logging by capturing messages | ||
logged_messages.append(f"Creating expense: {expense_id}") | ||
return None | ||
|
||
@activity.defn(name="payment_activity") | ||
async def payment_mock(expense_id: str) -> None: | ||
logged_messages.append(f"Processing payment: {expense_id}") | ||
return None | ||
|
||
# Create logging register activity | ||
def create_logging_register_activity(): | ||
@activity.defn(name="register_for_decision_activity") | ||
async def register_decision_logging(expense_id: str) -> None: | ||
logged_messages.append(f"Waiting for decision: {expense_id}") | ||
# In time-skipping mode, send the decision immediately | ||
if expense_id in mock_ui.scheduled_decisions: | ||
decision = mock_ui.scheduled_decisions[expense_id] | ||
if expense_id in mock_ui.workflow_map: | ||
workflow_id = mock_ui.workflow_map[expense_id] | ||
handle = client.get_workflow_handle(workflow_id) | ||
try: | ||
# Send signal immediately when registering | ||
await handle.signal("expense_decision_signal", decision) | ||
except Exception: | ||
# Ignore errors in time-skipping mode | ||
pass | ||
return None | ||
|
||
return register_decision_logging | ||
|
||
async with Worker( | ||
client, | ||
task_queue=task_queue, | ||
workflows=[SampleExpenseWorkflow], | ||
activities=[ | ||
create_expense_mock, | ||
create_logging_register_activity(), | ||
payment_mock, | ||
], | ||
): | ||
result = await client.execute_workflow( | ||
SampleExpenseWorkflow.run, | ||
expense_id, | ||
id=workflow_id, | ||
task_queue=task_queue, | ||
) | ||
|
||
assert result == "COMPLETED" | ||
# Verify logging occurred | ||
assert len(logged_messages) == 3 | ||
assert f"Creating expense: {expense_id}" in logged_messages | ||
assert f"Waiting for decision: {expense_id}" in logged_messages | ||
assert f"Processing payment: {expense_id}" in logged_messages | ||
|
||
async def test_workflow_parameter_validation( | ||
self, client: Client, env: WorkflowEnvironment | ||
): | ||
"""Test workflow with various parameter validation scenarios""" | ||
task_queue = f"test-param-validation-{uuid.uuid4()}" | ||
|
||
@activity.defn(name="create_expense_activity") | ||
async def create_expense_validate(expense_id: str) -> None: | ||
if not expense_id or expense_id.strip() == "": | ||
raise ApplicationError( | ||
"expense id is empty or whitespace", non_retryable=True | ||
) | ||
return None | ||
|
||
@activity.defn(name="register_for_decision_activity") | ||
async def wait_for_decision_mock(expense_id: str) -> None: | ||
return None | ||
|
||
@activity.defn(name="payment_activity") | ||
async def payment_mock(expense_id: str) -> None: | ||
return None | ||
|
||
async with Worker( | ||
client, | ||
task_queue=task_queue, | ||
workflows=[SampleExpenseWorkflow], | ||
activities=[create_expense_validate, wait_for_decision_mock, payment_mock], | ||
): | ||
# Test with empty string - this should fail at create_expense_activity | ||
with pytest.raises(WorkflowFailureError): | ||
await client.execute_workflow( | ||
SampleExpenseWorkflow.run, | ||
"", # Empty expense ID | ||
id=f"test-workflow-empty-id-{uuid.uuid4()}", | ||
task_queue=task_queue, | ||
) | ||
|
||
# Test with whitespace-only string - this should fail at create_expense_activity | ||
with pytest.raises(WorkflowFailureError): | ||
await client.execute_workflow( | ||
SampleExpenseWorkflow.run, | ||
" ", # Whitespace-only expense ID | ||
id=f"test-workflow-whitespace-id-{uuid.uuid4()}", | ||
task_queue=task_queue, | ||
) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,212 @@ | ||
""" | ||
Integration tests for expense workflow with mock HTTP server. | ||
Tests end-to-end behavior with realistic HTTP interactions. | ||
""" | ||
|
||
import asyncio | ||
import uuid | ||
from unittest.mock import AsyncMock, patch | ||
|
||
from temporalio import activity | ||
from temporalio.client import Client | ||
from temporalio.testing import WorkflowEnvironment | ||
from temporalio.worker import Worker | ||
|
||
from expense.workflow import SampleExpenseWorkflow | ||
|
||
|
||
class MockExpenseUI: | ||
"""Mock UI that simulates the expense approval system""" | ||
|
||
def __init__(self, client: Client): | ||
self.client = client | ||
self.workflow_map: dict[str, str] = {} | ||
self.scheduled_decisions: dict[str, str] = {} | ||
|
||
def register_workflow(self, expense_id: str, workflow_id: str): | ||
"""Register a workflow for an expense (simulates UI registration)""" | ||
self.workflow_map[expense_id] = workflow_id | ||
|
||
def schedule_decision(self, expense_id: str, decision: str): | ||
"""Schedule a decision to be made (simulates human decision)""" | ||
self.scheduled_decisions[expense_id] = decision | ||
|
||
async def send_decision(): | ||
try: | ||
if expense_id in self.workflow_map: | ||
workflow_id = self.workflow_map[expense_id] | ||
handle = self.client.get_workflow_handle(workflow_id) | ||
await handle.signal("expense_decision_signal", decision) | ||
except Exception: | ||
# Ignore errors in time-skipping mode where workflows may complete quickly | ||
pass | ||
|
||
asyncio.create_task(send_decision()) | ||
|
||
def create_register_activity(self): | ||
"""Create a register activity that works with this mock UI""" | ||
|
||
@activity.defn(name="register_for_decision_activity") | ||
async def register_decision_activity(expense_id: str) -> None: | ||
# In time-skipping mode, send the decision immediately | ||
if expense_id in self.scheduled_decisions: | ||
decision = self.scheduled_decisions[expense_id] | ||
if expense_id in self.workflow_map: | ||
workflow_id = self.workflow_map[expense_id] | ||
handle = self.client.get_workflow_handle(workflow_id) | ||
try: | ||
# Send signal immediately when registering | ||
await handle.signal("expense_decision_signal", decision) | ||
except Exception: | ||
# Ignore errors in time-skipping mode | ||
pass | ||
return None | ||
|
||
return register_decision_activity | ||
|
||
|
||
class TestExpenseWorkflowWithMockServer: | ||
"""Test workflow with mock HTTP server""" | ||
|
||
async def test_workflow_with_mock_server_approved( | ||
self, client: Client, env: WorkflowEnvironment | ||
): | ||
"""Test complete workflow with mock HTTP server - approved path""" | ||
task_queue = f"test-mock-server-approved-{uuid.uuid4()}" | ||
workflow_id = f"test-mock-server-workflow-{uuid.uuid4()}" | ||
expense_id = "test-mock-server-expense" | ||
|
||
# Set up mock UI with APPROVED decision | ||
mock_ui = MockExpenseUI(client) | ||
mock_ui.register_workflow(expense_id, workflow_id) | ||
mock_ui.schedule_decision(expense_id, "APPROVED") | ||
|
||
# Mock HTTP responses | ||
responses = { | ||
"/create": "SUCCEED", | ||
"/registerCallback": "SUCCEED", | ||
"/action": "SUCCEED", | ||
} | ||
|
||
with patch("httpx.AsyncClient") as mock_client: | ||
|
||
async def mock_request_handler(*args, **kwargs): | ||
mock_response = AsyncMock() | ||
url = args[0] if args else kwargs.get("url", "") | ||
|
||
# Determine response based on URL path | ||
for path, response_text in responses.items(): | ||
if path in url: | ||
mock_response.text = response_text | ||
break | ||
else: | ||
mock_response.text = "NOT_FOUND" | ||
|
||
mock_response.raise_for_status = AsyncMock() | ||
return mock_response | ||
|
||
mock_client_instance = AsyncMock() | ||
mock_client_instance.get.side_effect = mock_request_handler | ||
mock_client_instance.post.side_effect = mock_request_handler | ||
mock_client.return_value.__aenter__.return_value = mock_client_instance | ||
|
||
# Use completely mocked activities to avoid async completion issues | ||
@activity.defn(name="create_expense_activity") | ||
async def mock_create_expense(expense_id: str) -> None: | ||
# Simulated HTTP call logic | ||
return None | ||
|
||
@activity.defn(name="payment_activity") | ||
async def mock_payment(expense_id: str) -> None: | ||
# Simulated HTTP call logic | ||
return None | ||
|
||
async with Worker( | ||
client, | ||
task_queue=task_queue, | ||
workflows=[SampleExpenseWorkflow], | ||
activities=[ | ||
mock_create_expense, | ||
mock_ui.create_register_activity(), | ||
mock_payment, | ||
], | ||
): | ||
result = await client.execute_workflow( | ||
SampleExpenseWorkflow.run, | ||
expense_id, | ||
id=workflow_id, | ||
task_queue=task_queue, | ||
) | ||
|
||
assert result == "COMPLETED" | ||
|
||
async def test_workflow_with_mock_server_rejected( | ||
self, client: Client, env: WorkflowEnvironment | ||
): | ||
"""Test complete workflow with mock HTTP server - rejected path""" | ||
task_queue = f"test-mock-server-rejected-{uuid.uuid4()}" | ||
workflow_id = f"test-mock-server-rejected-workflow-{uuid.uuid4()}" | ||
expense_id = "test-mock-server-rejected" | ||
|
||
# Set up mock UI with REJECTED decision | ||
mock_ui = MockExpenseUI(client) | ||
mock_ui.register_workflow(expense_id, workflow_id) | ||
mock_ui.schedule_decision(expense_id, "REJECTED") | ||
|
||
# Mock HTTP responses | ||
responses = { | ||
"/create": "SUCCEED", | ||
"/registerCallback": "SUCCEED", | ||
} | ||
|
||
with patch("httpx.AsyncClient") as mock_client: | ||
|
||
async def mock_request_handler(*args, **kwargs): | ||
mock_response = AsyncMock() | ||
url = args[0] if args else kwargs.get("url", "") | ||
|
||
# Determine response based on URL path | ||
for path, response_text in responses.items(): | ||
if path in url: | ||
mock_response.text = response_text | ||
break | ||
else: | ||
mock_response.text = "NOT_FOUND" | ||
|
||
mock_response.raise_for_status = AsyncMock() | ||
return mock_response | ||
|
||
mock_client_instance = AsyncMock() | ||
mock_client_instance.get.side_effect = mock_request_handler | ||
mock_client_instance.post.side_effect = mock_request_handler | ||
mock_client.return_value.__aenter__.return_value = mock_client_instance | ||
|
||
# Use completely mocked activities | ||
@activity.defn(name="create_expense_activity") | ||
async def mock_create_expense(expense_id: str) -> None: | ||
# Simulated HTTP call logic | ||
return None | ||
|
||
@activity.defn(name="payment_activity") | ||
async def mock_payment(expense_id: str) -> None: | ||
# Simulated HTTP call logic | ||
return None | ||
|
||
async with Worker( | ||
client, | ||
task_queue=task_queue, | ||
workflows=[SampleExpenseWorkflow], | ||
activities=[ | ||
mock_create_expense, | ||
mock_ui.create_register_activity(), | ||
mock_payment, | ||
], | ||
): | ||
result = await client.execute_workflow( | ||
SampleExpenseWorkflow.run, | ||
expense_id, | ||
id=workflow_id, | ||
task_queue=task_queue, | ||
) | ||
|
||
assert result == "" |
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
#!/usr/bin/env python3 | ||
""" | ||
Simple test script to verify HTTP client lifecycle management. | ||
""" | ||
|
||
import asyncio | ||
import os | ||
import sys | ||
|
||
# Add the project root to Python path | ||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) | ||
|
||
from expense.activities import ( | ||
cleanup_http_client, | ||
create_expense_activity, | ||
get_http_client, | ||
initialize_http_client, | ||
) | ||
|
||
|
||
async def test_http_client_lifecycle(): | ||
"""Test that HTTP client lifecycle management works correctly.""" | ||
print("Testing HTTP client lifecycle management...") | ||
|
||
# Test 1: Client should not be initialized initially | ||
try: | ||
get_http_client() | ||
print("FAIL: Expected RuntimeError when client not initialized") | ||
return False | ||
except RuntimeError as e: | ||
print(f"PASS: Got expected error when client not initialized: {e}") | ||
|
||
# Test 2: Initialize client | ||
await initialize_http_client() | ||
print("PASS: HTTP client initialized") | ||
|
||
# Test 3: Client should be available now | ||
try: | ||
client = get_http_client() | ||
print(f"PASS: Got HTTP client: {type(client).__name__}") | ||
except Exception as e: | ||
print(f"FAIL: Could not get HTTP client after initialization: {e}") | ||
return False | ||
|
||
# Test 4: Test multiple initializations (should be safe) | ||
await initialize_http_client() | ||
client2 = get_http_client() | ||
if client is client2: | ||
print("PASS: Multiple initializations return same client instance") | ||
else: | ||
print("FAIL: Multiple initializations created different clients") | ||
return False | ||
|
||
# Test 5: Cleanup client | ||
await cleanup_http_client() | ||
print("PASS: HTTP client cleaned up") | ||
|
||
# Test 6: Client should not be available after cleanup | ||
try: | ||
get_http_client() | ||
print("FAIL: Expected RuntimeError after cleanup") | ||
return False | ||
except RuntimeError as e: | ||
print(f"PASS: Got expected error after cleanup: {e}") | ||
|
||
print("\nAll HTTP client lifecycle tests passed!") | ||
return True | ||
|
||
|
||
async def test_activity_integration(): | ||
"""Test that activities can use the HTTP client (mock test).""" | ||
print("\nTesting activity integration...") | ||
|
||
# Initialize client for activities | ||
await initialize_http_client() | ||
|
||
try: | ||
# This will fail because the expense server isn't running, | ||
# but it will test that the HTTP client is accessible | ||
await create_expense_activity("test-expense-123") | ||
print("Unexpected: Activity succeeded (expense server must be running)") | ||
except Exception as e: | ||
# We expect this to fail since expense server isn't running | ||
if "HTTP client not initialized" in str(e): | ||
print("FAIL: HTTP client not accessible in activity") | ||
return False | ||
else: | ||
print( | ||
f"PASS: Activity accessed HTTP client correctly (failed as expected due to no server): {type(e).__name__}" | ||
) | ||
|
||
# Cleanup | ||
await cleanup_http_client() | ||
print("PASS: Activity integration test completed") | ||
return True | ||
|
||
|
||
async def main(): | ||
"""Run all tests.""" | ||
print("=" * 60) | ||
print("HTTP Client Lifecycle Management Tests") | ||
print("=" * 60) | ||
|
||
test1_passed = await test_http_client_lifecycle() | ||
test2_passed = await test_activity_integration() | ||
|
||
print("\n" + "=" * 60) | ||
if test1_passed and test2_passed: | ||
print( | ||
"ALL TESTS PASSED! HTTP client lifecycle management is working correctly." | ||
) | ||
return 0 | ||
else: | ||
print("SOME TESTS FAILED! Please check the implementation.") | ||
return 1 | ||
|
||
|
||
if __name__ == "__main__": | ||
exit_code = asyncio.run(main()) | ||
sys.exit(exit_code) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,377 @@ | ||
from unittest.mock import AsyncMock, MagicMock, patch | ||
|
||
import pytest | ||
from fastapi.testclient import TestClient | ||
|
||
from expense.ui import ExpenseState, all_expenses, app, workflow_map | ||
|
||
|
||
class TestExpenseUI: | ||
"""Test suite for the Expense System UI based on the specification""" | ||
|
||
def setup_method(self): | ||
"""Reset state before each test""" | ||
all_expenses.clear() | ||
workflow_map.clear() | ||
|
||
@pytest.fixture | ||
def client(self): | ||
"""FastAPI test client fixture""" | ||
return TestClient(app) | ||
|
||
def test_list_view_empty(self, client): | ||
"""Test list view with no expenses""" | ||
response = client.get("/") | ||
assert response.status_code == 200 | ||
assert "SAMPLE EXPENSE SYSTEM" in response.text | ||
assert "<table border=1>" in response.text | ||
assert "<th>Expense ID</th>" in response.text | ||
|
||
def test_list_view_with_expenses(self, client): | ||
"""Test list view displaying expenses in sorted order""" | ||
# Setup test data | ||
all_expenses["EXP-003"] = ExpenseState.CREATED | ||
all_expenses["EXP-001"] = ExpenseState.APPROVED | ||
all_expenses["EXP-002"] = ExpenseState.REJECTED | ||
|
||
response = client.get("/list") | ||
assert response.status_code == 200 | ||
|
||
# Check sorted order in HTML | ||
html = response.text | ||
exp001_pos = html.find("EXP-001") | ||
exp002_pos = html.find("EXP-002") | ||
exp003_pos = html.find("EXP-003") | ||
|
||
assert exp001_pos < exp002_pos < exp003_pos | ||
|
||
def test_list_view_action_buttons_only_for_created(self, client): | ||
"""Test that action buttons only appear for CREATED expenses""" | ||
all_expenses["created-expense"] = ExpenseState.CREATED | ||
all_expenses["approved-expense"] = ExpenseState.APPROVED | ||
all_expenses["rejected-expense"] = ExpenseState.REJECTED | ||
all_expenses["completed-expense"] = ExpenseState.COMPLETED | ||
|
||
response = client.get("/") | ||
html = response.text | ||
|
||
# CREATED expense should have buttons | ||
assert "APPROVE" in html | ||
assert "REJECT" in html | ||
assert "created-expense" in html | ||
|
||
# Count actual button elements - should only be for the CREATED expense | ||
approve_count = html.count( | ||
'<button type="submit" style="background-color:#4CAF50;">APPROVE</button>' | ||
) | ||
reject_count = html.count( | ||
'<button type="submit" style="background-color:#f44336;">REJECT</button>' | ||
) | ||
assert approve_count == 1 | ||
assert reject_count == 1 | ||
|
||
def test_create_expense_success_ui(self, client): | ||
"""Test successful expense creation via UI""" | ||
response = client.get("/create?id=new-expense") | ||
assert response.status_code == 200 | ||
assert all_expenses["new-expense"] == ExpenseState.CREATED | ||
assert "SAMPLE EXPENSE SYSTEM" in response.text # Should redirect to list | ||
|
||
def test_create_expense_success_api(self, client): | ||
"""Test successful expense creation via API""" | ||
response = client.get("/create?id=new-expense&is_api_call=true") | ||
assert response.status_code == 200 | ||
assert response.text == "SUCCEED" | ||
assert all_expenses["new-expense"] == ExpenseState.CREATED | ||
|
||
def test_create_expense_duplicate_ui(self, client): | ||
"""Test creating duplicate expense via UI""" | ||
all_expenses["existing"] = ExpenseState.CREATED | ||
|
||
response = client.get("/create?id=existing") | ||
assert response.status_code == 200 | ||
assert response.text == "ID already exists" | ||
|
||
def test_create_expense_duplicate_api(self, client): | ||
"""Test creating duplicate expense via API""" | ||
all_expenses["existing"] = ExpenseState.CREATED | ||
|
||
response = client.get("/create?id=existing&is_api_call=true") | ||
assert response.status_code == 200 | ||
assert response.text == "ERROR:ID_ALREADY_EXISTS" | ||
|
||
def test_status_check_valid_id(self, client): | ||
"""Test status check for valid expense ID""" | ||
all_expenses["test-expense"] = ExpenseState.APPROVED | ||
|
||
response = client.get("/status?id=test-expense") | ||
assert response.status_code == 200 | ||
assert response.text == "APPROVED" | ||
|
||
def test_status_check_invalid_id(self, client): | ||
"""Test status check for invalid expense ID""" | ||
response = client.get("/status?id=nonexistent") | ||
assert response.status_code == 200 | ||
assert response.text == "ERROR:INVALID_ID" | ||
|
||
def test_action_approve_ui(self, client): | ||
"""Test approve action via UI""" | ||
all_expenses["test-expense"] = ExpenseState.CREATED | ||
|
||
with patch("expense.ui.notify_expense_state_change") as mock_notify: | ||
response = client.post( | ||
"/action", data={"type": "approve", "id": "test-expense"} | ||
) | ||
assert response.status_code == 200 | ||
assert all_expenses["test-expense"] == ExpenseState.APPROVED | ||
# Should redirect to list view | ||
assert response.url.path == "/list" | ||
mock_notify.assert_called_once_with("test-expense", ExpenseState.APPROVED) | ||
|
||
def test_action_approve_api(self, client): | ||
"""Test approve action via API""" | ||
all_expenses["test-expense"] = ExpenseState.CREATED | ||
|
||
with patch("expense.ui.notify_expense_state_change") as mock_notify: | ||
response = client.post( | ||
"/action", | ||
data={"type": "approve", "id": "test-expense", "is_api_call": "true"}, | ||
) | ||
assert response.status_code == 200 | ||
assert response.text == "SUCCEED" | ||
assert all_expenses["test-expense"] == ExpenseState.APPROVED | ||
mock_notify.assert_called_once_with("test-expense", ExpenseState.APPROVED) | ||
|
||
def test_action_reject_ui(self, client): | ||
"""Test reject action via UI""" | ||
all_expenses["test-expense"] = ExpenseState.CREATED | ||
|
||
with patch("expense.ui.notify_expense_state_change") as mock_notify: | ||
response = client.post( | ||
"/action", data={"type": "reject", "id": "test-expense"} | ||
) | ||
assert response.status_code == 200 | ||
assert all_expenses["test-expense"] == ExpenseState.REJECTED | ||
# Should redirect to list view | ||
assert response.url.path == "/list" | ||
mock_notify.assert_called_once_with("test-expense", ExpenseState.REJECTED) | ||
|
||
def test_action_payment(self, client): | ||
"""Test payment action""" | ||
all_expenses["test-expense"] = ExpenseState.APPROVED | ||
|
||
response = client.post( | ||
"/action", | ||
data={"type": "payment", "id": "test-expense", "is_api_call": "true"}, | ||
) | ||
assert response.status_code == 200 | ||
assert response.text == "SUCCEED" | ||
assert all_expenses["test-expense"] == ExpenseState.COMPLETED | ||
|
||
def test_action_invalid_id_ui(self, client): | ||
"""Test action with invalid ID via UI""" | ||
response = client.post("/action", data={"type": "approve", "id": "nonexistent"}) | ||
assert response.status_code == 200 | ||
assert response.text == "Invalid ID" | ||
|
||
def test_action_invalid_id_api(self, client): | ||
"""Test action with invalid ID via API""" | ||
response = client.post( | ||
"/action", | ||
data={"type": "approve", "id": "nonexistent", "is_api_call": "true"}, | ||
) | ||
assert response.status_code == 200 | ||
assert response.text == "ERROR:INVALID_ID" | ||
|
||
def test_action_invalid_type_ui(self, client): | ||
"""Test action with invalid type via UI""" | ||
all_expenses["test-expense"] = ExpenseState.CREATED | ||
|
||
response = client.post( | ||
"/action", data={"type": "invalid", "id": "test-expense"} | ||
) | ||
assert response.status_code == 200 | ||
assert response.text == "Invalid action type" | ||
|
||
def test_action_invalid_type_api(self, client): | ||
"""Test action with invalid type via API""" | ||
all_expenses["test-expense"] = ExpenseState.CREATED | ||
|
||
response = client.post( | ||
"/action", | ||
data={"type": "invalid", "id": "test-expense", "is_api_call": "true"}, | ||
) | ||
assert response.status_code == 200 | ||
assert response.text == "ERROR:INVALID_TYPE" | ||
|
||
def test_register_callback_success(self, client): | ||
"""Test successful callback registration""" | ||
all_expenses["test-expense"] = ExpenseState.CREATED | ||
test_token = "deadbeef" | ||
|
||
response = client.post( | ||
"/registerWorkflow?id=test-expense", data={"workflow_id": test_token} | ||
) | ||
assert response.status_code == 200 | ||
assert response.text == "SUCCEED" | ||
assert workflow_map["test-expense"] == test_token | ||
|
||
def test_register_workflow_invalid_id(self, client): | ||
"""Test workflow registration with invalid ID""" | ||
response = client.post( | ||
"/registerWorkflow?id=nonexistent", data={"workflow_id": "workflow-123"} | ||
) | ||
assert response.status_code == 200 | ||
assert response.text == "ERROR:INVALID_ID" | ||
|
||
def test_register_workflow_invalid_state(self, client): | ||
"""Test workflow registration with non-CREATED expense""" | ||
all_expenses["test-expense"] = ExpenseState.APPROVED | ||
|
||
response = client.post( | ||
"/registerWorkflow?id=test-expense", data={"workflow_id": "workflow-123"} | ||
) | ||
assert response.status_code == 200 | ||
assert response.text == "ERROR:INVALID_STATE" | ||
|
||
@pytest.mark.asyncio | ||
async def test_notify_expense_state_change_success(self): | ||
"""Test successful workflow notification""" | ||
# Setup | ||
expense_id = "test-expense" | ||
test_workflow_id = "workflow-123" | ||
workflow_map[expense_id] = test_workflow_id | ||
|
||
# Mock workflow client and workflow handle | ||
mock_handle = AsyncMock() | ||
mock_client = MagicMock() | ||
mock_client.get_workflow_handle.return_value = mock_handle | ||
|
||
with patch("expense.ui.workflow_client", mock_client): | ||
from expense.ui import notify_expense_state_change | ||
|
||
await notify_expense_state_change(expense_id, "APPROVED") | ||
|
||
mock_client.get_workflow_handle.assert_called_once_with(test_workflow_id) | ||
mock_handle.signal.assert_called_once_with( | ||
"expense_decision_signal", "APPROVED" | ||
) | ||
|
||
@pytest.mark.asyncio | ||
async def test_notify_expense_state_change_invalid_id(self): | ||
"""Test workflow notification with invalid expense ID""" | ||
from expense.ui import notify_expense_state_change | ||
|
||
# Should not raise exception for invalid ID | ||
await notify_expense_state_change("nonexistent", "APPROVED") | ||
|
||
@pytest.mark.asyncio | ||
async def test_notify_expense_state_change_client_error(self): | ||
"""Test workflow notification when client fails""" | ||
expense_id = "test-expense" | ||
test_workflow_id = "workflow-123" | ||
workflow_map[expense_id] = test_workflow_id | ||
|
||
mock_client = MagicMock() | ||
mock_client.get_workflow_handle.side_effect = Exception("Client error") | ||
|
||
with patch("expense.ui.workflow_client", mock_client): | ||
from expense.ui import notify_expense_state_change | ||
|
||
# Should not raise exception even if client fails | ||
await notify_expense_state_change(expense_id, "APPROVED") | ||
|
||
def test_state_transitions_complete_workflow(self, client): | ||
"""Test complete expense workflow state transitions""" | ||
expense_id = "workflow-expense" | ||
|
||
# 1. Create expense | ||
response = client.get(f"/create?id={expense_id}&is_api_call=true") | ||
assert response.text == "SUCCEED" | ||
assert all_expenses[expense_id] == ExpenseState.CREATED | ||
|
||
# 2. Register workflow | ||
test_workflow_id = "workflow-123" | ||
response = client.post( | ||
f"/registerWorkflow?id={expense_id}", data={"workflow_id": test_workflow_id} | ||
) | ||
assert response.text == "SUCCEED" | ||
|
||
# 3. Approve expense | ||
with patch("expense.ui.notify_expense_state_change") as mock_notify: | ||
response = client.post( | ||
"/action", | ||
data={"type": "approve", "id": expense_id, "is_api_call": "true"}, | ||
) | ||
assert response.text == "SUCCEED" | ||
assert all_expenses[expense_id] == ExpenseState.APPROVED | ||
mock_notify.assert_called_once_with(expense_id, ExpenseState.APPROVED) | ||
|
||
# 4. Process payment | ||
response = client.post( | ||
"/action", data={"type": "payment", "id": expense_id, "is_api_call": "true"} | ||
) | ||
assert response.text == "SUCCEED" | ||
assert all_expenses[expense_id] == ExpenseState.COMPLETED | ||
|
||
def test_html_response_structure(self, client): | ||
"""Test HTML response contains required elements""" | ||
all_expenses["test-expense"] = ExpenseState.CREATED | ||
|
||
response = client.get("/") | ||
html = response.text | ||
|
||
# Check required HTML elements | ||
assert "<h1>SAMPLE EXPENSE SYSTEM</h1>" in html | ||
assert '<a href="/list">HOME</a>' in html | ||
assert "<table border=1>" in html | ||
assert "<th>Expense ID</th>" in html | ||
assert "<th>Status</th>" in html | ||
assert "<th>Action</th>" in html | ||
assert 'style="background-color:#4CAF50;"' in html # Green approve button | ||
assert 'style="background-color:#f44336;"' in html # Red reject button | ||
|
||
def test_concurrent_operations(self, client): | ||
"""Test handling of concurrent operations""" | ||
import threading | ||
|
||
results = [] | ||
|
||
def create_expense(expense_id): | ||
try: | ||
response = client.get(f"/create?id={expense_id}&is_api_call=true") | ||
results.append((expense_id, response.status_code, response.text)) | ||
except Exception as e: | ||
results.append((expense_id, "error", str(e))) | ||
|
||
# Create multiple expenses concurrently | ||
threads = [] | ||
for i in range(5): | ||
thread = threading.Thread(target=create_expense, args=[f"concurrent-{i}"]) | ||
threads.append(thread) | ||
thread.start() | ||
|
||
for thread in threads: | ||
thread.join() | ||
|
||
# All should succeed | ||
assert len(results) == 5 | ||
for expense_id, status_code, text in results: | ||
assert status_code == 200 | ||
assert text == "SUCCEED" | ||
assert expense_id in all_expenses | ||
|
||
def test_parameter_validation(self, client): | ||
"""Test parameter validation for all endpoints""" | ||
# Missing required parameters | ||
response = client.get("/create") # Missing id | ||
assert response.status_code == 422 # FastAPI validation error | ||
|
||
response = client.post("/action") # Missing type and id | ||
assert response.status_code == 422 | ||
|
||
response = client.get("/status") # Missing id | ||
assert response.status_code == 422 | ||
|
||
response = client.post("/registerWorkflow") # Missing id and workflow_id | ||
assert response.status_code == 422 |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically 4xx is probably a
ApplicationError
withnon_retryable=True
, but that's a bit pedantic. But with this setup, an activity that, say, has invalid auth will retry forever