Background Tasks with Celery and Redis
Frederick Tubiermont
Some operations are too slow for a web request: sending emails, calling an AI API, generating PDFs, processing uploads. The solution: hand the work off to a background worker and respond immediately.
When You Need Background Tasks
| Situation | Without Celery | With Celery |
|---|---|---|
| Send welcome email | User waits 2s for SMTP | Response in 50ms, email sent in background |
| Generate PDF report | Request times out after 30s | Response immediately, PDF ready in minutes |
| Call OpenAI API | User sees spinner for 5s | Instant response, result stored in DB |
| Process uploaded CSV | Browser times out | Background processes row by row |
Architecture
Flask App → Redis (queue/broker) → Celery Worker (separate process) → Database/Email/etc.
Installation
pip install celery redis
You also need Redis running. On Mac:
brew install redis && brew services start redis
On Railway, add a Redis plugin to your project.
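Either way, you can confirm Redis is reachable before wiring up Celery:
redis-cli ping
A healthy server answers PONG.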
Setup: celery_app.py
# celery_app.py
import os

from celery import Celery

def make_celery(app):
    """Create a Celery instance bound to the Flask app."""
    celery = Celery(
        app.import_name,
        broker=os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
        backend=os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
    )

    # Tasks run with the Flask app context
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery
# app.py
import os

from flask import Flask
from celery_app import make_celery

app = Flask(__name__)
app.config["SECRET_KEY"] = os.environ.get("SECRET_KEY")
celery = make_celery(app)
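If you want more control over serialization and runaway tasks, Celery accepts settings via celery.conf.update(). A minimal sketch; the values here are illustrative, not recommendations:
# Optional: tighten serialization and cap task runtime (illustrative values)
celery.conf.update(
    task_serializer="json",    # send task arguments as JSON
    result_serializer="json",  # store results as JSON
    accept_content=["json"],   # refuse pickled payloads
    task_time_limit=300,       # hard-kill any task after 5 minutes
)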
Defining Tasks
# tasks/email_tasks.py
from app import celery
from utils.email import send_email
from utils.db import get_db

@celery.task(bind=True, max_retries=3)
def send_welcome_email_task(self, user_id: int):
    """Send a welcome email. Retries up to 3 times on failure."""
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT email, name FROM users WHERE id = %s", (user_id,))
            user = cur.fetchone()

    if not user:
        return  # User deleted; nothing to do

    try:
        send_email(
            to=user["email"],
            subject="Welcome to Flask Vibe!",
            body=f"Hi {user['name']}, welcome aboard!",
        )
    except Exception as exc:
        # Retry after 60 seconds
        raise self.retry(exc=exc, countdown=60)
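Manual self.retry() gives you fine-grained control, but on Celery 4.x and later the decorator can handle retries for you. A sketch of the same task with automatic retries and exponential backoff:
# Celery catches any Exception, waits 1s, 2s, 4s... and retries up to 3 times
@celery.task(autoretry_for=(Exception,), retry_backoff=True, max_retries=3)
def send_welcome_email_task(user_id: int):
    ...  # same body as above, minus the try/except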
# tasks/report_tasks.py
from app import celery
from utils.db import get_db

@celery.task
def generate_monthly_report(month: int, year: int) -> dict:
    """Generate a monthly report and store results in DB."""
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT COUNT(*) AS signups
                FROM users
                WHERE EXTRACT(MONTH FROM created_at) = %s
                  AND EXTRACT(YEAR FROM created_at) = %s
            """, (month, year))
            result = cur.fetchone()
            cur.execute(
                "INSERT INTO reports (month, year, signups) VALUES (%s, %s, %s)",
                (month, year, result["signups"]),
            )
        conn.commit()
    return {"signups": result["signups"]}
Calling Tasks from Flask Routes
Use .delay() to queue a task without waiting for it:
# routes/auth.py
from flask import redirect

from app import app
from tasks.email_tasks import send_welcome_email_task

@app.route("/register", methods=["POST"])
def register():
    # ... create user in database ...
    user_id = new_user["id"]

    # Queue the email; don't wait for it
    send_welcome_email_task.delay(user_id)

    # Respond immediately
    return redirect("/dashboard")
For tasks where you want to check the result later:
@app.route("/reports/generate", methods=["POST"])
@admin_required
def trigger_report():
month = int(request.form["month"])
year = int(request.form["year"])
# Queue task, get task ID
task = generate_monthly_report.delay(month, year)
# Store task ID so user can check progress
return redirect(f"/reports/status/{task.id}")
@app.route("/reports/status/<task_id>")
@admin_required
def report_status(task_id):
from celery.result import AsyncResult
result = AsyncResult(task_id)
return {
"status": result.status, # PENDING, SUCCESS, FAILURE
"result": result.result if result.ready() else None
}
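AsyncResult also offers a blocking .get(), which waits for the worker and re-raises the task's exception on failure. Fine in scripts and tests, but avoid it inside request handlers, since it reintroduces the latency you queued the task to escape:
# Blocks up to 10 seconds; raises the task's exception if it failed
value = result.get(timeout=10)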
Running Everything
You need two processes running simultaneously:
# Terminal 1: Flask app
python app.py
# Terminal 2: Celery worker
celery -A app.celery worker --loglevel=info
In production (Procfile):
web: gunicorn app:app
worker: celery -A app.celery worker --loglevel=info
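By default the worker forks one process per CPU core. On a small container, cap it explicitly:
worker: celery -A app.celery worker --loglevel=info --concurrency=2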
Monitoring with Flower
Flower is a web UI for monitoring Celery tasks:
pip install flower
celery -A app.celery flower --port=5555
Visit http://localhost:5555 to see queued, running, and completed tasks.
Add REDIS_URL to .env
REDIS_URL=redis://localhost:6379/0
On Railway, this is automatically set when you add a Redis plugin.
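Note that nothing reads .env automatically in local development. One common approach, assuming you add the python-dotenv package, is to load it at the top of app.py before make_celery reads REDIS_URL:
# app.py, before any os.environ lookups
from dotenv import load_dotenv
load_dotenv()  # copies values from .env into os.environ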