Наблюдаемое значениеLatestFrom

Я реализовал псевдо-оператор под названием «FilterByLatestFrom» как функцию расширения для kotlin.

Я написал следующий код с помощью этого оператора:

fun testFilterByLatestFromOperator(){ val observableA : Observable<Int> = Observable.fromArray(1,2,3,4,5,6,7,8,9,10) val observableC : PublishSubject<Int> = PublishSubject.create() val observableB : Observable<Int> = Observable.just(2).mergeWith(observableC) observableB.subscribe { println("observableB onNext: $it") } observableA .subscribe({ println("Original : $it")}) observableA.filterByLatestFrom(observableB, BiFunction { aVal, bVal -> aVal%bVal==0 }) .subscribe({ println("Result A : $it") }) observableC.onNext(3) observableA.filterByLatestFrom(observableB, BiFunction { aVal, bVal -> aVal%bVal==0 }) .subscribe({ println("Result AC : $it") }) } 

выход:

 observableB onNext: 2 Original : 1 Original : 2 Original : 3 Original : 4 Original : 5 Original : 6 Original : 7 Original : 8 Original : 9 Original : 10 Result A : 2 Result A : 4 Result A : 6 Result A : 8 Result A : 10 observableB onNext: 3 Result AC : 2 Result AC : 4 Result AC : 6 Result AC : 8 Result AC : 10 

Я хочу, чтобы оператор фильтра отфильтровывал obsA согласно последнему значению наблюдаемого B. Он работает для первого блока, но когда я добавляю On-next с новым значением, он не изменяет результат (используйте то же последнее значение из исходного наблюдаемого).

это FilterByLatestFrom impl (это был дизайн, который также будет использоваться с Java (с составом):

 class FilterByLatestFrom<T,U>(private val observable: Observable<T>, private val biFunction: BiFunction<U, T, Boolean>){ fun filter() : ObservableTransformer<U,U> = ObservableTransformer { it .withLatestFrom( observable, BiFunction<U,T,Pair<U,Boolean>> { u, t -> Pair(u,biFunction.apply(u,t)) }) .filter { it.second } .map { it.first } } } fun <T,U> Observable<U>.filterByLatestFrom(observable: Observable<T>, biFunction: BiFunction<U, T, Boolean>) : Observable<U> = this.compose(FilterByLatestFrom(observable,biFunction).filter()) 

Что мне не хватает?

EDIT: Я думаю, что я нашел проблему: PublishSubject должен быть BehaviorSubject. и функция слияния должна быть согласована с обещанием, что obsC выйдет после obsB.

Ваш псевдо-оператор filterByLatestFrom просто прекрасен, проблема заключается в тестировании, PublishSubject будет PublishSubject только последующие элементы, поэтому, когда в вашей последней подписке («результат AC»), observableB будет испускать только 2, поскольку observableC уже выбрал 3 и будет не воспроизводить его на observableB (используя merge ).

Просто переместите observableC.onNext(3) значение observableC.onNext(3) после последней подписки (последняя строка), и вы увидите ожидаемое поведение.

EDIT: также изменение на PublishSubject как вы решили одну и ту же проблему (субъект будет воспроизводить последнее значение для новой подписки)