In the previous tutorial, we learned async programming with Tokio. Now we dive deeper into channels — the primary way async tasks communicate with each other.

Channels let tasks send and receive messages without sharing memory directly. This is the “message passing” model of concurrency. Instead of locking shared data with a Mutex, you send data through a channel. The task that receives it owns it completely.

Tokio provides four channel types. Each one solves a different problem. By the end of this tutorial, you will know when to use each one.

The Four Channel Types

ChannelSendersReceiversMessagesUse Case
mpscManyOneStreamTask queues, pipelines
oneshotOneOneSingle valueRequest/response
broadcastManyManyEvery receiver gets allEvents, notifications
watchOneManyLatest value onlyConfig, state changes

MPSC — Multiple Producer, Single Consumer

mpsc is the most common channel type. Many tasks can send messages, but only one task receives them. Think of it as a queue where multiple workers add items and one manager processes them.

Basic MPSC

use tokio::sync::mpsc;

async fn mpsc_basic() -> Vec<String> {
    let (tx, mut rx) = mpsc::channel::<String>(10);

    tokio::spawn(async move {
        for i in 0..3 {
            tx.send(format!("message {}", i)).await.unwrap();
        }
        // tx is dropped here — the channel closes
    });

    let mut messages = vec![];
    while let Some(msg) = rx.recv().await {
        messages.push(msg);
    }
    messages
}

mpsc::channel(10) creates a channel with a buffer of 10 messages. The function returns a sender (tx) and a receiver (rx).

When all senders are dropped, rx.recv() returns None, and the while let loop ends. This is how you know the channel is closed.

Multiple Producers

Clone the sender to have multiple tasks send messages:

async fn mpsc_multiple_producers() -> Vec<i32> {
    let (tx, mut rx) = mpsc::channel::<i32>(32);

    for i in 0..3 {
        let tx = tx.clone();
        tokio::spawn(async move {
            tx.send(i * 10).await.unwrap();
        });
    }

    // Drop the original sender so the channel closes
    // when all clones are dropped
    drop(tx);

    let mut results = vec![];
    while let Some(val) = rx.recv().await {
        results.push(val);
    }
    results.sort();
    results  // [0, 10, 20]
}

Important: you must drop(tx) the original sender. Otherwise the channel never closes because the original sender still exists. The while let loop would wait forever.

Backpressure with Bounded Channels

The buffer size controls backpressure. When the buffer is full, send() waits until the receiver consumes a message:

async fn backpressure_example() {
    // Small buffer — producer will slow down when buffer is full
    let (tx, mut rx) = mpsc::channel::<i32>(2);

    let producer = tokio::spawn(async move {
        for i in 0..5 {
            println!("Sending {}", i);
            tx.send(i).await.unwrap();
            println!("Sent {}", i);
        }
    });

    // Slow consumer
    while let Some(val) = rx.recv().await {
        println!("Received {}", val);
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    producer.await.unwrap();
}

With a buffer of 2, the producer sends the first two messages immediately. Then it waits for the consumer to read one before it can send the third. This prevents the producer from overwhelming the consumer.

Unbounded Channels

If you do not want backpressure, use an unbounded channel:

let (tx, mut rx) = mpsc::unbounded_channel::<String>();
tx.send("hello".to_string()).unwrap();  // Never blocks

Unbounded channels never block the sender. But they can use unlimited memory if the producer is faster than the consumer. Use bounded channels in production code.

Oneshot — Single Value

oneshot sends exactly one value. It is perfect for request/response patterns where you send a request and wait for a single response.

Basic Oneshot

use tokio::sync::oneshot;

async fn oneshot_basic() -> String {
    let (tx, rx) = oneshot::channel::<String>();

    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(10)).await;
        tx.send("response".to_string()).unwrap();
    });

    rx.await.unwrap()
}

The sender can only call send() once — it consumes itself. The receiver .awaits the single value.

Request/Response Pattern

The most powerful pattern with oneshot is embedding a response channel inside a request:

struct Request {
    query: String,
    respond_to: oneshot::Sender<String>,
}

async fn request_response_pattern() -> String {
    let (tx, mut rx) = mpsc::channel::<Request>(10);

    // Spawn a "server" that handles requests
    tokio::spawn(async move {
        while let Some(req) = rx.recv().await {
            let response = format!("Result for: {}", req.query);
            let _ = req.respond_to.send(response);
        }
    });

    // Send a request and wait for response
    let (resp_tx, resp_rx) = oneshot::channel();
    tx.send(Request {
        query: "find users".to_string(),
        respond_to: resp_tx,
    })
    .await
    .unwrap();

    resp_rx.await.unwrap()
    // Returns: "Result for: find users"
}

This pattern is extremely useful. The “server” task receives requests through an mpsc channel. Each request includes a oneshot sender for the response. The caller sends the request and waits for the response on the oneshot receiver.

