AWS services like Amazon ECS, Amazon S3, Amazon Kinesis, Amazon SQS and Amazon RDS are used extensively around the world. Here at Barracuda, we use AWS Simple Queue Service (SQS) to manage messaging within and among the microservices that we have developed on the Django framework.
AWS SQS is a message queuing service that can “send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be available.” SQS is designed to help organizations decouple applications and scale services, and it was the perfect tool for our work on microservices. However, each new Django-based microservice, or each decoupling of an existing service using AWS SQS, required us to duplicate the code and logic that communicates with AWS SQS. This resulted in a lot of repeated code and encouraged our team to build this GitHub library: DJANGO-EB-SQS
Django-EB-SQS is a Python library meant to help developers quickly integrate AWS SQS with existing and/or new Django-based applications. The library takes care of the following tasks:
- Serializing the data
- Adding delaying logic
- Continuous polling from queue
- De-serializing the data as per AWS SQS standards and/or using third-party libraries to communicate with AWS SQS.
In short, it abstracts all the complexity involved in communicating with AWS SQS and lets developers focus only on core business logic.
The library is built on the Django ORM framework and the boto3 library.
How we use it
Our team works on an email protection solution that uses artificial intelligence to detect spear phishing and other social engineering attacks. We integrate with our customers' Office 365 accounts and receive notifications whenever they receive new emails. One of our tasks is to determine whether each new email is fraudulent. On receiving such a notification, one of our services (Figure 1: Service 1) talks to Office 365 via the Graph API and fetches those emails. To process them further and make them available to other services, the emails are then pushed to an AWS SQS queue (Figure 1: queue_1).

Service 2 is configured to listen to queue_1 to retrieve the raw email bodies.
Let's say that Service 2 performs the following actions:
…
# extract headers and feature sets from emails
…
# submit a task
process_message.delay(tenant_id=, email_id=, headers=, feature_set=, …)
This process_message method won’t be called synchronously. Instead, it will be queued as a task and executed once one of the workers picks it up. The worker could belong to the same service or to a different one. The caller need not worry about the underlying behavior or how the task gets executed.
Let’s look at how the process_message method is defined as a task.
from typing import List

from eb_sqs.decorators import task

@task(queue_name='queue_2', max_retries=3)
def process_message(tenant_id: int, email_id: str, headers: List[dict], feature_set: List[dict], …):
    try:
        # perform some action using headers and feature sets
        # also can queue up further tasks, if required
        ...
    except (OperationalError, InterfaceError) as exc:
        try:
            # re-queue this task; raises once max_retries is exhausted
            process_message.retry()
        except MaxRetriesReachedException:
            logger.error(f'MaxRetries reached for Service2:process_message ex: {exc}')
When we decorate the method with the task decorator, what happens underneath is that it adds extra data like the calling method, target method, its arguments, and some additional metadata before it serializes the message and pushes it to the AWS SQS queue. When the message is consumed from the queue by one of the workers, it has all the information needed to execute the task: which method to call, which parameters to pass, and so on.
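To make these mechanics concrete, here is a minimal, self-contained sketch of the idea. It is not how Django-EB-SQS is actually implemented: the task decorator below, the REGISTRY dictionary, the message fields, and the in-memory QUEUES that stand in for AWS SQS are all hypothetical names chosen for illustration.

```python
import json
from collections import defaultdict, deque

# In-memory stand-in for SQS queues (assumption: real code would call AWS SQS).
QUEUES = defaultdict(deque)
# Maps a task's name to its function so that a worker can look it up later.
REGISTRY = {}


def task(queue_name):
    """Hypothetical decorator: adds a .delay() method that enqueues the call."""
    def decorator(func):
        name = f"{func.__module__}.{func.__qualname__}"
        REGISTRY[name] = func

        def delay(**kwargs):
            # Serialize which function to call and with which arguments.
            message = json.dumps({"task": name, "kwargs": kwargs, "retry": 0})
            QUEUES[queue_name].append(message)

        func.delay = delay
        return func
    return decorator


def process_one(queue_name):
    """Hypothetical worker step: pop one message, deserialize it, run the task."""
    message = json.loads(QUEUES[queue_name].popleft())
    return REGISTRY[message["task"]](**message["kwargs"])


@task(queue_name="queue_2")
def process_message(tenant_id, email_id):
    return f"processed {email_id} for tenant {tenant_id}"


process_message.delay(tenant_id=7, email_id="abc")  # nothing runs yet
result = process_one("queue_2")                     # a worker runs it later
```

The caller only ever touches delay(). Everything needed to run the task travels inside the serialized message, which is why the worker can live in a different process or service.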
We can also retry the task in case of an exception. However, to avoid an infinite loop, we can set the optional max_retries parameter so that processing stops once the maximum number of retries is reached. We can then log the error or send the task to a dead letter queue for further analysis.
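The bounded-retry idea can be sketched in a few lines. This is a simplification under stated assumptions: it retries in-process with a loop instead of re-queuing a message, and run_with_retries, MaxRetriesReached, and the dead_letters list are hypothetical names, not part of the library.

```python
class MaxRetriesReached(Exception):
    """Hypothetical stand-in for a max-retries exception."""


def run_with_retries(func, max_retries, dead_letters):
    """Call func until it succeeds or max_retries retries are used up."""
    retries = 0
    while True:
        try:
            return func()
        except ConnectionError as exc:
            if retries >= max_retries:
                # Give up: park the failure for later analysis.
                dead_letters.append(repr(exc))
                raise MaxRetriesReached(f"gave up after {retries} retries") from exc
            retries += 1


attempts = []


def flaky():
    """Fails twice, then succeeds, to exercise the retry path."""
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("database unavailable")
    return "ok"


dead_letters = []
outcome = run_with_retries(flaky, max_retries=3, dead_letters=dead_letters)
```

Here flaky succeeds on its third call, so nothing lands in dead_letters. A function that never succeeds would raise MaxRetriesReached after the retry budget is spent.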
AWS SQS lets us delay the processing of a message by up to 15 minutes. We can add a similar capability to our task by passing the delay parameter when queuing it.
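For reference, the underlying SQS feature is the DelaySeconds argument of boto3's send_message call, capped at 900 seconds. The sketch below only builds the arguments for that call, so it runs without AWS credentials. The helper name build_send_params and the queue URL are hypothetical, and this illustrates the SQS feature itself, not the library's internal handling of delay.

```python
MAX_SQS_DELAY_SECONDS = 900  # SQS allows delaying a message by at most 15 minutes


def build_send_params(queue_url, body, delay_seconds=0):
    """Build keyword arguments for boto3's SQS send_message call."""
    if not 0 <= delay_seconds <= MAX_SQS_DELAY_SECONDS:
        raise ValueError(
            f"delay must be between 0 and {MAX_SQS_DELAY_SECONDS} seconds"
        )
    return {
        "QueueUrl": queue_url,
        "MessageBody": body,
        "DelaySeconds": delay_seconds,
    }


# Hypothetical queue URL and payload; delay the message by 5 minutes.
params = build_send_params(
    "https://example.invalid/queue_2", '{"email_id": "abc"}', delay_seconds=300
)
# With AWS credentials configured, this could be sent with:
#   boto3.client("sqs").send_message(**params)
```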
Tasks are executed by running the Django command process_queue. It supports listening to one or more queues, reading from them indefinitely, and executing tasks as they come in.
We just saw how this library makes communication within a service or between services easy via AWS SQS queues.
More details on how to configure the library with Django settings, listen to multiple queues, set up a development environment, and use its many other features can be found in the project's GitHub repository.
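The polling behavior can be pictured as a loop that cycles over the configured queues. The sketch below is an illustration only, not the process_queue implementation: poll_queues, receive, and the in-memory fake_queues are hypothetical, and max_idle_rounds exists solely so the example terminates, whereas a real worker would keep polling indefinitely.

```python
from collections import deque


def poll_queues(receive, queue_names, handle, max_idle_rounds=1):
    """Cycle over the queues, handling messages until all of them stay empty.

    receive(queue_name) returns a list of messages, possibly empty.
    """
    idle_rounds = 0
    handled = 0
    while idle_rounds < max_idle_rounds:
        got_any = False
        for name in queue_names:
            for message in receive(name):
                handle(name, message)
                handled += 1
                got_any = True
        # Reset the idle counter whenever any queue produced a message.
        idle_rounds = 0 if got_any else idle_rounds + 1
    return handled


# In-memory stand-in for SQS; with boto3, receive could wrap receive_message.
fake_queues = {"queue_1": deque(["a", "b"]), "queue_2": deque(["c"])}


def receive(name):
    """Return at most one message from the named in-memory queue."""
    queue = fake_queues[name]
    return [queue.popleft()] if queue else []


seen = []
count = poll_queues(
    receive, ["queue_1", "queue_2"], lambda q, m: seen.append((q, m))
)
```

Because each pass visits every queue in turn, a busy queue cannot starve the others in this sketch.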
Contribute
If you wish to contribute to the project, please refer here: DJANGO-EB-SQS
Rohan Patil is a Principal Software Engineer at Barracuda Networks. He currently works on Barracuda Sentinel, an AI-based solution that protects against phishing and account takeover. He has spent the last five years working on cloud technologies and the past decade in various software development roles. He holds a master's degree in Computer Science from California State University and a bachelor's degree in Computers from Mumbai, India.