Прослушивание сообщений и написание команд по наблюдаемому потоку

У меня есть метод, который открывает соединение ( Single<Connection> ) и другое, которое возвращает байты ( Observable<Byte> ), которые должны быть записаны в соединение при открытии:

 interface Connection { fun read(): Observable<Byte> fun write(command: Byte) } fun openConnection(): Single<Connection> // opens a single connection fun toBeWritten(): Observable<Byte> // output messages from elsewhere 

Проблема : я хочу сделать наблюдаемый поток, который считывает входящие сообщения и в то же время подписывается toBeWritten() и записывает испущенные значения в соединение.

Пока у меня есть работа со следующим трюком: объединение входного потока с выходом с использованием withLatestFrom и запись с doOnNext .

 openConnection() .toObservable() .flatMap { connection -> connection.read() .withLatestFrom( toBeWritten() .doOnNext { connection.write(it) } .startWith(0), BiFunction { readMe: Byte, writeMe: Byte -> readMe } ) } .retry() .subscribe { handleReadMessage(it) } 

startWith(0) после doOnNext необходим, или ничего не будет прочитано, пока toBeWritten() не toBeWritten() что-то.

Вопрос : есть ли лучшее решение этой проблемы? этот человек чувствует себя взломанным .

Бонус : минусы этого решения? (утечки памяти? и т. д.?)