Automating Data Pipelines: Event-Driven Processing with Step Functions and Lambda
Automate data processing by using AWS services
This lab provides a hands-on experience in building a serverless, event-driven data processing pipeline. You will learn how to orchestrate multiple AWS services to transform raw data uploaded to Amazon S3 using AWS Lambda, all coordinated by AWS Step Functions.
[!WARNING] Remember to run the teardown commands at the end of this lab to avoid ongoing charges. While most resources are Free Tier eligible, leaving them active can incur costs.
Prerequisites
- An active AWS Account.
- AWS CLI installed and configured with Administrator access.
- Basic familiarity with Python and JSON syntax.
- A local terminal or AWS CloudShell access.
Learning Objectives
- Provision Amazon S3 buckets for raw and processed data storage.
- Develop a Python-based AWS Lambda function for data transformation.
- Design a serverless workflow using AWS Step Functions to orchestrate tasks.
- Configure Amazon EventBridge to trigger automation based on S3 events.
Architecture Overview
This architecture follows a decoupled, event-driven pattern: an object uploaded to the source S3 bucket emits an event to Amazon EventBridge, which starts a Step Functions state machine; the state machine invokes a Lambda function that transforms the object and writes the result to the target bucket.
Step-by-Step Instructions
Step 1: Create S3 Infrastructure
You need two buckets: one to receive raw data (Source) and one to store the transformed output (Destination).
```bash
# Generate a unique ID to avoid bucket naming conflicts
export MY_ID=$RANDOM
aws s3 mb s3://brainybee-lab-source-$MY_ID
aws s3 mb s3://brainybee-lab-target-$MY_ID
```
Console alternative
- Navigate to S3 > Buckets.
- Click Create bucket.
- Name: `brainybee-lab-source-<your-initials>-<date>`.
- Repeat for the target bucket.
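S3 bucket names must be globally unique, lowercase, and 3-63 characters long. A quick local sanity check on the generated name (not part of the original lab):

```shell
# Build the bucket name the same way the lab does and validate it locally.
MY_ID=$RANDOM
NAME="brainybee-lab-source-$MY_ID"
echo "$NAME"
# Length must be 3-63 characters for S3.
[ "${#NAME}" -ge 3 ] && [ "${#NAME}" -le 63 ] && echo "length ok"
```

Note that `$RANDOM` only reduces the chance of a collision; if `aws s3 mb` still reports `BucketAlreadyExists`, pick a different suffix.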
Step 2: Create IAM Execution Role
Your Lambda function and Step Function need permissions to interact with S3 and CloudWatch.
```bash
# Create the trust policy file
cat <<EOF > trust-policy.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": ["lambda.amazonaws.com", "states.amazonaws.com"] },
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF

# Create the role
aws iam create-role --role-name brainybee-lab-role --assume-role-policy-document file://trust-policy.json

# Attach managed policies for S3 and Logs
aws iam attach-role-policy --role-name brainybee-lab-role --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess
aws iam attach-role-policy --role-name brainybee-lab-role --policy-arn arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
```
Step 3: Deploy the Transformation Lambda
This function will simulate a transformation by converting a CSV header to uppercase.
```python
# lambda_function.py
import json
import boto3

s3 = boto3.client('s3')

def lambda_handler(event, context):
    source_bucket = event['bucket']
    key = event['key']
    target_bucket = source_bucket.replace('source', 'target')

    # Fetch the object
    response = s3.get_object(Bucket=source_bucket, Key=key)
    content = response['Body'].read().decode('utf-8')

    # Transformation logic: uppercase only the first line (the CSV header)
    lines = content.split('\n')
    lines[0] = lines[0].upper()
    transformed_content = '\n'.join(lines)

    # Save to target
    s3.put_object(Bucket=target_bucket, Key=f"processed-{key}", Body=transformed_content)

    return {
        'statusCode': 200,
        'body': f"Successfully processed {key}"
    }
```

```bash
# Zip and deploy
zip function.zip lambda_function.py
aws lambda create-function --function-name brainybee-transformer \
  --runtime python3.12 --role arn:aws:iam::<YOUR_ACCOUNT_ID>:role/brainybee-lab-role \
  --handler lambda_function.lambda_handler --zip-file fileb://function.zip
```
Step 4: Define the Step Functions State Machine
This workflow manages the execution logic and error handling.
```json
{
  "StartAt": "TransformData",
  "States": {
    "TransformData": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:<REGION>:<ACCOUNT>:function:brainybee-transformer",
      "End": true,
      "Retry": [{ "ErrorEquals": ["States.ALL"], "IntervalSeconds": 2, "MaxAttempts": 3 }]
    }
  }
}
```
[!TIP] Use the Workflow Studio in the AWS Console to visualize this state machine before deploying.
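The lab defines the state machine but does not show deploying it or wiring up the EventBridge trigger. A sketch of those steps, assuming the ASL definition above is saved as `state-machine.json`; the names `brainybee-pipeline` and `brainybee-s3-upload` are illustrative, and `<STATE_MACHINE_ARN>` / `<EVENTBRIDGE_ROLE_ARN>` are placeholders you must fill in:

```shell
# Step Functions needs lambda:InvokeFunction to call the transformer; the
# Step 2 role only carries S3 and CloudWatch policies, so attach one more.
aws iam attach-role-policy --role-name brainybee-lab-role \
  --policy-arn arn:aws:iam::aws:policy/AWSLambda_FullAccess

# Deploy the state machine from the ASL definition above.
aws stepfunctions create-state-machine \
  --name brainybee-pipeline \
  --definition file://state-machine.json \
  --role-arn "arn:aws:iam::<YOUR_ACCOUNT_ID>:role/brainybee-lab-role"

# Send the source bucket's object events to EventBridge.
aws s3api put-bucket-notification-configuration \
  --bucket "brainybee-lab-source-$MY_ID" \
  --notification-configuration '{"EventBridgeConfiguration": {}}'

# Route "Object Created" events from that bucket to the state machine.
# EventBridge needs its own role allowed to call states:StartExecution.
aws events put-rule --name brainybee-s3-upload \
  --event-pattern "{\"source\": [\"aws.s3\"], \"detail-type\": [\"Object Created\"], \"detail\": {\"bucket\": {\"name\": [\"brainybee-lab-source-$MY_ID\"]}}}"
aws events put-targets --rule brainybee-s3-upload \
  --targets "[{\"Id\": \"1\", \"Arn\": \"<STATE_MACHINE_ARN>\", \"RoleArn\": \"<EVENTBRIDGE_ROLE_ARN>\"}]"
```

One caveat: the S3 event EventBridge delivers uses `detail.bucket.name` and `detail.object.key`, while the Lambda expects a flat `{"bucket": ..., "key": ...}` input, so in practice you would also add a `Parameters` mapping on the `TransformData` task state to reshape the input.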
Checkpoints
- IAM Verification: Run `aws iam get-role --role-name brainybee-lab-role` and ensure the `AssumeRolePolicyDocument` contains both the `lambda` and `states` services.
- Lambda Test: Manually invoke the Lambda with a mock JSON event containing a test bucket/key.
- Event Flow: Upload a file named `test.csv` to your source bucket and check the Step Functions Console for a new execution.
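The Lambda Test checkpoint can also be rehearsed locally before touching AWS. A minimal sketch that mirrors the handler's transformation logic against a fake in-memory S3 client (the `FakeS3` class is illustrative scaffolding, not part of the lab):

```python
# Local smoke test: no AWS credentials needed. The handler body mirrors
# lambda_function.py; only the boto3 S3 client is stubbed out.
import io

class FakeS3:
    def __init__(self):
        self.objects = {}  # (bucket, key) -> bytes

    def get_object(self, Bucket, Key):
        return {"Body": io.BytesIO(self.objects[(Bucket, Key)])}

    def put_object(self, Bucket, Key, Body):
        data = Body.encode("utf-8") if isinstance(Body, str) else Body
        self.objects[(Bucket, Key)] = data

s3 = FakeS3()

def lambda_handler(event, context):
    source_bucket = event["bucket"]
    key = event["key"]
    target_bucket = source_bucket.replace("source", "target")
    content = s3.get_object(Bucket=source_bucket, Key=key)["Body"].read().decode("utf-8")
    lines = content.split("\n")
    lines[0] = lines[0].upper()  # uppercase the CSV header only
    s3.put_object(Bucket=target_bucket, Key=f"processed-{key}", Body="\n".join(lines))
    return {"statusCode": 200, "body": f"Successfully processed {key}"}

# Seed a mock upload, then run the handler with a mock event
s3.objects[("brainybee-lab-source-test", "test.csv")] = b"id,name\n1,alice"
result = lambda_handler({"bucket": "brainybee-lab-source-test", "key": "test.csv"}, None)
print(result["statusCode"])  # → 200
print(s3.objects[("brainybee-lab-target-test", "processed-test.csv")].decode())
```

The same mock event (`{"bucket": ..., "key": ...}`) can be passed to `aws lambda invoke` to test the deployed function for real.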
Troubleshooting
| Error | Possible Cause | Fix |
|---|---|---|
| `AccessDenied` | IAM role missing S3 permissions | Check that `AmazonS3FullAccess` is attached to `brainybee-lab-role`. |
| `Lambda.NotFoundException` | Incorrect ARN in Step Function | Verify the Lambda ARN in the state machine JSON matches the deployed function. |
| EventBridge not triggering | S3 Event Notifications disabled | Ensure the source bucket has "Amazon EventBridge" notifications enabled in the Properties tab. |
Clean-Up / Teardown
To avoid costs, delete all provisioned resources in this order:
```bash
# 1. Empty and delete S3 buckets
aws s3 rm s3://brainybee-lab-source-$MY_ID --recursive
aws s3 rb s3://brainybee-lab-source-$MY_ID
aws s3 rm s3://brainybee-lab-target-$MY_ID --recursive
aws s3 rb s3://brainybee-lab-target-$MY_ID

# 2. Delete Lambda and state machine
aws lambda delete-function --function-name brainybee-transformer
aws stepfunctions delete-state-machine --state-machine-arn <SM_ARN>

# 3. Delete IAM role
aws iam detach-role-policy --role-name brainybee-lab-role --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess
aws iam detach-role-policy --role-name brainybee-lab-role --policy-arn arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
aws iam delete-role --role-name brainybee-lab-role
```
Cost Estimate
- S3: $0.023 per GB (First 5GB free). This lab uses <1MB.
- Lambda: 1 million free requests per month. This lab uses ~10 requests.
- Step Functions: 4,000 free state transitions per month.
- Estimated Total: $0.00 (If within Free Tier).
Stretch Challenge
Modify the Step Function workflow to include a Choice State. If the uploaded file is not a .csv, move it to an errors/ folder instead of calling the Lambda function.
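One possible shape for that workflow, sketched in ASL (the state names and the SDK integration are illustrative; a true "move" would also need a `deleteObject` step after the copy, and `StringMatches` supports `*` wildcards):

```json
{
  "StartAt": "CheckFileType",
  "States": {
    "CheckFileType": {
      "Type": "Choice",
      "Choices": [
        { "Variable": "$.key", "StringMatches": "*.csv", "Next": "TransformData" }
      ],
      "Default": "MoveToErrors"
    },
    "TransformData": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:<REGION>:<ACCOUNT>:function:brainybee-transformer",
      "End": true
    },
    "MoveToErrors": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:s3:copyObject",
      "Parameters": {
        "Bucket.$": "$.bucket",
        "CopySource.$": "States.Format('{}/{}', $.bucket, $.key)",
        "Key.$": "States.Format('errors/{}', $.key)"
      },
      "End": true
    }
  }
}
```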
Concept Review
Automation Service Comparison
| Service | Best Use Case | Logic Definition |
|---|---|---|
| AWS Step Functions | Coordinating multiple AWS services into a workflow. | ASL (JSON/YAML) |
| AWS Glue Workflows | Orchestrating ETL-specific jobs (Crawlers, Glue Jobs). | Visual / Python |
| Amazon MWAA | Complex, long-running data science pipelines (Airflow). | Python DAGs |
| AWS Lambda | Single-purpose, event-driven short tasks (<15 mins). | Code (Python, Node, etc.) |
Data Transformation Logic Visualized
```latex
\begin{tikzpicture}[node distance=2cm]
  \node (input)   [draw, rectangle]                  {Raw CSV Content};
  \node (process) [draw, diamond, below of=input]    {Text Transformation};
  \node (output)  [draw, rectangle, below of=process] {UPPERCASE CSV};
  \draw[->] (input) -- (process);
  \draw[->] (process) -- (output);
  \node[right of=process, xshift=2cm] (desc) {Applied via Boto3};
  \draw[dashed] (process) -- (desc);
\end{tikzpicture}
```