博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
线程池调度ThreadPoolExecutor详解
阅读量:4180 次
发布时间:2019-05-26

本文共 13535 字,大约阅读时间需要 45 分钟。

        先来看个Demo:

ThreadPoolExecutor executor;private void testThreadPool() {    executor = new ThreadPoolExecutor(3,10,1000,            TimeUnit.MILLISECONDS,new LinkedBlockingDeque<>(20),new ThreadPoolExecutor.CallerRunsPolicy());    for (int i = 0; i < 50; i++) {        executor.submit(new Runnable() {            @Override            public void run() {                try {                    Thread.sleep(1000);                    System.out.println("线程名 = "+Thread.currentThread().getName()+"    "+executor.getPoolSize());                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        });    }}

打印log:

线程名 = pool-1-thread-3    10线程名 = pool-1-thread-2    10线程名 = pool-1-thread-1    10线程名 = main    10线程名 = pool-1-thread-6    10线程名 = pool-1-thread-10    10线程名 = pool-1-thread-9    10线程名 = pool-1-thread-4    10线程名 = pool-1-thread-5    10线程名 = pool-1-thread-8    10线程名 = pool-1-thread-7    10

看到这,肯能会有人有疑问,为什么会有在main线程中执行的呢?接下来就来看看ThreadPoolExecutor的实现。

构造方法参数介绍:

        ThreadPoolExecutor构造方法的参数有以下几个,分别是corePoolSize,maxinumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler,这几个参数的作用是:

        1、corePoolSize:核心线程数量,即使是空闲的时候也会保持的线程数量,除非设置了allowOCreThreadTimeOut为true才会在空闲的时候回收掉;

        2、maxinumPoolSize:允许在线程池中的最大线程数量,只有在线程数量大于corePoolSize+workQueue.size()时才会在新建corePoolSize之外的线程;

        3、keepAliveTime:当线程数量超过核心线程数量时,多出来的线程会在闲置这个时间后进行回收;

        4、unit:keepTimeAlive的时间单位;

        5:workQueue:存放还没执行的任务队列,Java提供了几种可供我们自行选择的集合,比如:LinkedBlockQueue、ArrayBlockQueue,SynchronousQueue;

        6、threadFactory:创建线程的工厂类;

        7、handler:任务执行失败或添加的任务超过了队列的容量。

线程池状态介绍:

*   RUNNING:  Accept new tasks and process queued tasks*   SHUTDOWN: Don't accept new tasks, but process queued tasks*   STOP:     Don't accept new tasks, don't process queued tasks,*             and interrupt in-progress tasks*   TIDYING:  All tasks have terminated, workerCount is zero,*             the thread transitioning to state TIDYING*             will run the terminated() hook method*   TERMINATED: terminated() has completed

* RUNNING -> SHUTDOWN*    On invocation of shutdown(), perhaps implicitly in finalize()* (RUNNING or SHUTDOWN) -> STOP*    On invocation of shutdownNow()* SHUTDOWN -> TIDYING*    When both queue and pool are empty* STOP -> TIDYING*    When pool is empty* TIDYING -> TERMINATED*    When the terminated() hook method has completed

        上面是文档中的一些介绍,英语水平有限,翻译的不好还请见谅:

1、RUNNING:接收新的任务和处理已经进入队列里的任务;

2、SHUTDOWN:不接收新的任务但会处理已经在队列里的任务;

3、STOP:不接收新的任务和处理已经在队列里的任务,而且还会打断正在执行的任务;

4、TIDYING:所有的任务已经终止,workCount为0,线程状态转化为TIDYING然后执行terminated()方法;

5、TERMINATED:terminated()方法执行完成。

        状态转换:

1、RUNNING--->SHUTDOWN:调用了shutDown()方法或是在销毁;

2、RUNNING--->STOP:调用了shutDownNow()方法;

3、SHUTDOWN--->TIDYING:当队列和线程次为空;

4、STOP--->TIDYING:当线程池为空;

5、TIDYING--->TERMINATED:当terminated()方法执行完成。

线程池状态和数量存放:    

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bitsprivate static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;// Packing and unpacking ctlprivate static int runStateOf(int c)     { return c & ~CAPACITY; }private static int workerCountOf(int c)  { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池状态和数量就是存放在ctl中的,ctl是线程安全的,ctl是32位bit的一个数,其中高三位是用来存放线程池状态的,其余的是用来存放数量的。

线程开始执行的方法:

public void execute(Runnable command) {    if (command == null)        throw new NullPointerException();    /*     * Proceed in 3 steps:     *     * 1. If fewer than corePoolSize threads are running, try to     * start a new thread with the given command as its first     * task.  The call to addWorker atomically checks runState and     * workerCount, and so prevents false alarms that would add     * threads when it shouldn't, by returning false.     *     * 2. If a task can be successfully queued, then we still need     * to double-check whether we should have added a thread     * (because existing ones died since last checking) or that     * the pool shut down since entry into this method. So we     * recheck state and if necessary roll back the enqueuing if     * stopped, or start a new thread if there are none.     *     * 3. If we cannot queue task, then we try to add a new     * thread.  If it fails, we know we are shut down or saturated     * and so reject the task.     */    //上面的三点主要是讲解了下面的执行的流程和作用    int c = ctl.get();    //如果线程池中的线程小于核心线程数,    // 尝试开启一个新的线程以传进来的任务作为他的第一个任务    if (workerCountOf(c) < corePoolSize) {        if (addWorker(command, true))            return;        c = ctl.get();    }    if (isRunning(c) && workQueue.offer(command)) {        //线程正在执行和已经将任务添加到了队列中        int recheck = ctl.get();        //检查线程是否正在执行,如果不在执行,从队列中移除        if (! isRunning(recheck) && remove(command))            reject(command);        else if (workerCountOf(recheck) == 0)            addWorker(null, false);    }    else if (!addWorker(command, false))        //如果不能添加进队列且不能创建线程,那就执行reject()方法,        reject(command);}

对于这个方法讲解,源码中的英文已经给我们讲解的很清楚了,主要分为三点:

        1、先判断线程池中的线程是否低于核心线程,如果低于,那就创建一个新的线程来放置这个任务并返回;

        2、如果大于核心线程,正常的情况都是这个任务加入到队列中去,然后需要在此检查是否需要新建一个线程(因为可能存在一个线程在上次检查完之后被回收了)或者因为线程池停止了。所以需要再次检查状态,如果不在RUNNING状态并且能够成功移除任务的话,那么调用reject方法,否则就调用addWorker(null,false)方法。

        3、如何任务不能加入队列,就会尝试创建一个新的线程来执行这个任务,如果创建线程失败了,那就会reject()。

        通过对上面的源码的分析,我们可以看到其中主要涉及到两个方法:addWorker()和reject():

        我们先来看一下比较简单的方法reject():

final void reject(Runnable command) {    handler.rejectedExecution(command, this);}

这里就是调用了构造方法中传进来的handler对象,如果不传那就会使用默认的,Java中为我们准备了四个这样的对象,下面就来看看:

//在那个线程中调用的就在那个线程执行public static class CallerRunsPolicy implements RejectedExecutionHandler {    public CallerRunsPolicy() { }    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {        if (!e.isShutdown()) {            r.run();        }    }}//直接跑异常public static class AbortPolicy implements RejectedExecutionHandler {    public AbortPolicy() { }    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {        throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());    }}//什么处理都不做public static class DiscardPolicy implements RejectedExecutionHandler {    public DiscardPolicy() { }    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {    }}//去除队列头后在将任务加入到队列尾public static class DiscardOldestPolicy implements RejectedExecutionHandler {    public DiscardOldestPolicy() { }    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {        if (!e.isShutdown()) {            e.getQueue().poll();            e.execute(r);        }    }}

默认使用的就是直接抛异常了,如果上面几种方法都不符合你得处理需求,那你可以自定义自己的处理方式。在这里我们看到了callerRunsPolicy这个对象,这也是最开始demo使用的处理方式,直接是在添加任务的线程执行run()方法。

        接下来就还有addWork()这个方法了:

private boolean addWorker(Runnable firstTask, boolean core) {    retry:    for (;;) {        int c = ctl.get();        //得到运行状态        int rs = runStateOf(c);        // 检查运行状态        if (rs >= SHUTDOWN &&                ! (rs == SHUTDOWN &&                        firstTask == null &&                        ! workQueue.isEmpty()))            return false;        for (;;) {            //得到线程池中的数量            int wc = workerCountOf(c);            //检查线程池中数量是否大于容量,或者核心线程数和最大线程数            if (wc >= CAPACITY ||                    wc >= (core ? corePoolSize : maximumPoolSize))                return false;            //线程池数量+1成功,跳出循环            if (compareAndIncrementWorkerCount(c))                break retry;            //否者重新读取ctl            c = ctl.get();  // Re-read ctl            if (runStateOf(c) != rs)                continue retry;            // else CAS failed due to workerCount change; retry inner loop        }    }    boolean workerStarted = false;    boolean workerAdded = false;    Worker w = null;    try {        //以firstTask作为第一个任务创建worker(构造方法中会创建线程)并执行        w = new Worker(firstTask);        final Thread t = w.thread;        if (t != null) {            final ReentrantLock mainLock = this.mainLock;            mainLock.lock();            try {                // Recheck while holding lock.                // Back out on ThreadFactory failure or if                // shut down before lock acquired.                int rs = runStateOf(ctl.get());                if (rs < SHUTDOWN ||                        (rs == SHUTDOWN && firstTask == null)) {                    if (t.isAlive()) // precheck that t is startable                        throw new IllegalThreadStateException();                    //workers是一个HashSet对象,里面存放的是工作线程                    workers.add(w);                    int s = workers.size();                    if (s > largestPoolSize)                        largestPoolSize = s;                    workerAdded = true;                }            } finally {                mainLock.unlock();            }            if (workerAdded) {                //开始执行任务                t.start();                workerStarted = true;            }        }    } finally {        if (! workerStarted)            //开启线程失败调用这个方法            addWorkerFailed(w);    }    return workerStarted;}

这里主要可以分为两步,一是检查线程池的状态,二是线程池状态ok就创建线程并开启。

        接下来我们来看看是如何创建线程的,这里创建线程主要用到的是Worker这个类,我们来看看Worker这个类的结构:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {    final Thread thread;    Runnable firstTask;    Worker(Runnable firstTask) {        setState(-1); // inhibit interrupts until runWorker        this.firstTask = firstTask;        this.thread = getThreadFactory().newThread(this);    }        public void run() {        runWorker(this);    }}

这里只列出了一部分Worker的代码,Worker本身是实现了Runnable接口的,当我们创建线程的时候把这个对象传进去,那么当我们调用Thread.start()方法是,调用到的就是Worker的这个run()方法了。再来看runWorker():

final void runWorker(Worker w) {    Thread wt = Thread.currentThread();    Runnable task = w.firstTask;    w.firstTask = null;    w.unlock(); // allow interrupts    boolean completedAbruptly = true;    try {        //这里会一直循环去队列里去任务,getTask()就是执行去任务的动作,        // 当任务为null时才会才会退出,这也就意味着这个线程执行完成        while (task != null || (task = getTask()) != null) {            w.lock();            // If pool is stopping, ensure thread is interrupted;            // if not, ensure thread is not interrupted.  This            // requires a recheck in second case to deal with            // shutdownNow race while clearing interrupt            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() &&                            runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())                wt.interrupt();            try {                //这个方法是一个空方法,当你想在任务执行前做点什么,重写下这个方法就ok了                beforeExecute(wt, task);                Throwable thrown = null;                try {                    //这里就是在执行我们传进来的任务了                    task.run();                } catch (RuntimeException x) {                    thrown = x; throw x;                } catch (Error x) {                    thrown = x; throw x;                } catch (Throwable x) {                    thrown = x; throw new Error(x);                } finally {                    //当你想在任务完成后坐点什么,那实现这个方法就能帮助你                    afterExecute(task, thrown);                }            } finally {                task = null;                w.completedTasks++;                w.unlock();            }        }        completedAbruptly = false;    } finally {        processWorkerExit(w, completedAbruptly);    }}

我们知道,一个线程的生命周期其实就是run() 方法了,当run()执行完成意味着线程也就结束了,从上面的while()循环中我们知道,这里线程的生命周期其实就是由getTask()在决定了:

private Runnable getTask() {    boolean timedOut = false; // Did the last poll() time out?    for (;;) {        int c = ctl.get();        int rs = runStateOf(c);        // Check if queue empty only if necessary.        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {            decrementWorkerCount();            return null;        }        int wc = workerCountOf(c);        // Are workers subject to culling?        //是否允许线程超时        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;        if ((wc > maximumPoolSize || (timed && timedOut))                && (wc > 1 || workQueue.isEmpty())) {            if (compareAndDecrementWorkerCount(c))                return null;            continue;        }        try {            //根据timed这个标示决定是一直阻塞还是到达时间后退出这个线程(线程结束),            Runnable r = timed ?                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                    workQueue.take();            if (r != null)                return r;            timedOut = true;        } catch (InterruptedException retry) {            timedOut = false;        }    }}

getTask()其实一直去队列里取任务,当任务队列里没有任务时,有两种方法去队列拿任务,这两个方法都是阻塞方法,poll(keepAliveTime,unit):这个方法是在到时间后会返回null,而take() 方法则会一直阻塞。

        到这,线程池的整个流程就分析分差不多了,这里在总结一下:

执行流程:

1. 如果当前线程数小于corePoolSize,那么创建线程并保存到线程集合中;

2. 如果当前线程数大于等于corePoolSize,那么将任务加入队列;如果成功加入,那么就等待线程的getTask获取到任务再去执行,如果线程池中的线程数大于核心线程数,那么getTask()在空闲的时间大于keepAliveTime时会返回null,线程执行结束。

3. 如果第2步中加入队列失败,并且当前线程数小于maximumPoolSize,会尝试开启线程,如果大于等于maximumPoolSize,那么创建线程失败,会交由RejectExecutionHandler的rejectExecution()处理。

使用:

对于线程池的使用,Android中建议使用AsyncTask中的线程池

public static final Executor THREAD_POOL_EXECUTOR        = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,                TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
看到没有,public还是static的,那我们直接拿来用就可以了,这里的核心线程数是根据CPU来确定的,免去了我们自己去创建线程池,不过这里有需要注意的地方,那就是这个线程池中的队列容量是有限制的,为128。

转载地址:http://nzhai.baihongyu.com/

你可能感兴趣的文章
为什么阿里巴巴禁止在 foreach 循环里进行元素的 remove/add 操作
查看>>
AWS EC2如何从普通用户切换为root用户
查看>>
click方法不生效的
查看>>
mysql排行榜并列与不并列
查看>>
SpringBoot | Mybatis申明为Mapper文件
查看>>
JPA主键生成策略
查看>>
byte数组和InputStream的相互转换
查看>>
InputStream,InputStreamReader和Reader之间的区别与关系
查看>>
Java中System.arraycopy方法的使用
查看>>
tk.mybatis的使用记录
查看>>
遍历获取目录下的所有文件
查看>>
从指定服务器路径下载文件
查看>>
EasyExcel读取和写入java model数据
查看>>
《C编译原理》共享库的动态加载和静态加载
查看>>
《Android系统学习》第二章:如何制作OTA U盘升级包
查看>>
《Android系统学习》第五章:编译Android的JDK环境
查看>>
《C++特性》之引用类型
查看>>
fflush(stdin)在gcc编译器中不起作用?
查看>>
《Android系统学习》第八章:Android gtest
查看>>
《Android系统学习》第九章:Android模拟器编译
查看>>