Как имитировать излучение 2 infiite Наблюдаемые потоки и есть другие Observable, которые объединяют их и буферируют каждые 10 секунд?

Я хочу, чтобы иметь возможность имитировать слияние двух отдельных потоков, которые испускают некоторые объекты (которые расширяют один и тот же родительский объект), буферизуют их с помощью оператора буфера и испускают собранные данные через 10 секунд. Я хочу, чтобы этот механизм был бесконечным, таким образом, чтобы этот слияние / буфер всегда вызывался всякий раз, когда происходит излучение из двух разделенных потоков.

Вот что я сделал до сих пор:

val list1 = mutableListOf<SomeClass1>( SomeClass1("1", 1), SomeClass1("2", 2), SomeClass1("3", 3), SomeClass1("4", 4), SomeClass1("5", 5), SomeClass1("6", 6), SomeClass1("7", 7), SomeClass1("8", 8), SomeClass1("9", 9) ) val list2 = mutableListOf<SomeClass2>( SomeClass2(1.00), SomeClass2(2.00), SomeClass2(3.00), SomeClass2(4.00), SomeClass2(5.00), SomeClass2(6.00), SomeClass2(7.00), SomeClass2(8.00), SomeClass2(9.00) ) val someClass1Observable = Observable .fromIterable(list1) .zipWith(Observable.interval(2, TimeUnit.SECONDS), BiFunction { item: SomeClass1, _: Long -> item }) val someClass2Observable = Observable .fromIterable(list2) .zipWith(Observable.interval(2, TimeUnit.SECONDS), BiFunction { item: SomeClass2, _: Long -> item }) someClass1Observable.subscribe { Log.v("someClass1", it.toString()) } someClass2Observable.subscribe { Log.v("someClass2", it.toString()) } Observable.merge(someClass1Observable, someClass2Observable) .buffer(10, TimeUnit.SECONDS) .repeat() .doOnSubscribe { Log.v("parentObservable", "STARTED") } .subscribe { t: MutableList<Parent> -> Log.v("parentObservable", "onNext") t.forEach { Log.v("onNext", it.toString()) } } Thread.sleep(30000) Log.v("AFTER_SLEEP", "AFTER_SLEEP") someClass1Observable.subscribe { Log.v("someClass1", it.toString()) } someClass2Observable.subscribe { Log.v("someClass2", it.toString()) } 

Первая эмиссия 2 потоков работает нормально, объём / буфер Observable собирает выбросы от них каждый раз через 10 секунд. Однако, когда эти 2 потока заканчивают выбросы, и я снова подписываюсь на них, буфер / слияние Observable не работает. Как сделать эту работу бесконечной? Есть ли лучший способ написать код для этих двух отдельных потоков, которые испускают объекты, которые им не нужны для чтения значений из списка, и вместо этого они будут генерировать новый объект каждые 2 секунды? Как сделать слияние / буфер Observable бесконечным, я имею в виду всякий раз, когда появляется новая эмиссия из этих 2 наблюдаемых потоков?

Чтобы потоки SomeClass бесконечными, вы можете просто поставить на них оператор .repeat() перед их использованием или построить объекты по требованию с interval :

 val someClass1Obs = Observable .interval(2, TimeUnit.SECONDS) .map { SomeClass1("$it", it.toInt()) } // <-- create objects on demand 

Я думаю, что это может решить и ваши другие проблемы.