Queue
The asynchronous seam between your flows and a message broker — wire it as a flow entry point and it consumes messages one run at a time, or drive it from another element and it publishes, with consumer groups, acknowledgement modes, and retry-to-dead-letter handled for you.
Working with it
Selecting a Queue reveals its settings in the properties panel; it has no dedicated full-screen workbench.
How it appears
The same element type rendered as a definition, a circle instance, and a live workspace card.
When to use / not
When to use
- Triggering a flow per message off a durable stream — wire queue as the entry point and each delivery starts a run, with retries up to max_deliver before dead-lettering.
- Decoupling a slow or bursty producer from its consumer so work buffers instead of blocking, drained at the consumer's own pace.
- Fanning one stream across shared workers via a durable consumer_group, so messages load-balance instead of duplicating.
- Emitting a one-off message — ad-hoc, from a test, or mid-flow — with the publish operation, or watching backlog and dead-letter depth with queue_stats.
When not to use
- Calling a service and waiting on its reply: that is synchronous request/response, so reach for http instead.
- Firing work on a wall-clock cadence rather than per inbound message — schedule is the timer-driven trigger.
- Real-time, connection-oriented duplex streaming to a live client: websocket holds the socket open, which queue does not.
Topology
Created from the library and placed inside an app or circle. It is a top-level building block you compose with other elements.
Properties
- provider (string): Queue provider
- subject (string): NATS subject to publish/subscribe on (e.g. queue.orders)
- queue (string): Queue/topic name (alias for display; defaults to subject)
Capabilities
Inherited from io
- Network
- Observe
Operations
- activity (GET)
- attachments (GET)
- batch_stats (GET)
- compose (POST)
- context (GET)
- create (POST)
- delete (DELETE)
- disable (POST)
- enable (POST)
- export_bundle (GET)
- get (GET)
- import_bundle (POST)
- intention (GET)
- promote (POST)
- publish (POST)
- purge (POST)
- queue_stats (GET)
- readme (GET)
- readme_update (POST)
- receive (POST)
- remove-modifier (POST)
- restore (POST)
- schema (GET)
- send (POST)
- source (GET)
- source_branches (GET)
- source_promote (POST)
- source_repair (POST)
- source_status (GET)
- source_validate (POST)
- stats (GET)
- test_connection (POST)
- tree (GET)
- update (PATCH)
- update_meta (PATCH)
- version (GET)
Ports
Inputs
- trigger (event)
- request (request)
- message (event)
- result (event)
- ack (signal)
Composition
Errors / when it fails
- queue_name is required for queue integration
- Fails unless:
queue_name != null && len(queue_name) > 0
Validation rules
- High max_in_flight (>1000) may cause memory pressure
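The precondition and the validation rule above can be read as a small check. The following is a minimal sketch, not the platform's validator: `validate_queue_config` is a hypothetical helper, the configuration is assumed to be a plain dict, and treating the `max_in_flight` rule as a warning rather than a failure is an assumption based on its "may cause" wording.

```python
def validate_queue_config(config: dict) -> tuple[list[str], list[str]]:
    """Return (errors, warnings) for a queue configuration dict (hypothetical shape)."""
    errors: list[str] = []
    warnings: list[str] = []

    # Precondition from the docs: queue_name != null && len(queue_name) > 0
    queue_name = config.get("queue_name")
    if queue_name is None or len(queue_name) == 0:
        errors.append("queue_name is required for queue integration")

    # Validation rule: values above 1000 are flagged (assumed to be a warning)
    max_in_flight = config.get("max_in_flight")
    if max_in_flight is not None and max_in_flight > 1000:
        warnings.append("High max_in_flight (>1000) may cause memory pressure")

    return errors, warnings
```

A configuration with a non-empty `queue_name` and `max_in_flight` at or below 1000 yields two empty lists.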
Queue (queue)
Category: io | Form: atom | Symbol: Qu
Connect to message queues — consume messages and publish to queues
Bidirectional message queue integration supporting NATS, RabbitMQ, SQS, and Kafka. Configure spec.provider, spec.queue, and spec.connection. When wired as a flow entry point (trigger input exposed), the element consumes messages from the configured queue. When driven from another element (request input wired), it publishes messages to a queue. Consumer and acknowledgment settings apply on the receive side. Batching and delivery settings apply on the publish side. Use queue_stats to monitor queue health. The purge operation removes messages by status — requires admin auth. The publish operation explicitly sends a single message without driving a full flow step.
Guide
What It Does
Queue is an IO connector for asynchronous message processing. It can consume messages from a queue (triggering a flow on each message) or publish messages to a queue from your flows. It supports NATS JetStream as the backing transport, with configurable consumer groups, acknowledgement modes, and retry policies.
Element Definition
| Property | Value |
|---|---|
| Type | queue |
| Category | io |
| Form | atom |
Key Properties
| Field | Type | Default | Description |
|---|---|---|---|
| subject | string | - | NATS subject pattern to subscribe/publish to |
| consumer_group | string | - | Durable consumer group name for shared consumption |
| ack_policy | string | explicit | Acknowledgement: explicit, none, or all |
| max_deliver | integer | 3 | Maximum delivery attempts before dead-lettering |
| batch_size | integer | 1 | Messages to fetch per pull request |
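The defaults in the table above can be captured in a small builder. This is a sketch under stated assumptions: `key_properties` is a hypothetical helper, and the flat dict it returns says nothing about where these fields actually nest inside the element spec.

```python
from typing import Optional

# Allowed values taken from the ack_policy row of the table.
ACK_POLICIES = ("explicit", "none", "all")

def key_properties(
    subject: str,
    consumer_group: Optional[str] = None,
    ack_policy: str = "explicit",
    max_deliver: int = 3,
    batch_size: int = 1,
) -> dict:
    """Build a flat dict of key properties, applying the documented defaults."""
    if ack_policy not in ACK_POLICIES:
        raise ValueError(f"ack_policy must be one of {ACK_POLICIES}")
    props = {
        "subject": subject,
        "ack_policy": ack_policy,
        "max_deliver": max_deliver,
        "batch_size": batch_size,
    }
    # consumer_group has no default, so it is only included when given.
    if consumer_group is not None:
        props["consumer_group"] = consumer_group
    return props
```

Calling `key_properties("queue.orders", consumer_group="workers")` keeps the defaults for the remaining fields and adds the group name.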
Usage
- Configure the subject pattern for your message stream
- Wire to an action element that processes each message
- Failed messages retry up to max_deliver times before being dead-lettered
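The retry rule in the last step can be illustrated with a small simulation. It is only a model of the documented behaviour, not the consumer's implementation: `deliver_with_retries` is a hypothetical name, and a handler that returns `True` for success stands in for whatever the wired action element does.

```python
from typing import Callable

def deliver_with_retries(
    message: dict,
    handler: Callable[[dict], bool],
    max_deliver: int = 3,
) -> tuple[str, int]:
    """Attempt delivery up to max_deliver times; return (outcome, attempts made)."""
    for attempt in range(1, max_deliver + 1):
        if handler(message):
            # The handler succeeded, so the message is acknowledged.
            return "acked", attempt
    # Every attempt failed: the message goes to the dead-letter queue.
    return "dead_lettered", max_deliver
```

With the default `max_deliver` of 3, a handler that always fails is called three times before the message is dead-lettered.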
Relationships
- Attaches to: rate-limit
- Uses: variable
Capabilities
- consume: Consume messages from NATS JetStream, RabbitMQ, SQS, or Kafka
- publish: Publish messages to NATS JetStream, RabbitMQ, SQS, or Kafka
- ack: Manual message acknowledgment
- batching: Batch message processing and publishing
- replay: Message replay from offset
- delay: Delayed message delivery
- dedup: Message deduplication
Properties
| Property | Type | Default | Description |
|---|---|---|---|
| provider | string | "nats" | Queue provider |
| subject | string | - | NATS subject to publish/subscribe on (e.g. queue.orders) |
| queue | string | - | Queue/topic name (alias for display; defaults to subject) |
| connection | object | - | Connection configuration |
| consumer | object | - | Consumer configuration (applies when receiving) |
| acknowledgment | object | - | Acknowledgment settings (applies when receiving) |
| dead_letter | object | - | Dead letter queue settings (applies when receiving) |
| target_ref | string | - | UUID of the element (typically an automation) to trigger when a message arrives on this queue. Resolved by ConsumerRunner via the YAML-declared receives binding (signals.receives in contract.yaml). Empty = no automatic trigger; the queue still functions as a publish endpoint. |
| message | object | - | Message configuration (applies when publishing) |
| batching | object | - | Batching configuration (applies when publishing) |
| delivery | object | - | Delivery settings (applies when publishing) |
Operations
activity
Get /ops/activity | Auth: Read
Get activity events for this element
Scope depends on element capabilities: individual elements query by element_id, project-form elements with activity-scope-members include member activities, circle-level elements with activity-scope-all query the entire circle. Gracefully returns empty list if activities table is missing (old circles).
attachments
Get /ops/attachments | Auth: Read
List all modifiers and resources attached to this element
Returns both modifiers (policy enforcement) and resources (data injection) with is_modifier flag to distinguish. Items in the generated MODIFIER_TYPES list are modifiers; everything else is a resource. Includes cascade_policy and version pin info.
batch_stats
Get /ops/batch_stats | Auth: Read
Get per-element statistics for all children of this element
Returns per-child stats plus an aggregate. Most meaningful on compound or manifest form elements (repositories, circles, projects); atoms have no children so the result is an empty children array with a zeroed aggregate. Uses efficient GROUP BY SQL. Weighted averages for eval scores.
compose
Post /ops/compose | Auth: Execute
Batch add and remove modifiers on this element in a single call
Declarative composition: add modifiers by ref path (slug or path@version) and remove by attachment ID, all in one atomic call on the target element. Each 'add' entry resolves the source element, validates topology, and attaches with optional priority and cascade policy. Each 'remove' entry deletes the attachment row. Returns a summary of what was added and removed. Example: compose({ add: [{ref: "my-prompt"}, {ref: "rate-limit/api@v2", priority: 50}], remove: [{attachment_id: "uuid"}] })
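The request body from the example above can be assembled programmatically. A minimal sketch: `add_entry` and `compose_body` are hypothetical helpers, and only the `ref`, `priority`, and `attachment_id` fields shown in the example are used.

```python
from typing import Optional

def add_entry(ref: str, priority: Optional[int] = None) -> dict:
    """One 'add' entry: a ref path (slug or path@version) plus optional priority."""
    entry: dict = {"ref": ref}
    if priority is not None:
        entry["priority"] = priority
    return entry

def compose_body(add: list[dict], remove_ids: list[str]) -> dict:
    """Build the body for a single compose call from add entries and attachment IDs."""
    return {
        "add": add,
        "remove": [{"attachment_id": attachment_id} for attachment_id in remove_ids],
    }

body = compose_body(
    add=[add_entry("my-prompt"), add_entry("rate-limit/api@v2", priority=50)],
    remove_ids=["uuid"],
)
```

Here `body` has the same shape as the documented example and would be sent in one POST to the compose operation.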
context
Get /ops/context | Auth: Read
Get connected elements (graph traversal)
Graph traversal showing all connected elements with their relationship type (contains, contained_by, references, referenced_by, attaches, etc.). Use ?depth=N to control traversal depth (default 1) and ?types=actor,data to filter by element types.
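As an illustration of the two query parameters, the request URL can be built as follows. This is a sketch: `context_url` is a hypothetical helper, the element path in the usage note is made up, and the `/api/{element_path}/ops/...` prefix is taken from the path pattern used elsewhere on this page.

```python
from urllib.parse import urlencode

def context_url(element_path: str, depth: int = 1, types: tuple = ()) -> str:
    """Build the context URL with ?depth=N and an optional comma-separated ?types= filter."""
    params: dict = {"depth": depth}
    if types:
        params["types"] = ",".join(types)
    # safe="," keeps the comma-separated type list readable instead of %2C-encoded.
    return f"/api/{element_path}/ops/context?{urlencode(params, safe=',')}"
```

For example, `context_url("my-circle/orders", depth=2, types=("actor", "data"))` requests a two-level traversal filtered to actor and data elements.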
create
Post /ops/create | Auth: Write
Create child element
POST to the parent path — element_type goes in the request body, NOT the URL. Both element_type and slug are required and must be non-empty. Name is derived from slug if omitted. Writes to both Git and PostgreSQL. All elements are stored flat under the circle — no intermediate library wrapper rows.
delete
Delete /ops/delete | Auth: Admin
Delete element (soft delete)
Soft delete — sets state to ‘deleted’ but retains the record. Cannot delete elements that have children (has_no_bond precondition) or active runs. Requires admin auth and confirmation.
disable
Post /ops/disable | Auth: Admin
Disable element (hides and prevents use)
Idempotent — safe to call on already-disabled elements. Optionally pass a reason string. Disabled elements cannot be invoked or executed. Inverse of enable.
enable
Post /ops/enable | Auth: Admin
Enable element (makes usable and visible)
Idempotent — safe to call on already-enabled elements. Transitions element to ready/enabled state. Cannot enable deleted elements. Inverse of disable.
export_bundle
Get /ops/export/bundle | Auth: Read
Export element as downloadable git bundle
On non-root-namespace elements, returns a binary git bundle. On root-namespace (circle) elements, dispatch hands off to the circle’s own export_bundle op, which returns a multi-element JSON envelope with one base64 bundle per child element — this is intentional, not an error.
get
Get /ops/get | Auth: Read
Get element details
Element is already resolved by the routing layer — this returns the cached element, not a fresh DB query. Use the path /api/{circle}/{slug} to address elements.
import_bundle
Post /ops/import/bundle | Auth: Write
Import git bundle into element
Accepts a base64-encoded git bundle in the JSON bundle_base64 field. Use overwrite=true to replace existing elements with same slug (default skips duplicates). Imported elements get new UUIDs. Returns counts of imported/skipped elements and any errors.
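Preparing the request body might look like the sketch below. The `bundle_base64` field name comes from the description above; `import_bundle_body` is a hypothetical helper, and sending `overwrite` as a JSON field (rather than, say, a query parameter) is an assumption.

```python
import base64
import json

def import_bundle_body(bundle_bytes: bytes, overwrite: bool = False) -> str:
    """Serialize a git bundle into a JSON request body for import_bundle."""
    return json.dumps({
        # The bundle is binary, so it is base64-encoded into a JSON-safe string.
        "bundle_base64": base64.b64encode(bundle_bytes).decode("ascii"),
        # Assumed placement of the overwrite flag; the default skips duplicate slugs.
        "overwrite": overwrite,
    })
```

The bytes would typically be read from a `.bundle` file produced by an earlier export.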
intention
Get /ops/intention | Auth: Read
Get element intention with full inheritance chain
Returns three levels: direct (this element’s intention), inherited (from category and root), and resolved (final merged intention). Useful for understanding an element’s purpose in context of its hierarchy.
promote
Post /ops/promote | Auth: Admin
Promote element configuration to a target environment
Only for manifest-form elements (projects). Environments advance: dev → demo → live. dev→demo requires member+ role, demo→live requires admin. Freezes member versions at promotion time (creates snapshot). Persists environment config to spec.environments.
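The environment ladder and its role requirements can be written down as a lookup. This is a sketch of the rule as described, not the platform's promotion logic; `next_environment` and `plan_promotion` are hypothetical names.

```python
from typing import Optional

# Environments advance in this order: dev -> demo -> live.
ENVIRONMENTS = ("dev", "demo", "live")

# Minimum role needed to promote out of each environment, per the description.
REQUIRED_ROLE = {"dev": "member", "demo": "admin"}

def next_environment(current: str) -> Optional[str]:
    """Return the environment after `current`, or None when already at live."""
    index = ENVIRONMENTS.index(current)  # raises ValueError for unknown names
    return ENVIRONMENTS[index + 1] if index + 1 < len(ENVIRONMENTS) else None

def plan_promotion(current: str) -> Optional[dict]:
    """Describe the next promotion step and the role it requires."""
    target = next_environment(current)
    if target is None:
        return None
    return {"from": current, "to": target, "required_role": REQUIRED_ROLE[current]}
```

Since promote applies only to manifest-form elements, a plan like this would be computed for a project rather than for the queue itself.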
publish
Post /ops/publish | Auth: Execute
Publish a single message to the queue
Explicitly publishes one message to the configured queue without requiring a flow step. Useful for ad-hoc publishing, testing, or one-off messages from the operations panel. Put the message body in the payload field (canonical). The handler also accepts message and data as aliases for payload, which is useful when integrating with systems that use those field names. Optional subject override, headers, delay_ms, and durable (default true) fields are supported. Returns the message ID and published_at timestamp.
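The alias handling described above can be sketched as a small normalizer. This is an illustration, not the handler's code: `normalize_publish_body` is a hypothetical name, and the precedence when more than one alias is present (payload, then message, then data) is an assumption.

```python
def normalize_publish_body(body: dict) -> dict:
    """Resolve payload aliases and apply the documented default for durable."""
    # Canonical field first, then the two documented aliases (order is assumed).
    for key in ("payload", "message", "data"):
        if key in body:
            payload = body[key]
            break
    else:
        raise ValueError("one of payload, message, or data is required")

    normalized = {"payload": payload, "durable": body.get("durable", True)}
    # Optional fields are passed through only when the caller supplied them.
    for optional in ("subject", "headers", "delay_ms"):
        if optional in body:
            normalized[optional] = body[optional]
    return normalized
```

A caller that sends `{"message": {...}}` therefore ends up with the same normalized body as one that sends `{"payload": {...}}`.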
purge
Post /ops/purge | Auth: Admin
Purge messages from queue
Removes messages by status: pending, failed, dead_letter, or all. Defaults to failed. Destructive operation — requires admin auth. Returns count of purged messages. Use to clear dead letter queues, reset stuck consumers, or drain the publish buffer.
queue_stats
Get /ops/queue/stats | Auth: Read
Get queue statistics
Returns message counts by status: pending, processing, sent, failed, completed, dead_letter. Covers both consumer and publisher metrics in a single call. Use to monitor queue health, identify backlogs, and detect dead letter accumulation.
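A monitoring check built on these counts might look like the following sketch. The response is assumed to be a flat mapping from status name to count, which the description does not confirm; `summarize_queue_stats` and the `backlog_threshold` value are hypothetical.

```python
# Status names as listed in the description above.
STATUSES = ("pending", "processing", "sent", "failed", "completed", "dead_letter")

def summarize_queue_stats(stats: dict, backlog_threshold: int = 100) -> dict:
    """Flag backlog and dead-letter accumulation from per-status message counts."""
    # Missing statuses are treated as zero so partial responses still work.
    counts = {status: int(stats.get(status, 0)) for status in STATUSES}
    return {
        "pending": counts["pending"],
        "backlog": counts["pending"] > backlog_threshold,
        "dead_letters": counts["dead_letter"] > 0,
    }
```

The threshold is a local policy choice; a busy stream may tolerate far more pending messages than a low-volume one.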
readme
Get /ops/readme | Auth: Read
Get element README.md content
Reads README.md from the element’s git repository. Returns empty content (not an error) if no README exists. Always returns markdown format.
readme_update
Post /ops/readme_update | Auth: Write
Update element README.md content
Creates or overwrites README.md in the element’s git repo. Commits to the draft branch. Content must be provided as a markdown string.
receive
Post /ops/receive | Auth: None
Receive incoming external traffic
Entry point for external traffic reaching this IO element. Declared auth: none to bypass platform auth — element-level auth is enforced by IoReceiveExecutor before dispatching into the flow graph. The flow/app that wires this element as an entry point determines what happens next.
remove-modifier
Post /ops/remove-modifier | Auth: Execute
Remove an attached modifier from this element by attachment ID
Removes a modifier/resource attachment by its row ID. The ID comes from the attachments or context API. This is the reverse of attach — called on the target element, not the source.
restore
Post /ops/restore | Auth: Admin
Restore element to a specific version
Automatically snapshots the current state before restoring (creates a ‘Before restore to vN’ version entry). Writes restored spec to git as .triform/spec.yaml. Git failures warn but don’t fail the operation — DB state is authoritative. Cannot restore deleted elements.
schema
Get /ops/schema | Auth: Read
Get element input/output schema (MCP tools/list compatible)
Returns type-level port schemas from the TypeRegistry — not instance-specific overrides. Includes direction (input/output), required flag, and JSON schema per port. Useful for understanding what data an element accepts and produces.
send
Post /ops/send | Auth: Execute
Send a message/request to external system
Explicitly sends payload to the configured external target. For HTTP elements, POSTs to the target URL. For chat platforms, sends via the platform API. Put data in the payload field. Returns send status and response details.
source
Get /ops/source | Auth: Read
Get any file’s content from the element’s git repository
Reads an arbitrary file from the element’s CAS-backed git tree by its relative path. Same store as readme, just generalized. Path safety: rejects .. traversal, leading /, and null bytes. Use this to view main.py for action elements, asset files for SPAs, etc. Returns empty content (not an error) if the file doesn’t exist.
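The three path-safety rules can be expressed as a short predicate. This is a sketch of the rules as stated, not the server's check: `is_safe_source_path` is a hypothetical name, and interpreting ".. traversal" as any path segment equal to `..` is an assumption.

```python
def is_safe_source_path(path: str) -> bool:
    """Return True if a relative path passes the three documented safety rules."""
    if "\x00" in path:
        return False  # null bytes are rejected
    if path.startswith("/"):
        return False  # leading slash (absolute path) is rejected
    # Reject any '..' segment; names that merely contain dots are fine.
    return ".." not in path.split("/")
```

Under this reading, `assets/app..js` is accepted because no segment is exactly `..`, while `a/../b` is rejected.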
source_branches
Get /ops/source/branches | Auth: Read
List Source branches for this element
Returns the standard draft/demo/live Source branches, their current commits, and promotion relationships. Use GET /api/{element_path}/ops/source/branches.
source_promote
Post /ops/source/promote | Auth: Write
Promote Source branch forward
Promotes draft to demo or demo to live through the generated element op path. Direct Git pushes to demo/live are blocked by Source policy.
source_repair
Post /ops/source/repair | Auth: Write
Inspect or repair the element Source index
Runs Source repair through the element operation path. Defaults to dry_run=true; set dry_run=false only after reviewing a dry-run report.
source_status
Get /ops/source/status | Auth: Read
Get Source control status for this element
Returns the branch-aware clone URL, checkout commands, current draft commit, child source-link count, portable export summary, Source health, warnings, and auth hints for the addressed element. Use the element-first path: GET /api/{element_path}/ops/source/status.
source_validate
Post /ops/source/validate | Auth: Read
Validate Source branch contents
Validates a Source branch before accepting local Git workflow changes or promotion. Defaults to branch=draft and rejects runtime data, generated output, secret material, and unreadable CAS refs.
stats
Get /ops/stats | Auth: Read
Get aggregate statistics for this element
Health status is computed: error if errors_per_day > 5 or success_rate < 0.8, warning if errors_per_day > 0 or success_rate < 0.95. Firing alerts escalate health to error/warning. Default period is ‘day’. Returns runs_per_day, success_rate, avg_duration_ms, and more.
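The threshold rules above translate directly into code. In this sketch, `compute_health` is a hypothetical name, the label `"healthy"` for the remaining case is an assumption (the description names only error and warning), and escalation from firing alerts is left out.

```python
def compute_health(errors_per_day: float, success_rate: float) -> str:
    """Map daily error count and success rate to a health label."""
    # Error thresholds are checked first because they are the stricter condition.
    if errors_per_day > 5 or success_rate < 0.8:
        return "error"
    if errors_per_day > 0 or success_rate < 0.95:
        return "warning"
    return "healthy"  # assumed label for "neither error nor warning"
```

Note that a single error in the day is enough for a warning, even with a success rate above 0.95.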
test_connection
Post /ops/test-connection | Auth: Execute
Test connection configuration
Validates the element’s configuration locally without making an actual external connection. Checks that required credentials are set, URLs are valid, etc. Returns success boolean and error details. Safe to call repeatedly.
tree
Get /ops/tree | Auth: Read
Get the element’s position in the graph — ancestors, children, references, and subtree statistics
Uses per-circle ElementGraph cache for O(1) lookups. Returns ancestors (containment chain), children (direct), members (references), referenced_by (reverse refs), attachments, and subtree stats. Default depth is 3, max is 10. Pass ?include_metadata=true for name/state on each node.
update
Patch /ops/update | Auth: Write
Update element
Partial update: send only the fields you want to change. spec, name, and intention are all independently optional. spec MUST be a JSON object when present; it is deep-merged into the existing spec by default. An empty {"spec":{}} preserves existing spec content but still records a new version (a no-op for content, not for version state). To clear or replace the entire spec wholesale, send {"spec":{...},"deep":false}. List-typed spec fields use replace semantics (the patch list replaces the existing list, no array merging). Coordinates Git + DB writes. Slug cannot be changed after creation.
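The merge semantics described above (objects merge recursively, lists and scalars replace, `deep: false` swaps the whole spec) can be modelled as follows. This is a sketch of the described behaviour, not the server's implementation; `deep_merge` and `apply_spec_patch` are hypothetical names, and the spec fields in the usage note are invented for illustration.

```python
def deep_merge(existing: dict, patch: dict) -> dict:
    """Recursively merge patch into existing; lists and scalars are replaced."""
    merged = dict(existing)
    for key, value in patch.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            # Replace semantics: no array merging, the patch value wins.
            merged[key] = value
    return merged

def apply_spec_patch(existing: dict, patch: dict, deep: bool = True) -> dict:
    """Apply a spec patch: deep-merge by default, wholesale replace when deep is False."""
    return deep_merge(existing, patch) if deep else dict(patch)
```

Patching `{"consumer": {"tags": ["c"]}}` onto a spec whose `consumer` holds `group` and `tags` keeps `group` and replaces `tags` entirely, while an empty patch returns the spec unchanged.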
update_meta
Patch /ops/update_meta | Auth: Write
Update element metadata (lightweight merge — does NOT bump version or snapshot spec)
Shallow JSONB merge into element.meta. Top-level keys in the provided value replace existing meta values; other keys are preserved. Used for UI metadata like canvas positions, panel state, viewer preferences. Wire-shape op_name is update_meta (distinct from update) so SSE subscribers + the cache auto-invalidator can distinguish lightweight metadata changes from spec edits without inspecting the payload. The MutatingElementStore wrapper stamps this op_name on the lifecycle event emitted by update_element_meta storage calls.
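A shallow merge means a top-level key in the patch replaces the stored value outright, including nested objects. The sketch below models that in plain Python; `merge_meta` is a hypothetical name and the meta keys in the usage note are invented.

```python
def merge_meta(existing: dict, patch: dict) -> dict:
    """Shallow merge: top-level patch keys replace existing values, other keys survive."""
    # Dict unpacking copies top-level keys only, so nested objects are not merged.
    return {**existing, **patch}
```

Patching `{"canvas": {"x": 10}}` onto `{"canvas": {"x": 1, "y": 2}, "panel": "open"}` drops `y`, because the whole `canvas` value is replaced, while `panel` is preserved.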
version
Get /ops/version | Auth: Read
Get current version or full history
Returns current version by default. Pass ?history=true for full version history (up to ?limit=N, default 50). Versions are backed by the element_versions table. Every spec update creates a new version entry.
Error Codes
| Code | Class | Retryable | Description |
|---|---|---|---|
| QUEUE_CONNECTION_FAILED | internal | yes | Queue broker connection failed |
| QUEUE_CONSUMER_FAILED | internal | yes | Consumer setup failed |
| QUEUE_PUBLISH_FAILED | internal | yes | Message publish failed |
| QUEUE_NOT_FOUND | not_found | no | Queue does not exist |
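A client deciding whether to retry can encode the table above as a lookup. This is a sketch: `QUEUE_ERRORS` and `is_retryable` are hypothetical names, and treating unknown codes as non-retryable is an assumption.

```python
# code -> (class, retryable), transcribed from the error code table.
QUEUE_ERRORS = {
    "QUEUE_CONNECTION_FAILED": ("internal", True),
    "QUEUE_CONSUMER_FAILED": ("internal", True),
    "QUEUE_PUBLISH_FAILED": ("internal", True),
    "QUEUE_NOT_FOUND": ("not_found", False),
}

def is_retryable(code: str) -> bool:
    """Return True only for codes the table marks as retryable."""
    entry = QUEUE_ERRORS.get(code)
    # Unknown codes are conservatively treated as non-retryable (assumption).
    return entry is not None and entry[1]
```

Retrying on `QUEUE_NOT_FOUND` would not help, since the queue has to exist before any publish or consume can succeed.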
Lifecycle / runtime
Inherited from io
Before request
- validate_auth
- check_rate_limit
After request
- record_metrics
On error
- log_error
- retry_if_transient
Execution model: async
Observability
Defined for this element
Metrics
- publish_count
- consume_count
- ack_count
- nack_count
- pending_messages
- dead_letter_depth
- processing_duration_ms
- purge_count
Events
- queue.message.published
- queue.message.consumed
- queue.message.acked
- queue.message.nacked
- queue.message.dead_lettered
- queue.purged
Pricing / cost
Platform default
Operation costs
- create: free
- update: free
- delete: free
- get: free
- list: free
- invoke: 10000 micro-AU
- tool_use: free
Set it up
- Provider (string)
- Queue Name (string)