У меня есть две несортированные наблюдаемые разные типы. Оба эти типа имеют общий ключ. Я хотел бы объединить их в новые наблюдаемые излучающие пары соответствующих элементов, и я не могу понять, как это сделать.
Обратите внимание, что некоторые из ключей могут отсутствовать. Было бы хорошо, если бы не полные пары были сброшены, но отсутствие null
места вместо пропавшей части было бы еще лучше.
Вход 1:
Entity(id = 2), Entity(id = 1), Entity(id = 4)
Вход 2:
Dto(id = 3), Dto(id = 2), Dto(id = 1)
Ожидаемый результат (в любом порядке):
Pair(Entity(id = 1), Dto(id = 1)), Pair(Entity(id = 2), Dto(id = 2)), Pair(null, Dto(id = 3)), Pair(Entity(id = 4), null)
Во-первых, Observable.merge
потоки вместе: это дает вам поток всех предметов. (В приведенном ниже коде я использовал отдельный класс для тегов каждого потока.)
Затем для каждого элемента в потоке попытайтесь сопоставить его с ранее наблюдаемым элементом другого типа и выведите пару. Если нет, сохраните его для последующего согласования.
Наконец, как только поток будет выполнен, оставшиеся непревзойденные элементы не будут сопоставлены ни с чем, поэтому они могут быть выпущены непарными.
import io.reactivex.Observable data class Entity(val id: Int) data class Dto(val id: Int) sealed class Either<out A, out B> data class Left<A>(val value: A) : Either<A, Nothing>() data class Right<B>(val value: B) : Either<Nothing, B>() fun <A : Any, B : Any, C> joinById(a: Observable<A>, idA: (A) -> C, b: Observable<B>, idB : (B) -> C): Observable<Pair<A?, B?>> { val unmatchedA = mutableMapOf<C, A>() val unmatchedB = mutableMapOf<C, B>() val merged = Observable.mergeDelayError(a.map(::Left), b.map(::Right)).flatMap { latest -> when (latest) { is Left -> { val id = idA(latest.value) unmatchedB.remove(id)?.let { return@flatMap Observable.just(latest.value to it) } unmatchedA.put(id, latest.value) } is Right -> { val id = idB(latest.value) unmatchedA.remove(id)?.let { return@flatMap Observable.just(it to latest.value) } unmatchedB.put(id, latest.value) } } Observable.empty<Nothing>() } return Observable.concat(merged, Observable.create { emitter -> unmatchedA.values.forEach { emitter.onNext(it to null) } unmatchedB.values.forEach { emitter.onNext(null to it) } emitter.onComplete() }) } fun main(args: Array<String>) { val entities = Observable.just(Entity(2), Entity(1), Entity(4)) val dtos = Observable.just(Dto(3), Dto(2), Dto(1)) joinById(entities, Entity::id, dtos, Dto::id).blockingForEach(::println) }
(Entity(id=2), Dto(id=2)) (Entity(id=1), Dto(id=1)) (Entity(id=4), null) (null, Dto(id=3))
Обратите внимание, что это может иметь некоторое нечетное поведение, если идентификаторы повторяются в потоке, и в зависимости от структуры потоков возможно, что это приведет к буферизации большого количества элементов в памяти.