Download all docs
io

Queue

The asynchronous seam between your flows and a message broker — wire it as a flow entry point and it consumes messages one run at a time, or drive it from another element and it publishes, with consumer groups, acknowledgement modes, and retry-to-dead-letter handled for you.

Working with it

Selecting a Queue reveals its settings in the properties panel; it has no dedicated full-screen workbench.

How it appears

The same element type rendered as a definition, a circle instance, and a live workspace card.

Qu
type

Queue

Connect to message queues — consume messages and publish to queues

ioatomdefinition

When to use / not

When to use

  • Triggering a flow per message off a durable stream — wire queue as the entry point and each delivery starts a run, with retries up to max_deliver before dead-lettering.
  • Decoupling a slow or bursty producer from its consumer so work buffers instead of blocking, drained at the consumer's own pace.
  • Fanning one stream across shared workers via a durable consumer_group, so messages load-balance instead of duplicating.
  • Emitting a one-off message — ad-hoc, from a test, or mid-flow — with the publish operation, or watching backlog and dead-letter depth with queue_stats.

When not to use

  • Calling a request/response service and waiting on the reply — that is synchronous request/response; reach for http instead.
  • Firing work on a wall-clock cadence rather than per inbound message — schedule is the timer-driven trigger.
  • Real-time, connection-oriented duplex streaming to a live client — websocket holds the open socket queue does not.

Topology

Created from the library and placed inside an app or circle. It is a top-level building block you compose with other elements.

Properties

providerstring
Queue provider
subjectstring
NATS subject to publish/subscribe on (e.g. queue.orders)
queuestring
Queue/topic name (alias for display; defaults to subject)

Capabilities

Inherited from io
  • Network
  • Observe

Operations

  • activityGET
  • attachmentsGET
  • batch_statsGET
  • composePOST
  • contextGET
  • createPOST
  • deleteDELETE
  • disablePOST
  • enablePOST
  • export_bundleGET
  • getGET
  • import_bundlePOST
  • intentionGET
  • promotePOST
  • publishPOST
  • purgePOST
  • queue_statsGET
  • readmeGET
  • readme_updatePOST
  • receivePOST
  • remove-modifierPOST
  • restorePOST
  • schemaGET
  • sendPOST
  • sourceGET
  • source_branchesGET
  • source_promotePOST
  • source_repairPOST
  • source_statusGET
  • source_validatePOST
  • statsGET
  • test_connectionPOST
  • treeGET
  • updatePATCH
  • update_metaPATCH
  • versionGET

Ports

Inputs

  • triggerevent
  • requestrequest
  • messageevent
  • resultevent
  • acksignal

Composition

Errors / when it fails

queue_name is required for queue integration
Fails unless: queue_name != null && len(queue_name) > 0

Validation rules

  • High max_in_flight (>1000) may cause memory pressure

Queue (queue)

Category: io | Form: | Symbol: Qu

Connect to message queues — consume messages and publish to queues

Bidirectional message queue integration supporting NATS, RabbitMQ, SQS, and Kafka. Configure spec.provider, spec.queue, and spec.connection. When wired as a flow entry point (trigger input exposed), the element consumes messages from the configured queue. When driven from another element (request input wired), it publishes messages to a queue. Consumer and acknowledgment settings apply on the receive side. Batching and delivery settings apply on the publish side. Use queue_stats to monitor queue health. The purge operation removes messages by status — requires admin auth. The publish operation explicitly sends a single message without driving a full flow step.

Guide

Connect to message queues — consume messages and publish to queues

What It Does

Queue is an IO connector for asynchronous message processing. It can consume messages from a queue (triggering flows on each message) or publish messages to a queue from your flows. Supports NATS JetStream as the backing transport with configurable consumer groups, acknowledgement modes, and retry policies.

Element Definition

PropertyValue
Typequeue
Categoryio
Formatom

Key Properties

FieldTypeDefaultDescription
subjectstringNATS subject pattern to subscribe/publish to
consumer_groupstringDurable consumer group name for shared consumption
ack_policystringexplicitAcknowledgement: explicit, none, or all
max_deliverinteger3Maximum delivery attempts before dead-lettering
batch_sizeinteger1Messages to fetch per pull request

Usage

  1. Configure the subject pattern for your message stream
  2. Wire to an action element that processes each message
  3. Failed messages retry up to max_deliver times before being dead-lettered

