在项目中,有遇到一个闪退异常,用户使用场景是下拉刷新50次,抓取日志发现导致闪退的原因是OutOfMemoryError,而产生OutOfMemoryError原因是"pthred_create"
。
查看下拉刷新的代码,发现下拉刷新的业务代码有问题,在刷新的时候,发起了13个网络请求,连续快速下拉刷新50次,必定会产生650次网络请求,所以初步判断是下拉刷新导致的。查看每个网络请求,也都使用了如下的方式,通过subscribeOn(Schedulers.io())
将任务分配到IoScheduler中。
apiService.getXxxData()
.subscribeOn(Schedulers.io())
.subscribe({ /*...*/ })
1.1. Schedulers.io()
Schedulers.io()
返回的IoScheduler不是有维护缓存池吗,为什么还会出现线程创建过多导致OOM的问题?
/**
* Returns a default, shared {@link Scheduler} instance intended for IO-bound work.
* This can be used for asynchronously performing blocking IO.
* -----------------------------------------------------------
* The implementation is backed by a pool of single-threaded {@link ScheduledExecutorService} instances
* that will try to reuse previously started instances used by the worker
* returned by createWorker() but otherwise will start a new backing
* {@link ScheduledExecutorService} instance.
*
* Note that this scheduler may create an unbounded number of worker threads
* that can result in system slowdowns or {@code OutOfMemoryError}.
*
* Therefore, for casual uses or when implementing an operator,
* the Worker instances must be disposed via {@link Scheduler.Worker#dispose()}.
* @return a {@link Scheduler} meant for IO-bound work
*/
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
Schedulers.io()
方法有这样一段注释:
它返回一个用于处理IO工作的Scheduler,可以用来执行异步任务。它的实现依赖于一个单线程的ScheduledExecutorService
,当处理一个任务的时候先尝试使用之前已经创建过的空闲的实例,否则的话才会创建一个新的实例。注意该Scheduler可能创建无界的线程数量,可能会导致系统降速卡顿或者OutOfMemoryError。
通过上面的注释更加确信文章开始遇到的问题是创建线程数量过多导致的了,那IoScheduler是根据什么策略创建线程的?又是怎么缓存线程的?
在SchedulerPoolFactory.create()
方法中添加debug日志断点,打印输出factory和该方法创建出的ScheduledExecutorService实例。
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
// "SchedulerPoolFactory.create()" + factory
tryPutIntoPool(PURGE_ENABLED, exec);
// "SchedulerPoolFactory.createExecutorService -> " + exec.hashCode()
return exec;
}
在IoScheduler.createWorker()
方法和EventLoopWorker.schedule()
方法中分别添加debug日志断点,打印输出threadWorker实例。
public final class IoScheduler extends Scheduler {
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
// "IoScheduler.createWorker(): " + this + ", " + this.hashCode()
}
static final class EventLoopWorker extends Scheduler.Worker {
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
// "threadWorker.scheduleActual = " + threadWorker + ", " + threadWorker.hashCode() + ", " + this.hashCode()
}
}
}
定义以下submitWorkers()
方法,用于模拟执行指定数量的任务。在执行任务的过程当中,查看上述日志断点的输入内容。
private void submitWorkers(int workerCount) {
for (int i = 0; i < workerCount; i++) {
Single.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "String-call(): " + Thread.currentThread().hashCode();
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
// TODO
}
});
}
}
1.1.1. submitWorkers(3)
通过传入参数3创建3个任务,得到如下输出:
// task1
IoScheduler.createWorker(): io.reactivex.internal.schedulers.IoScheduler@9aa9f9f, 162176927
SchedulerPoolFactory.create()RxThreadFactory[RxCachedThreadScheduler]
SchedulerPoolFactory.createExecutorService -> 239245548
threadWorker.scheduleActual = io.reactivex.internal.schedulers.IoScheduler$ThreadWorker@f7b34b5, 259732661, 176342858
// task2
IoScheduler.createWorker(): io.reactivex.internal.schedulers.IoScheduler@9aa9f9f, 162176927
SchedulerPoolFactory.create()RxThreadFactory[RxCachedThreadScheduler]
SchedulerPoolFactory.createExecutorService -> 251094833
threadWorker.scheduleActual = io.reactivex.internal.schedulers.IoScheduler$ThreadWorker@1ae0e16, 28184086, 174777239
// task3
IoScheduler.createWorker(): io.reactivex.internal.schedulers.IoScheduler@9aa9f9f, 162176927
SchedulerPoolFactory.create()RxThreadFactory[RxCachedThreadScheduler]
SchedulerPoolFactory.createExecutorService -> 74773922
threadWorker.scheduleActual = io.reactivex.internal.schedulers.IoScheduler$ThreadWorker@5f02233, 99623475, 136071664
IoScheduler.createWorker()
方法调用了3次,SchedulerPoolFactory.create()
方法也调用了3次,也就创建了3个ScheduledExecutorService。
3个ThreadWorker的hashCode分别是:259732661
, 28184086
, 99623475
1.1.2. submitWorkers(4)
在60s内,再次调用该方法将传入参数4创建4个任务,得到如下输出:
// task1
IoScheduler.createWorker(): io.reactivex.internal.schedulers.IoScheduler@9aa9f9f, 162176927
threadWorker.scheduleActual = io.reactivex.internal.schedulers.IoScheduler$ThreadWorker@f7b34b5, 259732661, 58764943
// task2
IoScheduler.createWorker(): io.reactivex.internal.schedulers.IoScheduler@9aa9f9f, 162176927
threadWorker.scheduleActual = io.reactivex.internal.schedulers.IoScheduler$ThreadWorker@1ae0e16, 28184086, 19117349
// task3
IoScheduler.createWorker(): io.reactivex.internal.schedulers.IoScheduler@9aa9f9f, 162176927
threadWorker.scheduleActual = io.reactivex.internal.schedulers.IoScheduler$ThreadWorker@5f02233, 99623475, 31696043
// task4
IoScheduler.createWorker(): io.reactivex.internal.schedulers.IoScheduler@9aa9f9f, 162176927
SchedulerPoolFactory.create()RxThreadFactory[RxCachedThreadScheduler]
SchedulerPoolFactory.createExecutorService -> 92241569
threadWorker.scheduleActual = io.reactivex.internal.schedulers.IoScheduler$ThreadWorker@16428c6, 23341254, 188080263
前3个任务使用了上一次调用的时候创建的ThreadWorker,只有最后一个任务,是新建了一个ThreadWorker来执行它。
此时缓存中存在4个ThreadWorker:259732661
, 28184086
, 99623475
, 23341254
1.1.3. submitWorkers(4)
在60s内,再次调用该方法将传入参数4创建4个任务,得到如下输出:
// task1
IoScheduler.createWorker(): io.reactivex.internal.schedulers.IoScheduler@9aa9f9f, 162176927
threadWorker.scheduleActual = io.reactivex.internal.schedulers.IoScheduler$ThreadWorker@f7b34b5, 259732661, 116001618
// task2
IoScheduler.createWorker(): io.reactivex.internal.schedulers.IoScheduler@9aa9f9f, 162176927
threadWorker.scheduleActual = io.reactivex.internal.schedulers.IoScheduler$ThreadWorker@1ae0e16, 28184086, 42864416
// task3
IoScheduler.createWorker(): io.reactivex.internal.schedulers.IoScheduler@9aa9f9f, 162176927
threadWorker.scheduleActual = io.reactivex.internal.schedulers.IoScheduler$ThreadWorker@5f02233, 99623475, 125405854
// task4
IoScheduler.createWorker(): io.reactivex.internal.schedulers.IoScheduler@9aa9f9f, 162176927
threadWorker.scheduleActual = io.reactivex.internal.schedulers.IoScheduler$ThreadWorker@16428c6, 23341254, 102038860
4个任务都使用的是缓存的ThreadWorker,未创建新的ThreadWorker。
流程总结
- NewThreadWorker内部持有一个ScheduledExecutorService,每创建一个NewThreadWorker就创建一个ScheduledExecutorService。
- 第一次批量执行任务,有多少个任务就会创建多少个NewThreadWorker。
- 再次执行批量任务,先从缓存池中获取ThreadWorker,如果有获取到则使用缓存中的ThreadWorker,如果没有则新创建NewThreadWorker来执行任务。
- 缓存中的ThreadWorker超过60s后仍然没有接收到新的任务,则被销毁。
基于以上的分析,如果传递给submitWorkers()
方法中任务数量足够大(500?),则必然会导致OutOfMemoryError
。
1.2. Better Practice
Schedulers.from(@NonNull Executor executor)
提供了通过Executor创建Scheduler的方法。所以为了避免上述的问题,可以创建一个有最大线程数量限制的Executor,然后采用该方法创建一个Scheduler。
private Scheduler scheduler;
private void createScheduler() {
if (null == scheduler) {
// 创建corePoolSize为20,maximumPoolSize为50的线程池。
ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 50,
60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
scheduler = Schedulers.from(executor);
}
}
private void submitWorkers(int workerCount) {
createScheduler(); // 创建Scheduler
for (int i = 0; i < workerCount; i++) {
Single.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "String-call(): " + Thread.currentThread().hashCode();
}
})
.subscribeOn(scheduler) // 指定Scheduler
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
// TODO
}
});
}
}
通过createScheduler()
创建一个最大线程数量为50的Scheduler来解决此问题,即使一次性创建1000个任务,也不会导致OutOfMemoryError。