Прослушивание сообщений и написание команд по наблюдаемому потоку

У меня есть метод, который открывает соединение ( Single<Connection> ) и другое, которое возвращает байты ( Observable<Byte> ), которые должны быть записаны в соединение при открытии:

 interface Connection { fun read(): Observable<Byte> fun write(command: Byte) } fun openConnection(): Single<Connection> // opens a single connection fun toBeWritten(): Observable<Byte> // output messages from elsewhere 

Проблема : я хочу сделать наблюдаемый поток, который считывает входящие сообщения и в то же время подписывается toBeWritten() и записывает испущенные значения в соединение.

Пока у меня есть работа со следующим трюком: объединение входного потока с выходом с использованием withLatestFrom и запись с doOnNext .

 openConnection() .toObservable() .flatMap { connection -> connection.read() .withLatestFrom( toBeWritten() .doOnNext { connection.write(it) } .startWith(0), BiFunction { readMe: Byte, writeMe: Byte -> readMe } ) } .retry() .subscribe { handleReadMessage(it) } 

startWith(0) после doOnNext необходим, или ничего не будет прочитано, пока toBeWritten() не toBeWritten() что-то.

Вопрос : есть ли лучшее решение этой проблемы? этот человек чувствует себя взломанным .

Бонус : минусы этого решения? (утечки памяти? и т. д.?)

Intereting Posts
Как войти в классы презентатора в Котлине? Покрытие кода Котлина в конвейере CI Создайте пользовательский кинжал 2 с помощью Kotlin Лучший способ применить преобразование к каждому элементу в списке одновременно в Котлине Как разрушить конструктор в Котлине Количество цифр в Котлине Равномерное авторазведение нескольких текстовых элементов Как инициализировать List <T> в Котлине? Как разрешить нарушение ограничения конечного ограничения в Котлине? Выбранный в настоящее время отладчик Java не поддерживает точки останова типа «Точки останова линии Котлин», где поместите этот код в kotlin / firebasedatabase.getinstance (). setpersistenceenabled (true); Зачем использовать разницу между объявлением и сайтом Тест интеграции с весной – аутентификацияПриглашение не вводится Интервал Рабочего Планировщика называется слишком нерегулярным Фрагмент: изменение видимости в функции