Relationships

  • Attaches to: rate-limit
  • Uses: variable

Capabilities

  • consume: Consume messages from NATS JetStream, RabbitMQ, SQS, or Kafka
  • publish: Publish messages to NATS JetStream, RabbitMQ, SQS, or Kafka
  • ack: Manual message acknowledgment
  • batching: Batch message processing and publishing
  • replay: Message replay from offset
  • delay: Delayed message delivery
  • dedup: Message deduplication

Properties

PropertyTypeDefaultDescription
providerstring"nats"Queue provider
subjectstringNATS subject to publish/subscribe on (e.g. queue.orders)
queuestringQueue/topic name (alias for display; defaults to subject)
connectionobjectConnection configuration
consumerobjectConsumer configuration (applies when receiving)
acknowledgmentobjectAcknowledgment settings (applies when receiving)
dead_letterobjectDead letter queue settings (applies when receiving)
target_refstringUUID of the element (typically an automation) to trigger when a message arrives on this queue. Resolved by ConsumerRunner via the YAML-declared receives binding (signals.receives in contract.yaml). Empty = no automatic trigger; the queue still functions as a publish endpoint.
messageobjectMessage configuration (applies when publishing)
batchingobjectBatching configuration (applies when publishing)
deliveryobjectDelivery settings (applies when publishing)

Operations

activity

Get /ops/activity | Auth: Read

Get activity events for this element

Scope depends on element capabilities: individual elements query by element_id, project-form elements with activity-scope-members include member activities, circle-level elements with activity-scope-all query the entire circle. Gracefully returns empty list if activities table is missing (old circles).

attachments

Get /ops/attachments | Auth: Read

List all modifiers and resources attached to this element

Returns both modifiers (policy enforcement) and resources (data injection) with is_modifier flag to distinguish. Items in the generated MODIFIER_TYPES list are modifiers; everything else is a resource. Includes cascade_policy and version pin info.

batch_stats

Get /ops/batch_stats | Auth: Read

Get per-element statistics for all children of this element

Returns per-child stats plus an aggregate. Most meaningful on compound or manifest form elements (repositories, circles, projects); atoms have no children so the result is an empty children array with a zeroed aggregate. Uses efficient GROUP BY SQL. Weighted averages for eval scores.

compose

Post /ops/compose | Auth: Execute

Batch add and remove modifiers on this element in a single call

Declarative composition: add modifiers by ref path (slug or path@version) and remove by attachment ID, all in one atomic call on the target element. Each ‘add’ entry resolves the source element, validates topology, attaches with optional priority and cascade policy. Each ‘remove’ entry deletes the attachment row. Returns a summary of what was added and removed. Example: compose({ add: [{ref: “my-prompt”}, {ref: “rate-limit/api@v2”, priority: 50}], remove: [{attachment_id: “uuid”}] })

context

Get /ops/context | Auth: Read

Get connected elements (graph traversal)

Graph traversal showing all connected elements with their relationship type (contains, contained_by, references, referenced_by, attaches, etc.). Use ?depth=N to control traversal depth (default 1) and ?types=actor,data to filter by element types.

create

Post /ops/create | Auth: Write

Create child element

POST to the parent path — element_type goes in the request body, NOT the URL. Both element_type and slug are required and must be non-empty. Name is derived from slug if omitted. Writes to both Git and PostgreSQL. All elements are stored flat under the circle — no intermediate library wrapper rows.

delete

Delete /ops/delete | Auth: Admin

Delete element (soft delete)

Soft delete — sets state to ‘deleted’ but retains the record. Cannot delete elements that have children (has_no_bond precondition) or active runs. Requires admin auth and confirmation.

disable

Post /ops/disable | Auth: Admin

Disable element (hides and prevents use)

Idempotent — safe to call on already-disabled elements. Optionally pass a reason string. Disabled elements cannot be invoked or executed. Inverse of enable.

enable

Post /ops/enable | Auth: Admin

Enable element (makes usable and visible)

Idempotent — safe to call on already-enabled elements. Transitions element to ready/enabled state. Cannot enable deleted elements. Inverse of disable.

export_bundle

Get /ops/export/bundle | Auth: Read

Export element as downloadable git bundle

On non-root-namespace elements, returns a binary git bundle. On root-namespace (circle) elements, dispatch hands off to the circle’s own export_bundle op, which returns a multi-element JSON envelope with one base64 bundle per child element — this is intentional, not an error.

get

