
Commit 4c402da

feat: Init distributed-data-stream-aggregator
1 parent 1feff72 commit 4c402da


9 files changed: +589, -0 lines changed

Lines changed: 183 additions & 0 deletions
# Distributed Data Stream Aggregator

This workflow demonstrates how to aggregate data from multiple third-party locations using a distributed processing pattern with AWS Step Functions. The workflow orchestrates data extraction, transformation, and consolidation at scale using Step Functions, DynamoDB, S3, and AWS Glue.

Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.
## Requirements

* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
* [Git installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
## Deployment Instructions

1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
    ```
    git clone https://github.com/aws-samples/step-functions-workflows-collection
    ```
2. Change directory to the pattern directory:
    ```
    cd step-functions-workflows-collection/distributed-data-stream-aggregator
    ```
3. Create required DynamoDB tables:
    ```bash
    # Create locations table
    aws dynamodb create-table \
        --table-name locations \
        --attribute-definitions \
            AttributeName=task_id,AttributeType=S \
        --key-schema \
            AttributeName=task_id,KeyType=HASH \
        --billing-mode PAY_PER_REQUEST

    # Create task table
    aws dynamodb create-table \
        --table-name processing-tasks \
        --attribute-definitions \
            AttributeName=task_id,AttributeType=S \
            AttributeName=created_at,AttributeType=S \
        --key-schema \
            AttributeName=task_id,KeyType=HASH \
            AttributeName=created_at,KeyType=RANGE \
        --billing-mode PAY_PER_REQUEST
    ```
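    Table creation is asynchronous; optionally wait until both tables are `ACTIVE` before continuing:
    ```bash
    # Optional: block until the tables exist and are ACTIVE
    aws dynamodb wait table-exists --table-name locations
    aws dynamodb wait table-exists --table-name processing-tasks
    ```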
4. Create S3 buckets for data processing:
    ```bash
    # Create source bucket for temporary files
    aws s3 mb s3://your-data-processing-bucket

    # Create destination bucket for final output
    aws s3 mb s3://your-output-bucket
    ```
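    The extraction workflow writes temporary part files under the `tmp/` prefix of the source bucket. Optionally, an S3 lifecycle rule can expire those intermediates automatically; a minimal sketch, with a hypothetical one-day expiration:
    ```bash
    # Optional: expire temporary part files under tmp/ after one day
    aws s3api put-bucket-lifecycle-configuration \
        --bucket your-data-processing-bucket \
        --lifecycle-configuration '{"Rules":[{"ID":"expire-tmp","Status":"Enabled","Filter":{"Prefix":"tmp/"},"Expiration":{"Days":1}}]}'
    ```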
5. Create AWS Glue job for data consolidation:
    ```bash
    # Create Glue job (replace with your script location)
    aws glue create-job \
        --name data-aggregation-job \
        --role arn:aws:iam::YOUR_ACCOUNT:role/GlueServiceRole \
        --command Name=glueetl,ScriptLocation=s3://your-bucket/glue-script.py
    ```
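    The script referenced by `ScriptLocation` must already exist in S3. If it does not, stage it first and confirm the job definition afterwards (the local script name and bucket here are the placeholders from the command above):
    ```bash
    # Stage the consolidation script referenced by ScriptLocation
    aws s3 cp glue-script.py s3://your-bucket/glue-script.py

    # Confirm the job was registered
    aws glue get-job --job-name data-aggregation-job
    ```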
6. Create HTTP connections for third-party API access:
    ```bash
    # Create EventBridge connection for API access
    aws events create-connection \
        --name api-connection \
        --authorization-type API_KEY \
        --auth-parameters "ApiKeyAuthParameters={ApiKeyName=Authorization,ApiKeyValue=Bearer YOUR_TOKEN}"
    ```
7. Deploy the state machines by updating the placeholder values in each ASL file (a scripted sketch of these substitutions follows the list):
    - Replace `'s3-bucket-name'` with your source bucket name
    - Replace `'destination_bucket'` with your destination bucket name
    - Replace `'api_endpoint'` and `'summary_api_endpoint'` with your API URLs
    - Replace `'ConnectionArn'` with your EventBridge connection ARN
    - Replace `'glue_job'` with your Glue job name
    - Replace `'task_table'` with your task table name
    - Replace `'child1'` and `'child2'` with the respective state machine ARNs
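    A minimal sketch of scripting the substitutions with `sed`, assuming the definitions live under `statemachine/` and use the quoted placeholder literals listed above; the `'child1'`/`'child2'` ARNs can only be filled in after the child state machines exist (see the next step):
    ```bash
    # Hypothetical values - substitute your own resources
    SRC_BUCKET=your-data-processing-bucket
    DST_BUCKET=your-output-bucket
    CONN_ARN=$(aws events describe-connection --name api-connection \
        --query 'ConnectionArn' --output text)

    # Replace the quoted placeholder literals in every definition (GNU sed, in place)
    for f in statemachine/*.asl.json; do
        sed -i \
            -e "s|'s3-bucket-name'|'$SRC_BUCKET'|g" \
            -e "s|'destination_bucket'|'$DST_BUCKET'|g" \
            -e "s|'ConnectionArn'|'$CONN_ARN'|g" \
            -e "s|'glue_job'|'data-aggregation-job'|g" \
            -e "s|'task_table'|'processing-tasks'|g" \
            "$f"
    done
    ```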
8. Create the state machines:
    ```bash
    # Create main state machine
    aws stepfunctions create-state-machine \
        --name DistributedDataStreamAggregator \
        --definition file://statemachine/statemachine.asl.json \
        --role-arn arn:aws:iam::YOUR_ACCOUNT:role/StepFunctionsExecutionRole

    # Create Data Extraction Child state machine
    aws stepfunctions create-state-machine \
        --name DataExtractionChild \
        --definition file://statemachine/data-extraction-child.asl.json \
        --role-arn arn:aws:iam::YOUR_ACCOUNT:role/StepFunctionsExecutionRole

    # Create Data Processing Child state machine (Express)
    aws stepfunctions create-state-machine \
        --name DataProcessingChildExpress \
        --definition file://statemachine/data-processing-child.asl.json \
        --role-arn arn:aws:iam::YOUR_ACCOUNT:role/StepFunctionsExecutionRole \
        --type EXPRESS
    ```
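    Each `create-state-machine` call returns a `stateMachineArn`. After creating the children, you can look up their ARNs (needed for the `'child1'`/`'child2'` placeholders in the previous step) with a query such as:
    ```bash
    # Look up the child state machine ARNs by name
    aws stepfunctions list-state-machines \
        --query "stateMachines[?name=='DataExtractionChild' || name=='DataProcessingChildExpress'].[name,stateMachineArn]" \
        --output table
    ```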
## How it works

This distributed data stream aggregator implements a three-tier processing architecture:

### Main Workflow (Parent State Machine)
The main workflow accepts a unique task ID and orchestrates the entire data aggregation process. It queries DynamoDB to retrieve the client locations associated with the task ID, then uses distributed map iteration to process multiple locations in parallel. Finally, it triggers an AWS Glue job to combine all partial data files and updates the task status in DynamoDB.

### Child Workflow 1 (Standard Execution)
This workflow handles data extraction from third-party locations. It pings locations via HTTP endpoints to verify data availability, processes the different data types (failed, rejected) using inline map iteration, and calls the express child workflow with pagination parameters. Extracted data is stored as JSON files in S3, organized by task ID.

### Child Workflow 2 (Express Execution)
The express workflow makes the actual API calls to the third-party endpoints. It receives location details, a data type, and pagination parameters, makes HTTP calls with query parameters, formats the retrieved data into a standardized JSON format, and returns the results with count and pagination metadata.

### Data Consolidation
An AWS Glue job combines all small JSON files from the temporary S3 directory into a single consolidated file, which is uploaded to the destination S3 bucket. The workflow monitors the job status and updates the DynamoDB task table upon completion.
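The parent workflow starts and monitors the Glue job itself, but the job can also be exercised manually when debugging. A minimal check using the job name from the deployment steps (a manually started run does not receive whatever run parameters the workflow normally supplies):

```bash
# Start a consolidation run manually and note the JobRunId it returns
aws glue start-job-run --job-name data-aggregation-job

# Inspect the most recent run's state (RUNNING, SUCCEEDED, FAILED, ...)
aws glue get-job-runs --job-name data-aggregation-job --max-results 1
```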
## Image

![image](./resources/illustration.png)
## Testing

1. Populate the locations table with test data:
    ```bash
    aws dynamodb put-item \
        --table-name locations \
        --item '{"task_id": {"S": "example-task-123"}, "location_id": {"S": "location-001"}, "api_url": {"S": "https://api.example.com"}}'
    ```
2. Execute the state machine with the example input:
    ```bash
    aws stepfunctions start-execution \
        --state-machine-arn arn:aws:states:REGION:ACCOUNT:stateMachine:DistributedDataStreamAggregator \
        --name test-execution-$(date +%s) \
        --input file://example-workflow.json
    ```
3. Monitor the execution in the AWS Step Functions console or via CLI:
    ```bash
    aws stepfunctions describe-execution \
        --execution-arn EXECUTION_ARN
    ```
4. Verify the results by checking the destination S3 bucket for the aggregated CSV file and the task table for the updated status.
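    For example (bucket and table names as created above; the `created_at` value shown is an assumption matching the `task_sort_key` in `example-workflow.json`):
    ```bash
    # List the consolidated output written by the Glue job
    aws s3 ls s3://your-output-bucket/ --recursive

    # Check the task status record in the task table
    aws dynamodb get-item \
        --table-name processing-tasks \
        --key '{"task_id": {"S": "example-task-123"}, "created_at": {"S": "2024-01-15T10:30:00Z"}}'
    ```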
## Cleanup

1. Delete the state machines:
    ```bash
    aws stepfunctions delete-state-machine --state-machine-arn arn:aws:states:REGION:ACCOUNT:stateMachine:DistributedDataStreamAggregator
    aws stepfunctions delete-state-machine --state-machine-arn arn:aws:states:REGION:ACCOUNT:stateMachine:DataExtractionChild
    aws stepfunctions delete-state-machine --state-machine-arn arn:aws:states:REGION:ACCOUNT:stateMachine:DataProcessingChildExpress
    ```
2. Delete DynamoDB tables:
    ```bash
    aws dynamodb delete-table --table-name locations
    aws dynamodb delete-table --table-name processing-tasks
    ```
3. Delete S3 buckets (ensure they are empty first):
    ```bash
    aws s3 rb s3://your-data-processing-bucket --force
    aws s3 rb s3://your-output-bucket --force
    ```
4. Delete Glue job:
    ```bash
    aws glue delete-job --job-name data-aggregation-job
    ```
5. Delete EventBridge connection:
    ```bash
    aws events delete-connection --name api-connection
    ```

----

Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: MIT-0
Lines changed: 5 additions & 0 deletions
{
  "Comment": "Example input for Distributed Data Stream Aggregator workflow",
  "task_id": "example-task-123",
  "task_sort_key": "2024-01-15T10:30:00Z"
}
Four image files added (152 KB, 49.5 KB, 580 KB, 140 KB); binary content not shown.
Lines changed: 113 additions & 0 deletions
{
  "Comment": "Data Extraction Child",
  "StartAt": "Assign variables",
  "States": {
    "Assign variables": {
      "Type": "Pass",
      "Assign": {
        "locationId": "{% $states.input.locationId %}",
        "locationDataType": [
          "failed",
          "rejected"
        ],
        "taskId": "{% $states.input.taskId %}",
        "taskToken": "{% $states.input.taskToken %}"
      },
      "Next": "Iterate states"
    },
    "Iterate states": {
      "Type": "Map",
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "INLINE"
        },
        "StartAt": "Get data type",
        "States": {
          "Get data type": {
            "Type": "Pass",
            "Next": "Get Data",
            "Assign": {
              "dataType": "{% $states.input %}",
              "maxCount": 400,
              "offset": 0
            }
          },
          "Get Data": {
            "Type": "Task",
            "Resource": "arn:aws:states:::states:startExecution.sync:2",
            "Arguments": {
              "StateMachineArn": "{% 'child2' %}",
              "Input": {
                "locationId": "{% $locationId %}",
                "dataType": "{% $dataType %}",
                "limit": "{% $maxCount %}",
                "offset": "{% $offset %}",
                "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID": "{% $states.context.Execution.Id %}",
                "childTaskToken": "{% $states.context.Task.Token %}"
              }
            },
            "Next": "Data count",
            "Assign": {
              "resultCount": "{% $states.result.resultCount %}",
              "offset": "{% $states.result.offset %}"
            }
          },
          "Data count": {
            "Type": "Choice",
            "Choices": [
              {
                "Next": "PutObject",
                "Condition": "{% $count($states.input.csvContent) > 0 %}",
                "Output": {
                  "data": "{% $states.input.csvContent %}"
                }
              }
            ],
            "Default": "No data found"
          },
          "PutObject": {
            "Type": "Task",
            "Arguments": {
              "Body": "{% $string($states.input.data) %}",
              "Bucket": "{% 's3-bucket-name' %}",
              "Key": "{% 'tmp/'&$taskId&'/'&'part-'&$millis()&'.json' %}",
              "ContentType": "application/json"
            },
            "Resource": "arn:aws:states:::aws-sdk:s3:putObject",
            "Next": "Check pagination"
          },
          "No data found": {
            "Type": "Pass",
            "End": true
          },
          "Check pagination": {
            "Type": "Choice",
            "Choices": [
              {
                "Next": "Do nothing",
                "Condition": "{% $resultCount < $maxCount %}"
              }
            ],
            "Default": "Get Data"
          },
          "Do nothing": {
            "Type": "Pass",
            "End": true
          }
        }
      },
      "Items": "{% $locationDataType %}",
      "Next": "SendTaskSuccess"
    },
    "SendTaskSuccess": {
      "Type": "Task",
      "Arguments": {
        "Output": "{\"status\": \"complete\"}",
        "TaskToken": "{% $taskToken %}"
      },
      "Resource": "arn:aws:states:::aws-sdk:sfn:sendTaskSuccess",
      "End": true
    }
  },
  "QueryLanguage": "JSONata"
}
Lines changed: 90 additions & 0 deletions
{
  "Comment": "Data Processing Child",
  "StartAt": "initiate variables",
  "States": {
    "initiate variables": {
      "Type": "Pass",
      "Assign": {
        "locationId": "{% $states.input.locationId %}",
        "dataType": "{% $states.input.dataType %}",
        "limit": "{% $states.input.limit %}",
        "offset": "{% $states.input.offset %}",
        "childTaskToken": "{% $states.input.childTaskToken %}"
      },
      "Next": "Get Data"
    },
    "Get Data": {
      "Type": "Task",
      "Resource": "arn:aws:states:::http:invoke",
      "Arguments": {
        "ApiEndpoint": "{% 'api_endpoint' %}",
        "Method": "GET",
        "InvocationConfig": {
          "ConnectionArn": "{% 'ConnectionArn' %}"
        },
        "QueryParameters": {
          "state": "{% $dataType %}",
          "limit": "{% $limit %}",
          "offset": "{% $offset %}"
        }
      },
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "BackoffRate": 2,
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "JitterStrategy": "FULL"
        }
      ],
      "Next": "Convert Page to JSON content",
      "Assign": {
        "resultCount": "{% $states.result.ResponseBody.count %}",
        "nextOffset": "{% $states.result.ResponseBody.count+$number($offset) %}"
      }
    },
    "Convert Page to JSON content": {
      "Type": "Map",
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "INLINE"
        },
        "StartAt": "Format Row",
        "States": {
          "Format Row": {
            "Type": "Pass",
            "Output": {
              "discount_code": "{% $states.input.resourceKey %}",
              "error_message": "{% $replace($states.input.errors[0].message, '\"', '') %}",
              "status": "{% $states.input.state %}"
            },
            "End": true
          }
        }
      },
      "Items": "{% $states.input.ResponseBody.results %}",
      "Next": "output"
    },
    "output": {
      "Type": "Pass",
      "Output": {
        "csvContent": "{% $states.input %}",
        "resultCount": "{% $resultCount %}",
        "offset": "{% $nextOffset %}"
      },
      "Next": "SendTaskSuccess"
    },
    "SendTaskSuccess": {
      "Type": "Task",
      "Arguments": {
        "Output": "{% $states.input %}",
        "TaskToken": "{% $childTaskToken %}"
      },
      "Resource": "arn:aws:states:::aws-sdk:sfn:sendTaskSuccess",
      "End": true
    }
  },
  "QueryLanguage": "JSONata"
}
