Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 198 additions & 0 deletions distributed-data-stream-aggregator/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
# Distributed Data Stream Aggregator

This workflow demonstrates how to aggregate data from multiple third-party locations using a distributed processing pattern with AWS Step Functions. The workflow orchestrates data extraction, transformation, and consolidation at scale using Step Functions, Amazon DynamoDB, Amazon S3, and AWS Glue.

**Key Features:** This approach is unique as it implements a **no-Lambda, low-code solution** that leverages native AWS service integrations and JSONata expressions within Step Functions. The only custom code required is a minimal Glue job script for data consolidation, making it a cost-effective and maintainable solution with fewer dependencies and no Lambda cold start overhead.


Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements

* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)

## Deployment Instructions

1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
```
git clone https://github.com/aws-samples/step-functions-workflows-collection
```
2. Change directory to the pattern directory:
```
cd distributed-data-stream-aggregator
```
3. Create required DynamoDB tables:
```bash
# Create locations table
aws dynamodb create-table \
--table-name locations \
--attribute-definitions \
AttributeName=task_id,AttributeType=S \
--key-schema \
AttributeName=task_id,KeyType=HASH \
--billing-mode PAY_PER_REQUEST

# Create task table
aws dynamodb create-table \
--table-name processing-tasks \
--attribute-definitions \
AttributeName=task_id,AttributeType=S \
AttributeName=created_at,AttributeType=S \
--key-schema \
AttributeName=task_id,KeyType=HASH \
AttributeName=created_at,KeyType=RANGE \
--billing-mode PAY_PER_REQUEST
```

4. Create S3 buckets for data processing:
```bash
# Create source bucket for temporary files
aws s3 mb s3://your-data-processing-bucket

# Create destination bucket for final output
aws s3 mb s3://your-output-bucket
```

5. Upload the Glue job script to S3:
```bash
# Upload the Glue script to your bucket
aws s3 cp app.py s3://your-data-processing-bucket/scripts/app.py
```

6. Create AWS Glue job for data consolidation:
```bash
# Create Glue job
aws glue create-job \
--name data-aggregation-job \
--role arn:aws:iam::YOUR_ACCOUNT:role/GlueServiceRole \
--command Name=glueetl,ScriptLocation=s3://your-data-processing-bucket/scripts/app.py
```

7. Create HTTP connections for third-party API access:
This workflow uses EventBridge connections to securely authenticate with third-party APIs.
Configure the connection based on your API's authentication requirements.

See the [AWS EventBridge Connections documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-connections.html)
for supported authentication methods (API Key, OAuth, Basic Auth, IAM Role).

Example for API Key authentication:

```bash
aws events create-connection \
--name api-connection \
--authorization-type API_KEY \
--auth-parameters "ApiKeyAuthParameters={ApiKeyName=Authorization,ApiKeyValue=Bearer YOUR_TOKEN}"
```

8. Deploy the state machines by updating the placeholder values in each ASL file:
- Replace `'s3-bucket-name'` with your source bucket name
- Replace `'api_endpoint'` with your API URL
- Replace `'ConnectionArn'` with your EventBridge connection ARN
- Replace `'data-aggregation-job'` with your Glue job name
- Replace `'processing-tasks'` with your task table name
- Replace `'Data Extraction Child'` and `'Data Processing Child'` with the respective state machine ARNs

9. Create the state machines:
```bash
# Create Distributed Data Stream Aggregator state machine
aws stepfunctions create-state-machine \
--name DistributedDataStreamAggregator \
--definition file://statemachine/statemachine.asl.json \
--role-arn arn:aws:iam::YOUR_ACCOUNT:role/StepFunctionsExecutionRole

# Create Data Extraction Child state machine
aws stepfunctions create-state-machine \
--name DataExtractionChild \
--definition file://statemachine/data-extraction-child.asl.json \
--role-arn arn:aws:iam::YOUR_ACCOUNT:role/StepFunctionsExecutionRole

# Create Data Processing Child state machine (Express)
aws stepfunctions create-state-machine \
--name DataProcessingChildExpress \
--definition file://statemachine/data-processing-child.asl.json \
--role-arn arn:aws:iam::YOUR_ACCOUNT:role/StepFunctionsExecutionRole \
--type EXPRESS
```

