Schedulers.io()
方法返回一个IO任务调度器,它可以用于异步阻塞IO任务的执行。在Android中学用来处理网络请求之类的操作,它实际上是一个 IoScheduler。
该类有三个内部类:
ThreadWorker : 任务封装类,它是NewThreadWorker的子类,它有一个ScheduledExecutorService用于执行任务,每一个任务都在该类中执行。
EventLoopWorker : Worker子类,Scheduler.createWorker()方法中会创建该类,该类引用了一个ThreadWorker,在schedule方法中调用ThreadWorker.scheduleActual方法执行任务。任务执行完成后将ThreadWorker缓存到CachedWorkerPool缓存池中。
CachedWorkerPool : 任务缓存池,提供了get()方法和release()方法,分别用于获取和缓存ThreadWorker。
1.1. IoScheduler
/** 任务调度Scheduler,创建并缓存一个用于执行任务的线程池。
*/
public final class IoScheduler extends Scheduler {
private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler";
static final RxThreadFactory WORKER_THREAD_FACTORY; // RxThreadFactory
private static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor";
static final RxThreadFactory EVICTOR_THREAD_FACTORY;
// 线程缓存存活时间。
public static final long KEEP_ALIVE_TIME_DEFAULT = 60;
private static final long KEEP_ALIVE_TIME;
private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;
static final ThreadWorker SHUTDOWN_THREAD_WORKER;
final ThreadFactory threadFactory;
final AtomicReference<CachedWorkerPool> pool; // Worker线程池
static final CachedWorkerPool NONE;
static {
KEEP_ALIVE_TIME = Long.getLong(KEY_KEEP_ALIVE_TIME, KEEP_ALIVE_TIME_DEFAULT);
SHUTDOWN_THREAD_WORKER = new ThreadWorker(new RxThreadFactory("RxCachedThreadSchedulerShutdown"));
SHUTDOWN_THREAD_WORKER.dispose();
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_IO_PRIORITY, Thread.NORM_PRIORITY)));
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority);
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
NONE.shutdown();
}
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
/** 根据ThreadFactory构建IoScheduler,并且启动WorkerPools缓存池。
*/
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
@Override
public void start() {
// 创建CachedWorkerPool
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
@Override
public void shutdown() {
for (;;) {
CachedWorkerPool curr = pool.get();
if (curr == NONE) {
return;
}
if (pool.compareAndSet(curr, NONE)) {
curr.shutdown(); // 停止pool
return;
}
}
}
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
public int size() {
return pool.get().allWorkers.size();
}
1.1.1. CachedWorkerPool
任务缓存池,提供了get()方法和release()方法,分别用于获取和缓存ThreadWorker。
static final class CachedWorkerPool implements Runnable {
private final long keepAliveTime;
// WorkerQueue,Worker工作队列。
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
final CompositeDisposable allWorkers;
// 线程池。
private final ScheduledExecutorService evictorService;
private final Future<?> evictorTask;
private final ThreadFactory threadFactory;
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
@Override
public void run() {
evictExpiredWorkers(); // 移除过期的Worker
}
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
// 从缓存队列中获取ThreadWorker。
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// 未找到缓存的Worker,创建一个新的Worker。
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
void release(ThreadWorker threadWorker) {
// 刷新threadWorker过期时间,并把它放到缓存池中。
threadWorker.setExpirationTime(now() + keepAliveTime);
expiringWorkerQueue.offer(threadWorker);
}
void evictExpiredWorkers() { // 移除过期的Worker
if (!expiringWorkerQueue.isEmpty()) {
long currentTimestamp = now();
for (ThreadWorker threadWorker : expiringWorkerQueue) {
// 如果threadWorker的过期时间小于当前时间,则从queue中移除。
if (threadWorker.getExpirationTime() <= currentTimestamp) {
if (expiringWorkerQueue.remove(threadWorker)) {
allWorkers.remove(threadWorker);
}
} else {
// Queue is ordered with the worker that will expire first in the beginning, so when we
// find a non-expired worker we can stop evicting.
break;
}
}
}
}
long now() {
return System.nanoTime();
}
void shutdown() { // 停止
allWorkers.dispose();
if (evictorTask != null) {
evictorTask.cancel(true);
}
if (evictorService != null) {
evictorService.shutdownNow();
}
}
}
1.1.2. EventLoopWorker
Worker子类,Scheduler.createWorker()方法中会创建该类,该类引用了ThreadWorker,在schedule方法中调用ThreadWorker.scheduleActual方法执行任务。任务执行完成后将ThreadWorker缓存到CachedWorkerPool缓存池中。
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool; // 缓存池
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get(); // 从缓存池中获取一个ThreadWorker对象
}
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// 将threadWorker释放并放到缓存池pool中
pool.release(threadWorker);
}
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
1.1.3. ThreadWorker
任务封装类,它是NewThreadWorker的子类,每一个任务都被封装成该类。
static final class ThreadWorker extends NewThreadWorker {
private long expirationTime; // 过期时间
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
public long getExpirationTime() {
return expirationTime;
}
public void setExpirationTime(long expirationTime) {
this.expirationTime = expirationTime;
}
}
1.2. NewThreadWorker
该类内部有一个ScheduledExecutorService,用于执行Runnable任务。它是Worker的子类,实现了schedule(Runnable action, long delayTime, TimeUnit unit)
方法,并调用scheduleActual()
方法执行action任务,该方法中创建了ScheduledRunnable对象并将创建出的对象交由executor执行。
ScheduledRunnable实现了三个接口: Runnable
, Callable<Object>
, Disposable
。目的是用于包装Runnable任务,以及实现任务的管理(通过Feature管理任务,使得可以通过cancel方法取消任务)。
/**
* Base class that manages a single-threaded ScheduledExecutorService as a
* worker but doesn't perform task-tracking operations.
*/
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
// 任务是否取消
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) { // 如果任务已取消,则返回 EmptyDisposable
return EmptyDisposable.INSTANCE;
}
// 调用scheduleActual执行任务。
return scheduleActual(action, delayTime, unit, null);
}
/** 调度给定的Runnable到executor中去执行,并且将执行的feature包装到一个Disposable对象中返回。
* @return the ScheduledRunnable instance
*/
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
try {
Future<?> f;
if (delayTime <= 0L) {
f = executor.submit(task);
} else {
f = executor.schedule(task, delayTime, unit);
}
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}
/** 将Runnable任务包装成一个ScheduledRunnable对象,并将它交给ScheduledExecutorService执行。
* @param parent the optional tracker parent to add the created ScheduledRunnable instance to before it gets scheduled
* @return 返回ScheduledRunnable对象,该类实现了 Runnable, Callable<Object>, Disposable。
*/
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 创建 ScheduledRunnable
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try { // 执行任务,并将feature设置给ScheduledRunnable。
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
@Override
public void dispose() {
if (!disposed) {
disposed = true;
executor.shutdownNow();
}
}
/**
* Shuts down the underlying executor in a non-interrupting fashion.
*/
public void shutdown() {
if (!disposed) {
disposed = true;
executor.shutdown();
}
}
@Override
public boolean isDisposed() {
return disposed;
}
}
1.3. ScheduledRunnable
调度任务,它实现了Runnable,Callable,Disposable。
public final class ScheduledRunnable extends AtomicReferenceArray<Object>
implements Runnable, Callable<Object>, Disposable {
final Runnable actual;
/** Indicates that the parent tracking this task has been notified about its completion. */
static final Object PARENT_DISPOSED = new Object();
/** Indicates the dispose() was called from within the run/call method. */
static final Object SYNC_DISPOSED = new Object();
/** Indicates the dispose() was called from another thread. */
static final Object ASYNC_DISPOSED = new Object();
static final Object DONE = new Object();
static final int PARENT_INDEX = 0;
static final int FUTURE_INDEX = 1;
static final int THREAD_INDEX = 2;
/** 根据Runnable任务创建实例,并且设置可选的Parent。*/
public ScheduledRunnable(Runnable actual, DisposableContainer parent) {
super(3);
this.actual = actual;
this.lazySet(0, parent);
}
@Override
public Object call() {
// Being Callable saves an allocation in ThreadPoolExecutor
run();
return null;
}
@Override
public void run() {
lazySet(THREAD_INDEX, Thread.currentThread());
try {
try { // 执行任务
actual.run();
} catch (Throwable e) {
RxJavaPlugins.onError(e);
}
} finally {
lazySet(THREAD_INDEX, null); // 设置 THREAD_INDEX 为 null
Object o = get(PARENT_INDEX);
// 设置PARENT_INDEX为DONE。
if (o != PARENT_DISPOSED && compareAndSet(PARENT_INDEX, o, DONE) && o != null) {
((DisposableContainer)o).delete(this);
}
for (;;) {
o = get(FUTURE_INDEX);
// 设置FUTURE_INDEX为DONE。
if (o == SYNC_DISPOSED || o == ASYNC_DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
break;
}
}
}
}
public void setFuture(Future<?> f) { // 保存Feature对象,通过该Feature对象可以取消执行任务。
for (;;) {
Object o = get(FUTURE_INDEX);
if (o == DONE) {
return;
} if (o == SYNC_DISPOSED) {
f.cancel(false);
return;
} if (o == ASYNC_DISPOSED) {
f.cancel(true);
return;
}
// 设置FUTURE_INDEX,保存Feature对象。
if (compareAndSet(FUTURE_INDEX, o, f)) {
return;
}
}
}
@Override
public void dispose() { // 取消执行任务。
for (;;) {
Object o = get(FUTURE_INDEX);
// 如果已经完成,或者已经取消,则跳出循环。
if (o == DONE || o == SYNC_DISPOSED || o == ASYNC_DISPOSED) {
break;
}
boolean async = get(THREAD_INDEX) != Thread.currentThread();
// 取消任务
if (compareAndSet(FUTURE_INDEX, o, async ? ASYNC_DISPOSED : SYNC_DISPOSED)) {
if (o != null) {
((Future<?>)o).cancel(async);
}
break;
}
}
// 取消任务后,如果parent不为空,则从parent中移除自身。
for (;;) {
Object o = get(PARENT_INDEX);
if (o == DONE || o == PARENT_DISPOSED || o == null) {
return;
}
if (compareAndSet(PARENT_INDEX, o, PARENT_DISPOSED)) {
((DisposableContainer)o).delete(this);
return;
}
}
}
@Override
public boolean isDisposed() {
Object o = get(PARENT_INDEX);
return o == PARENT_DISPOSED || o == DONE;
}
}
1.4. Structure
整体架构及流程图