In the previous tutorial, you learned about advanced coroutine topics. Now let’s learn about Flow. Flow is Kotlin’s way of working with reactive streams. It lets you emit multiple values over time, with full support for coroutines.

In this tutorial, you will learn:

  • What is Flow
  • Flow builder and emit/collect
  • Operators: map, filter, take, transform
  • Terminal operators: toList, first, reduce, fold
  • flowOn for changing dispatchers
  • combine and zip
  • Error handling with catch and retry
  • onCompletion and onEach
  • conflate and distinctUntilChanged
  • flatMapConcat, flatMapMerge, flatMapLatest
  • StateFlow and SharedFlow (hot flows)
  • stateIn and shareIn
  • Practical examples

What is Flow?

Flow is a cold, asynchronous stream that emits values one at a time.

  • Cold means it only runs when someone collects it. No collector, no execution.
  • Asynchronous means it works with coroutines and does not block.

Think of a flow as a function that can return multiple values over time.

Regular function: () -> T       // Returns one value
Suspend function: suspend () -> T  // Returns one value (async)
Flow:             () -> Flow<T>    // Returns multiple values (async)

Flow vs Channel

FeatureFlowChannel
Hot/ColdCold (runs on collect)Hot (runs immediately)
Multiple collectorsEach gets own executionValues shared between collectors
CancellationAutomatic on scope cancelManual close needed
Use caseData streams, transformationsCommunication between coroutines

Creating a Flow

flow Builder

The flow builder creates a flow. Use emit() to send values.

fun simpleFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(50) // Simulate some work
        emit(i)   // Emit a value
    }
}

flowOf

flowOf creates a flow from a fixed set of values:

val greetings = flowOf("Hello", "World", "Kotlin")

asFlow

asFlow converts a collection or range to a flow:

val numbers = (1..5).asFlow()
val fromList = listOf("a", "b", "c").asFlow()

Collecting a Flow

collect is the primary way to receive values from a flow. It is a suspend function.

simpleFlow().collect { value ->
    println("Received: $value")
}

Output:

Received: 1
Received: 2
Received: 3
Received: 4
Received: 5

A flow is cold. It only runs when you call collect. Each call to collect starts a new execution:

var count = 0
val myFlow = flow {
    count++
    emit(count)
}

println(myFlow.first()) // 1
println(myFlow.first()) // 2 — a new execution each time

Flow Operators

Flow operators transform the stream. They are similar to collection operators, but they work lazily.

map

map transforms each element:

(1..5).asFlow()
    .map { "Item $it" }
    .collect { println(it) }
// Item 1, Item 2, Item 3, Item 4, Item 5

filter

filter keeps only matching elements:

(1..10).asFlow()
    .filter { it % 2 == 0 }
    .collect { println(it) }
// 2, 4, 6, 8, 10

take

take limits the number of elements:

(1..100).asFlow()
    .take(5)
    .collect { println(it) }
// 1, 2, 3, 4, 5

transform

transform is the most flexible operator. You can emit zero, one, or many values for each input:

(1..3).asFlow()
    .transform { value ->
        emit("Processing $value")
        delay(50)
        emit("Done $value")
    }
    .collect { println(it) }

Output:

Processing 1
Done 1
Processing 2
Done 2
Processing 3
Done 3

Chaining Operators

You can chain multiple operators together:

(1..20).asFlow()
    .filter { it % 2 == 0 }      // Keep evens: 2, 4, 6, ...
    .map { it * it }              // Square them: 4, 16, 36, ...
    .take(5)                       // First 5: 4, 16, 36, 64, 100
    .map { "Value: $it" }         // Format
    .collect { println(it) }

Terminal Operators

Terminal operators start the flow execution and produce a result.

val numbers = (1..5).asFlow()

// toList — collect all values into a list
val list = numbers.toList() // [1, 2, 3, 4, 5]

// toSet — collect into a set
val set = flowOf(1, 2, 2, 3).toSet() // {1, 2, 3}

// first — get the first element
val first = numbers.first() // 1

// first with predicate
val firstEven = numbers.first { it % 2 == 0 } // 2

