1. RxJava中的Scheduler

RxJava中的Scheduler用来执行Runnable任务的调度,Schedulers类提供了几种标准的Scheduler任务调度器。

Scheduler中Runnable任务的执行,被包装成了一个Worker对象,通过createWorker()方法创建一个Worker,它是一个抽象方法,需要由子类实现。Worker是一个抽象类,子类继承它并实现它的方法。

1.1. Scheduler

public abstract class Scheduler {

    /**
     * 检索或创建新的{@link Scheduler.Worker},代表顺序执行的任务,当任务完成后,Worker应该被release。
     * 通过{@link Scheduler.Worker#dispose()}来避免内存泄漏。
     * Worker中的任务应该是有序并不重复的。
     */
    @NonNull
    public abstract Worker createWorker();

    /**
     * 调度给定的任务,没有延迟。该方法在多线程并发操作时是安全的,但是Scheduler并不保证任务被执行的顺序。
     * @return 返回一个Disposable对象,调用者可以根据需求用此对象取消执行任务。
     */
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    /**
     * 根据给定的延迟时间调度执行任务,该方法调用createWorker()获取一个Worker,根据Worker和
     * Runnable创建一个DisposeTask对象,之后调用Worker.schedule开始执行任务。
     * @return 返回一个Disposable对象,调用者可以根据需求用此对象取消执行任务。
     */

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        DisposeTask task = new DisposeTask(decoratedRun, w);
        w.schedule(task, delay, unit);
        return task;
    }

    /**
     * 根据给定的时间间隔,周期性的执行任务。
     * 周期性的执行是固定的频率,第一次执行的时间延迟是{@code initialDelay}, 第二次是{@code initialDelay + period}, 第三次{@code initialDelay + 2 * period}, and so on.
     * @return the Disposable that let's one cancel this particular delayed task.
     */
    @NonNull
    public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);
        Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
        if (d == EmptyDisposable.INSTANCE) {
            return d;
        }
        return periodicTask;
    }

    /**
     * 开启Scheduler,启动线程开始执行任务。
     */
    public void start() {
    }

    /**
     * 停止Scheduler中的线程,停止接收任务并且清除引用的资源。
     */
    public void shutdown() {
    }
}

1.2. Worker

Scheduler的内部类Worker表示用于执行可运行任务的独立程序。Dispose Worker时应取消所有未完成的工作,并允许资源清理。

public abstract static class Worker implements Disposable {
    /**
     * 立即执行任务,没有延迟,该方法默认的实现会调用到{@link #schedule(Runnable, long, TimeUnit)}.
     * @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
     */
    @NonNull
    public Disposable schedule(@NonNull Runnable run) {
        return schedule(run, 0L, TimeUnit.NANOSECONDS);
    }

    /**
     * 根据指定的时间延迟执行任务,该方法需要由子类实现。
     * @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
     */
    @NonNull
    public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);

    /**
     * 根据指定的延迟时间和时间间隔执行任务。
     * @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
     */
    @NonNull
    public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
        final SequentialDisposable first = new SequentialDisposable();
        final SequentialDisposable sd = new SequentialDisposable(first);
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        final long periodInNanoseconds = unit.toNanos(period);
        final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
        final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);
          // 创建PeriodicTask并且通过schedule方法执行该任务。
          // 在PeriodicTask中,不断的重复执行任务本身,以达到重复执行任务。
        Disposable d = schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd, periodInNanoseconds), initialDelay, unit);
        if (d == EmptyDisposable.INSTANCE) {
            return d;
        }
        first.replace(d);
        return sd;
    }
}

1.3. PeriodicTask

周期性执行的任务。

/**
 * 持有周期性执行的任务的状态和逻辑,用于决定什么时候执行下一次任务。
 */
final class PeriodicTask implements Runnable, SchedulerRunnableIntrospection {
    @NonNull // 执行的具体任务
    final Runnable decoratedRun;
    @NonNull // 任务句柄,如果任务被取消,在该类中,需要停止执行下一次任务。
    final SequentialDisposable sd;
    final long periodInNanoseconds;
    long count;
    long lastNowNanoseconds;
    long startInNanoseconds;

    PeriodicTask(long firstStartInNanoseconds, @NonNull Runnable decoratedRun,
            long firstNowNanoseconds, @NonNull SequentialDisposable sd, long periodInNanoseconds) {
        this.decoratedRun = decoratedRun;
        this.sd = sd;
        this.periodInNanoseconds = periodInNanoseconds;
        lastNowNanoseconds = firstNowNanoseconds;
        startInNanoseconds = firstStartInNanoseconds;
    }  
    @Override
    public void run() {
        decoratedRun.run();
          // 任务未被取消
        if (!sd.isDisposed()) {
            long nextTick;
            long nowNanoseconds = now(TimeUnit.NANOSECONDS);
            // If the clock moved in a direction quite a bit, rebase the repetition period
            if (nowNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS < lastNowNanoseconds
                    || nowNanoseconds >= lastNowNanoseconds + periodInNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS) {
                nextTick = nowNanoseconds + periodInNanoseconds;
                /*
                 * Shift the start point back by the drift as if the whole thing
                 * started count periods ago.
                 */
                startInNanoseconds = nextTick - (periodInNanoseconds * (++count));
            } else {
                nextTick = startInNanoseconds + (++count * periodInNanoseconds);
            }
            lastNowNanoseconds = nowNanoseconds;
            long delay = nextTick - nowNanoseconds;
              // 继续执行decorateRun任务。
            sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
        }
    }  
}

1.4. DisposeTask

可取消的任务,它是一个Runnable,可用于执行任务。

/**
 * 可取消的Task任务,
 */
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {

    @NonNull
    final Runnable decoratedRun;
    @NonNull
    final Worker w;
    @Nullable
    Thread runner;

    DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
        this.decoratedRun = decoratedRun;
        this.w = w;
    }

    @Override
    public void run() {
        runner = Thread.currentThread();
        try { // 执行具体的Runnable
            decoratedRun.run();
        } finally {
            dispose(); // 任务执行完成,dispose()
            runner = null;
        }
    }

    @Override
    public void dispose() {
          // 如果Worker是NewThreadWorker,则调用它的shutdown停止它,否则通过dispose停止。
        if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
            ((NewThreadWorker)w).shutdown();
        } else {
            w.dispose();
        }
    }

    @Override
    public boolean isDisposed() {
        return w.isDisposed();
    }
}

从Scheduler和Worker常用的方法及其内部类可以看出,当要执行一个任务时,首先通过Scheduler.createWorker()把任务封装成一个Worker,然后通过Scheduler.scheduleDirect() -> Worker.schedule()方法执行任务。

Scheduler.createWorker()Worker.schedule() 方法为抽象方法,需要由其子类实现。

1.5. Structure

整体架构流程图

Structure

results matching ""

    No results matching ""