Task Registry¶
Type-safe task access and invocation.
Overview¶
The TasksRegistry provides type-safe access to Celery tasks:
from ioc.container import get_container
from delivery.tasks.registry import TasksRegistry
container = get_container()
registry = container.resolve(TasksRegistry)
# Type-safe task access
result = registry.ping.delay()
Registry Structure¶
Task Names¶
Define task names as an enum:
# src/delivery/tasks/registry.py
from enum import StrEnum


class TaskName(StrEnum):
    PING = "ping"
    CLEANUP_SESSIONS = "cleanup_sessions"
    SEND_EMAIL = "send_email"
Base Registry¶
# src/infrastructure/celery/registry.py
from celery import Celery, Task


class BaseTasksRegistry:
    def __init__(self, app: Celery) -> None:
        self._app = app

    def _get_task_by_name(self, name: str) -> Task:
        return self._app.tasks[name]
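The lookup in `_get_task_by_name` is a dictionary-style access, so a name that was never registered fails at lookup time rather than when the task is called. The sketch below illustrates this without Celery installed: `FakeApp` is a hypothetical stand-in that models only the `tasks` mapping. With a real Celery app the error for a missing name should be `celery.exceptions.NotRegistered`, which subclasses `KeyError`.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class FakeApp:
    """Hypothetical stand-in for a Celery app: only the `tasks` mapping is modeled."""

    tasks: dict[str, Callable[..., Any]] = field(default_factory=dict)


class BaseTasksRegistry:
    def __init__(self, app: FakeApp) -> None:
        self._app = app

    def _get_task_by_name(self, name: str) -> Callable[..., Any]:
        # Same dictionary-style lookup as in the base registry.
        return self._app.tasks[name]


app = FakeApp(tasks={"ping": lambda: {"result": "pong"}})
registry = BaseTasksRegistry(app)

found = registry._get_task_by_name("ping")

# A name that was never registered fails at lookup time, not at call time.
try:
    registry._get_task_by_name("does_not_exist")
    missing_raised = False
except KeyError:
    missing_raised = True
```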
Task Registry¶
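# src/delivery/tasks/registry.py
# Defer annotation evaluation so names imported only under TYPE_CHECKING
# can appear in return annotations without a runtime NameError.
from __future__ import annotations

from typing import TYPE_CHECKING

from celery import Task

from infrastructure.celery.registry import BaseTasksRegistry

if TYPE_CHECKING:
    from delivery.tasks.tasks.ping import PingResult


# TaskName is the enum defined earlier in this module.
class TasksRegistry(BaseTasksRegistry):
    @property
    def ping(self) -> Task[[], PingResult]:
        return self._get_task_by_name(TaskName.PING)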
Type Hints¶
Import result types under TYPE_CHECKING so the type checker sees them without importing the task modules at runtime:
# Defer annotation evaluation so TYPE_CHECKING-only names are safe in annotations.
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from delivery.tasks.tasks.cleanup import CleanupResult
    from delivery.tasks.tasks.email import EmailResult
    from delivery.tasks.tasks.ping import PingResult


class TasksRegistry(BaseTasksRegistry):
    @property
    def ping(self) -> Task[[], PingResult]:
        return self._get_task_by_name(TaskName.PING)

    @property
    def cleanup_sessions(self) -> Task[[int], CleanupResult]:
        """
        Args:
            days_old: Delete sessions older than this many days
        """
        return self._get_task_by_name(TaskName.CLEANUP_SESSIONS)

    @property
    def send_email(self) -> Task[[str, str, str], EmailResult]:
        """
        Args:
            to: Recipient email
            subject: Email subject
            body: Email body
        """
        return self._get_task_by_name(TaskName.SEND_EMAIL)
Calling Tasks¶
Async (Non-blocking)¶
# Returns immediately with AsyncResult
result = registry.ping.delay()
# Check status
print(result.status) # PENDING, STARTED, SUCCESS, FAILURE
# Get result (blocks until complete)
print(result.get(timeout=10))
With Arguments¶
# Positional arguments
result = registry.send_email.delay(
"user@example.com",
"Welcome!",
"Welcome to our service.",
)
# Keyword arguments
result = registry.cleanup_sessions.delay(days_old=7)
Apply Async (More Options)¶
result = registry.send_email.apply_async(
    args=["user@example.com", "Subject", "Body"],
    countdown=60,  # Delay execution by 60 seconds
    expires=3600,  # Expire if not started within 1 hour
    retry=True,  # Retry on connection errors
    retry_policy={
        "max_retries": 3,
        "interval_start": 0,
        "interval_step": 0.2,
        "interval_max": 0.5,
    },
)
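To get a feel for what the `retry_policy` values mean, the waits between publish retries can be approximated by hand. The sketch below is a simplified model, assuming the delay starts at `interval_start`, grows by `interval_step` per retry, and is capped at `interval_max`; the real schedule is computed by Celery's messaging layer and may differ in detail. The `retry_delays` helper is hypothetical and not part of Celery.

```python
def retry_delays(
    max_retries: int,
    interval_start: float,
    interval_step: float,
    interval_max: float,
) -> list[float]:
    """Simplified model: linear growth from interval_start, capped at interval_max."""
    return [
        min(interval_start + attempt * interval_step, interval_max)
        for attempt in range(max_retries)
    ]


policy = {
    "max_retries": 3,
    "interval_start": 0,
    "interval_step": 0.2,
    "interval_max": 0.5,
}

# Waits before the first, second, and third retry under this model.
delays = retry_delays(**policy)

# Upper bound on the time spent waiting between retries.
total_wait = sum(delays)
```

Under this model the policy above waits roughly 0, 0.2, and 0.4 seconds, so a broker outage delays the caller by well under a second before the error is raised.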
Synchronous (Blocking)¶
# Call the task directly: it runs in the current process and returns the
# task's return value, not an AsyncResult (not recommended in web requests)
result = registry.ping()
print(result)  # {"result": "pong"}
Avoid Synchronous Calls in Web Requests
Calling tasks synchronously in HTTP handlers causes problems:
- Blocks the web worker — The request hangs until the task completes
- Request timeouts — Long-running tasks will exceed HTTP timeout limits
- Database deadlocks — Tasks requiring the same database connection may deadlock
- Poor scalability — Blocked workers can't serve other requests
Use .delay() or .apply_async() instead, and return a task ID for status polling.
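One way to support status polling is a small helper that turns a task result into a JSON-serializable dictionary, which a status endpoint can return for a given task ID. The sketch below is an illustration, not the project's implementation: `task_status_payload` and `FakeResult` are hypothetical names, and `FakeResult` models only the `AsyncResult` attributes the helper reads (`id`, `status`, `ready()`, `successful()`, `result`).

```python
from dataclasses import dataclass
from typing import Any


def task_status_payload(result: Any) -> dict[str, Any]:
    """Build a JSON-serializable status dict from an AsyncResult-like object."""
    payload: dict[str, Any] = {"task_id": result.id, "status": result.status}
    if result.ready():
        if result.successful():
            payload["result"] = result.result
        else:
            # On failure, `result.result` holds the raised exception.
            payload["error"] = str(result.result)
    return payload


@dataclass
class FakeResult:
    """Hypothetical stand-in for an AsyncResult, for demonstration only."""

    id: str
    status: str
    result: Any = None

    def ready(self) -> bool:
        # Simplification: only these two finished states are modeled.
        return self.status in {"SUCCESS", "FAILURE"}

    def successful(self) -> bool:
        return self.status == "SUCCESS"


pending = task_status_payload(FakeResult(id="abc", status="PENDING"))
done = task_status_payload(
    FakeResult(id="abc", status="SUCCESS", result={"result": "pong"})
)
failed = task_status_payload(
    FakeResult(id="abc", status="FAILURE", result=ValueError("boom"))
)
```

In a handler, the value returned by `.delay()` exposes the task ID as `result.id`, which the client can send back to the status endpoint.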
IoC Registration¶
The registry is created by TasksRegistryFactory:
# src/ioc/container.py
def _register_celery(container: Container) -> None:
    container.register(TasksRegistryFactory, scope=Scope.singleton)
    container.register(
        TasksRegistry,
        factory=lambda: container.resolve(TasksRegistryFactory)(),
        scope=Scope.singleton,
    )
Adding New Tasks¶
1. Add Task Name¶
2. Add Registry Property¶
if TYPE_CHECKING:
    from delivery.tasks.tasks.my_task import MyTaskResult


class TasksRegistry(BaseTasksRegistry):
    @property
    def my_new_task(self) -> Task[[str, int], MyTaskResult]:
        return self._get_task_by_name(TaskName.MY_NEW_TASK)
3. Create Controller¶
See Task Controllers.
Using in HTTP Controllers¶
class OrderController(Controller):
    def __init__(
        self,
        tasks_registry: TasksRegistry,
    ) -> None:
        self._tasks = tasks_registry

    def create_order(self, request: HttpRequest, body: OrderSchema) -> OrderResponse:
        order = Order.objects.create(**body.model_dump())

        # Send confirmation email asynchronously
        self._tasks.send_email.delay(
            to=order.user.email,
            subject="Order Confirmation",
            body=f"Your order #{order.id} has been placed.",
        )

        return OrderResponse.model_validate(order, from_attributes=True)
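Because the registry arrives through the constructor, a test can pass in a fake registry and assert on what was enqueued, without a broker or worker. The sketch below shows the idea with `unittest.mock.MagicMock`; `OrderNotifier` is a hypothetical, framework-free stand-in for a controller like the one above, and the task ID `"task-123"` is made up.

```python
from unittest.mock import MagicMock


class OrderNotifier:
    """Hypothetical stand-in for a controller that receives the registry by injection."""

    def __init__(self, tasks_registry) -> None:
        self._tasks = tasks_registry

    def notify(self, email: str, order_id: int) -> str:
        # Enqueue the email and hand back the task ID for later status checks.
        result = self._tasks.send_email.delay(
            to=email,
            subject="Order Confirmation",
            body=f"Your order #{order_id} has been placed.",
        )
        return result.id


# The mock records every call, so nothing is actually sent or queued.
fake_registry = MagicMock()
fake_registry.send_email.delay.return_value.id = "task-123"

notifier = OrderNotifier(fake_registry)
task_id = notifier.notify("user@example.com", 42)

# Raises AssertionError if the task was not enqueued exactly once with these arguments.
fake_registry.send_email.delay.assert_called_once_with(
    to="user@example.com",
    subject="Order Confirmation",
    body="Your order #42 has been placed.",
)
```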
Result Handling¶
Check Status¶
result = registry.ping.delay()

if result.ready():
    if result.successful():
        print(result.result)
    else:
        print(f"Failed: {result.result}")
else:
    print("Still processing...")
Timeout¶
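# .get() raises Celery's own TimeoutError, which is not the built-in TimeoutError
from celery.exceptions import TimeoutError

try:
    result = registry.ping.delay().get(timeout=30)
except TimeoutError:
    print("Task took too long")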
Ignore Results¶
For fire-and-forget tasks:
registry.send_email.apply_async(
    args=["user@example.com", "Subject", "Body"],
    ignore_result=True,
)
Related Topics¶
- Task Controllers — Controller pattern
- Beat Scheduler — Scheduling
- Celery Task Tests — Testing