1. RxAndroid使用UI线程

在RxJava中,如果要在主线程中处理一个异步任务的结果,通常使用AndroidSchedulers.mainThread()来获取一个Scheduler。

1.1. mainThread()

通过AndroidSchedulers.mainThread(),可以获取到一个Scheduler,在该Scheduler上进行的任务,都是在UI线程。

public final class AndroidSchedulers {

    private static final class MainHolder {
        static final Scheduler DEFAULT
            = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
}

AndroidSchedulers.mainThread()方法,获取到的是一个单例的HandlerScheduler对象。

1.2. HandlerScheduler

HandlerScheduler持有一个Handler对象,线程切换就是基于这个Handler对象来实现,从上面一段代码中可以看到这个Handler对象传入的是new Handler(Looper.getMainLooper()),所以通过Handler来处理的任务,就是在Android的主线程当中执行。

final class HandlerScheduler extends Scheduler {
    private final Handler handler;
    private final boolean async;

    HandlerScheduler(Handler handler, boolean async) {
        this.handler = handler;
        this.async = async;
    }

    @Override
    @SuppressLint("NewApi") // Async will only be true when the API is available to call.
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        Message message = Message.obtain(handler, scheduled);
        if (async) {
            message.setAsynchronous(true);
        }
        handler.sendMessageDelayed(message, unit.toMillis(delay));
        return scheduled;
    }

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler, async);
    }
}

该类还有scheduleDirect方法和createWorker方法,前者创建一个Runnable并使用该handler来处理该Runnable。后者创建一个内部类HandlerWorker。

1.3. HandlerWorker

HandlerWorker继承自Worker,并覆写了schedule方法,该方法实现的功能和HandlerScheduler.scheuleDirect基本相同。

class HandlerWorker extends Worker {
    private final Handler handler;
    private final boolean async;

    private volatile boolean disposed;

    HandlerWorker(Handler handler, boolean async) {
        this.handler = handler;
        this.async = async;
    }

    @Override
    @SuppressLint("NewApi") // Async will only be true when the API is available to call.
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        if (disposed) {
            return Disposables.disposed();
        }
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.
        if (async) {
            message.setAsynchronous(true);
        }

        handler.sendMessageDelayed(message, unit.toMillis(delay));
        // Re-check disposed state for removing in case we were racing a call to dispose().
        if (disposed) {
            handler.removeCallbacks(scheduled);
            return Disposables.disposed();
        }
        return scheduled;
    }
}

1.4. ScheduledRunnable

ScheduledRunnable继承自Runnable,真正的任务是由外部传入进来的Runnable,这样封装一层用于对任务进行管理,dispose()方法表示移除任务,isDisposed()方法表示任务是否移除。

private static final class ScheduledRunnable implements Runnable, Disposable {
    private final Handler handler;
    private final Runnable delegate;
    private volatile boolean disposed; // Tracked solely for isDisposed().

    ScheduledRunnable(Handler handler, Runnable delegate) {
        this.handler = handler;
        this.delegate = delegate;
    }

    @Override
    public void run() {
        try {
            delegate.run();
        } catch (Throwable t) {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public void dispose() {
        handler.removeCallbacks(this);
        disposed = true;
    }

      @Override
    public boolean isDisposed() {
        return disposed;
    }  
}

1.5. Example

假设在一个线程中执行以下一段代码,通过跟踪分析它的流程来看HandlerScheduler是怎么切换到observeOn所指定的UI线程的。

Observable.fromArray("1", "2") // step1
        .observeOn(AndroidSchedulers.mainThread()) // step2
        .subscribe(new Observer<String>() { // step3
            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.e(TAG, "onNext: " + s);
            }

            @Override
            public void onComplete() {
            }
        });
  1. 返回一个ObservableFromArray对象,该对象里面包含了数据源。
  2. 返回一个ObservableObserveOn对象,该对象指定了订阅者订阅事件所在的线程。
  3. subscribe开始订阅。

RxJava-Single

ObservableObserve.subscribeActual

该方法中使用scheduler.createWorker()创建一个Worker实例,然后将该Worker实例作为参数,又创建了一个ObserveOnObserver对象。

protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

ObserveOnObserver.schedule

订阅事件在事件下行过程中通过该ObserveOnObserver向下分发订阅事件,schedule方法中的worker,即上面创建的HandlerWorker。

// ObserveOnObserver.schedule执行订阅事件
void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

HandlerWorker.schedule

HandlerWorker.schedule方法通过Handler发消息的方式,将Runnable任务切换到UI线程中执行。

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        // ...
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.
    handler.sendMessageDelayed(message, unit.toMillis(delay));

    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }

    return scheduled;
}

results matching ""

    No results matching ""