У меня есть Sequence (из File.walkTopDown), и мне нужно запустить долговременную операцию для каждого из них. Я бы хотел использовать лучшие практики / сопрограммы Kotlin, но я либо не получаю параллелизма, либо слишком много параллелизма и попал в ошибку «слишком много открытых файлов».
File("/Users/me/Pictures/").walkTopDown() .onFail { file, ex -> println("ERROR: $file caused $ex") } .filter { ... only big images... } .map { file -> async { // I *think* I want async and not "launch"... ImageProcessor.fromFile(file) } }
Кажется, это не работает параллельно, и мой многоядерный процессор никогда не стоит выше 1 ЦП. Есть ли способ с сопрограммами для выполнения «параллельных операций с NumberOfCores» стоимостью отложенных заданий?
Я посмотрел на Multithreading, используя Kotlin Coroutines, который сначала создает ВСЕ задания, а затем присоединяет их, но это означает, что завершение последовательности Sequence / file tree завершается до этапа обработки тяжелой обработки, и это кажется … iffy! Разделение его на сбор и этап процесса означает, что сбор может работать до обработки.
val jobs = ... the Sequence above... .toSet() println("Found ${jobs.size}") jobs.forEach { it.await() }
Проблема с вашим первым фрагментом заключается в том, что он не запускается вообще – помните, что Sequence
ленив, и вы должны использовать терминальную операцию, такую как toSet()
или forEach()
. Кроме того, вам необходимо ограничить количество потоков, которые могут быть использованы для этой задачи, путем newFixedThreadPoolContext
контекста newFixedThreadPoolContext
и использования его в async
:
val pictureContext = newFixedThreadPoolContext(nThreads = 10, name = "reading pictures in parallel") File("/Users/me/Pictures/").walkTopDown() .onFail { file, ex -> println("ERROR: $file caused $ex") } .filter { ... only big images... } .map { file -> async(pictureContext) { ImageProcessor.fromFile(file) } } .toList() .forEach { it.await() }
Изменить: вы должны использовать оператор терминала ( toList
) toList
ожиданием результатов
Я работал с каналом. Но, может быть, я излишне с вашей стороны?
val pipe = ArrayChannel<Deferred<ImageFile>>(20) launch { while (!(pipe.isEmpty && pipe.isClosedForSend)) { imageFiles.add(pipe.receive().await()) } println("pipe closed") } File("/Users/me/").walkTopDown() .onFail { file, ex -> println("ERROR: $file caused $ex") } .forEach { pipe.send(async { ImageFile.fromFile(it) }) } pipe.close()