Implementing Asynchronous Task Processing in Flask with Celery

Integrating Celery with Flask lets long-running operations execute asynchronously instead of blocking request handling. In this setup, Redis serves as the message broker that manages the task queue and as the backend that stores task results.

The implementation involves creating a Celery instance, configuring the connection details for Redis, registering specific functions as background tasks, and invoking them asynchronously.

from flask import Flask
from celery import Celery
import time

# Initialize Flask application
application = Flask(__name__)

# Configuration settings for message broker and result backend
application.config.update(
    CELERY_BROKER_URL='redis://127.0.0.1:6379/1',
    CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/1'
)

# Create the Celery instance and link it to the app
celery_client = Celery(
    application.name,
    broker=application.config['CELERY_BROKER_URL'],
    backend=application.config['CELERY_RESULT_BACKEND']
)
celery_client.conf.update(application.config)

# Define an asynchronous task
@celery_client.task
def long_running_process(duration):
    time.sleep(duration)
    return f"Completed after {duration} seconds"

@application.route('/trigger')
def trigger_task():
    # Dispatch the task to a worker; .delay() returns an AsyncResult
    task = long_running_process.delay(10)
    return f"Task started with ID: {task.id}"

if __name__ == '__main__':
    application.run(debug=True)

For more complex deployments, configuration is better managed through a dedicated class. This approach centralizes settings related to timezones, serialization, worker limits, and reliability options.

import time
from flask import Flask
from celery import Celery

class QueueConfiguration:
    # Timezone settings
    timezone = 'UTC'
    task_serializer = 'json'
    result_serializer = 'json'
    accept_content = ['json']
    
    # Redis configuration
    broker_url = 'redis://localhost:6379/0'
    result_backend = 'redis://localhost:6379/0'
    
    # Worker behavior tuning
    worker_prefetch_multiplier = 1
    task_acks_late = True
    worker_max_tasks_per_child = 50
    worker_disable_rate_limits = True
    task_ignore_result = False
    
    # Reliability and concurrency
    worker_force_execv = True
    task_time_limit = 60 * 60 # 1 hour hard limit
    broker_transport_options = {
        'visibility_timeout': 3600,
        'max_retries': 3
    }

server = Flask(__name__)
# Initialize Celery with the configuration class
celery_instance = Celery(server.name)
celery_instance.config_from_object(QueueConfiguration)

@celery_instance.task
def compute_heavy_task(seconds):
    time.sleep(seconds)
    return f"Processed for {seconds}s"

@server.route('/')
def index():
    # Execute task asynchronously
    job = compute_heavy_task.delay(30)
    return f"Task ID: {job.id}"

if __name__ == '__main__':
    server.run()
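Neither example executes tasks on its own: a separate Celery worker process must consume the queue. Assuming the second example is saved as app.py (the module and file names here are placeholders), the worker and the web server would be started in two terminals roughly as follows:

```shell
# Start a worker that consumes tasks registered on celery_instance in app.py
celery -A app.celery_instance worker --loglevel=info

# In a second terminal, start the Flask development server
python app.py
```

With task_acks_late and a prefetch multiplier of 1, each worker process acknowledges a message only after finishing it and fetches one task at a time, which trades throughput for reliability on long-running jobs.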

Tags: Flask Celery Redis python asynchronous-programming

Posted on Fri, 08 May 2026 10:32:07 +0000 by ozfred