
Celery Tasks

Background task processing with Celery and the controller pattern.

Overview

Celery handles distributed task processing, with Redis serving as both the message broker and the result backend.

Architecture

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Client    │────>│    Redis    │────>│   Worker    │
│ (API/Bot)   │     │  (Broker)   │     │  (Celery)   │
└─────────────┘     └─────────────┘     └──────┬──────┘
                    ┌─────────────┐            │
                    │   Results   │<───────────┘
                    │   (Redis)   │
                    └─────────────┘

Quick Start

Start the Worker

make celery-dev

Start the Beat Scheduler

make celery-beat-dev

Call a Task

from ioc.container import get_container
from delivery.tasks.registry import TasksRegistry

container = get_container()
registry = container.resolve(TasksRegistry)

# Asynchronous call (returns an AsyncResult immediately)
result = registry.ping.delay()

# Wait for result
print(result.get(timeout=10))  # {"result": "pong"}

Configuration

Variable   Type       Default   Description
REDIS_URL  SecretStr  Required  Redis connection URL

The Celery app uses Redis as both the broker and the result backend:

celery_app = Celery(
    "main",
    broker=settings.redis_settings.redis_url.get_secret_value(),
    backend=settings.redis_settings.redis_url.get_secret_value(),
)
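
The snippet above assumes a nested settings object that exposes the URL as a SecretStr. A hypothetical sketch of that shape using Pydantic (the class and field names here are assumptions, not the project's actual settings code):

```python
from pydantic import BaseModel, SecretStr

class RedisSettings(BaseModel):
    redis_url: SecretStr  # e.g. "redis://localhost:6379/0"

class Settings(BaseModel):
    redis_settings: RedisSettings

settings = Settings(
    redis_settings=RedisSettings(redis_url="redis://localhost:6379/0")
)

# SecretStr masks the value when rendered; get_secret_value() reveals it.
assert str(settings.redis_settings.redis_url) == "**********"
assert settings.redis_settings.redis_url.get_secret_value().startswith("redis://")
```

Wrapping the URL in SecretStr keeps credentials out of logs and reprs; only the explicit get_secret_value() call, as in the Celery constructor above, exposes the raw string.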

Entry Point

The Celery app is defined in delivery/tasks/app.py:

from celery import Celery

from core.configs.infrastructure import configure_infrastructure
from ioc.container import get_container

configure_infrastructure(service_name="celery-worker")

_container = get_container()
app = _container.resolve(Celery)

Available Tasks

Task Name  Description
ping       Health check task returning {"result": "pong"}

Docker Compose Services

celery-worker:
  command:
    - celery
    - --app=delivery.tasks.app
    - worker
    - --loglevel=${LOGGING_LEVEL:-INFO}
    - --concurrency=4

celery-beat:
  command:
    - celery
    - --app=delivery.tasks.app
    - beat
    - --loglevel=${LOGGING_LEVEL:-INFO}