Schedulers.io()方法返回一个IO任务调度器,它可以用于异步阻塞IO任务的执行。在Android中学用来处理网络请求之类的操作,它实际上是一个 IoScheduler

该类有三个内部类:

ThreadWorker : 任务封装类,它是NewThreadWorker的子类,它有一个ScheduledExecutorService用于执行任务,每一个任务都在该类中执行。

EventLoopWorker : Worker子类,Scheduler.createWorker()方法中会创建该类,该类引用了一个ThreadWorker,在schedule方法中调用ThreadWorker.scheduleActual方法执行任务。任务执行完成后将ThreadWorker缓存到CachedWorkerPool缓存池中。

CachedWorkerPool : 任务缓存池,提供了get()方法和release()方法,分别用于获取和缓存ThreadWorker。

1.1. IoScheduler

/** 任务调度Scheduler,创建并缓存一个用于执行任务的线程池。
 */
public final class IoScheduler extends Scheduler {
    private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler";
    static final RxThreadFactory WORKER_THREAD_FACTORY; // RxThreadFactory

    private static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor";
    static final RxThreadFactory EVICTOR_THREAD_FACTORY;
      // 线程缓存存活时间。
    public static final long KEEP_ALIVE_TIME_DEFAULT = 60;

    private static final long KEEP_ALIVE_TIME;
    private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;

    static final ThreadWorker SHUTDOWN_THREAD_WORKER;
    final ThreadFactory threadFactory;
    final AtomicReference<CachedWorkerPool> pool; // Worker线程池

      static final CachedWorkerPool NONE;

    static {
        KEEP_ALIVE_TIME = Long.getLong(KEY_KEEP_ALIVE_TIME, KEEP_ALIVE_TIME_DEFAULT);

        SHUTDOWN_THREAD_WORKER = new ThreadWorker(new RxThreadFactory("RxCachedThreadSchedulerShutdown"));
        SHUTDOWN_THREAD_WORKER.dispose();

        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_IO_PRIORITY, Thread.NORM_PRIORITY)));

        WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
        EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority);

        NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
        NONE.shutdown();
    }

    public IoScheduler() {
        this(WORKER_THREAD_FACTORY);
    }

    /** 根据ThreadFactory构建IoScheduler,并且启动WorkerPools缓存池。
     */
    public IoScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }

    @Override
    public void start() {
          // 创建CachedWorkerPool
        CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown();
        }
    }

    @Override
    public void shutdown() {
        for (;;) {
            CachedWorkerPool curr = pool.get();
            if (curr == NONE) {
                return;
            }
            if (pool.compareAndSet(curr, NONE)) {
                curr.shutdown(); // 停止pool
                return;
            }
        }
    }

    @NonNull
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }

    public int size() {
        return pool.get().allWorkers.size();
    }

1.1.1. CachedWorkerPool

任务缓存池,提供了get()方法和release()方法,分别用于获取和缓存ThreadWorker。

static final class CachedWorkerPool implements Runnable {
    private final long keepAliveTime;
      // WorkerQueue,Worker工作队列。
    private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
    final CompositeDisposable allWorkers;
      // 线程池。
    private final ScheduledExecutorService evictorService;
    private final Future<?> evictorTask;
    private final ThreadFactory threadFactory;

    CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
        this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
        this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
        this.allWorkers = new CompositeDisposable();
        this.threadFactory = threadFactory;

