Intereting Posts
Enums и With в Котлине java.lang.VerifyError в расширении свойства с kotlin Функция rxjava :: zip возвращает пустой результат MediaStore.Images получить полное изображение с большого пальца Uri / id Плагин JaCoCo Gradle сообщает об уровне покрытия, равном 0,0, для каждой упаковки Cloud Firestore – QuerySnapshot.toObjects выбрасывает ненулевой указатель Android numberPicker, передающий виджет Android в качестве значения Как сделать плагин идеи в градле генерировать правильную конфигурацию проекта для Kotlin? Kotlin 'небезопасный вызов ошибки компиляции с помощью NULL-приемника после нулевой проверки Как зарегистрировать InstanceCreator с Gson в Котлине? Как получить пакет kotlin путем отражения kotlin null указатель execption Каков наилучший способ обработки нулевых ситуаций в Kotlin при расширении класса Java? RecyclerView.Adapter- Ошибка: публичные функции раскрывают свой внутренний тип возврата в Котлине Spring Boot: изменить идентификатор заполнителя

Как управлять потоком без .flatMap, который разбивает реактивный поток, предотвращающий работу операторов, таких как distinctUntilChanged, от работы со всем потоком

Я хочу обращаться с другой наблюдаемой цепью логики для разных реализаций State . Это может быть легко достигнуто запечатанным классом / алгебраическим типом данных / union + .flatMap() , но это разбивает поток, где такие операторы, как .distinctUntilChanged() работают только в функции .flatMap() , а не на весь поток ,

 sealed class State { object Loading : State() data class Loaded(val value: Int) : State() } @Test fun distinctTest() { val relay = PublishRelay.create<State>() relay.flatMap { fun handle(state: State): Observable<*> = when (state) { State.Loading -> Observable.just(state) .distinctUntilChanged() .doOnNext { println("loading") } is State.Loaded -> Observable.just(state) .distinctUntilChanged() .doOnNext { println(it.value) } } handle(it) } .subscribe() relay.accept(State.Loading) relay.accept(State.Loaded(1)) relay.accept(State.Loaded(2)) relay.accept(State.Loaded(3)) relay.accept(State.Loaded(3)) //desired: loading, 1, 2, 3 //actual: loading, 1, 2, 3, 3 } 

Обратите внимание, что это упрощенный пример. Хотя я просто печатаю здесь, я действительно хочу выполнять разные действия (визуализировать пользовательский интерфейс по-разному) на основе типа реализации State

Это может быть достигнуто с помощью объекта / реле, но это создаст отключенный, изменяемый поток, который я также хотел бы избежать.

Solutions Collecting From Web of "Как управлять потоком без .flatMap, который разбивает реактивный поток, предотвращающий работу операторов, таких как distinctUntilChanged, от работы со всем потоком"

Не могли бы вы разделить Observable на несколько наблюдаемых, каждый из которых получал события одного типа? Затем вы можете выполнить некоторые операции над этими наблюдаемыми, прежде чем снова объединить их снова.

Я не могу проверить это прямо сейчас, поэтому может потребоваться несколько настроек. Во всяком случае, я надеюсь, что вы получите эту идею здесь:

 @Test fun distinctTest() { val relay = PublishRelay.create<State>() val loadingObs = relay.filter { it is State.Loading } .distinctUntilChanged() .doOnNext { println("loading") } val loadedObs = relay.filter { it is State.Loaded } .distinctUntilChanged() .doOnNext { println(it.value) } val merged = loadingObs.mergeWith(loadedObs) merged.subscribe() relay.accept(State.Loading) relay.accept(State.Loaded(1)) relay.accept(State.Loaded(2)) relay.accept(State.Loaded(3)) relay.accept(State.Loaded(3)) // Hopefully prints this: loading, 1, 2, 3 }