In the previous tutorial, we learned async programming with Tokio. Now we dive deeper into channels — the primary way async tasks communicate with each other.
Channels let tasks send and receive messages without sharing memory directly. This is the “message passing” model of concurrency. Instead of locking shared data with a Mutex, you send data through a channel. The task that receives it owns it completely.
Tokio provides four channel types. Each one solves a different problem. By the end of this tutorial, you will know when to use each one.
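The ownership hand-off described above can be sketched without any async runtime. This example is an assumption-laden illustration: it uses the standard library's std::sync::mpsc and an OS thread instead of Tokio tasks, and the function name hand_off is made up for the sketch. The point is only that a sent value is moved, so the receiver can mutate it without a lock.

```rust
use std::sync::mpsc;
use std::thread;

/// Sends an owned buffer to a worker thread and returns what the worker computed.
fn hand_off(data: Vec<u32>) -> u32 {
    let (tx, rx) = mpsc::channel::<Vec<u32>>();

    // The worker becomes the sole owner of whatever arrives on `rx`.
    let worker = thread::spawn(move || {
        let mut owned = rx.recv().expect("sender dropped before sending");
        owned.push(100); // free to mutate: no lock, nobody else can reach it
        owned.iter().sum::<u32>()
    });

    tx.send(data).expect("worker exited early");
    // `data` was moved into the channel; using it here would not compile.

    worker.join().expect("worker panicked")
}
```

Tokio's channels move values in the same way; only the waiting is asynchronous.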
The Four Channel Types
| Channel | Senders | Receivers | Messages | Use Case |
|---|---|---|---|---|
| mpsc | Many | One | Stream | Task queues, pipelines |
| oneshot | One | One | Single value | Request/response |
| broadcast | Many | Many | Every receiver gets all | Events, notifications |
| watch | One | Many | Latest value only | Config, state changes |
MPSC — Multiple Producer, Single Consumer
mpsc is the most common channel type. Many tasks can send messages, but only one task receives them. Think of it as a queue where multiple workers add items and one manager processes them.
Basic MPSC
use tokio::sync::mpsc;

async fn mpsc_basic() -> Vec<String> {
    let (tx, mut rx) = mpsc::channel::<String>(10);

    tokio::spawn(async move {
        for i in 0..3 {
            tx.send(format!("message {}", i)).await.unwrap();
        }
        // tx is dropped here, so the channel closes
    });

    let mut messages = vec![];
    while let Some(msg) = rx.recv().await {
        messages.push(msg);
    }
    messages
}
mpsc::channel(10) creates a channel with a buffer of 10 messages. The function returns a sender (tx) and a receiver (rx).
When all senders are dropped, rx.recv() returns None, and the while let loop ends. This is how you know the channel is closed.
Multiple Producers
Clone the sender to have multiple tasks send messages:
async fn mpsc_multiple_producers() -> Vec<i32> {
    let (tx, mut rx) = mpsc::channel::<i32>(32);

    for i in 0..3 {
        let tx = tx.clone();
        tokio::spawn(async move {
            tx.send(i * 10).await.unwrap();
        });
    }

    // Drop the original sender so the channel closes
    // when all clones are dropped
    drop(tx);

    let mut results = vec![];
    while let Some(val) = rx.recv().await {
        results.push(val);
    }
    results.sort();
    results // [0, 10, 20]
}
Important: you must drop(tx) the original sender. Otherwise the channel never closes because the original sender still exists. The while let loop would wait forever.
Backpressure with Bounded Channels
The buffer size controls backpressure. When the buffer is full, send() waits until the receiver consumes a message:
use std::time::Duration;

async fn backpressure_example() {
    // Small buffer: the producer slows down when the buffer is full
    let (tx, mut rx) = mpsc::channel::<i32>(2);

    let producer = tokio::spawn(async move {
        for i in 0..5 {
            println!("Sending {}", i);
            tx.send(i).await.unwrap();
            println!("Sent {}", i);
        }
    });

    // Slow consumer
    while let Some(val) = rx.recv().await {
        println!("Received {}", val);
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    producer.await.unwrap();
}
With a buffer of 2, the channel holds at most two unreceived messages. Once it is full, send() waits until the consumer takes a message out, so the producer can never run more than the buffer size ahead of the consumer. This prevents the producer from overwhelming the consumer.
Unbounded Channels
If you do not want backpressure, use an unbounded channel:
let (tx, mut rx) = mpsc::unbounded_channel::<String>();
tx.send("hello".to_string()).unwrap(); // Never blocks
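Sending on an unbounded channel never waits, which is why send() here is a plain method with no .await. The cost is that the queue can grow without limit if the producer is faster than the consumer. Prefer bounded channels in production code unless something else limits how fast messages are produced.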
Oneshot — Single Value
oneshot sends exactly one value. It is perfect for request/response patterns where you send a request and wait for a single response.
Basic Oneshot
use tokio::sync::oneshot;

async fn oneshot_basic() -> String {
    let (tx, rx) = oneshot::channel::<String>();

    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(10)).await;
        tx.send("response".to_string()).unwrap();
    });

    rx.await.unwrap()
}
The sender can only call send() once — it consumes itself. The receiver .awaits the single value.
Request/Response Pattern
The most powerful pattern with oneshot is embedding a response channel inside a request:
struct Request {
    query: String,
    respond_to: oneshot::Sender<String>,
}

async fn request_response_pattern() -> String {
    let (tx, mut rx) = mpsc::channel::<Request>(10);

    // Spawn a "server" that handles requests
    tokio::spawn(async move {
        while let Some(req) = rx.recv().await {
            let response = format!("Result for: {}", req.query);
            let _ = req.respond_to.send(response);
        }
    });

    // Send a request and wait for response
    let (resp_tx, resp_rx) = oneshot::channel();
    tx.send(Request {
        query: "find users".to_string(),
        respond_to: resp_tx,
    })
    .await
    .unwrap();

    resp_rx.await.unwrap()
    // Returns: "Result for: find users"
}
This pattern is extremely useful. The “server” task receives requests through an mpsc channel. Each request includes a oneshot sender for the response. The caller sends the request and waits for the response on the oneshot receiver.
Many real-world Rust services are built this way internally, and actor-style designs in Rust are commonly based on this pattern.
Oneshot for Cancellation
You can also use oneshot as a cancellation signal:
async fn cancellable_work() {
    let (cancel_tx, cancel_rx) = oneshot::channel::<()>();

    let worker = tokio::spawn(async move {
        tokio::select! {
            _ = async {
                // Long running work
                tokio::time::sleep(Duration::from_secs(60)).await;
            } => {
                println!("Work completed");
            }
            _ = cancel_rx => {
                println!("Work cancelled");
            }
        }
    });

    // Cancel after 100ms
    tokio::time::sleep(Duration::from_millis(100)).await;
    let _ = cancel_tx.send(());
    worker.await.unwrap();
}
Broadcast — Everyone Gets Everything
broadcast sends every message to every receiver. When you send a message, all active subscribers receive a copy.
Basic Broadcast
use tokio::sync::broadcast;

async fn broadcast_basic() {
    let (tx, _) = broadcast::channel::<String>(16);
    let mut rx1 = tx.subscribe();
    let mut rx2 = tx.subscribe();

    tx.send("hello".to_string()).unwrap();
    tx.send("world".to_string()).unwrap();

    // Both receivers get both messages
    assert_eq!(rx1.recv().await.unwrap(), "hello");
    assert_eq!(rx1.recv().await.unwrap(), "world");
    assert_eq!(rx2.recv().await.unwrap(), "hello");
    assert_eq!(rx2.recv().await.unwrap(), "world");
}
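Notice: you create new receivers by calling tx.subscribe(). The initial receiver returned by broadcast::channel() is discarded here by binding it to _, which drops it immediately. Keep in mind that send() returns an error while no receiver exists, so subscribe before the first send.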
Event System
Broadcast channels are ideal for event systems where multiple parts of your application need to react to the same event:
async fn event_system() -> Vec<String> {
    let (tx, _) = broadcast::channel::<String>(16);
    let mut handles = vec![];

    // Spawn multiple listeners
    for id in 0..3 {
        // Subscribing here, before the send below, is what guarantees
        // that every listener receives the event
        let mut rx = tx.subscribe();
        let handle = tokio::spawn(async move {
            let msg = rx.recv().await.unwrap();
            format!("Listener {} got: {}", id, msg)
        });
        handles.push(handle);
    }

    // Optional: give the listener tasks time to start waiting.
    // Not needed for correctness, because they are already subscribed
    tokio::time::sleep(Duration::from_millis(10)).await;

    // Broadcast an event
    tx.send("user_logged_in".to_string()).unwrap();

    let mut results = vec![];
    for handle in handles {
        results.push(handle.await.unwrap());
    }
    results
}
Broadcast Limitations
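Broadcast channels have a fixed buffer. If a receiver is slow and the buffer fills up, the oldest messages are overwritten and that receiver misses them. Its next recv() returns a RecvError::Lagged error with the number of missed messages, and the call after that continues from the oldest message still in the buffer: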
let (tx, mut rx) = broadcast::channel::<i32>(2); // Small buffer

tx.send(1).unwrap();
tx.send(2).unwrap();
tx.send(3).unwrap(); // Oldest message (1) is dropped

match rx.recv().await {
    Ok(val) => println!("Got: {}", val),
    Err(broadcast::error::RecvError::Lagged(n)) => {
        println!("Missed {} messages", n);
    }
    Err(_) => println!("Channel closed"),
}
Watch — Latest Value Only
watch is different from the other channels. It does not queue messages. It always holds the most recent value. Receivers can check the current value at any time and get notified when it changes.
Basic Watch
use tokio::sync::watch;

async fn watch_basic() {
    let (tx, mut rx) = watch::channel("initial".to_string());

    // Read the current value
    println!("Current: {}", *rx.borrow()); // "initial"

    // Update the value
    tx.send("updated".to_string()).unwrap();

    // Wait for the change notification
    rx.changed().await.unwrap();
    println!("Now: {}", *rx.borrow()); // "updated"
}
rx.borrow() returns a guard that dereferences to the current value. rx.changed() waits until the sender has sent a value that this receiver has not seen yet.
Configuration Watcher
Watch channels are perfect for sharing configuration that can change at runtime:
async fn config_watcher() -> String {
    let (tx, rx) = watch::channel("v1".to_string());

    // Spawn a task that reacts to config changes
    let mut task_rx = rx.clone();
    let handle = tokio::spawn(async move {
        task_rx.changed().await.unwrap();
        let new_config = task_rx.borrow().clone();
        format!("Config updated to: {}", new_config)
    });

    // Update the config
    tokio::time::sleep(Duration::from_millis(10)).await;
    tx.send("v2".to_string()).unwrap();

    handle.await.unwrap()
    // Returns: "Config updated to: v2"
}
Multiple tasks can watch the same value. Each one gets notified when it changes. If the value changes multiple times before a task checks, it only sees the latest value. Intermediate values are skipped.
Watch vs Broadcast
| Feature | Watch | Broadcast |
|---|---|---|
| Buffer | Only latest value | Configurable buffer |
| Missed messages | Sees latest only | Can lag/miss |
| Use case | State/config | Events/notifications |
| Initial value | Always has one | No initial value |
Common Patterns
Fan-In: Multiple Sources to One Channel
Collect data from multiple sources into a single channel:
async fn fan_in() -> Vec<String> {
    let (tx, mut rx) = mpsc::channel::<String>(32);

    let tx1 = tx.clone();
    tokio::spawn(async move {
        tx1.send("from source 1".to_string()).await.unwrap();
    });

    let tx2 = tx.clone();
    tokio::spawn(async move {
        tx2.send("from source 2".to_string()).await.unwrap();
    });

    drop(tx);

    let mut results = vec![];
    while let Some(msg) = rx.recv().await {
        results.push(msg);
    }
    results
}
Graceful Shutdown
Use a channel to tell a background task when to stop. This example uses mpsc, which has a single receiver, so it stops one worker:
async fn graceful_shutdown() {
    let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);

    let worker = tokio::spawn(async move {
        let mut count = 0;
        loop {
            tokio::select! {
                _ = shutdown_rx.recv() => {
                    println!("Worker stopped after {} iterations", count);
                    break;
                }
                _ = tokio::time::sleep(Duration::from_millis(10)) => {
                    count += 1;
                    println!("Working... iteration {}", count);
                }
            }
        }
    });

    // Let the worker run for a bit
    tokio::time::sleep(Duration::from_millis(50)).await;

    // Send shutdown signal
    let _ = shutdown_tx.send(()).await;
    worker.await.unwrap();
}
The worker listens for both its regular work and a shutdown signal using tokio::select!. When the shutdown channel receives a message, the worker breaks out of the loop.
Pipeline Pattern
Chain multiple stages with channels:
async fn pipeline() {
    let (stage1_tx, mut stage1_rx) = mpsc::channel::<i32>(10);
    let (stage2_tx, mut stage2_rx) = mpsc::channel::<i32>(10);

    // Stage 1: Double the numbers
    tokio::spawn(async move {
        while let Some(val) = stage1_rx.recv().await {
            stage2_tx.send(val * 2).await.unwrap();
        }
    });

    // Stage 2: Collect results
    let collector = tokio::spawn(async move {
        let mut results = vec![];
        while let Some(val) = stage2_rx.recv().await {
            results.push(val);
        }
        results
    });

    // Feed data into the pipeline
    for i in 1..=5 {
        stage1_tx.send(i).await.unwrap();
    }
    drop(stage1_tx);

    let results = collector.await.unwrap();
    println!("Pipeline results: {:?}", results); // [2, 4, 6, 8, 10]
}
Each stage receives from one channel and sends to the next. Dropping the sender closes the channel and lets the next stage finish.
Choosing the Right Channel
Here is a decision guide:
“I need to send many messages to one consumer” -> Use mpsc
“I need a single response to a request” -> Use oneshot
“I need every subscriber to see every message” -> Use broadcast
“I need subscribers to see the latest state” -> Use watch
“I need backpressure (slow down fast producers)” -> Use bounded mpsc
“I need fire-and-forget sends” -> Use unbounded mpsc or broadcast
Common Mistakes
Mistake 1: Not Dropping the Original Sender
// BAD: channel never closes
let (tx, mut rx) = mpsc::channel::<i32>(10);
let tx2 = tx.clone();
tokio::spawn(async move {
    tx2.send(1).await.unwrap();
});
// tx still exists: after yielding 1, rx.recv() will wait forever

// GOOD: drop the original sender before the receive loop
drop(tx);
while let Some(val) = rx.recv().await {
    println!("{}", val);
}
Mistake 2: Subscribing After Send
// BAD: rx misses the message
// (_rx0 keeps one receiver alive; with no receivers at all, send() returns Err)
let (tx, _rx0) = broadcast::channel::<String>(16);
tx.send("hello".to_string()).unwrap();
let mut rx = tx.subscribe(); // Subscribes AFTER the message
// rx.recv() will NOT get "hello"

// GOOD: subscribe before sending
let (tx, _) = broadcast::channel::<String>(16);
let mut rx = tx.subscribe();
tx.send("hello".to_string()).unwrap();
// rx.recv() gets "hello"
Mistake 3: Holding Borrow Across Await
// BAD: borrow held across await point
let (_tx, rx) = watch::channel("hello".to_string());
let val = rx.borrow(); // Holds a read lock on the shared value
tokio::time::sleep(Duration::from_millis(10)).await; // Lock held! A send() would block until it is released
println!("{}", *val);

// GOOD: clone the value
let val = rx.borrow().clone(); // Clone and release lock
tokio::time::sleep(Duration::from_millis(10)).await;
println!("{}", val);
Source Code
Find the complete code on GitHub: tutorial-16-channels
What’s Next?
In this tutorial, we learned the four Tokio channel types: mpsc, oneshot, broadcast, and watch. We built patterns like request/response, fan-in, pipelines, and graceful shutdown.
In the next tutorial, we will make HTTP requests with Reqwest — sending GET and POST requests, parsing JSON with Serde, and handling errors in real network code.