Merging Kotlin flows

Given two or more flows of the same type, is there an existing Kotlin coroutines function to merge them, like the Rx merge operator?

Currently I was considering this:

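fun <T> merge(vararg flows: Flow<T>): Flow<T> = channelFlow {
    // Launch in the channelFlow scope rather than GlobalScope, so that
    // cancelling the collector also cancels the source collections.
    val flowJobs = flows.map { flow ->
        launch { flow.collect { send(it) } }
    }
    flowJobs.joinAll()
}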

but it seems somewhat clumsy.

I'm not too familiar with flows yet, so this might be suboptimal. Anyway, I think you could create a flow of all your input flows, and then use flattenMerge to flatten them into a single flow again. Something like this:

fun <T> merge(vararg flows: Flow<T>): Flow<T> = flowOf(*flows).flattenMerge()

Edit:

The merge function was added to kotlinx-coroutines in the 1.3.3 release. See here: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/merge.html

This is now (Coroutines Version 1.3.5 at time of writing) part of the Coroutines library.

You use it like this:

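val flowA = flow { emit(1) }
val flowB = flow { emit(2) }

// collect is a suspending call, so run this inside a coroutine
merge(flowA, flowB).collect { println(it) } // Prints 1 and 2; merge does not guarantee element order
// or:
listOf(flowA, flowB).merge().collect { println(it) } // Prints 1 and 2; merge does not guarantee element order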

You can read more in the source code
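To make the interleaving visible, here is a minimal sketch that merges two flows emitting at different rates. The flow names, the element values and the delays are made up for illustration; only merge, flow, delay and runBlocking come from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.runBlocking

// Hypothetical source: emits n1, n2, n3 roughly every 100 ms.
fun numbers(): Flow<String> = flow {
    for (i in 1..3) {
        delay(100)
        emit("n$i")
    }
}

// Hypothetical source: emits A, B roughly every 250 ms.
fun letters(): Flow<String> = flow {
    for (letter in listOf("A", "B")) {
        delay(250)
        emit(letter)
    }
}

fun main() = runBlocking {
    // Both sources are collected concurrently, so elements are printed
    // as they arrive rather than grouped by source.
    merge(numbers(), letters()).collect { println(it) }
}
```

With these delays the elements should arrive interleaved by time (numbers before the first letter), but merge itself makes no ordering promise, so code should not rely on it.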

Merging kotlin flows, Library support for kotlin coroutines. onEach { delay(15) } flow.combine(flow2) { i, s -> i.toString() + s }.collect inline fun <T1, T2, T3, R> combine( flow:� I have 2 flows that i would like to merge like i used to do it in RxJava. In Rx-Java: Flowable.just(1).mergeWith(Flowable.just(2)).subscribe({ println(it)}) // result: 1, 2 How to replicate that in Kotlin Coroutines ? Thanks in advance.

It may be late but I believe this may be a viable solution:

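fun <T> combineMerge(vararg flows: Flow<T>): Flow<T> = channelFlow {
    // channelFlow allows sending from several coroutines; calling emit()
    // from a launched coroutine inside flow { } violates the flow invariant
    // and throws IllegalStateException.
    flows.forEach { flow ->
        launch {
            flow.collect { send(it) }
        }
    }
}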

fun <T> combineConcat(vararg flows: Flow<T>): Flow<T> = flow {
    // Collect each source to completion before starting the next one.
    flows.forEach { source ->
        emitAll(source)
    }
}
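The difference between the concurrent and the sequential variant can be sketched as follows. This is only an illustration: concat is a hypothetical helper that mirrors the sequential function above, the library's merge stands in for the concurrent one, and the flow contents and delays are invented.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects the sources one after another.
fun <T> concat(vararg flows: Flow<T>): Flow<T> = flow {
    flows.forEach { source -> emitAll(source) }
}

fun main() = runBlocking {
    val slow = flow { delay(200); emit("slow") }
    val fast = flow { delay(50); emit("fast") }

    // Sequential: "fast" is not collected until "slow" has completed.
    println(concat(slow, fast).toList()) // [slow, fast]

    // Concurrent: both sources run at once, so with these delays
    // "fast" is expected to arrive first.
    println(merge(slow, fast).toList())
}
```

Concatenation preserves argument order at the cost of waiting for each source to finish; merging trades that ordering for concurrency.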

Comments
  • Looks like that works great! Not sure about how optimal it is, as you say, but for now it's perfect.
  • You should be aware that, unlike the code provided in the question, this is by default limited to 16 flows running concurrently; see the concurrency parameter of flattenMerge.
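If the default limit mentioned in the last comment matters, one possible workaround is to pass the number of input flows as the concurrency argument. The sketch below assumes this is acceptable for the use case; mergeUnbounded is a hypothetical name, and the opt-in annotations are there because flattenMerge is not a stable API.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flattenMerge
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects every input flow concurrently.
// flattenMerge requires concurrency > 0, hence the lower bound of 1
// for the case where no flows are passed.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun <T> mergeUnbounded(vararg flows: Flow<T>): Flow<T> =
    flowOf(*flows).flattenMerge(concurrency = maxOf(1, flows.size))

fun main() = runBlocking {
    // 20 sources, more than the default concurrency of 16.
    val sources = Array(20) { i -> flowOf(i) }
    // Sorted because the merged order is not guaranteed.
    println(mergeUnbounded(*sources).toList().sorted())
}
```

The library's own merge function is usually the simpler choice; this variant only matters when flattenMerge is used directly.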