Intereting Posts
Есть ли разница между «объектом-получателем» и «приемником расширения», Спящий режим с Kotlin: @ManyToOne (fetch = FetchType.LAZY) Цепочка ошибок Завершение после flatMapCompletable Что закрывает канал kotlinx.coroutines Котлин требует библиотеку с гребнем Выполнение команды Интерфейс vs Function Invocation Как добавить новый источник с помощью gradle kotlin-dsl как реализовать Switch, используя привязку данных в android Как идиоматически проверить непустые, непустые строки в Котлин? Невозможно инициализировать парсер объектов для модели. Продукты, не найдены приемлемые конструкторы Присвоение свойств объекту javascript с использованием динамических имен свойств PendingIntent.getBroadcast () всегда возвращает null с FLAG_UPDATE_CURRENT Синтаксическая сериализация в Котлине Ошибка или функция: Kotlin позволяет изменить «val» на «var» в наследовании Как инициализировать представление в классе фрагментов в котлин?

Как управлять потоком без .flatMap, который разбивает реактивный поток, предотвращающий работу операторов, таких как distinctUntilChanged, от работы со всем потоком

Я хочу обращаться с другой наблюдаемой цепью логики для разных реализаций State . Это может быть легко достигнуто запечатанным классом / алгебраическим типом данных / union + .flatMap() , но это разбивает поток, где такие операторы, как .distinctUntilChanged() работают только в функции .flatMap() , а не на весь поток ,

 sealed class State { object Loading : State() data class Loaded(val value: Int) : State() } @Test fun distinctTest() { val relay = PublishRelay.create<State>() relay.flatMap { fun handle(state: State): Observable<*> = when (state) { State.Loading -> Observable.just(state) .distinctUntilChanged() .doOnNext { println("loading") } is State.Loaded -> Observable.just(state) .distinctUntilChanged() .doOnNext { println(it.value) } } handle(it) } .subscribe() relay.accept(State.Loading) relay.accept(State.Loaded(1)) relay.accept(State.Loaded(2)) relay.accept(State.Loaded(3)) relay.accept(State.Loaded(3)) //desired: loading, 1, 2, 3 //actual: loading, 1, 2, 3, 3 } 

Обратите внимание, что это упрощенный пример. Хотя я просто печатаю здесь, я действительно хочу выполнять разные действия (визуализировать пользовательский интерфейс по-разному) на основе типа реализации State

Это может быть достигнуто с помощью объекта / реле, но это создаст отключенный, изменяемый поток, который я также хотел бы избежать.

Solutions Collecting From Web of "Как управлять потоком без .flatMap, который разбивает реактивный поток, предотвращающий работу операторов, таких как distinctUntilChanged, от работы со всем потоком"

Не могли бы вы разделить Observable на несколько наблюдаемых, каждый из которых получал события одного типа? Затем вы можете выполнить некоторые операции над этими наблюдаемыми, прежде чем снова объединить их снова.

Я не могу проверить это прямо сейчас, поэтому может потребоваться несколько настроек. Во всяком случае, я надеюсь, что вы получите эту идею здесь:

 @Test fun distinctTest() { val relay = PublishRelay.create<State>() val loadingObs = relay.filter { it is State.Loading } .distinctUntilChanged() .doOnNext { println("loading") } val loadedObs = relay.filter { it is State.Loaded } .distinctUntilChanged() .doOnNext { println(it.value) } val merged = loadingObs.mergeWith(loadedObs) merged.subscribe() relay.accept(State.Loading) relay.accept(State.Loaded(1)) relay.accept(State.Loaded(2)) relay.accept(State.Loaded(3)) relay.accept(State.Loaded(3)) // Hopefully prints this: loading, 1, 2, 3 }