-
Notifications
You must be signed in to change notification settings - Fork 134
New Workflow Submit: Distributed Data Stream Aggregator workflow with 3-tier architecture #404
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
91fcbdb
82a5556
e6ada31
6333837
e0bb0c1
6ad506b
eb43aca
91bf342
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,186 @@ | ||
# Distributed Data Stream Aggregator | ||
|
||
This workflow demonstrates how to aggregate data from multiple third-party locations using a distributed processing pattern with AWS Step Functions. The workflow orchestrates data extraction, transformation, and consolidation at scale using Step Functions, DynamoDB, S3, and AWS Glue. | ||
|
||
**Key Features:** This approach is unique as it implements a **no-Lambda, low-code solution** that leverages native AWS service integrations and JSONata expressions within Step Functions. The only custom code required is a minimal Glue job script for data consolidation, making it a cost-effective and maintainable solution with fewer dependencies and no Lambda cold start overhead. | ||
|
||
|
||
Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. | ||
|
||
## Requirements | ||
|
||
* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. | ||
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured | ||
* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) | ||
|
||
## Deployment Instructions | ||
|
||
1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: | ||
``` | ||
git clone https://github.com/aws-samples/step-functions-workflows-collection | ||
``` | ||
2. Change directory to the pattern directory: | ||
``` | ||
cd distributed-data-stream-aggregator | ||
``` | ||
3. Create required DynamoDB tables: | ||
```bash | ||
# Create locations table | ||
aws dynamodb create-table \ | ||
--table-name locations \ | ||
--attribute-definitions \ | ||
AttributeName=task_id,AttributeType=S \ | ||
--key-schema \ | ||
AttributeName=task_id,KeyType=HASH \ | ||
--billing-mode PAY_PER_REQUEST | ||
|
||
# Create task table | ||
aws dynamodb create-table \ | ||
--table-name processing-tasks \ | ||
--attribute-definitions \ | ||
AttributeName=task_id,AttributeType=S \ | ||
AttributeName=created_at,AttributeType=S \ | ||
--key-schema \ | ||
AttributeName=task_id,KeyType=HASH \ | ||
AttributeName=created_at,KeyType=RANGE \ | ||
--billing-mode PAY_PER_REQUEST | ||
``` | ||
|
||
4. Create S3 buckets for data processing: | ||
```bash | ||
# Create source bucket for temporary files | ||
aws s3 mb s3://your-data-processing-bucket | ||
|
||
# Create destination bucket for final output | ||
aws s3 mb s3://your-output-bucket | ||
``` | ||
|
||
5. Create AWS Glue job for data consolidation: | ||
```bash | ||
# Create Glue job (replace with your script location) | ||
aws glue create-job \ | ||
--name data-aggregation-job \ | ||
--role arn:aws:iam::YOUR_ACCOUNT:role/GlueServiceRole \ | ||
--command Name=glueetl,ScriptLocation=s3://your-bucket/glue-script.py | ||
|
||
``` | ||
|
||
6. Create HTTP connections for third-party API access: | ||
|
||
```bash | ||
# Create EventBridge connection for API access | ||
aws events create-connection \ | ||
--name api-connection \ | ||
--authorization-type API_KEY \ | ||
--auth-parameters "ApiKeyAuthParameters={ApiKeyName=Authorization,ApiKeyValue=Bearer YOUR_TOKEN}" | ||
``` | ||
|
||
7. Deploy the state machines by updating the placeholder values in each ASL file: | ||
- Replace `'s3-bucket-name'` with your source bucket name | ||
- Replace `'destination_bucket'` with your destination bucket name | ||
- Replace `'api_endpoint'` and `'summary_api_endpoint'` with your API URLs | ||
|
||
- Replace `'ConnectionArn'` with your EventBridge connection ARN | ||
- Replace `'glue_job'` with your Glue job name | ||
- Replace `'task_table'` with your task table name | ||
- Replace `'Data Extraction Child'` and `'Data Processing Child'` with the respective state machine ARNs | ||
|
||
8. Create the state machines: | ||
```bash | ||
# Create Distributed Data Stream Aggregator state machine | ||
aws stepfunctions create-state-machine \ | ||
--name DistributedDataStreamAggregator \ | ||
--definition file://statemachine/statemachine.asl.json \ | ||
--role-arn arn:aws:iam::YOUR_ACCOUNT:role/StepFunctionsExecutionRole | ||
|
||
# Create Data Extraction Child state machine | ||
aws stepfunctions create-state-machine \ | ||
--name DataExtractionChild \ | ||
--definition file://statemachine/data-extraction-child.asl.json \ | ||
--role-arn arn:aws:iam::YOUR_ACCOUNT:role/StepFunctionsExecutionRole | ||
|
||
# Create Data Processing Child state machine (Express) | ||
aws stepfunctions create-state-machine \ | ||
--name DataProcessingChildExpress \ | ||
--definition file://statemachine/data-processing-child.asl.json \ | ||
--role-arn arn:aws:iam::YOUR_ACCOUNT:role/StepFunctionsExecutionRole \ | ||
--type EXPRESS | ||
``` | ||
|
||
## How it works | ||
|
||
This distributed data stream aggregator implements a three-tier processing architecture: | ||
|
||
### Distributed Data Stream Aggregator Workflow (Parent State Machine) | ||
The main workflow accepts a unique task ID and orchestrates the entire data aggregation process. It queries DynamoDB to retrieve various client locations based on the task ID, then uses distributed map iteration to process multiple locations in parallel. Finally, it triggers an AWS Glue job to combine all partial data files and updates the task status in DynamoDB. | ||
|
||
### Data Extraction Workflow (Standard Execution) | ||
This workflow handles data extraction from third-party locations. It pings locations via HTTP endpoints to verify data availability, processes different types of data (failed, rejected) using inline map iteration, and calls the express child workflow with pagination parameters. Extracted data is stored as JSON files in S3 organized by task ID. | ||
|
||
### Data Processing Workflow (Express Execution) | ||
The express workflow handles the actual API calls to third-party endpoints. It receives location details, data type, and pagination parameters, makes HTTP calls with query parameters, formats the retrieved data into standardized JSON format, and returns results with count and pagination metadata. | ||
|
||
### Data Consolidation | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't Step Functions Distributed Map be an alternative to this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The aggregation phase is a reduce step across all items, which the Distributed Map isn’t designed to handle directly. I write per-item results as part files and use a Glue job to merge them. This also helps avoid Step Functions’ 256 KB state payload limit by keeping large intermediate data out of the state. |
||
An AWS Glue job combines all small JSON files from the temporary S3 directory into a single consolidated file, which is uploaded to the destination S3 bucket. The workflow monitors job status and updates the DynamoDB task table upon completion. | ||
|
||
## Image | ||
|
||
 | ||
