У меня есть следующий модульный тест, в котором я пытаюсь отправить 10 String
из разных потоков и проверить, что я получаю эти String
из одного потока. Моя проблема в том, что этот тест закрылков. Иногда это удается, но иногда я получаю только 8
или 9
элементов, и после этого тест зависает до тех пор, пока время защелки не исчезнет. Я использую SingleScheduler
неправильно? Я пропустил что-то еще?
val consumerCallerThreadNames = mutableSetOf<String>() val messageCount = AtomicInteger(0) val latch = CountDownLatch(MESSAGE_COUNT) @Test fun someTest() { val msg = "foo" val subject = PublishSubject.create<String>() subject .observeOn(SingleScheduler()) .subscribe({ message -> consumerCallerThreadNames.add(Thread.currentThread().name) messageCount.incrementAndGet() latch.countDown() }, Throwable::printStackTrace) 1.rangeTo(MESSAGE_COUNT).forEach { Thread({ try { subject.onNext(msg) } catch (t: Throwable) { t.printStackTrace() } }).start() } latch.await(10, SECONDS) assertThat(consumerCallerThreadNames).hasSize(1) assertThat(messageCount.get()).isEqualTo(MESSAGE_COUNT) } companion object { val MESSAGE_COUNT = 10 }
Если я переписал это для использования одного потока ExecutorService
тест больше не закрывается, поэтому проблема заключается либо в Rx, либо в моем отсутствии знаний о Rx.
В RxJava есть требование, которое вызывает on*
, не происходит одновременно. Это означает, что ваш код не является потокобезопасным.
Так как только сам объект используется одновременно, он должен быть закреплен путем сериализации (по сути, «синхронизированной» Java) самого субъекта с использованием метода Subject<T>.toSerialized()
.
val subject = PublishSubject.create<String>()
становится val subject = PublishSubject.create<String>().toSerialized()
.