Слушатель внутри производителя

Я пытаюсь создать producer для слушателя. Мой код выглядит так:

 suspend fun foo() = produce{ someEvent.addListener { this.send(it) } } 

Но я получаю ошибку. Suspension functions can be called only within coroutine что имеет смысл. Мой вопрос. Есть ли способ реализовать этот шаблон с помощью сопрограмм?

Существует несколько способов его реализации, в зависимости от того, чего вы пытаетесь достичь:

Если вы хотите получить только самое последнее событие, то вы должны использовать объединенный канал и offer метод, который помогает ему:

 fun foo() = produce<T>(capacity = Channel.CONFLATED) { someEvent.addListener { offer(it) } } 

Если важно принимать все события, ваш выбор зависит от поведения вашего продюсера событий. Ключевым вопросом для размышления здесь является то, что произойдет, если ваш производитель событий начнет создавать множество событий «без остановок». Большинство «синхронных» производителей событий, как правило, не поддерживают явный сигнал обратного давления, но они по-прежнему поддерживают неявный сигнал обратного давления – они замедлятся, если их слушатели работают медленно или блокируют поток. Поэтому, как правило, следующее решение отлично работает для производителей синхронных событий:

 fun foo() = produce<T>() { someEvent.addListener { runBlocking { send(it) } } } 

Вы также можете указать параметр положительной capacity = xxx как параметр для produce компоновщика в качестве оптимизации производительности, если у вас есть случаи, когда пакет событий создается сразу, и вы не хотите блокировать производителя, но пусть потребитель обрабатывает их на своем Собственный темп.

В редком случае, когда ваш производитель не понимает неявный блокирующий сигнал обратного давления (когда это как некое многопоточное устройство, которое насильно создает события без внутренней синхронизации), вы можете использовать канал с неограниченной емкостью с offer , но будьте осторожны, вы рискуете исчерпать память, если производитель опережает потребителя:

 fun foo() = produce<T>(capacity = Channel.UNLIMITED) { someEvent.addListener { offer(it) } } 

Если ваш производитель поддерживает явный сигнал обратного давления (например, функциональные реактивные потоки), тогда вы должны использовать специальный адаптер для правильной передачи сигнала обратного давления в / из сопрограмм. Для этой kotlinx.coroutines библиотеке kotlinx.coroutines имеется ряд kotlinx.coroutines модулей интеграции с различными реактивными библиотеками. См. Здесь .

Примечание: вы не должны отмечать функцию foo с помощью модификатора suspend . Вызов foo не приостанавливает invoker в любом случае. Он сразу же (синхронно) запускает сопроводительную копию производителя.

Чтобы узнать больше о сопрограммах и различных каналах, я настоятельно рекомендую изучить руководство по kotlinx.coroutines .

Intereting Posts
Spring Boot: изменить идентификатор заполнителя Как создать кнопку в Kotlin, которая открывает новую активность (Android Studio)? Работа Gradle работает на локальной машине, но не работает на сервере Jenkins CI? Расширение полей в Котлине Как получить имена параметров через отражение в котлин? Библиотека Kotlin 'rxkotlin-0.21.0.jar' имеет неподдерживаемый формат. Обновите библиотеку или плагин Вызов метода Java из Kotlin со списком параметров Необязательная переменная класса данных Kotlin Использование функции «с» Конструкторы для типов с использованием обобщенных типов MissingMethodInvocationException тестирование открытого класса в Котлине «Ошибка неоднозначности разрешения перегрузки» разрешена с другой перегрузкой Kotlin: «Длина выражения» типа «Int» не может быть вызвана как функция. Функция 'invoke ()' не найдена Почему в этом методе Котлина есть обратные обратные линии? Kotlin – getPendingIntent с синтаксисом нескольких флагов