как закрыть kotlin coroutines максимальный параллелизм

У меня есть Sequence (из File.walkTopDown), и мне нужно запустить долговременную операцию для каждого из них. Я бы хотел использовать лучшие практики / сопрограммы Kotlin, но я либо не получаю параллелизма, либо слишком много параллелизма и попал в ошибку «слишком много открытых файлов».

File("/Users/me/Pictures/").walkTopDown() .onFail { file, ex -> println("ERROR: $file caused $ex") } .filter { ... only big images... } .map { file -> async { // I *think* I want async and not "launch"... ImageProcessor.fromFile(file) } } 

Кажется, это не работает параллельно, и мой многоядерный процессор никогда не стоит выше 1 ЦП. Есть ли способ с сопрограммами для выполнения «параллельных операций с NumberOfCores» стоимостью отложенных заданий?

Я посмотрел на Multithreading, используя Kotlin Coroutines, который сначала создает ВСЕ задания, а затем присоединяет их, но это означает, что завершение последовательности Sequence / file tree завершается до этапа обработки тяжелой обработки, и это кажется … iffy! Разделение его на сбор и этап процесса означает, что сбор может работать до обработки.

 val jobs = ... the Sequence above... .toSet() println("Found ${jobs.size}") jobs.forEach { it.await() } 

Проблема с вашим первым фрагментом заключается в том, что он не запускается вообще – помните, что Sequence ленив, и вы должны использовать терминальную операцию, такую ​​как toSet() или forEach() . Кроме того, вам необходимо ограничить количество потоков, которые могут быть использованы для этой задачи, путем newFixedThreadPoolContext контекста newFixedThreadPoolContext и использования его в async :

 val pictureContext = newFixedThreadPoolContext(nThreads = 10, name = "reading pictures in parallel") File("/Users/me/Pictures/").walkTopDown() .onFail { file, ex -> println("ERROR: $file caused $ex") } .filter { ... only big images... } .map { file -> async(pictureContext) { ImageProcessor.fromFile(file) } } .toList() .forEach { it.await() } 

Изменить: вы должны использовать оператор терминала ( toList ) toList ожиданием результатов

Я работал с каналом. Но, может быть, я излишне с вашей стороны?

 val pipe = ArrayChannel<Deferred<ImageFile>>(20) launch { while (!(pipe.isEmpty && pipe.isClosedForSend)) { imageFiles.add(pipe.receive().await()) } println("pipe closed") } File("/Users/me/").walkTopDown() .onFail { file, ex -> println("ERROR: $file caused $ex") } .forEach { pipe.send(async { ImageFile.fromFile(it) }) } pipe.close()