как реализовать ограниченный вызов с клиентом блокировки доработки и сопрограммами

У меня есть следующий код:

val context = newFixedThreadPoolContext(nThreads = 10, name="myThreadPool") val total = 1_000_000 //can be other number as well val maxLimit = 1_000 return runBlocking { (0..total step maxLimit).map { async(context) { val offset = it val limit = it + maxLimit blockingHttpCall(offset, limit) } }.flatMap { it.await() }.associateBy { ... }.toMutableMap() } 

Я бы хотел, чтобы только 10 вызовов выполнялись одновременно с блокировкой api. Однако, кажется, что вышеприведенный код не делает этого, как я ожидаю (я думаю, что все вызовы начинаются немедленно), или, по крайней мере, я не понимаю, если это так.
Каков правильный способ его реализации? Будет ли такое же решение работать, если я использую async api для доработки?

Я не знаю точно ваше дело, но самый простой способ – использовать OkHttp API для настройки уровня параллелизма, например, это стратегия параллелизма по умолчанию для OkHttp

Но у вас может быть собственная стратегия, если вы установите собственный экземпляр Dispatcher в OkHttpClient.Builder

Конечно, вы можете использовать также сопрограммы

Ваша текущая реализация неверна, потому что вы создаете диспетчер сопрограмм для каждого элемента, но чтобы иметь общий пул потоков, все сопрограммы должны использовать один и тот же диспетчер, просто переместите создание newFixedThreadPoolContext за пределы цикла (теперь у вас есть 1000 диспетчеров с 10 потоками).

Но я не рекомендую вам использовать сопрограммы + блокировки вызовов, лучше настраивать параллелизм OkHttp (он более гибкий) и использовать сопрограммы с неблокирующими вызовами (вы можете написать собственный адаптер или использовать существующую библиотеку, например, kotlin-coroutines-retrofit ) , Это позволит вам смешивать ваши HTTP-запросы и код пользовательского интерфейса или другие задачи.

Поэтому, если вы используете неблокирующий API + OkHttp для внутреннего параллелизма, вам не нужно иметь специальный код для управления параллелизмом, конечно, вы можете ограничить количество одновременных вызовов, как в приведенном выше примере (с фиксированной конструкцией диспетчера), но Я действительно не думаю, что это имеет смысл, потому что вы можете уменьшить уровень параллелизма, а не увеличивать его.

После перехода на неблокирующий API вы можете просто запускать все свои сопрограммы в любом диспетчере сопрограмм параллельно (даже в потоке пользовательского интерфейса) и ждать результатов без блокировки.

Кроме того, неявный контроль параллелизма с использованием конфигурации OkHttpClient выглядит более правильным способом с точки зрения архитектуры (у вас может быть код DI, который настраивает Retrofit + OkHttp и предоставляет его вашему коду клиента с предварительно сконфигурированной политикой параллелизма). Конечно, вы можете достичь этого, используя другие подходы, но для меня это выглядит более естественным.