AWS recently added support for driving Lambda functions via SQS queues. This is a very cool feature that greatly simplifies the process of creating messaging solutions that implement one-to-many notification and processing flows.
The canonical pub/sub messaging tool in AWS is its Simple Notification Service (SNS), which supports a wide variety of delivery mechanisms. However, its messages are not persistent, so the usual pattern for achieving durability has been to pair it with Simple Queue Service (SQS), subscribing queues to topics where such behavior is desired.
While Lambda supports SNS as an event source, and SQS queues can be connected to SNS topics, connecting Lambda functions to queues has been somewhat more challenging. Prior to the release of SQS Event Sources, one would have to create and deploy some process (in EC2 or otherwise) to poll SQS for messages and invoke the desired Lambda functions as they arrive. Error/retry handling and scaling of such components were similarly left as an exercise to the reader.
But no more. With this feature we can have the robustness of SQS combined with the serverless convenience of Lambda. This allows us to easily implement message flows with multiple consumers. For example, a system could respond to a DynamoDB update event by archiving data to S3, re-indexing an Elasticsearch record, and notifying interested parties of the state change.
The example below demonstrates this process. The simple template below creates an SNS topic and exports its ARN as FanoutTopic-TopicArn (the stack name followed by -TopicArn) for use in the configurations to follow.
AWSTemplateFormatVersion: "2010-09-09"
Parameters:
  TopicName:
    Type: String
Resources:
  Topic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: !Ref TopicName
Outputs:
  TopicArn:
    Value: !Ref Topic
    Export:
      Name: !Sub "${AWS::StackName}-TopicArn"
Instantiate it with:
aws cloudformation create-stack --stack-name FanoutTopic --template-body file://local/fanout_sns.yml --parameters ParameterKey=TopicName,ParameterValue=FanoutTopic
Next we’ll walk through a template that creates a Lambda function, SQS queue and the plumbing to connect them together.
First, we’ll deploy a very basic Lambda function that simply logs its input:
AWSTemplateFormatVersion: "2010-09-09"
Parameters:
  TopicStackName:
    Type: String
  FunctionName:
    Type: String
  DeadLetterQueueRetentionPeriodInSeconds:
    Type: Number
    Default: 1209600
  ProcessingQueueMaxReceiveCount:
    Type: Number
    Default: 3
Resources:
  Lambda:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Ref FunctionName
      Runtime: python3.6
      Handler: index.handler
      Role: !GetAtt LambdaRole.Arn
      Timeout: 5
      Code:
        ZipFile: !Sub |
          import json
          def handler(event, context):
              print("{0}: {1}".format(context.function_name, json.dumps(event, indent=2)))
Next, we create an SQS queue that will gather messages to be sent to our Lambda function. We’ll also create a dead letter queue for capturing failed messages. This isn’t strictly necessary for the example, but it shows how to set up SQS to implement retry and error handling:
  ProcessingQueue:
    Type: AWS::SQS::Queue
    Properties:
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt DeadLetterQueue.Arn
        maxReceiveCount: !Ref ProcessingQueueMaxReceiveCount
  DeadLetterQueue:
    Type: AWS::SQS::Queue
    Properties:
      MessageRetentionPeriod: !Ref DeadLetterQueueRetentionPeriodInSeconds
Note that we’ve made the message retention period and max receive count parameters to the template as well. One could extend this by creating CloudWatch alarms on the DeadLetterQueue to notify responsible parties when processing has failed.
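As one possible shape for such an alarm, the sketch below builds the arguments for CloudWatch's put_metric_alarm call (as exposed by boto3) so that the alarm fires whenever the dead letter queue holds at least one visible message. The function names, the alarm naming scheme, and the five-minute period are illustrative assumptions and not part of the templates in this post; the queue name and the notification topic ARN must be supplied by the caller.

```python
def dead_letter_alarm_params(queue_name, alarm_topic_arn):
    """Build put_metric_alarm arguments for a "dead letter queue is not empty" alarm.

    queue_name is the dead letter queue's name (not its ARN or URL);
    alarm_topic_arn is an SNS topic to notify when the alarm fires.
    """
    return {
        "AlarmName": "{0}-not-empty".format(queue_name),  # hypothetical naming scheme
        "Namespace": "AWS/SQS",
        "MetricName": "ApproximateNumberOfMessagesVisible",
        "Dimensions": [{"Name": "QueueName", "Value": queue_name}],
        "Statistic": "Maximum",
        "Period": 300,  # seconds; an assumed evaluation window
        "EvaluationPeriods": 1,
        "Threshold": 0,
        "ComparisonOperator": "GreaterThanThreshold",
        "AlarmActions": [alarm_topic_arn],
    }


def create_dead_letter_alarm(cloudwatch, queue_name, alarm_topic_arn):
    """Create the alarm using a boto3 CloudWatch client passed in by the caller."""
    params = dead_letter_alarm_params(queue_name, alarm_topic_arn)
    cloudwatch.put_metric_alarm(**params)
```

With boto3 installed and credentials configured, this could be called as create_dead_letter_alarm(boto3.client("cloudwatch"), queue_name, topic_arn). Since the template lets CloudFormation generate the queue name, one way to obtain it is to add a stack output using !GetAtt DeadLetterQueue.QueueName.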
Our next step is to grant the appropriate permissions to our resources:
- The Topic must be able to send messages to the ProcessingQueue
- The Lambda function must be able to interact with the ProcessingQueue and receive messages from it
We can achieve the former with a custom QueuePolicy, and the latter by attaching the managed AWSLambdaSQSQueueExecutionRole policy to our Lambda function's role:
  ProcessingQueuePolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              AWS: "*"
            Action:
              - sqs:SendMessage
            Resource: "*"
            Condition:
              ArnEquals:
                "aws:SourceArn":
                  "Fn::ImportValue": !Sub "${TopicStackName}-TopicArn"
      Queues:
        - !Ref ProcessingQueue
  LambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaSQSQueueExecutionRole
Lastly, we create a subscription from the ProcessingQueue to the Topic, and configure an event source mapping to our Lambda:
  TopicSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      Endpoint: !GetAtt ProcessingQueue.Arn
      Protocol: sqs
      TopicArn:
        "Fn::ImportValue": !Sub ${TopicStackName}-TopicArn
  LambdaEventSource:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      Enabled: true
      EventSourceArn: !GetAtt ProcessingQueue.Arn
      FunctionName: !Ref Lambda
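It may help to see what the function actually receives through this plumbing. Because the subscription above does not enable raw message delivery, each SQS record's body should be a JSON-encoded SNS notification, with the published text under its Message key. The sketch below shows one way a handler might unwrap it; extract_messages and the hand-built sample_event are illustrative names, and the sample is trimmed to only the fields the code reads.

```python
import json


def extract_messages(event):
    """Return the original SNS message strings carried by an SQS-triggered event."""
    messages = []
    for record in event.get("Records", []):
        # Each SQS record body holds the SNS notification as a JSON string.
        envelope = json.loads(record["body"])
        messages.append(envelope["Message"])
    return messages


def handler(event, context):
    # Log each published message under the function's name.
    for message in extract_messages(event):
        print("{0}: {1}".format(context.function_name, message))


# Hand-built sample event (an assumption for illustration, not captured output).
sample_event = {
    "Records": [
        {
            "eventSource": "aws:sqs",
            "body": json.dumps({
                "Type": "Notification",
                "TopicArn": "arn:aws:sns:us-east-1:123456789012:FanoutTopic",
                "Message": "Hello World",
            }),
        }
    ]
}
```

If the handler raises an exception, the batch is not deleted from the queue. SQS makes the messages available again, and once a message has been received more than ProcessingQueueMaxReceiveCount times it is moved to the DeadLetterQueue.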
The full stack can be instantiated as follows:
aws cloudformation create-stack --stack-name FanoutQueueAlpha --template-body file://local/fanout_sqs_lambda.yml --capabilities CAPABILITY_IAM --parameters ParameterKey=TopicStackName,ParameterValue=FanoutTopic ParameterKey=FunctionName,ParameterValue=Alpha
You can create multiple instances of this stack (with different stack/function names) to see the delivery to multiple consumers in action. Send a message to the topic with:
aws sns publish --topic-arn XXX --message "Hello World"
You’ll then see it appear in the CloudWatch Logs of the respective Lambda functions. Be sure to replace XXX with the real ARN of the FanoutTopic, as output by your template.
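If you would rather not copy the ARN by hand, the lookup and publish steps can be scripted. The sketch below assumes boto3 clients are passed in by the caller and that the topic stack exposes a TopicArn output, as in the first template; find_topic_arn and publish_message are hypothetical helper names.

```python
def find_topic_arn(cloudformation, stack_name, output_key="TopicArn"):
    """Look up a stack output value using a boto3 CloudFormation client."""
    stacks = cloudformation.describe_stacks(StackName=stack_name)["Stacks"]
    for output in stacks[0].get("Outputs", []):
        if output["OutputKey"] == output_key:
            return output["OutputValue"]
    raise KeyError("{0} has no output named {1}".format(stack_name, output_key))


def publish_message(sns, topic_arn, message):
    """Publish a message with a boto3 SNS client and return its MessageId."""
    response = sns.publish(TopicArn=topic_arn, Message=message)
    return response["MessageId"]
```

With boto3 installed and credentials configured, usage might look like arn = find_topic_arn(boto3.client("cloudformation"), "FanoutTopic") followed by publish_message(boto3.client("sns"), arn, "Hello World"). Passing the clients in keeps the helpers easy to exercise with stand-in objects.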