DJANGO-EB-SQS: An easier way for Django applications to communicate with AWS SQS


AWS services like Amazon ECS, Amazon S3, Amazon Kinesis, Amazon SQS, and Amazon RDS are used extensively around the world. Here at Barracuda, we use AWS Simple Queue Service (SQS) to manage messaging within and among the microservices that we have developed on the Django framework.

AWS SQS is a message queuing service that can “send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be available.” SQS is designed to help organizations decouple applications and scale services, and it was the perfect tool for our work on microservices. However, each new Django-based microservice, or each decoupling of an existing service using AWS SQS, required us to duplicate our code and logic for communicating with AWS SQS. This resulted in a lot of repeated code and encouraged our team to build this GitHub library: DJANGO-EB-SQS.

Django-EB-SQS is a Python library that helps developers quickly integrate AWS SQS with existing or new Django-based applications. The library takes care of the following tasks:

  • Serializing the data
  • Adding delay logic
  • Continuously polling the queue
  • De-serializing the data as per AWS SQS standards, using third-party libraries (such as boto3) to communicate with AWS SQS

In short, it abstracts all the complexity involved in communicating with AWS SQS and lets developers focus only on core business logic.

The library is built on the Django ORM framework and the boto3 library.
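To get a feel for the API before diving into our use case, here is a minimal sketch of defining and queuing a task. The task decorator and the delay call are the library's public API, shown in more detail later in this post; the queue name 'test' and the echo task are just illustrations.

from eb_sqs.decorators import task

@task(queue_name='test')
def echo(message: str):
    # executed later by a worker, not at call time
    print(message)

# queues the call as a task on the 'test' SQS queue
# instead of executing it synchronously
echo.delay(message='Hello World!')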

How we use it

Our team works on an email protection solution that uses artificial intelligence to detect spear phishing and other social engineering attacks. We integrate with our customers' Office 365 accounts and receive notifications whenever new emails arrive. One of our tasks is to determine whether a new email is free of fraud. On receiving such a notification, one of our services (Figure 1: Service 1) talks to Office 365 via the Graph API and fetches those emails. For further processing, and to make the emails available to other services, they are then pushed to an AWS SQS queue (Figure 1: queue_1).

Figure 1
Let's look at a simple use case showing how we use the library in our solutions. One of our services (Figure 1: Service 2) is responsible for extracting headers and feature sets from individual emails and making them available for other services to consume.

Service 2 is configured via the library settings to listen to queue_1, from which it receives the raw email bodies.

Say Service 2 performs the following actions:

# consume email messages from queue_1
# extract headers and feature sets from the emails
# submit a task
process_message.delay(tenant_id=..., email_id=..., headers=..., feature_set=...)  # further kwargs elided
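Putting those steps together, Service 2's consumer could itself be defined as a task bound to queue_1. The following is a minimal sketch; handle_raw_email, extract_headers, and extract_features are hypothetical names, not part of the library:

from eb_sqs.decorators import task

@task(queue_name='queue_1')
def handle_raw_email(tenant_id: int, email_id: str, raw_body: str):
    # extract headers and feature sets from the raw email body
    headers = extract_headers(raw_body)       # hypothetical helper
    feature_set = extract_features(raw_body)  # hypothetical helper

    # hand the results off to the next stage as a new task
    process_message.delay(tenant_id=tenant_id, email_id=email_id,
                          headers=headers, feature_set=feature_set)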

The process_message method won't be called synchronously; instead, it will be queued as a task and executed once one of the workers picks it up. The worker could belong to the same service or to a different one. The caller of the method doesn't need to worry about the underlying behavior or how the task gets executed.

Let’s look at how the process_message method is defined as a task.

import logging
from typing import List

from django.db import InterfaceError, OperationalError

from eb_sqs.decorators import task
# MaxRetriesReachedException is raised by the library when retry() exceeds
# max_retries; the exact import path may vary by library version
from eb_sqs.worker.worker_exceptions import MaxRetriesReachedException

logger = logging.getLogger(__name__)

@task(queue_name='queue_2', max_retries=3)
def process_message(tenant_id: int, email_id: str, headers: List[dict],
                    feature_set: List[dict]):  # further parameters elided
    try:
        # perform some action using the headers and feature sets
        # further tasks can also be queued up from here, if required
        ...
    except (OperationalError, InterfaceError) as exc:
        try:
            process_message.retry()
        except MaxRetriesReachedException:
            logger.error(f'MaxRetries reached for Service2:process_message ex: {exc}')

When we decorate a method with the task decorator, under the hood it adds extra data such as the calling method, the target method, its arguments, and some additional metadata before serializing the message and pushing it to the AWS SQS queue. So, when the message is consumed from the queue by one of the workers, it has all the information needed to execute the task: which method to call, which parameters to pass, and so on.
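As a rough illustration only (the exact field names are internal to the library and may differ), the serialized task message could look something like this:

{
    'queue': 'queue_2',
    'func': 'service2.tasks.process_message',  # import path of the target method (hypothetical)
    'args': [],
    'kwargs': {'tenant_id': 42, 'email_id': 'abc-123', 'headers': [...], 'feature_set': [...]},
    'max_retries': 3,
    'retries': 0,  # incremented on each retry()
}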

We can also retry the task in case of an exception. However, to avoid an infinite loop, we can set the optional max_retries parameter so that processing stops after the maximum number of retries is reached. We can then log the error or send the task to a dead-letter queue for further analysis.
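For instance, the MaxRetriesReachedException handler shown above could forward the failing payload to a task bound to a dead-letter queue. This is a sketch under assumed names; dead_letter and the queue name queue_2_dlq are hypothetical, not part of the library:

from eb_sqs.decorators import task

@task(queue_name='queue_2_dlq')  # hypothetical dead-letter queue
def dead_letter(task_name: str, kwargs: dict, error: str):
    # persist or alert on the failed payload so it can be analyzed later
    logger.error(f'Dead-lettered {task_name} with {kwargs}: {error}')

# inside the except MaxRetriesReachedException block:
# dead_letter.delay(task_name='process_message', kwargs=task_kwargs, error=str(exc))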

AWS SQS gives us the ability to delay processing of a message by up to 15 minutes. We can add the same capability to our task by passing the delay parameter:

process_message.delay(email_id=..., headers=..., delay=300)  # delaying by 5 min; other kwargs elided

Executing the tasks is achieved by running the Django command process_queue. It supports listening to one or more queues, reads from them indefinitely, and executes tasks as they come in:

python manage.py process_queue --queues
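For example, a worker consuming from both queues in Figure 1 might be started like this (assuming a comma-separated queue list; check the library's README for the exact argument format):

python manage.py process_queue --queues queue_1,queue_2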

We just saw how this library makes communication within a service or between services via AWS SQS queues easy.

More details on configuring the library with Django settings, listening to multiple queues, the development setup, and many more features can be found here.

Contribute

If you wish to contribute to the project, please see: DJANGO-EB-SQS
