Почему у меня есть нежелательный выход Log при слиянии 2 наблюдаемых в другие Observable, которые буферизуют их каждые 10 секунд

Я пытаюсь имитировать слияние двух разных потоков Observable, которые излучают некоторый объект каждую секунду. Этот объект имеет тот же Родитель в этих двух потоках.

Я думал, что в консоли появится новый объект со значением через 1 секунду. Однако, когда я печатаю эти объекты, я получаю объекты, которые пропускают предыдущую эмиссию. Точно так же объект со значением 1, 3, 5, 7 и т. Д.

Однако в буфере, который объединяет эти два, кажется, что он только буферизует выбросы 2, 4, 6, 8 и т. Д.

Вот мой код:

override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) var counter = 0 var counter2 = 0 val periodicSomeClass1 = Observable.interval(1, TimeUnit.SECONDS) .flatMap( { counter++ Observable.just(SomeClass1("$counter", counter)) } ) val periodicSomeClass2 = Observable.interval(1, TimeUnit.SECONDS) .flatMap( { counter2++ Observable.just(SomeClass2(counter2.toDouble())) } ) periodicSomeClass1.subscribe { t: SomeClass1 -> Log.v("periodicSomeClass1", t.toString()) } periodicSomeClass2.subscribe { t: SomeClass2 -> Log.v("periodicSomeClass2", t.toString()) } Observable.merge(periodicSomeClass1, periodicSomeClass2) .buffer(10, TimeUnit.SECONDS) .doOnSubscribe { Log.v("bufferObservable", "STARTED") } .subscribe { t: MutableList<Parent> -> Log.v("bufferObservable", "onNext") t.forEach { Log.v("onNext", it.toString()) } } } 

И вот что я имею в выходе журнала с первым буфером / слиянием emsission:

  V/periodicSomeClass1: SomeClass1(a=1, b=1) V/periodicSomeClass2: SomeClass2(a=1.0) V/periodicSomeClass1: SomeClass1(a=3, b=3) V/periodicSomeClass2: SomeClass2(a=3.0) V/periodicSomeClass1: SomeClass1(a=5, b=5) V/periodicSomeClass2: SomeClass2(a=5.0) V/periodicSomeClass1: SomeClass1(a=7, b=7) V/periodicSomeClass2: SomeClass2(a=7.0) V/periodicSomeClass1: SomeClass1(a=9, b=9) V/periodicSomeClass2: SomeClass2(a=9.0) V/periodicSomeClass1: SomeClass1(a=11, b=11) V/periodicSomeClass2: SomeClass2(a=11.0) V/periodicSomeClass1: SomeClass1(a=13, b=13) V/periodicSomeClass2: SomeClass2(a=13.0) V/periodicSomeClass1: SomeClass1(a=15, b=15) V/periodicSomeClass2: SomeClass2(a=15.0) V/periodicSomeClass1: SomeClass1(a=17, b=17) V/periodicSomeClass2: SomeClass2(a=17.0) V/periodicSomeClass1: SomeClass1(a=19, b=19) V/periodicSomeClass2: SomeClass2(a=19.0) V/bufferObservable: onNext V/onNext: SomeClass1(a=2, b=2) V/onNext: SomeClass2(a=2.0) V/onNext: SomeClass1(a=4, b=4) V/onNext: SomeClass2(a=4.0) V/onNext: SomeClass1(a=6, b=6) V/onNext: SomeClass2(a=6.0) V/onNext: SomeClass1(a=8, b=8) V/onNext: SomeClass2(a=8.0) V/onNext: SomeClass1(a=10, b=10) V/onNext: SomeClass2(a=10.0) V/onNext: SomeClass1(a=12, b=12) V/onNext: SomeClass2(a=12.0) V/onNext: SomeClass1(a=14, b=14) V/onNext: SomeClass2(a=14.0) V/onNext: SomeClass1(a=16, b=16) V/onNext: SomeClass2(a=16.0) V/onNext: SomeClass1(a=18, b=18) 

Как вы правильно подозревали в комментарии к своему другому вопросу , эта проблема действительно связана.

1.) Вы подписываетесь дважды на оба ваших наблюдателя источника, один раз для каждого из них и один раз через подписку на merge d Observable.

2.) Таким образом, у вас теперь есть четыре наблюдаемых объекта, в которых два из них увеличивают counter (и считывают с), а два других увеличивают (и считывают с) counter2 .

3.) Для каждой из этих пар два интервала настолько немного смещены, и каждая flatMap из первой пары каждой пары видит значение n , увеличивает его до n+1 и печатает его. Затем вскоре после этого другой экземпляр приходит и видит n+1 , увеличивается до n+2 , печатает это и т. Д.

Наконец, buffer скрывает, что все это происходит чередующимся, потому что он печатает все четные значения после всех нечетных.

Любое решение действительно зависит от того, чего вы хотите достичь – это просто пример игровой площадки или он моделирует некоторые реальные проблемы?