RxJava Observable.create обертывание наблюдаемых подписки

Я использовал Observable.create, чтобы я мог уведомить абонента, когда были доступны определенные данные. Я немного не согласен с подпиской на наблюдаемые внутри моего метода создания. Являются ли эти вложенные подписки для меня какой-то проблемой? Я не совсем знаком с созданием наблюдаемых с помощью Observable.create, поэтому я хотел убедиться, что я не делаю ничего необычного или неправильно использую его. Заранее спасибо!

abstract class NetworkResource<ApiType, DbType> constructor(private val schedulerProvider: SchedulerProvider) { abstract fun fetchFromApi(): Single<ApiType> abstract fun fetchFromDb(): Observable<Optional<DbType>> abstract fun saveToDb(apiType: ApiType?) abstract fun shouldFetchFromApi(cache: DbType?): Boolean fun fetch(): Observable<Optional<DbType>> { return Observable.create<Optional<DbType>> { val subscriber = it fetchFromDb() .subscribe({ subscriber.onNext(it) if(shouldFetchFromApi(it.get())) { fetchFromApi() .observeOn(schedulerProvider.io()) .map { saveToDb(it) it } .observeOn(schedulerProvider.ui()) .flatMapObservable { fetchFromDb() } .subscribe({ subscriber.onNext(it) subscriber.onComplete() }) } else { subscriber.onComplete() } }) } } } 

Да, это вызовет проблемы.

Во-первых, нелогично гнездиться Observable как это, одна из сильных сторон реактивного подхода, составляет Observables и, таким образом, имеет единственный чистый поток. таким образом, вы нарушаете цепочку, и непосредственный результат – это переплетенный код, который сложнее читать, и больше кода для подключения событий уведомления, в основном это похоже на метод асинхронного обратного вызова с Observable .
здесь, поскольку у вас уже есть реактивные компоненты, вы можете просто составить их вместо того, чтобы обрабатывать их с помощью метода обратного вызова.

Во-вторых, в результате разрушения цепочки наиболее суровая и немедленная – отмена подписки на внешнее Observable не повлияет автоматически на внутреннее Observable . То же самое происходит с попыткой добавить subscribeOn() и с другим сценарием, где важно противодавление, оно также применяется.

альтернатива композиции может быть примерно такой:

 fun fetch2(): Observable<Optional<DbType>> { return fetchFromDb() .flatMap { if (shouldFetchFromApi(it.get())) { fetchFromApi() .observeOn(schedulerProvider.io()) .doOnSuccess { saveToDb(it) } .observeOn(schedulerProvider.ui()) .flatMapObservable { fetchFromDb() } } else { Observable.empty() } } } 

если по какой-то причине вы хотите, чтобы в любом случае первый результат fetchFromDb() отдельно, вы также можете сделать это с помощью publish() с селектором:

  fun fetch2(): Observable<Optional<DbType>> { return fetchFromDb() .publish { Observable.merge(it, it.flatMap { if (shouldFetchFromApi(it.get())) { fetchFromApi() .observeOn(schedulerProvider.io()) .doOnSuccess { saveToDb(it) } .observeOn(schedulerProvider.ui()) .flatMapObservable { fetchFromDb() } } else { Observable.empty() } }) } }