Lab: Building a Real-Time Serverless Transformation Pipeline with Amazon Data Firehose and AWS Lambda
Data Transformation and Processing
In this lab, you will architect and deploy a real-time data ingestion pipeline. You will use Amazon Data Firehose to capture streaming data, trigger an AWS Lambda function to perform in-flight transformation (data enrichment/formatting), and deliver the final results to Amazon S3.
Prerequisites
- AWS Account: An active AWS account with permissions for IAM, S3, Lambda, and Firehose.
- CLI Tools: AWS CLI installed and configured with `AdministratorAccess` credentials.
- Region: Use `us-east-1` (N. Virginia) for this lab to ensure service availability.
- Knowledge: Basic understanding of Python for the Lambda transformation logic.
Learning Objectives
- Provision a destination S3 bucket for processed data storage.
- Develop a Lambda function using the Firehose transformation blueprint.
- Configure a Firehose delivery stream with specific buffer and transformation settings.
- Validate the end-to-end data flow using the AWS CLI producer simulation.
Architecture Overview
Producer (AWS CLI) → Amazon Data Firehose (Direct PUT) → AWS Lambda (in-flight transformation) → Amazon S3 (transformed records).
Step-by-Step Instructions
Step 1: Create the Destination S3 Bucket
Amazon Data Firehose requires a destination to store the transformed records. We will create a unique bucket.
```bash
# Replace <YOUR_UNIQUE_ID> with your initials or a random number
aws s3 mb s3://brainybee-lab-data-transformed-<YOUR_UNIQUE_ID>
```
Step 2: Create the IAM Role for Lambda
Lambda needs permission to be invoked by Firehose and to log to CloudWatch.
- Create a file named `trust-policy.json`:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "lambda.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}
```

- Create the role and attach the basic execution policy:

```bash
aws iam create-role --role-name firehose-transform-role \
  --assume-role-policy-document file://trust-policy.json
aws iam attach-role-policy --role-name firehose-transform-role \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
```
Step 3: Deploy the Lambda Transformation Function
The Lambda function will receive a batch of records, decode them from Base64, convert text to uppercase, and return them to Firehose.
- Create `lambda_function.py`:

```python
import base64

def lambda_handler(event, context):
    output = []
    for record in event['records']:
        # Decode the Base64 data payload
        payload = base64.b64decode(record['data']).decode('utf-8')

        # Transform: uppercase the data
        transformed_data = payload.upper()

        # Re-encode to Base64 for the Firehose response
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(transformed_data.encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    print(f"Processed {len(event['records'])} records.")
    return {'records': output}
```

- Zip and deploy:

```bash
zip function.zip lambda_function.py
aws lambda create-function --function-name firehose-transformer \
  --zip-file fileb://function.zip --handler lambda_function.lambda_handler \
  --runtime python3.9 --role arn:aws:iam::<YOUR_ACCOUNT_ID>:role/firehose-transform-role
```
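Before deploying, you can sanity-check the transformation logic locally. The sketch below inlines a condensed copy of the handler from Step 3 and feeds it a hand-built Firehose-style event; the record ID and payload are made up for illustration:

```python
import base64

# Condensed copy of the lambda_handler from Step 3 (print statement omitted)
def lambda_handler(event, context):
    output = []
    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        transformed_data = payload.upper()
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(transformed_data.encode('utf-8')).decode('utf-8')
        })
    return {'records': output}

# Build a fake Firehose transformation event with one Base64-encoded record
event = {
    'records': [{
        'recordId': 'test-record-1',  # hypothetical record ID
        'data': base64.b64encode(b'hello brainybee').decode('utf-8')
    }]
}

result = lambda_handler(event, None)
print(base64.b64decode(result['records'][0]['data']).decode('utf-8'))  # HELLO BRAINYBEE
```

If this prints `HELLO BRAINYBEE`, the handler returns the record shape Firehose expects (`recordId`, `result`, `data`).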
Step 4: Create the Firehose Delivery Stream
Now we link everything together.
[!TIP] Use the AWS Management Console for this step to easily select the Lambda function and S3 bucket from dropdowns.
Console steps:
- Go to the Amazon Data Firehose console and choose Create Firehose stream.
- Source: Direct PUT | Destination: Amazon S3.
- Enable 'Data transformation' and select 'firehose-transformer' Lambda.
- Set Buffer hints: 1 MB or 60 seconds.
- Select your S3 bucket.
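If you prefer the CLI, the same stream can be created with `aws firehose create-delivery-stream` using an extended S3 destination configuration. The fragment below is a sketch: the stream name, account ID, and role name are placeholders, and it assumes a Firehose service role with S3 write and Lambda invoke permissions already exists (the console creates this role for you):

```json
{
  "DeliveryStreamName": "brainybee-lab-stream",
  "DeliveryStreamType": "DirectPut",
  "ExtendedS3DestinationConfiguration": {
    "RoleARN": "arn:aws:iam::<YOUR_ACCOUNT_ID>:role/<FIREHOSE_SERVICE_ROLE>",
    "BucketARN": "arn:aws:s3:::brainybee-lab-data-transformed-<YOUR_UNIQUE_ID>",
    "BufferingHints": { "SizeInMBs": 1, "IntervalInSeconds": 60 },
    "ProcessingConfiguration": {
      "Enabled": true,
      "Processors": [
        {
          "Type": "Lambda",
          "Parameters": [
            {
              "ParameterName": "LambdaArn",
              "ParameterValue": "arn:aws:lambda:us-east-1:<YOUR_ACCOUNT_ID>:function:firehose-transformer"
            }
          ]
        }
      ]
    }
  }
}
```

Save it as, say, `stream-config.json` and run `aws firehose create-delivery-stream --cli-input-json file://stream-config.json`.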
Checkpoints
- Lambda Check: Invoke the Lambda with a test event to ensure it returns the correct JSON structure.
- Firehose Status: Ensure the stream status is `ACTIVE` in the console.
- End-to-End: Send a test record:

```bash
# --cli-binary-format lets AWS CLI v2 accept the raw (non-Base64) payload
aws firehose put-record --delivery-stream-name <YOUR_STREAM_NAME> \
  --record '{"Data":"hello brainybee"}' --cli-binary-format raw-in-base64-out
```

- S3 Verification: Wait 60 seconds and check the S3 bucket for a new folder structure (YYYY/MM/DD/HH). Download the file; it should contain `HELLO BRAINYBEE`.
Concept Review
Buffer Logic Visualization
Firehose buffers incoming data based on size and time. The first condition met triggers the delivery.
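The "first condition met" rule can be sketched as a simple check. The 1 MB / 60 s values match the buffer hints from Step 4; this is an illustration of the rule, not Firehose's actual implementation:

```python
BUFFER_SIZE_BYTES = 1 * 1024 * 1024   # 1 MB buffer size hint
BUFFER_INTERVAL_SECONDS = 60          # 60 second buffer interval hint

def should_flush(buffered_bytes: int, seconds_since_last_flush: float) -> bool:
    """Firehose-style buffering: deliver when EITHER threshold is reached."""
    return (buffered_bytes >= BUFFER_SIZE_BYTES
            or seconds_since_last_flush >= BUFFER_INTERVAL_SECONDS)

print(should_flush(2_000_000, 5))   # True  (size threshold hit first)
print(should_flush(500, 60))        # True  (time threshold hit first)
print(should_flush(500, 5))         # False (neither condition met yet)
```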
Comparison: Transformation Tools
| Service | Transformation Type | Typical Latency | Best For |
|---|---|---|---|
| AWS Lambda | Lightweight / Event-driven | Seconds | Single record enrichment, filtering |
| AWS Glue | Complex ETL / Spark | Minutes | Large scale batch/micro-batch processing |
| Amazon EMR | Distributed Big Data | Minutes | Petabyte-scale Hadoop/Spark/Flink ecosystems |
| Redshift | SQL-based | Sub-second | Data warehouse aggregations |
Troubleshooting
| Error | Possible Cause | Fix |
|---|---|---|
| ServiceUnavailableException | Firehose is still provisioning. | Wait 2-3 minutes for status to reach ACTIVE. |
| S3 Delivery Failed | Firehose role lacks S3 PutObject permissions. | Check the IAM role attached to the Firehose stream. |
| Lambda Transformation Failed | Lambda timed out or returned invalid JSON. | Check CloudWatch Logs for the Lambda; ensure `result` is `Ok`. |
Cost Estimate
| Service | Usage | Estimated Cost (Free Tier) |
|---|---|---|
| Amazon Data Firehose | 1000 records (<1MB) | $0.00 (within free tier) |
| AWS Lambda | 100 requests | $0.00 (1M free requests/mo) |
| Amazon S3 | <1MB storage | $0.00 (5GB free/mo) |
[!WARNING] Remember to run the teardown commands to avoid ongoing charges if you exceed the free tier limits.
Teardown
```bash
# 1. Delete the Firehose stream
aws firehose delete-delivery-stream --delivery-stream-name <YOUR_STREAM_NAME>

# 2. Delete the Lambda function
aws lambda delete-function --function-name firehose-transformer

# 3. Empty and delete the S3 bucket
aws s3 rb s3://brainybee-lab-data-transformed-<YOUR_UNIQUE_ID> --force

# 4. Detach the policy and delete the IAM role
aws iam detach-role-policy --role-name firehose-transform-role \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
aws iam delete-role --role-name firehose-transform-role
```

Stretch Challenge
Modify your Lambda function to perform Conditional Routing. If the incoming data contains the word "CRITICAL", append a timestamp to the record. If not, drop the record entirely using the 'Dropped' result status in the Firehose response.
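One possible shape for the challenge handler, runnable locally as a sketch (the substring check and the appended ISO-8601 timestamp format are my assumptions; adapt them as needed):

```python
import base64
from datetime import datetime, timezone

def lambda_handler(event, context):
    output = []
    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        if 'CRITICAL' in payload:
            # Critical records: append a timestamp and pass through
            stamped = f"{payload} [{datetime.now(timezone.utc).isoformat()}]"
            output.append({
                'recordId': record['recordId'],
                'result': 'Ok',
                'data': base64.b64encode(stamped.encode('utf-8')).decode('utf-8')
            })
        else:
            # Non-critical records: drop, but still return the recordId to Firehose
            output.append({
                'recordId': record['recordId'],
                'result': 'Dropped',
                'data': record['data']
            })
    return {'records': output}

# Local smoke test with two hypothetical records
event = {'records': [
    {'recordId': '1', 'data': base64.b64encode(b'CRITICAL: disk full').decode('utf-8')},
    {'recordId': '2', 'data': base64.b64encode(b'info: all good').decode('utf-8')},
]}
results = lambda_handler(event, None)['records']
print([r['result'] for r in results])  # ['Ok', 'Dropped']
```

Note that every incoming `recordId` must appear in the response, even for dropped records, or Firehose treats the batch as a failed transformation.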