Как присоединиться к двум RxJava2 Obvervables по ключу?

У меня есть две несортированные наблюдаемые разные типы. Оба эти типа имеют общий ключ. Я хотел бы объединить их в новые наблюдаемые излучающие пары соответствующих элементов, и я не могу понять, как это сделать.

Обратите внимание, что некоторые из ключей могут отсутствовать. Было бы хорошо, если бы не полные пары были сброшены, но отсутствие null места вместо пропавшей части было бы еще лучше.

Вход 1:

 Entity(id = 2), Entity(id = 1), Entity(id = 4) 

Вход 2:

 Dto(id = 3), Dto(id = 2), Dto(id = 1) 

Ожидаемый результат (в любом порядке):

 Pair(Entity(id = 1), Dto(id = 1)), Pair(Entity(id = 2), Dto(id = 2)), Pair(null, Dto(id = 3)), Pair(Entity(id = 4), null) 

Во-первых, Observable.merge потоки вместе: это дает вам поток всех предметов. (В приведенном ниже коде я использовал отдельный класс для тегов каждого потока.)

Затем для каждого элемента в потоке попытайтесь сопоставить его с ранее наблюдаемым элементом другого типа и выведите пару. Если нет, сохраните его для последующего согласования.

Наконец, как только поток будет выполнен, оставшиеся непревзойденные элементы не будут сопоставлены ни с чем, поэтому они могут быть выпущены непарными.

 import io.reactivex.Observable data class Entity(val id: Int) data class Dto(val id: Int) sealed class Either<out A, out B> data class Left<A>(val value: A) : Either<A, Nothing>() data class Right<B>(val value: B) : Either<Nothing, B>() fun <A : Any, B : Any, C> joinById(a: Observable<A>, idA: (A) -> C, b: Observable<B>, idB : (B) -> C): Observable<Pair<A?, B?>> { val unmatchedA = mutableMapOf<C, A>() val unmatchedB = mutableMapOf<C, B>() val merged = Observable.mergeDelayError(a.map(::Left), b.map(::Right)).flatMap { latest -> when (latest) { is Left -> { val id = idA(latest.value) unmatchedB.remove(id)?.let { return@flatMap Observable.just(latest.value to it) } unmatchedA.put(id, latest.value) } is Right -> { val id = idB(latest.value) unmatchedA.remove(id)?.let { return@flatMap Observable.just(it to latest.value) } unmatchedB.put(id, latest.value) } } Observable.empty<Nothing>() } return Observable.concat(merged, Observable.create { emitter -> unmatchedA.values.forEach { emitter.onNext(it to null) } unmatchedB.values.forEach { emitter.onNext(null to it) } emitter.onComplete() }) } fun main(args: Array<String>) { val entities = Observable.just(Entity(2), Entity(1), Entity(4)) val dtos = Observable.just(Dto(3), Dto(2), Dto(1)) joinById(entities, Entity::id, dtos, Dto::id).blockingForEach(::println) } 
 (Entity(id=2), Dto(id=2)) (Entity(id=1), Dto(id=1)) (Entity(id=4), null) (null, Dto(id=3)) 

Обратите внимание, что это может иметь некоторое нечетное поведение, если идентификаторы повторяются в потоке, и в зависимости от структуры потоков возможно, что это приведет к буферизации большого количества элементов в памяти.

Intereting Posts
Функция Kotlin Call Javascript в Android без webView При создании интерфейса в Kotlin имеет значение, если свойства get / set? В Kotlin как передать фрагмент в FragmentManager.replace правильно Несоответствие типа приемника с макетами Anko Адаптер связывания Пикассо «сообщение было утечено» Синтаксис Котлина 'it' в контексте Volley Проблема с kotlin для Android Является ли я единственным автоответчиком IDEA, который не работает в Котлине? Предоставлять насмешливый объект другому конструктору конструктивного объекта? Класс отсутствует конструктор без аргументов, но я предоставил конструктор без аргументов Настройка установщика интерфейса в классе, реализующего его Могут ли члены данных Kotlin инициализироваться в java с помощью kotlin default getter и setter? Строительство многомодульного проекта градиента в Travis CI В Котлине, как мне интегрировать обещание Завета с ответами Elasticearch async? Kotlin lazy свойство в зависимости от другого свойства, инициализированного в init