Kotlin: Блокировка сопрограмм с неблокирующими ввода-выводами

Я пытаюсь использовать сопрограммы Kotlin для обработки неблокирующего ввода-вывода. Сценарий выглядит следующим образом:

  1. Получать данные из асинхронного обратного вызова, выполняющегося в потоке 1.
  2. Подождите, пока эти данные в потоке 2 будут уничтожены.

Мой текущий код выглядит так (упрощенный для краткости):

private var latch = CountDownLatch(1) private var data: Any? = null // Async callback from non-blocking I/O fun onReceive(data: Any) { currentData = data latch.countDown() } // Wait and consume data fun getData(): Any? { latch.await() latch = CountDownLatch(1) return currentData } fun processData() { launch(CommonPool) { while (true) { val data = getData() // Consume data } } } 

Насколько я понимаю, Kotlin coroutines должен помочь мне избавиться от CountDownLatch. После прочтения этого (удивительного) руководства все, что я мог придумать, это примерно так:

 // Wait and consume data fun getData() = async(CommonPool) { latch.await() latch = CountDownLatch(1) currentData } fun processData() { launch(CommonPool) { while (true) { runBlocking { val data = getData().await() // Consume data } } } } 

Я также попытался с Pipelines , с аналогичными результатами. Я, очевидно, не понимаю, как использовать эти функции.

вы не сказали, если данные, полученные в onReceive() могут обрабатываться параллельно. Это главный вопрос. Если да, вы можете просто сделать это в onReceive() . Если это не разрешено, пусть каждый вызов onReceive() запускает задачу на CommonPool без каких-либо сопрограмм. Если они должны обрабатываться последовательно, то самым простым способом является запуск потока с петлей внутри:

 fun onReceive(data: Any) { queue.put(data); } .... // loop in a thread while(true) { data = queue.take(); processData(data); } 

Опять же, сопрограммы не нужны.

Как правило, сопрограммы являются синтаксическим сахаром для представления асинхронной программы, как если бы она была синхронной. Я не думаю, что ваша программа используется для использования сопрограмм.