Flow

Flow

 
 
 

What is Flow?

  • A stream of data that can be computed asynchronously is conceptually referred to as a Flow.
  • It is constructed using Coroutines. An appropriate Kotlin type for modeling data streams is Flow.
  • Flow, like LiveData and RxJava streams, allows you to implement the observer pattern: a software design pattern consisting of an object (source) that keeps a list of its dependents, called observers (collectors) and automatically notifies them of any state changes.
  • A Flow uses suspended functions to consume and produce in an asynchronous manner.

Why do we need Flow?

After Coroutines were introduced, people started enjoying them due to their simplicity and structured concurrency.
Coroutines, combined with the growing usage of Kotlin, led people to express their interest in having a pure Kotlin implementation of RxJava to leverage the power of Kotlin like Type Systems, Coroutines, etc. When these are combined, they form Flow.
We can say Flow takes advantage of LiveData and RxJava.

Working of Flow: Entities involved in Flow

notion image
There are three entities involved in flow:
  1. producer produces data that is added to the stream.
2. Intermediaries(Optional) can modify each value emitted into the stream or the stream itself without consuming the values.
3. A consumer consumes the values from the stream.

Creating a Flow and consuming values

To create a Flow, first you need to create a producer. The standard library provides you with several ways to create a flow, the easiest way is to use the flow operator:
val numbersFlow: Flow<Int> = flow { repeat(60) { it -> emit(it+1) //Emits the result of the request to the flow delay(1000) //Suspends the coroutine for some time } }
To collect flow, first you will launch a Coroutine because flow operates on Coroutines under the hood. The collect operator is used to collect the values emitted by it.
lifecycleScope.launch { viewModel.numbersFlow.collect { it -> binding.textTimer.text = it.toString() } }
  • Example
val job = GlobalScope.launch { val data = producer() // terminal method needed to start flow to get data // operator method can not start flow data.collect { Log.e(TAG, "$it") } } // there is no specific method to stop stream data from flow // but flow stop emitting data if no consumer available // and consumer works in coroutine scope so if we kill coroutine scope then no consumer left alive so flow will also stop emitting data. GlobalScope.launch { delay(3500) job.cancel() } // example to understand flow fun producer() = flow<Int> { val list = listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) list.forEach { delay(1000) emit(it) } }
 
  • Use case
    • When we need to update ui based on data
    • we can update state like from onStart() we can show loader on screen to indicate loading then collect{..} will get Data and update ui to show result
GlobalScope.launch { val data = producer() data .onStart { // emit(-1)can emit data Log.e(TAG, "on Start") } .onCompletion { // emit(11) can emit data Log.e(TAG, "on Completed") } .onEach { // can get current data but can not emit Log.e(TAG, "about to emit $it") } .collect { // can get data but can not emit Log.e(TAG, "$it") } }

Types of Flow

  • Cold Flow — e.g. flow
    • It does not start producing values until one starts to collect them. It can have only one subscriber.
  • Hot Flow — It will produce values even if no one is collecting them.e.g. StateFlow, SharedFlow

Commonly used operators and their types

  • Terminal Operators —
    • These complete normally or exceptionally depending on the successful or failed execution of all the flow operations upstream.
    • The most basic terminal operator is collect.
    • terminal function are suspend function
data.first() // we get only first data and return data.toList() // get all value and convert to list then return
  • Intermediate Operators —
    •  These are map, filter, take, zip, etc. They only set up a chain of operations for future execution and quickly return.
notion image
Filter Operator Returns a flow containing only values of the original flow that match the given predicate.
notion image
Map Operator Returns a flow containing the results of applying the given transform function to each value of the original flow.
notion image
withIndex Operator Returns a flow that wraps each element into IndexedValue, containing value and its index (starting from zero).
notion image
lifecycleScope.launch { viewModel.numbersFlow .map { it -> it * it }.filter { it -> it % 2 == 0 }.catch { exception -> handleException(exception) }.collect { it -> binding.tvFlow.text = it.toString() } }
Buffer Operator can be used when large data is provided but it take times to consume each data so buffer helps by storing some data and generating response faster.
data.buffer(3) // stores 3 values and when it availabe for cosuming .collect { // then it provides values from stored buffer first delay(1500) Log.e(TAG, "$it") }
flowOn Operator
  • normally flow will produce data on same coroutine thread on which collect{..} or Any Terminal functional called.
  • so provider and consumer share same coroutine scope.
  • but if try to run both on different thread then it will throw error
  • so that why flowOn() method used it takes dispatcher to and run all task , on that thread and after that it will switch context again to parent thread after it executed in functional chaining.
  • all method before flowOn() method executed on different coroutine scope and can be change multiple times for different methods and after executed it will change to parent scope to execute remaining task on it.
  • Example:
    • Here only collect will be done on Main thread and other process are done on IO Thread.
GlobalScope.launch(Dispatchers.Main) { val data = producer() data.map { it -> it * it Log.e(TAG, "Thread -> ${Thread.currentThread().name}") }.filter { it -> Log.e(TAG, "Thread -> ${Thread.currentThread().name}") it % 2 == 0 }.flowOn(Dispatchers.IO) .collect { Log.e(TAG, "Thread -> ${Thread.currentThread().name}") Log.e(TAG, "$it") } } fun producer() = flow<Int> { val list = listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) list.forEach { delay(1000) Log.e(TAG, "Thread -> ${Thread.currentThread().name}") emit(it) } }

Exception Handling in Flow

Flow collection can complete with an exception if it’s thrown inside the building block or any of the operators.
The correct way to encapsulate exception handling in an emitter is to use the catch operator.
Flows must be transparent to exceptions, and it is a violation of the exception transparency principle to emit values in a flow builder from inside of a try/catch block. That is why try/catch is not recommended.
  • Example:
try { val data = producer() data .collect { Log.e(TAG, "$it") } } catch (e: Exception) { Log.e(TAG, "Consumer Exception ${e.message}") } fun producer() = flow<Int> { val list = listOf(1, 2, 3, 4, 5) list.forEach { delay(1000) emit(it) } }.catch { Log.e(TAG, "producer Exception ${it.message}") emit(-1) // if exception occur send callback element }

Comparison of LiveData and Flow

notion image

StateFlow and SharedFlow

With StateFlow and SharedFlow, flows can efficiently update states and values to multiple consumers.
StateFlow:
  • It is a hot flow. Its active instance exists independently of the presence of collectors.
  • It needs an initial value.
  • We can create its variable like :val stateFlow = MutableStateFlow(0)
  • The only value that is emitted is the last known value.
  • The value property allows us to check the current value.
  • It does not emit consecutive repeated values. When the value differs from the previous item, it emits the value.
class CounterModel { private val counter = MutableStateFlow(0) // private mutable state flow val counterValue = counter.asStateFlow() // public read-only state flow fun incrementAtomically() { counter.update { count -> { count + 1 } } fun incrementCounter() { val count = counter.value counterValue.value = count + 1 } }
SharedFlow:
  • By default, it does not emit any value since it does not need an initial value.
  • We can create its variable like :val sharedFlow = MutableSharedFlow<Int>()
  • With the replay operator, it is possible to emit many previous values at once.
  • It does not have a value property.
  • The emitter emits all the values without caring about the distinct differences from the previous item. It emits consecutive repeated values also.
  • It is useful for broadcasting events that happen inside an application to subscribers that can come and go.
GlobalScope.launch(Dispatchers.Main) { val result = producer() result.collect { Log.e(TAG, "Flow 1 Item- $it") } } GlobalScope.launch(Dispatchers.Main) { val result = producer() delay(2500) result.collect { Log.e(TAG, "Flow 2 Item- $it") } } private fun producer(): Flow<Int> { val mutableSharedFlow = MutableSharedFlow<Int>() GlobalScope.launch { val list = listOf(1, 2, 3, 4, 5) list.forEach { mutableSharedFlow.emit(it) delay(1000) } } return mutableSharedFlow }
class EventBus { private val events = MutableSharedFlow<Event>() // private mutable shared flow val eventsValue = events.asSharedFlow() // public read-only shared flow suspend fun produceEvent(event: Event) { events.emit(event) // suspends until all subscribers receive it } }

stateInand shareIn

The shareIn and stateIn operators convert cold flows into hot flows.
The shareIn operator returns a SharedFlow instance whereas stateIn returns a StateFlow.
stateIn contains 3 parameters scope, started and initialValue.
  • scope = the coroutine scope to define.
  • started = SharingStarted strategy:— Eagerly: Sharing is started immediately and never stops.— Lazily: Sharing is started when the first subscriber appears and never stops.— WhileSubscribedSharing is started when the first subscriber appears, immediately stops when the last subscriber disappears (by default), keeping the replay cache forever (by default).
  • initialValue = initial value.
val stateFlow: StateFlow<SomeState> = someFlow .stateIn( scope = viewModelScope, started = SharingStarted.WhileSubscribed(5000), initialValue = someInitialValue, )
shareIn contains the same three parameters as stateIn, but instead of initialValue, it has a replay parameter.
  • replay = how many times to emit the value?
val sharedFlow: SharedFlow<SomeState> = someFlow .shareIn( scope = viewModelScope, started = SharingStarted.WhileSubscribed(5000), replay = 1, )

callbackFlow

callbackFlow is a flow builder that lets you convert callback-based API into flows.
The resulting flow is cold, which means that [block] is called every time a terminal operator is applied to the resulting flow.
The following example uses location callback of fusedLocationClient to convert it into callbackFlow.
// Get location updates of fusedLocationClient with callbackFlow val locationUpdatesFlow = callbackFlow<Location> { val callback = object : LocationCallback() { override fun onLocationResult(result: LocationResult?) { result ?: return // Send the new location trySend(result.lastLocation) } } // Request location updates fusedLocationClient.requestLocationUpdates( locationRequest, callback, Looper.getMainLooper() ).addOnFailureListener { e -> close(e) // in case of exception, close the Flow } awaitClose { // Reomve location updates when Flow collection ends fusedLocationClient.removeLocationUpdates(callback) } }

Collecting Flow from View (i.e. Activity or Fragment)

// For Fragment use viewLifecycleOwner.lifecycleScope lifecycleScope.launch { // For Fragment use viewLifecycleOwner.repeatOnLifecycle repeatOnLifecycle(LifeCycle.State.STARTED) { viewModel.flow.collect { // do something with values } // However, here if we start collecting flow then it will never going to be collected // Therefore, to overcome this we should use launch { } launch { viewModel.userDetails.collect { // do something with user details } } } }
To collect Flow in activity, we use lifecycleScope.launch.
In that block, we need to call repeatOnLifecycle(LifeCycle.State.Started) to collect flow safely when the lifecycle state is in started state.
repeatOnLifecycle establishes a suspending point that executes the block anytime the lifecycle enters the specified state and cancels it when it falls below it.
It requires a Lifecycle.State as a parameter. When the lifecycle reaches that state, it immediately creates and launches a new Coroutine with the block supplied to it, and it cancels the ongoing Coroutine that is running the block when the lifecycle falls below that state.

Collecting Flow in Compose

val someFlow by viewModel.flow.collectAsStateWithLifecycle()
To collect flow in compose, we use collectAsStateWithLifecycle.
Compose provides the collectAsStateWithLifecycle function, which collects values from a flow and gives the latest value to be used wherever needed. When a new flow value is emitted, we get the updated value, and re-composition takes place to update the state of the value.
It uses LifeCycle.State.Started by default to start collecting values when the lifecycle is in the specified state and stops when it falls below it.

Some useful points

  • Flow can be used with Data Binding from Android Studio Arctic Fox | 2020.3.1 onwards.
  • Room, DataStore, Paging3 and other various libraries provide support for Flow.
  • Flow is extremely appropriate for data updates. For example, you can use flow with Room to be notified of changes in your database.
  • For one-shot operations, LiveData is sufficient.