Architecture¶
cortex-runtime is the kernel of a multi-agent mesh. This page covers the
load-bearing design decisions: why Redis Streams, how the turn loop works,
how session context stays coherent across turns, and how plugins integrate.
System topology¶
Conductor Agent(s)
────────────────────────── ──────────────────────────
TaskDispatcher TaskConsumer
│ │
│ XADD cortex:tasks:{domain} │
├──────────────────────────────────▶ XREADGROUP (consumer group)
│ │
│ XADD cortex:results │ execute_fn(prompt, context)
◀──────────────────────────────────┤
│ │
Redis Streams backbone (RedisStreamBus)
A mesh has one conductor and one or more agents. The conductor routes
incoming tasks (via TaskDispatcher) onto domain streams or agent-specific
streams. Each agent runs a TaskConsumer that blocks on XREADGROUP, executes
tasks, and writes results back.
The runtime ships without a bundled conductor or UI — you supply those.
Bus design: why Redis Streams¶
The bus (MessageBus protocol, RedisStreamBus implementation) makes four
commitments that ruled out simpler options:
| Requirement | Design choice |
|---|---|
| At-least-once delivery with explicit ACK | XREADGROUP + consumer groups |
| Crash recovery without reprocessing from offset 0 | Own-PEL redelivery via XAUTOCLAIM |
| Multi-agent fan-out on domain streams | One stream per domain; each agent is a consumer in the group |
| Agent-specific direct routing | Separate per-agent stream (cortex:tasks:agent:{name}) |
Compared to alternatives:
- Kafka — operationally heavy for a fleet of 2–10 agents; no native per-message ACK at the consumer-group level without offset management complexity.
- SQS / cloud queues — no offline self-hosted option; adds an AWS dependency to what is otherwise a local-first system.
- pub/sub (Redis PUBLISH) — fire-and-forget; no at-least-once semantics, no crash recovery.
Stale socket watchdog¶
Redis keeps TCP connections alive, but a half-open socket (e.g. after a network
partition) will block XREADGROUP indefinitely. The consumer uses
asyncio.wait_for() around each blocking read with a configurable
read_watchdog_ms (default 60 s). On timeout it reconnects and resumes.
This closed a real incident where an agent consumed no tasks for 40+ minutes
after an AWS Redis failover.
Own-PEL redelivery¶
On startup, RedisStreamBus calls XAUTOCLAIM to reclaim any messages in its
own pending-entry list (PEL) that were not ACKed before the last crash. This
prevents messages from sitting in the PEL indefinitely after an agent restart.
Turn loop¶
A single task execution follows this path:
1. XREADGROUP blocks on domain and agent streams
2. Deserialize message → TaskPayload
3. Mark message as "responded" in Redis (idempotency key)
4. Load session context (hot/warm/cold tiers — see below)
5. Build prompt (ContextRuntime.build_prompt)
6. Stream execute_fn(prompt, context) → yields StreamChunks
7. For each OUTPUT chunk → forward to reply channel
8. On COMPLETE chunk → finalize TaskResult, extract discoveries + memory proposals
9. XADD result to cortex:results stream
10. XACK message on the domain/agent stream
Steps 3 and 10 bracket the execution window. If the agent crashes between them,
XAUTOCLAIM returns the message on next startup (step 1 of the next run) and
the resumed: bool flag on the Envelope lets execute_fn adjust its behavior.
Session context: hot / warm / cold¶
Prompt context is tiered to stay within model token budgets without losing long-term continuity.
| Tier | What it holds | Token cost |
|---|---|---|
| Hot | Last N turns (configurable, default 20) | Full verbatim |
| Warm | Summarized transcript from older turns | Compressed |
| Cold | Agent memory files (long-term facts, user preferences) | On-demand |
ContextRuntime.prepare_payload() fills the hot tier from the session store,
requests a warm summary if the transcript exceeds warm_summary_threshold, and
attaches relevant cold memory entries. build_prompt() assembles the final
string from these tiers plus the incoming message.
Sessions are scoped by (channel, domain, thread_id). The SessionStore protocol
is intentionally abstract — swap in Redis, Postgres, or a flat file store.
Plugin system¶
Plugins give agents access to external services (calendars, databases, APIs) through a uniform action interface. They are discovered via Python entry points, not imports, so they can ship in separate packages without modifying the runtime.
Registering a plugin¶
In your plugin package's pyproject.toml:
Plugin lifecycle¶
discover_and_register() → instantiate, collect action metadata (no I/O)
│
ensure_ready(name) → call plugin.setup(broker) ← deferred until first use
│
execute(name, action, params) → validate params → plugin.execute_action()
│
shutdown() → plugin.teardown() for all loaded plugins
The two-phase approach means discovery is fast (no network) and setup failures don't block the agent from starting.
The ServicePlugin protocol¶
from typing import Protocol, runtime_checkable
from cortex_runtime.plugins.models import PluginActionInfo, PluginHealthReport, PluginResult
@runtime_checkable
class ServicePlugin(Protocol):
name: str
version: str
description: str
async def setup(self, broker) -> None: ...
async def teardown(self) -> None: ...
def list_actions(self) -> list[PluginActionInfo]: ...
async def execute_action(self, action: str, params: dict) -> PluginResult: ...
async def health(self) -> PluginHealthReport: ...
broker is typed as Any — supply your own credential broker. The runtime
never imports one, keeping this package dependency-free.
Redis key namespace¶
Keyspace centralizes all Redis key construction. Two modes:
| Mode | Prefix | When |
|---|---|---|
| Fleet (global) | cortex: |
CORTEX_ORG_ID absent, None, or "personal" |
| Org-partitioned | cortex:{org_id}: |
Any other CORTEX_ORG_ID |
from cortex_runtime.redis_keys import Keyspace
ks = Keyspace() # fleet
ks.tasks_domain_stream("eng") # → "cortex:tasks:eng"
ks = Keyspace("acme") # org-scoped
ks.tasks_domain_stream("eng") # → "cortex:acme:tasks:eng"
All stream names, result keys, responded keys, and session keys go through
Keyspace. This makes multi-tenant hosting possible by changing one config value.
Honest tradeoffs¶
These are known limitations of the current extraction, not future promises:
- Consumer name is hardcoded to
{identity}-1. Multi-consumer parallelism within a single agent process requires a naming scheme — not yet designed. - Memory proposals use string-matching (
[MEMORY: category/name]) rather than a structured protocol. Works in practice; fragile at the edges. - Domain file cache in
ContextRuntimeis never invalidated within a process lifetime. Long-running agents see stale domain facts after an update. - The provider turn-loop driver (
providers/_loop_driver.py) is a TODO stub. You must implementCortexProvideryourself for now.