Message Queues: Python and Celery

Written by Dan Sackett on September 26, 2014

Message queues are one of those things in software where you know exactly when you need one.

In web development, the decision usually stems from slow page loads caused by large database queries or expensive API calls. Luckily, a message queue lets you send those tasks into the background, a lot like a cron job, and process them at another time so the user doesn't need to wait.

At work, I do PHP development and we use Resque, primarily the PHP port created by Chris Boulton. I'll be doing a post about Resque in the near future for those who write PHP, but today I wanted to cover a solution in Python.

The de facto solution in the Python world is Celery because it's easy to use and scales well. Today I want to cover the basics so you can start using it to process tasks in your applications.

What does it do?

Celery gives us the ability to define tasks in a file. These tasks might call an API, read millions of records, or do anything else that takes serious processing power. With a task defined, we can reference it in our code and tell Celery to delay it. Celery then places the task on the broker you specify and waits for a worker to come along and pick it up. You can have any number of workers running at once. A worker processes the task in the background, and when it's done the task is removed from the queue.

As well, we can tell Celery to run a task at a specific time or on an interval, and a scheduler can be started to pick those tasks up and send them to the queue.

Those two features will help your pages load faster and give you some flexibility in how you manage your tasks.

Getting Started

Getting started with Celery is very easy. There are extensions for working with it in projects such as Django, but for our purposes all we're going to do is set up a virtual environment and use pip to install our dependencies.
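
If you don't have a virtual environment handy, creating and activating one (assuming the virtualenv tool is installed) looks something like this:

$ virtualenv venv
$ source venv/bin/activate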

Once we have the virtual environment activated, let's get a few packages to play with:

$ pip install celery redis requests flower

and, for the Redis server itself:

$ sudo apt-get install redis-server

The following packages will be installed:

- celery: the task queue itself
- redis: the Python client we'll use to talk to Redis, which acts as our broker and result backend
- requests: a simple HTTP library we'll use to hit the GitHub API
- flower: a web-based tool for monitoring Celery tasks

With those installed, let's get to the code.

Celery Config

When working with Celery, I like to create a config file to handle whatever I need. Then in our application we can load the config into our Celery object and use it from there. Let's create a new file called celeryconfig.py:

# Broker settings.
BROKER_URL = 'redis://localhost:6379/0'

# Use Redis (database 1) to store task state and results.
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'

# List of modules to import when celery starts.
CELERY_IMPORTS = ('tasks',)

# Set Serializers
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

# Set timezone to UTC
CELERY_TIMEZONE = 'UTC'

Let's walk through this file:

- BROKER_URL tells Celery where to queue tasks; here that's database 0 of our local Redis instance.
- CELERY_RESULT_BACKEND is where task state and results get stored, in this case Redis database 1.
- CELERY_IMPORTS lists the modules Celery should import when it starts, so it knows where our tasks live.
- CELERY_TASK_SERIALIZER and CELERY_RESULT_SERIALIZER tell Celery to serialize task messages and results as JSON.
- CELERY_TIMEZONE sets the timezone Celery uses for scheduling to UTC.

If you'd like to learn more about configuration, check out the Celery configuration documentation. There are plenty of options you can tweak to get things set up how you prefer.
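
For instance, here are a few optional settings you could experiment with in celeryconfig.py; they aren't needed for anything in this post:

# Discard stored results after an hour.
CELERY_TASK_RESULT_EXPIRES = 3600

# Only accept JSON-serialized messages.
CELERY_ACCEPT_CONTENT = ['json']

# Run four worker processes per worker instance.
CELERYD_CONCURRENCY = 4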

Writing our first task

Tasks are the heart and soul of Celery. We want to write functions that can be called from our code and placed on the queue to run whenever a worker comes by.

import requests
import celeryconfig
import json
from celery import Celery

app = Celery()
app.config_from_object(celeryconfig)

@app.task
def count_number_of_repos(username):
    """Pass in a GitHub username and print the number of repos they have."""
    url = 'https://api.github.com/users/{0}/repos'
    response = requests.get(url.format(username))
    print(len(json.loads(response.content)))

In this very simple example we load our dependencies and then assign a Celery() instance to a variable called app. This creates a basic Celery app, and on the next line we load our config file so it picks up the settings we defined.

To define a task for our new app, we use the @app.task decorator, which tells Celery "this is a task function, add Celery functionality to it". After that, we write a plain Python function like any other. This one takes a username, sends a request to the GitHub API for that user's repositories, parses the JSON response, and prints the number of results.

While this task isn't particularly intensive, it does hit an external API and could be slow depending on the network and other factors, so it's a good candidate for a message queue.
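
One optional tweak, since we configured a result backend: instead of printing the count, the task could return it, so the value gets stored in Redis and can be fetched later. A minimal variant (living in the same tasks.py, with the same imports and app object) might look like this:

@app.task
def count_number_of_repos(username):
    """Pass in a GitHub username and return the number of repos they have."""
    url = 'https://api.github.com/users/{0}/repos'
    response = requests.get(url.format(username))
    # Returning the value lets the result backend (Redis database 1) store it.
    return len(json.loads(response.content))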

Running the task

To get this task running, we'll open two terminal sessions. In the first terminal, we need to make sure redis-server is running.

$ sudo service redis-server start

We can then check that redis is running with:

$ redis-cli ping
PONG

If it returns PONG like above, we're good to go. As mentioned, Redis runs on localhost:6379 by default.

Now in the same terminal, let's start a Celery worker. The -A tasks option points Celery at the tasks module we wrote above.

$ celery -A tasks worker

You'll see some colored output, the Celery options we've set, and a note that the worker is ready. Now in the other terminal, let's open the Python interpreter.

$ python

And in the interpreter, let's queue our task.

>>> from tasks import count_number_of_repos
>>> count_number_of_repos.delay('dansackett')
<AsyncResult: 05ddef2d-7221-4d71-b856-05ba025e5c52>

Now if all went well and there were no errors in the script, you should see an AsyncResult object returned. If you look at the terminal where the Celery worker is running, you'll see a timestamp and the printed result.

What happened?

Since we registered our function as a Celery task, it gained a delay() method. When we call count_number_of_repos.delay(USERNAME) we're saying "invoke this function, but delay the processing until a worker can pick it up". The arguments are passed to delay(), the task is pushed onto the queue in Redis database 0, and once it has been processed the result is stored in Redis database 1.
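
If you're curious, you can poke at that stored state from the interpreter through the AsyncResult object that delay() returned. Keep in mind our task prints rather than returns a value, so get() will simply hand back None; a quick sketch:

>>> result = count_number_of_repos.delay('dansackett')
>>> result.ready()   # True once a worker has finished the task
True
>>> result.get(timeout=10)   # our task returns nothing, so this is None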

Awesome, right?

So normally, we wouldn't call the task from the interpreter like that. Instead, we'd add the call to our code base. Something like:

if request.method == 'POST':
    count_number_of_repos.delay(request.user.github_username)

The task will then be processed whenever a worker can run it, and you don't need to wait for the result before sending the user to a success page. The site feels fast to the user while the server crunches the data in the background.

Scheduling Tasks

A lot of the time, we may want a task to run once a day at a certain time, or perhaps on an interval. For that, we have two options. The first is to add our schedules to the celeryconfig: we define the task, its schedule, and its arguments, and the task will be picked up as long as the scheduler is running.

The second way to do it would be to use the periodic_task decorator instead of the normal task decorator. I'll show both.

Adding to the celeryconfig:

from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'every-5-seconds': {
        'task': 'tasks.count_number_of_repos',
        'schedule': timedelta(seconds=5),
        'args': ('dansackett',),
    },
}

We could add that block to our config. Now obviously, this is a terrible idea in production; we wouldn't want a task that checks the number of repos for a single username every 5 seconds. For the sake of showing how it works, though, we'll use it.

Now let's go back to the terminal. Exit the Python interpreter, but make sure the Celery worker is still running. In the other terminal, type:

$ celery -A tasks beat

Celery beat is the Celery scheduler. It picks up tasks from the CELERYBEAT_SCHEDULE dict and waits until it's time to send each one to the queue. Beat will show you that it's sending tasks but won't show their output; if you watch the worker, though, you'll see values being printed every 5 seconds.

What's happening is that the scheduler takes each task, sends it to the queue, and the worker picks it up when it becomes available. It's essentially Celery's answer to the cron job.

Speaking of cron jobs, Celery also lets us specify crontab-style schedules for these tasks:

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.count_number_of_repos',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': ('dansackett',),
    },
}

Again, this isn't exactly the ideal task to run, but it demonstrates the use of a crontab.

I mentioned there is another way to schedule tasks and that's with a decorator. Open the tasks file and let's edit our task:

import requests
import celeryconfig
import json
from datetime import timedelta
from celery import Celery
from celery.task.base import periodic_task

app = Celery()
app.config_from_object(celeryconfig)

# Beat invokes periodic tasks without arguments, so give username a default.
@periodic_task(run_every=timedelta(seconds=5))
def count_number_of_repos(username='dansackett'):
    """Pass in a GitHub username and print the number of repos they have."""
    url = 'https://api.github.com/users/{0}/repos'
    response = requests.get(url.format(username))
    print(len(json.loads(response.content)))

We use the periodic_task decorator and when we run celery beat it will pick this task up. As well, you can pass a crontab object to the run_every option.
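
For example, to mirror the Monday-morning schedule from earlier using the decorator approach, the task could be declared like this (same module and imports as above, plus crontab):

from celery.schedules import crontab

# Runs every Monday morning at 7:30 a.m. instead of every 5 seconds.
@periodic_task(run_every=crontab(hour=7, minute=30, day_of_week=1))
def count_number_of_repos(username='dansackett'):
    """Pass in a GitHub username and print the number of repos they have."""
    url = 'https://api.github.com/users/{0}/repos'
    response = requests.get(url.format(username))
    print(len(json.loads(response.content)))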

Monitoring

At the beginning of the post we installed a package called Flower, which we can use to monitor tasks. Let's try it, pointing it at our Redis broker:

$ flower --broker=redis://localhost:6379/0

This will tell us to go to http://localhost:5555 to see the UI. If we go there, we can see a dashboard which will tell us about workers, tasks, the broker, and will also point us to the docs. This is a great way to keep track of things when you want to watch what's happening.

Conclusion

Celery is a great tool. There are a lot of features I haven't mentioned yet (but hope to cover later). Hopefully now you understand why a message queue can help you and how you can implement one in Python.


