Implementing Asynchronous and Scheduled Tasks in Django with Celery and RabbitMQ

Install Redis for Windows from GitHub. For setup guidance, refer to a tutorial on Redis installation. If encountering a binding error on port 6379, check solutions online. On Windows, also install eventlet (pip install eventlet): Celery 4 no longer supports the default prefork pool on Windows, so the worker needs an alternative execution pool.

Install Celery 4.1.1 using pip install celery==4.1.1. Review resources for Celery basics and scheduling.

Initialize the Django project by modifying __init__.py:

import pymysql
from .celery import app as celery_app

# Use PyMySQL as a drop-in replacement for MySQLdb
pymysql.install_as_MySQLdb()

# Ensure the Celery app is loaded when Django starts
__all__ = ['celery_app']

Set up Celery in celery.py:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

# Must be set before the Celery app is created
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project_name.settings')

# Note: the 'amqp' result backend is deprecated in Celery 4; 'rpc://' is the recommended replacement
app = Celery('project_name', backend='amqp', broker='amqp://guest:guest@localhost:5672//')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

Configure settings in settings.py:

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'custom_app',
]

BROKER_URL = 'amqp://guest:guest@127.0.0.1:5672//'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'

from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'execute-periodically': {
        'task': 'custom_app.tasks.compute',
        'schedule': timedelta(seconds=2),
        'args': (2, 3)
    }
}

To run asynchronous tasks, start the worker with celery -A project_name worker -l info (on Windows, add -P eventlet to use the eventlet pool). Define tasks in tasks.py:

from time import sleep
from celery import shared_task

@shared_task
def process_task(*args, **kwargs):
    # Simulate a long-running job so the asynchronous behaviour is observable
    print('Starting lengthy operation...')
    print(args)
    print(kwargs)
    sleep(10)
    print('Lengthy operation completed.')

Invoke tasks from views in views.py:

from django.shortcuts import render

from .tasks import process_task
import json

def trigger_task(request):
    data = {"output": "sample data"}
    # .delay() enqueues the task and returns immediately with an AsyncResult
    task_result = process_task.delay(json.dumps(data))
    print(f"Task ID: {task_result.id}")
    return render(request, 'template.html')
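Because the view serializes the payload with json.dumps, the task receives a JSON string rather than a dict. A sketch of how the task side might decode it — the handle_payload name is hypothetical:

```python
import json

def handle_payload(raw):
    # The view sent json.dumps({"output": "sample data"}), so decode it back
    data = json.loads(raw)
    return data["output"]

print(handle_payload(json.dumps({"output": "sample data"})))  # prints: sample data
```

Note that Celery's json serializer can handle dicts directly, so passing the dict itself (process_task.delay(data)) would avoid this double encoding.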

For scheduled tasks, initiate the scheduler with celery -A project_name beat -l info. Consult documentation for advanced scheduling configurations.

Tags: Celery Django RabbitMQ asynchronous Scheduled Tasks

Posted on Fri, 08 May 2026 05:33:40 +0000 by grail