Overview

  • Scale EC2 Auto Scaling groups via CloudWatch Alarms driven by SQS queue-depth metrics
  • Create CloudWatch Alarms
    • Create scale-in and scale-out alarms using the SQS ApproximateNumberOfMessagesVisible metric
  • Create Auto Scaling policies
    • Add/remove capacity units (EC2 instances) based on the alarms
  • Test using sample SQS sender and receiver scripts

Setup

  • Create a standard SQS queue named “Messages” with all defaults
  • Create CloudWatch Alarms for ApproximateNumberOfMessagesVisible
    • Create a new SNS topic for alarm notifications
    • Create two alarms to scale out/in based on thresholds, e.g.
      • Scale out: ApproximateNumberOfMessagesVisible > 100
      • Scale in: ApproximateNumberOfMessagesVisible < 100
  • Create a bastion host for testing, used to send messages to the queue
  • Create an Auto Scaling group whose instances will pull from SQS, with queue depth driving scaling demand (see the first sketch after this list)
    • Create a launch template
    • Create an Auto Scaling group
    • Set up IAM roles so instances can read from the queue
  • Apply scaling policies (add/remove) triggered by the CloudWatch alarms (see the second sketch after this list)
    • Simple scaling > Scale-out > 30s cooldown > Add 1 capacity unit (EC2 instance)
    • Simple scaling > Scale-in > 30s cooldown > Remove 1 capacity unit
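
A minimal boto3 sketch of the launch template and Auto Scaling group steps above. The template and group names, AMI ID, instance type, subnet, and instance profile are hypothetical placeholders, and the role behind the profile is assumed to already grant SQS read access.

import boto3

ec2 = boto3.client("ec2")
autoscaling = boto3.client("autoscaling")

# Launch template for the worker instances
ec2.create_launch_template(
    LaunchTemplateName="sqs-workers",
    LaunchTemplateData={
        "ImageId": "ami-0123456789abcdef0",  # hypothetical AMI
        "InstanceType": "t3.micro",
        "IamInstanceProfile": {"Name": "sqs-worker-profile"},  # role with SQS read access
    },
)

# Auto Scaling group that the scaling policies below will act on
autoscaling.create_auto_scaling_group(
    AutoScalingGroupName="sqs-workers-asg",
    LaunchTemplate={"LaunchTemplateName": "sqs-workers", "Version": "$Latest"},
    MinSize=1,
    MaxSize=5,
    VPCZoneIdentifier="subnet-0123456789abcdef0",  # hypothetical subnet
)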

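A second sketch wiring the simple scaling policies to the two alarms. put_scaling_policy returns the policy ARN, which becomes the alarm action; the queue name, thresholds, and 30s cooldown mirror the values above, while the policy and alarm names are placeholders.

import boto3

autoscaling = boto3.client("autoscaling")
cloudwatch = boto3.client("cloudwatch")

def simple_policy(name, adjustment):
    # Simple scaling: change desired capacity by a fixed amount, 30s cooldown
    response = autoscaling.put_scaling_policy(
        AutoScalingGroupName="sqs-workers-asg",
        PolicyName=name,
        PolicyType="SimpleScaling",
        AdjustmentType="ChangeInCapacity",
        ScalingAdjustment=adjustment,
        Cooldown=30,
    )
    return response["PolicyARN"]

scale_out_arn = simple_policy("sqs-scale-out", 1)  # add 1 capacity unit
scale_in_arn = simple_policy("sqs-scale-in", -1)  # remove 1 capacity unit

def queue_alarm(name, operator, policy_arn):
    # Alarm on queue depth for the "Messages" queue
    cloudwatch.put_metric_alarm(
        AlarmName=name,
        Namespace="AWS/SQS",
        MetricName="ApproximateNumberOfMessagesVisible",
        Dimensions=[{"Name": "QueueName", "Value": "Messages"}],
        Statistic="Average",
        Period=60,
        EvaluationPeriods=1,
        Threshold=100,
        ComparisonOperator=operator,
        AlarmActions=[policy_arn],  # the SNS topic ARN can be added here too
    )

queue_alarm("messages-scale-out", "GreaterThanThreshold", scale_out_arn)
queue_alarm("messages-scale-in", "LessThanThreshold", scale_in_arn)
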
SQS Test Scripts

Send Messages
#!/usr/bin/env python3

import argparse
import logging
import sys
import uuid
from time import sleep

import boto3
from botocore.exceptions import ClientError

parser = argparse.ArgumentParser()
parser.add_argument("--queue-name", "-q", default="Messages", help="SQS queue name")
parser.add_argument("--interval", "-i", default=0.1, help="timer interval", type=float)
parser.add_argument("--message", "-m", help="message to send")
parser.add_argument("--log", "-l", default="INFO", help="logging level")
args = parser.parse_args()

logging.basicConfig(format="[%(levelname)s] %(message)s", level=args.log.upper())

sqs = boto3.client("sqs")

try:
    logging.info(f"Getting queue URL for queue: {args.queue_name}")
    response = sqs.get_queue_url(QueueName=args.queue_name)
except ClientError as e:
    logging.error(e)
    sys.exit(1)

queue_url = response["QueueUrl"]

logging.info(f"Queue URL: {queue_url}")

while True:
    try:
        message = args.message or str(uuid.uuid4())  # fixed body if given, else a random UUID
        logging.info("Sending message: " + message)
        response = sqs.send_message(QueueUrl=queue_url, MessageBody=message)
        logging.info("MessageId: " + response["MessageId"])
        sleep(args.interval)
    except ClientError as e:
        logging.error(e)
        sys.exit(1)
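
While the sender runs, the alarm state can be polled to confirm the scale-out alarm actually fires; a small sketch, assuming the hypothetical alarm name from the setup sketch:

import boto3

cloudwatch = boto3.client("cloudwatch")

# StateValue is one of OK, ALARM, INSUFFICIENT_DATA
response = cloudwatch.describe_alarms(AlarmNames=["messages-scale-out"])
for alarm in response["MetricAlarms"]:
    print(alarm["AlarmName"], alarm["StateValue"])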

Receive Messages
#!/usr/bin/env python3

import logging
import sys
import time

import boto3
from botocore.exceptions import ClientError

QUEUE_NAME = "Messages"

logging.basicConfig(format="[%(levelname)s] %(message)s", level="INFO")
sqs = boto3.client("sqs")

try:
    logging.info(f"Getting queue URL for queue: {QUEUE_NAME}")
    response = sqs.get_queue_url(QueueName=QUEUE_NAME)
except ClientError as e:
    logging.error(e)
    sys.exit(1)

queue_url = response["QueueUrl"]
logging.info(f"Queue URL: {queue_url}")

logging.info("Receiving messages from queue...")

while True:
    # Long-poll so an empty queue doesn't turn into a tight request loop
    messages = sqs.receive_message(
        QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=10
    )
    if "Messages" in messages:
        for message in messages["Messages"]:
            logging.info(f"Message body: {message['Body']}")
            time.sleep(1)  # simulate work
            sqs.delete_message(
                QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
            )
    else:
        logging.info("Queue is now empty")

Auto Scaling Test Results

  1. Four instances were added to handle the extra workload
  2. Four instances were terminated after the message spike cleared