        ScheduledExecutorService evictor = null;
        Future<?> task = null;
        if (unit != null) {
            evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
            task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
        }
        evictorService = evictor;
        evictorTask = task;
    }

    @Override
    public void run() {
        evictExpiredWorkers(); // 移除过期的Worker
    }

    ThreadWorker get() {
        if (allWorkers.isDisposed()) {
            return SHUTDOWN_THREAD_WORKER;
        }
        while (!expiringWorkerQueue.isEmpty()) {
              // 从缓存队列中获取ThreadWorker。
            ThreadWorker threadWorker = expiringWorkerQueue.poll();
            if (threadWorker != null) {
                return threadWorker;
            }
        }
        // 未找到缓存的Worker,创建一个新的Worker。
        ThreadWorker w = new ThreadWorker(threadFactory);
        allWorkers.add(w);
        return w;
    }

    void release(ThreadWorker threadWorker) {
        // 刷新threadWorker过期时间,并把它放到缓存池中。
        threadWorker.setExpirationTime(now() + keepAliveTime);
        expiringWorkerQueue.offer(threadWorker);
    }

    void evictExpiredWorkers() { // 移除过期的Worker
        if (!expiringWorkerQueue.isEmpty()) {
            long currentTimestamp = now();
            for (ThreadWorker threadWorker : expiringWorkerQueue) {
                  // 如果threadWorker的过期时间小于当前时间,则从queue中移除。
                if (threadWorker.getExpirationTime() <= currentTimestamp) {
                    if (expiringWorkerQueue.remove(threadWorker)) {
                        allWorkers.remove(threadWorker);
                    }
                } else {
                    // Queue is ordered with the worker that will expire first in the beginning, so when we
                    // find a non-expired worker we can stop evicting.
                    break;
                }
            }
        }
    }

    long now() {
        return System.nanoTime();
    }

    void shutdown() { // 停止
        allWorkers.dispose();
        if (evictorTask != null) {
            evictorTask.cancel(true);
        }
        if (evictorService != null) {
            evictorService.shutdownNow();
        }
    }
}

1.1.2. EventLoopWorker

Worker子类,Scheduler.createWorker()方法中会创建该类,该类引用了ThreadWorker,在schedule方法中调用ThreadWorker.scheduleActual方法执行任务。任务执行完成后将ThreadWorker缓存到CachedWorkerPool缓存池中。

static final class EventLoopWorker extends Scheduler.Worker {
    private final CompositeDisposable tasks;
    private final CachedWorkerPool pool; // 缓存池
    private final ThreadWorker threadWorker;

    final AtomicBoolean once = new AtomicBoolean();

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.tasks = new CompositeDisposable();
        this.threadWorker = pool.get(); // 从缓存池中获取一个ThreadWorker对象
    }

    @Override
    public void dispose() {
        if (once.compareAndSet(false, true)) {
            tasks.dispose();
            // 将threadWorker释放并放到缓存池pool中
            pool.release(threadWorker);
        }
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) {
            // don't schedule, we are unsubscribed
            return EmptyDisposable.INSTANCE;
        }
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
}

1.1.3. ThreadWorker

任务封装类,它是NewThreadWorker的子类,每一个任务都被封装成该类。

static final class ThreadWorker extends NewThreadWorker {
    private long expirationTime; // 过期时间

    ThreadWorker(ThreadFactory threadFactory) {
        super(threadFactory);
        this.expirationTime = 0L;
    }

    public long getExpirationTime() {
        return expirationTime;
    }

    public void setExpirationTime(long expirationTime) {
        this.expirationTime = expirationTime;
    }
}

1.2. NewThreadWorker

该类内部有一个ScheduledExecutorService,用于执行Runnable任务。它是Worker的子类,实现了schedule(Runnable action, long delayTime, TimeUnit unit)方法,并调用scheduleActual()方法执行action任务,该方法中创建了ScheduledRunnable对象并将创建出的对象交由executor执行。

ScheduledRunnable实现了三个接口: Runnable, Callable<Object>, Disposable。目的是用于包装Runnable任务,以及实现任务的管理(通过Feature管理任务,使得可以通过cancel方法取消任务)。

/**
 * Base class that manages a single-threaded ScheduledExecutorService as a
 * worker but doesn't perform task-tracking operations.
 */
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;
      // 任务是否取消
    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (disposed) { // 如果任务已取消,则返回 EmptyDisposable
            return EmptyDisposable.INSTANCE;
        }
          // 调用scheduleActual执行任务。
        return scheduleActual(action, delayTime, unit, null);
    }

    /** 调度给定的Runnable到executor中去执行,并且将执行的feature包装到一个Disposable对象中返回。
     * @return the ScheduledRunnable instance
     */
    public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
        ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
        try {
            Future<?> f;
            if (delayTime <= 0L) {
                f = executor.submit(task);
            } else {
                f = executor.schedule(task, delayTime, unit);
            }
            task.setFuture(f);
            return task;
        } catch (RejectedExecutionException ex) {
            RxJavaPlugins.onError(ex);
            return EmptyDisposable.INSTANCE;
        }
    }

    /** 将Runnable任务包装成一个ScheduledRunnable对象,并将它交给ScheduledExecutorService执行。
     * @param parent the optional tracker parent to add the created ScheduledRunnable instance to before it gets scheduled
     * @return 返回ScheduledRunnable对象,该类实现了 Runnable, Callable<Object>, Disposable。
     */
    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
          // 创建 ScheduledRunnable
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }
        Future<?> f;
        try { // 执行任务,并将feature设置给ScheduledRunnable。
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
        return sr;
    }

    @Override
    public void dispose() {
        if (!disposed) {
            disposed = true;
            executor.shutdownNow();
        }
    }

    /**
     * Shuts down the underlying executor in a non-interrupting fashion.
     */
    public void shutdown() {
        if (!disposed) {
            disposed = true;
            executor.shutdown();
        }
    }

    @Override
    public boolean isDisposed() {
        return disposed;
    }
}

