Celery Tasks¶
Background task processing with Celery and the controller pattern.
Overview¶
Celery handles distributed task processing, with Redis serving as both the message broker and the result backend.
Architecture¶
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Client    │────>│    Redis    │────>│   Worker    │
│  (API/Bot)  │     │  (Broker)   │     │  (Celery)   │
└─────────────┘     └─────────────┘     └─────────────┘
                                               │
                                               ▼
                                        ┌─────────────┐
                                        │   Results   │
                                        │   (Redis)   │
                                        └─────────────┘
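The flow in the diagram can be modeled with the standard library alone. The sketch below is not Celery: a `queue.Queue` stands in for the Redis broker, a plain dict stands in for the Redis result store, and the helper names (`send_task`, `get_result`, `TASKS`) are hypothetical. It only illustrates the order of events: the client enqueues a message, the worker consumes it, and the client later reads the stored result.

```python
import queue
import threading
import uuid

broker = queue.Queue()   # stand-in for the Redis broker
results = {}             # stand-in for the Redis result store
done = {}                # task_id -> Event set once the result is stored

# Hypothetical task table; the real project resolves tasks differently.
TASKS = {"ping": lambda: {"result": "pong"}}


def send_task(name: str) -> str:
    """Client side: enqueue a message and return its id immediately."""
    task_id = str(uuid.uuid4())
    done[task_id] = threading.Event()
    broker.put((task_id, name))
    return task_id


def worker() -> None:
    """Worker side: consume messages until a None sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            break
        task_id, name = message
        results[task_id] = TASKS[name]()
        done[task_id].set()


def get_result(task_id: str, timeout: float) -> dict:
    """Client side: wait for the stored result or raise TimeoutError."""
    if not done[task_id].wait(timeout):
        raise TimeoutError(f"task {task_id} did not finish in {timeout}s")
    return results[task_id]


thread = threading.Thread(target=worker, daemon=True)
thread.start()

task_id = send_task("ping")
outcome = get_result(task_id, timeout=10)

broker.put(None)  # ask the worker to stop
thread.join()
print(outcome)
```

In the real setup the client and the worker are separate processes, which is why both the message and the result have to pass through Redis rather than shared memory.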
Topics¶
- Task Controllers: Using the controller pattern for Celery tasks.
- Task Registry: Type-safe task access and invocation.
- Beat Scheduler: Scheduled and periodic tasks.
Quick Start¶
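Start the Worker¶
The worker runs with the same command as the celery-worker service under Docker Compose Services:
celery --app=delivery.tasks.app worker --loglevel=INFO --concurrency=4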
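Start Beat Scheduler¶
Beat runs with the same command as the celery-beat service under Docker Compose Services:
celery --app=delivery.tasks.app beat --loglevel=INFO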
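Beat only dispatches the entries defined in the app's schedule. As a minimal sketch, Celery's `beat_schedule` setting is a plain dict whose entries name a task and a schedule, and the schedule may be a `timedelta`. The entry name, the 30-second interval, and the assumption that the health check is registered under the name `ping` are illustrative rather than taken from this project.

```python
from datetime import timedelta

# Hypothetical schedule: entry name, task name, and interval are illustrative.
beat_schedule = {
    "ping-every-30-seconds": {
        "task": "ping",
        "schedule": timedelta(seconds=30),
    },
}

# With a real Celery app this dict would be assigned to
# app.conf.beat_schedule before Beat starts.
for entry_name, entry in beat_schedule.items():
    seconds = entry["schedule"].total_seconds()
    print(f"{entry_name}: run {entry['task']} every {seconds:.0f}s")
```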
Call a Task¶
from ioc.container import get_container
from delivery.tasks.registry import TasksRegistry

container = get_container()
registry = container.resolve(TasksRegistry)

# Enqueue the task; delay() returns a result handle immediately
result = registry.ping.delay()

# Block for up to 10 seconds until the worker has stored the result
print(result.get(timeout=10))  # {"result": "pong"}
Configuration¶
| Variable | Type | Default | Description |
|---|---|---|---|
| `REDIS_URL` | `SecretStr` | Required | Redis connection URL |
The Celery app uses the same Redis URL for both the broker and the result backend:
celery_app = Celery(
"main",
broker=settings.redis_settings.redis_url.get_secret_value(),
backend=settings.redis_settings.redis_url.get_secret_value(),
)
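The call to `get_secret_value()` above is needed because a `SecretStr` hides its value when printed or logged. The sketch below imitates that behavior with a small stand-in class so it runs without pydantic; `Secret` and `load_redis_url` are hypothetical names, and the project's actual settings class may validate `REDIS_URL` differently.

```python
import os
from typing import Mapping, Optional


class Secret:
    """Minimal stand-in for pydantic's SecretStr: masks the value when shown."""

    def __init__(self, value: str) -> None:
        self._value = value

    def get_secret_value(self) -> str:
        return self._value

    def __str__(self) -> str:
        return "**********"

    def __repr__(self) -> str:
        return "Secret('**********')"


def load_redis_url(env: Optional[Mapping[str, str]] = None) -> Secret:
    """Read the required REDIS_URL variable, failing fast when it is missing."""
    source = os.environ if env is None else env
    raw = source.get("REDIS_URL")
    if not raw:
        raise RuntimeError("REDIS_URL is required")
    return Secret(raw)


# Example value only; a real deployment supplies its own URL.
redis_url = load_redis_url({"REDIS_URL": "redis://localhost:6379/0"})
print(redis_url)                     # masked, safe to log
print(redis_url.get_secret_value())  # real value, passed to Celery
```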
Entry Point¶
The Celery app is defined in delivery/tasks/app.py:
from celery import Celery
from core.configs.infrastructure import configure_infrastructure
from ioc.container import get_container
configure_infrastructure(service_name="celery-worker")
_container = get_container()
app = _container.resolve(Celery)
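Resolving the app from the container, rather than constructing it in the module, presumably lets the worker and any caller share one configured instance. The project's container implementation is not shown here, so the following is only a guess at the general pattern: a hypothetical `Container` that builds each registered type once and returns the cached instance afterwards. `FakeCelery` is a placeholder class, not the real `celery.Celery`.

```python
from typing import Callable, Dict, Type, TypeVar, cast

T = TypeVar("T")


class Container:
    """Hypothetical container: one lazily built instance per registered type."""

    def __init__(self) -> None:
        self._factories: Dict[type, Callable[[], object]] = {}
        self._instances: Dict[type, object] = {}

    def register(self, key: Type[T], factory: Callable[[], T]) -> None:
        self._factories[key] = factory

    def resolve(self, key: Type[T]) -> T:
        # Build on first use, then reuse the cached instance.
        if key not in self._instances:
            self._instances[key] = self._factories[key]()
        return cast(T, self._instances[key])


class FakeCelery:
    """Placeholder for celery.Celery so the sketch has no dependencies."""

    def __init__(self, name: str) -> None:
        self.name = name


container = Container()
container.register(FakeCelery, lambda: FakeCelery("main"))

app = container.resolve(FakeCelery)
same_app = container.resolve(FakeCelery)
print(app is same_app)
```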
Available Tasks¶
| Task Name | Description |
|---|---|
| `ping` | Health check task returning `{"result": "pong"}` |
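Celery serializes task results before writing them to the result backend, using JSON by default, so a task's return value has to survive a JSON round trip. As a small sketch, the plain function below mirrors the documented `ping` payload and checks that it does; it is a stand-in, not the project's registered task.

```python
import json


def ping() -> dict:
    """Stand-in for the health check task body."""
    return {"result": "pong"}


payload = ping()
encoded = json.dumps(payload)  # roughly what is written to the result store
decoded = json.loads(encoded)  # roughly what result.get() hands back
print(decoded)
```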
Docker Compose Services¶
celery-worker:
  command:
    - celery
    - --app=delivery.tasks.app
    - worker
    - --loglevel=${LOGGING_LEVEL:-INFO}
    - --concurrency=4
celery-beat:
  command:
    - celery
    - --app=delivery.tasks.app
    - beat
    - --loglevel=${LOGGING_LEVEL:-INFO}
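The `${LOGGING_LEVEL:-INFO}` syntax is Compose variable interpolation: the default `INFO` applies when `LOGGING_LEVEL` is unset or empty. The sketch below reproduces that rule in Python to show the argument list the worker ends up with; `resolve_default` and `worker_command` are hypothetical helpers written for illustration.

```python
from typing import List, Mapping


def resolve_default(env: Mapping[str, str], name: str, default: str) -> str:
    """Mimic ${NAME:-default}: use the default when NAME is unset or empty."""
    value = env.get(name)
    return value if value else default


def worker_command(env: Mapping[str, str]) -> List[str]:
    """Build the worker argv as the compose file would after interpolation."""
    level = resolve_default(env, "LOGGING_LEVEL", "INFO")
    return [
        "celery",
        "--app=delivery.tasks.app",
        "worker",
        f"--loglevel={level}",
        "--concurrency=4",
    ]


print(worker_command({}))
print(worker_command({"LOGGING_LEVEL": "DEBUG"}))
```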
Related Topics¶
- Your First Celery Task — Tutorial
- Celery Task Tests — Testing tasks
- Celery Documentation — Official docs