This is how many real-world Rust services are built internally. The actor model in Rust is based on this pattern.

Oneshot for Cancellation

You can also use oneshot as a cancellation signal:

async fn cancellable_work() {
    let (cancel_tx, cancel_rx) = oneshot::channel::<()>();

    let worker = tokio::spawn(async move {
        tokio::select! {
            _ = async {
                // Long running work
                tokio::time::sleep(Duration::from_secs(60)).await;
            } => {
                println!("Work completed");
            }
            _ = cancel_rx => {
                println!("Work cancelled");
            }
        }
    });

    // Cancel after 100ms
    tokio::time::sleep(Duration::from_millis(100)).await;
    let _ = cancel_tx.send(());
    worker.await.unwrap();
}

Broadcast — Everyone Gets Everything

broadcast sends every message to every receiver. When you send a message, all active subscribers receive a copy.

Basic Broadcast

use tokio::sync::broadcast;

async fn broadcast_basic() {
    let (tx, _) = broadcast::channel::<String>(16);

    let mut rx1 = tx.subscribe();
    let mut rx2 = tx.subscribe();

    tx.send("hello".to_string()).unwrap();
    tx.send("world".to_string()).unwrap();

    // Both receivers get both messages
    assert_eq!(rx1.recv().await.unwrap(), "hello");
    assert_eq!(rx1.recv().await.unwrap(), "world");
    assert_eq!(rx2.recv().await.unwrap(), "hello");
    assert_eq!(rx2.recv().await.unwrap(), "world");
}

Notice: you create new receivers by calling tx.subscribe(). The initial receiver from broadcast::channel() is usually unused (that is why we name it _).

Event System

Broadcast channels are ideal for event systems where multiple parts of your application need to react to the same event:

async fn event_system() -> Vec<String> {
    let (tx, _) = broadcast::channel::<String>(16);

    let mut handles = vec![];

    // Spawn multiple listeners
    for id in 0..3 {
        let mut rx = tx.subscribe();
        let handle = tokio::spawn(async move {
            let msg = rx.recv().await.unwrap();
            format!("Listener {} got: {}", id, msg)
        });
        handles.push(handle);
    }

    // Small delay to make sure receivers are ready
    tokio::time::sleep(Duration::from_millis(10)).await;

    // Broadcast an event
    tx.send("user_logged_in".to_string()).unwrap();

    let mut results = vec![];
    for handle in handles {
        results.push(handle.await.unwrap());
    }
    results
}

Broadcast Limitations

Broadcast channels have a fixed buffer. If a receiver is slow and the buffer fills up, it will miss messages. The receiver gets a RecvError::Lagged error with the number of missed messages:

let (tx, mut rx) = broadcast::channel::<i32>(2);  // Small buffer

tx.send(1).unwrap();
tx.send(2).unwrap();
tx.send(3).unwrap();  // Oldest message (1) is dropped

match rx.recv().await {
    Ok(val) => println!("Got: {}", val),
    Err(broadcast::error::RecvError::Lagged(n)) => {
        println!("Missed {} messages", n);
    }
    Err(_) => println!("Channel closed"),
}

Watch — Latest Value Only

watch is different from the other channels. It does not queue messages. It always holds the most recent value. Receivers can check the current value at any time and get notified when it changes.

Basic Watch

use tokio::sync::watch;

async fn watch_basic() {
    let (tx, mut rx) = watch::channel("initial".to_string());

    // Read the current value
    println!("Current: {}", *rx.borrow());  // "initial"

    // Update the value
    tx.send("updated".to_string()).unwrap();

    // Wait for the change notification
    rx.changed().await.unwrap();
    println!("Now: {}", *rx.borrow());  // "updated"
}

rx.borrow() returns a reference to the current value. rx.changed() waits until the value changes.

Configuration Watcher

Watch channels are perfect for sharing configuration that can change at runtime:

async fn config_watcher() -> String {
    let (tx, rx) = watch::channel("v1".to_string());

    // Spawn a task that reacts to config changes
    let mut task_rx = rx.clone();
    let handle = tokio::spawn(async move {
        task_rx.changed().await.unwrap();
        let new_config = task_rx.borrow().clone();
        format!("Config updated to: {}", new_config)
    });

    // Update the config
    tokio::time::sleep(Duration::from_millis(10)).await;
    tx.send("v2".to_string()).unwrap();

    handle.await.unwrap()
    // Returns: "Config updated to: v2"
}

Multiple tasks can watch the same value. Each one gets notified when it changes. If the value changes multiple times before a task checks, it only sees the latest value. Intermediate values are skipped.

Watch vs Broadcast

FeatureWatchBroadcast
BufferOnly latest valueConfigurable buffer
Missed messagesSees latest onlyCan lag/miss
Use caseState/configEvents/notifications
Initial valueAlways has oneNo initial value

