Channels
A short tutorial — connect a live, bursty transport (Slack, Discord) to the agent without letting a fast socket overwhelm a slow model. One bounded, coalescing queue does the impedance matching.
A channel is a long-lived, bursty transport — a Slack or Discord websocket —
that emits an unbounded stream of inbound messages. runAgent is the opposite: a
request/response unit, one prompt per session, that loops to a final answer. The
job of this guide is to put the two together without coupling their two rate
domains: the socket must drain continuously (stall it and the provider
disconnects you), while the model is slow and rate-limited.
The fix is a bounded, coalescing queue between them, plus a small amount of
wiring. runAgent itself never changes — everything new is a seam plus
caller-owned policy.
The three pieces
socket ── ChannelSource ──→ Dispatcher ──→ runAgent
        (liveness/connect)  (bounded queue, (unchanged)
                             coalesce, semaphore,
                             supersede)
ChannelSource.send ←── ChannelBridge ←── events (TextDelta, coalesced)

- ChannelSource: the transport seam, analogous to ModelClient. It owns liveness only: heartbeat, reconnect with backoff, resume cursors, and normalizing provider events into one InboundMessage { channelId, threadId, userId, text }. It knows nothing about the model.
- Dispatcher: the throttling layer. It allows at most one in-flight run per session (concurrent runs would corrupt memory ordering), coalesces a burst into a single prompt, enforces a global concurrency cap, and optionally supersedes (a newer message aborts a stale in-flight run).
- ChannelBridge: wires the two. Each inbound message is mapped to a sessionId (one per thread by default, so a thread becomes a runAgent conversation and Memory Just Works) and submitted; each run's reply is coalesced and posted back to the originating thread.
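To make the normalization step concrete, here is a minimal sketch of what a ChannelSource might do with one provider event. Only the four InboundMessage field names come from the text above. The RawSlackLikeEvent shape and the normalize helper are hypothetical, loosely modeled on a Slack message payload, and are not part of the library.

```typescript
// Field names taken from the guide; the interface itself is a local sketch.
interface InboundMessage {
  channelId: string;
  threadId: string;
  userId: string;
  text: string;
}

// Hypothetical raw event, loosely modeled on a Slack message payload.
interface RawSlackLikeEvent {
  channel: string;
  ts: string;
  thread_ts?: string;
  user: string;
  text: string;
}

// Assumption: a top-level message starts its own thread, keyed by its own
// timestamp, so every message ends up with a usable threadId.
function normalize(event: RawSlackLikeEvent): InboundMessage {
  return {
    channelId: event.channel,
    threadId: event.thread_ts ?? event.ts,
    userId: event.user,
    text: event.text,
  };
}

const topLevel = normalize({ channel: "C1", ts: "100.1", user: "U1", text: "hi" });
const reply = normalize({ channel: "C1", ts: "100.2", thread_ts: "100.1", user: "U2", text: "yo" });
```

Both messages land on the same threadId here, which is what lets a per-thread sessionId group them into one conversation.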
A runnable example (no API key)
This program stands an InMemoryChannelSource in for a real socket and a tiny
latency-injecting echo model in for a real LLM, so you can watch the
backpressure mechanics with zero setup:
import {
  assistantMessage,
  ChannelBridge,
  contentToText,
  InMemoryChannelSource,
  Role,
  SessionMemoryStore,
  StreamEventType,
} from "@open-agent-loops/core";
import type { ModelClient, ModelRequest } from "@open-agent-loops/core";

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
// A stand-in model with latency: inbound messages pile up while a run is in
// flight, which is what makes backpressure observable. It echoes the (coalesced)
// user turns it actually received, so you can see which messages survived.
const echoModel: ModelClient = {
  async *stream(request: ModelRequest) {
    await sleep(150);
    const received = request.messages
      .filter((m) => m.role === Role.User)
      .map((m) => contentToText(m.content))
      .join(" + ");
    const reply = `handled: ${received}`;
    for (const piece of reply.match(/.{1,12}/g) ?? []) {
      yield { type: StreamEventType.TextDelta, text: piece };
    }
    yield { type: StreamEventType.Done, message: assistantMessage({ content: reply }) };
  },
};
// The transport. A real bot swaps in a Slack/Discord ChannelSource here — the
// rest of the wiring is identical, because the transport is just a seam.
const source = new InMemoryChannelSource();
// The bridge wires the transport to runAgent through a bounded, coalescing queue.
const bridge = new ChannelBridge({
  source,
  base: { model: echoModel, memory: new SessionMemoryStore() },
  capacity: 4, // per-thread spam ceiling
  overflow: "drop-oldest", // shed the stalest message under a flood
  maxConcurrency: 2, // at most 2 runs across all threads
});
await bridge.start();
// A burst: 10 messages to one thread, faster than a run can even start. The
// bounded buffer keeps the last 4 and sheds the rest; the survivors coalesce
// into ONE run, and its reply is posted back to the originating thread.
for (let i = 1; i <= 10; i++) {
  source.emit({ channelId: "#general", threadId: "t1", userId: "u", text: `msg ${i}` });
}
console.log("right after the burst:", bridge.dispatcher.stats());
await sleep(400); // let the slow run finish
console.log("reply posted back: ", source.sent.map((s) => s.text));
console.log("final stats: ", bridge.dispatcher.stats());
await bridge.stop();

Run it:

bun run examples/channels-tutorial/step1.ts

right after the burst: { sessions: 1, inFlight: 1, queued: 4, dropped: 6, highWater: 4 }
reply posted back: [ "handled: msg 7 + msg 8 + msg 9 + msg 10" ]
final stats: { sessions: 1, inFlight: 0, queued: 0, dropped: 6, highWater: 4 }

Ten messages arrive faster than a run can start. The buffer's capacity: 4 +
drop-oldest keeps the last four and sheds six; the survivors coalesce into
one run, whose single reply — handled: msg 7 + msg 8 + msg 9 + msg 10 — is
posted back to the thread it came from. The socket was never blocked: backpressure
was applied at the queue, and bridge.dispatcher.stats() reports it
(dropped, highWater) as it happens.
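The arithmetic behind those numbers can be reproduced without the library. The sketch below is a hypothetical BoundedBuffer, not the dispatcher's actual implementation: it assumes drop-oldest means "evict the head when full" and that coalescing means "drain everything buffered into one batch".

```typescript
// A stand-alone sketch of a bounded, drop-oldest buffer with the two
// counters the guide reports. All names here are illustrative.
class BoundedBuffer {
  private items: string[] = [];
  private readonly capacity: number;
  dropped = 0;
  highWater = 0;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  // Never blocks the producer: when full, the stalest item is shed.
  push(text: string): void {
    if (this.items.length >= this.capacity) {
      this.items.shift();
      this.dropped++;
    }
    this.items.push(text);
    this.highWater = Math.max(this.highWater, this.items.length);
  }

  // Coalesce: hand every surviving message to a single run.
  drain(): string[] {
    const batch = this.items;
    this.items = [];
    return batch;
  }
}

const buffer = new BoundedBuffer(4);
for (let i = 1; i <= 10; i++) buffer.push(`msg ${i}`);

const batch = buffer.drain();
console.log(batch.join(" + "), { dropped: buffer.dropped, highWater: buffer.highWater });
// prints: msg 7 + msg 8 + msg 9 + msg 10 { dropped: 6, highWater: 4 }
```

The producer's push never waits, which is the property that keeps the socket draining while the model is busy.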
This is the load-bearing idea: bounded ≠ adaptive. The bounded queue keeps the
system from falling over and makes the load measurable; tuning to that load (an
AIMD controller reading highWater/dropped) is a separate layer you add on top.
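As a rough illustration of that separate layer, here is a minimal AIMD sketch. Everything in it is an assumption: the AimdLimit class, the StatsSnapshot shape (only the two counters named above), and the choice of what limit controls (for example, how many messages per interval the caller admits). It treats newly dropped messages as the congestion signal and a new high-water mark as a reason to hold.

```typescript
// Hypothetical snapshot: only the two counters named in the text above.
interface StatsSnapshot {
  dropped: number;
  highWater: number;
}

class AimdLimit {
  limit: number;
  private lastDropped = 0;
  private lastHighWater = 0;
  private readonly min: number;
  private readonly max: number;

  constructor(initial: number, min: number, max: number) {
    this.limit = initial;
    this.min = min;
    this.max = max;
  }

  // Call once per sampling interval with the latest cumulative counters.
  update(stats: StatsSnapshot): number {
    const newDrops = stats.dropped - this.lastDropped;
    const queueGrew = stats.highWater > this.lastHighWater;
    this.lastDropped = stats.dropped;
    this.lastHighWater = stats.highWater;

    if (newDrops > 0) {
      // Multiplicative decrease: messages were shed, so halve the limit.
      this.limit = Math.max(this.min, Math.floor(this.limit / 2));
    } else if (!queueGrew) {
      // Additive increase: no drops and no new peak since the last sample.
      this.limit = Math.min(this.max, this.limit + 1);
    }
    // Otherwise hold: the queue hit a new peak but nothing was shed yet.
    return this.limit;
  }
}

const controller = new AimdLimit(8, 1, 16);
const trace = [
  controller.update({ dropped: 0, highWater: 1 }), // new peak: hold
  controller.update({ dropped: 0, highWater: 1 }), // quiet: +1
  controller.update({ dropped: 6, highWater: 4 }), // drops: halve
  controller.update({ dropped: 6, highWater: 4 }), // quiet: +1
];
// trace is [8, 9, 4, 5]
```

The controller only reads counters, so it can sit outside the dispatcher and be swapped or removed without touching the queue.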
The knobs
All of these are forwarded to the dispatcher that the bridge owns:
- capacity: the per-thread spam ceiling.
- overflow: the policy when a thread's buffer is full, one of drop-oldest, drop-newest, block (propagate real backpressure to a blockable producer), or { coalesce } (fold the arrival in).
- maxConcurrency: the global cap on runs in flight across all threads, the protection for the provider rate limit.
- supersede: abort the in-flight run when a newer message lands, instead of queueing behind it. Safe because runAgent persists the prompt to memory before its first abort check, so a superseded turn survives in history.
- sessionIdFor: the thread → sessionId mapping (default: one session per thread). Override for per-channel or per-user grain.
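The three grains mentioned for sessionIdFor can be sketched as plain functions. The signature (one InboundMessage in, one string out) and the key formats below are assumptions for illustration; this guide does not show the library's actual default key.

```typescript
// Local copy of the message shape described earlier in this guide.
interface InboundMessage {
  channelId: string;
  threadId: string;
  userId: string;
  text: string;
}

// Assumed signature: one message in, one session key out.
type SessionIdFor = (message: InboundMessage) => string;

// One conversation per thread (the grain the guide names as the default).
const perThread: SessionIdFor = (m) => `${m.channelId}/${m.threadId}`;

// One shared conversation for a whole channel.
const perChannel: SessionIdFor = (m) => m.channelId;

// One conversation per user within a channel, regardless of thread.
const perUser: SessionIdFor = (m) => `${m.channelId}/${m.userId}`;

const first: InboundMessage = { channelId: "#general", threadId: "t1", userId: "u1", text: "a" };
const second: InboundMessage = { channelId: "#general", threadId: "t2", userId: "u1", text: "b" };
```

With perThread the two messages get separate sessions; with perUser or perChannel they share one, so they would also share the one-in-flight-run limit and the same memory.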
Make it real
Two swaps, no other changes:
- A real model: replace the echo model with an OpenAICompatibleModel({ apiKey, model, baseURL }).
- A real transport: implement ChannelSource (start/send/stop) over the Slack Events API or Discord gateway, normalizing each event into an InboundMessage. Heartbeat and reconnect live entirely inside that class.
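For the reconnect logic that lives inside a real transport, one common choice is capped exponential backoff with jitter. The helper below is a hypothetical sketch, not something the library provides; the name reconnectDelayMs and the default values are assumptions. The random source is injectable so the schedule can be checked deterministically.

```typescript
// Capped exponential backoff with "full jitter": pick a delay uniformly
// between 0 and min(maxMs, baseMs * 2^attempt).
function reconnectDelayMs(
  attempt: number,
  baseMs: number = 500,
  maxMs: number = 30_000,
  random: () => number = Math.random,
): number {
  const ceiling = Math.min(maxMs, baseMs * 2 ** attempt);
  return Math.floor(random() * ceiling);
}

// With a fixed random value of 0.5 the schedule is easy to read:
const half = () => 0.5;
const schedule = [0, 1, 2, 3, 10].map((attempt) => reconnectDelayMs(attempt, 500, 30_000, half));
// schedule is [250, 500, 1000, 2000, 15000]
```

A transport would typically reset the attempt counter to zero after a successful connection, so that a later disconnect starts again from the short delays.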
The fuller demo (two threads, isolation under a flood) is at
examples/channels/channels.ts; for the design and open questions, see
agent-loop-core/docs/channels.md.