# Lab: Orchestrating Serverless Data Pipelines with AWS Step Functions
In this lab, you will build a serverless data orchestration pipeline. Following the AWS Certified Data Engineer (DEA-C01) curriculum, you will use AWS Step Functions to coordinate a workflow that processes data uploaded to Amazon S3, transforms it using AWS Lambda, and notifies you of the results via Amazon SNS.
> [!WARNING]
> This lab involves creating resources that may incur costs if not deleted. Remember to run the teardown commands at the end.
## Prerequisites
- AWS Account: Active account with permissions to manage S3, Lambda, Step Functions, SNS, and IAM.
- AWS CLI: Installed and configured with `aws configure`.
- Python 3.x: Basic familiarity with Python syntax.
- IAM Permissions: Your user must have `AdministratorAccess` or equivalent for this lab environment.
## Learning Objectives
- Design a resilient State Machine using Amazon States Language (ASL).
- Configure an event-driven trigger using Amazon EventBridge and S3.
- Implement a "Task" state that invokes a serverless Lambda function.
- Build error-handling logic into a data pipeline.
## Architecture Overview

The pipeline flows as follows: a file uploaded to Amazon S3 emits an event to Amazon EventBridge, which starts the Step Functions state machine; the state machine invokes the transformation Lambda and publishes the result (success or failure) to an SNS topic.

## Step-by-Step Instructions
### Step 1: Create the Notification Channel (SNS)
Before building the logic, we need a way to receive status updates.
```bash
# Create the SNS Topic
aws sns create-topic --name lab-pipeline-notifications

# Subscribe your email (replace <YOUR_EMAIL> and <YOUR_ACCOUNT_ID>)
aws sns subscribe \
  --topic-arn arn:aws:sns:us-east-1:<YOUR_ACCOUNT_ID>:lab-pipeline-notifications \
  --protocol email \
  --notification-endpoint <YOUR_EMAIL>
```

> [!IMPORTANT]
> You must click the "Confirm Subscription" link in the email sent to you before messages will arrive.
<details>
<summary>Console alternative</summary>

Navigate to the Amazon SNS console. Select Create topic, name it `lab-pipeline-notifications`, and click Create topic. Then, select Create subscription, choose the Email protocol, and enter your address.

</details>
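Before wiring up the rest of the pipeline, you can confirm the subscription is active by listing the topic's subscriptions (the ARN below assumes the account and region placeholders used throughout this lab):

```shell
# "PendingConfirmation" means the email link has not been clicked yet;
# a full subscription ARN means delivery is ready.
aws sns list-subscriptions-by-topic \
  --topic-arn arn:aws:sns:us-east-1:<YOUR_ACCOUNT_ID>:lab-pipeline-notifications \
  --query 'Subscriptions[].SubscriptionArn'
```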
### Step 2: Create the Transformation Lambda
This function will simulate data transformation (e.g., converting CSV to JSON or cleaning strings).
- Save the following code as `transform_function.py`:
```python
import json

def lambda_handler(event, context):
    # Simulate transformation logic: extract the uploaded object's key
    raw_data = event.get('detail', {}).get('object', {}).get('key', 'unknown')
    print(f"Processing file: {raw_data}")
    return {
        'statusCode': 200,
        'body': json.dumps({'message': 'Transformation Successful', 'file': raw_data})
    }
```
- Deploy via CLI:
```bash
# Zip the function
zip function.zip transform_function.py

# Create an IAM role the Lambda can assume (simplified for this lab;
# in production, scope permissions to least privilege)
aws iam create-role --role-name LambdaRole --path /service-role/ \
  --assume-role-policy-document '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"Service":"lambda.amazonaws.com"},"Action":"sts:AssumeRole"}]}'
aws iam attach-role-policy --role-name LambdaRole \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

# Create the function (the new role may take a few seconds to propagate)
aws lambda create-function --function-name DataTransformFunc \
  --zip-file fileb://function.zip --handler transform_function.lambda_handler \
  --runtime python3.9 --role arn:aws:iam::<YOUR_ACCOUNT_ID>:role/service-role/LambdaRole
```
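Before relying on the deployed function, you can exercise the same handler logic locally with a sample EventBridge-style event (the object key `raw/orders.csv` is just an illustrative value):

```python
import json

def lambda_handler(event, context):
    # Same logic as transform_function.py: pull the object key from the event
    raw_data = event.get('detail', {}).get('object', {}).get('key', 'unknown')
    print(f"Processing file: {raw_data}")
    return {
        'statusCode': 200,
        'body': json.dumps({'message': 'Transformation Successful', 'file': raw_data})
    }

# Minimal EventBridge-style "Object Created" payload
sample_event = {"detail": {"object": {"key": "raw/orders.csv"}}}
result = lambda_handler(sample_event, None)
assert result['statusCode'] == 200
assert json.loads(result['body'])['file'] == 'raw/orders.csv'
```

Note that the handler falls back to `'unknown'` when the event lacks the expected shape, so a malformed trigger still returns a 200 rather than raising.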
### Step 3: Define the Step Functions State Machine

The State Machine is the "Traffic Control Center" described in the DEA-C01 guide.
- Create a file named `state_machine.asl.json`:
```json
{
  "StartAt": "InvokeLambda",
  "States": {
    "InvokeLambda": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:<YOUR_ACCOUNT_ID>:function:DataTransformFunc",
      "Next": "NotifySuccess",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "NotifyFailure"
        }
      ]
    },
    "NotifySuccess": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:us-east-1:<YOUR_ACCOUNT_ID>:lab-pipeline-notifications",
        "Message": "Data Pipeline Succeeded!"
      },
      "End": true
    },
    "NotifyFailure": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:us-east-1:<YOUR_ACCOUNT_ID>:lab-pipeline-notifications",
        "Message": "Data Pipeline Failed!"
      },
      "End": true
    }
  }
}
```
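Transient Lambda errors (throttling, cold starts) are usually retried before falling through to `Catch`. A sketch of a `Retry` field that could be added to the `InvokeLambda` state; the error names are the standard Lambda service exceptions used with this integration:

```json
"Retry": [
  {
    "ErrorEquals": ["Lambda.ServiceException", "Lambda.TooManyRequestsException"],
    "IntervalSeconds": 2,
    "MaxAttempts": 3,
    "BackoffRate": 2.0
  }
]
```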
- Create the State Machine:

```bash
aws stepfunctions create-state-machine --name DataPipelineOrchestrator \
  --definition file://state_machine.asl.json \
  --role-arn arn:aws:iam::<YOUR_ACCOUNT_ID>:role/StepFunctionsRole
```
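The command above assumes a `StepFunctionsRole` already exists. A sketch of creating it with the two permissions this workflow actually calls, `lambda:InvokeFunction` and `sns:Publish`; the role and policy names are placeholders for this lab:

```shell
# Trust policy allowing Step Functions to assume the role
aws iam create-role --role-name StepFunctionsRole \
  --assume-role-policy-document '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"Service":"states.amazonaws.com"},"Action":"sts:AssumeRole"}]}'

# Inline policy granting only what the state machine needs
aws iam put-role-policy --role-name StepFunctionsRole --policy-name lab-pipeline-access \
  --policy-document '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":["lambda:InvokeFunction","sns:Publish"],"Resource":"*"}]}'
```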
### Step 4: Configure the S3 Trigger

We will enable S3 Event Notifications to send events to EventBridge, which then triggers the state machine via an EventBridge rule.
```bash
# Create the bucket
aws s3 mb s3://brainybee-lab-data-<YOUR_ACCOUNT_ID>

# Enable EventBridge notifications on the bucket
aws s3api put-bucket-notification-configuration \
  --bucket brainybee-lab-data-<YOUR_ACCOUNT_ID> \
  --notification-configuration '{ "EventBridgeConfiguration": {} }'
```
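Enabling EventBridge notifications only publishes the events; a rule is still needed to route them to the state machine. A sketch, assuming the rule name `lab-s3-to-stepfn` and a role `EventBridgeInvokeStepFnRole` (both placeholder names) that EventBridge can assume to call `states:StartExecution`:

```shell
# Match "Object Created" events from the lab bucket
aws events put-rule --name lab-s3-to-stepfn \
  --event-pattern '{"source":["aws.s3"],"detail-type":["Object Created"],"detail":{"bucket":{"name":["brainybee-lab-data-<YOUR_ACCOUNT_ID>"]}}}'

# Point the rule at the state machine
aws events put-targets --rule lab-s3-to-stepfn \
  --targets '[{"Id":"stepfn","Arn":"arn:aws:states:us-east-1:<YOUR_ACCOUNT_ID>:stateMachine:DataPipelineOrchestrator","RoleArn":"arn:aws:iam::<YOUR_ACCOUNT_ID>:role/EventBridgeInvokeStepFnRole"}]'
```

If you create this rule, remember to remove it during teardown with `aws events remove-targets --rule lab-s3-to-stepfn --ids stepfn` followed by `aws events delete-rule --name lab-s3-to-stepfn`.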
## Checkpoints

- Execution Check: Upload a file to your bucket:

  ```bash
  aws s3 cp test.txt s3://brainybee-lab-data-<YOUR_ACCOUNT_ID>/
  ```

- Verification: Run the following to see if the workflow started:

  ```bash
  aws stepfunctions list-executions \
    --state-machine-arn arn:aws:states:us-east-1:<YOUR_ACCOUNT_ID>:stateMachine:DataPipelineOrchestrator
  ```

- Success: You should receive an email stating "Data Pipeline Succeeded!".
## Troubleshooting
| Problem | Potential Cause | Solution |
|---|---|---|
| S3 upload doesn't trigger workflow | EventBridge notification not enabled on bucket | Re-run the `put-bucket-notification-configuration` command. |
| Step Function fails at Lambda | IAM role lacks `lambda:InvokeFunction` | Add the Lambda invocation permission to the Step Functions execution role. |
| SNS Emails not received | Subscription not confirmed | Check your spam folder for the AWS Notification confirmation email. |
## Concept Review
When orchestrating in AWS, you must choose the tool that fits the workload requirements:
| Service | Best Use Case | Logic Style |
|---|---|---|
| AWS Step Functions | Serverless, event-driven, microservices | Visual State Machine (ASL) |
| Amazon MWAA (Airflow) | Complex Python-based DAGs, Data Science | Code-based (DAGs) |
| AWS Glue Workflows | AWS Glue-centric ETL jobs | UI-based Graph |
| EventBridge | Simple 1-to-1 triggers | Event Patterns |
## Cost Estimate
- AWS Step Functions: First 4,000 state transitions/month are free (Standard Workflows).
- AWS Lambda: First 1 million requests/month are free.
- S3: Minimal cost for small file storage (~$0.023/GB).
- SNS: First 1,000 email notifications/month are free.
- Total Expected Lab Cost: $0.00 (within Free Tier).
## Challenge
Modify `state_machine.asl.json` to include a Choice state. If the file extension is `.csv`, proceed to the Lambda. If the extension is `.json`, skip the Lambda and go straight to success. Use the `$.detail.object.key` path to inspect the filename.
## Teardown

> [!WARNING]
> Failure to delete these resources may result in small ongoing storage costs.
```bash
# 1. Delete S3 bucket and objects
aws s3 rb s3://brainybee-lab-data-<YOUR_ACCOUNT_ID> --force

# 2. Delete the state machine
aws stepfunctions delete-state-machine \
  --state-machine-arn arn:aws:states:us-east-1:<YOUR_ACCOUNT_ID>:stateMachine:DataPipelineOrchestrator

# 3. Delete the Lambda function
aws lambda delete-function --function-name DataTransformFunc

# 4. Delete the SNS topic
aws sns delete-topic \
  --topic-arn arn:aws:sns:us-east-1:<YOUR_ACCOUNT_ID>:lab-pipeline-notifications
```