In the previous tutorial, you learned about advanced coroutine topics. Now let’s learn about Flow. Flow is Kotlin’s way of working with reactive streams. It lets you emit multiple values over time, with full support for coroutines.
In this tutorial, you will learn:
- What is Flow
- Flow builder and
emit/collect - Operators:
map,filter,take,transform - Terminal operators:
toList,first,reduce,fold flowOnfor changing dispatcherscombineandzip- Error handling with
catchandretry onCompletionandonEachconflateanddistinctUntilChangedflatMapConcat,flatMapMerge,flatMapLatestStateFlowandSharedFlow(hot flows)stateInandshareIn- Practical examples
What is Flow?
Flow is a cold, asynchronous stream that emits values one at a time.
- Cold means it only runs when someone collects it. No collector, no execution.
- Asynchronous means it works with coroutines and does not block.
Think of a flow as a function that can return multiple values over time.
Regular function: () -> T // Returns one value
Suspend function: suspend () -> T // Returns one value (async)
Flow: () -> Flow<T> // Returns multiple values (async)
Flow vs Channel
| Feature | Flow | Channel |
|---|---|---|
| Hot/Cold | Cold (runs on collect) | Hot (runs immediately) |
| Multiple collectors | Each gets own execution | Values shared between collectors |
| Cancellation | Automatic on scope cancel | Manual close needed |
| Use case | Data streams, transformations | Communication between coroutines |
Creating a Flow
flow Builder
The flow builder creates a flow. Use emit() to send values.
fun simpleFlow(): Flow<Int> = flow {
for (i in 1..5) {
delay(50) // Simulate some work
emit(i) // Emit a value
}
}
flowOf
flowOf creates a flow from a fixed set of values:
val greetings = flowOf("Hello", "World", "Kotlin")
asFlow
asFlow converts a collection or range to a flow:
val numbers = (1..5).asFlow()
val fromList = listOf("a", "b", "c").asFlow()
Collecting a Flow
collect is the primary way to receive values from a flow. It is a suspend function.
simpleFlow().collect { value ->
println("Received: $value")
}
Output:
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
A flow is cold. It only runs when you call collect. Each call to collect starts a new execution:
var count = 0
val myFlow = flow {
count++
emit(count)
}
println(myFlow.first()) // 1
println(myFlow.first()) // 2 — a new execution each time
Flow Operators
Flow operators transform the stream. They are similar to collection operators, but they work lazily.
map
map transforms each element:
(1..5).asFlow()
.map { "Item $it" }
.collect { println(it) }
// Item 1, Item 2, Item 3, Item 4, Item 5
filter
filter keeps only matching elements:
(1..10).asFlow()
.filter { it % 2 == 0 }
.collect { println(it) }
// 2, 4, 6, 8, 10
take
take limits the number of elements:
(1..100).asFlow()
.take(5)
.collect { println(it) }
// 1, 2, 3, 4, 5
transform
transform is the most flexible operator. You can emit zero, one, or many values for each input:
(1..3).asFlow()
.transform { value ->
emit("Processing $value")
delay(50)
emit("Done $value")
}
.collect { println(it) }
Output:
Processing 1
Done 1
Processing 2
Done 2
Processing 3
Done 3
Chaining Operators
You can chain multiple operators together:
(1..20).asFlow()
.filter { it % 2 == 0 } // Keep evens: 2, 4, 6, ...
.map { it * it } // Square them: 4, 16, 36, ...
.take(5) // First 5: 4, 16, 36, 64, 100
.map { "Value: $it" } // Format
.collect { println(it) }
Terminal Operators
Terminal operators start the flow execution and produce a result.
val numbers = (1..5).asFlow()
// toList — collect all values into a list
val list = numbers.toList() // [1, 2, 3, 4, 5]
// toSet — collect into a set
val set = flowOf(1, 2, 2, 3).toSet() // {1, 2, 3}
// first — get the first element
val first = numbers.first() // 1
// first with predicate
val firstEven = numbers.first { it % 2 == 0 } // 2
// reduce — accumulate without initial value
val sum = numbers.reduce { acc, value -> acc + value } // 15
// fold — accumulate with initial value
val product = numbers.fold(1) { acc, value -> acc * value } // 120
// count
val count = numbers.count() // 5
flowOn
flowOn changes the dispatcher for the upstream flow. The collector still runs on the original dispatcher.
fun heavyFlow(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i on ${Thread.currentThread().name}")
emit(i)
}
}.flowOn(Dispatchers.Default) // Emit on Default dispatcher
// Usage
heavyFlow().collect { value ->
println("Collected $value on ${Thread.currentThread().name}")
}
The emission happens on Dispatchers.Default, but the collection happens on the caller’s dispatcher.
flowOn vs withContext
Never use withContext inside a flow builder. Use flowOn instead:
// BAD — do not use withContext in flow
flow {
withContext(Dispatchers.IO) { // Wrong!
emit(fetchData())
}
}
// GOOD — use flowOn
flow {
emit(fetchData())
}.flowOn(Dispatchers.IO)
combine
combine merges two or more flows. Every time any flow emits a new value, the latest values from all flows are combined.
val name = flowOf("Alex")
val age = flowOf(28)
val city = flowOf("Berlin")
combine(name, age, city) { n, a, c ->
"$n, $a, $c"
}.collect { println(it) }
// "Alex, 28, Berlin"
combine is useful for UI state. When any input changes, the combined output updates:
val searchQuery = MutableStateFlow("")
val sortOrder = MutableStateFlow("name")
val filterActive = MutableStateFlow(true)
combine(searchQuery, sortOrder, filterActive) { query, sort, active ->
loadItems(query, sort, active)
}
zip
zip pairs elements from two flows one by one. It stops when either flow completes.
val names = flowOf("Alex", "Sam", "Jordan")
val scores = flowOf(95, 87, 92)
names.zip(scores) { name, score ->
"$name: $score"
}.collect { println(it) }
Output:
Alex: 95
Sam: 87
Jordan: 92
combine vs zip
| Feature | combine | zip |
|---|---|---|
| Pairing | Latest from each | One-to-one |
| Emission count | Every time any flow emits | Once per pair |
| Stops when | All flows complete | Either flow completes |
| Use case | UI state, live data | Pairing related data |
Error Handling
catch
catch handles exceptions from upstream operators. It does not catch exceptions in the collector.
flow {
emit(1)
emit(2)
throw RuntimeException("Error at 3")
}.catch { e ->
println("Caught: ${e.message}")
emit(-1) // Emit a fallback value
}.collect { println(it) }
Output:
1
2
Caught: Error at 3
-1
retry
retry re-runs the flow when an exception occurs:
var attempt = 0
flow {
attempt++
if (attempt < 3) throw RuntimeException("Attempt $attempt failed")
emit(42)
}.retry(3) { e ->
println("Retrying: ${e.message}")
delay(100)
true // Return true to retry
}.collect { println(it) }
Output:
Retrying: Attempt 1 failed
Retrying: Attempt 2 failed
42
onCompletion
onCompletion runs when the flow completes, whether normally or with an exception:
(1..3).asFlow()
.onCompletion { cause ->
if (cause == null) println("Completed successfully")
else println("Failed: ${cause.message}")
}
.collect { println(it) }
Output:
1
2
3
Completed successfully
onEach
onEach performs a side effect for each element without changing it. Useful for logging:
(1..5).asFlow()
.onEach { println("Processing: $it") }
.filter { it % 2 == 0 }
.collect { println("Result: $it") }
conflate
conflate skips intermediate values when the collector is slow. The collector always gets the most recent value.
flow {
for (i in 1..10) {
emit(i)
delay(10) // Fast producer
}
}.conflate()
.collect { value ->
println("Collecting $value")
delay(50) // Slow collector — some values are skipped
}
Use conflate when you only care about the latest value. For example, live sensor data where you only need the current reading.
distinctUntilChanged
distinctUntilChanged filters out consecutive duplicate values:
flowOf(1, 1, 2, 2, 3, 1, 1, 4)
.distinctUntilChanged()
.collect { println(it) }
// 1, 2, 3, 1, 4
Note: it only removes consecutive duplicates. The value 1 appears twice because there is a 3 between them.
flatMap Operators
flatMapConcat
flatMapConcat processes inner flows one at a time, in order:
(1..3).asFlow()
.flatMapConcat { value ->
flow {
emit("$value-a")
delay(50)
emit("$value-b")
}
}
.collect { println(it) }
// 1-a, 1-b, 2-a, 2-b, 3-a, 3-b
flatMapMerge
flatMapMerge processes inner flows concurrently:
(1..3).asFlow()
.flatMapMerge { value ->
flow {
emit("$value-a")
delay(50)
emit("$value-b")
}
}
.collect { println(it) }
// Order may vary: 1-a, 2-a, 3-a, 1-b, 2-b, 3-b
flatMapLatest
flatMapLatest cancels the previous inner flow when a new value arrives:
(1..3).asFlow()
.flatMapLatest { value ->
flow {
emit("$value-a")
delay(100)
emit("$value-b") // May be cancelled
}
}
.collect { println(it) }
// 1-a, 2-a, 3-a, 3-b (only the last inner flow completes)
Use flatMapLatest for search-as-you-type: cancel the old search when the user types a new character.
Practical Example: Search with Debounce
Wait for the user to stop typing before searching:
fun searchWithDebounce(queries: Flow<String>): Flow<List<String>> {
return queries
.debounce(300) // Wait 300ms after last keystroke
.distinctUntilChanged() // Ignore same query
.filter { it.isNotBlank() } // Ignore empty queries
.map { query ->
searchDatabase(query) // Perform the search
}
}
This is a very common pattern in mobile and web apps. Without debounce, every keystroke triggers a search. With debounce, the search only runs after the user pauses.
Practical Example: Polling
Fetch data at regular intervals:
fun pollData(intervalMs: Long): Flow<String> {
return flow {
var count = 0
while (true) {
count++
emit("Data update $count")
delay(intervalMs)
}
}
}
// Usage — take first 5 updates
pollData(1000).take(5).collect { println(it) }
The flow emits a new value every second. It runs forever, but take(5) stops after 5 values.
Practical Example: Dashboard State
Combine multiple data sources into a single UI state:
fun userDashboard(
userName: Flow<String>,
notifications: Flow<Int>
): Flow<String> {
return userName.combine(notifications) { name, count ->
"$name has $count notifications"
}
}
When either the user name or notification count changes, the dashboard updates.
Practical Example: Batch Processing
Process a list of items as a flow with progress tracking:
fun processBatch(items: List<String>): Flow<String> {
return items.asFlow()
.map { item ->
delay(50)
item.uppercase()
}
.onEach { println("Processed: $it") }
}
// Usage
val results = processBatch(listOf("hello", "world")).toList()
println(results) // [HELLO, WORLD]
StateFlow
StateFlow is a hot flow that always holds a value. It emits the current value to new collectors and only emits when the value changes. Think of it as a reactive variable.
val counter = MutableStateFlow(0) // Initial value is 0
// Read the current value
println(counter.value) // 0
// Update the value
counter.value = 5
println(counter.value) // 5
Collecting StateFlow
val userName = MutableStateFlow("Alex")
// Each collector gets the current value immediately, then updates
launch {
userName.collect { name ->
println("Name is: $name")
}
}
userName.value = "Sam" // Collector prints: Name is: Sam
userName.value = "Sam" // NOT printed — same value, no emission
userName.value = "Jordan" // Collector prints: Name is: Jordan
StateFlow only emits when the value changes. Setting the same value again does not trigger a new emission. This is similar to distinctUntilChanged.
MutableStateFlow vs StateFlow
Expose StateFlow (read-only) to the outside, keep MutableStateFlow private:
class UserRepository {
private val _users = MutableStateFlow<List<String>>(emptyList())
val users: StateFlow<List<String>> = _users.asStateFlow()
fun addUser(name: String) {
_users.value = _users.value + name
}
}
This pattern is common in Android ViewModels and any reactive architecture.
SharedFlow
SharedFlow is a hot flow that can emit values to multiple collectors. Unlike StateFlow, it does not require an initial value, and it can emit the same value multiple times.
val events = MutableSharedFlow<String>()
// Collector 1
launch {
events.collect { println("Collector 1: $it") }
}
// Collector 2
launch {
events.collect { println("Collector 2: $it") }
}
events.emit("click")
// Collector 1: click
// Collector 2: click
Both collectors receive the same event. This is useful for one-time events like navigation, snackbar messages, or error notifications.
SharedFlow Configuration
val events = MutableSharedFlow<String>(
replay = 1, // New collectors get the last 1 event
extraBufferCapacity = 10 // Buffer for slow collectors
)
StateFlow vs SharedFlow
| Feature | StateFlow | SharedFlow |
|---|---|---|
| Initial value | Required | Not required |
| Replays | Always replays current value | Configurable replay |
| Duplicate values | Skipped (distinctUntilChanged) | Emitted |
| Use case | State (UI state, data) | Events (clicks, navigation) |
stateIn and shareIn
stateIn and shareIn convert a cold flow to a hot flow.
stateIn
Convert a cold flow to a StateFlow:
val userFlow: Flow<String> = flow {
emit(fetchUserName())
}
// Convert to StateFlow with an initial value
val userName: StateFlow<String> = userFlow.stateIn(
scope = coroutineScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = "Loading..."
)
shareIn
Convert a cold flow to a SharedFlow:
val tickerFlow = flow {
var count = 0
while (true) {
emit(count++)
delay(1000)
}
}
// Share among multiple collectors
val sharedTicker = tickerFlow.shareIn(
scope = coroutineScope,
started = SharingStarted.WhileSubscribed(),
replay = 1
)
SharingStarted Options
| Option | Behavior |
|---|---|
Eagerly | Start immediately, never stop |
Lazily | Start on first collector, never stop |
WhileSubscribed() | Start on first collector, stop when last unsubscribes |
WhileSubscribed(5000) keeps the flow alive for 5 seconds after the last collector unsubscribes. This is useful for surviving configuration changes in Android.
Common Mistakes
Mistake 1: Using withContext Inside flow
// BAD
flow {
withContext(Dispatchers.IO) {
emit(fetchData())
}
}
// GOOD
flow {
emit(fetchData())
}.flowOn(Dispatchers.IO)
Mistake 2: Collecting on the Wrong Thread
// BAD — collecting on Main but doing heavy work
heavyFlow.collect { value ->
processHeavyData(value) // Blocks Main thread
}
// GOOD — use flowOn to move work off Main
heavyFlow
.map { processHeavyData(it) }
.flowOn(Dispatchers.Default)
.collect { updateUI(it) }
Mistake 3: Not Handling Errors
// BAD — exception crashes the app
riskyFlow.collect { println(it) }
// GOOD — handle errors with catch
riskyFlow
.catch { e -> println("Error: ${e.message}") }
.collect { println(it) }
Summary
| Concept | Description |
|---|---|
flow { } | Create a cold flow |
emit() | Send a value downstream |
collect | Receive values (terminal) |
map, filter | Transform elements |
take | Limit number of elements |
flowOn | Change upstream dispatcher |
combine | Merge flows (latest values) |
zip | Pair flows one-to-one |
catch | Handle upstream errors |
retry | Re-run flow on error |
onCompletion | Run code when flow finishes |
conflate | Skip intermediate values |
debounce | Wait before processing |
distinctUntilChanged | Remove consecutive duplicates |
flatMapConcat | Sequential inner flows |
flatMapMerge | Concurrent inner flows |
flatMapLatest | Cancel previous inner flow |
StateFlow | Hot flow that holds current value |
SharedFlow | Hot flow for events (multiple collectors) |
stateIn | Convert cold flow to StateFlow |
shareIn | Convert cold flow to SharedFlow |
Source Code
You can find the source code for this tutorial on GitHub: tutorial-19-flow
What’s Next?
In the next tutorial, you will learn about inline functions, reified types, and contracts.