alamide的笔记库「 87篇笔记 」「 小破站已建 0 天啦 🐶 」


OkHttp 线程池

2023-07-13, by alamide

OkHttp 中线程池的使用。

本文学习目标:

为什么 OkHttp 要使用的线程池参数 corePoolSize 为 0,为什么使用 SynchronousQueue。

1.为什么要使用自定义 ThreadPoolExecutor

OkHttp 中使用的线程池都为这种形式,执行异步请求及清理连接池

executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));

先来看一下 ThreadPoolExecutor 构造方法中的各个参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
}

corePoolSize 线程池中保持核心线程数,即使空闲也不会被销毁

maximumPoolSize 线程池中可同时存在的最大线程数,线程池中除了核心线程,在必要时还可以创建额外的线程

keepAliveTime 创建的额外线程最大存活时间

workQueue 没有空闲线程时任务存放的队列,供给有空闲线程时获取并执行

threadFactory 线程创建工厂,用来创建线程,可以设置自定义线程名

Executors 提供了一些方法来生成 ThreadPoolExecutor 如

Executors.newFixedThreadPool();
Executors.newSingleThreadExecutor();
Executors.newCachedThreadPool();

那为什么 OkHttp 还要使用自定义的 ThreadPoolExecutor 呢?

先来看看这几个方法:

Executors.newFixedThreadPool() 创建固定线程数的线程池,不能满足网络请求的场景。原因为,无法确定同时网络请求数,也就无法确定核心线程数。核心线程池数不能设置太大,否则会和浪费资源。不能设置太小,否则会有请求任务在阻塞排队。所以不符合。

Executors.newSingleThreadExecutor() 同上

Executors.newCachedThreadPool() 是符合需求的,实际上 OkHttp 的自定义 ThreadPoolExecutor 与 Executors.newCachedThreadPool() 基本一致,除了 threadFactory

好了,到这里可以解释为什么要自定义的问题了,下面就是自定义的参数解析了。

OkHttp 的亮点之一就是高并发,高并发的要求是什么,有多少请求处理多少,不需要等待,所以使用 SynchronousQueue 而不是 LinkedBlockingQueue。

2.SynchronousQueue VS LinkedBlockingQueue

在阅读 ThreadPoolExecutor 源码之前,先来看一下 SynchronousQueue 和 LinkedBlockingQueue 的区别,目前不会去看它们的源码,有点复杂。

SynchronousQueue 和 LinkedBlockingQueue 都是 BlockingQueue,SynchronousQueue 是当有消费者等待获取值时,才能生产者才能入列成功,有点拗口,看代码吧

SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>();
final boolean offer = synchronousQueue.offer(10);
System.out.println(offer);//false

这里会入列失败,因为没有消费者在等待获取,

SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>();

//因为 take() 为阻塞操作,所以需要放在子线程中。
Thread thread = new Thread(() -> {
    final Integer take;
    try {
        take = synchronousQueue.take();
        System.out.println(take);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
});

thread.start();

Thread.sleep(1000);
final boolean offer = synchronousQueue.offer(10);
System.out.println(offer);

这样就可以入列和获取成功了,SynchronousQueue 还有一个 poll(long timeout, TimeUnit unit) 方法,等待指定时间,超时则返回 null

take = synchronousQueue.poll(2, TimeUnit.SECONDS);

可以这样理解 SynchronousQueue,生产一个消费一个,没有消费者就直接丢弃,不会额外存储。

而 LinkedBlockingQueue 则是生产多少存多少,供给消费者消费,消费不了就存储在队列中。

3.ThreadPoolExecutor

来看下一个 Runnable 怎么被执行的,用线程池执行一个任务,调用 ThreadPoolExecutor.execute()

public class ThreadPoolExecutor extends AbstractExecutorService {
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        
        //corePooolSize=0, 不执行
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        //使用 SynchronousQueue,第一次 false
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //执行 addWorker
        else if (!addWorker(command, false))
            reject(command);
    }

    private boolean addWorker(Runnable firstTask, boolean core) {
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                ......
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }
    }
}

调用链一致到 runWorker,可以获知 Worker 是 ThreadPool 负责执行任务的类,

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;

    while (task != null || (task = getTask()) != null) {
        task.run();
    }
}

private Runnable getTask() {
    for (;;) {
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
        if (r != null)
            return r;
    }
}

runWork 的工作是执行具体的 Runnable,执行完之后,就从队列中继续获取,获取之后继续执行,如果队列中没有元素,就阻塞,这样就完成了 Thread 的复用。

如果超时,且 workers > maximumPoolSize && workQueue.isEmpty() 则返回 null,Worker 调用结束,销毁。

再来看执行的代码

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    
    //corePooolSize=0, 不执行
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    /**
     * 使用 SynchronousQueue
     * 第一次:workQueue.offer()=false,addWorker
     * 第二次:如果第一次请求时创建的 Worker 已经执行完任务,且未超时,则 offer 成功, Worker 继续工作;
     * 如果未执行完,则 offer 失败,走第一次的流程
     * 
     * 使用 LinkedBlockingQueue
     * 第一次:workQueue.offer()=true,workerCountOf(recheck) == 0 -> addWorker
     * 第二次:workQueue.offer()=true,workerCountOf(recheck) != 0 直接跳出,任务存放在队列中,永远只会有一个 Worker
     **/
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //执行 addWorker
    else if (!addWorker(command, false))
        reject(command);
}

所以可以获知,使用 SynchronousQueue 任务永远都会被立刻执行,有空闲的 Worker 使用空闲的 Worker,没有则新创建

而 LinkedBlockingQueue,只会创建一个 Worker,任务存放在 BlockingQueue 中,等待执行

所以这里解释了,为什么使用 SynchronousQueue。

你可能会有疑问,那直接来一个执行一个好了,为什么要使用队列?使用队列是为了复用 Thread ,这也是 ThreadPool 的核心功能之一。

那为什么 corePoolSize 为 0 呢?这个我也不是太理解,按照我的理解起码线程池中应该放几个长存线程,而且网络调用也是很频繁的。我目前的理解如下,网络请求还是很频繁的,所以总会有几个 Worker 未超时,而保留在线程池中。当所有 Worker 都超时,那么用户有很大概率是退出应用了,所以设置为 0。

至于 maximumPoolSize 设置为 Integer.MAX_VALUE,可以最大程度提供并发能力。

threadFactory 没什么可说的,就是创建 Thread,主要用来设置线程名,

public static ThreadFactory threadFactory(String name, boolean daemon) {
    return runnable -> {
        Thread result = new Thread(runnable, name);
        result.setDaemon(daemon);
        return result;
    };
}
Tags: java - okhttp - thread
~ belongs to alamide@163.com