Изменить текучесть <Список <Obj1 >> в текущую <Список <Obj2 >> в комнате

Как я могу прочитать текучий список значений из комнаты и преобразовать его в другой объект, который представляет собой комбинацию большего количества значений из комнаты

database.leadsDao().getLeads(leadState.name) .flatMap { val len = it.size.toLong() Flowable.fromIterable(it) .flatMap { Flowable.zip( database.orderDao().getById(it.orderId), database.orderMedicineDao().getByOrderId(it.orderId), database.patientDao().getById(it.patientId), Function3<Order, List<OrderMedicine>, Patient, LeadDetail> { order, orderMedicines, patient -> LeadDetail.from(it, patient, order, orderMedicines) }) } .take(len) .toList() .toFlowable() } 

Код выше работает, но мне не нравится часть take(len) . И без него поток никогда не вызываетNext абонента. Поток продолжает ждать больше предметов, чего не должно быть, поскольку Flowable.fromIterable дает конечное число или элементы, а затем заканчивается. Т.е. код ниже не работает

 database.leadsDao().getLeads(leadState.name) .flatMap { Flowable.fromIterable(it) .flatMap { Flowable.zip( database.orderDao().getById(it.orderId), database.orderMedicineDao().getByOrderId(it.orderId), database.patientDao().getById(it.patientId), Function3<Order, List<OrderMedicine>, Patient, LeadDetail> { order, orderMedicines, patient -> LeadDetail.from(it, patient, order, orderMedicines) }) } .toList() .toFlowable() } 

Flowable.fromIterable дает конечное число или элементы, а затем заканчивается.

Но Flowable.zip внутри flatmap не закончится, поскольку объекты DAO Room испускают текущее значение И все будущие обновления, поэтому вызовы database.*() , Которые связаны друг с другом, не являются конечными. Если вы добавите .first() во внутренний Flowable.zip то и вторая версия должна работать.