Как объединить 2 отдельных потока, буферизовать заполненные данные из них и subsrcibe к нему после некоторого времени

Я пытаюсь проверить ситуацию следующим образом:

У меня есть 2 класса, которые просто простираются от одного и того же Родительского.

Я создаю и Observables из списка элементов для каждого класса:

val listSomeClass1 = ArrayList<SomeClass1>() val listSomeClass2 = ArrayList<SomeClass2>() fun populateJust1() { listSomeClass1.add(SomeClass1("23", 23)) listSomeClass1.add(SomeClass1("24", 24)) listSomeClass1.add(SomeClass1("25", 25)) } fun populateJust2() { listSomeClass2.add(SomeClass2(23.00)) listSomeClass2.add(SomeClass2(24.00)) listSomeClass2.add(SomeClass2(25.00)) } populateItemsSomeClass1() populateItemsSomeClass2() 

Теперь я могу создать 2 наблюдаемых:

  val someClass1Observable = Observable.fromIterable(listSomeClass1) val someClass2Observable = Observable.fromIterable(listSomeClass2) 

И здесь я хочу слить эмиссию с них, буферизировать ее и подписаться на нее через 10 секунд:

  Observable.merge(someClass1Observable, someClass2Observable) .buffer(10, TimeUnit.SECONDS) .doOnSubscribe { Log.v("parentObservable", "STARTED") } .subscribe { t: MutableList<Parent> -> Log.v("parentObservable", "onNext") t.forEach { Log.v("onNext", it.toString()) } } 

Однако наблюдаемый не начинается через 10 секунд, как я ожидал, и просто начинаю немедленно с этими данными. Как имитировать что-то подобное, я соберу 2 отдельных потока, и через 10 секунд я смогу получить собранные данные

Я должен указать, что я не хочу использовать какой-либо предмет.

ОБНОВИТЬ

Я сделал что-то вроде этого:

  val list1 = listOf(SomeClass1("1", 1), SomeClass1("2", 2), SomeClass1("3", 3)) val list2 = listOf(SomeClass2(5.00), SomeClass2(4.00), SomeClass2(6.00)) val someClass1Observable = Observable .fromIterable(list1) .zipWith(Observable.interval(2, TimeUnit.SECONDS), BiFunction { item: SomeClass1, _: Long -> item }) val someClass2Observable = Observable .fromIterable(list2) .zipWith(Observable.interval(1, TimeUnit.SECONDS), BiFunction { item: SomeClass2, _: Long -> item }) someClass1Observable.subscribe { Log.v("someClass1", it.toString()) } someClass2Observable.subscribe { Log.v("someClass2", it.toString()) } Observable.merge(someClass1Observable, someClass2Observable) .buffer(10, TimeUnit.SECONDS) .delay(10, TimeUnit.SECONDS) .doOnSubscribe { Log.v("parentObservable", "STARTED") } .subscribe { t: MutableList<Parent> -> Log.v("parentObservable", "onNext") t.forEach { Log.v("onNext", it.toString()) } } Thread.sleep(13000) someClass1Observable.subscribe { Log.v("someClass1", it.toString()) } someClass2Observable.subscribe { Log.v("someClass2", it.toString()) } 

Здесь я хочу просто имитировать 2 бесконечных потока someClass1 и someclass2 Observables и то же самое для слияния Observable.

Опять же, я хочу иметь возможность объединить эти 2 потока, заполнять буферные данные и делать что-то с ним через 10 секунд. Если через 10 секунд эти 2 потока снова заполнят некоторые данные, слияние Observable должно очистить предыдущий буфер и снова буферизовать новые данные и испускать через 10 секунд и т. Д. Бесконечно. Однако мой код не работает так, как я ожидал, какие изменения мне нужно сделать, чтобы сделать это, как я описал?

Я думаю, вы ищете оператора delay

http://reactivex.io/documentation/operators/delay.html

Задержка сдвигает выбросы от Наблюдаемого вперед во времени на определенную сумму

Итак, что-то вроде:

 .delay(10, TimeUnit.SECONDS)