У меня есть набор объектов, которые мне нужно выполнить. В настоящее время я использую:
var myObjects: List<MyObject> = getMyObjects() myObjects.forEach{ myObj -> someMethod(myObj) }
Он отлично работает, но я надеялся ускорить его, запустив someMethod()
параллельно, вместо того, чтобы ждать завершения каждого объекта, прежде чем начинать с следующего.
Есть ли способ сделать это в Котлине? Может быть, с doAsyncTask
или что-то еще?
Я знаю, когда это было задано более года назад, это было невозможно, но теперь, когда у Котлина есть сопрограммы, подобные doAsyncTask
мне любопытно, может ли кто-нибудь из сопрограмм помочь
Вы можете использовать RxJava для решения этой проблемы.
List<MyObjects> items = getList() Observable.from(items).flatMap(object : Func1<MyObjects, Observable<String>>() { fun call(item: MyObjects): Observable<String> { return someMethod(item) } }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object : Subscriber<String>() { fun onCompleted() { } fun onError(e: Throwable) { } fun onNext(s: String) { // do on output of each string } })
Подписавшись на Schedulers.io()
, некоторый метод запланирован на фоновый поток.
Java Stream прост в использовании в Kotlin:
tasks.stream().parallel().forEach { computeNotSuspend(it) }
Если вы используете Android, однако вы не можете использовать Java 8, если вы хотите, чтобы приложение совместимо с API ниже 24.
Вы также можете использовать сопрограммы, как вы предложили. Но на данный момент это не является частью языка (август 2017 года), и вам нужно установить внешнюю библиотеку. Существует очень хорошее руководство с примерами .
runBlocking<Unit> { val deferreds = tasks.map { async(CommonPool) { compute(it) } } deferreds.forEach { it.await() } }
Обратите внимание, что сопрограммы реализованы с неблокирующей многопотоковой обработкой, что означает, что они могут быть быстрее, чем традиционная многопоточность. У меня есть код ниже, сравнивающий Stream параллельно с сопрограммой coroutine, и в этом случае подход coroutine в 7 раз быстрее на моей машине. Однако вам нужно сделать некоторую работу самостоятельно, чтобы убедиться, что ваш код «приостановлен» (не блокируется), что может быть довольно сложным. В моем примере я просто вызываю delay
которая является функцией suspend
предоставляемой библиотекой. Неблокирующая многопоточность не всегда быстрее, чем традиционная многопоточность. Это может быть быстрее, если у вас много потоков, которые ничего не делают, кроме ожидания ввода-вывода, что является моим опытом.
Мой контрольный код:
import kotlinx.coroutines.experimental.CommonPool import kotlinx.coroutines.experimental.async import kotlinx.coroutines.experimental.delay import kotlinx.coroutines.experimental.launch import kotlinx.coroutines.experimental.runBlocking import java.util.* import kotlin.system.measureNanoTime import kotlin.system.measureTimeMillis class SomeTask() { val durationMS = random.nextInt(1000).toLong() companion object { val random = Random() } } suspend fun compute(task: SomeTask): Unit { delay(task.durationMS) //println("done ${task.durationMS}") return } fun computeNotSuspend(task: SomeTask): Unit { Thread.sleep(task.durationMS) //println("done ${task.durationMS}") return } fun main(args: Array<String>) { val n = 100 val tasks = List(n) { SomeTask() } val timeCoroutine = measureNanoTime { runBlocking<Unit> { val deferreds = tasks.map { async(CommonPool) { compute(it) } } deferreds.forEach { it.await() } } } println("Coroutine ${timeCoroutine / 1_000_000} ms") val timePar = measureNanoTime { tasks.stream().parallel().forEach { computeNotSuspend(it) } } println("Stream parallel ${timePar / 1_000_000} ms") }
Выход на моем 4-ядерном компьютере:
Coroutine: 1037 ms Stream parallel: 7150 ms
Если вы раскомментируете println
в двух compute
функциях, вы увидите, что в неблокирующем сопрограмме сопроцессора задачи обрабатываются в правильном порядке, но не с потоками.
Да, это можно сделать с помощью сопрограмм. Следующая функция применяет параллельную операцию для всех элементов коллекции:
fun <A>Collection<A>.forEachParallel(f: suspend (A) -> Unit): Unit = runBlocking { map { async(CommonPool) { f(it) } }.forEach { it.await() } }
Хотя само определение немного загадочно, вы можете легко применить его, как и следовало ожидать:
myObjects.forEachParallel { myObj -> someMethod(myObj) }
Параллельная карта может быть реализована аналогичным образом, см. https://stackoverflow.com/a/45794062/1104870 .