Invoke RxJava2 отменяемый / одноразовый из правильной нити

Я реализую наблюдаемый, который пропускает строки из Resource .

Проблема в том, что этот ресурс действительно не нравится закрываться из другого потока, на котором он был создан (он убивает щенка и выдает исключение, когда это происходит).

Когда я размещаю подписку, ресурс Cancellable / Disposable вызывается из main потока, а наблюдаемый был подписан на Schedulers.io() .

Вот код Котлина:

 fun lines(): Observable<String> = Observable.create { emitter -> val resource = NetworkResource() emitter.setCancellable { resource.close() // <-- main thread :( } try { while (!emitter.isDisposed) emitter.onNext(resource.readLine()) // <-- blocked here! } catch (ioe: IOException) { emitter.tryOnError(ioe) // <-- this also triggers the cancellable } } val disposable = lines() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe { Log.i(TAG, "Line: $it" } disposable.dispose() // <-- main thread :) 

Вопрос : Можно ли вызывать Cancellable из правильного * потока, принимая во внимание, что подписанный поток заблокирован на resource.readLine() ?

* Правильная резьба, означающая один из subscribeOn(Schedures.io()) .

EDIT : Я боюсь, что этот вопрос не имеет правильного ответа, если resource.close() является потокобезопасным или какой-то опрос на resource.dataReady реализован так, что поток не заблокирован.

Schedulers.io() управляет пулом потоков, поэтому он может или не может использовать тот же поток для размещения вашего ресурса. Вам необходимо будет использовать настраиваемый планировщик и оператор unsubscribeOn() чтобы гарантировать, что Observable подписан и отменит подписку в том же потоке. Что-то вроде:

 Scheduler customScheduler = Schedulers.from(Executors.newSingleThreadExecutor()); val disposable = lines() .unsubscribeOn(customScheduler) .subscribeOn(customScheduler) .observeOn(AndroidSchedulers.mainThread()) .subscribe { Log.i(TAG, "Line: $it" } 

Если вы не возражаете, чтобы отложить вызов NetworkResource#close немного, почему бы не просто

  fun lines(): Observable<String> = Observable.create { emitter -> val resource = NetworkResource() try { while (!emitter.isDisposed) { emitter.onNext(resource.readLine()) } resource.close() } catch (ioe: IOException) { emitter.tryOnError(ioe) } } 

Но по-прежнему существует проблема с этим: в случае IOException никто не вызовет NetworkResource#close (также, я думаю, также в вашем примере из вопроса).

Попытайтесь исправить это:

  fun lines(): Observable<String> = Observable.create { emitter -> val resource = NetworkResource() try { while (!emitter.isDisposed) { emitter.onNext(resource.readLine()) } } catch (ioe: IOException) { emitter.tryOnError(ioe) } finally { resource.close() // try-catch here, too? } } 

или используя функцию «Kotlin-Try-With-Resources»

  fun lines(): Observable<String> = Observable.create { emitter -> NetworkResource().use { resource -> try { while (!emitter.isDisposed) { emitter.onNext(resource.readLine()) } } catch (ioe: IOException) { emitter.tryOnError(ioe) } } } 

Надеюсь, это поможет. Желаю хороших выходных.

А как насчет альтернативного пути?

a) путем обеспечения потоковой безопасности NetworkResource (если вы контролируете исходный код)

или

б) путем обертывания NetworkResource с помощью «агента» ? С «агентом» я имею в виду прокси-сервер, который использует внутренне выделенный поток, который выполняет все взаимодействия с NetworkResource (конструирование, readLine, close, …)