D20 Technical Services


Custom Software Development and Cloud Experts


Fanout Patterns in AWS using SNS, SQS and Lambda

AWS recently added support for driving Lambda functions via SQS queues. This is a very cool feature that greatly simplifies the process of creating messaging solutions that implement one-to-many notification and processing flows.

The canonical pub/sub messaging tool in AWS is its Simple Notification Service, which supports a wide variety of delivery mechanisms. However, its messages are not persistent and the pattern for achieving this goal has been to use it in conjunction with Simple Queueing Service, subscribing queues to topics where such behavior is desired.

While Lambda supports SNS as an event source, and SQS queues can be connected to SNS topics, connecting Lambda functions to queues has been somewhat more challenging. Prior to the release of SQS Event Sources, one would have to create and deploy some process (in EC2 or otherwise) to poll SQS for messages and invoke the desired Lambda functions as they arrive. Error/retry handling and scaling of such components were similarly left as an exercise to the reader.

But no more. With this feature we can have the robustness of SQS combined with the serverless convenience of Lambda. This allows us to easily implement message flows with multiple consumers. For example, a system that response to a DynamoDB update event by archiving data to S3, re-indexing an Elasticsearch record, and notifying interested parties of the state change.

The example below demonstrates this process. The simple template below creates a SNS topic and exports a reference as FanoutTopic-TopicArn for use in the configurations to follow.

AWSTemplateFormatVersion: "2010-09-09"

Parameters:

  TopicName:
    Type: String

Resources:
  
  Topic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: !Ref TopicName
      
Outputs:

  TopicArn:
    Value: !Ref Topic
    Export:
      Name: !Sub "${AWS::StackName}-TopicArn"

Instantiate it with:

aws cloudformation create-stack --stack-name FanoutTopic --template-body file://local/fanout_sns.yml --parameters ParameterKey=TopicName,ParameterValue=FanoutTopic

Next we’ll walk through a template that creates a Lambda function, SQS queue and the plumbing to connect them together.

First, we’ll deploy a very basic Lambda function that simply logs its input:

AWSTemplateFormatVersion: "2010-09-09"

Parameters:

  TopicStackName:
    Type: String

  FunctionName:
    Type: String
  
  DeadLetterQueueRetentionPeriodInSeconds:
    Type: Number
    Default: 1209600

  ProcessingQueueMaxReceiveCount:
    Type: Number
    Default: 3
	
Resources:

  Lambda:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Ref FunctionName
      Runtime: python3.6
      Handler: index.handler
      Role: !GetAtt LambdaRole.Arn
      Timeout: 5
      Code:
        ZipFile: !Sub |
          import json
          def handler(event, context):
            print("{0}: {1}".format(context.function_name, json.dumps(event,indent=2)))

Next, we create a SQS queue that will gather messages to be sent to our Lambda function. We’ll also create a dead letter queue for capturing failed messages. This isn’t strictly necessary for the example, but it shows how to set up SQS to implement retry and error handing:

  ProcessingQueue:
    Type: AWS::SQS::Queue
    Properties:
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt DeadLetterQueue.Arn
        maxReceiveCount: !Ref ProcessingQueueMaxReceiveCount
        
  DeadLetterQueue:
    Type: AWS::SQS::Queue
    Properties:
      MessageRetentionPeriod: !Ref DeadLetterQueueRetentionPeriodInSeconds

Note that we’ve made the message retention period and max receive count parameters to the template as well. One could presumably extend this be creating CloudWatch alarms on the DeadLetterQueue to notify responsible parties when processing has failed.

Our next step is to grant the appropriate permissions to our resources:

  1. The Topic must be able to send messages to the ProcessingQueue
  2. The Lambda function must be able interact with the ProcessingQueue and receive messages from it

We can achieve the former with a custom QueuePolicy, and the latter by assigning the managed AWSLambdaSQSQueueExecutionRole to our Lambda function:

  ProcessingQueuePolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              AWS: "*"
            Action:
              - sqs:SendMessage
            Resource: "*"
            Condition:
              ArnEquals:
                "aws:SourceArn":
                  "Fn::ImportValue": !Sub "${TopicStackName}-TopicArn"
      Queues:
        - !Ref ProcessingQueue
		
  LambdaRole:
      Type: AWS::IAM::Role
      Properties:
        AssumeRolePolicyDocument:
          Version: "2012-10-17"
          Statement:
            - Effect: Allow
              Principal:
                Service:
                  - lambda.amazonaws.com
              Action:
                - sts:AssumeRole
        Path: /
        ManagedPolicyArns:
          - arn:aws:iam::aws:policy/service-role/AWSLambdaSQSQueueExecutionRole

Lastly, we create a subscription from the ProcessingQueue to the Topic, and configure an event source mapping to our Lambda:

  TopicSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      Endpoint: !GetAtt ProcessingQueue.Arn
      Protocol: sqs
      TopicArn: 
        "Fn::ImportValue": !Sub ${TopicStackName}-TopicArn
      
  LambdaEventSource:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      Enabled: true
      EventSourceArn: !GetAtt ProcessingQueue.Arn
      FunctionName: !Ref Lambda

The full stack can be instantiated as follows:

aws cloudformation create-stack --stack-name FanoutQueueAlpha --template-body file://local/fanout_sqs_lambda.yml --capabilities CAPABILITY_IAM --parameters ParameterKey=TopicStackName,ParameterValue=FanoutTopic ParameterKey=FunctionName,ParameterValue=Alpha

You can create multiple instances of this stack (with different stack/function names) to see the the delivery to multiple consumers in action. Send a message to the topic with:

aws sns publish --topic-arn XXX --message "Hello World"

You’ll then see it appear in the CloudWatchLogs of the respective Lambda functions. Be sure to replace the ARN of the FanoutTopic (XXX) with the real value output from your template.

Full templates for the above are available here and here.