Как присоединиться к двум RxJava2 Obvervables по ключу?

У меня есть две несортированные наблюдаемые разные типы. Оба эти типа имеют общий ключ. Я хотел бы объединить их в новые наблюдаемые излучающие пары соответствующих элементов, и я не могу понять, как это сделать.

Обратите внимание, что некоторые из ключей могут отсутствовать. Было бы хорошо, если бы не полные пары были сброшены, но отсутствие null места вместо пропавшей части было бы еще лучше.

Вход 1:

 Entity(id = 2), Entity(id = 1), Entity(id = 4) 

Вход 2:

 Dto(id = 3), Dto(id = 2), Dto(id = 1) 

Ожидаемый результат (в любом порядке):

 Pair(Entity(id = 1), Dto(id = 1)), Pair(Entity(id = 2), Dto(id = 2)), Pair(null, Dto(id = 3)), Pair(Entity(id = 4), null) 

Во-первых, Observable.merge потоки вместе: это дает вам поток всех предметов. (В приведенном ниже коде я использовал отдельный класс для тегов каждого потока.)

Затем для каждого элемента в потоке попытайтесь сопоставить его с ранее наблюдаемым элементом другого типа и выведите пару. Если нет, сохраните его для последующего согласования.

Наконец, как только поток будет выполнен, оставшиеся непревзойденные элементы не будут сопоставлены ни с чем, поэтому они могут быть выпущены непарными.

 import io.reactivex.Observable data class Entity(val id: Int) data class Dto(val id: Int) sealed class Either<out A, out B> data class Left<A>(val value: A) : Either<A, Nothing>() data class Right<B>(val value: B) : Either<Nothing, B>() fun <A : Any, B : Any, C> joinById(a: Observable<A>, idA: (A) -> C, b: Observable<B>, idB : (B) -> C): Observable<Pair<A?, B?>> { val unmatchedA = mutableMapOf<C, A>() val unmatchedB = mutableMapOf<C, B>() val merged = Observable.mergeDelayError(a.map(::Left), b.map(::Right)).flatMap { latest -> when (latest) { is Left -> { val id = idA(latest.value) unmatchedB.remove(id)?.let { return@flatMap Observable.just(latest.value to it) } unmatchedA.put(id, latest.value) } is Right -> { val id = idB(latest.value) unmatchedA.remove(id)?.let { return@flatMap Observable.just(it to latest.value) } unmatchedB.put(id, latest.value) } } Observable.empty<Nothing>() } return Observable.concat(merged, Observable.create { emitter -> unmatchedA.values.forEach { emitter.onNext(it to null) } unmatchedB.values.forEach { emitter.onNext(null to it) } emitter.onComplete() }) } fun main(args: Array<String>) { val entities = Observable.just(Entity(2), Entity(1), Entity(4)) val dtos = Observable.just(Dto(3), Dto(2), Dto(1)) joinById(entities, Entity::id, dtos, Dto::id).blockingForEach(::println) } 
 (Entity(id=2), Dto(id=2)) (Entity(id=1), Dto(id=1)) (Entity(id=4), null) (null, Dto(id=3)) 

Обратите внимание, что это может иметь некоторое нечетное поведение, если идентификаторы повторяются в потоке, и в зависимости от структуры потоков возможно, что это приведет к буферизации большого количества элементов в памяти.

Intereting Posts
Модуль библиотеки Android, разработанный в Kotlin, экспортируется в приложение Java, вызывающее неудачное разрешение: Lkotlin / jvm / internal / Intrinsics Включить свойство, объявленное в теле класса данных в toString () в Котлине Как получить идентификатор ресурса в фрагменте, используя kotlin в android? Единичное тестирование Rxjava наблюдаемых, которые имеют задержку Как я могу инициализировать переменную перед каждым тестом, используя kotlin-test framework Общие ограничения Котлина Как получить данные из пользовательской таблицы в Corda можно использовать println как функцию ссылки в Kotlin Как вызвать метод Java Kotlin, который называется с escape-символами? Игнорировать setter и установить свойство напрямую Можно ли включить переменную в тело условия while для Kotlin? Кастинг JSONArray в Iterable <JSONObject> – Котлин Почему я не могу использовать интерфейс как общий тип в этом Rx-трансформаторе? Как установить свойство объекта-компаньона в Котлин через отражение? Как полезна делегация Котлина?