Сборник процессов Котлина в параллель?

У меня есть набор объектов, которые мне нужно выполнить. В настоящее время я использую:

var myObjects: List<MyObject> = getMyObjects() myObjects.forEach{ myObj -> someMethod(myObj) } 

Он отлично работает, но я надеялся ускорить его, запустив someMethod() параллельно, вместо того, чтобы ждать завершения каждого объекта, прежде чем начинать с следующего.

Есть ли способ сделать это в Котлине? Может быть, с doAsyncTask или что-то еще?

Я знаю, когда это было задано более года назад, это было невозможно, но теперь, когда у Котлина есть сопрограммы, подобные doAsyncTask мне любопытно, может ли кто-нибудь из сопрограмм помочь

Вы можете использовать RxJava для решения этой проблемы.

 List<MyObjects> items = getList() Observable.from(items).flatMap(object : Func1<MyObjects, Observable<String>>() { fun call(item: MyObjects): Observable<String> { return someMethod(item) } }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object : Subscriber<String>() { fun onCompleted() { } fun onError(e: Throwable) { } fun onNext(s: String) { // do on output of each string } }) 

Подписавшись на Schedulers.io() , некоторый метод запланирован на фоновый поток.

Java Stream прост в использовании в Kotlin:

 tasks.stream().parallel().forEach { computeNotSuspend(it) } 

Если вы используете Android, однако вы не можете использовать Java 8, если вы хотите, чтобы приложение совместимо с API ниже 24.

Вы также можете использовать сопрограммы, как вы предложили. Но на данный момент это не является частью языка (август 2017 года), и вам нужно установить внешнюю библиотеку. Существует очень хорошее руководство с примерами .

  runBlocking<Unit> { val deferreds = tasks.map { async(CommonPool) { compute(it) } } deferreds.forEach { it.await() } } 

Обратите внимание, что сопрограммы реализованы с неблокирующей многопотоковой обработкой, что означает, что они могут быть быстрее, чем традиционная многопоточность. У меня есть код ниже, сравнивающий Stream параллельно с сопрограммой coroutine, и в этом случае подход coroutine в 7 раз быстрее на моей машине. Однако вам нужно сделать некоторую работу самостоятельно, чтобы убедиться, что ваш код «приостановлен» (не блокируется), что может быть довольно сложным. В моем примере я просто вызываю delay которая является функцией suspend предоставляемой библиотекой. Неблокирующая многопоточность не всегда быстрее, чем традиционная многопоточность. Это может быть быстрее, если у вас много потоков, которые ничего не делают, кроме ожидания ввода-вывода, что является моим опытом.

Мой контрольный код:

 import kotlinx.coroutines.experimental.CommonPool import kotlinx.coroutines.experimental.async import kotlinx.coroutines.experimental.delay import kotlinx.coroutines.experimental.launch import kotlinx.coroutines.experimental.runBlocking import java.util.* import kotlin.system.measureNanoTime import kotlin.system.measureTimeMillis class SomeTask() { val durationMS = random.nextInt(1000).toLong() companion object { val random = Random() } } suspend fun compute(task: SomeTask): Unit { delay(task.durationMS) //println("done ${task.durationMS}") return } fun computeNotSuspend(task: SomeTask): Unit { Thread.sleep(task.durationMS) //println("done ${task.durationMS}") return } fun main(args: Array<String>) { val n = 100 val tasks = List(n) { SomeTask() } val timeCoroutine = measureNanoTime { runBlocking<Unit> { val deferreds = tasks.map { async(CommonPool) { compute(it) } } deferreds.forEach { it.await() } } } println("Coroutine ${timeCoroutine / 1_000_000} ms") val timePar = measureNanoTime { tasks.stream().parallel().forEach { computeNotSuspend(it) } } println("Stream parallel ${timePar / 1_000_000} ms") } 

Выход на моем 4-ядерном компьютере:

 Coroutine: 1037 ms Stream parallel: 7150 ms 

Если вы раскомментируете println в двух compute функциях, вы увидите, что в неблокирующем сопрограмме сопроцессора задачи обрабатываются в правильном порядке, но не с потоками.

Да, это можно сделать с помощью сопрограмм. Следующая функция применяет параллельную операцию для всех элементов коллекции:

 fun <A>Collection<A>.forEachParallel(f: suspend (A) -> Unit): Unit = runBlocking { map { async(CommonPool) { f(it) } }.forEach { it.await() } } 

Хотя само определение немного загадочно, вы можете легко применить его, как и следовало ожидать:

 myObjects.forEachParallel { myObj -> someMethod(myObj) } 

Параллельная карта может быть реализована аналогичным образом, см. https://stackoverflow.com/a/45794062/1104870 .