To handle long-running operations without blocking the main application thread, integrating Celery with Flask allows for efficient asynchronous task execution. This setup utilizes Redis as a message broker to manage the task queue and store results.
The implementation involves creating a Celery instance, configuring the connection details for Redis, registering specific functions as background tasks, and invoking them asynchronously.
from flask import Flask
from celery import Celery
import time
# Initialize Flask application
application = Flask(__name__)
# Configuration settings for message broker and result backend
application.config.update(
CELERY_BROKER_URL='redis://127.0.0.1:6379/1',
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/1'
)
# Create the Celery instance and link it to the app
celery_client = Celery(application.name, broker=application.config['CELERY_BROKER_URL'])
celery_client.conf.update(application.config)
# Define an asynchronous task
@celery_client.task
def long_running_process(duration):
time.sleep(duration)
return f"Completed after {duration} seconds"
@application.route('/trigger')
def trigger_task():
# Dispatch task to the worker
task_id = long_running_process.delay(10)
return f"Task started with ID: {task_id}"
if __name__ == '__main__':
application.run(debug=True)
For more complex deployments, configuration is better managed through a dedicated class. This approach centralizes settings related to timezones, serialization, worker limits, and reliability options.
import time
from flask import Flask
from celery import Celery
class QueueConfiguration:
# Timezone settings
timezone = 'UTC'
task_serializer = ''
result_serializer = ''
accept_content = ['']
# Redis configuration
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/0'
# Worker behavior tuning
worker_prefetch_multiplier = 1
task_acks_late = True
worker_max_tasks_per_child = 50
worker_disable_rate_limits = True
task_ignore_result = False
# Reliability and concurrency
worker_force_execv = True
task_time_limit = 60 * 60 # 1 hour hard limit
broker_transport_options = {
'visibility_timeout': 3600,
'max_retries': 3
}
server = Flask(__name__)
# Initialize Celery with the configuration class
celery_instance = Celery(server.name)
celery_instance.config_from_object(QueueConfiguration)
@celery_instance.task
def compute_heavy_task(seconds):
time.sleep(seconds)
return f"Processed for {seconds}s"
@server.route('/')
def index():
# Execute task asynchronously
job = compute_heavy_task.delay(30)
return f"Task ID: {job.id}
"
if __name__ == '__main__':
server.run()