Minimal Agent

This guide walks through examples/echo_agent.py, the smallest possible cortex-runtime agent that can receive and respond to tasks.

Prerequisites

  • Python 3.12+
  • Redis running locally (or via Docker: docker run -p 6379:6379 redis:7-alpine)
  • pip install cortex-runtime
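
Before starting the agent, it can help to confirm that something is listening on the Redis port. The sketch below uses only the Python standard library; redis_reachable is a hypothetical helper written for this guide, not part of cortex-runtime, and it only verifies that a TCP connection succeeds, not that the server actually speaks the Redis protocol.

```python
import os
import socket


def redis_reachable(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures.
        return False


if __name__ == "__main__":
    # Same environment variables and defaults as the example agent.
    host = os.environ.get("REDIS_HOST", "localhost")
    port = int(os.environ.get("REDIS_PORT", "6379"))
    state = "reachable" if redis_reachable(host, port) else "not reachable"
    print(f"Redis at {host}:{port} is {state}")
```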

The code

examples/echo_agent.py
"""
Minimal cortex-runtime agent: receives tasks on the 'eng' domain and echoes them back.

Requirements:
    pip install cortex-runtime

Usage:
    REDIS_HOST=localhost python examples/echo_agent.py

To send a test task (from another terminal):
    redis-cli XADD cortex:tasks:eng '*' \\
        msg_type task \\
        payload '{"message_id":"t1","text":"hello","sender":"test","sender_id":"u1","domain":"eng"}'
"""

import asyncio
import logging
import os

from cortex_runtime.bus_redis import RedisStreamBus
from cortex_runtime.consumer import TaskConsumer
from cortex_runtime.models import BusConfig, StreamChunk, StreamChunkKind

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(message)s")


async def echo_execute(prompt: str, *, context=None, working_directory=None):
    """Trivial provider that echoes the prompt. Replace with your LLM call."""
    yield StreamChunk(kind=StreamChunkKind.OUTPUT, data=f"Echo: {prompt}")
    yield StreamChunk(kind=StreamChunkKind.COMPLETE)


async def main() -> None:
    config = BusConfig(
        host=os.environ.get("REDIS_HOST", "localhost"),
        port=int(os.environ.get("REDIS_PORT", "6379")),
        password=os.environ.get("REDIS_PASSWORD") or None,
    )

    bus = RedisStreamBus(config)

    consumer = TaskConsumer(
        identity="example-agent",
        domains=["eng"],
        bus=bus,
        channel=None,
    )

    logging.getLogger(__name__).info(
        "Echo agent starting — listening on domain 'eng'. Ctrl-C to stop."
    )
    await consumer.start(execute_fn=echo_execute)


if __name__ == "__main__":
    asyncio.run(main())

What it does

  1. main() reads REDIS_HOST, REDIS_PORT, and REDIS_PASSWORD from the environment and passes them to BusConfig. The defaults are localhost:6379 with no auth.
  2. RedisStreamBus(config) creates the Redis connection pool.
  3. TaskConsumer subscribes to the eng domain stream (cortex:tasks:eng in the default namespace).
  4. echo_execute is the provider: it yields one OUTPUT chunk and then a COMPLETE chunk.
  5. consumer.start(execute_fn=echo_execute) is awaited and blocks until the process is interrupted (Ctrl-C).
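
The provider contract from step 4 can be tried without Redis. The following is a minimal sketch, assuming stand-in definitions for StreamChunk and StreamChunkKind that mirror only the two fields and two members used in the example; the real classes in cortex_runtime.models may carry more. The collect helper is hypothetical and only illustrates how a caller might drain the async generator.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Optional


# Stand-ins for cortex_runtime.models (assumed shape, for illustration only).
class StreamChunkKind(Enum):
    OUTPUT = "output"
    COMPLETE = "complete"


@dataclass
class StreamChunk:
    kind: StreamChunkKind
    data: Optional[str] = None


async def echo_execute(prompt: str, *, context=None, working_directory=None):
    """Same provider as in the example: one OUTPUT chunk, then COMPLETE."""
    yield StreamChunk(kind=StreamChunkKind.OUTPUT, data=f"Echo: {prompt}")
    yield StreamChunk(kind=StreamChunkKind.COMPLETE)


async def collect(execute_fn, prompt: str) -> list:
    """Hypothetical helper: read chunks from a provider until COMPLETE arrives."""
    chunks = []
    async for chunk in execute_fn(prompt):
        chunks.append(chunk)
        if chunk.kind is StreamChunkKind.COMPLETE:
            break
    return chunks


if __name__ == "__main__":
    for chunk in asyncio.run(collect(echo_execute, "hello")):
        print(chunk.kind.name, chunk.data)
```

Because the provider is a plain async generator, it can be unit-tested this way before it is handed to TaskConsumer.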

Sending a test task

With the agent running, open another terminal and push a task directly:

redis-cli XADD cortex:tasks:eng '*' \
  msg_type task \
  payload '{"message_id":"test-1","text":"hello world","sender":"test","sender_id":"u1","domain":"eng"}'

The agent will log the task and echo a response.
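
The same task can be pushed from Python instead of redis-cli. This is a sketch under two assumptions: the redis-py client is installed separately (pip install redis), and the stream entry needs only the msg_type and payload fields shown in the command above. build_task_fields and send_task are hypothetical helper names, not cortex-runtime APIs.

```python
import json
import os


def build_task_fields(text: str, message_id: str, domain: str = "eng") -> dict:
    """Build the stream entry fields used by the redis-cli command above."""
    payload = {
        "message_id": message_id,
        "text": text,
        "sender": "test",
        "sender_id": "u1",
        "domain": domain,
    }
    # The payload travels as a single JSON string field.
    return {"msg_type": "task", "payload": json.dumps(payload)}


def send_task(text: str, message_id: str, domain: str = "eng") -> str:
    """Append a task to cortex:tasks:<domain> and return the new entry ID."""
    import redis  # imported lazily so build_task_fields works without redis-py

    client = redis.Redis(
        host=os.environ.get("REDIS_HOST", "localhost"),
        port=int(os.environ.get("REDIS_PORT", "6379")),
        password=os.environ.get("REDIS_PASSWORD") or None,
        decode_responses=True,
    )
    # xadd defaults to id="*", matching the '*' argument in the CLI command.
    return client.xadd(f"cortex:tasks:{domain}", build_task_fields(text, message_id, domain))


# Usage, with Redis and the agent running:
#     send_task("hello world", "test-1")
```

The stream key here is the default-namespace form; with CORTEX_ORG_ID set, the key would differ.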

Next steps

  • Replace echo_execute with a real LLM call — see the Provider Guide.
  • Add external service integrations — see the Plugin Guide.
  • Use domains=["eng", "personal"] to subscribe to multiple streams.
  • Set CORTEX_ORG_ID=myorg to partition all keys under cortex:myorg:*.
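
As a starting point for the first item, a provider that streams its answer might look like the sketch below. It assumes the runtime accepts several OUTPUT chunks before COMPLETE, which this guide does not confirm; check the Provider Guide. StreamChunk and StreamChunkKind are stand-ins for the real cortex_runtime.models classes, and fake_llm_stream is a hypothetical placeholder for a streaming LLM client.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import AsyncIterator, Optional


# Stand-ins for cortex_runtime.models (assumed shape, for illustration only).
class StreamChunkKind(Enum):
    OUTPUT = "output"
    COMPLETE = "complete"


@dataclass
class StreamChunk:
    kind: StreamChunkKind
    data: Optional[str] = None


async def fake_llm_stream(prompt: str) -> AsyncIterator[str]:
    """Hypothetical streaming client: yields the reply one word at a time."""
    for word in f"You said: {prompt}".split():
        await asyncio.sleep(0)  # yield control, as a real network call would
        yield word + " "


async def streaming_execute(prompt: str, *, context=None, working_directory=None):
    """Same signature as echo_execute, but forwards each piece as its own chunk."""
    async for piece in fake_llm_stream(prompt):
        yield StreamChunk(kind=StreamChunkKind.OUTPUT, data=piece)
    yield StreamChunk(kind=StreamChunkKind.COMPLETE)


async def run_once(prompt: str) -> list:
    """Collect every chunk the provider emits for one prompt."""
    return [chunk async for chunk in streaming_execute(prompt)]


if __name__ == "__main__":
    for chunk in asyncio.run(run_once("hi there")):
        print(chunk.kind.name, repr(chunk.data))
```

In the real agent, streaming_execute would be passed as execute_fn in place of echo_execute.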