Common Patterns

Fan-In: Multiple Sources to One Channel

Collect data from multiple sources into a single channel:

async fn fan_in() -> Vec<String> {
    let (tx, mut rx) = mpsc::channel::<String>(32);

    let tx1 = tx.clone();
    tokio::spawn(async move {
        tx1.send("from source 1".to_string()).await.unwrap();
    });

    let tx2 = tx.clone();
    tokio::spawn(async move {
        tx2.send("from source 2".to_string()).await.unwrap();
    });

    drop(tx);

    let mut results = vec![];
    while let Some(msg) = rx.recv().await {
        results.push(msg);
    }
    results
}

Graceful Shutdown

Use channels to coordinate shutdown across multiple tasks:

async fn graceful_shutdown() {
    let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);

    let worker = tokio::spawn(async move {
        let mut count = 0;
        loop {
            tokio::select! {
                _ = shutdown_rx.recv() => {
                    println!("Worker stopped after {} iterations", count);
                    break;
                }
                _ = tokio::time::sleep(Duration::from_millis(10)) => {
                    count += 1;
                    println!("Working... iteration {}", count);
                }
            }
        }
    });

    // Let the worker run for a bit
    tokio::time::sleep(Duration::from_millis(50)).await;

    // Send shutdown signal
    let _ = shutdown_tx.send(()).await;

    worker.await.unwrap();
}

The worker listens for both its regular work and a shutdown signal using tokio::select!. When the shutdown channel receives a message, the worker breaks out of the loop.

Pipeline Pattern

Chain multiple stages with channels:

async fn pipeline() {
    let (stage1_tx, mut stage1_rx) = mpsc::channel::<i32>(10);
    let (stage2_tx, mut stage2_rx) = mpsc::channel::<i32>(10);

    // Stage 1: Double the numbers
    tokio::spawn(async move {
        while let Some(val) = stage1_rx.recv().await {
            stage2_tx.send(val * 2).await.unwrap();
        }
    });

    // Stage 2: Collect results
    let collector = tokio::spawn(async move {
        let mut results = vec![];
        while let Some(val) = stage2_rx.recv().await {
            results.push(val);
        }
        results
    });

    // Feed data into the pipeline
    for i in 1..=5 {
        stage1_tx.send(i).await.unwrap();
    }
    drop(stage1_tx);

    let results = collector.await.unwrap();
    println!("Pipeline results: {:?}", results);  // [2, 4, 6, 8, 10]
}

Each stage receives from one channel and sends to the next. Dropping the sender closes the channel and lets the next stage finish.

Choosing the Right Channel

Here is a decision guide:

“I need to send many messages to one consumer” -> Use mpsc

“I need a single response to a request” -> Use oneshot

“I need every subscriber to see every message” -> Use broadcast

“I need subscribers to see the latest state” -> Use watch

“I need backpressure (slow down fast producers)” -> Use bounded mpsc

“I need fire-and-forget sends” -> Use unbounded mpsc or broadcast

Common Mistakes

Mistake 1: Not Dropping the Original Sender

// BAD: channel never closes
let (tx, mut rx) = mpsc::channel::<i32>(10);
let tx2 = tx.clone();
tokio::spawn(async move {
    tx2.send(1).await.unwrap();
});
// tx still exists — rx.recv() will hang forever

// GOOD: drop the original sender
drop(tx);
while let Some(val) = rx.recv().await {
    println!("{}", val);
}

Mistake 2: Subscribing After Send

// BAD: rx misses the message
let (tx, _) = broadcast::channel::<String>(16);
tx.send("hello".to_string()).unwrap();
let mut rx = tx.subscribe();  // Subscribes AFTER the message
// rx.recv() will NOT get "hello"

// GOOD: subscribe before sending
let (tx, _) = broadcast::channel::<String>(16);
let mut rx = tx.subscribe();
tx.send("hello".to_string()).unwrap();
// rx.recv() gets "hello"

Mistake 3: Holding Borrow Across Await

// BAD: borrow held across await point
let (_, mut rx) = watch::channel("hello".to_string());
let val = rx.borrow();  // Holds a read lock
tokio::time::sleep(Duration::from_millis(10)).await;  // Lock held!
println!("{}", val);

// GOOD: clone the value
let val = rx.borrow().clone();  // Clone and release lock
tokio::time::sleep(Duration::from_millis(10)).await;
println!("{}", val);

Source Code

Find the complete code on GitHub: tutorial-16-channels

What’s Next?

In this tutorial, we learned the four Tokio channel types: mpsc, oneshot, broadcast, and watch. We built patterns like request/response, fan-in, pipelines, and graceful shutdown.

In the next tutorial, we will make HTTP requests with Reqwest — sending GET and POST requests, parsing JSON with Serde, and handling errors in real network code.