Параллельная загрузка файлов S3 через Kotlin Corouts

Мне нужно загрузить много файлов на S3, для завершения этой работы потребуется несколько часов. Это то, что превосходит новые сопрограммы Kotlin, поэтому я хотел дать им первую попытку вместо того, чтобы снова возиться с какой-то службой исполнения на основе Thread.

Вот мой (упрощенный) код:

fun upload(superTiles: Map<Int, Map<Int, SuperTile>>) = runBlocking { val s3 = AmazonS3ClientBuilder.standard().withRegion("eu-west-1").build() for ((x, ys) in superTiles) { val jobs = mutableListOf<Deferred<Any>>() for ((y, superTile) in ys) { val job = async(CommonPool) { uploadTile(s3, x, y, superTile) } jobs.add(job) } jobs.map { it.await() } } } suspend fun uploadTile(s3: AmazonS3, x: Int, y: Int, superTile: SuperTile) { val json: String = "{}" val key = "$s3Prefix/x4/$z/$x/$y.json" s3.putObject(PutObjectRequest("my_bucket", ByteArrayInputStream(json.toByteArray()), metadata)) } 

Проблема: код все еще очень медленный, и протоколирование показывает, что запросы все еще выполняются последовательно: задание завершено до создания следующего. Только в очень немногих случаях (1 из 10) я вижу, что работа выполняется одновременно.

Почему код не работает намного быстрее / одновременно? Что я могу сделать с этим?

Kotlin coroutines превосходит вас, когда вы работаете с асинхронным API, в то время AmazonS3.putObject API AmazonS3.putObject который вы используете, представляет собой синхронный API с блокировкой старой школы, поэтому вы получаете столько одновременных загрузок, сколько количества потоков в используемом CommonPool . Нет никакой ценности при маркировке вашей функции uploadTile с изменением suspend , поскольку она не использует никаких приостановленных функций в своем теле.

Первым шагом в получении большей пропускной способности в вашей задаче загрузки является использование для этого асинхронного API. Я бы предложил посмотреть на Amazon S3 TransferManager для этого кошелька. Посмотрите, сначала ли ваша проблема решена.

Kotlin coroutines разработаны, чтобы помочь вам объединить ваши асинхронные API в простые в использовании логические рабочие процессы. Например, легко адаптировать асинхронный API TransferManager для использования с сопрограммами, написав следующую функцию расширения:

 suspend fun Upload.await(): UploadResult = suspendCancellableCoroutine { cont -> addProgressListener { if (isDone) { // we know it should not actually wait when done try { cont.resume(waitForUploadResult()) } catch (e: Throwable) { cont.resumeWithException(e) } } } cont.invokeOnCompletion { abort() } } 

Это расширение позволяет вам писать очень плавный код, который работает с TransferManager и вы можете переписать функцию uploadTile для работы с TransferManager вместо работы с блокирующим интерфейсом AmazonS3 :

 suspend fun uploadTile(tm: TransferManager, x: Int, y: Int, superTile: SuperTile) { val json: String = "{}" val key = "$s3Prefix/x4/$z/$x/$y.json" tm.upload(PutObjectRequest("my_bucket", ByteArrayInputStream(json.toByteArray()), metadata)) .await() } 

Обратите внимание, как эта новая версия uploadTile использует функцию приостановки, которая была определена выше.