RxJava как создать наблюдаемый из подписки

Я ищу способ создания Observable после обработки результата в subscribe .

Учитывая, что у меня есть это Observable из productRepo.list() которое является Retrofit Observable<Response<ProductResponse>> .

 productRepo .list() .retry(3) .subscribe { response -> if (response.isSuccessful) { response.body().apply { cache.saveProducts(data) } } } 

Целью этого является сохранение результата в локальном cache БД. Это плюс еще один очень похожий вызов, заполняющий локальную БД удаленными данными из API.

После завершения двух вызовов я хотел загрузить данные из cache .

Я не хочу сочетать оба наблюдаемых в любом случае. Просто захотите выполнить некоторую задачу позже.

Я хочу, чтобы эта обработка была как единое целое в графике вызовов Rx, так что она одновременно вызывает Call1 и Call2 и после завершения Call1 и Call2 запускает Task3. Каков наилучший способ в этом сценарии? Я действительно предпочитаю, если абонент для каждого звонка будет разделен.

Является ли flatMap лучшим вариантом здесь?

Как вы уже упоминали,

Я действительно предпочитаю, если абонент для каждого звонка будет разделен.

Предположим, что мы имеем два наблюдаемых

 val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8)) val call2 = Observable.from(arrayOf(2,4,6,8)) 

Если мы просто используем Observable.zip как Observable.zip ниже, у него может быть только один абонент для обоих Call1 & Call2.

 Observable.zip(call1,call2) {c1, c2 -> Pair(c1,c2) }.subscribe(task3Subscriber) 

Если мы используем три отдельных абонента, как показано ниже, поток Call1 & Call2 будет запускаться дважды .

 call1.subscribe(call1Subscriber) call2.subscribe(call2Subscriber) Observable.zip(call1,call2) {c1, c2 -> Pair(c1,c2) }.subscribe(task3Subscriber) 

Поэтому нам нужно использовать .share().cacheWithInitialCapacity(1) чтобы делать трюки

 val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8)) .share() .cacheWithInitialCapacity(1) val call2 = Observable.from(arrayOf(2,4,6,8)) .share() .cacheWithInitialCapacity(1) val task3Signal = Observable.zip(call1,call2){ c1, c2 -> c1 + c2 } call1.subscribe(call1Subscriber) call2.subscribe(call2Subscriber) task3Signal.subscribe(task3Subscriber) 

Вы также можете доказать / проверить свою концепцию графика Rx из простого тестового примера.

 class SimpleJUnitTest { @Test fun test(){ val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8)) .doOnNext { println("call1 doOnNext $it") } .share() .cacheWithInitialCapacity(1) val call2 = Observable.from(arrayOf(2,4,6,8)) .doOnNext { println("call2 doOnNext $it") } .share() .cacheWithInitialCapacity(1) val task3Signal = Observable.zip(call1,call2){ c1, c2 -> println("task3Signal c1:$c1, c2: $c2") c1 + c2 } val testSubscriber1 = TestSubscriber<Int>() val testSubscriber2 = TestSubscriber<Int>() val testSubscriber3 = TestSubscriber<Int>() call1.subscribe(testSubscriber1) call2.subscribe(testSubscriber2) task3Signal.subscribe(testSubscriber3) testSubscriber1.assertReceivedOnNext(listOf(1,2,3,4,5,6,7,8)) testSubscriber2.assertReceivedOnNext(listOf(2,4,6,8)) testSubscriber3.assertReceivedOnNext(listOf(3,6,9,12)) testSubscriber1.assertValueCount(8) testSubscriber2.assertValueCount(4) testSubscriber3.assertValueCount(4) } } 

Вывод:

 call1 doOnNext 1 call1 doOnNext 2 call1 doOnNext 3 call1 doOnNext 4 call1 doOnNext 5 call1 doOnNext 6 call1 doOnNext 7 call1 doOnNext 8 call2 doOnNext 2 call2 doOnNext 4 call2 doOnNext 6 call2 doOnNext 8 task3Signal c1:1, c2: 2 task3Signal c1:2, c2: 4 task3Signal c1:3, c2: 6 task3Signal c1:4, c2: 8 
 .doOnNext() 

это ваш ответ, потому что вернет ваш окончательный ответ или каждый ответ, если он кратен. Попробуй.

Посмотрите на Zip . Сделайте что-нибудь вроде Observable.zip (firstObservable, secondObservable, ….. {Task 3}