Core Model: The 3 Concerns

Airlock separates three orthogonal concerns that can be mixed and customized.

Concern   Controlled By   Question
-------   -------------   -----------------------
WHEN      Scope           When do effects escape?
WHAT      Policy          Which effects execute?
HOW       Executor        How do they run?

Concern 1: WHEN (Scope)

Scopes control the timing and lifecycle of buffered effects.

# Basic scope: flush on success, discard on error
with airlock.scope():
    do_stuff()
    # Effects buffered...
# Effects execute here (on normal exit)

# Transaction-aware scope: wait for commit
from django.db import transaction

from airlock.integrations.django import DjangoScope

with transaction.atomic():
    with airlock.scope(_cls=DjangoScope):
        order.save()
        airlock.enqueue(send_email, order.id)
    # Effects still buffered...
# Effects execute here (after commit)
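
Because DjangoScope defers dispatch to transaction.on_commit(), a rollback discards the buffered effects as well: Django never fires on_commit() callbacks for a rolled-back transaction. A minimal sketch of that behaviour, reusing the example task above:

import airlock
from django.db import transaction

from airlock.integrations.django import DjangoScope

try:
    with transaction.atomic():
        with airlock.scope(_cls=DjangoScope):
            airlock.enqueue(send_email, order.id)  # buffered
        raise ValueError("force a rollback")
except ValueError:
    pass
# The transaction rolled back, on_commit() never ran: nothing dispatched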

Scope decides:

  • When to flush (end of block, after commit, custom)
  • Whether to flush (success vs error)
  • How to store buffer (memory, database, etc.)

Default: flush on normal exit, discard on exception.
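
The discard-on-error default means a failing block leaves no trace. For example (only the scope() and enqueue() calls from above are used; send_email is the example task):

import airlock

try:
    with airlock.scope():
        airlock.enqueue(send_email, 42)  # buffered, not yet dispatched
        raise ValueError("boom")         # scope exits with an error
except ValueError:
    pass
# The buffer was discarded on the exception: send_email never ran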

Concern 2: WHAT (Policy)

Policies filter and observe intents.

# Drop all effects (dry-run)
with airlock.scope(policy=airlock.DropAll()):
    process_orders()  # Effects buffered but never dispatched

# Assert no effects (testing)
with airlock.scope(policy=airlock.AssertNoEffects()):
    pure_function()  # Raises if any enqueue() is called

# Block specific tasks
with airlock.scope(policy=airlock.BlockTasks({"send_email"})):
    process_order()  # Emails dropped, other tasks execute

# Log everything
with airlock.scope(policy=airlock.LogOnFlush(logger)):
    do_stuff()  # All dispatches logged

Policy decides:

  • Which intents are allowed (filter)
  • What to observe (logging, metrics)
  • When to fail fast (assertions)

Default: allow everything.
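
Policies slot neatly into test setup. A sketch assuming pytest (only scope() and the AssertNoEffects policy shown above are used):

import pytest

import airlock

@pytest.fixture
def no_side_effects():
    # Fail fast: any enqueue() inside a test using this fixture raises
    with airlock.scope(policy=airlock.AssertNoEffects()):
        yield

def test_stays_pure(no_side_effects):
    pure_function()  # passes only if it never enqueues an effect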

Concern 3: HOW (Executor)

Executors control the dispatch mechanism.

# Sync execution (default)
with airlock.scope():
    airlock.enqueue(my_function, arg=123)
# Executes: my_function(arg=123)

# Celery
from airlock.integrations.executors.celery import celery_executor

with airlock.scope(executor=celery_executor):
    airlock.enqueue(celery_task, arg=123)
# Executes: celery_task.delay(arg=123)

# django-q
from airlock.integrations.executors.django_q import django_q_executor

with airlock.scope(executor=django_q_executor):
    airlock.enqueue(any_function, arg=123)
# Executes: async_task(any_function, arg=123)

Executor decides:

  • How to run the task (sync, queue, thread pool...)
  • What protocol to use (Celery, django-q, Huey, custom)

Default: synchronous execution.
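
The examples above suggest an executor is a callable that receives the enqueued function and its arguments. Under that assumption (the exact signature is not specified here, so treat the (func, *args, **kwargs) form as hypothetical), a thread-pool executor might look like:

from concurrent.futures import ThreadPoolExecutor

import airlock

_pool = ThreadPoolExecutor(max_workers=4)

def thread_pool_executor(func, *args, **kwargs):
    # Hypothetical executor: run each flushed intent on a shared thread
    # pool instead of inline or via a broker (signature assumed, see above)
    _pool.submit(func, *args, **kwargs)

with airlock.scope(executor=thread_pool_executor):
    airlock.enqueue(any_function, arg=123)
# Executes: _pool.submit(any_function, arg=123)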

Mixing Concerns

The power is in composition:

# Transaction-aware + Celery + logging
from airlock.integrations.django import DjangoScope
from airlock.integrations.executors.celery import celery_executor

with airlock.scope(
    _cls=DjangoScope,                  # WHEN: after transaction.on_commit()
    executor=celery_executor,          # HOW: via Celery
    policy=airlock.LogOnFlush(logger)  # WHAT: log everything
):
    order.save()
    airlock.enqueue(send_email, order.id)
# Waits for commit, dispatches via Celery, logs

# Test scope + sync executor + assertion
with airlock.scope(
    _cls=Scope,                        # WHEN: immediate (no transaction)
    executor=sync_executor,            # HOW: synchronous
    policy=airlock.AssertNoEffects()   # WHAT: fail if anything enqueued
):
    test_calculation()

# Migration scope + drop all + immediate
with airlock.scope(
    _cls=Scope,                        # WHEN: immediate
    executor=sync_executor,            # HOW: doesn't matter (nothing runs)
    policy=airlock.DropAll()           # WHAT: suppress everything
):
    backfill_data()
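
Recurring combinations are easy to package as helpers. A minimal sketch of a reusable dry-run context, built only from the scope() call and DropAll policy above (the dry_run name is illustrative):

from contextlib import contextmanager

import airlock

@contextmanager
def dry_run():
    # Buffer everything, dispatch nothing
    with airlock.scope(policy=airlock.DropAll()):
        yield

with dry_run():
    backfill_data()  # runs with all effects suppressed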

Global Defaults with configure()

Instead of passing arguments to every scope() call, you can set global defaults:

import airlock
from airlock.integrations.executors.celery import celery_executor

# Set once at app startup
airlock.configure(
    executor=celery_executor,      # Default HOW
    policy=airlock.AllowAll(),     # Default WHAT
)

# Now all scopes use these defaults
with airlock.scope():  # Uses celery_executor
    airlock.enqueue(task)

@airlock.scoped()  # Also uses configured defaults
def my_function():
    airlock.enqueue(other_task)

Overriding Defaults

Explicit arguments always override configured defaults:

airlock.configure(policy=airlock.AllowAll())

# Uses configured defaults
with airlock.scope():
    ...

# Override policy for this scope
with airlock.scope(policy=airlock.DropAll()):
    ...

Configuration API

# Set defaults
airlock.configure(
    scope_cls=...,    # Default scope class
    policy=...,       # Default policy
    executor=...,     # Default executor
)

# Get current configuration (returns a copy)
config = airlock.get_configuration()

# Reset to defaults (mainly for testing)
airlock.reset_configuration()
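
reset_configuration() pairs naturally with test teardown, so configure() calls made inside one test cannot leak into the next. A sketch assuming pytest:

import pytest

import airlock

@pytest.fixture(autouse=True)
def isolated_airlock_config():
    yield
    airlock.reset_configuration()  # undo any configure() the test made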

Framework integrations (like Django) typically call configure() automatically at startup.