1. RxJava链式编程流程分析
RxJava中两个常用的可观察的数据源类型:Single和Observable。
1.1. Single
根据以下代码片段,分析跟踪调用堆栈。
Single<String> singleString = Single.just("test"); // step1
singleString.subscribeOn(Schedulers.io()) // step2
.observeOn(AndroidSchedulers.mainThread()) // step3
.subscribe(new SingleObserver<String>() { // step4
@Override
public void onSubscribe(Disposable d) {
print("onSubscribe: " + Thread.currentThread().getName());
}
@Override
public void onSuccess(String s) {
print("onSuccess: " + s);
print("onSuccess: " + Thread.currentThread().getName());
}
// ...
});
Result:
onSubscribe: Thread-1
onSuccess: test
onSuccess: main
- Single.just返回一个SingleJust对象,其中包含了数据源。
- subscribeOn返回一个SingleSubscribeOn对象,它指定了数据源分发事件所在的线程。
- observeOn返回一个SingleObserveOn对象,它指定了订阅者接收事件所在的线程。
- subscribe方法开启订阅事件。
分析调用堆栈流程如下:
1.2. Observable
根据以下代码片段,分析跟踪调用堆栈。
Observable<Integer> observableIntegerArray = Observable.fromArray(1, 2); // step1
observableIntegerArray.subscribeOn(Schedulers.io()) // step2
.map(new Function<Integer, Integer>() { // step3
@Override
public Integer apply(Integer integer) throws Exception {
print("map.apply: " + Thread.currentThread().getName());
return integer.intValue() * 10;
}
})
.observeOn(AndroidSchedulers.mainThread()) // step4
.subscribe(new Observer<Integer>() { // step5
@Override
public void onSubscribe(Disposable d) {
print("onSubscribe: " + Thread.currentThread().getName());
}
@Override
public void onNext(Integer integer) {
print("Observer.onNext: " + Thread.currentThread().getName());
print("Observer.onNext: " + integer.intValue());
}
@Override
public void onComplete() {
print("Observer.onComplete: " + Thread.currentThread().getName());
}
// ...
});
Result:
onSubscribe: Thread-1
map.apply: RxCachedThreadScheduler-3
map.apply: RxCachedThreadScheduler-3
Observer.onNext: main
Observer.onNext: 10
Observer.onNext: main
Observer.onNext: 20
Observer.onComplete: main
- Observable.fromArray返回一个ObservableFromArray对象,其中包含了数据源。
- subscribeOn返回一个ObservableSubscribeOn对象,它指定了数据源分发事件所在的线程。
- map方法返回一个ObservableMap对象,它指定了map数据转换操作。
- observeOn返回一个ObservableObserveOn对象,它指定了订阅者接收事件所在的线程。
- subscribe方法开启订阅事件。
分析调用堆栈流程如下:
1.3. Flow
根据以上两个流程的总结分析,RxJava中的各种数据转换操作,以及线程切换流程如下:
1.4. 总结
RxJava的每一个数据转换操作,或者线程切换,都会返回一个对应的Observable或者Single类型的可观察对象,该对象的source则为上一个对象,即调用者,此为upstream链。
调用者调用subscribe方法的时候,则会创建一个Observer观察者对象,subscribe方法沿着upstream链上的Source,一层一层向上调用到最顶层的Source,这个过程中生成一个Observer的downstream链。
最顶层的Source开始执行任务操作,执行完成后,调用downstream链的onNext或者onSuceess,一层一层回调最终到调用者。