Parallel operations on Kotlin collections?

In Scala, one can easily do a parallel map, forEach, etc., with:

collection.par.map(..)

Is there an equivalent in Kotlin?

The Kotlin standard library has no support for parallel operations. However, since Kotlin uses the standard Java collection classes, you can use the Java 8 stream API to perform parallel operations on Kotlin collections as well.

e.g.

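myCollection.parallelStream()
        .map { ... }
        .filter { ... }
        .collect(Collectors.toList())  // terminal operation that runs the pipeline (import java.util.stream.Collectors)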
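A self-contained version of the stream approach, as a minimal sketch assuming a JVM target of Java 8 or later. The function name evenSquaresInParallel and the squaring/filtering lambdas are made up for illustration. Because a List source yields an ordered stream, collect(Collectors.toList()) returns the results in the original encounter order even though the work runs on the common fork-join pool.

```kotlin
import java.util.stream.Collectors

// Hypothetical example function: squares every number and keeps the even squares.
// map and filter run on ForkJoinPool.commonPool(); collect() is the terminal
// operation that actually triggers the work and gathers the results.
fun evenSquaresInParallel(numbers: List<Int>): List<Int> =
    numbers.parallelStream()
        .map { it * it }
        .filter { it % 2 == 0 }
        .collect(Collectors.toList())

fun main() {
    // Encounter order is preserved for a List source.
    println(evenSquaresInParallel((1..10).toList())) // prints [4, 16, 36, 64, 100]
}
```

Note that nothing happens until the terminal operation is called; a stream that ends in map or filter is never evaluated.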

As of Kotlin 1.1, parallel operations can also be expressed quite elegantly in terms of coroutines. Here is pmap on lists:

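import kotlinx.coroutines.*

fun <A, B> List<A>.pmap(f: suspend (A) -> B): List<B> = runBlocking {
    // one coroutine per element on the shared background pool (Dispatchers.Default);
    // awaitAll() returns the results in the original list order
    map { async(Dispatchers.Default) { f(it) } }.awaitAll()
}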

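Note that coroutines were still an experimental feature when this answer was written; they became stable in Kotlin 1.3, and CommonPool was deprecated in kotlinx.coroutines 1.0 in favor of Dispatchers.Default.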

There is no official support in Kotlin's stdlib yet, but you could define an extension function to mimic par.map:

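import java.util.Collections
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

fun <T, R> Iterable<T>.pmap(
    // leave two cores free, but never ask for fewer than one thread
    numThreads: Int = (Runtime.getRuntime().availableProcessors() - 2).coerceAtLeast(1),
    exec: ExecutorService = Executors.newFixedThreadPool(numThreads),
    transform: (T) -> R
): List<R> {

    // default size is just an inlined version of kotlin.collections.collectionSizeOrDefault
    val defaultSize = if (this is Collection<*>) this.size else 10
    // synchronized wrapper, since several worker threads append concurrently
    val destination = Collections.synchronizedList(ArrayList<R>(defaultSize))

    for (item in this) {
        // results land in completion order, not input order; if transform throws,
        // the exception stays in the discarded Future and that element is missing
        exec.submit { destination.add(transform(item)) }
    }

    // accept no new tasks and wait for the submitted ones
    // (note: this also shuts down a caller-supplied executor)
    exec.shutdown()
    exec.awaitTermination(1, TimeUnit.DAYS)

    return ArrayList<R>(destination)
}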

(github source)

Here's a simple usage example:

val result = listOf("foo", "bar").pmap { it + "!" }.filter { it.contains("bar") }

If needed, it lets you tweak the threading by providing the number of threads or even a specific java.util.concurrent.ExecutorService, e.g.:

listOf("foo", "bar").pmap(4, transform = { it + "!" })

Please note that this approach only parallelizes the map operation and does not affect any downstream steps; e.g. the filter in the first example runs single-threaded. However, in many cases only the data transformation (i.e. map) needs parallelization. Furthermore, it would be straightforward to extend the approach above to other operations of the Kotlin collection API.
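If input order matters (a concern raised in the comments below), one option is to keep the Future returned by each submit and read the futures back in submission order. The sketch below is a hypothetical pmapOrdered helper, not part of the answer's GitHub source. As a side effect of calling Future.get(), a failure in transform is rethrown (wrapped in ExecutionException) instead of silently dropping the element, and the helper only shuts down the pool it creates itself.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical order-preserving variant of pmap: results come back in input order
// because the futures are awaited in the order they were submitted.
fun <T, R> Iterable<T>.pmapOrdered(
    numThreads: Int = Runtime.getRuntime().availableProcessors(),
    transform: (T) -> R
): List<R> {
    val exec = Executors.newFixedThreadPool(numThreads)
    try {
        // Submit every element first so the tasks can run concurrently.
        val futures = map { item -> exec.submit(Callable { transform(item) }) }
        // Future.get() blocks until that task is done; an exception thrown by
        // transform surfaces here as an ExecutionException.
        return futures.map { it.get() }
    } finally {
        // This pool was created here, so it is always shut down here.
        exec.shutdown()
    }
}

fun main() {
    println(listOf("foo", "bar", "baz").pmapOrdered { it + "!" }) // prints [foo!, bar!, baz!]
}
```

The trade-off is that results cannot be consumed until the earlier elements are finished, which may leave some threads idle when per-element cost varies a lot.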

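Since Kotlin 1.2, the JDK 8 flavor of the standard library (kotlin-stdlib-jdk8) integrates with Java 8 streams, and on a Java 8+ target any Kotlin collection exposes parallelStream().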

So, iterating over a list in parallel could be done like below:

fun main(args: Array<String>) {
  val c = listOf("toto", "tata", "tutu")
  c.parallelStream().forEach { println(it) }
}
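Note that forEach on a parallel stream gives no ordering guarantee, so the three strings above may print in a different order from run to run. When order matters, Java's forEachOrdered processes elements in encounter order, at some cost to parallelism. A small sketch; orderedCopy is a hypothetical helper name used only for illustration.

```kotlin
// Hypothetical helper: runs a parallel stream but hands the elements to the
// action one at a time, in the list's encounter order.
fun <T> List<T>.orderedCopy(): List<T> {
    val seen = ArrayList<T>(size)
    // forEachOrdered guarantees encounter order and a happens-before edge between
    // successive actions, so an unsynchronized ArrayList is safe here.
    parallelStream().forEachOrdered { seen.add(it) }
    return seen
}

fun main() {
    val c = listOf("toto", "tata", "tutu")
    // forEach: no ordering guarantee, so the output order may vary between runs
    c.parallelStream().forEach { println(it) }
    // forEachOrdered: prints toto, tata, tutu in that order
    c.parallelStream().forEachOrdered { println(it) }
    println(c.orderedCopy())
}
```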

Comments
  • Some of the fastest parallel collections around are from GS-Collections: github.com/goldmansachs/gs-collections ... which you can use from Kotlin (as any Java collection framework can be used)
  • How can one use the Java 8 stream API in Kotlin?
  • @LordScone The same way as you'd do it in Java, e.g.: myCollection.parallelStream().map { ... }.filter { ... }
  • With Kotlin 1.3 out, is this still the best answer? I noticed @OlivierTerrien's Stream answer below, but I'd prefer to stick with Kotlin Sequences and Iterables.
  • @BenjaminH Thanks; I have marked yole's answer as accepted, as it also refers to the stream API and posted before OlivierTerrien's answer.
  • Quite elegantly? On the contrary, the code is pretty hard to read I would say.
  • @DzmitryLazerka I think I see where you're coming from, but this exact code isn't the elegant bit. The use of this code is what's elegant. If the above method is placed somewhere, it can be used with just foo.pmap { v -> ... }. I think that's fairly elegant.
  • I don't see how "destination.add(transform(item))" is thread safe. What's to keep two threads from calling "destination.add" at the same time, thus breaking stuff since ArrayList.add() is not a thread safe operation?
  • Thanks for the hint. Quite a few people think that simply adding elements is fine without synchronization. However, I've changed it to use a synchronized list to improve thread safety.
  • The order in destination may not be the same as in the original list
  • I think many parallel collection implementations (like in scala) do not care about preserving order. Though, by changing the for-each loop above to an indexed loop along with downstream resorting, order could be preserved easily.
  • I'm interested in a version that returns a Sequence<R> (or Flow<R>). Unfortunately I can't simply have the whole code execute in an = execute{ block and call yield instead of destination.add because yield can only execute in the original block, so within exec.submit { } is not an option. (Order need not be preserved.)
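Regarding the last comment: one way around the yield restriction is to keep every yield call in the sequence { } block itself and let the worker threads only compute results. java.util.concurrent.ExecutorCompletionService hands back finished tasks in completion order, so the hypothetical pmapSequence sketch below yields results as they complete (order is not preserved). Assumptions: all tasks are submitted when iteration starts, and the sequence is iterated once and to the end; if iteration stops early, the finally block does not run and the pool is never shut down.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorCompletionService
import java.util.concurrent.Executors

// Hypothetical sketch: yields transformed items in completion order, not input order.
fun <T, R> Iterable<T>.pmapSequence(
    numThreads: Int = Runtime.getRuntime().availableProcessors(),
    transform: (T) -> R
): Sequence<R> = sequence {
    val exec = Executors.newFixedThreadPool(numThreads)
    try {
        val completion = ExecutorCompletionService<R>(exec)
        // Worker threads only call transform; they never touch the sequence.
        var submitted = 0
        for (item in this@pmapSequence) {
            completion.submit(Callable { transform(item) })
            submitted++
        }
        // take() blocks until the next task finishes; yield runs here, in the
        // sequence's own block, which is where Kotlin requires it.
        repeat(submitted) { yield(completion.take().get()) }
    } finally {
        exec.shutdown()
    }
}

fun main() {
    println(listOf(1, 2, 3, 4).pmapSequence(2) { it * 10 }.toList().sorted()) // prints [10, 20, 30, 40]
}
```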