
Using Python to retrieve AWS SQS messages

Recently, a customer asked us to enhance an analytic solution by integrating a new subject area. Given that our platform is already hosted entirely in AWS, I was relieved and excited to find that the subject area could publish payloads to Amazon SQS. Though we hadn’t yet integrated anything with SQS, I knew this approach would bring a lot of benefits in terms of reliability, scalability, and security. Now I just had to figure out how to best consume the payload.

To start, I’m not a traditional programmer. I’ve spent my whole career in the nurtured biosphere of ETL tools that generally take care of the heavy lifting without needing much more than SQL as a coding language. Recently, I picked up Python as more of a hobby, and to investigate using it to augment ETL tools where they lack the built-in pre-processing or transformations I need for various jobs.

This post is here to show you how easy it is, even with basic programming knowledge, to use Python to retrieve your SQS messages and manipulate them in any way you want for further processing.

Basic Process

In our project, the provider writes payloads to Amazon SQS as JSON messages. On our side, we need to schedule a task that will get all of those messages, write them to files (so we can use them in our ETL processes), and back them up to S3.

Diagram: Basic data flow

Prerequisites

Before getting started, you should become familiar with Boto, a Python interface for Amazon Web Services that covers almost all of the available services. The examples in this post are based on version 2.4; at the time of writing, the current major version is Boto 3. Boto 3 can be installed alongside older versions, so it’s easy to work it into existing code.

For more information and tutorials for services other than SQS you can visit: http://boto.cloudhackers.com/en/latest/

Get SQS messages

First, we need to create a connection object that we can use to write or retrieve SQS messages.
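
A minimal sketch with Boto 2 looks something like this (the region, credentials, and queue name here are placeholders, not our real values):

    import boto.sqs

    # Connect to SQS in our region. Credentials can also come from
    # environment variables, a Boto config file, or an IAM role.
    conn = boto.sqs.connect_to_region(
        'us-east-1',
        aws_access_key_id='YOUR_ACCESS_KEY',
        aws_secret_access_key='YOUR_SECRET_KEY')

    # Grab a handle to the queue our provider publishes to.
    queue = conn.get_queue('my-queue-name')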

This code connects to SQS and creates an object named ‘queue’ that we’ll use to get the messages (we can have multiple queues set up). Rather than embedding this sensitive information directly in your code, you may want to investigate parameterization and IAM roles. For the sake of simplicity and testing, though, the code above should be all you need.
We can get up to 10 messages in a list at a time:
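
With Boto 2, that’s a single call on the queue object (10 is the maximum SQS allows per request):

    # Ask SQS for up to 10 messages in one request.
    result_set = queue.get_messages(num_messages=10)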

This creates a list ‘result_set’ containing up to 10 messages from the queue. To get the body of the first message in the list, we use the following code:
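
In Boto 2, that looks like this:

    # The body of the message is our JSON payload.
    data = result_set[0].get_body()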

Now we can write the ‘data’ variable to a file or process it any other way we want. We write each message to its own JSON file. After a message has been read and written, we can delete it from the queue.
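
Here’s a rough sketch of that step (the output directory is a placeholder, and we use the SQS message ID as a convenient unique filename):

    import os

    output_dir = '/tmp/sqs_messages'  # placeholder path for the JSON files

    for message in result_set:
        # Write each message body to its own JSON file, named after the message ID.
        file_path = os.path.join(output_dir, message.id + '.json')
        with open(file_path, 'w') as f:
            f.write(message.get_body())
        # Once the payload is safely on disk, remove the message from the queue.
        queue.delete_message(message)
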
To improve performance, we’re using Python’s multiprocessing module to run four instances of our function. Even though we’re pulling SQS messages in no particular order, two instances won’t normally read the same message at the same time. That’s thanks to the SQS ‘visibility timeout’ feature: when a message is first received, SQS hides it from other consumers for a configurable period, which gives us enough time to process it and delete it once we’re done.
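
And here’s a simplified sketch of how the parallel version might be wired up; the process_queue function name, the queue name, and the hard-coded paths are illustrative rather than our exact production code:

    from multiprocessing import Process
    import os

    import boto.sqs

    def process_queue():
        # Each worker opens its own connection and drains the queue in batches.
        conn = boto.sqs.connect_to_region('us-east-1')
        queue = conn.get_queue('my-queue-name')
        while True:
            messages = queue.get_messages(num_messages=10)
            if not messages:
                break  # nothing left to read, so let this worker exit
            for message in messages:
                file_path = os.path.join('/tmp/sqs_messages', message.id + '.json')
                with open(file_path, 'w') as f:
                    f.write(message.get_body())
                queue.delete_message(message)

    if __name__ == '__main__':
        # Start four workers; the visibility timeout keeps them from
        # processing the same message twice.
        workers = [Process(target=process_queue) for _ in range(4)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()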

Back up files to S3

Once all of our messages are written to JSON files, and before we load them into our database, we want to back them up to S3. This process is also done using the Boto module.

We generate the list of files to upload using the glob module, then create a connection first to S3 and then to the specific bucket we’ll use as our backup bucket.
Below is an example of the code that iterates through the list of files and uploads them to our S3 backup bucket.
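
Something like the following (the bucket name and local directory are placeholders):

    import glob
    import ntpath

    import boto

    # Connect to S3 and open the bucket we use for backups.
    s3_conn = boto.connect_s3()
    bucket = s3_conn.get_bucket('my-backup-bucket')

    # Upload every JSON file written in the previous step.
    for file_path in glob.glob('/tmp/sqs_messages/*.json'):
        # glob returns full paths, so keep only the filename for the S3 key.
        file_name = ntpath.basename(file_path)
        key = bucket.new_key(file_name)
        key.set_contents_from_filename(file_path)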

Since the glob module returns the full path of each file, we use the ntpath module to extract just the filename, which we then use as the key when creating the new object in the S3 bucket. We can also wrap this code in a function and use multiprocessing to speed up the entire process.

Conclusion

This is just a small glimpse of what can be achieved using AWS and Python. AWS recently announced Python support for Lambda and the ability to schedule Lambda functions. We believe this is the next logical step for our code base, which was previously running on an EC2 instance. As a developer, I just have to make sure my code does what it’s supposed to, and I can rely on serverless services like SQS and Lambda to scale as needed.

There are even more possibilities with this type of architecture. For example, we can integrate our code with Amazon SNS to notify administrators of any processing issues. We can also extend this to leverage CloudWatch metrics for traceability and performance reporting.

Because the services are so tightly integrated, it is quite easy to be innovative and responsive to user needs.

If you’ve caught the Python bug like I have, you might want to check out Luigi, which helps with dependency resolution, workflow management, failure handling, and general orchestration. It probably wouldn’t make much sense for our simple example here, but there are certainly benefits for larger and more complicated processes.


Let us know your feedback or questions. We’d love to hear how you’re integrating Python and AWS Services into your data workloads.
