RxJava: onBackpressureBlock () странное поведение

Я играю с RxJava (точнее, RxKotlin). Здесь у меня есть следующие Observable s:

 fun metronome(ms: Int) = observable<Int> { var i = 0; while (true) { if (ms > 0) { Thread.sleep(ms.toLong()) } if (it.isUnsubscribed()) { break } it.onNext(++i) } } 

И я хотел бы, чтобы некоторые из них сливались и работали одновременно. Они игнорируют противодавление, поэтому к ним должны применяться операторы противодавления.

Затем я создаю

 val cores = Runtime.getRuntime().availableProcessors() val threads = Executors.newFixedThreadPool(cores) val scheduler = Schedulers.from(threads) 

И затем я объединим metronome :

 val o = Observable.merge(listOf(metronome(0), metronome(1000).map { "---------" }) .map { it.onBackpressureBlock().subscribeOn(scheduler) }) .take(5000, TimeUnit.MILLISECONDS) 

Первый должен испускать предметы непрестанно. Если я сделаю это за последние 3 секунды пробега, я получу следующий результат:

 ... [RxComputationThreadPool-5]: 369255 [RxComputationThreadPool-5]: 369256 [RxComputationThreadPool-5]: 369257 [RxComputationThreadPool-5]: --------- [RxComputationThreadPool-5]: --------- [RxComputationThreadPool-5]: --------- 

Кажется, что Observable s подписываются на один и тот же поток, а первый наблюдаемый блокируется на 3+ секунды.

Но когда я обмениваюсь onBackpressureBlock() и subscribeOn(scheduler) вызывает вывод, то, что я ожидал, результат объединяется в течение всего выполнения.

Для меня очевидно, что порядок звонков имеет значение в RxJava, но я не совсем понимаю, что происходит в этой конкретной ситуации.

Итак, что происходит, когда оператор onBackpressureBlock применяется до subscribeOn и что, если после ?

Оператор onBackpressureBlock – неудачный эксперимент; он требует ухода, где применять. Например, subscribeOn().onBackpressureBlock() но не наоборот.

RxJava имеет неблокирующий периодический таймер, называемый interval поэтому вам не нужно откатывать свои собственные.

Intereting Posts
Упрощение кода, который является одинаковым удовольствием, за исключением подписи (varargs vs map) Плагин Android-плагинов жалуется на ошибку перевода Uncaught Android – Tab # 3 очищает представление списка из Tab # 1. как вибрировать устройство при нажатии кнопки с использованием эффектов вибратора? Использование Kotlin Запечатанный класс в Котлине, Ошибка Несовместимых типов Как расширить статические поля Java в kotlin Как получить общий класс параметров в Котлине Включить байт java.lang.OutOfMemoryError: pthread_create (стек 1040 КБ) не удалось: попробуйте Kotlin Coroutines – правильный путь в Android Невозможно разрешить моему приложению использовать обратную связь над другими правами на приложения Android ClassNotFoundException Не нашел класс на пути: DexPathList Свойства бина, не равные нулю при инициализации, становятся нулевыми при вызове метода @Transactional Интерфейс фабрики Kotlin с дженериками 2D-массив в Котлине