Core API

The main airlock module. Import with import airlock.

Functions

scope

scope(policy: Policy | None = None, *, _cls: type[Scope] | None = None, **kwargs) -> Iterator[Scope]

Context manager defining a lifecycle boundary for side effects.

Parameters:
  • policy (Policy | None, default: None ) –

    Policy controlling which intents are allowed. Defaults to the configured policy, or AllowAll if none is configured.

  • _cls (type[Scope] | None, default: None ) –

    Scope class to use. Defaults to the configured scope_cls, or Scope if none is configured. Subclass Scope and override should_flush() to customize flush/discard behavior.

  • **kwargs

    Additional arguments passed to Scope constructor (e.g., executor).

  • executor

    Callable that executes intents. Defaults to the configured executor, or synchronous execution if none is configured. See airlock.integrations.executors for available executors.

Behavior
  • On normal exit: calls flush() if should_flush(None) returns True, else discard()
  • On exception: calls flush() if should_flush(error) returns True, else discard()

The default Scope.should_flush() returns True on success, False on error. Subclass Scope to customize this behavior.
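
The exit rule above can be modeled in a few lines. This is a toy stand-in, not airlock's implementation: toy_scope, default_should_flush, and dispatched are hypothetical names, and the sketch only mirrors the documented default (flush on success, discard on error).

```python
from contextlib import contextmanager

dispatched = []  # records what a flush would have executed


def default_should_flush(error):
    """Mirror the documented default: flush on success, discard on error."""
    return error is None


@contextmanager
def toy_scope(should_flush=default_should_flush):
    buffer = []  # intents collected while the block runs
    try:
        yield buffer
    except BaseException as error:
        if should_flush(error):
            dispatched.extend(buffer)
        # otherwise the buffer is discarded; the error propagates either way
        raise
    else:
        if should_flush(None):
            dispatched.extend(buffer)


with toy_scope() as buf:
    buf.append("send_email")       # flushed: the block exits normally

try:
    with toy_scope() as buf:
        buf.append("charge_card")  # discarded: the block raises
        raise RuntimeError("boom")
except RuntimeError:
    pass
```

Passing a different should_flush callable to the toy plays the role that overriding Scope.should_flush() plays in the real API.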

Note

Arguments passed explicitly always override configured defaults. Use airlock.configure() to set application-wide defaults.

Example::

import airlock

# Use the Celery executor (my_task is a task callable defined elsewhere)
from airlock.integrations.executors.celery import celery_executor
with airlock.scope(executor=celery_executor):
    airlock.enqueue(my_task, ...)

# Use the django-q executor
from airlock.integrations.executors.django_q import django_q_executor
with airlock.scope(executor=django_q_executor):
    airlock.enqueue(my_task, ...)

enqueue

enqueue(task: Callable, *args: Any, _name: str | None = None, _origin: str | None = None, _dispatch_options: dict[str, Any] | None = None, **kwargs: Any) -> None

Express intent to perform a side effect.

This is the ONLY function domain code should call.

Parameters:
  • task (Callable) –

    The callable to execute (Celery task, function, etc.).

  • *args (Any, default: () ) –

    Positional arguments for the task.

  • _name (str | None, default: None ) –

    Optional explicit name for the intent. If not provided, the name is derived from the task (its __name__ attribute, or for Celery tasks, the last component of the task's .name).

  • _origin (str | None, default: None ) –

    Optional origin metadata for debugging/observability. This is NOT auto-detected - it must be set explicitly if needed. Integrations (Django middleware, Celery task wrapper) may set this to provide context like request path, task name, or trace/span IDs. For structured observability, prefer OpenTelemetry span context.

  • _dispatch_options (dict[str, Any] | None, default: None ) –

    Optional dispatch options (countdown, queue, etc.). Passed through to the task queue backend (e.g., Celery's apply_async). Ignored for plain callables.

  • **kwargs (Any, default: {} ) –

    Keyword arguments for the task.

Raises:
  • PolicyEnqueueError

    If called from within a policy callback.

  • NoScopeError

    If no scope is active.

  • PolicyViolation

    If a policy explicitly rejects the intent via on_enqueue(). For example, AssertNoEffects policy raises PolicyViolation on any enqueue. When this happens, the intent is NOT added to the buffer.

policy

policy(p: Policy) -> Iterator[None]

Context manager for local policy contexts.

Intents enqueued within this context capture the policy and apply it at flush time. This enables local control without nested buffers.

Example::

with airlock.scope():
    airlock.enqueue(task_a)  # will dispatch

    with airlock.policy(DropAll()):
        airlock.enqueue(task_b)  # will NOT dispatch

    airlock.enqueue(task_c)  # will dispatch

Unlike nested scopes, all intents go to the same buffer. The local policy is metadata that affects dispatch decisions at flush.

Note

This does NOT create a new buffer or nested scope. All intents from within this context still go to the enclosing scope's buffer. The policy is captured on each intent at enqueue time and evaluated at flush. This is intentional - it preserves a single dispatch boundary while allowing fine-grained control over which intents survive.

get_current_scope

get_current_scope() -> Scope | None

Get the currently active airlock scope, if any.

Classes

Scope

A lifecycle scope that buffers and controls side effect intents.

Parameters:
  • policy (Policy) –

    Policy controlling what intents are allowed.

  • executor (Executor | None, default: None ) –

    Callable that executes intents. Defaults to synchronous execution. See airlock.integrations.executors for available executors.

intents property

intents: list[Intent]

Read-only access to buffered intents for inspection.

own_intents property

own_intents: list[Intent]

Intents enqueued directly in this scope (not captured from nested scopes).

captured_intents property

captured_intents: list[Intent]

Intents captured from nested scopes.

is_flushed property

is_flushed: bool

is_discarded property

is_discarded: bool

is_active property

is_active: bool

True if this scope is currently the active scope.

enter

enter() -> Scope

Activate this scope.

Sets the context var so enqueue() routes intents to this scope. Call exit() when done; it must be called before flush() or discard().

Returns:
  • Scope

    Self for chaining.

Raises:

exit

exit() -> None

Deactivate this scope.

Resets the context var to the previous scope (or None). Must be called before flush() or discard().

Raises:

flush

flush() -> list[Intent]

Flush all buffered intents - apply policy and dispatch.

Filters intents through policies (both local and scope-level), then dispatches them in FIFO order using the configured executor.

Returns:
  • list[Intent]

    List of intents that were dispatched (after policy filtering).

Raises:
  • ScopeStateError

    If scope is already flushed, discarded, or still active.

  • Exception

    Any exception raised by the executor during dispatch (fail-fast behavior). See _dispatch_all() docstring for details on exception handling.

Note

The scope is marked as flushed even if an executor raises an exception. This prevents retry attempts, as the scope is in an inconsistent state (some intents may have been dispatched before the failure).
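
The fail-fast contract can be illustrated with a toy class. This is only one way to get the documented behavior and is not airlock's code: ToyScope and flaky_executor are hypothetical, policy filtering is omitted, and RuntimeError stands in for ScopeStateError.

```python
class ToyScope:
    def __init__(self, intents, executor):
        self.intents = list(intents)
        self.executor = executor
        self.is_flushed = False

    def flush(self):
        if self.is_flushed:
            # Stand-in for ScopeStateError.
            raise RuntimeError("scope already flushed")
        # Mark first, so a failure part-way through cannot be retried.
        self.is_flushed = True
        dispatched = []
        for intent in self.intents:   # FIFO order
            self.executor(intent)     # the first exception propagates
            dispatched.append(intent)
        return dispatched


ran = []


def flaky_executor(intent):
    if intent == "b":
        raise ValueError("executor failed on b")
    ran.append(intent)


scope = ToyScope(["a", "b", "c"], flaky_executor)
try:
    scope.flush()
except ValueError:
    pass  # "a" was dispatched, "b" failed, "c" was never attempted
```

After the failure the toy scope reports is_flushed as True, so a second flush() is rejected rather than re-dispatching "a".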

discard

discard() -> list[Intent]

Discard all buffered intents without dispatching.

should_flush

should_flush(error: BaseException | None) -> bool

Decide terminal action when context manager exits.

Override this method in subclasses to customize flush/discard behavior.

Parameters:
  • error (BaseException | None) –

    The exception that caused exit, or None for normal exit.

Returns:
  • bool

    True to flush (dispatch intents), False to discard.

Default behavior: flush on success, discard on error.
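
As an illustration of an override, the sketch below flushes on success and also when the block fails with a designated "soft" exception. To stay runnable without airlock it is written as a mixin; SoftFailure and LenientFlushMixin are hypothetical names, not part of the library.

```python
class SoftFailure(Exception):
    """Hypothetical error after which buffered side effects should still run."""


class LenientFlushMixin:
    """Flush on normal exit and on SoftFailure; discard on any other error."""

    def should_flush(self, error):
        if error is None:
            return True
        return isinstance(error, SoftFailure)


decider = LenientFlushMixin()
on_success = decider.should_flush(None)
on_soft_failure = decider.should_flush(SoftFailure("partial result"))
on_hard_failure = decider.should_flush(RuntimeError("boom"))
```

In real use the method would live on a Scope subclass (for example, class LenientScope(LenientFlushMixin, Scope)), which could then be passed to scope() through the _cls parameter described earlier.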

before_descendant_flushes

before_descendant_flushes(exiting_scope: Scope, intents: list[Intent]) -> list[Intent]

Called when a nested scope exits and attempts to flush.

This method is called during the parent chain walk, allowing each ancestor to decide which intents the exiting scope may flush vs which to capture.

Parameters:
  • exiting_scope (Scope) –

    The nested scope that is exiting (may be deeply nested).

  • intents (list[Intent]) –

    The list of intents the exiting scope wants to flush.

Returns:
  • list[Intent]

    The list of intents to allow through (the exiting scope will flush these). Any intents not in the returned list are captured into this scope's buffer. Important: the return value must be a list; returning None or any other type raises TypeError.

Raises:
  • TypeError

    If return value is not a list. Any other exception raised by this method will propagate and abort the flush, potentially leaving the scope in a partially-modified state.

Default behavior: Capture all intents (return []). This is the controlled default - outer scopes have authority over nested scopes.

Override this method to allow nested scopes to flush independently:

  • Return [] to capture all intents (default, controlled)
  • Return intents to allow all (independent nested scopes)
  • Return filtered list to selectively capture

Note

  • Do not mutate the intents list. Return a new list or slice.
  • Returning intents not in the input list has undefined behavior.
  • In multi-level nesting, exiting_scope is always the innermost scope, not necessarily the immediate child. Intermediate scopes haven't exited yet.

Example::

class IndependentScope(Scope):
    def before_descendant_flushes(self, exiting_scope, intents):
        return intents  # Allow nested scopes to flush independently

class SmartScope(Scope):
    def before_descendant_flushes(self, exiting_scope, intents):
        # Capture dangerous tasks, allow safe ones
        return [i for i in intents if 'dangerous' not in i.name]
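
The effect of the return value can be pictured as a partition of the input list. The helper below is a toy model under that assumption: split_for_ancestor is a hypothetical name, and SimpleNamespace objects stand in for Intent instances.

```python
from types import SimpleNamespace


def split_for_ancestor(intents, allowed):
    """Toy model: intents in `allowed` flush, the rest are captured."""
    if not isinstance(allowed, list):
        raise TypeError("before_descendant_flushes() must return a list")
    allowed_ids = {id(i) for i in allowed}
    flushed = [i for i in intents if id(i) in allowed_ids]
    captured = [i for i in intents if id(i) not in allowed_ids]
    return flushed, captured


intents = [
    SimpleNamespace(name="send_email"),
    SimpleNamespace(name="dangerous_purge"),
]

# Same filter as SmartScope above, applied to the stand-in intents.
allowed = [i for i in intents if "dangerous" not in i.name]
flushed, captured = split_for_ancestor(intents, allowed)
```

Both output lists keep the input order, and the input list itself is never mutated, in line with the notes above.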

Intent dataclass

Represents the intent to perform a side effect.

Stores the actual callable, not just a name. The name property derives a string for serialization/logging.

Captures local policy context at enqueue time for introspection and deferred application at flush.

task instance-attribute

task: Callable

args instance-attribute

args: tuple[Any, ...]

kwargs instance-attribute

kwargs: dict[str, Any]

origin class-attribute instance-attribute

origin: str | None = None

dispatch_options class-attribute instance-attribute

dispatch_options: dict[str, Any] | None = None

name property

name: str

The task's simple name (e.g., 'send_email', not the full module path).

If an explicit name was provided via _name parameter to enqueue(), that name is returned. Otherwise, the name is derived from the task.

local_policies property

local_policies: tuple[Policy, ...]

Local policies captured at enqueue time.

passes_local_policies

passes_local_policies() -> bool

Check if this intent passes its captured local policies.

Returns:
  • bool

    True if all local policies allow this intent.

Note

This does NOT guarantee the intent will be dispatched. It does not consider:

  • Scope-level policy (checked separately at flush)
  • Whether the scope flushes or discards
  • Dispatch execution success

Use for inspection and audit, not execution prediction.
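
A minimal model of this check, assuming it amounts to asking every captured policy whether it allows the intent. AllowNamed and the SimpleNamespace intents are hypothetical stand-ins, and treating an empty policy tuple as passing is an assumption of the sketch.

```python
from types import SimpleNamespace


class AllowNamed:
    """Hypothetical policy: allow only intents whose name is in `names`."""

    def __init__(self, *names):
        self.names = set(names)

    def allows(self, intent):
        return intent.name in self.names


def passes_local_policies(intent):
    # all() over an empty tuple is True: no local policy means no objection.
    return all(p.allows(intent) for p in intent.local_policies)


only_email = (AllowNamed("send_email"),)
email = SimpleNamespace(name="send_email", local_policies=only_email)
purge = SimpleNamespace(name="purge_cache", local_policies=only_email)
plain = SimpleNamespace(name="purge_cache", local_policies=())

audit = {
    "email": passes_local_policies(email),
    "purge": passes_local_policies(purge),
    "plain": passes_local_policies(plain),
}
```

A True result here says nothing about the scope-level policy or about whether the scope flushes at all, which is why the value suits audit output rather than prediction.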

Protocols

Policy

Bases: Protocol

Protocol for side effect policies.

Policies are per-intent boolean gates that decide which intents dispatch. This design enforces FIFO order by construction - policies can filter intents but cannot reorder them.

Methods:

  • on_enqueue –

    Called when an intent is added to the buffer. Use for observation, logging, or raising PolicyViolation for hard blocks.

  • allows –

    Called at flush time for each intent. Return True to dispatch, False to silently drop.

on_enqueue

on_enqueue(intent: Intent) -> None

Called when an intent is added to the buffer. Observe or raise PolicyViolation.

allows

allows(intent: Intent) -> bool

Called at flush time. Return True to dispatch, False to drop.
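
Because Policy is a protocol, any object with these two methods should qualify. The sketch below shows a hypothetical policy, BlockByPrefix, that records what it sees at enqueue time and drops matching intents at flush time. It is exercised with SimpleNamespace stand-ins rather than real Intent objects.

```python
from types import SimpleNamespace


class BlockByPrefix:
    """Hypothetical policy: drop intents whose name starts with `prefix`."""

    def __init__(self, prefix):
        self.prefix = prefix
        self.seen = []  # names observed at enqueue time

    def on_enqueue(self, intent):
        # Observation only; a stricter policy could raise PolicyViolation here.
        self.seen.append(intent.name)

    def allows(self, intent):
        # Per-intent boolean gate: filters, never reorders.
        return not intent.name.startswith(self.prefix)


policy = BlockByPrefix("debug_")
intents = [
    SimpleNamespace(name="send_email"),
    SimpleNamespace(name="debug_dump"),
    SimpleNamespace(name="sync_index"),
]

for intent in intents:
    policy.on_enqueue(intent)

surviving = [i.name for i in intents if policy.allows(i)]
```

The surviving names keep their enqueue order, which is the FIFO guarantee the protocol is designed around.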

Executor

Bases: Protocol

Protocol for intent executors.

An executor is a callable that takes an Intent and executes it via some dispatch mechanism (synchronous, Celery, django-q, etc.).

Built-in executors are available in airlock.integrations.executors:

  • sync_executor: Synchronous execution (default)
  • celery_executor: Dispatch via Celery .delay() / .apply_async()
  • django_q_executor: Dispatch via django-q's async_task()
  • django_tasks_executor: Dispatch via Django 6+'s built-in tasks framework
  • huey_executor: Dispatch via Huey's .schedule()
  • dramatiq_executor: Dispatch via Dramatiq's .send()

Custom executors can be written by implementing this protocol.

__call__

__call__(intent: Intent) -> None

Execute the given intent.
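
A custom executor only needs to be a callable that accepts an intent. The sketch below logs and then runs the task in-process; logging_executor and the logger name are hypothetical, and the intent is a SimpleNamespace carrying the task, args, kwargs, and name attributes documented for Intent.

```python
import logging
from types import SimpleNamespace

logger = logging.getLogger("airlock.sketch")  # hypothetical logger name


def logging_executor(intent):
    """Log the intent, then run its task synchronously in this process."""
    logger.info("dispatching %s", intent.name)
    intent.task(*intent.args, **intent.kwargs)


calls = []


def record(*args, **kwargs):
    calls.append((args, kwargs))


intent = SimpleNamespace(name="record", task=record, args=(1,), kwargs={"x": 2})
logging_executor(intent)
```

With the real library, a callable of this shape could presumably be passed as scope(executor=...), as the Celery and django-q examples do with the built-in executors.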

Built-in Policies

AllowAll

Policy that allows all side effects.

DropAll

Policy that drops all side effects.

AssertNoEffects

Policy that raises if any side effect is attempted.

BlockTasks

Policy that blocks specific tasks by name.

LogOnFlush

Policy that logs intents at flush time (allows all).

Exceptions

AirlockError

Bases: Exception

Base exception for all airlock errors.

UsageError

Bases: AirlockError

Raised when airlock is used incorrectly (API misuse).

NoScopeError

Bases: UsageError

Raised when enqueue() is called with no active scope.

This is intentional: airlock requires explicit lifecycle boundaries. Side effects should not escape silently. Every enqueue() must occur within a scope() that decides when (and whether) effects dispatch.

If you're seeing this error, wrap your code in an airlock.scope()::

with airlock.scope():
    do_stuff()  # enqueue() calls are now valid

PolicyEnqueueError

Bases: UsageError

Raised when enqueue() is called from within a policy callback.

ScopeStateError

Bases: AirlockError

Raised when an operation is invalid for the scope's current lifecycle state.

PolicyViolation

Bases: AirlockError

Raised when a policy explicitly rejects an intent.