Background Tasks with Celery and Redis

By Frederick Tubiermont

Some operations are too slow for a web request: sending emails, calling an AI API, generating PDFs, processing uploads. The solution: hand the work off to a background worker and respond immediately.

When You Need Background Tasks

Situation            | Without Celery              | With Celery
Send welcome email   | User waits 2s for SMTP      | Response in 50ms, email sent in background
Generate PDF report  | Request times out after 30s | Immediate response, PDF ready in minutes
Call OpenAI API      | User sees spinner for 5s    | Instant response, result stored in DB
Process uploaded CSV | Browser times out           | Worker processes the file row by row in the background

Architecture

Flask App  →  Redis (queue)  →  Celery Worker  →  Database/Email/etc.
              (broker)          (separate process)
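To make the flow concrete without installing anything, here is a minimal in-process sketch of the same hand-off. It is only an analogy: `task_queue` stands in for Redis, the `worker` thread stands in for the Celery worker process, and `register` stands in for a Flask route. All three names are illustrative and not part of Celery.

```python
import queue
import threading

# In-process stand-ins: `task_queue` plays Redis (the broker),
# the `worker` thread plays the separate Celery worker process.
task_queue: queue.Queue = queue.Queue()
sent_emails: list[int] = []


def worker() -> None:
    """Pull jobs off the queue until a None sentinel arrives."""
    while True:
        job = task_queue.get()
        if job is None:
            break
        name, user_id = job
        if name == "send_welcome_email":
            sent_emails.append(user_id)  # the slow work would happen here


def register(user_id: int) -> str:
    """Plays the role of a route: enqueue the job and return at once."""
    task_queue.put(("send_welcome_email", user_id))
    return "redirect:/dashboard"


thread = threading.Thread(target=worker)
thread.start()
response = register(42)  # returns without waiting for the email
task_queue.put(None)     # tell the worker to stop
thread.join()
```

The real setup differs in one important way: the queue lives in Redis and the worker is a separate process, so queued jobs are not tied to the lifetime of the web process.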

Installation

pip install celery redis

You also need Redis running. On Mac:

brew install redis && brew services start redis

On Railway, add a Redis plugin to your project.

Setup: celery_app.py

# celery_app.py
from celery import Celery
import os

def make_celery(app):
    """Create a Celery instance bound to the Flask app."""
    celery = Celery(
        app.import_name,
        broker=os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
        backend=os.environ.get("REDIS_URL", "redis://localhost:6379/0")
    )

    # Tasks run with the Flask app context
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

# app.py
import os

from flask import Flask
from celery_app import make_celery

app = Flask(__name__)
app.config["SECRET_KEY"] = os.environ.get("SECRET_KEY")

celery = make_celery(app)
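The `ContextTask` wrapper is the least obvious part of the setup, so here is a stand-alone sketch of the pattern with no Flask or Celery involved. `FakeApp`, `BaseTask`, `make_context_task`, and `WhoAmI` are hypothetical stand-ins invented for this illustration. The only point is to show that overriding `__call__` lets every task body run inside a context that is opened before it and closed after it.

```python
from contextlib import contextmanager

active_contexts: list[str] = []


class FakeApp:
    """Hypothetical stand-in for a Flask app; only mimics app_context()."""

    @contextmanager
    def app_context(self):
        active_contexts.append("app")
        try:
            yield
        finally:
            active_contexts.pop()


class BaseTask:
    """Hypothetical stand-in for a task base class: calling it invokes run()."""

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)

    def run(self, *args, **kwargs):
        raise NotImplementedError


def make_context_task(app: FakeApp) -> type:
    """Build a task base class whose body always runs inside app.app_context()."""

    class ContextTask(BaseTask):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    return ContextTask


class WhoAmI(make_context_task(FakeApp())):
    def run(self):
        # Report which contexts are active while the task body executes
        return list(active_contexts)


inside = WhoAmI()()             # ["app"]: the context was open during run()
after = list(active_contexts)   # []: the context was closed afterwards
```

In the real worker this matters because tasks execute outside any request, so anything that relies on the Flask application context would otherwise fail.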

Defining Tasks

# tasks/email_tasks.py
from app import celery
from utils.email import send_email
from utils.db import get_db

@celery.task(bind=True, max_retries=3)
def send_welcome_email_task(self, user_id: int):
    """Send a welcome email. Retries up to 3 times on failure."""
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT email, name FROM users WHERE id = %s", (user_id,))
            user = cur.fetchone()

    if not user:
        return  # User deleted — nothing to do

    try:
        send_email(
            to=user["email"],
            subject="Welcome to Flask Vibe!",
            body=f"Hi {user['name']}, welcome aboard!"
        )
    except Exception as exc:
        # Retry after 60 seconds
        raise self.retry(exc=exc, countdown=60)
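The email task above waits a fixed 60 seconds between attempts. One common variation, sketched here as an assumption rather than as part of the original tutorial, is to grow the delay with each attempt so a struggling mail server is not hit at a constant rate. `backoff_seconds` is a hypothetical helper, and the `base` and `cap` values are arbitrary.

```python
def backoff_seconds(retries: int, base: int = 60, cap: int = 900) -> int:
    """Delay before the next attempt: base * 2**retries, capped at `cap` seconds."""
    return min(cap, base * (2 ** retries))


# Delays for the three retries allowed by max_retries=3
schedule = [backoff_seconds(n) for n in range(3)]
```

Inside a bound task, the number of retries so far is available as `self.request.retries`, so the retry line could become `raise self.retry(exc=exc, countdown=backoff_seconds(self.request.retries))`.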

# tasks/report_tasks.py
from app import celery
from utils.db import get_db

@celery.task
def generate_monthly_report(month: int, year: int) -> dict:
    """Generate a monthly report and store results in DB."""
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT COUNT(*) as signups
                FROM users
                WHERE EXTRACT(MONTH FROM created_at) = %s
                AND EXTRACT(YEAR FROM created_at) = %s
            """, (month, year))
            result = cur.fetchone()

            cur.execute(
                "INSERT INTO reports (month, year, signups) VALUES (%s, %s, %s)",
                (month, year, result["signups"])
            )
            conn.commit()

    return {"signups": result["signups"]}
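The report query filters with `EXTRACT(MONTH ...)` and `EXTRACT(YEAR ...)`. An alternative worth knowing, offered here as a sketch and not as the tutorial's method, is to compute the month's start and end dates in Python and filter with a half-open range (`created_at >= start AND created_at < end`). `month_bounds` is a hypothetical helper. As a side effect it rejects invalid input such as month 13, because `date()` raises `ValueError`.

```python
from datetime import date


def month_bounds(month: int, year: int) -> tuple[date, date]:
    """Return (first day of the month, first day of the following month)."""
    start = date(year, month, 1)  # raises ValueError for an invalid month
    if month == 12:
        end = date(year + 1, 1, 1)
    else:
        end = date(year, month + 1, 1)
    return start, end


start, end = month_bounds(12, 2024)
```

The two dates would then be passed as query parameters in place of `(month, year)`.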

Calling Tasks from Flask Routes

Use .delay() to queue a task without waiting for it:

# routes/auth.py
from flask import redirect

from app import app  # assumes the Flask instance is defined in app.py
from tasks.email_tasks import send_welcome_email_task

@app.route("/register", methods=["POST"])
def register():
    # ... create user in database ...
    user_id = new_user["id"]

    # Queue the email — don't wait for it
    send_welcome_email_task.delay(user_id)

    # Respond immediately
    return redirect("/dashboard")
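Note that the route passes `user_id`, not a user object. Celery serializes task arguments before putting them on the queue (JSON is the default serializer), so arguments need to be plain values. The sketch below shows the difference using only the standard library. The `User` class and `can_send_as_args` helper are hypothetical, written for this illustration.

```python
import json
from dataclasses import dataclass


@dataclass
class User:  # hypothetical model, for illustration only
    id: int
    email: str


def can_send_as_args(*args) -> bool:
    """Return True if the arguments survive a JSON round trip unchanged."""
    try:
        return json.loads(json.dumps(list(args))) == list(args)
    except TypeError:
        return False


user = User(id=42, email="ada@example.com")
ok_with_id = can_send_as_args(user.id)   # plain int: serializable
ok_with_object = can_send_as_args(user)  # arbitrary object: not serializable
```

Passing the ID has a second benefit: the worker reads the row when the task actually runs, so it sees current data instead of a snapshot taken at enqueue time.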

For tasks where you want to check the result later:

@app.route("/reports/generate", methods=["POST"])
@admin_required
def trigger_report():
    month = int(request.form["month"])
    year = int(request.form["year"])

    # Queue task, get task ID
    task = generate_monthly_report.delay(month, year)

    # Put the task ID in the URL so the status page can look the task up
    return redirect(f"/reports/status/{task.id}")


@app.route("/reports/status/<task_id>")
@admin_required
def report_status(task_id):
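    from celery.result import AsyncResult
    result = AsyncResult(task_id)

    payload = None
    if result.ready():
        # On FAILURE, result.result holds the exception; convert it to a
        # string so the response stays JSON-serializable
        payload = result.result if result.successful() else str(result.result)

    return {
        "status": result.status,  # PENDING, SUCCESS, FAILURE
        "result": payload
    }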
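A client of the status endpoint typically polls until the task reaches a final state. The sketch below shows one way to structure that loop. `poll_until_done` is a hypothetical helper, and `fetch_status` is whatever function performs the HTTP request; it is simulated here so the example runs offline. The timeout is there because Celery also reports `PENDING` for task IDs it has never seen, so a mistyped ID would otherwise poll forever.

```python
import time
from typing import Callable

TERMINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def poll_until_done(
    fetch_status: Callable[[], dict],
    interval: float = 1.0,
    timeout: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Call fetch_status() until it reports a terminal state or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        payload = fetch_status()
        if payload["status"] in TERMINAL_STATES:
            return payload
        if time.monotonic() >= deadline:
            raise TimeoutError("task did not finish in time")
        sleep(interval)


# Simulated responses from /reports/status/<task_id>
responses = iter([
    {"status": "PENDING", "result": None},
    {"status": "PENDING", "result": None},
    {"status": "SUCCESS", "result": {"signups": 12}},
])
final = poll_until_done(lambda: next(responses), interval=0, sleep=lambda s: None)
```

In a browser the same loop would usually be a few lines of JavaScript calling the status URL on a timer.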

Running Everything

You need two processes running simultaneously:

# Terminal 1: Flask app (assumes app.py calls app.run() when executed directly)
python app.py

# Terminal 2: Celery worker
celery -A app.celery worker --loglevel=info

In production (Procfile):

web: gunicorn app:app
worker: celery -A app.celery worker --loglevel=info

Monitoring with Flower

Flower is a web UI for monitoring Celery tasks:

pip install flower
celery -A app.celery flower --port=5555

Visit http://localhost:5555 to see queued, running, and completed tasks.

Add REDIS_URL to .env

REDIS_URL=redis://localhost:6379/0

On Railway, this is automatically set when you add a Redis plugin.
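When the worker cannot connect, the first thing to check is what `REDIS_URL` actually contains. The sketch below breaks a Redis URL into its parts using only the standard library. `describe_redis_url` is a hypothetical helper. It deliberately leaves out any username or password so the output is safe to log.

```python
from urllib.parse import urlparse


def describe_redis_url(url: str) -> dict:
    """Split a redis:// or rediss:// URL into host, port, db, and TLS flag."""
    parsed = urlparse(url)
    if parsed.scheme not in {"redis", "rediss"}:
        raise ValueError(f"unexpected scheme: {parsed.scheme!r}")
    return {
        "tls": parsed.scheme == "rediss",         # rediss:// means TLS
        "host": parsed.hostname,
        "port": parsed.port or 6379,              # 6379 is the Redis default port
        "db": int(parsed.path.lstrip("/") or 0),  # a missing path means database 0
    }


info = describe_redis_url("redis://localhost:6379/0")
```

In the app, the argument would be `os.environ.get("REDIS_URL", "redis://localhost:6379/0")`, matching the default used in `celery_app.py`.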