1.3. ScheduledRunnable

调度任务,它实现了Runnable,Callable,Disposable。

public final class ScheduledRunnable extends AtomicReferenceArray<Object>
implements Runnable, Callable<Object>, Disposable {
    final Runnable actual;

    /** Indicates that the parent tracking this task has been notified about its completion. */
    static final Object PARENT_DISPOSED = new Object();
    /** Indicates the dispose() was called from within the run/call method. */
    static final Object SYNC_DISPOSED = new Object();
    /** Indicates the dispose() was called from another thread. */
    static final Object ASYNC_DISPOSED = new Object();

    static final Object DONE = new Object();

    static final int PARENT_INDEX = 0;
    static final int FUTURE_INDEX = 1;
    static final int THREAD_INDEX = 2;

    /** 根据Runnable任务创建实例,并且设置可选的Parent。*/
    public ScheduledRunnable(Runnable actual, DisposableContainer parent) {
        super(3);
        this.actual = actual;
        this.lazySet(0, parent);
    }

    @Override
    public Object call() {
        // Being Callable saves an allocation in ThreadPoolExecutor
        run();
        return null;
    }

    @Override
    public void run() {
        lazySet(THREAD_INDEX, Thread.currentThread());
        try {
            try { // 执行任务
                actual.run();
            } catch (Throwable e) {
                RxJavaPlugins.onError(e);
            }
        } finally {
            lazySet(THREAD_INDEX, null); // 设置 THREAD_INDEX 为 null
            Object o = get(PARENT_INDEX);
            // 设置PARENT_INDEX为DONE。
            if (o != PARENT_DISPOSED && compareAndSet(PARENT_INDEX, o, DONE) && o != null) {
                ((DisposableContainer)o).delete(this);
            }
            for (;;) {
                o = get(FUTURE_INDEX);
                // 设置FUTURE_INDEX为DONE。
                if (o == SYNC_DISPOSED || o == ASYNC_DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
                    break;
                }
            }
        }
    }

    public void setFuture(Future<?> f) { // 保存Feature对象,通过该Feature对象可以取消执行任务。
        for (;;) {
            Object o = get(FUTURE_INDEX);
            if (o == DONE) {
                return;
            } if (o == SYNC_DISPOSED) {
                f.cancel(false);
                return;
            } if (o == ASYNC_DISPOSED) {
                f.cancel(true);
                return;
            }
            // 设置FUTURE_INDEX,保存Feature对象。
            if (compareAndSet(FUTURE_INDEX, o, f)) {
                return;
            }
        }
    }

    @Override
    public void dispose() { // 取消执行任务。
        for (;;) {
            Object o = get(FUTURE_INDEX);
            // 如果已经完成,或者已经取消,则跳出循环。
            if (o == DONE || o == SYNC_DISPOSED || o == ASYNC_DISPOSED) {
                break;
            }
            boolean async = get(THREAD_INDEX) != Thread.currentThread();
            // 取消任务
            if (compareAndSet(FUTURE_INDEX, o, async ? ASYNC_DISPOSED : SYNC_DISPOSED)) {
                if (o != null) {
                    ((Future<?>)o).cancel(async);
                }
                break;
            }
        }
        // 取消任务后,如果parent不为空,则从parent中移除自身。
        for (;;) {
            Object o = get(PARENT_INDEX);
            if (o == DONE || o == PARENT_DISPOSED || o == null) {
                return;
            }
            if (compareAndSet(PARENT_INDEX, o, PARENT_DISPOSED)) {
                ((DisposableContainer)o).delete(this);
                return;
            }
        }
    }

    @Override
    public boolean isDisposed() {
        Object o = get(PARENT_INDEX);
        return o == PARENT_DISPOSED || o == DONE;
    }
}

1.4. Structure

整体架构及流程图

structure

results matching ""

    No results matching ""