// reduce — accumulate without initial value
val sum = numbers.reduce { acc, value -> acc + value } // 15

// fold — accumulate with initial value
val product = numbers.fold(1) { acc, value -> acc * value } // 120

// count
val count = numbers.count() // 5

flowOn

flowOn changes the dispatcher for the upstream flow. The collector still runs on the original dispatcher.

fun heavyFlow(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i on ${Thread.currentThread().name}")
        emit(i)
    }
}.flowOn(Dispatchers.Default) // Emit on Default dispatcher

// Usage
heavyFlow().collect { value ->
    println("Collected $value on ${Thread.currentThread().name}")
}

The emission happens on Dispatchers.Default, but the collection happens on the caller’s dispatcher.

flowOn vs withContext

Never use withContext inside a flow builder. Use flowOn instead:

// BAD — do not use withContext in flow
flow {
    withContext(Dispatchers.IO) { // Wrong!
        emit(fetchData())
    }
}

// GOOD — use flowOn
flow {
    emit(fetchData())
}.flowOn(Dispatchers.IO)

combine

combine merges two or more flows. Every time any flow emits a new value, the latest values from all flows are combined.

val name = flowOf("Alex")
val age = flowOf(28)
val city = flowOf("Berlin")

combine(name, age, city) { n, a, c ->
    "$n, $a, $c"
}.collect { println(it) }
// "Alex, 28, Berlin"

combine is useful for UI state. When any input changes, the combined output updates:

val searchQuery = MutableStateFlow("")
val sortOrder = MutableStateFlow("name")
val filterActive = MutableStateFlow(true)

combine(searchQuery, sortOrder, filterActive) { query, sort, active ->
    loadItems(query, sort, active)
}

zip

zip pairs elements from two flows one by one. It stops when either flow completes.

val names = flowOf("Alex", "Sam", "Jordan")
val scores = flowOf(95, 87, 92)

names.zip(scores) { name, score ->
    "$name: $score"
}.collect { println(it) }

Output:

Alex: 95
Sam: 87
Jordan: 92

combine vs zip

Featurecombinezip
PairingLatest from eachOne-to-one
Emission countEvery time any flow emitsOnce per pair
Stops whenAll flows completeEither flow completes
Use caseUI state, live dataPairing related data

Error Handling

catch

catch handles exceptions from upstream operators. It does not catch exceptions in the collector.

flow {
    emit(1)
    emit(2)
    throw RuntimeException("Error at 3")
}.catch { e ->
    println("Caught: ${e.message}")
    emit(-1) // Emit a fallback value
}.collect { println(it) }

Output:

1
2
Caught: Error at 3
-1

retry

retry re-runs the flow when an exception occurs:

var attempt = 0
flow {
    attempt++
    if (attempt < 3) throw RuntimeException("Attempt $attempt failed")
    emit(42)
}.retry(3) { e ->
    println("Retrying: ${e.message}")
    delay(100)
    true // Return true to retry
}.collect { println(it) }

Output:

Retrying: Attempt 1 failed
Retrying: Attempt 2 failed
42

onCompletion

onCompletion runs when the flow completes, whether normally or with an exception:

(1..3).asFlow()
    .onCompletion { cause ->
        if (cause == null) println("Completed successfully")
        else println("Failed: ${cause.message}")
    }
    .collect { println(it) }

Output:

1
2
3
Completed successfully

onEach

onEach performs a side effect for each element without changing it. Useful for logging:

(1..5).asFlow()
    .onEach { println("Processing: $it") }
    .filter { it % 2 == 0 }
    .collect { println("Result: $it") }

conflate

conflate skips intermediate values when the collector is slow. The collector always gets the most recent value.

flow {
    for (i in 1..10) {
        emit(i)
        delay(10) // Fast producer
    }
}.conflate()
.collect { value ->
    println("Collecting $value")
    delay(50) // Slow collector — some values are skipped
}

Use conflate when you only care about the latest value. For example, live sensor data where you only need the current reading.

distinctUntilChanged

distinctUntilChanged filters out consecutive duplicate values:

flowOf(1, 1, 2, 2, 3, 1, 1, 4)
    .distinctUntilChanged()
    .collect { println(it) }
