Core Model: The 3 Concerns¶
Airlock separates three orthogonal concerns that can be mixed and customized.
| Concern | Controlled By | Question |
|---|---|---|
| WHEN | Scope | When do effects escape? |
| WHAT | Policy | Which effects execute? |
| HOW | Executor | How do they run? |
Concern 1: WHEN (Scope)¶
Scopes control timing and lifecycle.
# Basic scope: flush on success, discard on error
with airlock.scope():
do_stuff()
# Effects buffered...
# Effects execute here (on normal exit)
# Transaction-aware scope: wait for commit
from airlock.integrations.django import DjangoScope
with transaction.atomic():
with airlock.scope(_cls=DjangoScope):
order.save()
airlock.enqueue(send_email, order.id)
# Effects still buffered...
# Effects execute here (after commit)
Scope decides:¶
- When to flush (end of block, after commit, custom)
- Whether to flush (success vs error)
- How to store buffer (memory, database, etc.)
Default: flush on normal exit, discard on exception.
Concern 2: WHAT (Policy)¶
Policies filter and observe intents.
# Drop all effects (dry-run)
with airlock.scope(policy=airlock.DropAll()):
process_orders() # Effects buffered but never dispatched
# Assert no effects (testing)
with airlock.scope(policy=airlock.AssertNoEffects()):
pure_function() # Raises if any enqueue() called
# Block specific tasks
with airlock.scope(policy=airlock.BlockTasks({"send_email"})):
process_order() # Emails dropped, other tasks execute
# Log everything
with airlock.scope(policy=airlock.LogOnFlush(logger)):
do_stuff() # All dispatches logged
Policy decides:¶
- Which intents are allowed (filter)
- What to observe (logging, metrics)
- When to fail fast (assertions)
Default: allow everything.
Concern 3: HOW (Executor)¶
Executors control dispatch mechanism.
# Sync execution (default)
with airlock.scope():
airlock.enqueue(my_function, arg=123)
# Executes: my_function(arg=123)
# Celery
from airlock.integrations.executors.celery import celery_executor
with airlock.scope(executor=celery_executor):
airlock.enqueue(celery_task, arg=123)
# Executes: celery_task.delay(arg=123)
# django-q
from airlock.integrations.executors.django_q import django_q_executor
with airlock.scope(executor=django_q_executor):
airlock.enqueue(any_function, arg=123)
# Executes: async_task(any_function, arg=123)
Executor decides:¶
- How to run the task (sync, queue, thread pool...)
- What protocol to use (Celery, django-q, Huey, custom)
Default: synchronous execution.
Mixing Concerns¶
The power is in composition:
# Transaction-aware + Celery + logging
from airlock.integrations.django import DjangoScope
from airlock.integrations.executors.celery import celery_executor
with airlock.scope(
_cls=DjangoScope, # WHEN: after transaction.on_commit()
executor=celery_executor, # HOW: via Celery
policy=LogOnFlush(logger) # WHAT: log everything
):
order.save()
airlock.enqueue(send_email, order.id)
# Waits for commit, dispatches via Celery, logs
# Test scope + sync executor + assertion
with airlock.scope(
_cls=Scope, # WHEN: immediate (no transaction)
executor=sync_executor, # HOW: synchronous
policy=AssertNoEffects() # WHAT: fail if anything enqueued
):
test_calculation()
# Migration scope + drop all + immediate
with airlock.scope(
_cls=Scope, # WHEN: immediate
executor=sync_executor, # HOW: doesn't matter (nothing runs)
policy=DropAll() # WHAT: suppress everything
):
backfill_data()
Global Defaults with configure()¶
Instead of passing arguments to every scope() call, you can set global defaults:
import airlock
from airlock.integrations.executors.celery import celery_executor
# Set once at app startup
airlock.configure(
executor=celery_executor, # Default HOW
policy=airlock.AllowAll(), # Default WHAT
)
# Now all scopes use these defaults
with airlock.scope(): # Uses celery_executor
airlock.enqueue(task)
@airlock.scoped() # Also uses configured defaults
def my_function():
airlock.enqueue(other_task)
Overriding Defaults¶
Explicit arguments always override configured defaults:
airlock.configure(policy=AllowAll())
# Uses configured defaults
with airlock.scope():
...
# Override policy for this scope
with airlock.scope(policy=DropAll()):
...
Configuration API¶
# Set defaults
airlock.configure(
scope_cls=..., # Default scope class
policy=..., # Default policy
executor=..., # Default executor
)
# Get current configuration (returns a copy)
config = airlock.get_configuration()
# Reset to defaults (mainly for testing)
airlock.reset_configuration()
Framework integrations (like Django) typically call configure() automatically at startup.