Custom Executors

Write custom executors to dispatch intents however you want.

The Executor Interface

An executor is just a callable that accepts an Intent:

def my_executor(intent: Intent) -> None:
    """Execute an intent."""
    intent.task(*intent.args, **intent.kwargs)

That's it! No inheritance, no protocol, just a function.
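
For example, passing my_executor into a scope is all it takes (a usage sketch; send_email stands in for any task function):

with airlock.scope(executor=my_executor):
    airlock.enqueue(send_email, "user@example.com")
# my_executor runs send_email when the scope flushes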

Example: Thread Pool

from concurrent.futures import ThreadPoolExecutor

executor_pool = ThreadPoolExecutor(max_workers=10)

def thread_executor(intent):
    """Execute in thread pool."""
    executor_pool.submit(intent.task, *intent.args, **intent.kwargs)

with airlock.scope(executor=thread_executor):
    airlock.enqueue(io_bound_task, data)
# Dispatched in a background thread (a thread pool suits I/O-bound work)
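
One caveat with this sketch: submit() returns a Future that is discarded, so an exception in the pool never aborts the flush. If you want those failures to surface, one variation keeps the futures around (a sketch; pending, tracking_thread_executor, and drain_pending are names invented here, and executor_pool is reused from above):

pending = []

def tracking_thread_executor(intent):
    """Submit to executor_pool (defined above) and keep the future."""
    pending.append(executor_pool.submit(intent.task, *intent.args, **intent.kwargs))

def drain_pending():
    """Call after the scope exits; re-raises the first background failure."""
    try:
        for future in pending:
            future.result()  # blocks until that task finishes; re-raises on failure
    finally:
        pending.clear()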

Using Dispatch Options

Executors can read intent.dispatch_options to customize behavior:

def priority_executor(intent):
    """Execute high-priority tasks immediately, queue low-priority."""
    priority = intent.dispatch_options.get("priority", 5)

    if priority >= 8:
        intent.task(*intent.args, **intent.kwargs)
    else:
        celery_executor(intent)  # e.g. the built-in Celery executor (see below)

airlock.enqueue(urgent_task, _dispatch_options={"priority": 10})
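
Dispatch options are free-form, so the same hook works for other routing decisions. Here is a sketch that picks a transport per intent (the "transport" key, routed_executor, and generate_report are illustrative, not part of Airlock):

def routed_executor(intent):
    """Route intents by an explicit 'transport' dispatch option."""
    transport = intent.dispatch_options.get("transport", "inline")
    if transport == "thread":
        thread_executor(intent)  # thread-pool executor from the example above
    else:
        intent.task(*intent.args, **intent.kwargs)

airlock.enqueue(generate_report, _dispatch_options={"transport": "thread"})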

Error Handling

Executor exceptions abort the flush by default (fail-fast behavior):

import logging

logger = logging.getLogger(__name__)

def careful_executor(intent):
    try:
        intent.task(*intent.args, **intent.kwargs)
    except Exception as e:
        logger.error(f"Failed to execute {intent.name}: {e}")
        raise  # Re-raise to abort the flush, or omit to continue
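
If you prefer a best-effort flush instead, swallow the exception and record the failure, roughly like this (a sketch; the failed list is just one way to keep that record, reusing the logger defined above):

failed = []

def best_effort_executor(intent):
    """Run each intent, collecting failures instead of aborting the flush."""
    try:
        intent.task(*intent.args, **intent.kwargs)
    except Exception as e:
        logger.error(f"Failed to execute {intent.name}: {e}")
        failed.append((intent, e))  # inspect or retry after the flush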

Composing Executors

Wrap executors for additional behavior:

def with_retry(executor, retries=3):
    """Wrap executor with retry logic."""
    def retrying_executor(intent):
        for attempt in range(retries):
            try:
                executor(intent)
                return
            except Exception as e:
                if attempt == retries - 1:
                    raise
                logger.warning(f"Attempt {attempt + 1}/{retries} failed: {e}; retrying")
    return retrying_executor

executor = with_retry(celery_executor, retries=3)
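
The same pattern covers other cross-cutting concerns, such as timing each dispatch (a sketch; with_timing and its log format are illustrative, and wrappers stack freely):

import time

def with_timing(executor):
    """Wrap an executor and log how long each dispatch takes."""
    def timing_executor(intent):
        start = time.perf_counter()
        try:
            executor(intent)
        finally:
            elapsed = time.perf_counter() - start
            logger.info("Dispatched %s in %.3fs", intent.name, elapsed)
    return timing_executor

executor = with_timing(with_retry(celery_executor, retries=3))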

Built-in Executors

Airlock provides executors for common backends:

from airlock.integrations.executors.sync import sync_executor
from airlock.integrations.executors.celery import celery_executor
from airlock.integrations.executors.django_q import django_q_executor
from airlock.integrations.executors.huey import huey_executor
from airlock.integrations.executors.dramatiq import dramatiq_executor

See the API Reference for details on each executor's behavior.
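
Any built-in executor drops into a scope the same way a custom one does, for example (a usage sketch; send_email stands in for any task function):

from airlock.integrations.executors.celery import celery_executor

with airlock.scope(executor=celery_executor):
    airlock.enqueue(send_email, "user@example.com")  # handed to Celery at flush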