## How it works

This distributed data stream aggregator implements a three-tier processing architecture:

### Distributed Data Stream Aggregator Workflow (Parent State Machine)
The main workflow accepts a unique task ID and orchestrates the entire data aggregation process. It queries DynamoDB to retrieve various client locations based on the task ID, then uses distributed map iteration to process multiple locations in parallel. Finally, it triggers an AWS Glue job to combine all partial data files and updates the task status in DynamoDB.

### Data Extraction Workflow (Standard Execution)
This workflow handles data extraction from third-party locations. It pings locations via HTTP endpoints to verify data availability, processes different types of data (failed, rejected) using inline map iteration, and calls the express child workflow with pagination parameters. Extracted data is stored as JSON files in S3 organized by task ID.

### Data Processing Workflow (Express Execution)
The express workflow handles the actual API calls to third-party endpoints. It receives location details, data type, and pagination parameters, makes HTTP calls with query parameters, formats the retrieved data into standardized JSON format, and returns results with count and pagination metadata.

### Data Consolidation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't Step Functions Distributed Map be an alternative to this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The aggregation phase is a reduce step across all items, which the Distributed Map isn’t designed to handle directly. I write per-item results as part files and use a Glue job to merge them. This also helps avoid Step Functions’ 256 KB state payload limit by keeping large intermediate data out of the state.

An AWS Glue job combines all small JSON files from the temporary S3 directory into a single consolidated file, which is uploaded to the destination S3 bucket. The workflow monitors job status and updates the DynamoDB task table upon completion.

## Image

![image](./resources/illustration.png)

## Testing

1. Populate the locations table with test data:
```bash
aws dynamodb put-item \
--table-name locations \
--item '{"task_id": {"S": "example-task-123"}, "location_id": {"S": "location-001"}, "api_url": {"S": "https://api.example.com"}}'
```

2. Execute the state machine with the example input:
```bash
aws stepfunctions start-execution \
--state-machine-arn arn:aws:states:REGION:ACCOUNT:stateMachine:DistributedDataStreamAggregator \
--name test-execution-$(date +%s) \
--input '{"task_id": "data-aggregation-20241215-batch-001", "task_sort_key": "2024-12-15T14:30:00.000Z"}'
```

3. Monitor the execution in the AWS Step Functions console or via CLI:
```bash
aws stepfunctions describe-execution \
--execution-arn EXECUTION_ARN
```

4. Verify the results by checking the destination S3 bucket for the aggregated CSV file and the task table for updated status.

## Cleanup

1. Delete the state machines:
```bash
aws stepfunctions delete-state-machine --state-machine-arn arn:aws:states:REGION:ACCOUNT:stateMachine:DistributedDataStreamAggregator
aws stepfunctions delete-state-machine --state-machine-arn arn:aws:states:REGION:ACCOUNT:stateMachine:DataExtractionChild
aws stepfunctions delete-state-machine --state-machine-arn arn:aws:states:REGION:ACCOUNT:stateMachine:DataProcessingChildExpress
```

2. Delete DynamoDB tables:
```bash
aws dynamodb delete-table --table-name locations
aws dynamodb delete-table --table-name processing-tasks
```

3. Delete S3 buckets (ensure they are empty first):
```bash
aws s3 rb s3://your-data-processing-bucket --force
aws s3 rb s3://your-output-bucket --force
```

4. Delete Glue job:
```bash
aws glue delete-job --job-name data-aggregation-job
```

5. Delete EventBridge connection:
```bash
aws events delete-connection --name api-connection
```

----
Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: MIT-0
153 changes: 153 additions & 0 deletions distributed-data-stream-aggregator/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import sys
import boto3
import pandas as pd
import os
import json
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext
import logging

# Configure the logger
logging.basicConfig(level=logging.ERROR)
log = logging.getLogger(__name__)

def get_s3_client():
"""Returns the S3 client"""
return boto3.client("s3")


