Kotlin Flow with Jetpack Compose

The earlier chapter titled “Coroutines and Side Effects in Jetpack Compose” taught us about Kotlin Coroutines and explained how they can be used to perform multiple tasks concurrently without blocking the main thread. A shortcoming of suspend functions is that they are typically only useful for performing tasks that either do not return a result or only return a single value. In this chapter, we will introduce Kotlin Flows and explore how these can be used to return sequential streams of results from coroutine-based tasks.

By the end of the chapter, you should have a good understanding of the Flow, StateFlow, and SharedFlow Kotlin types, and appreciate the difference between hot and cold flow streams. In the next chapter (“A Jetpack Compose SharedFlow Tutorial”), we will look more closely at using SharedFlow within the context of an example Android app project.

Understanding Flows

Flows are a part of the Kotlin programming language and are designed to allow multiple values to be returned sequentially from coroutine-based asynchronous tasks. A stream of data arriving over time via a network connection would, for example, be an ideal situation for using a Kotlin flow.

Flows consist of producers, intermediaries, and consumers. Producers are responsible for providing the data that makes up the flow. The code that retrieves the stream of data from our hypothetical network connection, for example, would be considered a producer. As each data value becomes available, the producer emits that value to the flow. The consumer sits at the opposite end of the flow stream and collects the values as they are emitted by the producer.

Intermediaries may be placed between the producer and consumer to perform additional operations on the data such as filtering the stream, performing additional processing, or transforming the data in other ways before it reaches the consumer. Figure 1-1 illustrates the typical structure of a Kotlin flow:

Figure 1-1

The flow shown in the above diagram consists of a single producer and consumer. In practice, it is possible both for multiple consumers to collect emissions from a single producer, and for a single consumer to collect data from multiple producers.

The remainder of this chapter will demonstrate many of the key features of Kotlin flows within the context of Jetpack Compose-based development.

Creating the Sample Project

Launch Android Studio and create a new Empty Compose Activity project named FlowDemo, specifying com.example.flowdemo as the package name, and selecting a minimum API level of API 26: Android 8.0 (Oreo). Within the MainActivity.kt file, delete the Greeting function and add a new empty composable named ScreenSetup which, in turn, calls a function named MainScreen:

@Composable
fun ScreenSetup() {
    MainScreen()
}
 
@Composable
fun MainScreen() {
    
}

Edit the onCreate() method to call ScreenSetup instead of Greeting (we will modify the DefaultPreview composable later).

Next, modify the build.gradle (Module: FlowDemo.app) file to add the Compose view model and Kotlin runtime extensions libraries to the dependencies section:

dependencies {
.
.
    implementation 'androidx.lifecycle:lifecycle-viewmodel-compose:2.4.1'
    implementation 'androidx.lifecycle:lifecycle-runtime-ktx:2.4.1'
.
.
}

When prompted, click on the Sync Now button at the top of the editor panel to commit the change.

Adding a view model to the project

For this project, the flow will reside in a view model class. Add this model to the project by locating and right-clicking on the app -> java -> com.example.flowdemo entry in the project tool window and selecting the New -> Kotlin Class/File menu option. In the resulting dialog, name the class DemoViewModel before pressing the Enter key. Once created, modify the file so that it reads as follows:

package com.example.flowdemo
 
import androidx.lifecycle.ViewModel
 
class DemoViewModel : ViewModel() {
}

Return to the MainActivity.kt file and make changes to access an instance of the view model:

.
.
import androidx.lifecycle.viewmodel.compose.viewModel
.
.
@Composable
fun ScreenSetup(viewModel: DemoViewModel = viewModel()) {
    MainScreen()
}

Declaring the flow

The most basic form of flow is represented by the Kotlin Flow type. Each flow is only able to emit data of a single type which must be specified when the flow is declared. The following declaration, for example, declares a Flow instance designed to stream String-based data:

Flow<String>

When declaring a flow, we need to assign to it the code that will generate the data stream. This code is referred to as the producer block. This can be achieved using the flow() builder which takes as a parameter a coroutine suspend block containing the producer block code. Add the following code to the DemoViewModel.kt file to declare a flow named myFlow designed to emit a stream of integer values:

package com.example.flowdemo
 
import androidx.lifecycle.ViewModel
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
 
class DemoViewModel : ViewModel() {
 
    val myFlow: Flow<Int> = flow {
        // Producer block
    }
}

As an alternative to the flow builder, the flowOf() builder can be used to convert a fixed set of values into a flow:

val myFlow2 = flowOf(2, 4, 6, 8)

Also, many Kotlin collection types now include an asFlow() extension function that can be called to convert the contained data to a flow. The following code, for example, converts an array of string values to a flow:

val myArrayFlow = arrayOf<String>("Red", "Green", "Blue").asFlow()

Emitting flow data

Once a flow has been built, the next step is to make sure the data is emitted so that it reaches any consumers that are observing the flow. Of the three flow builders we looked at in the previous section, only the flowOf() and asFlow() builders create flows that automatically emit the data as soon as a consumer starts collecting. In the case of the flow builder, however, we need to write code to manually emit each value as it becomes available. We achieve this by making calls to the emit() function and passing through as an argument the current value to be streamed. The following changes to our myFlow declaration implement a loop that emits the value of an incrementing counter. To demonstrate the asynchronous nature of flow streams, a 2-second delay is performed on each loop iteration:

val myFlow: Flow<Int> = flow {
    for (i in 0..9) {
        emit(i)
        delay(2000)
    }
}
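
The producer block can also be exercised outside of Compose. The following is a minimal sketch, assuming a plain Kotlin project with the kotlinx.coroutines library available; the counterFlow() and collectCounter() names are hypothetical and are not part of the FlowDemo project. It uses a shorter delay than myFlow and gathers the emitted values into a list using the toList() terminal operator:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper with the same shape as myFlow, but configurable
fun counterFlow(count: Int, delayMillis: Long): Flow<Int> = flow {
    for (i in 0 until count) {
        emit(i)              // hand the value to whoever is collecting
        delay(delayMillis)   // suspend without blocking the thread
    }
}

// The producer block does not run until toList() starts collecting
fun collectCounter(): List<Int> = runBlocking {
    counterFlow(count = 5, delayMillis = 10).toList()
}
```

Calling collectCounter() from a main() function and printing the result should list the values 0 through 4 in the order in which they were emitted.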

Collecting flow data as state

As we will see later in the chapter, one way to collect data from a flow within a consumer is to call the collect() method on the flow instance. When working with Compose, however, a less flexible, but more convenient option is to convert the flow to state by calling the collectAsState() function on the flow instance. This allows us to treat the data just as we would any other state within our code. To see this in action, edit the MainActivity.kt file and make the following changes:

.
.
import androidx.compose.runtime.*
import kotlinx.coroutines.flow.*
.
.
@Composable
fun ScreenSetup(viewModel: DemoViewModel = viewModel()) {
    MainScreen(viewModel.myFlow)
}
 
@Composable
fun MainScreen(flow: Flow<Int>) {
   val count by flow.collectAsState(initial = 0)
}
.
.
@Preview(showBackground = true)
@Composable
fun DefaultPreview() {
    FlowDemoTheme {
        ScreenSetup(viewModel())
    }
}

The changes pass a myFlow reference to the MainScreen composable where it is converted to a State with an initial value of 0. Next, we need to design a simple user interface to display the count values as they are emitted to the flow:

.
.
import androidx.compose.foundation.layout.*
import androidx.compose.ui.Alignment
import androidx.compose.ui.text.TextStyle
import androidx.compose.ui.unit.sp
.
.
@Composable
fun MainScreen(myFlow: Flow<Int>) {
    val count by myFlow.collectAsState(initial = 0)

    Column(
        modifier = Modifier.fillMaxSize(),
        verticalArrangement = Arrangement.Center,
        horizontalAlignment = Alignment.CenterHorizontally
    ) {
        Text(text = "$count", style = TextStyle(fontSize = 40.sp))
    }
}

Try out the app either using the preview panel in interactive mode or by running it on a device or emulator. Once the app starts, the count value displayed on the Text component should increment as each new value is emitted by the flow.

Transforming data with intermediaries

In the previous example, we passed the data values to the consumer without any modifications. Changes to the data can be made between the producer and consumer by applying one or more intermediate flow operators. In this section, we will look at some of these operators.

The map() operator can be used to convert the value to some other value. We can use map(), for example, to convert our integer value to a string and add some additional text. Edit the DemoViewModel.kt file and create a modified version of our flow as follows:

.
.
class DemoViewModel : ViewModel() {
    
    val myFlow: Flow<Int> = flow {
        for (i in 0..9) {
            emit(i)
            delay(2000)
        }
    }
 
    val newFlow = myFlow.map {
        "Current value = $it"
    }
}

Before we can test this operator, some changes are needed within the MainActivity.kt file to use this new flow:

@Composable
fun ScreenSetup(viewModel: DemoViewModel = viewModel()) {
    MainScreen(viewModel.newFlow)
}
 
@Composable
fun MainScreen(flow: Flow<String>) {
    val count by flow.collectAsState(initial = "Current value =")
.
.

When the code is executed, the Text component will display the string updated with the current count:

Current value = 1
Current value = 2
.
.

The map() operator will perform the conversion on every collected value. The filter() operator can be used to control which values get collected. The filter code block needs to contain an expression that returns a Boolean value. Only if the expression evaluates to true does the value pass through to the collection. The following code filters odd numbers out of the data flow (note that we’ve left the map() operator in place to demonstrate the chaining of operators):

val newFlow = myFlow
    .filter {
        it % 2 == 0
    }
    .map {
        "Current value = $it"
    }

The above changes will display count updates only for even numbers.
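
The effect of chaining filter() and map() can be checked without a user interface. The following sketch assumes a plain Kotlin project with kotlinx.coroutines available and uses a hypothetical evenLabels() function. It replaces the timed producer with a simple range converted to a flow:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for newFlow: the same operators, no delays
fun evenLabels(): List<String> = runBlocking {
    (0..9).asFlow()
        .filter { it % 2 == 0 }          // only even values pass through
        .map { "Current value = $it" }   // convert each surviving Int to a String
        .toList()
}
```

The returned list contains five strings, one for each of the values 0, 2, 4, 6, and 8.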

The transform() operator serves a similar purpose to map() but provides more flexibility. Unlike map(), transform() does not return the modified value. Instead, each result must be emitted manually by calling emit(). A particular advantage of transform() is that it can emit multiple values for each value it receives, for example:

val newFlow = myFlow
    .transform {
        emit("Value = $it")
        delay(1000)
        val doubled = it * 2
        emit("Value doubled = $doubled")
    }
 
// Output
Value = 0
Value doubled = 0
Value = 1
Value doubled = 2
Value = 2
Value doubled = 4
Value = 3
.
.
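
As a self-contained illustration of the same idea, the sketch below applies transform() to a fixed three-value flow and collects the results into a list. It assumes a plain Kotlin project with kotlinx.coroutines available; the valueAndDouble() function name is hypothetical, and the type arguments on transform() are spelled out only to make the input and output types explicit:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Emits two strings for every integer received from the upstream flow
fun valueAndDouble(): List<String> = runBlocking {
    flowOf(0, 1, 2)
        .transform<Int, String> { value ->
            emit("Value = $value")
            emit("Value doubled = ${value * 2}")
        }
        .toList()
}
```

The three input values produce six output strings, which shows that the number of values leaving transform() need not match the number entering it.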

Before moving to the next step, revert the newFlow declaration to its original form:

val newFlow = myFlow.map {
    "Current value = $it"
}

Collecting flow data

So far in this chapter we have used the collectAsState() function to convert a flow to a State instance. Behind the scenes, this method is using the collect() function to initiate the collection of data. Although collectAsState() works well most of the time, there will be situations where you may need to call collect(). In fact, collect() is just one of several so-called terminal flow operators that can be called directly to achieve results that aren’t possible using collectAsState().

These operators are suspend functions so can only be called from within a coroutine scope. In the chapter entitled “Coroutines and Side Effects in Jetpack Compose”, we looked at coroutines and explained how to use LaunchedEffect to execute asynchronous code safely from within a composable function. Once we have implemented the LaunchedEffect call, we still need the streamed values to be stored as state, so we also need a mutable state into which to store the latest value. Bringing these requirements together, modify the MainScreen function so that it reads as follows:

@Composable
fun MainScreen(flow: Flow<String>) {
    
    var count by remember { mutableStateOf<String>("Current value =")}
 
    LaunchedEffect(Unit) {
        flow.collect {
            count = it
        }
    }
 
    Column(
        modifier = Modifier.fillMaxSize(),
.
.

Test the app and verify that the text component updates as expected. Now that we are using the collect() function we can begin to explore some options that were not available to us when we were using collectAsState().

To add code to be executed when the stream ends, the collection can be performed in a try/finally construct, for example:

LaunchedEffect(Unit) {
    try {
        flow.collect {
            count = it
        }
    } finally {
        count = "Flow stream ended."
    }
}

The collect() operator will collect every value emitted by the producer, even if new values are emitted while the last value is still being processed in the consumer. For example, our producer is configured to emit a new value every two seconds. Suppose, however, that we simulate our consumer taking 2.5 seconds to process each collected value. When executed, we will still see all of the values listed in the output because collect() does not discard any uncollected values regardless of whether more recent values have been emitted since the last collection. This type of behavior is essential to avoid data loss within the flow.

In some situations, however, the consumer may be uninterested in any intermediate values emitted between the most recently processed value and the latest emitted value. In this case, the collectLatest() operator can be called on the flow instance. This operator works by canceling the current collection if a new value arrives before processing completes on the previous value and restarting the process on the latest value.

The conflate() operator is similar to the collectLatest() operator except that instead of canceling the current collection operation when a new value arrives, conflate() allows the current operation to complete, but discards intermediate values that arrive during this process. When the current operation completes, the most recent value is then collected.

Another collection operator is the single() operator. This operator collects a single value from the flow and throws an exception if the flow contains a second value (or no values at all). This operator is useful where the appearance of a second stream value indicates that something else has gone wrong somewhere in the app or data source.
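
The differences between these operators can be sketched in plain Kotlin. The example below assumes kotlinx.coroutines is available, and all of the function names are hypothetical. In each case the producer emits its values back to back, while the first two consumers take 100 milliseconds to process each value:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// collectLatest(): processing of 1 and 2 is cancelled when a newer value arrives
fun latestOnly(): List<Int> = runBlocking {
    val processed = mutableListOf<Int>()
    flowOf(1, 2, 3).collectLatest { value ->
        delay(100)           // cancellation happens here for superseded values
        processed += value   // reached only by the value that is never superseded
    }
    processed
}

// conflate(): processing is never cancelled, but values that arrive while the
// consumer is busy are skipped. The most recent value is always delivered;
// which earlier values survive depends on timing.
fun conflated(): List<Int> = runBlocking {
    val processed = mutableListOf<Int>()
    flowOf(1, 2, 3).conflate().collect { value ->
        delay(100)
        processed += value
    }
    processed
}

// single(): succeeds only when the flow contains exactly one value
fun describeSingle(source: Flow<Int>): String = runBlocking {
    try {
        "single value: ${source.single()}"
    } catch (e: IllegalArgumentException) {
        "more than one value"
    } catch (e: NoSuchElementException) {
        "empty flow"
    }
}
```

With these definitions, latestOnly() returns a list containing only the value 3, while the list returned by conflated() always ends with 3.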

Adding a flow buffer

When a consumer takes time to process the values emitted by a producer, there is the potential for execution time inefficiencies to occur. Suppose, for example, that in addition to the two-second delay between each emission from our myFlow producer, the collection process in our consumer takes an additional second to complete. We can simulate this behavior as follows:

.
.
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.delay
.
.
LaunchedEffect(Unit) {
 
    val elapsedTime = measureTimeMillis {
        flow.collect {
            count = it
            delay(1000)
        }
    }
    count = "Duration = $elapsedTime"
}

To allow us to measure the total time to fully process the flow, the consumer code has been placed in the closure of a call to the Kotlin measureTimeMillis() function. After execution completes, a duration similar to the following will be reported:

Duration = 30044

This accounts for approximately 20 seconds to process the 10 values within myFlow and an additional 10 seconds for those values to be collected. There is an inefficiency here because the producer is waiting for the consumer to process each value before starting on the next value. This would be much more efficient if the producer did not have to wait for the consumer. We could, of course, use the collectLatest() or conflate() operators, but only if the loss of intermediate values is not a concern. To speed up the processing while also collecting every emitted value we can make use of the buffer() operator. This operator buffers values as they are emitted and passes them to the consumer when it is ready to receive them. This allows the producer to continue emitting values while the consumer is processing preceding values while also ensuring that every emitted value is collected. The buffer() operator may be applied to a flow as follows:

LaunchedEffect(Unit) {

    val elapsedTime = measureTimeMillis {
        flow
            .buffer()
            .collect {
                count = it
                delay(1000)
            }
    }
    count = "Duration = $elapsedTime"
}

Execution of the above code indicates that we have now reclaimed the 10 seconds previously lost in the collection code:

Duration = 20052
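
The same comparison can be made on a smaller scale outside of Compose. The sketch below assumes a plain Kotlin project with kotlinx.coroutines available; the slowProducer, timeCollection(), unbufferedMillis(), and bufferedMillis() names are hypothetical, and the delays have been shortened so that the measurement completes in well under a second:

```kotlin
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical producer: three values with 100 ms of "work" after each one
val slowProducer: Flow<Int> = flow {
    for (i in 1..3) {
        emit(i)
        delay(100)
    }
}

// Measures a full collection in which the consumer needs 50 ms per value
fun timeCollection(source: Flow<Int>): Long = runBlocking {
    measureTimeMillis {
        source.collect { delay(50) }
    }
}

// Without a buffer the producer waits for the consumer on every value
fun unbufferedMillis(): Long = timeCollection(slowProducer)

// With a buffer the producer and consumer delays overlap
fun bufferedMillis(): Long = timeCollection(slowProducer.buffer())
```

The exact figures will vary from one run to the next, but the unbuffered time should be close to the sum of the producer and consumer delays, while the buffered time should be close to the producer delays alone.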

More terminal flow operators

The reduce() operator is one of several other terminal flow operators that can be used in place of a collection operator to condense the flow data into a single result. The lambda passed to reduce() takes two parameters in the form of an accumulator and a value. The first flow value is placed in the accumulator, and the specified operation is then performed between the accumulator and each subsequent value (with the result stored in the accumulator). To try this out we need to revert to using myFlow instead of newFlow in addition to adding the reduce() operator call:

@Composable
fun ScreenSetup(viewModel: DemoViewModel = viewModel()) {
    MainScreen(viewModel.myFlow)
}

@Composable
fun MainScreen(flow: Flow<Int>) {
    
    var count by remember { mutableStateOf<Int>(0)}


    LaunchedEffect(Unit) {
    
        flow
            .reduce { accumulator, value ->
                count = accumulator
                accumulator + value
            }
    }
.
.

The fold() operator works similarly to the reduce() operator, with the exception that it is passed an initial accumulator value:

.
.
LaunchedEffect(Unit) {

    flow
        .fold(10) { accumulator, value ->
            count = accumulator
            accumulator + value
        }

}
.
.
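
Both operators return the final accumulator value once the flow completes. As a quick check of the arithmetic, the following sketch sums the values 0 through 9 using each operator. It assumes a plain Kotlin project with kotlinx.coroutines available, and the function names are hypothetical:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// reduce(): the accumulator starts out as the first flow value (0)
fun sumWithReduce(): Int = runBlocking {
    (0..9).asFlow().reduce { accumulator, value -> accumulator + value }
}

// fold(): the accumulator starts out as the supplied initial value (10)
fun sumWithFold(): Int = runBlocking {
    (0..9).asFlow().fold(10) { accumulator, value -> accumulator + value }
}
```

Here sumWithReduce() returns 45 (the sum of 0 through 9) and sumWithFold() returns 55 (the same sum plus the initial value of 10).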

Flow flattening

As we have seen in earlier examples, we can use operators to perform tasks on values collected from a flow. An interesting situation occurs, however, when that task itself creates one or more flows resulting in a “flow of flows”. In situations where this occurs, these streams can be flattened into a single stream. Consider the following example code which declares two flows:

val myFlow: Flow<Int> = flow {
    for (i in 1..5) {
        delay(1000)
        emit(i)
    }
}

fun doubleIt(value: Int) = flow {
    emit(value)
    delay(1000)
    emit(value + value)
}

If we were to call doubleIt() for each value in the myFlow stream we would end up with a separate flow for each value. This problem can be solved by concatenating the doubleIt() streams into a single flow using the flatMapConcat() operator as follows:

@Composable
fun ScreenSetup(viewModel: DemoViewModel = viewModel()) {
    MainScreen(viewModel)
}

@Composable
fun MainScreen(viewModel: DemoViewModel) {

    var count by remember { mutableStateOf<Int>(0)}

    LaunchedEffect(Unit) {

        viewModel.myFlow
            .flatMapConcat { viewModel.doubleIt(it) }
            .collect { count = it }
    }
.
.

When this modified code executes we will see the following output from the collect() operator:

1
2
2
4
3
6
4
8
5
10

As we can see from the output, the doubleIt() flow has emitted the value provided by myFlow followed by the doubled value. When using the flatMapConcat() operator, the doubleIt() flows are collected sequentially, causing execution to wait until doubleIt() has emitted both values before processing the next flow value. The doubleIt() flows can instead be collected concurrently using the flatMapMerge() operator as follows:

viewModel.myFlow
    .flatMapMerge { viewModel.doubleIt(it) }
    .collect { count = it }
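
The difference between the two flattening operators can be seen by collecting their output into lists. The sketch below assumes a plain Kotlin project with kotlinx.coroutines available; the sourceNumbers, doubled(), concatenated(), and merged() names are hypothetical. Depending on the library version, these operators are marked as preview or experimental, so the @OptIn annotation is included as a precaution and may not be needed in every project:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-ins for myFlow and doubleIt(), with shorter delays
val sourceNumbers: Flow<Int> = flowOf(1, 2, 3)

fun doubled(value: Int): Flow<Int> = flow {
    emit(value)
    delay(50)
    emit(value + value)
}

// Inner flows are collected one after another, so the order is fixed
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun concatenated(): List<Int> = runBlocking {
    sourceNumbers.flatMapConcat { doubled(it) }.toList()
}

// Inner flows are collected concurrently, so their values interleave
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun merged(): List<Int> = runBlocking {
    sourceNumbers.flatMapMerge { doubled(it) }.toList()
}
```

The concatenated() function returns the values in the order 1, 2, 2, 4, 3, 6. The merged() function returns the same six values, but in an order that depends on when each inner flow emits.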

Combining multiple flows

Multiple flows can be combined into a single flow using the zip() and combine() operators. The following code demonstrates the zip() operator being used to convert two flows into a single flow:

var count by remember { mutableStateOf<String>("")}

LaunchedEffect(Unit) {

    val flow1 = (1..5).asFlow()
        .onEach { delay(1000) }
    val flow2 = flowOf("one", "two", "three", "four")
        .onEach { delay(1500) }
    flow1.zip(flow2) { value, string -> "$value, $string" }
        .collect { count = it }
}
// Output
1, one
2, two
3, three
4, four

Note that we have applied the onEach() operator to both flows in the above code. This is a useful operator for performing a task on receipt of each stream value.

The zip() operator will wait until both flows have emitted a new value before performing the collection. The combine() operator works slightly differently in that it proceeds as soon as either flow emits a new value, using the last value emitted by the other flow in the absence of a new value:

.
.
    val flow1 = (1..5).asFlow()
        .onEach { delay(1000) }
    val flow2 = flowOf("one", "two", "three", "four")
        .onEach { delay(1500) }
    flow1.combine(flow2) { value, string -> "$value, $string" }
        .collect { count = it }
.
.
// Output
1, one
2, one
3, one
3, two
4, two
4, three
5, three
5, four

As we can see from the output, multiple instances have occurred where the last value has been reused on a flow because a new value was emitted on the other.
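
Both operators can also be tried outside of a composable. The following sketch assumes a plain Kotlin project with kotlinx.coroutines available and uses hypothetical names throughout. The delays are one tenth of those used above so that the example completes quickly:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-ins for flow1 and flow2, with shorter delays
val numberFlow: Flow<Int> = (1..5).asFlow().onEach { delay(100) }
val wordFlow: Flow<String> = flowOf("one", "two", "three", "four").onEach { delay(150) }

// zip() pairs values by position and stops when the shorter flow ends
fun zipped(): List<String> = runBlocking {
    numberFlow.zip(wordFlow) { number, word -> "$number, $word" }.toList()
}

// combine() emits whenever either flow produces a new value
fun combined(): List<String> = runBlocking {
    numberFlow.combine(wordFlow) { number, word -> "$number, $word" }.toList()
}
```

The zipped() function returns exactly four pairs because the fifth number has no matching word. The list returned by combined() is longer, and its intermediate entries depend on timing, though it ends with the most recent value from each flow.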

Hot and cold flows

So far in this chapter, we have looked exclusively at the Kotlin Flow type. Kotlin also provides additional types in the form of StateFlow and SharedFlow. Before exploring these, however, it is important to understand the concept of hot and cold flows.

A stream declared using the Flow type is referred to as a cold flow because the code within the producer does not begin executing until a consumer begins collecting values. StateFlow and SharedFlow, on the other hand, are referred to as hot flows because they begin emitting values immediately, regardless of whether any consumers are collecting the values.

Once a consumer begins collecting from a hot flow, it will receive the latest value emitted by the producer followed by any subsequent values. Unless steps are taken to implement caching, any previous values emitted before the collection starts will be lost.

Another important difference between Flow, StateFlow, and SharedFlow is that the collectors of a Flow-based stream do not share a single stream. Each Flow collector launches a new execution of the producer with its own independent data stream. With StateFlow and SharedFlow, on the other hand, multiple collectors share access to the same flow.
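
The independent nature of cold flow collection can be demonstrated by counting how many times the producer block runs. This is a minimal sketch, assuming a plain Kotlin project with kotlinx.coroutines available; the producerRunsPerCollector() function and the producerRuns counter are hypothetical:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns the number of times the producer block was executed
fun producerRunsPerCollector(): Int = runBlocking {
    var producerRuns = 0
    val cold = flow {
        producerRuns++   // executed each time a collector starts
        emit(1)
    }
    cold.collect { }     // first collector: the producer block runs
    cold.collect { }     // second collector: the producer block runs again
    producerRuns
}
```

The function returns 2 because each call to collect() started the producer from the beginning.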

StateFlow

StateFlow, as the name suggests, is primarily used as a way to observe a change in state within an app such as the current setting of a counter, toggle button, or slider. Each StateFlow instance is used to store a single value that is likely to change over time and to notify all consumers when those changes occur. This enables you to write code that reacts to changes in state instead of code that has to continually check whether or not a state value has changed. StateFlow behaves the same way as LiveData with the exception that LiveData has lifecycle awareness and does not require an initial value (LiveData was covered previously in the chapter titled “Working with ViewModels in Compose”).

To create a StateFlow stream, begin by creating an instance of MutableStateFlow, passing through a mandatory initial value. This is the variable that will be used to change the current state value from within the app code:

private val _stateFlow = MutableStateFlow(0)

Next, call asStateFlow() on the MutableStateFlow instance to convert it into a StateFlow from which changes in state can be collected:

val stateFlow = _stateFlow.asStateFlow()

Once created, any changes to the state are made via the value property of the mutable state instance. The following code, for example, increments the state value:

_stateFlow.value += 1

Once the flow is active, the state can be consumed using collectAsState() or directly using a collection function, though it is generally recommended to collect from StateFlow using the collectLatest() operator. To try out an example, begin by making the following modifications to the DemoViewModel.kt file:

.
.
class DemoViewModel : ViewModel() {

    private val _stateFlow = MutableStateFlow(0)
    val stateFlow = _stateFlow.asStateFlow()

    fun increaseValue() {
        _stateFlow.value += 1
    }
.
.

Next, edit the MainActivity.kt file and change MainScreen so that it collects from the new state flow and includes a button configured to call the view model increaseValue() function:

.
.
import androidx.compose.material3.Button
.
.
@Composable
fun MainScreen(viewModel: DemoViewModel) {

    val count by viewModel.stateFlow.collectAsState()

    Column(
        modifier = Modifier.fillMaxSize(),
        verticalArrangement = Arrangement.Center,
        horizontalAlignment = Alignment.CenterHorizontally
    ) {
        Text(text = "$count", style = TextStyle(fontSize = 40.sp))
        Button(onClick = { viewModel.increaseValue() }) {
            Text("Click Me")
        }
    }
}

Run the app and verify that the button updates the count Text component with the incremented count value each time it is clicked.
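
The backing property pattern used in the view model can also be tried in isolation. The sketch below assumes a plain Kotlin project with kotlinx.coroutines available and uses a hypothetical Counter class in place of DemoViewModel:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the view model
class Counter {
    private val _count = MutableStateFlow(0)         // writable, kept private
    val count: StateFlow<Int> = _count.asStateFlow() // read-only view for consumers

    fun increase() {
        _count.value += 1
    }
}

// A collector that starts late still receives the current state immediately
fun latestAfterTwoClicks(): Int = runBlocking {
    val counter = Counter()
    counter.increase()
    counter.increase()
    counter.count.first()
}
```

The function returns 2, since the StateFlow always holds the most recent value regardless of when collection begins.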

SharedFlow

SharedFlow provides a more general-purpose streaming option than that offered by StateFlow. Some of the key differences between StateFlow and SharedFlow are as follows:

  • Consumers are generally referred to as subscribers.
  • An initial value is not provided when creating a SharedFlow instance.
  • SharedFlow allows values that were emitted prior to collection starting to be “replayed” to the collector.
  • SharedFlow emits values instead of using a value property.

SharedFlow instances are created using MutableSharedFlow as the backing property on which we call the asSharedFlow() function to obtain a SharedFlow reference. For example, make the following changes to the DemoViewModel class to declare a shared flow:

.
.
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.channels.BufferOverflow
.
.
class DemoViewModel : ViewModel() {

    private val _sharedFlow = MutableSharedFlow<Int>(
        replay = 10,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    val sharedFlow = _sharedFlow.asSharedFlow()
.
.

As configured above, new flow subscribers will receive the last 10 values before receiving any new values. The above flow is also configured to discard the oldest value when more than 10 values are buffered. The full set of options for handling buffer overflows is as follows:

  • DROP_LATEST – The latest value is dropped when the buffer is full leaving the buffer unchanged as new values are processed.
  • DROP_OLDEST – Treats the buffer as a “first-in, first-out” queue where the oldest value is dropped to make room for a new value when the buffer is full.
  • SUSPEND – The emitter is suspended when the buffer is full and resumes once space becomes available.

Values are emitted on a SharedFlow stream by calling the emit() method of the MutableSharedFlow instance from within a coroutine. Remaining in the DemoViewModel.kt file, add a new method that can be called from the main activity to start the shared flow:

fun startSharedFlow() {
    
    viewModelScope.launch {
        for (i in 1..5) {
            _sharedFlow.emit(i)
            delay(2000)
        }
    }
}

Finally, make the following changes to the MainScreen composable:

@Composable
fun MainScreen(viewModel: DemoViewModel) {

    val count by viewModel.sharedFlow.collectAsState(initial = 0)

    Column(
        modifier = Modifier.fillMaxSize(),
        verticalArrangement = Arrangement.Center,
        horizontalAlignment = Alignment.CenterHorizontally
    ) {
        Text(text = "$count", style = TextStyle(fontSize = 40.sp))
        Button(onClick = { viewModel.startSharedFlow() }) {
            Text("Click Me")
        }
    }
}

Run the app on a device or emulator (shared flow code does not always work in the interactive preview) and verify that clicking the button causes the count to begin updating. Note that since new values are being emitted from within a coroutine, you can click on the button repeatedly, causing multiple coroutines to emit values into the same shared flow concurrently.

One final point to note about shared flows is that the current number of subscribers to a SharedFlow stream can be obtained via the subscriptionCount property of the mutable instance. This property is itself a StateFlow<Int>, so the current count is read from its value property:

val subCount = _sharedFlow.subscriptionCount.value
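
The replay behavior can be observed by emitting values before any subscriber exists. This is a minimal sketch, assuming a plain Kotlin project with kotlinx.coroutines available; the replayedToLateSubscriber() function is hypothetical, and a replay value of 2 is used in place of 10 to keep the result short:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

// Returns the values received by a subscriber that arrives after emission
fun replayedToLateSubscriber(): List<Int> = runBlocking {
    val shared = MutableSharedFlow<Int>(
        replay = 2,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    for (i in 1..5) {
        shared.emit(i)   // no subscribers yet, so only the replay cache is updated
    }
    // A shared flow never completes, so take() is used to end the collection
    shared.take(2).toList()
}
```

The function returns the values 4 and 5, which are the last two of the five values emitted before the subscriber began collecting.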

Converting a flow from cold to hot

A cold flow can be made hot by calling the shareIn() function on the flow. This call requires a coroutine scope in which to execute the flow, a replay value, and start policy setting indicating the conditions under which the flow is to start and stop. The available start policy options are as follows:

  • SharingStarted.WhileSubscribed() – The flow is kept alive as long as it has active subscribers.
  • SharingStarted.Eagerly – The flow begins immediately and remains active even in the absence of active subscribers.
  • SharingStarted.Lazily – The flow begins only after the first consumer subscribes and remains active even in the absence of active subscribers.

We could, for example, make one of our earlier cold flows hot using the following code:

val hotFlow = myFlow.shareIn(
    viewModelScope, 
    replay = 1, 
    started = SharingStarted.WhileSubscribed()
)
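
One way to confirm that a shared flow runs its producer only once is to count producer executions, as in the following sketch. It assumes a plain Kotlin project with kotlinx.coroutines available; the upstreamRunsWhenShared() function, the producerRuns counter, and the sharingScope variable are hypothetical, with sharingScope standing in for viewModelScope:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns the number of times the cold producer block was executed
fun upstreamRunsWhenShared(): Int = runBlocking {
    var producerRuns = 0
    val cold = flow {
        producerRuns++
        emit(1)
        emit(2)
    }

    // Separate scope so that the sharing coroutine can be cancelled at the end
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val hot = cold.shareIn(
        sharingScope,
        started = SharingStarted.Lazily,
        replay = 2
    )

    hot.take(2).toList()   // the first subscriber starts the producer
    hot.take(2).toList()   // the second subscriber is served from the replay cache
    sharingScope.cancel()  // stop sharing; a view model scope does this automatically
    producerRuns
}
```

The function returns 1 because both subscribers were served by a single execution of the producer block.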

Summary

Kotlin flows allow sequential data or state changes to be returned over time from asynchronous tasks. A flow consists of a producer that emits a sequence of values and consumers that collect and process those values. The flow stream can be manipulated between the producer and consumer by applying one or more intermediary operators including transformations and filtering. Flows are created based on the Flow, StateFlow, and SharedFlow types. Each collector of a Flow-based stream receives its own independent stream, while StateFlow and SharedFlow allow multiple collectors to share the same stream.

Flows are categorized as being hot or cold. A cold flow does not begin emitting values until a consumer begins collection. Hot flows, on the other hand, begin emitting values as soon as they are created, regardless of whether or not the values are being collected. In the case of SharedFlow, a predefined number of values may be buffered and subsequently replayed to new subscribers when they begin collecting values. A cold flow can be made hot via a call to the flow’s shareIn() function.