Get /ops/get | Auth: Read

Get element details

Element is already resolved by the routing layer — this returns the cached element, not a fresh DB query. Use the path /api/{circle}/{slug} to address elements.

import_bundle

Post /ops/import/bundle | Auth: Write

Import git bundle into element

Accepts a base64-encoded git bundle in the JSON bundle_base64 field. Use overwrite=true to replace existing elements with same slug (default skips duplicates). Imported elements get new UUIDs. Returns counts of imported/skipped elements and any errors.

intention

Get /ops/intention | Auth: Read

Get element intention with full inheritance chain

Returns three levels: direct (this element’s intention), inherited (from category and root), and resolved (final merged intention). Useful for understanding an element’s purpose in context of its hierarchy.

promote

Post /ops/promote | Auth: Admin

Promote element configuration to a target environment

Only for manifest-form elements (projects). Environments advance: dev → demo → live. dev→demo requires member+ role, demo→live requires admin. Freezes member versions at promotion time (creates snapshot). Persists environment config to spec.environments.

publish

Post /ops/publish | Auth: Execute

Publish a single message to the queue

Explicitly publishes one message to the configured queue without requiring a flow step. Useful for ad-hoc publishing, testing, or one-off messages from the operations panel. Put the message body in the payload field (canonical). The handler also accepts message and data as aliases for payload — useful when integrating with systems that use those field names. Optional subject override, headers, delay_ms, and durable (default true) fields are supported. Returns the message ID and published_at timestamp.

purge

Post /ops/purge | Auth: Admin

Purge messages from queue

Removes messages by status: pending, failed, dead_letter, or all. Defaults to failed. Destructive operation — requires admin auth. Returns count of purged messages. Use to clear dead letter queues, reset stuck consumers, or drain the publish buffer.

queue_stats

Get /ops/queue/stats | Auth: Read

Get queue statistics

Returns message counts by status: pending, processing, sent, failed, completed, dead_letter. Covers both consumer and publisher metrics in a single call. Use to monitor queue health, identify backlogs, and detect dead letter accumulation.

readme

Get /ops/readme | Auth: Read

Get element README.md content

Reads README.md from the element’s git repository. Returns empty content (not an error) if no README exists. Always returns markdown format.

readme_update

Post /ops/readme_update | Auth: Write

Update element README.md content

Creates or overwrites README.md in the element’s git repo. Commits to the draft branch. Content must be provided as a markdown string.

receive

Post /ops/receive | Auth: None

Receive incoming external traffic

Entry point for external traffic reaching this IO element. Declared auth: none to bypass platform auth — element-level auth is enforced by IoReceiveExecutor before dispatching into the flow graph. The flow/app that wires this element as an entry point determines what happens next.

remove-modifier

Post /ops/remove-modifier | Auth: Execute

Remove an attached modifier from this element by attachment ID

Removes a modifier/resource attachment by its row ID. The ID comes from the attachments or context API. This is the reverse of attach — called on the target element, not the source.

restore

Post /ops/restore | Auth: Admin

Restore element to a specific version

Automatically snapshots the current state before restoring (creates a ‘Before restore to vN’ version entry). Writes restored spec to git as .triform/spec.yaml. Git failures warn but don’t fail the operation — DB state is authoritative. Cannot restore deleted elements.

schema

Get /ops/schema | Auth: Read

Get element input/output schema (MCP tools/list compatible)

Returns type-level port schemas from the TypeRegistry — not instance-specific overrides. Includes direction (input/output), required flag, and JSON schema per port. Useful for understanding what data an element accepts and produces.

send

Post /ops/send | Auth: Execute

Send a message/request to external system

Explicitly sends payload to the configured external target. For HTTP elements, POSTs to the target URL. For chat platforms, sends via the platform API. Put data in the payload field. Returns send status and response details.

source

Get /ops/source | Auth: Read

Get any file’s content from the element’s git repository

Reads an arbitrary file from the element’s CAS-backed git tree by its relative path. Same store as readme, just generalized. Path safety: rejects .. traversal, leading /, and null bytes. Use this to view main.py for action elements, asset files for SPAs, etc. Returns empty content (not an error) if the file doesn’t exist.

source_branches

Get /ops/source/branches | Auth: Read

List Source branches for this element

Returns the standard draft/demo/live Source branches, their current commits, and promotion relationships. Use GET /api/{element_path}/ops/source/branches.

source_promote

