Абонент RxJava2 PublishSubject не может получать элементы при вызове из нескольких потоков, используя SingleScheduler

У меня есть следующий модульный тест, в котором я пытаюсь отправить 10 String из разных потоков и проверить, что я получаю эти String из одного потока. Моя проблема в том, что этот тест закрылков. Иногда это удается, но иногда я получаю только 8 или 9 элементов, и после этого тест зависает до тех пор, пока время защелки не исчезнет. Я использую SingleScheduler неправильно? Я пропустил что-то еще?

 val consumerCallerThreadNames = mutableSetOf<String>() val messageCount = AtomicInteger(0) val latch = CountDownLatch(MESSAGE_COUNT) @Test fun someTest() { val msg = "foo" val subject = PublishSubject.create<String>() subject .observeOn(SingleScheduler()) .subscribe({ message -> consumerCallerThreadNames.add(Thread.currentThread().name) messageCount.incrementAndGet() latch.countDown() }, Throwable::printStackTrace) 1.rangeTo(MESSAGE_COUNT).forEach { Thread({ try { subject.onNext(msg) } catch (t: Throwable) { t.printStackTrace() } }).start() } latch.await(10, SECONDS) assertThat(consumerCallerThreadNames).hasSize(1) assertThat(messageCount.get()).isEqualTo(MESSAGE_COUNT) } companion object { val MESSAGE_COUNT = 10 } 

Если я переписал это для использования одного потока ExecutorService тест больше не закрывается, поэтому проблема заключается либо в Rx, либо в моем отсутствии знаний о Rx.

В RxJava есть требование, которое вызывает on* , не происходит одновременно. Это означает, что ваш код не является потокобезопасным.

Так как только сам объект используется одновременно, он должен быть закреплен путем сериализации (по сути, «синхронизированной» Java) самого субъекта с использованием метода Subject<T>.toSerialized() .

val subject = PublishSubject.create<String>() становится val subject = PublishSubject.create<String>().toSerialized() .