写于 2017年 05 月 24 日

public interface Api{
    void queryPhoto(String query,QueryCallback QueryCallback);
    void store(Photo photo,StoreCallback storeCallback);
    interface QueryCallback{
        void onQuerySuccess(List<Photo> photoList);
        void onQueryFailed(Exception e);
    }
    interface StoreCallback{
        void onCatStored(Uri uri);
        void onStoredFailed(Exception e);
    }
}
RxJob<List<Photo>> queryPhotoJob = new RxJob<List<Photo>>(){
  				@Override
                public void doJob(final RxCallback<List<Photo>> rxCallback) {
                    mApi.queryPhoto("flyer", new Api.QueryCallback() {
                        @Override
                        public void onQuerySuccess(List<Photo> photoList) {
                            rxCallback.onNext(photoList);
                        }
                      

                        @Override
                        public void onQueryFailed(Exception e) {
                            rxCallback.onError(e);
                        }
                    });
                }
            };
};

RxJob<List<Photo>> storePhotoJob = new RxJob<Uri>() {
            @Override
            public void doJob(RxCallback<Uri> rxCallback) {
                mApi.store(photo, new Api.StoreCallback() {
                    @Override
                    public void onCatStored(Uri uri) {
                        rxCallback.onNext(uri);
                    }

                    @Override
                    public void onStoredFailed(Exception e) {
			rxCallback.onError(e);
                    }
                });
            }
        };


queryPhotoJob.map(photoList -> { return getBestPhoto(photoList);})
  	.flatMap(photo -> { return storePhotoJob;} )
	.map(photo -> {return photo.getUrl();})
	.doJob(new RxCallback<String>() {
            @Override
            public void onNext(String s) {
		ToastUtils.show(s);
            }

            @Override
            public void onError(Exception e) {
		ToastUtils.show(e);
            }
   });
public abstract class RxJob<T>{
	public abstract void doJob(RxCallback<T> rxCallback){}
  	public RxJob map(){
        final RxJob<T> curJob = this;
        return new RxJob<R>() {
            @Override
            public void doJob(final RxCallback<R> rxCallback) {
                curJob.doJob(new RxCallback<T>() {
                    @Override
                    public void onNext(T t) {
                        R mapped = func.call(t);
                        rxCallback.onNext(mapped);
                    }

                    @Override
                    public void onError(Exception e) {
                        rxCallback.onError(e);
                    }
                });
            }
        };
    }
  	public RxJob flatMap(){...};
}

flatMap 的实现

前一篇文章介绍了链式调用的形成以及 map 方法的实现,map 方法实现的是 T -> R 的类型转换

如何实现 RxJava 的链式调用 -- map 方法的实现

但如果 map 方法中需要实现 一个异步方法呢?

queryPhotoJob.map(photoList -> { return getBestPhoto(photoList);})
		.map(photo -> {mApi.store(photo)} )
  		.map(uri -> {loadImage(uri)})

注意 mApi.store(photo) 是异步任务,无法直接返还一个uri 类型。

于是需要用到 flatMap 方法。

flatMap 同样需要用到 R call(T t) 这个转变函数

首先回头考虑,我们最开始的第一个异步方法是如何实现的,也就是 queryPhotoJob 的创建

执行是在 doJob() 方法中执行了 mApi.queryPhoto() 然后调用 rxCallback.onNext() 继续往下走

代码如下:

RxJob<List<Photo>> queryPhotoJob = new RxJob<List<Photo>>(){
  				@Override
                public void doJob(final RxCallback<List<Photo>> rxCallback) {
                    mApi.queryPhoto("flyer", new Api.QueryCallback() {
                        @Override
                        public void onQuerySuccess(List<Photo> photoList) {
                            rxCallback.onNext(photoList);
                        }
                      

                        @Override
                        public void onQueryFailed(Exception e) {
                            rxCallback.onError(e);
                        }
                    });
                }
            };
};

