Kotlin Flow Guide

The earlier chapter, A Guide to Kotlin Coroutines, taught us about Kotlin coroutines and explained how they can perform multiple tasks concurrently without blocking the main thread. As we have seen, coroutine suspend functions are ideal for performing tasks that return a single result value. In this chapter, we will introduce Kotlin Flows and explore how these can be used to return sequential streams of results from coroutine-based tasks.

By the end of the chapter, you should understand the Flow, StateFlow, and SharedFlow Kotlin types and appreciate the difference between hot and cold flow streams. In the next chapter (An Android Studio SharedFlow Tutorial), we will look more closely at using SharedFlow within the context of Android app development.

Understanding Flows

Flows are part of the Kotlin coroutines library (kotlinx.coroutines) and are designed to allow multiple values to be returned sequentially from coroutine-based asynchronous tasks. A stream of data arriving over time via a network connection, for example, is an ideal candidate for a Kotlin flow.

Flows are comprised of producers, intermediaries, and consumers. Producers are responsible for providing the data that makes up the flow. For example, the code that retrieves the stream of data from our hypothetical network connection would be considered a producer. As each data value becomes available, the producer emits that value to the flow. The consumer sits at the opposite end of the flow stream and collects the values as the producer emits them.

Intermediaries may be placed between the producer and consumer to perform additional operations on the data, such as filtering the stream, performing additional processing, or transforming the data in other ways before it reaches the consumer. Figure 66-1 illustrates the typical structure of a Kotlin flow:

 

You are reading a sample chapter from an old edition of the Android Studio Essentials – Kotlin Edition book.

Purchase the fully updated Android Studio Iguana Kotlin Edition of this publication in eBook or Print format.

The full book contains 99 chapters and over 842 pages of in-depth information.

Learn more.

Preview  Buy eBook  Buy Print

 

Figure 66-1

The flow shown in the above diagram consists of a single producer and a consumer. In practice, multiple consumers can collect emissions from a single producer, and a single consumer can collect data from multiple producers. The remainder of this chapter will demonstrate many of the key features of Kotlin flows.
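
As a rough illustration of the producer, intermediary, and consumer roles described above, the following minimal sketch runs outside of Android using runBlocking. The readings() and runPipeline() functions and the values they emit are hypothetical names invented for this sketch:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Producer: emits three readings (the values are made up for this sketch).
fun readings(): Flow<Int> = flow {
    for (reading in listOf(10, 20, 30)) {
        emit(reading)
    }
}

// Runs the whole pipeline and returns what the consumer received.
suspend fun runPipeline(): List<String> {
    val received = mutableListOf<String>()
    readings()
        .map { "Reading: $it" }        // Intermediary: transforms each value
        .collect { received += it }    // Consumer: collects the results
    return received
}

fun main() = runBlocking {
    runPipeline().forEach { println(it) }
}
```

Each of these roles is covered in more detail in the sections that follow.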

Creating the Sample Project

Select the New Project option from the Android Studio welcome screen and, within the resulting new project dialog, choose the Empty Views Activity template before clicking on the Next button.

Enter FlowDemo into the Name field and specify com.ebookfrenzy.flowdemo as the package name. Before clicking on the Finish button, change the Minimum API level setting to API 26: Android 8.0 (Oreo) and the Language menu to Kotlin.

Once the new project has been created, locate and load the activity_main.xml layout file located in the Project tool window under app -> res -> layout and, with the Layout Editor tool in Design mode, replace the TextView object with a Button view and set the text property so that it reads “Start”. Once the text value has been set, follow the usual steps to extract the string to a resource.

With the button still selected in the layout, locate the onClick property in the Attributes panel and configure it to call a method named handleFlow.

 

Adding the Kotlin Lifecycle Library

The examples in this chapter launch coroutines using lifecycleScope, which requires that the Kotlin extensions lifecycle library is included as a dependency, so edit the build.gradle.kts (Module: app) file and add the library to the dependencies section as follows:

dependencies {
.
.
    implementation ("androidx.lifecycle:lifecycle-runtime-ktx:2.6.1")
.
.
}

When prompted, click the Sync Now button at the top of the editor panel to commit the change.

Declaring a Flow

The Kotlin Flow type represents the most basic form of flow. Each flow can only emit data of a single type which must be specified when the flow is declared. The following declaration, for example, declares a Flow instance designed to stream String-based data:

Flow<String>

When declaring a flow, we need to assign to it the code that will generate the data stream. This code is referred to as the producer block. This can be achieved using the flow builder, which takes a coroutine suspend block containing the producer block code as a parameter. Add the following code to the MainActivity.kt file to declare a flow named myFlow designed to emit a stream of integer values:

.
.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
.
.
fun myFlow(): Flow<Int> = flow {
    // Producer block  
}

As an alternative to the flow builder, the flowOf() builder can be used to convert a fixed set of values into a flow:

 

val myFlow2 = flowOf(2, 4, 6, 8)

Also, many Kotlin collection types now include an asFlow() extension function that can be called to convert the contained data to a flow. The following code, for example, converts an array of string values to a flow:

val myArrayFlow = arrayOf<String>("Red", "Green", "Blue").asFlow()

Emitting Flow Data

Once a flow has been built, the next step is to ensure the data is emitted so that it reaches any consumers observing the flow. Of the three flow builders we looked at in the previous section, only the flowOf() and asFlow() builders create flows that automatically emit the data as soon as a consumer starts collecting. In the case of the flow builder, however, we need to write code to manually emit each value as it becomes available. We achieve this by calling the emit() function and passing the current value to be streamed as an argument. The following changes to our myFlow declaration implement a loop that emits the value of an incrementing counter. To demonstrate the asynchronous nature of flow streams, a two-second delay is performed on each loop iteration:

fun myFlow(): Flow<Int> = flow {
    var counter = 1
    
    while (counter < 6) {
        emit(counter)
        counter++
        delay(2000)
    }
}

Collecting Flow Data

A consumer can collect the streaming data within a flow by calling the collect() method on the flow instance. This will continue to collect data from the stream either until the stream ends or the lifecycle scope in which the collection is being performed is destroyed. For example, we can collect the data from the myFlow stream and output each value by adding the handleFlow() onClick function:

.
.
import android.view.View
import androidx.lifecycle.lifecycleScope
.
.
fun handleFlow(view: View) {
    lifecycleScope.launch {
        myFlow().collect() { value ->
            println("Collected value = $value")
        }
    }
}

Note that collect() is a suspend function, so it must be called from within a coroutine or another suspend function.

Compile and run the app on a device or emulator and display the Logcat tool window. When the Start button is clicked in the running app, the following output should appear with a two-second delay between each output:

 

Collected value = 1
Collected value = 2
Collected value = 3
Collected value = 4
Collected value = 5

To add code to be executed when the stream ends, the collection can be performed in a try/finally construct, for example:

fun handleFlow(view: View) {
    lifecycleScope.launch {
        try {
            myFlow().collect() { value ->
                println("Collected value = $value")
            }
        } finally {
            println("Flow stream ended.")
        }
    }
}

The collect() operator will collect every value the producer emits, even if new values are emitted while the consumer is still processing the last value. For example, our producer is configured to emit a new value every two seconds. Suppose, however, that we simulate our consumer taking 2.5 seconds to process each collected value as follows:

fun handleFlow(view: View) {
    lifecycleScope.launch {
        myFlow().collect() { value ->
            println("Collected value = $value")
            delay(2500)
        }
    }
}

When executed, we will still see all the values listed in the output because collect() does not discard any uncollected values, regardless of whether more recent ones have been emitted since the last collection. This type of behavior is essential to avoid data loss within the flow. In some situations, however, the consumer may be uninterested in any intermediate values emitted between the most recently processed value and the latest emitted value. In this case, the collectLatest() operator can be called on the flow instance. If a new value arrives before processing of the previous value completes, this operator cancels the current collection and restarts the process on the latest value.
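
The cancel-and-restart behavior of collectLatest() can be sketched as follows. This is a minimal illustration rather than part of the FlowDemo project: fastFlow() and processLatestOnly() are hypothetical names, and the delays have been shortened so that the producer is clearly faster than the consumer:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical fast producer: a new value every 100 ms.
fun fastFlow(): Flow<Int> = flow {
    for (i in 1..3) {
        emit(i)
        delay(100)
    }
}

// Returns the values whose processing ran to completion.
suspend fun processLatestOnly(): List<Int> {
    val completed = mutableListOf<Int>()
    fastFlow().collectLatest { value ->
        println("Started processing $value")
        delay(250)          // Slow consumer; cancelled here if a newer value arrives
        completed += value  // Only reached when the delay was not interrupted
    }
    return completed
}

fun main() = runBlocking {
    println("Completed: ${processLatestOnly()}")
}
```

With these timings, processing starts for every value, but only the final value should reach the end of the collection block because each earlier block is canceled when its successor arrives.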

The conflate() operator is similar to the collectLatest() operator except that instead of canceling the current collection operation when a new value arrives, conflate() allows the current operation to complete but discards intermediate values that arrive during this process. When the current operation completes, the most recent value is then collected. Unlike collectLatest(), conflate() is an intermediate operator that is applied to the flow before the call to collect().
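
The following sketch shows one way conflate() might be applied ahead of collect(). The tickerFlow() and collectConflated() names and the shortened delays are assumptions made for this illustration. Which intermediate values are skipped depends on timing, so no exact output is claimed here beyond the first and last values being processed:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical producer: emits 1 to 5 at 100 ms intervals.
fun tickerFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        emit(i)
        delay(100)
    }
}

// Returns the values the slow consumer actually processed.
suspend fun collectConflated(): List<Int> {
    val processed = mutableListOf<Int>()
    tickerFlow()
        .conflate()            // Keep only the most recent unprocessed value
        .collect { value ->
            delay(250)         // Consumer is slower than the producer
            processed += value // Each started block runs to completion
        }
    return processed
}

fun main() = runBlocking {
    println("Processed: ${collectConflated()}")
}
```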

Another collection operator is the single() operator. This operator collects a single value from the flow and throws an exception if it finds another value in the stream. This operator is generally only useful where the appearance of a second stream value indicates that something else has gone wrong somewhere in the app or data source.
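
A minimal sketch of single() is shown below. The onlyValueOrNull() helper is a hypothetical wrapper written for this example. It relies on single() throwing IllegalArgumentException when the flow contains more than one value and NoSuchElementException when the flow is empty:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.runBlocking

// Returns the flow's only value, or null if single() rejected the flow.
suspend fun onlyValueOrNull(source: Flow<Int>): Int? =
    try {
        source.single()
    } catch (e: IllegalArgumentException) {
        println("More than one value: ${e.message}")
        null
    } catch (e: NoSuchElementException) {
        println("No values: ${e.message}")
        null
    }

fun main() = runBlocking {
    println(onlyValueOrNull(flowOf(42)))    // One value in the flow
    println(onlyValueOrNull(flowOf(1, 2)))  // A second value triggers the exception
}
```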

 

Adding a Flow Buffer

When a consumer takes time to process the values emitted by a producer, there is the potential for execution time inefficiencies. Suppose, for example, that in addition to the two-second delay between each emission from our myFlow producer, the collection process in our consumer takes an additional second to complete. We can simulate this behavior as follows:

.
.
import kotlin.system.measureTimeMillis
.
.
fun handleFlow(view: View) {
    lifecycleScope.launch {
        val elapsedTime = measureTimeMillis {
            myFlow()
                .collect() { value ->
                    println("Collected value = $value")
                    delay(1000)
                }
        }
        println("Duration = $elapsedTime")
    }
}

To allow us to measure the total time to process the flow fully, the consumer code has been placed in the closure of a call to the Kotlin measureTimeMillis() function. After execution completes, a duration similar to the following will be reported:

Duration = 15024

This accounts for approximately ten seconds for the producer to emit the five values within myFlow (five two-second delays) and another five seconds for the consumer to process those values. There is an inefficiency here because the producer waits for the consumer to process each value before starting on the next value. This would be much more efficient if the producer did not have to wait for the consumer. We could use the collectLatest() or conflate() operators, but only if the loss of intermediate values is not a concern. To speed up the processing while still collecting every emitted value, we can use the buffer() operator. This operator buffers values as they are emitted and passes them to the consumer when it is ready to receive them. This allows the producer to continue emitting values while the consumer processes preceding values, while ensuring that every emitted value is collected. The buffer() operator may be applied to a flow as follows:

val elapsedTime = measureTimeMillis {
    myFlow()
        .buffer()
        .collect() { value ->
            println("Collected value = $value")
            delay(1000)
        }
}
println("Duration = $elapsedTime")

Execution of the above code indicates that we have now reclaimed the five seconds previously lost in the collection code:

Duration = 10323

Transforming Data with Intermediaries

All of the examples we have looked at in this chapter have passed the data values to the consumer without any modifications. Changes to the data can be made between the producer and consumer by applying one or more intermediate flow operators. In this section, we will look at some of these operators.

 

The map() operator can convert the value to some other value. We can use map(), for example, to convert our integer value to a string:

fun handleFlow(view: View) {
    lifecycleScope.launch {
        myFlow()
            .map {
                "Collected value = $it"
            }
            .collect() {
                println(it)
            }
    }
}

When the code is executed, it will give us the following output:

Collected value = 1
Collected value = 2
Collected value = 3
Collected value = 4
Collected value = 5

The map() operator will perform the conversion on every collected value. The filter() operator can control which values get collected. The filter code block must contain an expression that returns a Boolean value. Only if the expression evaluates to true does the value pass through to the collection. The following code filters odd numbers out of the data flow (note that we’ve left the map() operator in place to demonstrate the chaining of operators):

fun handleFlow(view: View) {
    lifecycleScope.launch {
        myFlow()
            .filter {
                it % 2 == 0
            }
            .map {
                "Collected value = $it"
            }
            .collect() {
                println(it)
            }
    }
}

The above changes will generate the following output:

Collected value = 2
Collected value = 4

The transform() operator serves a similar purpose to map() but provides more flexibility. Unlike map(), transform() does not emit the value returned by its code block, so each result must be emitted explicitly by calling emit(). A particular advantage of transform() is that it can emit multiple values for each value it receives, as demonstrated below:

 

.
.
myFlow()
    .transform {
        emit("Value = $it")
        val doubled = it * 2
        emit("Value doubled = $doubled")
    }
    .collect {
        println(it)
    }
.
.
// Output
Value = 1
Value doubled = 2
Value = 2
Value doubled = 4
Value = 3
Value doubled = 6
Value = 4
Value doubled = 8
Value = 5
Value doubled = 10

Terminal Flow Operators

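All the collection operators covered previously are referred to as terminal flow operators. The reduce() and fold() operators are two further terminal flow operators that can be used in place of a collection operator to reduce the flow data to a single result. Both pass two parameters to the supplied code block in the form of an accumulator and a value, and store the result of the block back in the accumulator. With reduce(), the first flow value is placed in the accumulator, and the specified operation is performed between the accumulator and each subsequent value. The fold() operator works in the same way, except that the accumulator starts with an initial value passed to the operator. The following example uses fold() with an initial accumulator value of 10: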

.
.
myFlow()
    .fold(10) { accumulator, value ->
        println("accumulator = $accumulator, value = $value")
        accumulator * value
    }
.
.
// Output
accumulator = 10, value = 1
accumulator = 10, value = 2
accumulator = 20, value = 3
accumulator = 60, value = 4
accumulator = 240, value = 5
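
For comparison, the reduce() operator described above might be used as in the following sketch, where the first flow value seeds the accumulator. The productOfOneToFive() function is a hypothetical name, and the flow is built from a range rather than from myFlow() to avoid the two-second delays:

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.runBlocking

// Multiplies the values 1 to 5 together. The accumulator starts as the
// first value (1), so the code block first runs with value = 2.
suspend fun productOfOneToFive(): Int =
    (1..5).asFlow().reduce { accumulator, value ->
        println("accumulator = $accumulator, value = $value")
        accumulator * value
    }

fun main() = runBlocking {
    println("Result = ${productOfOneToFive()}") // Result = 120
}
```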

Flow Flattening

As we have seen in earlier examples, we can use operators to perform tasks on values collected from a flow. However, an interesting situation occurs when that task creates one or more flows resulting in a “flow of flows”. In such situations, these streams can be flattened into a single stream. Consider the following example code, which declares two flows:

fun myFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        emit(i)
    }
}
 
fun doubleIt(value: Int) = flow {
    emit(value)
    delay(1000)
    emit(value + value)
}

If we were to call doubleIt() for each value in the myFlow stream, we would end up with a separate flow for each value. This problem can be solved by concatenating the doubleIt() streams into a single flow using the flatMapConcat() operator as follows:

.
.
myFlow()
    .flatMapConcat { doubleIt(it) }
    .collect { println(it) }
.
.

When this modified code executes, we will see the following output from the collect() operator:

1
2
2
4
3
6
4
8
5
10

As we can see from the output, the doubleIt() flow has emitted the value provided by myFlow, followed by the doubled value. When using the flatMapConcat() operator, the doubleIt() flows are collected sequentially, causing execution to wait until doubleIt() has emitted both values before processing the next flow value. The emitted values can instead be collected concurrently using the flatMapMerge() operator as follows:

 

myFlow()
    .flatMapMerge { doubleIt(it) }
    .collect { println(it) }

When executed, the following output will appear:

1
2
3
4
5
2
4
6
8
10

Combining Multiple Flows

Multiple flows can be combined into a single flow using the zip() and combine() operators. The following code demonstrates the zip() operator being used to convert two flows into a single flow:

fun handleFlow(view: View) {
    lifecycleScope.launch {
        val flow1 = (1..5).asFlow()
            .onEach { delay(1000) }
        val flow2 = flowOf("one", "two", "three", "four")
            .onEach { delay(1500) }
        flow1.zip(flow2) { value, string -> "$value, $string" }
            .collect { println(it) }
    }
}
// Output
1, one
2, two
3, three
4, four

We have applied the onEach() operator to both flows in the above code. This is a useful operator for performing a task on receipt of each stream value.

The zip() operator will wait until both flows have emitted a new value before performing the collection. The combine() operator works slightly differently in that it proceeds as soon as either flow emits a new value, using the last value emitted by the other flow in the absence of a new value:

.
.
val flow1 = (1..5).asFlow()
    .onEach { delay(1000) }
val flow2 = flowOf("one", "two", "three", "four")
    .onEach { delay(1500) }
flow1.combine(flow2) { value, string -> "$value, $string" }
    .collect { println(it) }
.
.
// Output
1, one
2, one
3, one
3, two
4, two
4, three
5, three
5, four

As we can see from the output, multiple instances have occurred where the last value was reused on one flow because a new value was emitted on the other.

 

Hot and Cold Flows

So far, in this chapter, we have looked exclusively at the Kotlin Flow type. Kotlin also provides additional types in the form of StateFlow and SharedFlow. Before exploring these, however, it is important to understand the concept of hot and cold flows.

A stream declared using the Flow type is called a cold flow because the code within the producer does not begin executing until a consumer begins collecting values. StateFlow and SharedFlow, on the other hand, are referred to as hot flows because they begin emitting values immediately, regardless of whether consumers are collecting the values.

Once a consumer begins collecting from a StateFlow, it will receive the current value, followed by any subsequent values. A SharedFlow collector, by default, receives only the values emitted after collection starts. In both cases, any previous values emitted before the collection starts will be lost unless steps are taken to implement caching.

Another important difference between Flow, StateFlow, and SharedFlow is that the collectors of a Flow-based stream do not share that stream. Each Flow collector launches a new flow with its own independent data stream. With StateFlow and SharedFlow, on the other hand, multiple collectors share access to the same flow.
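
The independence of each Flow collection can be sketched by counting how often the producer block starts. The countProducerStarts() function below is a hypothetical helper written for this illustration:

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Collects the same cold flow twice and reports how many times
// the producer block was started.
suspend fun countProducerStarts(): Int {
    var starts = 0
    val coldFlow = flow {
        starts++        // Runs once per collector, not once per flow
        emit("a")
        emit("b")
    }
    coldFlow.toList()   // First collector: producer runs from the beginning
    coldFlow.toList()   // Second collector: producer runs again
    return starts
}

fun main() = runBlocking {
    println("Producer started ${countProducerStarts()} times") // 2 times
}
```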

StateFlow

StateFlow, as the name suggests, is primarily used to observe a state change within an app, such as the current setting of a counter, toggle button, or slider. Each StateFlow instance stores a single value likely to change over time and notifies all consumers when those changes occur. This enables you to write code that reacts to changes in the state instead of code that continually checks whether or not a state value has changed. StateFlow behaves the same way as LiveData except that LiveData has lifecycle awareness and does not require an initial value (LiveData was covered previously beginning with the chapter titled Modern Android App Architecture with Jetpack).

 

To create a StateFlow stream, create an instance of MutableStateFlow, passing through a mandatory initial value. This is the variable that will be used to change the current state value from within the app code:

private val _stateFlow = MutableStateFlow(0)

Next, call asStateFlow() on the MutableStateFlow instance to convert it into a StateFlow from which changes in state can be collected:

val stateFlow = _stateFlow.asStateFlow()

Once created, any changes to the state are made via the value property of the mutable state instance. The following code, for example, increments the state value:

_stateFlow.value += 1
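
Incrementing the value property is a read followed by a write, so increments made concurrently from several threads could overwrite one another. Where that is a concern, one option is the update() extension function, which applies the change atomically. The Counter class below is a hypothetical wrapper used only for this sketch:

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update

// Hypothetical holder for a counter state.
class Counter {
    private val _count = MutableStateFlow(0)
    val count = _count.asStateFlow()

    // update() re-reads the current value and retries if another
    // thread changed it in the meantime.
    fun increment() = _count.update { it + 1 }
}

fun main() {
    val counter = Counter()
    repeat(3) { counter.increment() }
    println("Counter = ${counter.count.value}") // Counter = 3
}
```

For a button click handled on the main thread, as in this chapter's example, the simpler value += 1 form is sufficient.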

Once the flow is active, the state can be consumed in the usual ways, though it is generally recommended to collect from StateFlow using the collectLatest() operator, for example:

stateFlow.collectLatest {
    println("Counter = $it")
}

To try out this example, make the following modifications to the MainActivity.kt file:

 

.
.
class MainActivity : AppCompatActivity() {
 
    private val _stateFlow = MutableStateFlow(0)
    val stateFlow = _stateFlow.asStateFlow()
 
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
 
        lifecycleScope.launch {
            stateFlow.collectLatest {
                println("Counter = $it")
            }
        }
    }
 
    fun handleFlow(view: View) {
        _stateFlow.value += 1
    }
}

Run the app and verify that the Start button outputs the incremented counter value each time it is clicked.

SharedFlow

SharedFlow provides a more general-purpose streaming option than that offered by StateFlow. Some of the key differences between StateFlow and SharedFlow are as follows:

  • Consumers are generally referred to as subscribers.
  • An initial value is not provided when creating a SharedFlow instance.
  • SharedFlow allows values emitted before collection started to be “replayed” to the collector.
  • SharedFlow emits values instead of using a value property.

SharedFlow instances are created using MutableSharedFlow as the backing property, on which we call asSharedFlow() to obtain a SharedFlow reference:

.
.
import kotlinx.coroutines.channels.BufferOverflow
.
.
class MainActivity : AppCompatActivity() {
 
    private val _sharedFlow = MutableSharedFlow<Int>(
        replay = 10, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    val sharedFlow = _sharedFlow.asSharedFlow()
.
.

As configured above, new flow subscribers will receive the last ten values before receiving any new values. The above flow is configured to discard the oldest value when more than ten values are buffered. The full set of options for handling buffer overflows is as follows:

  • DROP_LATEST – The latest value is dropped when the buffer is full, leaving the buffer unchanged as new values are processed.
  • DROP_OLDEST – Treats the buffer as a “first-in, first-out” queue where the oldest value is dropped to make room for a new value when the buffer is full.
  • SUSPEND – The call to emit() is suspended when the buffer is full and resumes once space becomes available.
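
The combined effect of the replay setting and DROP_OLDEST can be sketched with a smaller buffer. In the following example, which uses a replay value of 2 rather than 10 to keep the output short, five values are emitted before any subscriber exists. The replayedToLateSubscriber() function is a hypothetical name chosen for this sketch:

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Emits 1 to 5 with no subscribers, then returns what a late subscriber receives.
suspend fun replayedToLateSubscriber(): List<Int> {
    val shared = MutableSharedFlow<Int>(
        replay = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST)

    // Only the two most recent values are retained for replay.
    for (i in 1..5) {
        shared.emit(i)
    }

    // A SharedFlow never completes on its own, so take(2) is used
    // to stop collecting after the replayed values arrive.
    return shared.take(2).toList()
}

fun main() = runBlocking {
    println(replayedToLateSubscriber()) // [4, 5]
}
```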

Values are emitted on a SharedFlow stream by calling the emit() method of the MutableSharedFlow instance:

 

fun handleFlow(view: View) {
 
    var counter = 1
 
    lifecycleScope.launch {
        while (counter < 6) {
            _sharedFlow.emit(counter)
            counter++
            delay(2000)
        }
    }
}

Once the flow is active, subscribers can collect values using the usual techniques on the SharedFlow instance. For example, we can add the following collection code to the onCreate() method of our example project to output the flow values:

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    setContentView(R.layout.activity_main)
 
    lifecycleScope.launch {
        sharedFlow.collect {
            println("$it")
        }
    }
}

Also, the current number of subscribers to a SharedFlow stream can be obtained via the subscriptionCount property of the mutable instance. This property is itself a StateFlow, so the current count is read from its value property:

val subCount = _sharedFlow.subscriptionCount.value

Summary

Kotlin flows allow sequential data or state changes to be returned over time from asynchronous tasks. A flow consists of a producer that emits a sequence of values and consumers that collect and process those values. The flow stream can be manipulated between the producer and consumer by applying one or more intermediary operators, including transformations and filtering. Flows are created based on the Flow, StateFlow, and SharedFlow types. Each collector of a Flow-based stream receives its own independent stream, while multiple StateFlow and SharedFlow collectors share the same stream.

Flows are categorized as being hot or cold. A cold flow does not begin emitting values until a consumer begins collection. On the other hand, hot flows begin emitting values as soon as they are created, regardless of whether or not the values are being collected. In the case of SharedFlow, a predefined number of values may be buffered and replayed to new subscribers when they begin collecting values.