Asynchronous Tasks Setup using Django, Celery and RabbitMQ

In this post, I’ll be talking about setting up a distributed task processing system for asynchronous processing. As your website grows and handles a lot of traffic, there naturally comes a need to ensure the best performance for your users. While there are multiple things to be done to achieve that, one of the most important is processing things in the background.

One common example is sending an email to the user. Instead of sending the email synchronously and making the user wait till it completes, a better way is to put it into a queue to be processed in the background and let the user continue with other actions. Email is just an example; there are tons of other things that can be moved to background processing to give a seamless experience to the user. Also, if you are getting too many requests and doing everything synchronously, your server will be busy processing them one by one and a lot of users will have to wait for their requests to be served. Background processing is an effective way to solve this. In this post, we will learn to set up this system using Django, Celery, RabbitMQ and Flower.

Django is a high-level Python web framework that encourages rapid development and clean, pragmatic design. It is blazing fast and a very mature framework with all the functionality you will ever need to create a full-fledged website. Its out-of-the-box admin interface is just amazing. Some well-known websites like Pinterest, Instagram, Disqus and Bitbucket are built using Django.

Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well.

RabbitMQ is a messaging broker which acts as a middleman between your web application and celery workers. It implements the Advanced Message Queuing Protocol (AMQP).

Flower is a web based tool for monitoring and administering Celery clusters. It is also the recommended monitoring tool for Celery.

How it Works

  1. The web application sends an AMQP message to the broker (RabbitMQ).
  2. An exchange receives the message and routes it to the appropriate queue.
  3. Celery workers consume tasks from the queues they are subscribed to.

The following diagram illustrates this flow:

[Diagram: message flow from the web application through the RabbitMQ exchange and queues to the Celery workers]

Setting up RabbitMQ

There are various choices of messaging broker, the two most popular being Redis and RabbitMQ, but RabbitMQ is the broker that works best with Celery. You can read more about brokers here.

Install RabbitMQ using the following command:

sudo apt-get install rabbitmq-server

Once the installation is complete, create a user, add a virtual host and set the desired permissions:

sudo rabbitmqctl add_user myuser mypassword
sudo rabbitmqctl add_vhost myvhost
sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
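
You can verify that the user, vhost and permissions were registered correctly using rabbitmqctl’s list commands:

sudo rabbitmqctl list_users
sudo rabbitmqctl list_permissions -p myvhost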

Now start the RabbitMQ server using this command:

sudo rabbitmq-server

To run it in the background, use the detached flag:

sudo rabbitmq-server -detached
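
To confirm that the broker is actually up and running, ask for its status:

sudo rabbitmqctl status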

Now that your broker is running and listening for messages, let’s set up Celery.

Setting up Celery

Install Celery using the following command:

pip install celery

Assuming you already have a Django project created, let’s create a file named tasks.py which will contain the Celery tasks and configuration.

import os

from celery import Celery

# The worker process needs the Django settings to be importable before
# Celery reads its configuration. 'proj' is the Django project package
# (the same one referenced later in proj.router.MyRouter).
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('tasks')

# Load all Celery configuration (BROKER_URL etc.) from the Django settings file.
app.config_from_object('django.conf:settings')


@app.task
def add(x, y):
    return x + y


@app.task
def multiply(x, y):
    return x * y

Here we have created a Celery instance named app. This instance is the entry point for everything you want to do with Celery, and it will be imported into other modules as well.

After that, we load Celery’s configuration from the Django settings file; we will come to that later.

Finally, we define two tasks, one for add and one for multiply.

Now let’s define some properties in the Django settings file:

BROKER_URL = "amqp://myuser:mypassword@localhost:5672/myvhost/"

For now, just setting BROKER_URL is enough. Notice that we have already defined myuser, mypassword and myvhost while setting up RabbitMQ, so you just have to substitute those values here.

Now just start the Celery worker using the following command:

celery -A tasks worker --loglevel=info

To call a task, simply use its delay() method. You can also use apply_async() if you want to give an eta or countdown to the task.

>>> from tasks import add
>>> add.delay(1, 2)
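
For example, a minimal sketch of apply_async() with a countdown (reading the result back requires a result backend, which we set up in the next section):

>>> from tasks import add
>>> result = add.apply_async((1, 2), countdown=10)  # execute ~10 seconds from now
>>> result.get(timeout=30)  # blocks until the task finishes
3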


Keeping the results

If you want to track the tasks’ states and history, you have to store the results somewhere. The backend can be a database, Memcached, Redis, RPC, etc. You can read more about the result backends here.

Here I am taking the example of a database-backed result backend.

To set this up, just add the following to the Django settings file:

CELERY_RESULT_BACKEND = 'db+scheme://user:password@host:port/dbname'

Now the results will be stored in the database. Storing results can be useful, but make sure you don’t overload the database or whatever result backend you are using. By default, all task results expire after 1 day; this is controlled by the CELERY_TASK_RESULT_EXPIRES setting, which you can set according to your requirements.
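
As a concrete illustration, here is a sketch assuming a local MySQL database named celery_results created for this purpose (the database backend requires SQLAlchemy to be installed):

CELERY_RESULT_BACKEND = 'db+mysql://celeryuser:celerypass@localhost:3306/celery_results'
CELERY_TASK_RESULT_EXPIRES = 18000  # keep results for 5 hours instead of the default 1 day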


Routing tasks

By default all tasks go into a single queue named “celery” and all workers consume from this queue. However, as the project gets bigger and the number of tasks increases, it makes sense to have separate queues for different tasks. This is helpful for analysing and monitoring specific tasks, and also for prioritising tasks, since more dedicated workers can be assigned to high-priority queues. This arrangement gives much more flexibility in processing tasks. Celery already has functionality to achieve this, called task routing.

To do that, first we need to define a router. Create a file named router.py with the following contents:

class MyRouter(object):

    def route_for_task(self, task, args=None, kwargs=None):
        # Route every task to a queue (and routing key) named after the
        # task itself, e.g. "tasks.add" goes to a queue called "tasks.add".
        return {
            'exchange': 'celery',
            'exchange_type': 'direct',
            'queue': task,
            'routing_key': task,
        }

Here we have created a router class named MyRouter which sets both the queue and the routing key to the name of the task.

Now let’s set the CELERY_ROUTES and CELERY_QUEUES variables in our settings file:

from kombu import Exchange, Queue

CELERY_ROUTES = ('proj.router.MyRouter',)
CELERY_QUEUES = (
    Queue('tasks.add', Exchange('celery'), routing_key='tasks.add'),
    Queue('tasks.multiply', Exchange('celery'), routing_key='tasks.multiply'),
)

CELERY_ROUTES points to our MyRouter class, which sets the exchange, queue and routing_key for every task. CELERY_QUEUES defines the Celery queues themselves.

For this use case, there will be separate queues, one for add and other for multiply task.
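
Note that calling the tasks does not change at all; the router transparently decides which queue each task lands in:

>>> from tasks import add, multiply
>>> add.delay(2, 3)       # queued on "tasks.add"
>>> multiply.delay(4, 5)  # queued on "tasks.multiply"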

Now let’s say you want to start two different Celery workers, each dedicated to a different queue. To achieve that, you can specify the queue from which you want each worker to fetch tasks:

celery -A tasks worker -Q tasks.add --loglevel=info
celery -A tasks worker -Q tasks.multiply --loglevel=info
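
If both workers run on the same machine, it also helps to give each one a distinct node name with the -n option (add_worker and multiply_worker below are just illustrative names):

celery -A tasks worker -Q tasks.add -n add_worker@%h --loglevel=info
celery -A tasks worker -Q tasks.multiply -n multiply_worker@%h --loglevel=info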


Monitoring tasks

Now that we have learned to set up the broker, execute tasks, route tasks to specific queues and dedicate workers to those queues, one more important thing remains: monitoring the tasks.

There are multiple tools available for monitoring and debugging Celery clusters, like the celery command line utilities, the django-admin monitor, celerymon etc., but the most popular one is Flower. It is also the recommended monitoring tool for Celery. Being a web based monitoring tool, it’s easy to use and provides extensive functionality. You can read more in detail here.

Using Flower is pretty straightforward. First, install it using pip:

pip install flower

Now run Flower using the following command; note that we have specified the broker option here:

celery flower --broker=amqp://myuser:mypassword@localhost:5672/myvhost

Just open localhost:5555 in your browser to access the tool.

By default it runs on port 5555; you can change that using the --port option:

celery flower --port=5000 --broker=amqp://myuser:mypassword@localhost:5672/myvhost
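
Since Flower exposes task arguments and results, avoid leaving it open to the world. One simple option it supports is HTTP basic authentication via the --basic_auth option (monitor:secret below is just a placeholder credential):

celery flower --port=5000 --broker=amqp://myuser:mypassword@localhost:5672/myvhost --basic_auth=monitor:secret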


I hope you found this article helpful. Let me know if you have any suggestions or feedback in the comments section below.

Fun Fact: Game of Thrones season 6 episode 4 is also titled “Book of the Stranger” 🙂