// 1, 2, 3, 1, 4

Note: it only removes consecutive duplicates. The value 1 appears twice because there is a 3 between them.

flatMap Operators

flatMapConcat

flatMapConcat processes inner flows one at a time, in order:

(1..3).asFlow()
    .flatMapConcat { value ->
        flow {
            emit("$value-a")
            delay(50)
            emit("$value-b")
        }
    }
    .collect { println(it) }
// 1-a, 1-b, 2-a, 2-b, 3-a, 3-b

flatMapMerge

flatMapMerge processes inner flows concurrently:

(1..3).asFlow()
    .flatMapMerge { value ->
        flow {
            emit("$value-a")
            delay(50)
            emit("$value-b")
        }
    }
    .collect { println(it) }
// Order may vary: 1-a, 2-a, 3-a, 1-b, 2-b, 3-b

flatMapLatest

flatMapLatest cancels the previous inner flow when a new value arrives:

(1..3).asFlow()
    .flatMapLatest { value ->
        flow {
            emit("$value-a")
            delay(100)
            emit("$value-b") // May be cancelled
        }
    }
    .collect { println(it) }
// 1-a, 2-a, 3-a, 3-b (only the last inner flow completes)

Use flatMapLatest for search-as-you-type: cancel the old search when the user types a new character.

Practical Example: Search with Debounce

Wait for the user to stop typing before searching:

fun searchWithDebounce(queries: Flow<String>): Flow<List<String>> {
    return queries
        .debounce(300)              // Wait 300ms after last keystroke
        .distinctUntilChanged()     // Ignore same query
        .filter { it.isNotBlank() } // Ignore empty queries
        .map { query ->
            searchDatabase(query)    // Perform the search
        }
}

This is a very common pattern in mobile and web apps. Without debounce, every keystroke triggers a search. With debounce, the search only runs after the user pauses.

Practical Example: Polling

Fetch data at regular intervals:

fun pollData(intervalMs: Long): Flow<String> {
    return flow {
        var count = 0
        while (true) {
            count++
            emit("Data update $count")
            delay(intervalMs)
        }
    }
}

// Usage — take first 5 updates
pollData(1000).take(5).collect { println(it) }

The flow emits a new value every second. It runs forever, but take(5) stops after 5 values.

Practical Example: Dashboard State

Combine multiple data sources into a single UI state:

fun userDashboard(
    userName: Flow<String>,
    notifications: Flow<Int>
): Flow<String> {
    return userName.combine(notifications) { name, count ->
        "$name has $count notifications"
    }
}

When either the user name or notification count changes, the dashboard updates.

Practical Example: Batch Processing

Process a list of items as a flow with progress tracking:

fun processBatch(items: List<String>): Flow<String> {
    return items.asFlow()
        .map { item ->
            delay(50)
            item.uppercase()
        }
        .onEach { println("Processed: $it") }
}

// Usage
val results = processBatch(listOf("hello", "world")).toList()
println(results) // [HELLO, WORLD]

StateFlow

StateFlow is a hot flow that always holds a value. It emits the current value to new collectors and only emits when the value changes. Think of it as a reactive variable.

val counter = MutableStateFlow(0) // Initial value is 0

// Read the current value
println(counter.value) // 0

// Update the value
counter.value = 5
println(counter.value) // 5

Collecting StateFlow

val userName = MutableStateFlow("Alex")

// Each collector gets the current value immediately, then updates
launch {
    userName.collect { name ->
        println("Name is: $name")
    }
}

userName.value = "Sam"   // Collector prints: Name is: Sam
userName.value = "Sam"   // NOT printed — same value, no emission
userName.value = "Jordan" // Collector prints: Name is: Jordan

StateFlow only emits when the value changes. Setting the same value again does not trigger a new emission. This is similar to distinctUntilChanged.

MutableStateFlow vs StateFlow

Expose StateFlow (read-only) to the outside, keep MutableStateFlow private:

class UserRepository {
    private val _users = MutableStateFlow<List<String>>(emptyList())
    val users: StateFlow<List<String>> = _users.asStateFlow()

