Как преобразовать многие AsyncTasks в Rx Observables в Android?

Я использую API-интерфейс Facebook для поиска пользователей, которым нравятся мои сообщения. Вся логика реализуется в несколько шагов:

  1. Найти все сообщения от asus
  2. Преобразуйте этот AT в Rx.Observable
  3. Карта наблюдений GraphResponse to List (Post is POJO)
  4. Вызывайте оператор FlatMap и в нем метод вызова тела, который выполняет итерацию каждого сообщения и делает asyncTask, а затем преобразовывает его в Observable и помещает в Array.
  5. Объединить массив наблюдаемых в одном наблюдаемом.
  6. Сопоставьте свой GraphResponse с Liked Profile
  7. Подписаться и сделать список любителей
  8. PROFIT!

И у меня есть небольшая проблема на шаге 4-5. Пожалуйста, посмотрите метод «любит» в репозитории. В комментариях я писал проблемы

Подсказка : я использую MVP + Clean Architecture с репозиторием (уровень данных) и Interactor (бизнес-уровень)

class FacebookRepository { private val facebook = Facebook.instance() private val gson = GsonBuilder().create() fun posts(): Observable<GraphResponse>? { return RxDecorator<GraphResponse>().decorate(Observable.defer({ val request = GraphRequest( facebook.token, "/me/posts", null, HttpMethod.GET, GraphRequest.Callback { /* handle the result */ } ) Observable.just(request.executeAndWait()) })) } fun setFaceBookAccessToken(currentAccessToken: AccessToken?) { facebook.token = currentAccessToken } fun logout() { facebook.logout() } fun token(): String? { return facebook.token?.token } fun likes(posts: List<Post>?): Observable<List<Profile>> { Log.d("observables:posts", posts.toString()) val p = iterateObservables(posts) // STOP HERE and WAIT to complete this method. // Then p is composite - merge and return return Observable.merge(p).map { Log.d("merge:posts", it.toString()) val profiles = gson.fromJson<List<Profile>>( it.jsonObject["data"].toString(), object : TypeToken<List<Profile>>() {}.type ) return@map profiles } } private fun iterateObservables(posts: List<Post>?): MutableList<Observable<GraphResponse>>? { val observables: MutableList<Observable<GraphResponse>>? = null Log.d("iterateObs:posts", posts.toString()) Log.d("posts_not_null", (posts != null).toString()) Log.d("posts.size", posts?.size.toString()) if (posts != null) { for (post in posts) { Log.d("iterateObs:post", post.toString()) val request = GraphRequest( AccessToken.getCurrentAccessToken(), "/${post.id}/likes", null, HttpMethod.GET, GraphRequest.Callback { Log.d("fb:post:id", "${post.id}: ${it.jsonObject}") }).executeAsync() Log.d("obs:after:post", observables.toString()) } } return observables } } 

И это Interactor

 class FacebookInteractor { private val callbackManager = com.facebook.CallbackManager.Factory.create() private val repository = FacebookRepository() fun facebookAuth(view: IMainView) { LoginManager .getInstance() .logInWithReadPermissions( view.getContext() as MainActivity, Arrays.asList("user_friends", "user_likes", "user_posts", "public_profile", "email") ) } fun onFacebookLoginResult(requestCode: Int, resultCode: Int, data: Intent) { callbackManager.onActivityResult(requestCode, resultCode, data) } fun facebookAccessTokenChanged(oldAccessToken: AccessToken?, currentAccessToken: AccessToken?) { if(oldAccessToken?.token != currentAccessToken?.token) { repository.setFaceBookAccessToken(currentAccessToken) } } fun likes(): Observable<List<Profile>>? { return repository.posts()?.map { val gson = GsonBuilder().create() val posts = gson.fromJson<List<Post>>( it.jsonObject["data"].toString(), object : TypeToken<List<Post>>() {}.getType() ) return@map posts }?.flatMap { return@flatMap repository.likes(it) } } fun logout() { repository.logout() } fun isLogined(): Boolean { return repository.token() != null } } 

Я использую Kotlin как язык разработки.

Спасибо EveryOne!) Но ответ был в другом случае) Проблема была в логике, а не в реализации). У Facebook SDK есть RequestBatch И нет необходимости в будущем или остановке Threads)

 package com.github.scrobot.likes_listener.data.facebook import android.util.Log import com.facebook.* import com.github.scrobot.likes_listener.data.facebook.models.Facebook import com.github.scrobot.likes_listener.data.facebook.models.Post import com.github.scrobot.likes_listener.data.facebook.models.Profile import com.google.gson.GsonBuilder import com.google.gson.reflect.TypeToken import rus.pifpaf.client.util.rx.RxDecorator import rx.Observable import java.util.* /** * Created by aleksejskrobot on 07.12.16. */ class FacebookRepository { private val facebook = Facebook.instance() private val gson = GsonBuilder().create() fun posts(): Observable<GraphResponse>? { return RxDecorator<GraphResponse>().decorate(Observable.defer({ val request = GraphRequest( facebook.token, "/me/posts", null, HttpMethod.GET, GraphRequest.Callback { /* handle the result */ } ) Observable.just(request.executeAndWait()) })) } fun setFaceBookAccessToken(currentAccessToken: AccessToken?) { facebook.token = currentAccessToken } fun logout() { facebook.logout() } fun token(): String? { return facebook.token?.token } fun likes(posts: List<Post>?): Observable<List<Profile>> { val batch = GraphRequestBatch() posts?.mapTo(batch) { post -> GraphRequest( facebook.token, "/${post.id}/likes", null, HttpMethod.GET, GraphRequest.Callback { Log.d("fb:post:id", "${post.id}: ${it.jsonObject}") } ) } return RxDecorator<List<GraphResponse>>().decorate(rx.Observable.defer({ rx.Observable.just(batch.executeAndWait()) })).map { Log.d("batchResponse:it", it.toString()) val list = ArrayList<Profile>() for (i in it) { list.addAll(gson.fromJson<List<Profile>>( i.jsonObject["data"].toString(), object : TypeToken<List<Profile>>() {}.type )) } list.filter { list.contains(it) } return@map list } } }