
Recipe: Wire Fan-In

This recipe collects events from several inbound channels into one ordered stream and feeds them to a single intelligence step, without coupling any source to the processor. Multiple io triggers publish into a queue, a rate-limit protects the downstream, and a lab consumes the merged flow. The connective tissue is the wire — the element that carries typed envelopes from one port to another.

The problem it solves

Events arrive from everywhere — a chat message, an HTTP webhook, an internal platform event — and you want one place that processes them, in order, at a rate the processor can survive. If each source talks directly to the processor you get tight coupling, no backpressure, and a thundering-herd risk. This recipe funnels every source through one buffered, rate-protected pipe.

Elements

| Element | Role |
| --- | --- |
| slack, http, platform-trigger | Inbound triggers. Each fires when its channel produces an event. |
| queue | Buffers and orders the merged stream; publish to enqueue, consume downstream. |
| rate-limit | Cascade modifier on the queue that caps the request rate into the processor. |
| lab | The intelligence step that processes each message. |
| wire | Connects each source’s output port to the queue’s input port and carries the envelopes. |

Flow

  1. Create a queue. This is the fan-in point — every source converges here.
  2. Create your inbound io elements — for example a slack listener, an http endpoint, and a platform-trigger that fires on internal platform events.
  3. Draw a wire from each source’s output port to the queue’s input port. A wire is addressed as from.element:from.port → to.element:to.port. It carries each payload as a uniform envelope ({data, meta: {actor_id, ts, trace_id, schema_ref, port_id}}) and persists the latest value per port plus a short ring-buffer history. The fan-in is simply several sources wired into the same queue port.
  4. Attach a rate-limit to the queue. The queue’s contract lets you attach rate-limit directly. Enforcement happens in the cascade middleware on incoming requests, and rate-limit’s evaluate operation lets you preview a decision without spending quota.
  5. Create a lab and have it consume from the queue. The lab’s invoke / chat / categorize operations process each message as it is dequeued.
  6. Inspect what flowed: a wire’s get_port_cell returns the latest envelope on a port, and get_port_history returns the last N envelopes (ring-buffered, newest first). The queue’s queue_stats operation reports depth and throughput.
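The flow above can be pictured with a small in-memory model. This is only an illustrative sketch in Python, not the platform's API: the `Envelope`, `FanInQueue`, and `Wire` classes, the `send` method, the `slack:out` / `queue:in` port names, and the `schema_ref` value are all hypothetical. The method names `get_port_cell` and `get_port_history` mirror the operations named in step 6, but their signatures here are assumptions, as is the choice to record the source port as `port_id`.

```python
import uuid
from collections import deque
from dataclasses import dataclass
from typing import Any, Deque, Dict, List, Optional


@dataclass(frozen=True)
class Envelope:
    """Uniform envelope: a payload plus the meta fields listed in step 3."""
    data: Any
    meta: Dict[str, Any]


class FanInQueue:
    """Hypothetical stand-in for the queue: FIFO buffer with basic stats."""

    def __init__(self) -> None:
        self._items: Deque[Envelope] = deque()
        self.published = 0

    def publish(self, env: Envelope) -> None:
        self._items.append(env)
        self.published += 1

    def consume(self) -> Optional[Envelope]:
        # Oldest envelope first, so the merged stream keeps arrival order.
        return self._items.popleft() if self._items else None

    def stats(self) -> Dict[str, int]:
        return {"depth": len(self._items), "published": self.published}


class Wire:
    """Hypothetical model of one wire from a source port to a queue port."""

    def __init__(self, from_port: str, to_port: str, sink: FanInQueue,
                 history_size: int = 3) -> None:
        self.from_port = from_port
        self.to_port = to_port
        self._sink = sink
        self._cell: Optional[Envelope] = None
        # Ring buffer: with maxlen set, appendleft drops the oldest entry.
        self._history: Deque[Envelope] = deque(maxlen=history_size)

    def send(self, data: Any, actor_id: str, ts: float) -> Envelope:
        env = Envelope(data=data, meta={
            "actor_id": actor_id,
            "ts": ts,
            "trace_id": uuid.uuid4().hex,
            "schema_ref": "example/v1",   # placeholder value (assumption)
            "port_id": self.from_port,    # assumption: the source port
        })
        self._cell = env
        self._history.appendleft(env)     # newest first
        self._sink.publish(env)
        return env

    def get_port_cell(self) -> Optional[Envelope]:
        return self._cell

    def get_port_history(self, n: int) -> List[Envelope]:
        return list(self._history)[:n]


if __name__ == "__main__":
    queue = FanInQueue()
    # Three sources, one queue port: this is the fan-in.
    slack = Wire("slack:out", "queue:in", queue)
    http = Wire("http:out", "queue:in", queue)
    trigger = Wire("platform-trigger:out", "queue:in", queue)

    slack.send({"text": "hello"}, actor_id="user-1", ts=1.0)
    http.send({"path": "/hook"}, actor_id="svc-1", ts=2.0)
    trigger.send({"event": "deploy"}, actor_id="platform", ts=3.0)

    print(queue.stats())
    print(slack.get_port_cell())
```

In this model, adding another source means constructing one more `Wire` that points at the same queue; neither the queue nor the consumer changes.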

What this shows

The wire is the platform’s connectivity primitive — a typed, inspectable edge, not an invisible arrow. Because every payload is a uniform envelope persisted into per-port cells and history, you can open any wire and see exactly what last crossed it, which makes a fan-in pipeline debuggable at every hop. Sources stay decoupled from the processor: add a fourth trigger by drawing one more wire to the same queue port — nothing downstream changes. The queue supplies the buffer and ordering, the rate-limit supplies the backpressure, and the lab never sees more load than it can take.
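To make the difference between previewing a rate-limit decision and spending quota concrete, here is a minimal Python sketch. It assumes a token-bucket policy, which the recipe does not specify; the platform's rate-limit may use a different algorithm. The `TokenBucket` and `Decision` names, the `acquire` method, and the parameters `rate_per_sec` and `burst` are hypothetical. Only the idea of an `evaluate` call that leaves quota untouched comes from the flow above. Time is passed in explicitly so the behaviour is deterministic.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Decision:
    allowed: bool
    remaining: float     # tokens left if this request were admitted
    retry_after: float   # seconds until one token is available (0 if allowed)


class TokenBucket:
    """Hypothetical token-bucket limiter with a side-effect-free preview."""

    def __init__(self, rate_per_sec: float, burst: int, now: float = 0.0) -> None:
        self.rate = rate_per_sec
        self.burst = burst
        self._tokens = float(burst)
        self._last = now

    def _available(self, now: float) -> float:
        # Tokens refill linearly with elapsed time, capped at the burst size.
        elapsed = max(0.0, now - self._last)
        return min(float(self.burst), self._tokens + elapsed * self.rate)

    def evaluate(self, now: float) -> Decision:
        """Preview the decision without mutating state (no quota spent)."""
        tokens = self._available(now)
        if tokens >= 1.0:
            return Decision(True, tokens - 1.0, 0.0)
        return Decision(False, tokens, (1.0 - tokens) / self.rate)

    def acquire(self, now: float) -> Decision:
        """Make the same decision, but spend one token when allowed."""
        decision = self.evaluate(now)
        tokens = self._available(now)
        self._last = max(self._last, now)
        self._tokens = tokens - 1.0 if decision.allowed else tokens
        return decision


if __name__ == "__main__":
    bucket = TokenBucket(rate_per_sec=1.0, burst=2)
    print(bucket.evaluate(now=0.0))  # preview only
    print(bucket.acquire(now=0.0))   # spends one token
    print(bucket.acquire(now=0.0))   # spends the second token
    print(bucket.acquire(now=0.0))   # bucket is empty at this instant
```

Calling `evaluate` any number of times leaves the bucket unchanged, so it is safe to use for dry runs; only `acquire` reduces the quota available to later requests.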