    fun addUser(name: String) {
        _users.value = _users.value + name
    }
}

This pattern is common in Android ViewModels and any reactive architecture.

SharedFlow

SharedFlow is a hot flow that can emit values to multiple collectors. Unlike StateFlow, it does not require an initial value, and it can emit the same value multiple times.

val events = MutableSharedFlow<String>()

// Collector 1
launch {
    events.collect { println("Collector 1: $it") }
}

// Collector 2
launch {
    events.collect { println("Collector 2: $it") }
}

events.emit("click")
// Collector 1: click
// Collector 2: click

Both collectors receive the same event. This is useful for one-time events like navigation, snackbar messages, or error notifications.

SharedFlow Configuration

val events = MutableSharedFlow<String>(
    replay = 1,           // New collectors get the last 1 event
    extraBufferCapacity = 10 // Buffer for slow collectors
)

StateFlow vs SharedFlow

FeatureStateFlowSharedFlow
Initial valueRequiredNot required
ReplaysAlways replays current valueConfigurable replay
Duplicate valuesSkipped (distinctUntilChanged)Emitted
Use caseState (UI state, data)Events (clicks, navigation)

stateIn and shareIn

stateIn and shareIn convert a cold flow to a hot flow.

stateIn

Convert a cold flow to a StateFlow:

val userFlow: Flow<String> = flow {
    emit(fetchUserName())
}

// Convert to StateFlow with an initial value
val userName: StateFlow<String> = userFlow.stateIn(
    scope = coroutineScope,
    started = SharingStarted.WhileSubscribed(5000),
    initialValue = "Loading..."
)

shareIn

Convert a cold flow to a SharedFlow:

val tickerFlow = flow {
    var count = 0
    while (true) {
        emit(count++)
        delay(1000)
    }
}

// Share among multiple collectors
val sharedTicker = tickerFlow.shareIn(
    scope = coroutineScope,
    started = SharingStarted.WhileSubscribed(),
    replay = 1
)

SharingStarted Options

OptionBehavior
EagerlyStart immediately, never stop
LazilyStart on first collector, never stop
WhileSubscribed()Start on first collector, stop when last unsubscribes

WhileSubscribed(5000) keeps the flow alive for 5 seconds after the last collector unsubscribes. This is useful for surviving configuration changes in Android.

Common Mistakes

Mistake 1: Using withContext Inside flow

// BAD
flow {
    withContext(Dispatchers.IO) {
        emit(fetchData())
    }
}

// GOOD
flow {
    emit(fetchData())
}.flowOn(Dispatchers.IO)

Mistake 2: Collecting on the Wrong Thread

// BAD — collecting on Main but doing heavy work
heavyFlow.collect { value ->
    processHeavyData(value) // Blocks Main thread
}

// GOOD — use flowOn to move work off Main
heavyFlow
    .map { processHeavyData(it) }
    .flowOn(Dispatchers.Default)
    .collect { updateUI(it) }

Mistake 3: Not Handling Errors

// BAD — exception crashes the app
riskyFlow.collect { println(it) }

// GOOD — handle errors with catch
riskyFlow
    .catch { e -> println("Error: ${e.message}") }
    .collect { println(it) }

Summary

ConceptDescription
flow { }Create a cold flow
emit()Send a value downstream
collectReceive values (terminal)
map, filterTransform elements
takeLimit number of elements
flowOnChange upstream dispatcher
combineMerge flows (latest values)
zipPair flows one-to-one
catchHandle upstream errors
retryRe-run flow on error
onCompletionRun code when flow finishes
conflateSkip intermediate values
debounceWait before processing
distinctUntilChangedRemove consecutive duplicates
flatMapConcatSequential inner flows
flatMapMergeConcurrent inner flows
flatMapLatestCancel previous inner flow
StateFlowHot flow that holds current value
SharedFlowHot flow for events (multiple collectors)
stateInConvert cold flow to StateFlow
shareInConvert cold flow to SharedFlow

Source Code

You can find the source code for this tutorial on GitHub: tutorial-19-flow

What’s Next?

In the next tutorial, you will learn about inline functions, reified types, and contracts.