由此考虑,flatMap 方法中也可以把那个异步方法处理成一个 RxJob 对象

然后让他把回调往返还的 new RxJob() 中的 doJob 方法中 的 Callback 往下一个 RxJob 传递

public <R> RxJob<R> flatMap(final Func<T,RxJob<R>> func){
    final RxJob<T> curJob = this;
    return new RxJob<R>() {
        @Override
        public void doJob(final RxCallback<R> rxCallback) {
            curJob.doJob(new RxCallback<T>() {
                @Override
                public void onNext(T t) {
                    // 此处是最大的区别,外部函数的转换
                    RxJob<R> flatMapped = func.call(t);
                  	// 得到 flatMapped ,然后调用
                    flatMapped.doJob(new RxCallback<R>() {
                        @Override
                        public void onNext(R r) {
                            // 把需要变换出的类型往下一个任务传递
                          	// 此处和 map 方法处理一致
                            rxCallback.onNext(r);
                        }

                        @Override
                        public void onError(Exception e) {
                            rxCallback.onError(e);
                        }
                    });
                }

                @Override
                public void onError(Exception e) {
                    rxCallback.onError(e);
                }
            });
        }
    };
}

mapflatMap 的对比

flatMapmap 最大的区别就是返还的 RxJob 对象中 curJob.doJob 中回调的区别

flatMap 方法核心部分

 // 此处是最大的区别
 RxJob<R> flatMapped = func.call(t);
 flatMapped.doJob(new RxCallback<R>() {
   @Override
   public void onNext(R r) {
   	rxCallback.onNext(r);
   }

  @Override
  public void onError(Exception e) {
       rxCallback.onError(e);
  }
});

map 方法核心部分

R mapped = func.call(t);
rxCallback.onNext(mapped);

flatMap 在回调完以后,运行的是 flatMapped.doJob()

并且把当前 RxJob 对象 rxCallback 的回调放到了flatMapped.doJob 中的回调用

从而使得下一个数据 r 是在运行完 flatMapped 任务后生成的。

代入此处就是 r 就是 mApi.store(photo,callback) 中的 photo

解释为什么 map 函数无法实现 flatMap 的功能?

flatMap 中的转变函数的具体实现是 T -> RxJob<R> 的转换

带入当前例子就是 Photo —> RxJob<String>

注意,之前的 map 方法是 List —> Photo 而不是 List<Photo> -> Photo

因为 map 方法的入参是 Func<T,R> func ,而不是 Func<List<T>,R> func

flaMap 函数的入参则是 Func<T,RxJob<R>> func,写死了第二个参数类型必须是 RxJob<R> ,然后 RxJob<R> 方法会被执行

做到了异步中的异步,同样也说明了入参 Func<?,?> func 会决定转换函数的功能,从而影响整个变换的不同

mapflatMap 的区别就如下图

					map
	  A ------------>  B/[B]		
    
												
					flatMap							
	 [A]------------> [B]									 

其核心在于类型的变换,即 Func<?,?> func也就是函数式编程中的 Functor

不同的 func 就会实现不同的变换, flatMap 就是一个 Monad

当其运用到 ReactiveX 上时,其转换核心就是数据流,也就做到了 Observe/Subscribe 的模式,因为在每一次变换后数据始终存在,对应的操作也可触发。不过 RxJava 还有更多其他的东西,不仅仅是这两个变换。但基本上理解这两个变换就能举一反三了,理解其他变换

RxJava 中的 map/flatMap 也是创建对于的 Observable 对象,然后调用对应的函数,然后返回需要的数据

其中 map 方法中的入参 Function,只会在各种类型中转换

flatMap 中的入参 Function 必然会转换出一个 Observable<R> 类型

RxJava 2 转换出的是一个 ObservableSource<R>

官方解释如下,non-backpressured Observable,可以理解为类似 Observable 类型,此处和 backpressured 的一些概念有关联,不作解释。

Represents a basic, non-backpressured {@link Observable} source base interface