Intereting Posts
Возможно ли распространять список внутри списка в Котлине? Как заставить компилятор Kotlin рассматривать предупреждения как ошибки? Как извлечь карту из коллекции объектов, с ключом как одним из объектов того же поля и значением фактических объектов Kotlin для Android. Тост Ошибка: выполнение выполнено для задачи ': app: javaPreCompileDebug'. > java.io.IOException: не удалось удалить annotationProcessors.json Котлин и дискриминационные союзы (типы сумм) restAssured – невозможно обработать почтовый метод Как вызвать функцию Javascript из кода Котлина? Могу ли я использовать Kotlin с Codename One? Динамическое использование AnkoComponent <*>. StartActivity () Как правильно выполнять нулевые проверки с использованием функций расширения Kotlin в действии Android SonarQube: как применить несколько профилей качества к одному проекту? Я не могу инициализировать свой массив (MutableList) в классе на Android GoogleApiClient: невозможно вручную подключиться и выполнить signOut впоследствии В TornadoFX, как я могу изменить одно свойство при изменении других свойств?

Как управлять потоком без .flatMap, который разбивает реактивный поток, предотвращающий работу операторов, таких как distinctUntilChanged, от работы со всем потоком

Я хочу обращаться с другой наблюдаемой цепью логики для разных реализаций State . Это может быть легко достигнуто запечатанным классом / алгебраическим типом данных / union + .flatMap() , но это разбивает поток, где такие операторы, как .distinctUntilChanged() работают только в функции .flatMap() , а не на весь поток ,

 sealed class State { object Loading : State() data class Loaded(val value: Int) : State() } @Test fun distinctTest() { val relay = PublishRelay.create<State>() relay.flatMap { fun handle(state: State): Observable<*> = when (state) { State.Loading -> Observable.just(state) .distinctUntilChanged() .doOnNext { println("loading") } is State.Loaded -> Observable.just(state) .distinctUntilChanged() .doOnNext { println(it.value) } } handle(it) } .subscribe() relay.accept(State.Loading) relay.accept(State.Loaded(1)) relay.accept(State.Loaded(2)) relay.accept(State.Loaded(3)) relay.accept(State.Loaded(3)) //desired: loading, 1, 2, 3 //actual: loading, 1, 2, 3, 3 } 

Обратите внимание, что это упрощенный пример. Хотя я просто печатаю здесь, я действительно хочу выполнять разные действия (визуализировать пользовательский интерфейс по-разному) на основе типа реализации State

Это может быть достигнуто с помощью объекта / реле, но это создаст отключенный, изменяемый поток, который я также хотел бы избежать.

Не могли бы вы разделить Observable на несколько наблюдаемых, каждый из которых получал события одного типа? Затем вы можете выполнить некоторые операции над этими наблюдаемыми, прежде чем снова объединить их снова.

Я не могу проверить это прямо сейчас, поэтому может потребоваться несколько настроек. Во всяком случае, я надеюсь, что вы получите эту идею здесь:

 @Test fun distinctTest() { val relay = PublishRelay.create<State>() val loadingObs = relay.filter { it is State.Loading } .distinctUntilChanged() .doOnNext { println("loading") } val loadedObs = relay.filter { it is State.Loaded } .distinctUntilChanged() .doOnNext { println(it.value) } val merged = loadingObs.mergeWith(loadedObs) merged.subscribe() relay.accept(State.Loading) relay.accept(State.Loaded(1)) relay.accept(State.Loaded(2)) relay.accept(State.Loaded(3)) relay.accept(State.Loaded(3)) // Hopefully prints this: loading, 1, 2, 3 }