Я играю с RxJava (точнее, RxKotlin). Здесь у меня есть следующие Observable
s:
fun metronome(ms: Int) = observable<Int> { var i = 0; while (true) { if (ms > 0) { Thread.sleep(ms.toLong()) } if (it.isUnsubscribed()) { break } it.onNext(++i) } }
И я хотел бы, чтобы некоторые из них сливались и работали одновременно. Они игнорируют противодавление, поэтому к ним должны применяться операторы противодавления.
Затем я создаю
val cores = Runtime.getRuntime().availableProcessors() val threads = Executors.newFixedThreadPool(cores) val scheduler = Schedulers.from(threads)
И затем я объединим metronome
:
val o = Observable.merge(listOf(metronome(0), metronome(1000).map { "---------" }) .map { it.onBackpressureBlock().subscribeOn(scheduler) }) .take(5000, TimeUnit.MILLISECONDS)
Первый должен испускать предметы непрестанно. Если я сделаю это за последние 3 секунды пробега, я получу следующий результат:
... [RxComputationThreadPool-5]: 369255 [RxComputationThreadPool-5]: 369256 [RxComputationThreadPool-5]: 369257 [RxComputationThreadPool-5]: --------- [RxComputationThreadPool-5]: --------- [RxComputationThreadPool-5]: ---------
Кажется, что Observable
s подписываются на один и тот же поток, а первый наблюдаемый блокируется на 3+ секунды.
Но когда я обмениваюсь onBackpressureBlock()
и subscribeOn(scheduler)
вызывает вывод, то, что я ожидал, результат объединяется в течение всего выполнения.
Для меня очевидно, что порядок звонков имеет значение в RxJava, но я не совсем понимаю, что происходит в этой конкретной ситуации.
Итак, что происходит, когда оператор onBackpressureBlock
применяется до subscribeOn
и что, если после ?
Оператор onBackpressureBlock
– неудачный эксперимент; он требует ухода, где применять. Например, subscribeOn().onBackpressureBlock()
но не наоборот.
RxJava имеет неблокирующий периодический таймер, называемый interval
поэтому вам не нужно откатывать свои собственные.