RxJava: onBackpressureBlock () странное поведение

Я играю с RxJava (точнее, RxKotlin). Здесь у меня есть следующие Observable s:

 fun metronome(ms: Int) = observable<Int> { var i = 0; while (true) { if (ms > 0) { Thread.sleep(ms.toLong()) } if (it.isUnsubscribed()) { break } it.onNext(++i) } } 

И я хотел бы, чтобы некоторые из них сливались и работали одновременно. Они игнорируют противодавление, поэтому к ним должны применяться операторы противодавления.

Затем я создаю

 val cores = Runtime.getRuntime().availableProcessors() val threads = Executors.newFixedThreadPool(cores) val scheduler = Schedulers.from(threads) 

И затем я объединим metronome :

 val o = Observable.merge(listOf(metronome(0), metronome(1000).map { "---------" }) .map { it.onBackpressureBlock().subscribeOn(scheduler) }) .take(5000, TimeUnit.MILLISECONDS) 

Первый должен испускать предметы непрестанно. Если я сделаю это за последние 3 секунды пробега, я получу следующий результат:

 ... [RxComputationThreadPool-5]: 369255 [RxComputationThreadPool-5]: 369256 [RxComputationThreadPool-5]: 369257 [RxComputationThreadPool-5]: --------- [RxComputationThreadPool-5]: --------- [RxComputationThreadPool-5]: --------- 

Кажется, что Observable s подписываются на один и тот же поток, а первый наблюдаемый блокируется на 3+ секунды.

Но когда я обмениваюсь onBackpressureBlock() и subscribeOn(scheduler) вызывает вывод, то, что я ожидал, результат объединяется в течение всего выполнения.

Для меня очевидно, что порядок звонков имеет значение в RxJava, но я не совсем понимаю, что происходит в этой конкретной ситуации.

Итак, что происходит, когда оператор onBackpressureBlock применяется до subscribeOn и что, если после ?

Оператор onBackpressureBlock – неудачный эксперимент; он требует ухода, где применять. Например, subscribeOn().onBackpressureBlock() но не наоборот.

RxJava имеет неблокирующий периодический таймер, называемый interval поэтому вам не нужно откатывать свои собственные.