---
title: "Pipeline Stage Communication"
description: "Patterns for connecting independent pipeline stages via message queues — decoupled producers and consumers with batch collection and backpressure."
kind: snippet
maturity: seedling
confidence: medium
origin: ai-drafted
author: "Agent"
directedBy: "krow"
tags: [architecture, patterns]
published: 2026-04-07
modified: 2026-04-21
wordCount: 490
readingTime: 3
related: [worker-pool-isolation, parallel-ai-research-pipelines, aimd-rate-limiting]
url: https://krowdev.com/snippet/pipeline-stage-communication/
---
## Agent Context
- Canonical: https://krowdev.com/snippet/pipeline-stage-communication/
- Markdown: https://krowdev.com/snippet/pipeline-stage-communication.md
- Full corpus: https://krowdev.com/llms-full.txt
- Kind: snippet
- Maturity: seedling
- Confidence: medium
- Origin: ai-drafted
- Author: Agent
- Directed by: krow
- Published: 2026-04-07
- Modified: 2026-04-21
- Words: 490 (3 min read)
- Tags: architecture, patterns
- Related: worker-pool-isolation, parallel-ai-research-pipelines, aimd-rate-limiting
- Content map:
- h2: The Shape
- h2: Producer Side: Send and Move On
- h2: Consumer Side: Batch Collection
- h2: Throughput Matching
- h2: Key Details
- h2: Sources
- Crawl policy: same canonical content is exposed through HTML, Markdown, and llms-full; no crawler-specific content gate.
Connect pipeline stages through message queues so each stage runs as an independent service. Stages don't call each other directly — they produce to and consume from queues. Combine with [worker pool isolation](/snippet/worker-pool-isolation/) per stage and [AIMD rate limiting](/note/aimd-rate-limiting/) on external calls for a resilient pipeline.
## The Shape
```
Stage A  →  [queue]  →  Stage B   →  [queue]  →  Stage C
producer    buffer      consumer/    buffer      consumer
                        producer
```
Each stage owns its own runtime, scaling, and failure domain. The queue is the contract between them. Stage A doesn't know or care whether Stage B is written in a different language, runs on different hardware, or processes items one at a time or in batches.
## Producer Side: Send and Move On
The producer pushes results to a channel or queue and immediately returns to its own work. No waiting for the consumer.
```rust
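use std::sync::mpsc::{Receiver, Sender};

// `Input`, `Output`, `process`, and `handle_error` are defined by the stage.
fn run(jobs: &Receiver<Input>, results: &Sender<Output>) {
    // `recv` returns Err once every upstream sender is dropped, ending the loop.
    while let Ok(item) = jobs.recv() {
        match process(item) {
            // Ignore send errors: the downstream receiver may already be gone.
            Ok(output) => { results.send(output).ok(); }
            Err(e) => { handle_error(e); }
        }
    }
}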
```
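The `.ok()` on send is intentional: if the downstream receiver has been dropped, `send` returns an `Err`, and `.ok()` discards it so the stage keeps running instead of panicking. As written, the error is dropped silently; log it at this point if you need visibility into a dead downstream.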
## Consumer Side: Batch Collection
Some stages work more efficiently in batches. Collect items up to a batch size, with a timeout so partial batches don't stall forever.
```python
import asyncio

async def collect_batch(queue, batch_size: int = 50) -> list:
    items = []
    while len(items) < batch_size:
        try:
            # Wait up to 5 s for the next item; the timeout restarts per item.
            item = await asyncio.wait_for(queue.get(), timeout=5.0)
            items.append(item)
        except asyncio.TimeoutError:
            break  # flush partial batch
    return items
```
The timeout is critical. Without it, a batch that's 49/50 full waits indefinitely if the upstream stalls.
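In `collect_batch` the 5-second timeout restarts for every item, so a slow trickle of items can hold a batch open much longer than 5 seconds. If the total wait needs an upper bound, one option is a single deadline for the whole batch. The sketch below is an illustration, not part of the original pattern; the name `collect_batch_with_deadline` and the `max_wait` parameter are assumptions.

```python
import asyncio
import time


async def collect_batch_with_deadline(
    queue: asyncio.Queue, batch_size: int = 50, max_wait: float = 5.0
) -> list:
    """Collect up to batch_size items, waiting at most max_wait seconds in total."""
    items = []
    deadline = time.monotonic() + max_wait
    while len(items) < batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # overall deadline reached: flush what we have
        try:
            items.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break  # deadline expired while waiting for the next item
    return items
```

Either variant can return an empty list when nothing arrives in time, so the caller should skip the downstream call for empty batches.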
## Throughput Matching
Stages rarely have identical throughput. The queue absorbs bursts and smooths mismatches.
| Pattern | When to use |
|---------|------------|
| 1:1 queue | Stages have similar throughput |
| Fan-out (1:N) | Consumer is slower — parallelize it |
| Batching | Consumer has high per-call overhead, amortize it |
| Bounded queue + backpressure | Prevent memory growth when consumer falls behind |
If Stage B is 3x slower than Stage A, run 3 instances of Stage B consuming from the same queue. The queue is the load balancer.
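The fan-out row can be sketched with `asyncio`: several instances of the slow stage pull from one shared queue, and whichever instance is free takes the next item. The names `worker` and `fan_out`, the `None` sentinel, and the `sleep` standing in for real work are all assumptions made for illustration.

```python
import asyncio


async def worker(name: str, queue: asyncio.Queue, handled: dict) -> None:
    """One Stage B instance: pull items until a None sentinel arrives."""
    while True:
        item = await queue.get()
        if item is None:  # hypothetical shutdown sentinel
            break
        await asyncio.sleep(0.01)  # stand-in for slow per-item work
        handled[name].append(item)


async def fan_out(items: list, instances: int = 3) -> dict:
    """Feed items to several workers that share a single queue."""
    queue: asyncio.Queue = asyncio.Queue()
    handled = {f"b{i}": [] for i in range(instances)}
    tasks = [asyncio.create_task(worker(n, queue, handled)) for n in handled]
    for item in items:
        await queue.put(item)
    for _ in range(instances):
        await queue.put(None)  # one sentinel per worker
    await asyncio.gather(*tasks)
    return handled
```

Each item is handled by exactly one worker, but the split between workers is not fixed in advance; it depends on which instance happens to be idle.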
## Key Details
**Bounded queues.** Unbounded queues hide backpressure until memory runs out. Set a hard cap and let the queue push back on producers when full.
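A minimal sketch of that push-back, assuming an `asyncio.Queue` with `maxsize` set: `put` suspends the producer while the queue is full, so depth never exceeds the cap. The function names, the `None` sentinel, and the `max_seen` bookkeeping are hypothetical and exist only to make the behaviour observable.

```python
import asyncio


async def producer(queue: asyncio.Queue, count: int, max_seen: list) -> None:
    for i in range(count):
        await queue.put(i)  # suspends here while the queue is full
        max_seen[0] = max(max_seen[0], queue.qsize())
    await queue.put(None)  # hypothetical end-of-stream sentinel


async def slow_consumer(queue: asyncio.Queue, out: list) -> None:
    while (item := await queue.get()) is not None:
        await asyncio.sleep(0.001)  # consumer is slower than the producer
        out.append(item)


async def run_bounded(count: int = 20, cap: int = 4) -> tuple:
    """Run both sides and return (consumed items, peak queue depth)."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=cap)
    out, max_seen = [], [0]
    await asyncio.gather(producer(queue, count, max_seen), slow_consumer(queue, out))
    return out, max_seen[0]
```

With `maxsize` left at its default of 0 the queue is unbounded and the producer would never be slowed down.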
**Per-stage monitoring.** Track queue depth between each pair of stages. Growing depth means the consumer can't keep up — scale it or investigate before the queue hits its limit.
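As a rough sketch of depth tracking, assuming the stages are connected by `asyncio.Queue` objects held in a dict keyed by a label such as `"a_to_b"`: the helper below reads `qsize()` for each queue and flags those near their cap. The name `depth_report` and the 0.8 threshold are illustrative assumptions, not recommendations from the original text.

```python
import asyncio


def depth_report(queues: dict, warn_ratio: float = 0.8) -> dict:
    """Return per-queue depth and whether it crossed the warning threshold."""
    report = {}
    for name, q in queues.items():
        depth = q.qsize()
        # maxsize == 0 means unbounded in asyncio, so no ratio can be computed
        warn = q.maxsize > 0 and depth >= warn_ratio * q.maxsize
        report[name] = {"depth": depth, "warn": warn}
    return report
```

Calling this on a timer and exporting the numbers to whatever metrics system is in use gives the trend; a single reading says little, since a healthy queue also fills briefly during bursts.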
**Graceful drain.** On shutdown, stop accepting new items, flush in-progress work, then close the output queue. Stages shut down in order from the head of the pipeline.
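One common way to get that ordering is an end-of-stream sentinel that travels through the queues behind the last real item. The sketch below assumes in-process `asyncio` queues; `STOP`, `stage`, and `run_pipeline` are hypothetical names, and the two lambdas stand in for real stage logic.

```python
import asyncio

STOP = object()  # hypothetical sentinel marking end of stream


async def stage(inbox: asyncio.Queue, outbox: asyncio.Queue, fn) -> None:
    """Process until STOP arrives, then pass STOP downstream and exit."""
    while True:
        item = await inbox.get()
        if item is STOP:
            await outbox.put(STOP)  # close the output only after draining the input
            return
        await outbox.put(fn(item))


async def run_pipeline(items: list) -> list:
    q_ab, q_bc, q_out = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    tasks = [
        asyncio.create_task(stage(q_ab, q_bc, lambda x: x + 1)),
        asyncio.create_task(stage(q_bc, q_out, lambda x: x * 2)),
    ]
    for item in items:
        await q_ab.put(item)
    await q_ab.put(STOP)  # head of the pipeline stops accepting new items first
    await asyncio.gather(*tasks)
    results = []
    while (item := q_out.get_nowait()) is not STOP:
        results.append(item)
    return results
```

Because the sentinel is queued after the last item, each stage finishes everything ahead of it before shutting down, and the stages exit in pipeline order. A stage with several parallel instances would need one sentinel per instance.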
At the workflow level, [Parallel AI Research Pipelines](/article/parallel-ai-research-pipelines/) uses the same separation: each phase talks through persisted artifacts instead of direct agent-to-agent coupling.
## Sources
- Python, [asyncio queues](https://docs.python.org/3/library/asyncio-queue.html)
- Rust, [std::sync::mpsc](https://doc.rust-lang.org/std/sync/mpsc/)
- Go, [Concurrency patterns: pipelines and cancellation](https://go.dev/blog/pipelines)