Kotlin: как преобразовать тест, который использует Thread.sleep для RxJava TestScheduler

Я пишу инструментальный тест, он проверяет, когда я кешу что-то в буфер Rx и через некоторый интервал (10 секунд) этот предмет вставляет буферизированные значения в мою базу данных Room.

Тест правилен, когда я использую Thread.sleep (syncTimeInterval). Я хочу написать этот же тест с помощью TestScheduler.

Здесь это версия Thread.sleep (которая проходит тест):

@Test fun testMultipleLogs() { val loadAllCloudCallBefore = appDatabase.logCloudCallDao().loadAll() val loadAllLogNewSessionBefore = appDatabase.logNewSessionDao().loadAll() assertEquals(0, loadAllCloudCallBefore.size) assertEquals(0, loadAllLogNewSessionBefore.size) Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } Thread.sleep(30000) val loadAllCloudCallAfter = appDatabase.logCloudCallDao().loadAll() val loadAllLogNewSessionAfter = appDatabase.logNewSessionDao().loadAll() assertEquals(60, loadAllCloudCallAfter.size) assertEquals(60, loadAllLogNewSessionAfter.size) } 

И здесь этот тест не проходит, размер после ожидания времени, установленного TestScheduler, равен 0 (не 60)

 @Test fun testMultipleLogs() { var testScheduler: TestScheduler = TestScheduler() val loadAllCloudCallBefore = appDatabase.logCloudCallDao().loadAll() val loadAllLogNewSessionBefore = appDatabase.logNewSessionDao().loadAll() assertEquals(0, loadAllCloudCallBefore.size) assertEquals(0, loadAllLogNewSessionBefore.size) Observable.interval(1, TimeUnit.SECONDS, testScheduler) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS, testScheduler) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } Observable.interval(1, TimeUnit.SECONDS, testScheduler) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS, testScheduler) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } Observable.interval(1, TimeUnit.SECONDS, testScheduler) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS, testScheduler) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } testScheduler.advanceTimeBy(21, TimeUnit.SECONDS) val loadAllCloudCallAfter = appDatabase.logCloudCallDao().loadAll() val loadAllLogNewSessionAfter = appDatabase.logNewSessionDao().loadAll() assertEquals(60, loadAllCloudCallAfter.size) assertEquals(60, loadAllLogNewSessionAfter.size) } 

Как я могу проверить этот случай правильно? Есть ли способ?

ОБНОВИТЬ

Функции в LogManager выглядят следующим образом:

  fun logCloudCall(url: String, callGroup: String) { val logCloudCall = LogCloudCall(url = url, callGroup = callGroup, date = Converter.GENERAL_DATE_FORMAT.format(Date())) Log.v("LogManager", logCloudCall.toString()) addLog(logCloudCall) } fun logNewSession() { val logNewSession = LogNewSession( date = Converter.GENERAL_DATE_FORMAT.format(Date())) Log.v("LogManager", logNewSession.toString()) addLog(logNewSession) } fun addLog(logEvent: LogEvent) { source.onNext(logEvent) } 

И это механизм, который я использую в моем LogManager init:

  val source = PublishSubject.create<LogEvent>().toSerialized() var logRepository: LogRepository init { logRepository = LogRepositoryImpl(context) configureSubject() } fun configureSubject() { source .buffer(10, TimeUnit.SECONDS) .subscribe { bufferedData -> proceedValues(bufferedData) } } 

Проходит следующий тест:

 @Test fun foo() { val testScheduler = TestScheduler() var count = 0 Observable.interval(1, TimeUnit.SECONDS, testScheduler) .take(20) .subscribe { count++ } testScheduler.advanceTimeBy(21, SECONDS) assert(count == 20) } 

То есть, ваш тестовый код выглядит корректно, но результат неверен. Единственным неизвестным здесь является код logManager . Есть ли в этом классе потоки? Это может объяснить, почему счет еще равен 0 : у вас может быть состояние гонки.


Вероятно, это связано с вызовом buffer . buffer внутренне использует вычисление Scheduler :

 public final Observable<List<T>> buffer(long timespan, TimeUnit unit) { return buffer(timespan, unit, Schedulers.computation(), Integer.MAX_VALUE); } 

Вероятно, это приведет к проблеме с потоками, которую вы видите.