Open Agent Loops

Channels

A short tutorial — connect a live, bursty transport (Slack, Discord) to the agent without letting a fast socket overwhelm a slow model. One bounded, coalescing queue does the impedance matching.

A channel is a long-lived, bursty transport — a Slack or Discord websocket — that emits an unbounded stream of inbound messages. runAgent is the opposite: a request/response unit, one prompt per session, that loops to a final answer. The job of this guide is to put the two together without coupling their two rate domains: the socket must drain continuously (stall it and the provider disconnects you), while the model is slow and rate-limited.

The fix is a bounded, coalescing queue between them, plus a small amount of wiring. runAgent itself never changes — everything new is a seam plus caller-owned policy.

The three pieces

socket ── ChannelSource ──→ Dispatcher ──→ runAgent
        (liveness/connect)   (bounded queue,   (unchanged)
                              coalesce, semaphore,
                              supersede)
        ChannelSource.send ←── ChannelBridge ←── events (TextDelta, coalesced)
  • ChannelSource — the transport seam, analogous to ModelClient. It owns liveness only: heartbeat, reconnect with backoff, resume cursors, and normalizing provider events into one InboundMessage { channelId, threadId, userId, text }. It knows nothing about the model.
  • Dispatcher — the throttling layer: at most one in-flight run per session (concurrent runs would corrupt memory ordering), a burst coalesced into a single prompt, a global concurrency cap, and optional supersede (a newer message aborts a stale in-flight run).
  • ChannelBridge — wires the two: each inbound message is mapped to a sessionId (one per thread by default, so a thread becomes a runAgent conversation and Memory Just Works) and submitted; each run's reply is coalesced and posted back to the originating thread.

A runnable example (no API key)

This program stands a InMemoryChannelSource in for a real socket and a tiny latency-injecting echo model in for a real LLM, so you can watch the backpressure mechanics with zero setup:

examples/channels-tutorial/step1.ts
import {
  assistantMessage,
  ChannelBridge,
  contentToText,
  InMemoryChannelSource,
  Role,
  SessionMemoryStore,
  StreamEventType,
} from "@open-agent-loops/core";
import type { ModelClient, ModelRequest } from "@open-agent-loops/core";

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

// A stand-in model with latency: inbound messages pile up while a run is in
// flight, which is what makes backpressure observable. It echoes the (coalesced)
// user turns it actually received, so you can see which messages survived.
const echoModel: ModelClient = {
  async *stream(request: ModelRequest) {
    await sleep(150);
    const received = request.messages
      .filter((m) => m.role === Role.User)
      .map((m) => contentToText(m.content))
      .join(" + ");
    const reply = `handled: ${received}`;
    for (const piece of reply.match(/.{1,12}/g) ?? []) {
      yield { type: StreamEventType.TextDelta, text: piece };
    }
    yield { type: StreamEventType.Done, message: assistantMessage({ content: reply }) };
  },
};

// The transport. A real bot swaps in a Slack/Discord ChannelSource here — the
// rest of the wiring is identical, because the transport is just a seam.
const source = new InMemoryChannelSource();

// The bridge wires the transport to runAgent through a bounded, coalescing queue.
const bridge = new ChannelBridge({
  source,
  base: { model: echoModel, memory: new SessionMemoryStore() },
  capacity: 4, // per-thread spam ceiling
  overflow: "drop-oldest", // shed the stalest message under a flood
  maxConcurrency: 2, // at most 2 runs across all threads
});
await bridge.start();

// A burst: 10 messages to one thread, faster than a run can even start. The
// bounded buffer keeps the last 4 and sheds the rest; the survivors coalesce
// into ONE run, and its reply is posted back to the originating thread.
for (let i = 1; i <= 10; i++) {
  source.emit({ channelId: "#general", threadId: "t1", userId: "u", text: `msg ${i}` });
}
console.log("right after the burst:", bridge.dispatcher.stats());

await sleep(400); // let the slow run finish
console.log("reply posted back:    ", source.sent.map((s) => s.text));
console.log("final stats:          ", bridge.dispatcher.stats());

await bridge.stop();

Run it:

bun run examples/channels-tutorial/step1.ts
right after the burst: { sessions: 1, inFlight: 1, queued: 4, dropped: 6, highWater: 4 }
reply posted back:     [ "handled: msg 7 + msg 8 + msg 9 + msg 10" ]
final stats:           { sessions: 1, inFlight: 0, queued: 0, dropped: 6, highWater: 4 }

Ten messages arrive faster than a run can start. The buffer's capacity: 4 + drop-oldest keeps the last four and sheds six; the survivors coalesce into one run, whose single reply — handled: msg 7 + msg 8 + msg 9 + msg 10 — is posted back to the thread it came from. The socket was never blocked: backpressure was applied at the queue, and bridge.dispatcher.stats() reports it (dropped, highWater) as it happens.

This is the load-bearing idea: bounded ≠ adaptive. The bounded queue keeps the system from falling over and makes the load measurable; tuning to that load (an AIMD controller reading highWater/dropped) is a separate layer you add on top.

The knobs

All forwarded to the dispatcher the bridge owns:

  • capacity — the per-thread spam ceiling.
  • overflow — what happens when a thread's buffer is full: drop-oldest / drop-newest / block (propagate real backpressure to a blockable producer) / { coalesce } (fold the arrival in).
  • maxConcurrency — the global cap on runs in flight across all threads, the protection for the provider rate limit.
  • supersede — abort the in-flight run when a newer message lands, instead of queueing behind it. Safe because runAgent persists the prompt to memory before its first abort check, so a superseded turn survives in history.
  • sessionIdFor — the thread → sessionId mapping (default: one session per thread). Override for per-channel or per-user grain.

Make it real

Two swaps, no other changes:

  1. A real model — replace the echo model with an OpenAICompatibleModel({ apiKey, model, baseURL }).
  2. A real transport — implement ChannelSource (start / send / stop) over the Slack Events API or Discord gateway, normalizing each event into an InboundMessage. Heartbeat and reconnect live entirely inside that class.

The fuller demo (two threads, isolation under a flood) is at examples/channels/channels.ts; for the design and open questions, see agent-loop-core/docs/channels.md.

On this page