Skip to content

Context

Langclaw has two distinct context objects — don't confuse them:

  • LangclawContext — per-request channel metadata (who/where a message came from), available to tools and middleware.
  • WorkflowContext — the ctx handed to a @app.workflow() body; the step surface (tool, llm, subagent, agent, parallel, phase, log).

LangclawContext

langclaw.LangclawContext(*, user_role='viewer', channel='', user_id='', context_id='', chat_id='', metadata=dict()) dataclass

Runtime context schema passed to every agent invocation.

Centralises channel metadata and RBAC role so middleware, tools, and user code can all read runtime.context uniformly.

WorkflowContext

The object every workflow body receives as its first argument (async def (ctx, inp) -> output). See the Workflows guide for usage.

langclaw.WorkflowContext(*, executor, memoize=None, phase_cb=None, log_cb=None, max_steps=1000, semaphore=None, _phase='default', _prefix='', _counter=None)

Step surface for a workflow body.

Parameters:

Name Type Description Default
executor StepExecutor

Async callable that performs a :class:StepRequest.

required
memoize Callable[[StepRequest, Callable[[], Awaitable[Any]]], Awaitable[Any]] | None

Optional async callable (StepRequest, runner) -> result that returns a cached result or runs runner and persists it (durable resume). When None, every step executes live.

None
phase_cb PhaseCallback | None

Optional callback invoked on each :meth:phase change.

None
log_cb LogCallback | None

Optional callback invoked on each :meth:log line — a free-text progress narrator (e.g. projected to a channel).

None
max_steps int

Step-count backstop for the whole run.

1000
semaphore Semaphore | None

Shared concurrency limiter for parallel fan-out.

None

The leading-underscore _phase / _prefix / _counter parameters are internal deterministic-ID state, also used to spawn child contexts inside :meth:parallel.

Source code in langclaw/workflows/context.py
def __init__(
    self,
    *,
    executor: StepExecutor,
    memoize: Callable[[StepRequest, Callable[[], Awaitable[Any]]], Awaitable[Any]]
    | None = None,
    phase_cb: PhaseCallback | None = None,
    log_cb: LogCallback | None = None,
    max_steps: int = 1000,
    semaphore: asyncio.Semaphore | None = None,
    _phase: str = "default",
    _prefix: str = "",
    _counter: list[int] | None = None,
) -> None:
    self._executor = executor
    self._memoize = memoize
    self._phase_cb = phase_cb
    self._log_cb = log_cb
    self._max_steps = max_steps
    self._semaphore = semaphore or asyncio.Semaphore(8)
    self._phase = _phase
    self._prefix = _prefix
    # Shared mutable step counter so the global backstop counts every step
    # across the run (and across child contexts spawned by parallel()).
    self._counter = _counter if _counter is not None else [0]
    # Per-(context) sequence for deterministic, run-stable step IDs.
    self._seq = 0

agent(name, prompt, *, schema=None) async

Run a step against a named agent, returning its reply.

When schema is given the reply is validated against it; a validation failure raises :class:WorkflowStepError.

Source code in langclaw/workflows/context.py
async def agent(self, name: str, prompt: str, *, schema: type | None = None) -> Any:
    """Run a step against a named agent, returning its reply.

    When *schema* is given the reply is validated against it; a validation
    failure raises :class:`WorkflowStepError`.
    """
    return await self._run_step("agent", name, prompt, schema=schema)

llm(prompt, *, schema=None, model=None, system=None) async

Make a single model call — no tools, no agent loop.

The lightweight third option beside :meth:tool (a deterministic capability) and :meth:subagent (an autonomous worker): a bare LLM call for one-shot judgment — classify, score, extract, rewrite, summarise.

Like every step it is memoised and crash-resumable, so a bare model call becomes a first-class, durable workflow step.

Parameters:

Name Type Description Default
prompt str

The user prompt.

required
schema type | None

Optional Pydantic model. When given, the reply is validated structured output (one structured call); when omitted, plain text.

None
model str | None

Optional model spec (e.g. "openai:gpt-4.1") overriding the workflow's default model for this call.

None
system str | None

Optional system instruction.

