kotlin получает подписчика для наблюдения наблюдаемого с помощью RxJava2

Android Studio 3.0 Beta2 

Я создал 2 метода один, который создает наблюдаемый, а другой, который создает подписчика.

Тем не менее, у меня возникает проблема, чтобы заставить подписчика подписаться на наблюдаемый. В Java это сработает, и я пытаюсь заставить его работать в Котлине.

В моем методе onCreate (..) я пытаюсь установить это. Это правильный способ сделать это?

 class MainActivity : AppCompatActivity() { override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) /* CANNOT SET SUBSCRIBER TO SUBCRIBE TO THE OBSERVABLE */ createStringObservable().subscribe(createStringSubscriber()) } fun createStringObservable(): Observable<String> { val myObservable: Observable<String> = Observable.create { subscriber -> subscriber.onNext("Hello, World!") subscriber.onComplete() } return myObservable } fun createStringSubscriber(): Subscriber<String> { val mySubscriber = object: Subscriber<String> { override fun onNext(s: String) { println(s) } override fun onComplete() { println("onComplete") } override fun onError(e: Throwable) { println("onError") } override fun onSubscribe(s: Subscription?) { println("onSubscribe") } } return mySubscriber } } 

Большое спасибо за любые предложения,

обратите особое внимание на типы.

Observable.subscribe() имеет три основных варианта:

  • тот, который не принимает аргументов
  • несколько, которые принимают io.reactivex.functions.Consumer
  • тот, который принимает io.reactivex.Observer

тип, который вы пытаетесь подписаться в вашем примере, – org.reactivestreams.Subscriber (определяется как часть спецификации реактивных потоков). вы можете обратиться к документам, чтобы получить более полный учет этого типа, но достаточно сказать, что он несовместим ни с одним из перегруженных методов Observable.subscribe() .

вот модифицированный пример вашего createStringSubscriber() , который позволит компилировать ваш код:

 fun createStringSubscriber(): Observer<String> { val mySubscriber = object: Observer<String> { override fun onNext(s: String) { println(s) } override fun onComplete() { println("onComplete") } override fun onError(e: Throwable) { println("onError") } override fun onSubscribe(s: Disposable) { println("onSubscribe") } } return mySubscriber } 

все изменилось:

  1. это возвращает тип Observer (вместо Subscriber )
  2. onSubscribe() передается Disposable (вместо Subscription )

.. и, как упоминалось в «Vincent Mimoun-Prat», синтаксис лямбда действительно может сократить ваш код.

  override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) // Here's an example using pure RxJava 2 (ie not using RxKotlin) Observable.create<String> { emitter -> emitter.onNext("Hello, World!") emitter.onComplete() } .subscribe( { s -> println(s) }, { e -> println(e) }, { println("onComplete") } ) // ...and here's an example using RxKotlin. The named arguments help // to give your code a little more clarity Observable.create<String> { emitter -> emitter.onNext("Hello, World!") emitter.onComplete() } .subscribeBy( onNext = { s -> println(s) }, onError = { e -> println(e) }, onComplete = { println("onComplete") } ) } 

Надеюсь, это поможет!

Взгляните на RxKotlin , что упростит много вещей и сделает код более кратким.

 val list = listOf("Alpha", "Beta", "Gamma", "Delta", "Epsilon") list.toObservable() // extension function for Iterables .filter { it.length >= 5 } .subscribeBy( // named arguments for lambda Subscribers onNext = { println(it) }, onError = { it.printStackTrace() }, onComplete = { println("Done!") } )