Recipe: Wire Fan-In
This recipe collects events from several inbound channels into one ordered stream and feeds them to a single intelligence step, without coupling any source to the processor. Multiple io triggers publish into a queue, a rate-limit protects the downstream, and a lab consumes the merged flow. The connective tissue is the wire — the element that carries typed envelopes from one port to another.
The problem it solves
Events arrive from everywhere — a chat message, an HTTP webhook, an internal platform event — and you want one place that processes them, in order, at a rate the processor can survive. If each source talks directly to the processor you get tight coupling, no backpressure, and a thundering-herd risk. This recipe funnels every source through one buffered, rate-protected pipe.
Elements
| Element | Role |
|---|---|
slack, http, platform-trigger | Inbound triggers — each fires when its channel produces an event. |
queue | Buffers and orders the merged stream; publish to enqueue, consume downstream. |
rate-limit | Cascade modifier on the queue that caps the request rate into the processor. |
lab | The intelligence step that processes each message. |
wire | Connects each source’s output port to the queue’s input port and carries the envelopes. |
Flow
- Create a
queue. This is the fan-in point — every source converges here. - Create your inbound
ioelements — for example aslacklistener, anhttpendpoint, and aplatform-triggerthat fires on internal platform events. - Draw a
wirefrom each source’s output port to the queue’s input port. A wire isfrom.element:from.port → to.element:to.port; it carries each payload as a homogeneous envelope ({data, meta: {actor_id, ts, trace_id, schema_ref, port_id}}) and persists the latest value per port plus a short ring-buffer history. Several sources wiring into the one queue port is the fan-in. - Attach a
rate-limitto the queue. The queue’s contract attachesrate-limitdirectly; enforcement happens in the cascade middleware on incoming requests, and you can preview a decision (without spending quota) using rate-limit’sevaluateoperation. - Create a
laband have it consume from the queue. The lab’sinvoke/chat/categorizeoperations process each message as it is dequeued. - Inspect what flowed: a wire’s
get_port_cellreturns the latest envelope on a port andget_port_historyreturns the last N (ring-buffered, newest first). The queue’squeue_statsshows depth and throughput.
What this shows
The wire is the platform’s connectivity primitive — a typed, inspectable edge, not an invisible arrow. Because every payload is a uniform envelope persisted into per-port cells and history, you can open any wire and see exactly what last crossed it, which makes a fan-in pipeline debuggable at every hop. Sources stay decoupled from the processor: add a fourth trigger by drawing one more wire to the same queue port — nothing downstream changes. The queue supplies the buffer and ordering, the rate-limit supplies the backpressure, and the lab never sees more load than it can take.