Celery Integration

Celery-specific components for airlock.

from airlock.integrations.celery import LegacyTaskShim, install_global_intercept

Task Base Classes

LegacyTaskShim

Bases: Task

Migration helper that intercepts .delay() and routes through airlock.

Emits DeprecationWarning to encourage updating call sites.

Example::

@app.task(base=LegacyTaskShim)
def old_task(...):
    ...

# This now goes through airlock:
old_task.delay(...)  # DeprecationWarning
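The behaviour described above (intercept `.delay()`, warn, route elsewhere) can be sketched without Celery or airlock installed. `ToyLegacyShim`, `call_and_capture`, and the plain list used as a buffer are hypothetical stand-ins for illustration only. They are not how `LegacyTaskShim` is implemented.

```python
import warnings


class ToyLegacyShim:
    """Hypothetical stand-in for a task whose .delay() is intercepted."""

    def __init__(self, name, sink):
        self.name = name
        self.sink = sink  # a plain list standing in for the airlock buffer

    def delay(self, *args, **kwargs):
        # Warn at the caller's line (stacklevel=2) so the call site is easy to find.
        warnings.warn(
            f"{self.name}.delay() is deprecated; update this call site",
            DeprecationWarning,
            stacklevel=2,
        )
        # Record the call instead of dispatching it directly.
        self.sink.append((self.name, args, kwargs))


def call_and_capture(task, *args, **kwargs):
    """Call task.delay() and return the warnings it emitted."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")  # DeprecationWarning is often filtered out
        task.delay(*args, **kwargs)
    return caught


buffer = []
old_task = ToyLegacyShim("old_task", buffer)
caught = call_and_capture(old_task, 42, retry=True)
```

Capturing the warning this way is one option for tracking down remaining `.delay()` call sites during a migration. Running Python with `-W error::DeprecationWarning` is another.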

Global Intercept

install_global_intercept

install_global_intercept(app: Celery | None = None, *, wrap_task_execution: bool = True) -> None

Install global interception of .delay() and .apply_async() calls.

When a scope is active, all .delay()/.apply_async() calls are routed through airlock.enqueue(). When no scope is active, calls emit a deprecation warning and pass through normally.
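The routing rule above can be illustrated with a toy dispatcher. Everything in this sketch (`_state`, `toy_delay`, the lists) is a hypothetical stand-in. It only mirrors the documented decision: buffer when a scope is active, otherwise warn and pass through.

```python
import warnings

_state = {"buffer": None}  # None means no scope is active (toy convention)


def toy_delay(name, direct_sink):
    """Route a call to the active buffer, or warn and dispatch directly."""
    buf = _state["buffer"]
    if buf is not None:
        buf.append(name)  # scope active: hold the intent for later
        return "buffered"
    warnings.warn(
        "direct .delay() call outside a scope", DeprecationWarning, stacklevel=2
    )
    direct_sink.append(name)  # no scope: behave like the original call
    return "passed through"


direct = []
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    outside = toy_delay("a", direct)   # no scope active yet
    _state["buffer"] = []              # simulate entering a scope
    inside = toy_delay("b", direct)
    buffered = list(_state["buffer"])
    _state["buffer"] = None            # simulate leaving the scope
```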

With wrap_task_execution=True (default), task execution is also wrapped in an airlock scope. This means:

  • Tasks automatically get an airlock scope
  • Any .delay() calls within tasks are intercepted and buffered
  • On task success: buffered intents flush
  • On task exception: buffered intents are discarded
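The flush-on-success, discard-on-exception behaviour listed above can be sketched with a small context manager. `toy_scope`, `run_task`, and the intent strings are hypothetical and exist only for illustration. The sketch assumes nothing about airlock's internals beyond the four points in the list.

```python
from contextlib import contextmanager


@contextmanager
def toy_scope(dispatch):
    """Buffer intents while the body runs; flush only if it succeeds."""
    buffer = []
    try:
        yield buffer
    except Exception:
        buffer.clear()  # task failed: discard everything it buffered
        raise
    else:
        for intent in buffer:  # task succeeded: flush in order
            dispatch(intent)


sent = []  # stands in for the real broker


def run_task(body):
    """Run body inside a scope and report whether it succeeded."""
    try:
        with toy_scope(sent.append) as buffer:
            body(buffer)
        return True
    except Exception:
        return False


def ok_task(buffer):
    buffer.append("send_email")


def failing_task(buffer):
    buffer.append("charge_card")
    raise ValueError("boom")


ok = run_task(ok_task)
failed = run_task(failing_task)
```

After both runs, only the intent from the successful task has reached `sent`. The intent buffered by the failing task was dropped before dispatch.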

This is a monkey-patch. Call it once at app startup, after Celery is configured but before tasks are invoked.

Example::

# celery.py or conftest.py
from celery import Celery
from airlock.integrations.celery import install_global_intercept

app = Celery(...)
install_global_intercept(app)

# Or without execution wrapping (only intercept .delay() calls):
install_global_intercept(app, wrap_task_execution=False)

Parameters:
  • app (Celery | None, default: None ) –

    Optional Celery app. Currently unused but reserved for future per-app scoping.

  • wrap_task_execution (bool, default: True ) –

    If True (default), wrap Task.__call__ in an airlock scope so tasks automatically buffer and flush effects.

Raises:
  • RuntimeError

    If called more than once.

uninstall_global_intercept

uninstall_global_intercept() -> None

Remove the global interception installed by install_global_intercept(). Intended mainly for tests.
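The install-once and uninstall pattern can be sketched on a toy class. `ToyTask`, `toy_install`, and `toy_uninstall` are hypothetical names, and the guard shown here is an assumption about how a second install could be rejected. It is not airlock's actual code.

```python
class ToyTask:
    """Hypothetical class whose .delay() gets monkey-patched."""

    def delay(self, *args):
        return ("direct", args)


_state = {"original": None}  # None means the intercept is not installed


def toy_install():
    """Patch ToyTask.delay once; refuse a second install."""
    if _state["original"] is not None:
        raise RuntimeError("intercept already installed")
    _state["original"] = ToyTask.delay

    def intercepted(self, *args):
        return ("intercepted", args)

    ToyTask.delay = intercepted


def toy_uninstall():
    """Restore the original method if the patch is installed."""
    if _state["original"] is not None:
        ToyTask.delay = _state["original"]
        _state["original"] = None


task = ToyTask()
toy_install()
during = task.delay(1)
try:
    toy_install()  # second install should be rejected
    second_install_error = None
except RuntimeError as exc:
    second_install_error = str(exc)
toy_uninstall()
after = task.delay(1)
```

In a test suite, a fixture that installs during setup and uninstalls during teardown would follow the same shape, so that each test starts from an unpatched state.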