Kotlin Flow Code Examples

The earlier chapter titled “An Introduction to Kotlin Coroutines” taught us about Kotlin Coroutines and explained how they can be used to perform multiple tasks concurrently without blocking the main thread. As we have seen, coroutine suspend functions are ideal for performing tasks that return a single result value. In this chapter, we will introduce Kotlin Flows and explore how these can be used to return sequential streams of results from coroutine-based tasks.

By the end of the chapter, you should have a good understanding of the Flow, StateFlow, and SharedFlow Kotlin types, and appreciate the difference between hot and cold flow streams. In the next chapter (An Android SharedFlow Tutorial), we will look more closely at using SharedFlow within the context of Android app development.

Understanding Kotlin Flows

Flows are a part of the Kotlin programming language and are designed to allow multiple values to be returned sequentially from coroutine-based asynchronous tasks. A stream of data arriving over time via a network connection would, for example, be an ideal situation for using a Kotlin flow.

Flows are comprised of producers, intermediaries, and consumers. Producers are responsible for providing the data that makes up the flow. The code that retrieves the stream of data from our hypothetical network connection, for example, would be considered a producer. As each data value becomes available, the producer emits that value to the flow. The consumer sits at the opposite end of the flow stream and collects the values as they are emitted by the producer.

Intermediaries may be placed between the producer and consumer to perform additional operations on the data such as filtering the stream, performing additional processing, or transforming the data in other ways before it reaches the consumer. Figure 69-1 illustrates the typical structure of a Kotlin flow:

Figure 69-1

The flow shown in the above diagram consists of a single producer and consumer. In practice, it is possible both for multiple consumers to collect emissions from a single producer, and for a single consumer to collect data from multiple producers. The remainder of this chapter will demonstrate many of the key features of Kotlin flows.

Creating the Sample Kotlin Flow Project

Select the New Project option from the Android Studio welcome screen and, within the resulting new project dialog, choose the Empty Activity template before clicking on the Next button.

Enter FlowDemo into the Name field and specify com.ebookfrenzy.flowdemo as the package name. Before clicking on the Finish button, change the Minimum API level setting to API 26: Android 8.0 (Oreo) and the Language menu to Kotlin.

Once the new project has been created, open the activity_main.xml layout file (found in the Project tool window under app -> res -> layout) and, with the Layout Editor tool in Design mode, replace the TextView object with a Button view and set the text property so that it reads “Start”. Once the text value has been set, follow the usual steps to extract the string to a resource.

With the button still selected in the layout, locate the onClick property in the Attributes panel and configure it to call a method named handleFlow.

Adding the Kotlin Lifecycle Library

The lifecycleScope used to launch the flow examples in this chapter is provided by the Kotlin extensions lifecycle library, which must be included as a dependency, so edit the build.gradle (Module: FlowDemo.app) file and add the library to the dependencies section as follows:

dependencies {
.
.
    implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.4.1"
.
.
}

When prompted, click on the Sync Now button at the top of the editor panel to commit the change.

Declaring a Flow

The most basic form of flow is represented by the Kotlin Flow type. Each flow is only able to emit data of a single type which must be specified when the flow is declared. The following declaration, for example, declares a Flow instance designed to stream String-based data:

Flow<String>

When declaring a flow, we need to assign to it the code that will generate the data stream. This code is referred to as the producer block. This can be achieved using the flow builder which takes as a parameter a coroutine suspend block containing the producer block code. Add the following code to the MainActivity.kt file to declare a flow named myFlow designed to emit a stream of integer values:

.
.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
.
.
fun myFlow(): Flow<Int> = flow {
    // Producer block  
}

As an alternative to the flow builder, the flowOf() builder can be used to convert a fixed set of values into a flow:

val myFlow2 = flowOf(2, 4, 6, 8)

Also, many Kotlin collection types now include an asFlow() extension function that can be called to convert the contained data to a flow. The following code, for example, converts an array of string values to a flow:

val myArrayFlow = arrayOf("Red", "Green", "Blue").asFlow()
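
As a hedged illustration outside the Android project, the following sketch gathers the values of a flowOf() flow and two asFlow() flows into lists using the toList() terminal operator. The helper function names and the use of runBlocking are assumptions made for this demonstration and are not part of the FlowDemo project:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helpers: each builds a flow and gathers its values into a list.
suspend fun fixedValues(): List<Int> = flowOf(2, 4, 6, 8).toList()

suspend fun colorsFromList(): List<String> =
    listOf("Red", "Green", "Blue").asFlow().toList()

suspend fun numbersFromRange(): List<Int> = (1..3).asFlow().toList()

fun main(): Unit = runBlocking {
    println(fixedValues())      // [2, 4, 6, 8]
    println(colorsFromList())   // [Red, Green, Blue]
    println(numbersFromRange()) // [1, 2, 3]
}
```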

Emitting Flow Data

Once a flow has been built, the next step is to make sure the data is emitted so that it reaches any consumers that are observing the flow. Of the three flow builders we looked at in the previous section, only the flowOf() and asFlow() builders create flows that automatically emit the data as soon as a consumer starts collecting. In the case of the flow builder, however, we need to write code to manually emit each value as it becomes available. We achieve this by making calls to the emit() function and passing through as an argument the current value to be streamed. The following changes to our myFlow declaration implement a loop that emits the value of an incrementing counter. To demonstrate the asynchronous nature of flow streams, a two-second delay is performed on each loop iteration:

fun myFlow(): Flow<Int> = flow {
    var counter = 1
    
    while (counter < 6) {
        emit(counter)
        counter++
        delay(2000)
    }
}

Collecting Flow Data

The streaming data within a flow can be collected by a consumer by calling the collect() method on the flow instance. This will continue to collect data from the stream either until the stream ends or the lifecycle scope in which the collection is being performed is destroyed. For example, we can collect the data from the myFlow stream and output each value by adding the handleFlow() onClick function:

.
.
import android.view.View
.
.
fun handleFlow(view: View) {
    lifecycleScope.launch {
        myFlow().collect() { value ->
            println("Collected value = $value")
        }
    }
}

Note that collect() is a suspend function, so it must be called from within a coroutine or another suspend function.

Compile and run the app on a device or emulator, display the Logcat tool window and enter System.out into the search bar as shown in Figure 69-2. This will filter the log output to display only that generated by the println() statement:

Figure 69-2

When the Start button is clicked in the running app, the following output should appear with a two-second delay between each output:

Collected value = 1
Collected value = 2
Collected value = 3
Collected value = 4
Collected value = 5

To add code to be executed when the stream ends, the collection can be performed in a try/finally construct, for example:

fun handleFlow(view: View) {
    lifecycleScope.launch {
        try {
            myFlow().collect() { value ->
                println("Collected value = $value")
            }
        } finally {
            println("Flow stream ended.")
        }
    }
}

The collect() operator will collect every value emitted by the producer, even if new values are emitted while the last value is still being processed in the consumer. For example, our producer is configured to emit a new value every two seconds. Suppose, however, that we simulate our consumer taking 2.5 seconds to process each collected value as follows:

fun handleFlow(view: View) {
    lifecycleScope.launch {
        myFlow().collect() { value ->
            println("Collected value = $value")
            delay(2500)
        }
    }
}

When executed, we will still see all of the values listed in the output because collect() does not discard any uncollected values regardless of whether more recent values have been emitted since the last collection. This type of behavior is essential to avoid data loss within the flow. In some situations, however, the consumer may be uninterested in any intermediate values emitted between the most recently processed value and the latest emitted value. In this case, the collectLatest() operator can be called on the flow instance. This operator works by canceling the current collection if a new value arrives before processing completes on the previous value and restarts the process on the latest value.

The conflate() operator is similar to the collectLatest() operator except that instead of canceling the current collection operation when a new value arrives, conflate() allows the current operation to complete, but discards intermediate values that arrive during this process. When the current operation completes, the most recent value is then collected.
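
The difference between the two operators can be sketched outside the Android project as follows. This is a minimal illustration rather than part of FlowDemo: the quickFlow() producer, the helper function names, and the shortened delays (100 ms between emissions, 250 ms of simulated processing) are all assumptions chosen to keep the run short:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical producer: emits 1..5 with 100 ms between emissions.
fun quickFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        emit(i)
        delay(100)
    }
}

// collectLatest(): processing of a value is cancelled when a newer one arrives,
// so only values whose processing ran to completion are recorded.
suspend fun processedWithCollectLatest(): List<Int> {
    val processed = mutableListOf<Int>()
    quickFlow().collectLatest { value ->
        delay(250) // simulated slow consumer
        processed.add(value)
    }
    return processed
}

// conflate(): the value being processed is always completed; values emitted
// in the meantime are skipped in favor of the most recent one.
suspend fun processedWithConflate(): List<Int> {
    val processed = mutableListOf<Int>()
    quickFlow().conflate().collect { value ->
        delay(250) // simulated slow consumer
        processed.add(value)
    }
    return processed
}

fun main(): Unit = runBlocking {
    println("collectLatest: ${processedWithCollectLatest()}")
    println("conflate: ${processedWithConflate()}")
}
```

On a typical run, the collectLatest() version records only the final value, while the conflate() version records a subset that starts with 1 and ends with 5. The exact intermediate values depend on timing.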

Another collection operator is the single() operator. This operator collects a single value from the flow and throws an exception if it finds another value in the stream. This operator is generally only useful where the appearance of a second stream value indicates that something else has gone wrong somewhere in the app or data source.
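
A brief sketch of this behavior, using hypothetical helper functions and fixed flowOf() values purely for illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns the only value in the flow.
suspend fun onlyValue(): Int = flowOf(42).single()

// Returns true if single() throws because the flow holds more than one value.
suspend fun failsOnSecondValue(): Boolean =
    try {
        flowOf(1, 2).single()
        false
    } catch (e: Exception) {
        true
    }

fun main(): Unit = runBlocking {
    println(onlyValue())          // 42
    println(failsOnSecondValue()) // true
}
```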

Adding a Flow Buffer

When a consumer takes time to process the values emitted by a producer, there is the potential for execution time inefficiencies to occur. Suppose, for example, that in addition to the two-second delay between each emission from our myFlow producer, the collection process in our consumer takes an additional second to complete. We can simulate this behavior as follows:

.
.
import kotlin.system.measureTimeMillis
.
.
fun handleFlow(view: View) {
    lifecycleScope.launch {
        val elapsedTime = measureTimeMillis {
            myFlow()
                .collect() { value ->
                    println("Collected value = $value")
                    delay(1000)
                }
        }
        println("Duration = $elapsedTime")
    }
}

To allow us to measure the total time to fully process the flow, the consumer code has been placed in the closure of a call to the Kotlin measureTimeMillis() function. After execution completes, a duration similar to the following will be reported:

Duration = 15024

This accounts for approximately ten seconds to process the five values within myFlow and an additional five seconds for those values to be collected. There is an inefficiency here because the producer is waiting for the consumer to process each value before starting on the next value. This would be much more efficient if the producer did not have to wait for the consumer. We could, of course, use the collectLatest() or conflate() operators, but only if the loss of intermediate values is not a concern. To speed up the processing while also collecting every emitted value we can make use of the buffer() operator. This operator buffers values as they are emitted and passes them to the consumer when it is ready to receive them. This allows the producer to continue emitting values while the consumer is processing preceding values while also ensuring that every emitted value is collected. The buffer() operator may be applied to a flow as follows:

val elapsedTime = measureTimeMillis {
    myFlow()
        .buffer()
        .collect() { value ->
            println("Collected value = $value")
            delay(1000)
        }
}
println("Duration = $elapsedTime")

Execution of the above code indicates that we have now reclaimed the five seconds previously lost in the collection code:

Duration = 10323

Transforming Data with Intermediaries

All of the examples we have looked at so far in this chapter have passed the data values to the consumer without any modifications. Changes to the data can be made between the producer and consumer by applying one or more intermediate flow operators. In this section, we will look at some of these operators.

The map() operator can be used to convert the value to some other value. We can use map(), for example, to convert our integer value to a string:

fun handleFlow(view: View) {
    lifecycleScope.launch {
        myFlow()
            .map {
                "Collected value = $it"
            }
            .collect() {
                println(it)
            }
    }
}

When executed, this will give us the following output:

Collected value = 1
Collected value = 2
Collected value = 3
Collected value = 4
Collected value = 5

The map() operator will perform the conversion on every collected value. The filter() operator can be used to control which values get collected. The filter code block needs to contain an expression that returns a Boolean value. Only if the expression evaluates to true does the value pass through to the collection. The following code filters odd numbers out of the data flow (note that we’ve left the map() operator in place to demonstrate the chaining of operators):

fun handleFlow(view: View) {
    lifecycleScope.launch {
        myFlow()
            .filter {
                it % 2 == 0
            }
            .map {
                "Collected value = $it"
            }
            .collect() {
                println(it)
            }
    }
}

The above changes will generate the following output:

Collected value = 2
Collected value = 4

The transform() operator serves a similar purpose to map() but provides more flexibility. Unlike map(), which passes on whatever its block returns, the transform() block must call emit() for each result it wants to send downstream. A particular advantage of transform() is that it can emit multiple values as demonstrated below:

.
.
myFlow()
    .transform {
        emit("Value = $it")
        val doubled = it * 2
        emit("Value doubled = $doubled")
    }
    .collect {
        println(it)
    }
.
.
// Output
Value = 1
Value doubled = 2
Value = 2
Value doubled = 4
Value = 3
Value doubled = 6
Value = 4
Value doubled = 8
Value = 5
Value doubled = 10

Terminal Flow Operators

All of the collection operators covered previously are referred to as terminal flow operators. The reduce() operator is one of several other terminal flow operators that can be used in place of a collection operator. Rather than passing each value to a consumer block, it combines the flow values into a single result. The block passed to reduce() takes two parameters in the form of an accumulator and a value. The first flow value is placed in the accumulator and a specified operation is performed between the accumulator and the current value (with the result stored in the accumulator):

.
.
myFlow()
    .reduce { accumulator, value ->
        println("accumulator = $accumulator, value = $value")
        accumulator + value
    }
.
.
// Output
accumulator = 1, value = 2
accumulator = 3, value = 3
accumulator = 6, value = 4
accumulator = 10, value = 5

The fold() operator works similarly to the reduce() operator, with the exception that it is passed an initial accumulator value:

.
.
myFlow()
    .fold(10) { accumulator, value ->
        println("accumulator = $accumulator, value = $value")
        accumulator * value
    }
.
.
// Output
accumulator = 10, value = 1
accumulator = 10, value = 2
accumulator = 20, value = 3
accumulator = 60, value = 4
accumulator = 240, value = 5

Flow Flattening

As we have seen in earlier examples, we can use operators to perform tasks on values collected from a flow. An interesting situation occurs, however, when that task itself creates one or more flows resulting in a “flow of flows”. In such situations, these streams can be flattened into a single stream. Consider the following example code which declares two flows:

fun myFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        emit(i)
    }
}
 
fun doubleIt(value: Int) = flow {
    emit(value)
    delay(1000)
    emit(value + value)
}

If we were to call doubleIt() for each value in the myFlow stream we would end up with a separate flow for each value. This problem can be solved by concatenating the doubleIt() streams into a single flow using the flatMapConcat() operator as follows:

.
.
myFlow()
    .flatMapConcat { doubleIt(it) }
    .collect { println(it) }
.
.

When this modified code executes we will see the following output from the collect() operator:

1
2
2
4
3
6
4
8
5
10

As we can see from the output, the doubleIt() flow has emitted the value provided by myFlow followed by the doubled value. When using the flatMapConcat() operator, the doubleIt() flows are collected sequentially, one at a time, causing execution to wait until doubleIt() has emitted both values before processing the next flow value. The emitted values can instead be collected concurrently using the flatMapMerge() operator as follows:

myFlow()
    .flatMapMerge { doubleIt(it) }
    .collect { println(it) }

When executed, the following output will appear:

1
2
3
4
5
2
4
6
8
10

Combining Multiple Flows

Multiple flows can be combined into a single flow using the zip() and combine() operators. The following code demonstrates the zip() operator being used to convert two flows into a single flow:

fun handleFlow(view: View) {
    lifecycleScope.launch {
        val flow1 = (1..5).asFlow()
            .onEach { delay(1000) }
        val flow2 = flowOf("one", "two", "three", "four")
            .onEach { delay(1500) }
        flow1.zip(flow2) { value, string -> "$value, $string" }
            .collect { println(it) }
    }
}
// Output
1, one
2, two
3, three
4, four

Note that we have applied the onEach() operator to both flows in the above code. This is a useful operator for performing a task on receipt of each stream value.

The zip() operator will wait until both flows have emitted a new value before performing the collection. The combine() operator works slightly differently in that it proceeds as soon as either flow emits a new value, using the last value emitted by the other flow in the absence of a new value:

.
.
val flow1 = (1..5).asFlow()
    .onEach { delay(1000) }
val flow2 = flowOf("one", "two", "three", "four")
    .onEach { delay(1500) }
flow1.combine(flow2) { value, string -> "$value, $string" }
    .collect { println(it) }
.
.
// Output
1, one
2, one
3, one
3, two
4, two
4, three
5, three
5, four

As we can see from the output, multiple instances have occurred where the last value has been reused on a flow because a new value was emitted on the other.

Hot and Cold Flows

So far in this chapter, we have looked exclusively at the Kotlin Flow type. Kotlin also provides additional types in the form of StateFlow and SharedFlow. Before exploring these, however, it is important to understand the concept of hot and cold flows.

A stream declared using the Flow type is referred to as a cold flow because the code within the producer does not begin executing until a consumer begins collecting values. StateFlow and SharedFlow, on the other hand, are referred to as hot flows because they begin emitting values immediately, regardless of whether any consumers are collecting the values.

Once a consumer begins collecting from a StateFlow, it will receive the current value followed by any subsequent values. A SharedFlow collector, by default, receives only the values emitted after collection starts. Unless steps are taken to implement caching, any previous values emitted before the collection starts will be lost.

Another important difference between Flow, StateFlow, and SharedFlow is that a Flow-based stream is not shared between collectors. Each Flow collector launches a new flow with its own independent data stream. With StateFlow and SharedFlow, on the other hand, multiple collectors share access to the same flow.
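
The independence of cold flow collectors can be seen in the following sketch, which counts how many times the producer block runs when the same Flow is collected twice. The function name and counter are hypothetical and exist only for this illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Collects one cold Flow twice; returns both result lists and the number
// of times the producer block was started.
suspend fun collectColdFlowTwice(): Triple<List<Int>, List<Int>, Int> {
    var producerRuns = 0
    val coldFlow: Flow<Int> = flow {
        producerRuns++ // runs once per collector, not once per flow
        emit(1)
        emit(2)
    }
    val first = coldFlow.toList()
    val second = coldFlow.toList()
    return Triple(first, second, producerRuns)
}

fun main(): Unit = runBlocking {
    println(collectColdFlowTwice()) // ([1, 2], [1, 2], 2)
}
```

Each call to toList() restarts the producer from the beginning, so both collectors receive the full sequence and the producer block executes twice.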

StateFlow

StateFlow, as the name suggests, is primarily used as a way to observe a change in state within an app such as the current setting of a counter, toggle button, or slider. Each StateFlow instance is used to store a single value that is likely to change over time and to notify all consumers when those changes occur. This enables you to write code that reacts to changes in state instead of code that has to continually check whether or not a state value has changed. StateFlow behaves the same way as LiveData with the exception that LiveData has lifecycle awareness and does not require an initial value (LiveData was covered previously beginning with the chapter titled “Modern Android App Architecture with Jetpack”).

To create a StateFlow stream, begin by creating an instance of MutableStateFlow, passing through a mandatory initial value. This is the variable that will be used to change the current state value from within the app code:

private val _stateFlow = MutableStateFlow(0)

Next, call asStateFlow() on the MutableStateFlow instance to convert it into a StateFlow from which changes in state can be collected:

val stateFlow = _stateFlow.asStateFlow()

Once created, any changes to the state are made via the value property of the mutable state instance. The following code, for example, increments the state value:

_stateFlow.value += 1

Once the flow is active, the state can be consumed in the usual ways, though it is generally recommended to collect from StateFlow using the collectLatest() operator, for example:

stateFlow.collectLatest { println("Counter = $it") }

To try out this example, make the following modifications to the MainActivity.kt file:

.
.
class MainActivity : AppCompatActivity() {
 
    private val _stateFlow = MutableStateFlow(0)
    val stateFlow = _stateFlow.asStateFlow()
 
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
 
        lifecycleScope.launch {
            stateFlow.collectLatest {
                println("Counter = $it")
            }
        }
    }
 
    fun handleFlow(view: View) {
        _stateFlow.value += 1
    }
}

Run the app and verify that the Start button outputs the incremented counter value each time it is clicked.
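
The same backing-property pattern can also be exercised outside of an activity. The sketch below is an illustration under assumptions (the Counter class and function name are hypothetical); it shows that a collector that starts after several state changes still receives the current value first:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical counter holder mirroring the backing-property pattern above.
class Counter {
    private val _stateFlow = MutableStateFlow(0)
    val stateFlow = _stateFlow.asStateFlow()

    fun increment() {
        _stateFlow.value += 1
    }
}

// A collector that starts late still receives the current state first.
suspend fun currentValueSeenByLateCollector(): Int {
    val counter = Counter()
    repeat(3) { counter.increment() }
    return counter.stateFlow.first() // first() collects one value, then stops
}

fun main(): Unit = runBlocking {
    println(currentValueSeenByLateCollector()) // 3
}
```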

SharedFlow

SharedFlow provides a more general-purpose streaming option than that offered by StateFlow. Some of the key differences between StateFlow and SharedFlow are as follows:

  • Consumers are generally referred to as subscribers.
  • An initial value is not provided when creating a SharedFlow instance.
  • SharedFlow allows values that were emitted prior to collection starting to be “replayed” to the collector.
  • SharedFlow emits values instead of using a value property.

SharedFlow instances are created using MutableSharedFlow as the backing property on which we call asSharedFlow() to obtain a SharedFlow reference:

.
.
import kotlinx.coroutines.channels.BufferOverflow
.
.
class MainActivity : AppCompatActivity() {
 
    private val _sharedFlow = MutableSharedFlow<Int>(
        replay = 10, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    val sharedFlow = _sharedFlow.asSharedFlow()
.
.

As configured above, new flow subscribers will receive the last 10 values before receiving any new values. The above flow is also configured to discard the oldest value when more than 10 values are buffered. The full set of options for handling buffer overflows is as follows:

  • DROP_LATEST – The value currently being added is dropped when the buffer is full, leaving the buffer contents unchanged.
  • DROP_OLDEST – Treats the buffer as a “first-in, first-out” queue where the oldest value is dropped to make room for a new value when the buffer is full.
  • SUSPEND – The call to emit() is suspended when the buffer is full, and resumes once space becomes available.
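
To illustrate how replay and DROP_OLDEST interact, the following sketch emits five values before any subscriber exists and then collects what a new subscriber receives. It is a minimal example under assumptions: the replay size of 3, the function name, and the use of tryEmit() (the non-suspending counterpart of emit()) are choices made for this demonstration only:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

// Emits 1..5 before anyone subscribes, then returns what a new subscriber sees.
suspend fun replayedToNewSubscriber(): List<Int> {
    val shared = MutableSharedFlow<Int>(
        replay = 3, onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    for (i in 1..5) {
        shared.tryEmit(i) // oldest cached values are displaced as new ones arrive
    }
    // A SharedFlow never completes, so take(3) ends collection after the replay.
    return shared.take(3).toList()
}

fun main(): Unit = runBlocking {
    println(replayedToNewSubscriber()) // [3, 4, 5]
}
```

Only the three most recent values survive in the replay cache, so the values 1 and 2 are never seen by the late subscriber.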

Values are emitted on a SharedFlow stream by calling the emit() method of the MutableSharedFlow instance:

fun handleFlow(view: View) {
 
    var counter = 1
 
    lifecycleScope.launch {
        while (counter < 6) {
            _sharedFlow.emit(counter)
            counter++
            delay(2000)
        }
    }
}

Once the flow is active, subscribers can collect values using the usual techniques on the SharedFlow instance. For example, we can add the following collection code to the onCreate() method of our example project to output the flow values:

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    setContentView(R.layout.activity_main)
 
    lifecycleScope.launch {
        sharedFlow.collect {
            println("$it")
        }
    }
}

Also, the current number of subscribers to a SharedFlow stream can be obtained via the subscriptionCount property of the mutable instance:

val subCount = _sharedFlow.subscriptionCount
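
Note that subscriptionCount is itself exposed as a StateFlow of Int, so the current count is read through its value property or observed like any other state flow. The following sketch, which uses a hypothetical function name and is intended only as an illustration, reads the count while one collector is active and again after that collector has been cancelled:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns the subscriber count while one collector is active and after it is cancelled.
suspend fun subscriberCounts(): Pair<Int, Int> = coroutineScope {
    val shared = MutableSharedFlow<Int>()
    val job = launch { shared.collect { } }

    // Suspend until the collector has subscribed, then read the count.
    val whileCollecting = shared.subscriptionCount.first { it > 0 }

    job.cancelAndJoin()
    val afterCancel = shared.subscriptionCount.value
    whileCollecting to afterCancel
}

fun main(): Unit = runBlocking {
    println(subscriberCounts()) // (1, 0)
}
```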

Summary

Kotlin flows allow sequential data or state changes to be returned over time from asynchronous tasks. A flow consists of a producer that emits a sequence of values and consumers that collect and process those values. The flow stream can be manipulated between the producer and consumer by applying one or more intermediary operators including transformations and filtering. Flows are created based on the Flow, StateFlow, and SharedFlow types. A Flow-based stream is not shared between collectors (each collector receives its own independent stream), while StateFlow and SharedFlow allow multiple collectors to share the same stream.

Flows are categorized as being hot or cold. A cold flow does not begin emitting values until a consumer begins collection. Hot flows, on the other hand, begin emitting values as soon as they are created, regardless of whether or not the values are being collected. In the case of SharedFlow, a predefined number of values may be buffered and subsequently replayed to new subscribers when they begin collecting values.

