Event-Driven Workflows: Replacing Manual Operations with Serverless Automation
The Problem: Manual Account Closure Was Killing Our Productivity
Before we automated account closures, the process looked like this:
- Customer requests account deletion via support ticket
- Support team manually marks account as “pending_deletion”
- DBA runs SQL scripts to anonymize user data
- Platform team removes account from billing system
- Backend team deletes resource metadata
- Email is manually verified and sent to customer
- Compliance logs are manually updated
- Slack message is posted to accounting (for billing adjustments)
Total time: 4-6 hours of human effort per account closure, and the process was error-prone.
Mistakes happened:
- Data wasn’t fully anonymized
- Billing stopped but refunds weren’t issued
- Compliance logs were inconsistent
- Manual data deletion created data integrity issues
When we hit 1,000+ account closures per month, this became unsustainable. We were burning 40+ hours weekly on a process that computers should handle.
The Solution: Event-Driven Architecture with AWS Lambda & Step Functions
We designed an event-driven workflow that:
- Listens for account closure events (from API, dashboard, or support)
- Orchestrates multiple tasks in sequence (with error handling)
- Notifies dependent systems via events
- Maintains audit trail for compliance
- Provides status visibility throughout the process
Architecture Overview
┌────────────────────────────────────────────────────────────┐
│                 Customer Initiates Closure                 │
│              (API, Dashboard, Support Ticket)              │
└─────────────────────────────┬──────────────────────────────┘
                              │
                              ▼
┌────────────────────────────────────────────────────────────┐
│              Lambda: Parse & Validate Request              │
│  - Check permissions                                       │
│  - Validate account exists                                 │
│  - Check for pending transactions                          │
└─────────────────────────────┬──────────────────────────────┘
                              │
                              ▼
┌────────────────────────────────────────────────────────────┐
│         Step Function: Orchestrate Account Closure         │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Task 1: Anonymize User Data                            │ │
│ │ Task 2: Cancel Subscriptions & Issue Refunds           │ │
│ │ Task 3: Delete Resource Metadata                       │ │
│ │ Task 4: Publish Compliance Event                       │ │
│ │ Task 5: Send Confirmation Email                        │ │
│ │ Task 6: Notify Accounting                              │ │
│ └────────────────────────────────────────────────────────┘ │
└─────────────────────────────┬──────────────────────────────┘
                              │
             ┌────────────────┼────────────────┐
             ▼                ▼                ▼
         Database         SQS Queue        SNS Topic
        (DynamoDB)     (Notify Email) (Compliance Audit)
Implementation: Lambda Functions
1. Entry Point Lambda: Validate Request
import json
import logging
import time

import boto3

dynamodb = boto3.resource('dynamodb')
accounts_table = dynamodb.Table('Accounts')
sfn = boto3.client('stepfunctions')
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    """
    Triggered by API Gateway when account closure requested.
    Validates request and starts Step Function execution.
    """
    try:
        account_id = event['pathParameters']['account_id']
        user_id = event['requestContext']['authorizer']['claims']['sub']

        # get_item always returns a dict; 'Item' is absent when nothing matches
        response = accounts_table.get_item(Key={'account_id': account_id})
        account_data = response.get('Item')
        if account_data is None:
            return {
                'statusCode': 404,
                'body': json.dumps({'error': 'Account not found'})
            }

        # Verify permission: user must be account owner
        if account_data['owner_id'] != user_id:
            return {
                'statusCode': 403,
                'body': json.dumps({'error': 'Unauthorized'})
            }

        # Reject duplicate requests while a closure is already running
        if account_data.get('status') == 'pending_deletion':
            return {
                'statusCode': 409,
                'body': json.dumps({'error': 'Account closure already in progress'})
            }

        # Check for active subscriptions
        if account_data.get('active_subscriptions', 0) > 0:
            return {
                'statusCode': 400,
                'body': json.dumps({
                    'error': 'Account has active subscriptions',
                    'detail': 'Cancel all subscriptions before deleting account'
                })
            }

        # All validations passed - start Step Function
        now = int(time.time())
        execution = sfn.start_execution(
            stateMachineArn='arn:aws:states:region:account:stateMachine:AccountClosure',
            name=f'closure-{account_id}-{now}',
            input=json.dumps({
                'account_id': account_id,
                'user_id': user_id,
                'user_email': account_data['email'],
                'timestamp': now
            })
        )

        # Update account status immediately
        accounts_table.update_item(
            Key={'account_id': account_id},
            UpdateExpression='SET #status = :status, closure_request_id = :request_id',
            ExpressionAttributeNames={'#status': 'status'},
            ExpressionAttributeValues={
                ':status': 'pending_deletion',
                ':request_id': execution['executionArn']
            }
        )

        logger.info(f'Account closure started: {account_id}')
        return {
            'statusCode': 202,
            'body': json.dumps({
                'message': 'Account closure initiated',
                'execution_id': execution['executionArn']
            })
        }
    except Exception as e:
        logger.error(f'Error initiating closure: {str(e)}', exc_info=True)
        return {
            'statusCode': 500,
            'body': json.dumps({'error': 'Internal server error'})
        }
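The handler reads the status and writes `pending_deletion` in separate calls, so two simultaneous requests could both pass the check. One way to narrow that window is a conditional write. This is a minimal sketch, assuming the same `Accounts` table and `status` attribute; `build_claim_update` and `claim_account` are hypothetical helpers, not part of the handler above.

```python
def build_claim_update(account_id):
    """Build update_item arguments that succeed only if closure has not started."""
    return {
        'Key': {'account_id': account_id},
        'UpdateExpression': 'SET #status = :pending',
        'ConditionExpression': 'attribute_exists(account_id) AND #status <> :pending',
        'ExpressionAttributeNames': {'#status': 'status'},
        'ExpressionAttributeValues': {':pending': 'pending_deletion'},
    }


def claim_account(table, account_id):
    """Return True if this request claimed the account, False if another one did.

    `table` is assumed to behave like a boto3 DynamoDB Table resource.
    """
    try:
        table.update_item(**build_claim_update(account_id))
        return True
    except Exception as exc:
        # botocore's ClientError carries the error code in exc.response
        code = getattr(exc, 'response', {}).get('Error', {}).get('Code')
        if code == 'ConditionalCheckFailedException':
            return False
        raise
```

Calling `claim_account(accounts_table, account_id)` before `start_execution` would let the handler return 409 without starting a second workflow.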
2. Data Anonymization Lambda
import hashlib
import logging
from datetime import datetime, timezone

import boto3

dynamodb = boto3.resource('dynamodb')
users_table = dynamodb.Table('Users')
logger = logging.getLogger()


def anonymize_user_data(account_id):
    """
    Remove PII from all user records associated with account.
    Overwrites PII in place instead of deleting rows to maintain referential integrity.
    """
    try:
        # Get all users in account. A scan returns at most 1 MB per call,
        # so keep going until there is no LastEvaluatedKey.
        scan_kwargs = {
            'FilterExpression': 'account_id = :account_id',
            'ExpressionAttributeValues': {':account_id': account_id},
        }
        users = []
        while True:
            response = users_table.scan(**scan_kwargs)
            users.extend(response['Items'])
            if 'LastEvaluatedKey' not in response:
                break
            scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']

        now = datetime.now(timezone.utc).isoformat()

        # Anonymize each user
        with users_table.batch_writer(
            overwrite_by_pkeys=['account_id', 'user_id']
        ) as batch:
            for user in users:
                # Token is derived from user_id + timestamp (not from the email),
                # so placeholder addresses stay unique without storing PII
                token = hashlib.sha256(
                    f"{user['user_id']}{now}".encode()
                ).hexdigest()[:20]
                batch.put_item(Item={
                    'account_id': account_id,
                    'user_id': user['user_id'],
                    'name': 'DELETED_USER',
                    'email': f'deleted+{token}@deleted.local',
                    'phone': None,
                    'address': None,
                    'anonymized_at': now,
                    'anonymized': True,
                    # Keep these for audit trail
                    'original_user_id': user['user_id'],
                    'deleted_account_id': account_id
                })

        logger.info(f'Anonymized {len(users)} users for account {account_id}')
        return {'success': True, 'users_anonymized': len(users)}
    except Exception as e:
        logger.error(f'Anonymization failed: {str(e)}', exc_info=True)
        raise


def lambda_handler(event, context):
    """Step Functions entry point; expects account_id in the state input."""
    return anonymize_user_data(event['account_id'])
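Because the token mixes in the current timestamp, a retried run writes a different placeholder address each time. If stable output across retries matters, a keyed hash of the user ID alone is one option. This is a sketch only; `stable_placeholder_email` and the idea of loading `secret` from a secrets store are assumptions, not what the function above does.

```python
import hashlib
import hmac


def stable_placeholder_email(user_id, secret):
    """Return the same placeholder address every time for a given user_id and secret.

    `secret` is a bytes key; without it the token cannot be recomputed from the user_id.
    """
    digest = hmac.new(secret, user_id.encode(), hashlib.sha256).hexdigest()[:20]
    return f'deleted+{digest}@deleted.local'
```

The trade-off is that anyone holding the secret can link a placeholder back to a user ID, so the key needs the same protection as the data it replaces.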
3. Billing & Refund Lambda
import logging
import os
from datetime import datetime, timezone
from decimal import Decimal

import boto3
import stripe

dynamodb = boto3.resource('dynamodb')
subscriptions_table = dynamodb.Table('Subscriptions')
stripe.api_key = os.environ['STRIPE_API_KEY']
logger = logging.getLogger()


def handle_billing(account_id):
    """
    Cancel subscriptions and issue refunds for unused time.
    Integrates with Stripe for payment processing.
    """
    try:
        # Get all active subscriptions for account
        # (follow LastEvaluatedKey as well if the table is large)
        response = subscriptions_table.scan(
            FilterExpression='account_id = :account_id AND #status = :status',
            ExpressionAttributeNames={'#status': 'status'},
            ExpressionAttributeValues={
                ':account_id': account_id,
                ':status': 'active'
            }
        )
        subscriptions = response['Items']
        total_refunded = Decimal('0')
        now = datetime.now(timezone.utc)

        for subscription in subscriptions:
            try:
                stripe_sub_id = subscription['stripe_subscription_id']

                # Get subscription details from Stripe. The field names used below
                # (plan, current_period_*, invoice charge) depend on the Stripe API version.
                stripe_subscription = stripe.Subscription.retrieve(stripe_sub_id)

                # Calculate refund amount (unused days), never below zero
                period_start = stripe_subscription['current_period_start']
                period_end = stripe_subscription['current_period_end']
                amount_paid = stripe_subscription['plan']['amount']
                period_days = (period_end - period_start) / 86400
                days_used = (now.timestamp() - period_start) / 86400
                days_remaining = max(period_days - days_used, 0)
                refund_amount = int((amount_paid / period_days) * days_remaining)

                # Issue refund. A refund targets a charge, not a payment method, so look
                # up the charge on the latest invoice (assumes the invoice exposes `charge`).
                if refund_amount > 0:
                    invoice = stripe.Invoice.retrieve(stripe_subscription['latest_invoice'])
                    refund = stripe.Refund.create(
                        charge=invoice['charge'],
                        amount=refund_amount,
                        idempotency_key=f"refund-{account_id}-{subscription['subscription_id']}"
                    )
                    logger.info(f"Refund issued: {refund_amount} cents ({refund['id']})")
                    total_refunded += Decimal(refund_amount) / 100

                # Cancel subscription (newer stripe-python releases call this Subscription.cancel)
                stripe.Subscription.delete(stripe_sub_id)

                # Update local database
                subscriptions_table.update_item(
                    Key={
                        'account_id': account_id,
                        'subscription_id': subscription['subscription_id']
                    },
                    UpdateExpression='SET #status = :status, cancelled_at = :now, refund_amount = :refund',
                    ExpressionAttributeNames={'#status': 'status'},
                    ExpressionAttributeValues={
                        ':status': 'cancelled',
                        ':now': now.isoformat(),
                        ':refund': refund_amount
                    }
                )
            except stripe.error.StripeError as e:
                logger.error(f'Stripe error: {str(e)}')
                raise

        logger.info(f'Billing handled: {len(subscriptions)} subscriptions cancelled, ${total_refunded} refunded')
        return {
            'success': True,
            'subscriptions_cancelled': len(subscriptions),
            'total_refunded': float(total_refunded)
        }
    except Exception as e:
        logger.error(f'Billing handling failed: {str(e)}', exc_info=True)
        raise


def lambda_handler(event, context):
    """Step Functions entry point; expects account_id in the state input."""
    return handle_billing(event['account_id'])
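The proration arithmetic is the part of this Lambda most worth testing in isolation. A minimal sketch of it as a pure function, using integer math on Unix timestamps so no cents are lost to floating point; `prorated_refund_cents` is a hypothetical helper name, and rounding down is an assumption about the refund policy.

```python
def prorated_refund_cents(amount_paid_cents, period_start, period_end, now):
    """Refund for the unused part of a billing period, in whole cents (rounded down).

    All time arguments are integer Unix timestamps in seconds.
    """
    period_seconds = period_end - period_start
    if period_seconds <= 0:
        return 0
    # Clamp so a closure before the period starts or after it ends stays in range
    remaining = min(max(period_end - now, 0), period_seconds)
    return (amount_paid_cents * remaining) // period_seconds


DAY = 86400
# $30.00 plan, 30-day period, closed after 10 days: 20 unused days
print(prorated_refund_cents(3000, 0, 30 * DAY, 10 * DAY))  # 2000
```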
Step Functions State Machine Definition
Each task sets ResultPath so that its result does not overwrite the original input; later states still need account_id, user_email, and timestamp. The Retry block names Stripe’s transient exception classes because Step Functions matches a Lambda error by its exact name, not by inheritance.
{
  "Comment": "Account Closure Workflow",
  "StartAt": "AnonymizeData",
  "States": {
    "AnonymizeData": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:AnonymizeUserData",
      "TimeoutSeconds": 300,
      "ResultPath": "$.anonymize_result",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "ResultPath": "$.error",
          "Next": "AnonymizationFailed"
        }
      ],
      "Next": "HandleBilling"
    },
    "HandleBilling": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:HandleBilling",
      "TimeoutSeconds": 300,
      "ResultPath": "$.billing_result",
      "Retry": [
        {
          "ErrorEquals": ["APIConnectionError", "RateLimitError"],
          "IntervalSeconds": 2,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "ResultPath": "$.error",
          "Next": "BillingFailed"
        }
      ],
      "Next": "DeleteMetadata"
    },
    "DeleteMetadata": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:DeleteMetadata",
      "TimeoutSeconds": 300,
      "ResultPath": "$.metadata_result",
      "Next": "PublishComplianceEvent"
    },
    "PublishComplianceEvent": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:region:account:compliance-events",
        "Subject": "Account Deleted - Compliance Event",
        "Message.$": "$"
      },
      "ResultPath": null,
      "Next": "SendConfirmationEmail"
    },
    "SendConfirmationEmail": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage",
      "Parameters": {
        "QueueUrl": "https://sqs.region.amazonaws.com/account/email-queue",
        "MessageBody": {
          "event_type": "account_deleted",
          "account_id.$": "$.account_id",
          "user_email.$": "$.user_email",
          "timestamp.$": "$.timestamp"
        }
      },
      "ResultPath": null,
      "Next": "NotifyAccounting"
    },
    "NotifyAccounting": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:region:account:accounting-notifications",
        "Subject": "Account Deletion - Refund Required",
        "Message.$": "$"
      },
      "ResultPath": null,
      "Next": "MarkAccountDeleted"
    },
    "MarkAccountDeleted": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:MarkAccountDeleted",
      "TimeoutSeconds": 60,
      "Next": "Success"
    },
    "Success": {
      "Type": "Succeed"
    },
    "AnonymizationFailed": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:region:account:critical-alerts",
        "Subject": "CRITICAL: Account Closure Failed - Anonymization",
        "Message.$": "$"
      },
      "Next": "FailureState"
    },
    "BillingFailed": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:region:account:critical-alerts",
        "Subject": "CRITICAL: Account Closure Failed - Billing",
        "Message.$": "$"
      },
      "Next": "FailureState"
    },
    "FailureState": {
      "Type": "Fail",
      "Error": "AccountClosureFailed",
      "Cause": "One or more steps in account closure workflow failed"
    }
  }
}
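A definition this long is easy to break with a mistyped state name. Before deploying, a quick check that every transition points at a defined state can catch that. This is a rough sketch: `find_dangling_transitions` is a hypothetical helper that only inspects `StartAt`, `Next`, and `Catch` targets, so it does not cover Choice or Parallel states.

```python
def find_dangling_transitions(definition):
    """Return (state_name, target) pairs whose target state is not defined."""
    states = definition['States']
    problems = []
    if definition['StartAt'] not in states:
        problems.append(('StartAt', definition['StartAt']))
    for name, state in states.items():
        targets = []
        if 'Next' in state:
            targets.append(state['Next'])
        # Each catcher has its own Next target
        targets += [catcher['Next'] for catcher in state.get('Catch', [])]
        for target in targets:
            if target not in states:
                problems.append((name, target))
    return problems
```

Loading the JSON above with `json.load` and asserting that the result is an empty list makes a cheap CI check.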
Results: What Changed
Before:
- Time per closure: 4-6 hours (manual)
- Accuracy: ~85% (missing refunds, incomplete anonymization)
- Operational effort: 40+ hours/week
- Errors per month: 15-20
- SLA for completion: 5-7 business days
After:
- Time per closure: <10 minutes (automated)
- Accuracy: 99.9% (fully automated, no human error)
- Operational effort: <2 hours/week (only for failures)
- Errors per month: 0 (for successful closures)
- SLA for completion: 30 minutes
Quantified Impact:
- 95% reduction in operational effort (40+ hours/week down to <2)
- At least 30x faster processing (from ~300 minutes to <10 minutes)
- Zero data integrity issues
- 100% refund accuracy (compared to ~80% before)
Monthly Impact at Scale
With 1,000+ closures per month:
| Metric | Before | After | Savings |
|---|---|---|---|
| Total manual hours | 80 | 2 | 78 hours/month |
| Processing time | 6,000 hours | <167 hours (1,000 × <10 min) | 97%+ reduction |
| Human errors | 15-20 | 0 | 100% elimination |
| Audit failures | 3-5 | 0 | 100% elimination |
Key Lessons Learned
1. Separate Validation from Execution
Validate upfront (in the entry Lambda) to fail fast. This prevents wasting orchestration time on invalid requests.
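One way to keep that validation easy to test is to pull it out of the handler as a pure function with no AWS calls. A minimal sketch, mirroring the checks in the entry Lambda; `validate_closure_request` is a hypothetical name, and `account` is assumed to be the DynamoDB item as a plain dict (or None when the lookup found nothing).

```python
def validate_closure_request(account, user_id):
    """Return (status_code, error) for an invalid request, or None if it may proceed."""
    if account is None:
        return 404, 'Account not found'
    if account.get('owner_id') != user_id:
        return 403, 'Unauthorized'
    if account.get('status') == 'pending_deletion':
        return 409, 'Account closure already in progress'
    if account.get('active_subscriptions', 0) > 0:
        return 400, 'Account has active subscriptions'
    return None
```

The handler then only has to translate a non-None result into an HTTP response and start the execution otherwise.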
2. Use Idempotency Keys to Prevent Duplicates
If the Step Function retries, we shouldn’t issue refunds twice:
idempotency_key = f"refund-{account_id}-{subscription_id}"
refund = stripe.Refund.create(
    charge=charge_id,
    amount=refund_amount,
    idempotency_key=idempotency_key  # Stripe won't process twice
)
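The same idea applies to steps that do not go through Stripe: record the result under a deterministic key and replay it on retry. A toy sketch, assuming an in-memory dict as a stand-in for a durable store such as a DynamoDB table with a conditional put; `IdempotencyStore` is a hypothetical class, not something from our codebase.

```python
class IdempotencyStore:
    """In-memory stand-in for a durable idempotency table."""

    def __init__(self):
        self._results = {}

    def run_once(self, key, action):
        """Run `action` the first time `key` is seen; replay the saved result afterwards."""
        if key not in self._results:
            self._results[key] = action()
        return self._results[key]


store = IdempotencyStore()
calls = []
first = store.run_once('delete-metadata-acct-1', lambda: calls.append('run') or 'done')
second = store.run_once('delete-metadata-acct-1', lambda: calls.append('run') or 'done')
print(first, second, len(calls))  # done done 1
```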
3. Build in Dead-Letter Queues for Failed Messages
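Not everything succeeds immediately. SQS queues and SNS subscriptions both support dead-letter queues. For the email queue, a redrive policy moves a message to the DLQ after three failed receives:
{
  "Resource": "arn:aws:sqs:region:account:email-queue",
  "RedrivePolicy": {
    "deadLetterTargetArn": "arn:aws:sqs:region:account:email-queue-dlq",
    "maxReceiveCount": 3
  }
}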
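When the policy is applied through the SQS API rather than a template, `RedrivePolicy` is passed as a JSON string inside the queue attributes. A small sketch of building that attribute; `redrive_attributes` is a hypothetical helper, and the boto3 call is shown only as a comment because it needs real queue URLs and credentials.

```python
import json


def redrive_attributes(dlq_arn, max_receive_count=3):
    """Queue attributes that route a message to the DLQ after repeated failed receives."""
    return {
        'RedrivePolicy': json.dumps({
            'deadLetterTargetArn': dlq_arn,
            'maxReceiveCount': max_receive_count,
        })
    }


# Usage with boto3 (not executed here):
#   sqs = boto3.client('sqs')
#   sqs.set_queue_attributes(QueueUrl=queue_url, Attributes=redrive_attributes(dlq_arn))
```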
4. Use Step Function Parallel States for Independent Tasks
If anonymization and billing are truly independent, run them in parallel:
{
  "ParallelTasks": {
    "Type": "Parallel",
    "Branches": [
      { "StartAt": "AnonymizeData", ... },
      { "StartAt": "HandleBilling", ... }
    ],
    "Next": "DeleteMetadata"
  }
}
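One detail to plan for: a Parallel state emits an array with one element per branch, in branch order, so the state after it no longer receives a single object. A minimal sketch of a merge step that could run in a small Lambda, assuming each branch returns a dict; `merge_parallel_results` is a hypothetical helper, and on key collisions the later branch wins.

```python
def merge_parallel_results(branch_outputs):
    """Combine the list a Parallel state emits into a single dict."""
    merged = {}
    for output in branch_outputs:
        merged.update(output)
    return merged


branches = [
    {'account_id': 'acct-1', 'users_anonymized': 3},
    {'account_id': 'acct-1', 'subscriptions_cancelled': 2},
]
print(merge_parallel_results(branches))
# {'account_id': 'acct-1', 'users_anonymized': 3, 'subscriptions_cancelled': 2}
```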
5. Log Everything for Debugging
Event-driven workflows are black boxes if you don’t log:
logger.info('Step: anonymization', extra={
    'account_id': account_id,
    'users_processed': len(users),
    'duration_ms': (end - start) * 1000,
    'step': 'anonymize_data'
})
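Fields passed through `extra` become attributes on the log record, but Python's standard formatters only print them if the format string names them. A JSON formatter is one way to get them into the output. This is a sketch, not our production logging setup; `JsonFormatter` is a hypothetical class.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object, including fields passed via `extra`."""

    # Attribute names every LogRecord has; anything else came from `extra`
    _STANDARD = set(logging.LogRecord('', 0, '', 0, '', (), None).__dict__) | {'message', 'asctime'}

    def format(self, record):
        payload = {'level': record.levelname, 'message': record.getMessage()}
        payload.update(
            {k: v for k, v in record.__dict__.items() if k not in self._STANDARD}
        )
        return json.dumps(payload, default=str)
```

Attaching it with `handler.setFormatter(JsonFormatter())` on the root logger's handler makes fields such as `account_id` and `step` searchable in CloudWatch Logs Insights.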
Cost Analysis
AWS pricing for account closure (per closure, approximate on-demand rates):
- Step Functions: $0.000025 per state transition (~20 transitions) = $0.0005
- Lambda execution: 5 minutes of combined runtime at 512 MB = 150 GB-seconds, about $0.0025 at $0.0000166667 per GB-second
- DynamoDB: Updates to ~50 items = under $0.0001 at on-demand write rates
- SNS/SQS: 4 messages = $0.000004
Total per closure: ~$0.003 (less than a penny)
With manual processing at $50/hour (loaded cost):
- Old approach: $200-300 per closure (4-6 hours)
- New approach: ~$0.003 per closure
- Cost savings: more than 99.99%
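The arithmetic behind these line items can be reproduced with a small calculator. A sketch with unit prices passed in as parameters, because they vary by region and change over time; the default rates are assumptions based on published on-demand pricing, and the function names are hypothetical.

```python
def step_functions_cost(transitions, price_per_transition=0.000025):
    """Standard workflow cost: billed per state transition."""
    return transitions * price_per_transition


def lambda_compute_cost(seconds, memory_mb, price_per_gb_second=0.0000166667):
    """Lambda compute cost: billed per GB-second (request charges not included)."""
    return seconds * (memory_mb / 1024) * price_per_gb_second


def manual_cost(hours, hourly_rate):
    """Loaded labor cost of doing the work by hand."""
    return hours * hourly_rate
```

Plugging in a month's closure count and the observed runtime from CloudWatch gives a figure that can be checked against the actual bill.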
Implementation Checklist
- Design event schema and Lambda functions
- Create DynamoDB tables with proper TTLs for audit retention
- Implement idempotency for all state transitions
- Set up DLQ for failed messages
- Configure CloudWatch logging and alarms
- Create Step Function state machine with error handling
- Add retry logic for transient failures
- Test each Lambda individually with unit tests
- Load test the entire workflow
- Set up dashboard for monitoring
- Document error scenarios and runbooks
- Train support team on new process
Key Takeaways
Event-driven workflows are powerful because they:
- Eliminate human error - Automation is consistent
- Scale effortlessly - 1 or 1,000 closures, same per-closure cost
- Provide visibility - Every step is logged and auditable
- Handle failures gracefully - Retries, DLQ, alerting
- Reduce operational burden - Manual intervention only when a closure fails
The pattern we built (Lambda entry point → Step Functions orchestration → SNS/SQS notifications) is reusable for any complex workflow: onboarding, migration, incident response, and so on.
What complex workflows are you automating? Share in the comments or reach out on LinkedIn.