У меня есть метод, который открывает соединение ( Single<Connection>
) и другое, которое возвращает байты ( Observable<Byte>
), которые должны быть записаны в соединение при открытии:
interface Connection { fun read(): Observable<Byte> fun write(command: Byte) } fun openConnection(): Single<Connection> // opens a single connection fun toBeWritten(): Observable<Byte> // output messages from elsewhere
Проблема : я хочу сделать наблюдаемый поток, который считывает входящие сообщения и в то же время подписывается toBeWritten()
и записывает испущенные значения в соединение.
Пока у меня есть работа со следующим трюком: объединение входного потока с выходом с использованием withLatestFrom
и запись с doOnNext
.
openConnection() .toObservable() .flatMap { connection -> connection.read() .withLatestFrom( toBeWritten() .doOnNext { connection.write(it) } .startWith(0), BiFunction { readMe: Byte, writeMe: Byte -> readMe } ) } .retry() .subscribe { handleReadMessage(it) }
startWith(0)
после doOnNext
необходим, или ничего не будет прочитано, пока toBeWritten()
не toBeWritten()
что-то.
Вопрос : есть ли лучшее решение этой проблемы? этот человек чувствует себя взломанным .
Бонус : минусы этого решения? (утечки памяти? и т. д.?)