вызывать часть потока один раз с несколькими подписчиками?

У меня есть пользовательский адаптер Rx для связи сокетов. Вне этого я наблюдаю Flowable с сообщениями. Затем у меня есть менеджер, который обрабатывает каждое сообщение, а затем выделяет его дальше.

fun observeSocket() = socketManager .observe() .doOnNext{ insideMessageHandler.handle(it) } 

Затем у меня есть два подписчика, которые наблюдают за Socket (). Subscribe ()

Проблема в том, что с каждым сообщением внутриMessageHandler.handle (it) вызывается дважды. Я хочу найти способ, в котором часть потока будет распространена для каждого абонента. К сожалению оператор .share () в конце watchSocket () не работает.

У меня есть что-то вроде этого дважды:

  /onNextInside Flowable/-onNextOutsideSubscriber1 Flowable\-onNextOutsideSubscriber2 \-onNextInside 

И я хочу иметь что-то вроде этого:

  /-onNextInside Flowable/-onNextOutsideSubscriber1 \-onNextOutsideSubscriber2 

В коде это выглядит

 insideManager.observeSocket().subscribe({do something first}) insideManager.observeSocket().subscribe({do something second}) 

Проблема в том, что в этом случае у меня onNextInside называется дважды

Возможно ли это?

Проблема заключается в воссоздании наблюдаемого:

 fun observeSocket() = socketManager .observe() .doOnNext{ insideMessageHandler.handle(it) } 

При каждом вызове функции observeSocket() вы создаете новую цепочку, поэтому размещение share() не изменит ситуацию.

Вместо этого определите эту цепочку как общий синглтон:

 private val _observeSocket = socketManager .observe() .doOnNext{ insideMessageHandler.handle(it) } .share() fun observeSocket() = _observeSocket 

Для этого случая у вас есть предметы.

 var yourSubject = PublishSubject<Type>.create() fun observeSocket() = socketManager.doOnNext(yourSubject::onNext) 

Затем вы можете наблюдать за своим наблюдателем () и другим / несколькими абонентами по этому вопросу.

Если вы хотите поделиться выпуском одного Observables, вы можете использовать оператор share (), который гарантирует, что даже когда подписчики подписываются подписчиками, он создает только один экземпляр и делится испускаемыми данными, используя

 fun observeSocket() = socketManager .doOnNext(insideMessageHandler::handle) .share() 

Во всяком случае, это нехорошая утилита для реактивных вещей, чтобы плоские / излучать другие наблюдаемые в doOnNext для этой цели.

Лучше поделитесь одним Flowable и подписывайте несколько раз и планируйте значения в соответствии с вашими потребностями, вместо того, чтобы помещать все в одно текущее, поскольку, если в потоке есть ошибка, весь поток может не выпустить больше данных.