
Python RQ (Redis Queue) is a lightweight asynchronous task queue library built on Redis and written in Python. Its primary function is to assist developers in placing long-running tasks (such as sending emails, processing large datasets, etc.) into a queue, which are then handled asynchronously by background processes to enhance application performance and responsiveness.
Installation of RQ
You can install it directly via pip:
$ pip install rq
Alternatively, to try the development version from GitHub:
$ pip install -e git+git@github.com:nvie/rq.git@master#egg=rq
Core Concepts in RQ
In the Python RQ library, there are three core concepts: Queue, Worker, and Job. Below, I detail each concept with corresponding examples.

Queue
In RQ, a queue is a storage location for pending tasks. When you want to execute a time-consuming task, you can place it in the queue for asynchronous processing by a Worker.
Example:
from rq import Queue
from redis import Redis
# Connect to the Redis server
redis_connection = Redis()
# Create a queue
queue = Queue('default', connection=redis_connection) # The first parameter is the queue name; it defaults to 'default' if omitted
Job
A job represents a specific operation to be executed, which can be a function, method, or any callable object. Once a job is enqueued, a Worker retrieves and executes it.
Example:
from rq import Queue
from redis import Redis
# Define a task function to be executed
def add_numbers(a, b):
return a + b
# Enqueue and execute the task
redis_connection = Redis()
queue = Queue(connection=redis_connection)
task = queue.enqueue(add_numbers, 1, 2)
Worker
A worker in RQ is responsible for executing jobs from the queue, enabling asynchronous task processing. Workers run as independent processes and are started via the command line.
Example:
$ rq worker
This starts a worker named "default" that fetches and executes jobs from the default queue.
In practical use, if there are multiple queues with varying priorities, you can specify the order when starting the worker. For instance, in the following example, jobs from the 'low' queue are prioritized over 'high' and 'default':
$ rq worker low high default
Advanced Usage of RQ
Beyond basic usage, Python RQ offers advanced features and options to meet more complex task queue management needs.
Custom Failure Handling
If a job fails, RQ allows custom handling of failed tasks. You can create a custom failure handler function and pass it to the failure parameter of the enqueue method.
# main.py
def handle_failure(task, exception_type, exception_value, traceback_info):
# Perform custom actions when a task fails
print(f"Task failed: {task.id}")
print(f"Exception Type: {exception_type}")
print(f"Exception Value: {exception_value}")
print(f"Traceback: {traceback_info}")
# Add a task to the queue with a custom failure handler
job = queue.enqueue(compute_fibonacci, 10, failure=handle_failure)
Task Dependencies
Sometimes, tasks have dependencies where one must wait for another to complete before execution. RQ can manage these dependencies.
# main.py
# Create multiple tasks
task_a = queue.enqueue(compute_fibonacci, 5)
task_b = queue.enqueue(compute_fibonacci, 10)
task_c = queue.enqueue(compute_fibonacci, 15)
# Set task dependencies
task_b.dependency = task_a
task_c.dependency = task_b
Scheduled Tasks
For scheduled tasks, RQ provides a convenient way to handle them using the schedule module to arrange execution times.
# main.py
from rq import get_scheduler
from datetime import datetime, timedelta
# Create a task scheduler
scheduler = get_scheduler(connection=redis_connection)
# Add a scheduled task to the scheduler
job = scheduler.schedule(
scheduled_time=datetime.now() + timedelta(minutes=30),
func=compute_fibonacci,
args=[10],
)
In this example, a task scheduler is created, and the schedule method is used to arrange task execution at a future time.
RQ Web Interface
RQ also offers a web interface for monitoring and managing task queues. You can start it as follows:
$ rq-dashboard
Then, visit http://localhost:9181 to view queue status and task execution details.
Checking Task Results
In RQ, you can check task execution status in multiple ways.
Method One
job.get_status(): Queries the current status of a task. This method returns a string indicating the task's status, which may be "queued" (waiting), "started" (in progress), "finished" (completed), or "failed" (failed).
from rq import Queue
from redis import Redis
# Define a task function to be executed
def add_numbers(a, b):
return a + b
# Enqueue and execute the task
redis_connection = Redis()
queue = Queue(connection=redis_connection)
task = queue.enqueue(add_numbers, 1, 2)
# Query task status
status = task.get_status()
print(f"Task status: {status}")
Method Two
job.result: Retrieves the execution result of a task. If the task hasn't completed, this attribute will block until it finishes and returns the result.
Note that calling job.result will block the current thread if the task hasn't completed.
from rq import Queue
from redis import Redis
# Define a task function to be executed
def add_numbers(a, b):
return a + b
# Enqueue and execute the task
redis_connection = Redis()
queue = Queue(connection=redis_connection)
task = queue.enqueue(add_numbers, 1, 2)
# Get task result
result = task.result
print(f"Task result: {result}")
RQ vs. Celery
Compared to RQ, Celery is more well-known; both are commonly used Python libraries for handling asynchronous tasks, but they differ in implementation and feature sets.
RQ
- Simple and Easy to Use: RQ is designed for simplicity, making it easy to get started, suitable for small projects or scenarios with straightforward task porcessing needs.
- Redis-Based: RQ uses Redis as the backend storage for queues, leveraging Redis data structures to manage task queues.
- Lightweight: Due to its simple design, RQ is relatively lightweight, ideal for quick integration and use.
- Monitoring and Management: RQ provides a simple web interface and command-line tools for monitoring and managing task queues.
Celery
- Feature-Rich: Celery is a powerful distributed task queue supporting advanced features like delayed tasks, scheduled tasks, priority queues, etc.
- Scalability: Celery offers extensive plugins and extension mechanisms to meet complex task processing requirements.
- Multi-Backend Support: Celery supports various message brokers as task queue backends, such as Redis, RabbitMQ, Amazon SQS, etc.
- Active Community: Celery has a large community and extensive documentation resources, making it suitable for complex projects.
When choosing between RQ and Celery, consider your specific situation:
- Project Scale: For small projects or simple task processing needs, RQ may be a more lightweight and intuitive choice; for complex task processing or large projects, Celery's features and scalability are more appropriate.
- Technology Stack: If you're already using Redis in your project and have uncomplicated task processing needs, consider RQ; if you require more complex task scheduling and processing features or need integration with other message brokers, choose Celery.
Summary
Python RQ is a powerful yet simple task queue management library for handling background tasks. Whether developing web applications, data processing pipelines, or other types of applications, RQ can easily manage and execute tasks, improving application performance and maintainability. This article aims to help you get started with Python RQ and begin using distributed task queues in your projects.