RxJava2 Опубликовать

В чем разница между

ObservableTransformer { Observable.merge( it.ofType(x).compose(transformerherex), it.ofType(y).compose(transformerherey) ) } 

а также

 ObservableTransformer { it.publish{ shared -> Observable.merge( shared.ofType(x).compose(transformerherex), shared.ofType(y).compose(transformerherey) ) } } 

когда я запускаю свой код с использованием этих двух, я получил те же результаты. Что делает публикация здесь.

Разница заключается в том, что верхний трансформатор будет подписываться на восходящий поток дважды для одной подписки из нисходящего потока, дублируя любые побочные эффекты восходящего потока, который обычно не нужен:

 Observable<Object> mixedSource = Observable.<Object>just("a", 1, "b", 2, "c", 3) .doOnSubscribe(s -> System.out.println("Subscribed!")); mixedSource.compose(f -> Observable.merge( f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)), f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase())) ) ) .subscribe(System.out::println); 

распечатает

 Subscribed! 2 3 4 Subscribed! A B C 

Представленный здесь побочный эффект – распечатка Subscribed! В зависимости от реальной работы в реальном источнике, это может означать отправку электронной почты дважды, дважды получая строки таблицы. В этом конкретном примере вы можете видеть, что, даже если исходные значения чередуются в своем типе, вывод содержит их отдельно.

Напротив, publish(Function) устанавливает одну подписку на источник на одного конечного абонента, поэтому любые побочные эффекты в источнике происходят только один раз.

 mixedSource.publish(f -> Observable.merge( f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)), f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase())) ) ) .subscribe(System.out::println); 

который печатает

 Subscribed! A 2 B 3 C 4 

потому что источник подписывается один раз, и каждый элемент является многоадресным для двух «плеч» .ofType().compose() .

publish оператор, преобразует ваш наблюдаемый в Connectable Observable .

Давайте посмотрим, что означает Connectable Observable : предположим, что вы хотите подписаться на наблюдаемое несколько раз и хотите обслуживать одни и те же элементы для каждого абонента. Вам нужно использовать Connectable Observable .

Пример:

 var period = TimeSpan.FromSeconds(1); var observable = Observable.Interval(period).Publish(); observable.Connect(); observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i)); Thread.Sleep(period); observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i)); 

вывод:

 first subscription : 0 first subscription : 1 second subscription : 1 first subscription : 2 second subscription : 2 

В этом случае мы достаточно быстро подходим к подписке до публикации первого элемента, но только по первой подписке. Вторая подписка подписывается поздно и пропускает первую публикацию.

Мы могли бы переместить вызов метода Connect () до тех пор, пока не будут сделаны все подписки. Таким образом, даже при вызове Thread.Sleep мы не будем подписываться на базовую информацию до тех пор, пока не будут сделаны обе подписки. Это будет сделано следующим образом:

 var period = TimeSpan.FromSeconds(1); var observable = Observable.Interval(period).Publish(); observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i)); Thread.Sleep(period); observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i)); observable.Connect(); 

вывод:

 first subscription : 0 second subscription : 0 first subscription : 1 second subscription : 1 first subscription : 2 second subscription : 2 

Таким образом, используя Completable Observable, у нас есть способ контролировать, когда можно использовать элементы Observable emit.

Пример взяты из: http://www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html#PublishAndConnect

EDIT Согласно 180-м слайду в этой ссылке :

Другой характер публикации заключается в том, что если какой-либо наблюдатель начнет наблюдать за 10 секундами наблюдаемых начатых испускающих предметов, наблюдатель получает только те предметы, которые были выпущены через 10 секунд (во время подписки) не во всех элементах. Так что в сторонах, поскольку я понял, что публикация используется для событий пользовательского интерфейса. И совершенно разумно, что любой наблюдатель должен получать только те события, которые были выполнены после того, как он подписал НЕ все произошедшие ранее события.

Надеюсь, поможет.