Minimal Agent¶
This guide walks through the examples/echo_agent.py example — the smallest
possible cortex-runtime agent that can receive and respond to tasks.
Prerequisites¶
- Python 3.12+
- Redis running locally (or via Docker:
docker run -p 6379:6379 redis:7-alpine) pip install cortex-runtime
The code¶
examples/echo_agent.py
"""
Minimal cortex-runtime agent: receives tasks on the 'eng' domain and echoes them back.
Requirements:
pip install cortex-runtime
Usage:
REDIS_HOST=localhost python examples/echo_agent.py
To send a test task (from another terminal):
redis-cli XADD cortex:tasks:eng '*' \\
msg_type task \\
payload '{"message_id":"t1","text":"hello","sender":"test","sender_id":"u1","domain":"eng"}'
"""
import asyncio
import logging
import os
from cortex_runtime.bus_redis import RedisStreamBus
from cortex_runtime.consumer import TaskConsumer
from cortex_runtime.models import BusConfig, StreamChunk, StreamChunkKind
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(message)s")
async def echo_execute(prompt: str, *, context=None, working_directory=None):
"""Trivial provider that echoes the prompt. Replace with your LLM call."""
yield StreamChunk(kind=StreamChunkKind.OUTPUT, data=f"Echo: {prompt}")
yield StreamChunk(kind=StreamChunkKind.COMPLETE)
async def main() -> None:
config = BusConfig(
host=os.environ.get("REDIS_HOST", "localhost"),
port=int(os.environ.get("REDIS_PORT", "6379")),
password=os.environ.get("REDIS_PASSWORD") or None,
)
bus = RedisStreamBus(config)
consumer = TaskConsumer(
identity="example-agent",
domains=["eng"],
bus=bus,
channel=None,
)
logging.getLogger(__name__).info(
"Echo agent starting — listening on domain 'eng'. Ctrl-C to stop."
)
await consumer.start(execute_fn=echo_execute)
if __name__ == "__main__":
asyncio.run(main())
What it does¶
BusConfig()readsREDIS_HOST,REDIS_PORT, andREDIS_PASSWORDfrom the environment. Defaults tolocalhost:6379with no auth.RedisStreamBus(config)creates the Redis connection pool.TaskConsumersubscribes to theengdomain stream (cortex:tasks:engin the default namespace).echo_executeis the provider: it yields oneOUTPUTchunk and then aCOMPLETEchunk.consumer.start()blocks until interrupted.
Sending a test task¶
With the agent running, open another terminal and push a task directly:
redis-cli XADD cortex:tasks:eng '*' \
msg_type task \
payload '{"message_id":"test-1","text":"hello world","sender":"test","sender_id":"u1","domain":"eng"}'
The agent will log the task and echo a response.
Next steps¶
- Replace
echo_executewith a real LLM call — see the Provider Guide. - Add external service integrations — see the Plugin Guide.
- Use
domains=["eng", "personal"]to subscribe to multiple streams. - Set
CORTEX_ORG_ID=myorgto partition all keys undercortex:myorg:*.