Я пытаюсь создать producer
для слушателя. Мой код выглядит так:
suspend fun foo() = produce{ someEvent.addListener { this.send(it) } }
Но я получаю ошибку. Suspension functions can be called only within coroutine
что имеет смысл. Мой вопрос. Есть ли способ реализовать этот шаблон с помощью сопрограмм?
Существует несколько способов его реализации, в зависимости от того, чего вы пытаетесь достичь:
Если вы хотите получить только самое последнее событие, то вы должны использовать объединенный канал и offer
метод, который помогает ему:
fun foo() = produce<T>(capacity = Channel.CONFLATED) { someEvent.addListener { offer(it) } }
Если важно принимать все события, ваш выбор зависит от поведения вашего продюсера событий. Ключевым вопросом для размышления здесь является то, что произойдет, если ваш производитель событий начнет создавать множество событий «без остановок». Большинство «синхронных» производителей событий, как правило, не поддерживают явный сигнал обратного давления, но они по-прежнему поддерживают неявный сигнал обратного давления – они замедлятся, если их слушатели работают медленно или блокируют поток. Поэтому, как правило, следующее решение отлично работает для производителей синхронных событий:
fun foo() = produce<T>() { someEvent.addListener { runBlocking { send(it) } } }
Вы также можете указать параметр положительной capacity = xxx
как параметр для produce
компоновщика в качестве оптимизации производительности, если у вас есть случаи, когда пакет событий создается сразу, и вы не хотите блокировать производителя, но пусть потребитель обрабатывает их на своем Собственный темп.
В редком случае, когда ваш производитель не понимает неявный блокирующий сигнал обратного давления (когда это как некое многопоточное устройство, которое насильно создает события без внутренней синхронизации), вы можете использовать канал с неограниченной емкостью с offer
, но будьте осторожны, вы рискуете исчерпать память, если производитель опережает потребителя:
fun foo() = produce<T>(capacity = Channel.UNLIMITED) { someEvent.addListener { offer(it) } }
Если ваш производитель поддерживает явный сигнал обратного давления (например, функциональные реактивные потоки), тогда вы должны использовать специальный адаптер для правильной передачи сигнала обратного давления в / из сопрограмм. Для этой kotlinx.coroutines
библиотеке kotlinx.coroutines
имеется ряд kotlinx.coroutines
модулей интеграции с различными реактивными библиотеками. См. Здесь .
Примечание: вы не должны отмечать функцию foo
с помощью модификатора suspend
. Вызов foo
не приостанавливает invoker в любом случае. Он сразу же (синхронно) запускает сопроводительную копию производителя.
Чтобы узнать больше о сопрограммах и различных каналах, я настоятельно рекомендую изучить руководство по kotlinx.coroutines .