1. RxJava链式编程流程分析

RxJava中两个常用的可观察的数据源类型:Single和Observable。

1.1. Single

根据以下代码片段,分析跟踪调用堆栈。

Single<String> singleString = Single.just("test"); // step1
singleString.subscribeOn(Schedulers.io()) // step2
.observeOn(AndroidSchedulers.mainThread()) // step3
.subscribe(new SingleObserver<String>() { // step4
    @Override
    public void onSubscribe(Disposable d) {
        print("onSubscribe: " + Thread.currentThread().getName());
    }

    @Override
    public void onSuccess(String s) {
        print("onSuccess: " + s);
        print("onSuccess: " + Thread.currentThread().getName());
    }
      // ...
});

Result:

onSubscribe: Thread-1
onSuccess: test
onSuccess: main
  1. Single.just返回一个SingleJust对象,其中包含了数据源。
  2. subscribeOn返回一个SingleSubscribeOn对象,它指定了数据源分发事件所在的线程。
  3. observeOn返回一个SingleObserveOn对象,它指定了订阅者接收事件所在的线程。
  4. subscribe方法开启订阅事件。

分析调用堆栈流程如下:

RxJava-Single

1.2. Observable

根据以下代码片段,分析跟踪调用堆栈。

Observable<Integer> observableIntegerArray = Observable.fromArray(1, 2); // step1
observableIntegerArray.subscribeOn(Schedulers.io()) // step2
        .map(new Function<Integer, Integer>() { // step3
            @Override
            public Integer apply(Integer integer) throws Exception {
                print("map.apply: " + Thread.currentThread().getName());
                return integer.intValue() * 10;
            }
        })
        .observeOn(AndroidSchedulers.mainThread()) // step4
        .subscribe(new Observer<Integer>() { // step5
            @Override
            public void onSubscribe(Disposable d) {
                print("onSubscribe: " + Thread.currentThread().getName());
            }

            @Override
            public void onNext(Integer integer) {
                print("Observer.onNext: " + Thread.currentThread().getName());
                print("Observer.onNext: " + integer.intValue());
            }

            @Override
            public void onComplete() {
                print("Observer.onComplete: " + Thread.currentThread().getName());
            }
              // ...
        });

Result:

onSubscribe: Thread-1
map.apply: RxCachedThreadScheduler-3
map.apply: RxCachedThreadScheduler-3
Observer.onNext: main
Observer.onNext: 10
Observer.onNext: main
Observer.onNext: 20
Observer.onComplete: main
  1. Observable.fromArray返回一个ObservableFromArray对象,其中包含了数据源。
  2. subscribeOn返回一个ObservableSubscribeOn对象,它指定了数据源分发事件所在的线程。
  3. map方法返回一个ObservableMap对象,它指定了map数据转换操作。
  4. observeOn返回一个ObservableObserveOn对象,它指定了订阅者接收事件所在的线程。
  5. subscribe方法开启订阅事件。

分析调用堆栈流程如下:

avatar

1.3. Flow

根据以上两个流程的总结分析,RxJava中的各种数据转换操作,以及线程切换流程如下:

flow

1.4. 总结

RxJava的每一个数据转换操作,或者线程切换,都会返回一个对应的Observable或者Single类型的可观察对象,该对象的source则为上一个对象,即调用者,此为upstream链。

调用者调用subscribe方法的时候,则会创建一个Observer观察者对象,subscribe方法沿着upstream链上的Source,一层一层向上调用到最顶层的Source,这个过程中生成一个Observer的downstream链。

最顶层的Source开始执行任务操作,执行完成后,调用downstream链的onNext或者onSuceess,一层一层回调最终到调用者。

results matching ""

    No results matching ""