def get_input_files(s3_client, bucket, input_prefix):
"""List JSON files in the input path"""
response = s3_client.list_objects_v2(Bucket=bucket, Prefix=input_prefix)
return [obj["Key"] for obj in response.get("Contents", []) if obj["Key"].endswith(".json")]


def read_json_from_s3(s3_client, bucket, file_key):
"""Read JSON data from a file in S3"""
obj = s3_client.get_object(Bucket=bucket, Key=file_key)
json_content = obj['Body'].read().decode('utf-8')
decoded_json = json.loads(json_content) # First decode
if isinstance(decoded_json, str):
decoded_json = json.loads(decoded_json) # Second decode if still a string
return decoded_json

def merge_json_data(files, s3_client, bucket):
"""Merge multiple JSON files into a single DataFrame"""
dataframes = []
for file in files:
try:
decoded_json = read_json_from_s3(s3_client, bucket, file)
if isinstance(decoded_json, list):
json_df = pd.DataFrame(decoded_json)
dataframes.append(json_df)
else:
log.warning(f"Unexpected JSON format in file {file}, skipping.")
except json.JSONDecodeError as e:
log.error(f"Error decoding JSON from file {file}: {e}")
raise e
if dataframes:
return pd.concat(dataframes, ignore_index=True)
else:
return None


def save_to_csv(dataframe, output_file):
"""Save DataFrame to CSV"""
try:
dataframe.to_csv(output_file, index=False, sep=",", quoting=1, escapechar="\\")
except Exception as e:
log.error(f"Error saving merged file locally: {e}")
sys.exit(1)


def upload_to_s3(s3_client, local_file, bucket, output_file, task_id):
"""Upload the CSV file to S3"""
try:
s3_client.upload_file(
local_file, bucket, output_file,
ExtraArgs={"Metadata": {"task_id": task_id}}
)
log.debug(f"Merged CSV uploaded to s3://{bucket}/{output_file}")
except Exception as e:
log.error(f"Error uploading file to S3: {e}")
sys.exit(1)


def delete_s3_files(s3_client, bucket, files):
"""Delete processed files from S3"""
try:
for file in files:
s3_client.delete_object(Bucket=bucket, Key=file)
log.debug(f"Deleted: s3://{bucket}/{file}")
except Exception as e:
log.error(f"Error deleting temporary files: {e}")
sys.exit(1)


def cleanup_local_file(local_output_file):
"""Remove the local output file"""
if os.path.exists(local_output_file):
os.remove(local_output_file)


def main():
# Avoid multiple SparkContext initialization
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Get parameters
args = getResolvedOptions(sys.argv, ["task_id", "bucket", "output_file_name"])
task_id = args["task_id"]
bucket = args["bucket"]
output_file = args["output_file_name"]
local_output_file = f"/tmp/{output_file}"

# S3 Client
s3_client = get_s3_client()

# Define input path in S3
input_prefix = f"tmp/{task_id}/"

# List all JSON part files under the input path
files = get_input_files(s3_client, bucket, input_prefix)

if not files:
log.debug("No JSON files found to process.")
sys.exit("No JSON files found to process.") # Exit with a message

log.debug(f"Processing JSON files: {files}")

# Merge the JSON files into a single DataFrame
merged_df = merge_json_data(files, s3_client, bucket)

if merged_df is None:
log.debug("No valid JSON data to merge.")
sys.exit("No valid JSON data to merge.") # Exit with a message

# Check column count before assigning headers
expected_columns = ["discount_code", "error_message", "status"]
if merged_df.shape[1] == len(expected_columns):
merged_df.columns = expected_columns

# Save merged CSV to local storage
save_to_csv(merged_df, local_output_file)

# Upload merged CSV to S3
upload_to_s3(s3_client, local_output_file, bucket, output_file, task_id)

# Delete temporary files from S3
delete_s3_files(s3_client, bucket, files)

# Cleanup local temp file if it exists
cleanup_local_file(local_output_file)

# Print the output file location (for Step Function)
log.debug(f"Output file: s3://{bucket}/{output_file}")

log.debug("Process completed successfully!")


if __name__ == "__main__":
main()
Loading