Post /ops/source/promote | Auth: Write

Promote Source branch forward

Promotes draft to demo or demo to live through the generated element op path. Direct Git pushes to demo/live are blocked by Source policy.

source_repair

Post /ops/source/repair | Auth: Write

Inspect or repair the element Source index

Runs Source repair through the element operation path. Defaults to dry_run=true; set dry_run=false only after reviewing a dry-run report.

source_status

Get /ops/source/status | Auth: Read

Get Source control status for this element

Returns the branch-aware clone URL, checkout commands, current draft commit, child source-link count, portable export summary, Source health, warnings, and auth hints for the addressed element. Use the element-first path: GET /api/{element_path}/ops/source/status.

source_validate

Post /ops/source/validate | Auth: Read

Validate Source branch contents

Validates a Source branch before accepting local Git workflow changes or promotion. Defaults to branch=draft and rejects runtime data, generated output, secret material, and unreadable CAS refs.

stats

Get /ops/stats | Auth: Read

Get aggregate statistics for this element

Health status is computed: error if errors_per_day > 5 or success_rate < 0.8, warning if errors_per_day > 0 or success_rate < 0.95. Firing alerts escalate health to error/warning. Default period is ‘day’. Returns runs_per_day, success_rate, avg_duration_ms, and more.

test_connection

Post /ops/test-connection | Auth: Execute

Test connection configuration

Validates the element’s configuration locally without making an actual external connection. Checks that required credentials are set, URLs are valid, etc. Returns success boolean and error details. Safe to call repeatedly.

tree

Get /ops/tree | Auth: Read

Get the element’s position in the graph — ancestors, children, references, and subtree statistics

Uses per-circle ElementGraph cache for O(1) lookups. Returns ancestors (containment chain), children (direct), members (references), referenced_by (reverse refs), attachments, and subtree stats. Default depth is 3, max is 10. Pass ?include_metadata=true for name/state on each node.

update

Patch /ops/update | Auth: Write

Update element

Partial update — send only the fields you want to change. spec, name, and intention are all independently optional. spec MUST be a JSON object when present; deep-merged into the existing spec by default. Empty {"spec":{}} preserves existing spec content but still records a new version (no-op for content, not for version state). To clear/replace the entire spec wholesale send {"spec":{...},"deep":false}. List-typed spec fields use replace semantics (the patch list replaces the existing list, no array merging). Coordinates Git + DB writes. Slug cannot be changed after creation.

update_meta

Patch /ops/update_meta | Auth: Write

Update element metadata (lightweight merge — does NOT bump version or snapshot spec)

Shallow JSONB merge into element.meta. Top-level keys in the provided value replace existing meta values; other keys are preserved. Used for UI metadata like canvas positions, panel state, viewer preferences. Wire-shape op_name is update_meta (distinct from update) so SSE subscribers + the cache auto-invalidator can distinguish lightweight metadata changes from spec edits without inspecting the payload. The MutatingElementStore wrapper stamps this op_name on the lifecycle event emitted by update_element_meta storage calls.

version

Get /ops/version | Auth: Read

Get current version or full history

Returns current version by default. Pass ?history=true for full version history (up to ?limit=N, default 50). Versions are backed by the element_versions table. Every spec update creates a new version entry.

Error Codes

CodeClassRetryableDescription
QUEUE_CONNECTION_FAILEDinternalyesQueue broker connection failed
QUEUE_CONSUMER_FAILEDinternalyesConsumer setup failed
QUEUE_PUBLISH_FAILEDinternalyesMessage publish failed
QUEUE_NOT_FOUNDnot_foundnoQueue does not exist

Lifecycle / runtime

Inherited from io

Before request

  • validate_auth
  • check_rate_limit

After request

  • record_metrics

On error

  • log_error
  • retry_if_transient

Execution model: async

Observability

Defined for this element

Metrics

  • publish_count
  • consume_count
  • ack_count
  • nack_count
  • pending_messages
  • dead_letter_depth
  • processing_duration_ms
  • purge_count

Events

  • queue.message.published
  • queue.message.consumed
  • queue.message.acked
  • queue.message.nacked
  • queue.message.dead_lettered
  • queue.purged

Pricing / cost

Platform default

Operation costs

  • create: free
  • update: free
  • delete: free
  • get: free
  • list: free
  • invoke: 10000 micro-AU
  • tool_use: free

Set it up

Providerstring
Queue Namestring