|
||
## Testing | ||
|
||
1. Populate the locations table with test data: | ||
```bash | ||
aws dynamodb put-item \ | ||
--table-name locations \ | ||
--item '{"task_id": {"S": "example-task-123"}, "location_id": {"S": "location-001"}, "api_url": {"S": "https://api.example.com"}}' | ||
``` | ||
|
||
2. Execute the state machine with the example input: | ||
```bash | ||
aws stepfunctions start-execution \ | ||
--state-machine-arn arn:aws:states:REGION:ACCOUNT:stateMachine:DistributedDataStreamAggregator \ | ||
--name test-execution-$(date +%s) \ | ||
--input '{"task_id": "data-aggregation-20241215-batch-001", "task_sort_key": "2024-12-15T14:30:00.000Z"}' | ||
``` | ||
|
||
3. Monitor the execution in the AWS Step Functions console or via CLI: | ||
```bash | ||
aws stepfunctions describe-execution \ | ||
--execution-arn EXECUTION_ARN | ||
``` | ||
|
||
4. Verify the results by checking the destination S3 bucket for the aggregated CSV file and the task table for updated status. | ||
|
||
## Cleanup | ||
|
||
1. Delete the state machines: | ||
```bash | ||
aws stepfunctions delete-state-machine --state-machine-arn arn:aws:states:REGION:ACCOUNT:stateMachine:DistributedDataStreamAggregator | ||
aws stepfunctions delete-state-machine --state-machine-arn arn:aws:states:REGION:ACCOUNT:stateMachine:DataExtractionChild | ||
aws stepfunctions delete-state-machine --state-machine-arn arn:aws:states:REGION:ACCOUNT:stateMachine:DataProcessingChildExpress | ||
``` | ||
|
||
2. Delete DynamoDB tables: | ||
```bash | ||
aws dynamodb delete-table --table-name locations | ||
aws dynamodb delete-table --table-name processing-tasks | ||
``` | ||
|
||
3. Delete S3 buckets (ensure they are empty first): | ||
```bash | ||
aws s3 rb s3://your-data-processing-bucket --force | ||
aws s3 rb s3://your-output-bucket --force | ||
``` | ||
|
||
4. Delete Glue job: | ||
```bash | ||
aws glue delete-job --job-name data-aggregation-job | ||
``` | ||
|
||
5. Delete EventBridge connection: | ||
```bash | ||
aws events delete-connection --name api-connection | ||
``` | ||
|
||
---- | ||
Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
treadyaparna marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
SPDX-License-Identifier: MIT-0 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
{ | ||
"title": "Distributed Data Stream Aggregator", | ||
"description": "Aggregate data from multiple third-party locations using distributed processing with 3-tier architecture", | ||
"language": "", | ||
"simplicity": "3 - Application", | ||
"usecase": "Data Processing", | ||
"type": "Standard", | ||
"diagram": "/resources/illustration.png", | ||
"videoId": "", | ||
"level": "200", | ||
"framework": "Manual", | ||
"services": ["stepfunctions", "dynamodb", "s3", "glue", "eventbridge"], | ||
"introBox": { | ||
"headline": "How it works", | ||
"text": [ | ||
"This workflow demonstrates large-scale data aggregation from multiple third-party locations using AWS Step Functions' distributed processing capabilities with a 3-tier architecture.", | ||
"The main workflow orchestrates the entire process by querying DynamoDB for client locations, then uses distributed map to process multiple locations in parallel. Each location is processed by a standard execution child workflow that handles data extraction and pagination.", | ||
"A second express execution child workflow performs the actual API calls to third-party endpoints with query parameters and pagination support. Data is temporarily stored in S3 as JSON files organized by task ID.", | ||
"Finally, an AWS Glue job consolidates all partial files into a single output file uploaded to the destination S3 bucket, with status updates tracked in DynamoDB." | ||
] | ||
}, | ||
"testing": { | ||
"headline": "Testing", | ||
"text": [ | ||
"1. Populate the locations DynamoDB table with test data containing task_id and location information.", | ||
"2. Execute the state machine using the AWS CLI with task_id and task_sort_key as input.", | ||
"3. Monitor execution progress in the Step Functions console and verify data consolidation in S3." | ||
] | ||
}, | ||
"cleanup": { | ||
"headline": "Cleanup", | ||
"text": [ | ||
"1. Delete the state machines using AWS CLI: <code>aws stepfunctions delete-state-machine</code>", | ||
"2. Delete DynamoDB tables: <code>aws dynamodb delete-table</code>", | ||
"3. Delete S3 buckets: <code>aws s3 rb --force</code>", | ||
"4. Delete Glue job and EventBridge connection" | ||
] | ||
}, | ||
"deploy": { | ||
"text": [ | ||
"Follow the step-by-step deployment instructions in the README.md to create DynamoDB tables, S3 buckets, Glue job, EventBridge connections, and deploy the state machines." | ||
] | ||
}, | ||
"gitHub": { | ||
"template": { | ||
"repoURL": "https://github.com/aws-samples/step-functions-workflows-collection/tree/main/distributed-data-stream-aggregator/", | ||
"templateDir": "distributed-data-stream-aggregator", | ||
"templateFile": "", | ||
"ASL": "statemachine/statemachine.asl.json" | ||
}, | ||
"payloads": [ | ||
{ | ||
"headline": "", | ||
"payloadURL": "" | ||
} | ||
] | ||
}, | ||
"resources": { | ||
"headline": "Additional resources", | ||
"bullets": [ | ||
{ | ||
"text": "The AWS Step Functions Workshop", | ||
"link": "https://catalog.workshops.aws/stepfunctions/en-US" | ||
}, | ||
{ | ||
"text": "Distributed Map state documentation", | ||
"link": "https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-map-state.html" | ||
}, | ||
{ | ||
"text": "JSONata expressions in Step Functions", | ||
"link": "https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-jsonata.html" | ||
} | ||
] | ||
}, | ||
"authors": [ | ||
{ | ||
"name": "Aparna Saha" | ||
} | ||
] | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
{ | ||
"Comment": "Data Extraction Child", | ||
"StartAt": "Assign variables", | ||
"States": { | ||
"Assign variables": { | ||
"Type": "Pass", | ||
"Assign": { | ||
"locationId": "{% $states.input.locationId %}", | ||
"locationDataType": [ | ||
"failed", | ||
"rejected" | ||
], | ||
"taskId": "{% $states.input.taskId %}", | ||
"taskToken": "{% $states.input.taskToken %}" | ||
}, | ||
"Next": "Iterate states" | ||
}, | ||
"Iterate states": { | ||
"Type": "Map", | ||
"ItemProcessor": { | ||
"ProcessorConfig": { | ||
"Mode": "INLINE" | ||
}, | ||
"StartAt": "Get data type", | ||
"States": { | ||
"Get data type": { | ||
"Type": "Pass", | ||
"Next": "Get Data", | ||
"Assign": { | ||
"dataType": "{% $states.input%}", | ||
"maxCount": 400, | ||
"offset": 0 | ||
} | ||
}, | ||
"Get Data": { | ||
"Type": "Task", | ||
"Resource": "arn:aws:states:::states:startExecution.sync:2", | ||
"Arguments": { | ||
"StateMachineArn": "{% 'child2' %}", | ||
"Input": { | ||
"locationId": "{% $locationId %}", | ||
"dataType": "{% $dataType %}", | ||
"limit": "{% $maxCount %}", | ||
"offset": "{% $offset %}", | ||
"AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID": "{% $states.context.Execution.Id %}", | ||
"childTaskToken": "{% $states.context.Task.Token %}" | ||
} | ||
}, | ||
"Next": "Data count", | ||
"Assign": { | ||
"resultCount": "{% $states.result.resultCount %}", | ||
"offset": "{% $states.result.offset %}" | ||
} | ||
}, | ||
"Data count": { | ||
"Type": "Choice", | ||
"Choices": [ | ||
{ | ||
"Next": "PutObject", | ||
"Condition": "{% $count($states.input.csvContent) > 0 %}", | ||
"Output": { | ||
"data": "{% $states.input.csvContent %}" | ||
} | ||
} | ||
], | ||
"Default": "No data found" | ||
}, | ||
"PutObject": { | ||
"Type": "Task", | ||
"Arguments": { | ||
"Body": "{% $string($states.input.data) %}", | ||
"Bucket": "{% 's3-bucket-name' %}", | ||
"Key": "{% 'tmp/'&$taskId&'/'&'part-'&$millis()&'.json' %}", | ||
"ContentType": "application/json" | ||
}, | ||
"Resource": "arn:aws:states:::aws-sdk:s3:putObject", | ||
"Next": "Check pagination" | ||
}, | ||
"No data found": { | ||
"Type": "Pass", | ||
"End": true | ||
}, | ||
"Check pagination": { | ||
"Type": "Choice", | ||
"Choices": [ | ||
{ | ||
"Next": "Do nothing", | ||
"Condition": "{% $resultCount < $maxCount %}" | ||
} | ||
], | ||
"Default": "Get Data" | ||
}, | ||
"Do nothing": { | ||
"Type": "Pass", | ||
"End": true | ||
} | ||
} | ||
}, | ||
"Items": "{% $locationDataType %}", | ||
"Next": "SendTaskSuccess" | ||
}, | ||
"SendTaskSuccess": { | ||
"Type": "Task", | ||
"Arguments": { | ||
"Output": "{\"status\": \" complete\"}", | ||
"TaskToken": "{% $taskToken %}" | ||
}, | ||
"Resource": "arn:aws:states:::aws-sdk:sfn:sendTaskSuccess", | ||
"End": true | ||
} | ||
}, | ||
"QueryLanguage": "JSONata" | ||
} |
Uh oh!
There was an error while loading. Please reload this page.