Сбор данных с локального и удаленного одновременно с использованием RxJava

Поэтому я новичок в RxJava, но вот что я хочу сделать:

MainViewModel ведет переговоры с репозиторием. В репозитории есть и LocalDataStore (который ведет переговоры с базой данных), так и RemoteDataStore (Retrofit). Оба варианта – это разные реализации интерфейса DataStore.

То, что я хочу достичь, это иметь один вызов fetchData из репозитория, который возвращает Observable, но:

  • он сначала берет его из RemoteDataStore
  • после извлечения каждой вещи (onNext ()), она вставляет ее в базу данных
  • если он терпит неудачу, он возвращает результаты из LocalDataStore.

Однако я не знаю, как реализовать эту логику. Подписка происходит в конце ViewModel, но я не могу изменить наблюдаемый объект LocalDataStore с конца репозитория (?). Обновление данных в базе данных также возвращает Observable (Single to be exact), и для его работы ему нужна подписка.

Может кто-нибудь объяснить это мне или указать мне в хорошем направлении?

Мой код (проблема в комментариях репозитория):

Удаленное хранилище данных

override fun getData(): Observable<SomeData> = api .getData(token) .flatMapIterable { x -> x } 

Локальное хранилище данных

 override fun saveData(data: SomeData): Single<SomeData> { return database.upsert(data) } 

вместилище

  fun getData(): Observable<SomeData> { return remoteDataStore.getData() .doOnError { localDataStore.getData() //? this will invoke but nothing happens because I'm not subscribed to it } .doOnNext { saveData(it) //The same as before, nothing will happen } } 

ViewModel

 override fun fetchData() { repository.getData() .observeOn(androidScheduler) .subscribeOn(threadScheduler) .subscribe( { data: SomeData -> dataList.add(data) }, { throwable: Throwable? -> handleError(throwable) }, { //send data to view }, { disposable: Disposable -> compositeDisposable.add(disposable) } ) } 

Спасибо за ваше время.

Вам нужно использовать один из методов onErrorResumeNext . Я также предлагаю изменить тип потока с Observable на Single поскольку характер ваших данных выглядит как Get data once или throw error . Это просто хороший дизайн API.

В вашем конкретном случае я бы реализовал репозиторий таким образом:

 class RepositoryImpl @Inject constructor(private val localRepository: Repository, private val remoteRepository: Repository) : Repository { override fun getData(): Single<Data> = remoteRepository.getData() .onErrorResumeNext { throwable -> if (throwable is IOException) { return localRepository.getData() } return Single.error(throwable) } } 

Вы можете спросить, почему только ловить IOException ? Я обычно обрабатываю только это исключение, чтобы не пропустить ничего критического, но только несущественные сетевые ошибки. Если вы поймаете все исключения, которые вы можете пропустить, например, исключение NullPointerException .

onErrorResumeNext – это то, что вы ищете. doOnError вызывает побочное действие, не заменяет оригинал Observable другим.