None
Source code in langclaw/workflows/context.py
async def llm(
    self,
    prompt: str,
    *,
    schema: type | None = None,
    model: str | None = None,
    system: str | None = None,
) -> Any:
    """Make a single model call — no tools, no agent loop.

    The lightweight third option beside :meth:`tool` (a deterministic
    capability) and :meth:`subagent` (an autonomous worker): a bare LLM call
    for one-shot judgment — classify, score, extract, rewrite, summarise.

    Like every step it is memoised and crash-resumable, so a bare model call
    becomes a first-class, durable workflow step.

    Args:
        prompt: The user prompt.
        schema: Optional Pydantic model. When given, the reply is validated
            structured output (one structured call); when omitted, plain text.
        model: Optional model spec (e.g. ``"openai:gpt-4.1"``) overriding the
            workflow's default model for this call.
        system: Optional system instruction.
    """
    payload = {"prompt": prompt, "system": system}
    return await self._run_step("llm", model or "", payload, schema=schema)

log(message)

Emit a free-text progress line (e.g. "3/10 sources fetched").

Routed to the injected log_cb (typically projected to the invoking channel as a progress event). A no-op when no callback is installed, so a workflow body can narrate freely without caring whether anyone is listening.

Source code in langclaw/workflows/context.py
def log(self, message: str) -> None:
    """Emit a free-text progress line (e.g. ``"3/10 sources fetched"``).

    Routed to the injected ``log_cb`` (typically projected to the invoking
    channel as a progress event).  A no-op when no callback is installed,
    so a workflow body can narrate freely without caring whether anyone is
    listening.
    """
    if self._log_cb is not None:
        self._log_cb(message)

parallel(thunks, *, return_exceptions=False) async

Run thunks concurrently, bounded by the run's concurrency budget.

Each thunk receives its own child context whose step IDs are prefixed by the branch index, so IDs stay deterministic across re-runs (required for durable resume). Returns results in input order.

By default this is fail-fast: if any branch raises, the exception propagates out of the batch (matching asyncio.gather). Pass return_exceptions=True to instead collect each failure in place — a branch that raises yields its exception object in the results list while its siblings still complete. Filter survivors with [r for r in results if not isinstance(r, Exception)].

Source code in langclaw/workflows/context.py
async def parallel(
    self,
    thunks: list[Callable[[WorkflowContext], Awaitable[Any]]],
    *,
    return_exceptions: bool = False,
) -> list[Any]:
    """Run *thunks* concurrently, bounded by the run's concurrency budget.

    Each thunk receives its own child context whose step IDs are prefixed by
    the branch index, so IDs stay deterministic across re-runs (required for
    durable resume).  Returns results in input order.

    By default this is **fail-fast**: if any branch raises, the exception
    propagates out of the batch (matching ``asyncio.gather``).  Pass
    ``return_exceptions=True`` to instead collect each failure *in place* —
    a branch that raises yields its exception object in the results list
    while its siblings still complete.  Filter survivors with
    ``[r for r in results if not isinstance(r, Exception)]``.
    """
    base = self._next_id()  # consume one slot to anchor the batch deterministically

    async def _run_branch(
        index: int, thunk: Callable[[WorkflowContext], Awaitable[Any]]
    ) -> Any:
        child = WorkflowContext(
            executor=self._executor,
            memoize=self._memoize,
            phase_cb=self._phase_cb,
            log_cb=self._log_cb,
            max_steps=self._max_steps,
            semaphore=self._semaphore,
            _phase=self._phase,
            _prefix=f"{base}:b{index}/",
            _counter=self._counter,
        )
        async with self._semaphore:
            return await thunk(child)

    return await asyncio.gather(
        *(_run_branch(i, t) for i, t in enumerate(thunks)),
        return_exceptions=return_exceptions,
    )

phase(name)

Start a new named phase; subsequent steps are grouped under it.

Source code in langclaw/workflows/context.py
def phase(self, name: str) -> None:
    """Start a new named phase; subsequent steps are grouped under it."""
    self._phase = name
    if self._phase_cb is not None:
        self._phase_cb(name)

subagent(subagent_type, prompt) async

Delegate to a subagent type, returning its reply.

Source code in langclaw/workflows/context.py
async def subagent(self, subagent_type: str, prompt: str) -> Any:
    """Delegate to a subagent type, returning its reply."""
    return await self._run_step("subagent", subagent_type, prompt)

tool(name, **kwargs) async

Invoke a tool by name with keyword arguments.

Source code in langclaw/workflows/context.py
async def tool(self, name: str, **kwargs: Any) -> Any:
    """Invoke a tool by name with keyword arguments."""
    return await self._run_step("tool", name, kwargs)