Overview
Scale EC2 Auto Scaling groups based on CloudWatch alarms driven by SQS queue-depth metrics:

- Create CloudWatch alarms: scale-in and scale-out alarms using the SQS ApproximateNumberOfMessagesVisible metric
- Create Auto Scaling policies: add/remove capacity units (EC2 instances) based on the alarms
- Test using sample SQS sender and receiver scripts
Setup
- Create an SQS queue: standard, named "Messages", all defaults
- Create CloudWatch alarms for ApproximateNumberOfMessagesVisible:
  - Create a new SNS topic
  - Create two alarms to scale out/in, e.g. ApproximateNumberOfMessagesVisible > 100 and ApproximateNumberOfMessagesVisible < 100
- Create a bastion host for testing purposes (to send messages)
- Create an Auto Scaling group that will pull from SQS and drive scaling demand (a boto3 sketch of the alarm-to-policy wiring follows this list):
  - Create a launch configuration template
  - Create the Auto Scaling group
  - Set up IAM roles
  - Apply scaling policies (add/remove) tied to the CloudWatch alarms:
    - Simple Scaling > Scale-Out > 30s cooldown > Add 1 capacity unit (EC2 instance)
    - Simple Scaling > Scale-In > 30s cooldown > Remove 1 capacity unit
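For reference, the same alarm and policy wiring can be scripted with boto3 instead of clicking through the console. This is a minimal sketch under assumptions, not a step from the walkthrough above: the group name sqs-workers and the alarm name are illustrative, and the scale-in side mirrors the scale-out side with the comparison and adjustment inverted.

import boto3

autoscaling = boto3.client("autoscaling")
cloudwatch = boto3.client("cloudwatch")

# Simple scaling policy: add one instance, then wait 30s before re-evaluating.
# Group name "sqs-workers" is an assumption for this sketch.
scale_out = autoscaling.put_scaling_policy(
    AutoScalingGroupName="sqs-workers",
    PolicyName="scale-out",
    PolicyType="SimpleScaling",
    AdjustmentType="ChangeInCapacity",
    ScalingAdjustment=1,
    Cooldown=30,
)

# Alarm when the queue backs up past 100 visible messages; the alarm
# action invokes the scaling policy created above. The scale-in alarm
# would use LessThanThreshold and a policy with ScalingAdjustment=-1.
cloudwatch.put_metric_alarm(
    AlarmName="Messages-backlog-high",
    Namespace="AWS/SQS",
    MetricName="ApproximateNumberOfMessagesVisible",
    Dimensions=[{"Name": "QueueName", "Value": "Messages"}],
    Statistic="Sum",
    Period=60,
    EvaluationPeriods=1,
    Threshold=100,
    ComparisonOperator="GreaterThanThreshold",
    AlarmActions=[scale_out["PolicyARN"]],
)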
SQS Test Scripts
Send Messages
#!/usr/bin/env python3
import argparse
import logging
import sys
import uuid
from time import sleep

import boto3
from botocore.exceptions import ClientError

parser = argparse.ArgumentParser()
parser.add_argument("--queue-name", "-q", default="Messages", help="SQS queue name")
parser.add_argument("--interval", "-i", default=0.1, type=float, help="seconds between sends")
parser.add_argument("--message", "-m", help="message body (default: a random UUID)")
parser.add_argument("--log", "-l", default="INFO", help="logging level")
args = parser.parse_args()

logging.basicConfig(format="[%(levelname)s] %(message)s", level=args.log)

sqs = boto3.client("sqs")

# Resolve the queue URL once up front; fail fast if the queue doesn't exist.
try:
    logging.info(f"Getting queue URL for queue: {args.queue_name}")
    response = sqs.get_queue_url(QueueName=args.queue_name)
except ClientError as e:
    logging.error(e)
    sys.exit(1)

queue_url = response["QueueUrl"]
logging.info(f"Queue URL: {queue_url}")

# Send messages in a loop to build up queue depth and trip the alarm.
while True:
    try:
        message = args.message or str(uuid.uuid4())
        logging.info("Sending message: " + message)
        response = sqs.send_message(QueueUrl=queue_url, MessageBody=message)
        logging.info("MessageId: " + response["MessageId"])
        sleep(args.interval)
    except ClientError as e:
        logging.error(e)
        sys.exit(1)
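From the bastion host the sender can be run as, for example, python3 send_messages.py --queue-name Messages --interval 0.05 (the filename here is assumed); a short interval builds queue depth quickly enough to cross the 100-message scale-out threshold.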
Receive Messages
#!/usr/bin/env python3
import logging
import sys
import time

import boto3
from botocore.exceptions import ClientError

QUEUE_NAME = "Messages"

logging.basicConfig(format="[%(levelname)s] %(message)s", level="INFO")

sqs = boto3.client("sqs")

# Resolve the queue URL once up front; fail fast if the queue doesn't exist.
try:
    logging.info(f"Getting queue URL for queue: {QUEUE_NAME}")
    response = sqs.get_queue_url(QueueName=QUEUE_NAME)
except ClientError as e:
    logging.error(e)
    sys.exit(1)

queue_url = response["QueueUrl"]
logging.info(f"Queue URL: {queue_url}")
logging.info("Receiving messages from queue...")

while True:
    # Long-poll for up to 10 messages at a time to avoid spinning on an
    # empty queue.
    messages = sqs.receive_message(
        QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=10
    )
    if "Messages" in messages:
        for message in messages["Messages"]:
            logging.info(f"Message body: {message['Body']}")
            time.sleep(1)  # simulate work
            # Delete only after the work is done, or the message will
            # reappear once its visibility timeout expires.
            sqs.delete_message(
                QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
            )
    else:
        logging.info("Queue is now empty")
Auto Scaling Test Results
- Four instances were added to handle the extra workload
- All four instances were terminated after the spike of messages cleared