首页>>后端>>java->Java多线程之ThreadPoolExecutor原理(图文代码实例详解)

Java多线程之ThreadPoolExecutor原理(图文代码实例详解)

时间:2023-11-29 本站 点击:32

ThreadPoolExecutor是Java的线程池并发代名词,多线程开发基本都是基于这个去做具体的业务开发。虽然觉得自己回了,网上帖子已经有很多的文章写这个,但是是自己一一点写的,终归是要比看别人的理解更加深刻,所以最近自己在对java知识的系统梳理。那么接下来主要分析下这个多线程框架的原理。

ThreadPoolExecutor的构造函数以成员变量介绍

publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueue<Runnable>workQueue,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler){

面试靠的最多是这个构造函数中7个参数的作用,

corePoolSize 是核心线程数, 即使线程是空闲的,线程池一直保持的的线程数,除非allowCoreThreadTimeOut参数设置为true

maximumPoolSize 线程池最大线程数

keepAliveTime unit 线程存活时间 和 时间单位

workQueue 是任务队列,是用来保持task,该队列保持住了Runnable的任务,通过调用线程池的execute的方法.

threadFactory 创建线程的工厂

RejectedExecutionHandler 是当线程数超过限制以及队列也满了,需要执行的拒绝策 略.

成员变零

privatefinalAtomicIntegerctl=newAtomicInteger(ctlOf(RUNNING,0));privatestaticfinalintCOUNT_BITS=Integer.SIZE-3;privatestaticfinalintCAPACITY=(1<<COUNT_BITS)-1;//线程容量//runStateisstoredinthehigh-orderbitsprivatestaticfinalintRUNNING=-1<<COUNT_BITS;privatestaticfinalintSHUTDOWN=0<<COUNT_BITS;privatestaticfinalintSTOP=1<<COUNT_BITS;privatestaticfinalintTIDYING=2<<COUNT_BITS;privatestaticfinalintTERMINATED=3<<COUNT_BITS;

面试最喜欢问的是 ctl变量的代表什么意义?ctl变量的的的用高3位表示线程池的状态,用低29位表示线程个数,两者通过 | 操作,拼接出ctl变量,也就是线程池的最大线程数capacity是 (2^29)-1。

线程池状态

RUNNING 运行状态 -1 << 29 表示线程池可以接新的任务并且处理队列任务

SHUTDOWN 关闭状态态 -1 << 29 表示线程池不接受新的线程池任务但是可以处理队列中的任务

STOP 停止状态 1 << 29 表示线程池不接受新的线程池任务也不处理队列中的任务并且中断线程池里中正在执行的任务

TIDYING 2 << 29 表示所有的线程池都已经中断了,线程数为0,线程状态转为为TIDYING, 将执行terminated钩子函数

TERMINATED 3 << 29 表示所有terminated方法都已经执行完成。

线程状态之间装换图

线程池的提交任务执行流程

首先我们来看平时业务代码是提交任务到线程池执行的函数是通过execute或者submit方法,区别就是submit返回具有Future,execute返回void,的、那么接下来我们主要分析execute的执行流程,submit涉及到线程异步返回,之后会另外单独分析,那么下面这个execute函数就能看出线程池的整个执行流程,

publicvoidexecute(Runnablecommand){if(command==null)thrownewNullPointerException();/**Proceedin3steps:**1.IffewerthancorePoolSizethreadsarerunning,tryto*startanewthreadwiththegivencommandasitsfirst*task.ThecalltoaddWorkeratomicallychecksrunStateand*workerCount,andsopreventsfalsealarmsthatwouldadd*threadswhenitshouldn't,byreturningfalse.**2.Ifataskcanbesuccessfullyqueued,thenwestillneed*todouble-checkwhetherweshouldhaveaddedathread*(becauseexistingonesdiedsincelastchecking)orthat*thepoolshutdownsinceentryintothismethod.Sowe*recheckstateandifnecessaryrollbacktheenqueuingif*stopped,orstartanewthreadiftherearenone.**3.Ifwecannotqueuetask,thenwetrytoaddanew*thread.Ifitfails,weknowweareshutdownorsaturated*andsorejectthetask.*/intc=ctl.get();if(workerCountOf(c)<corePoolSize){if(addWorker(command,true))return;c=ctl.get();}if(isRunning(c)&&workQueue.offer(command)){intrecheck=ctl.get();if(!isRunning(recheck)&&remove(command))reject(command);elseif(workerCountOf(recheck)==0)//当线程池的核心线程数设置为0情况下,那么这时workerCountOf(recheck)为0,这时就开启非线程数处理队列任务addWorker(null,false);}elseif(!addWorker(command,false))reject(command);}

线程池执行任务流程图如下:

我相信大概的流程一般同学是清楚的:

当线程数的Worker线程 < corePoolSize 创建核心线程数执行

当线程数的Worker线程 > corePoolSize,将任务加入任务队列中

则当corePoolSize< maxPoolsize,则新增非核心线程执行任务

当队列满了,线程数也已经达到maxPoolsize,则执行拒绝策略

实际源码中执行流程还有一些小细节容易被忽略的地点

重新检查线程的状态以及检查 线程池的线程数的流程

线程池新增工作线程的流程

线程池新增工作任务主要addWorker方法。由于代码比较长,我就在代码里写好注释

privatebooleanaddWorker(RunnablefirstTask,booleancore){retry:for(;;){intc=ctl.get();intrs=runStateOf(c);//Checkifqueueemptyonlyifnecessary.if(rs>=SHUTDOWN&&//第一个条件:线程至少不是运行状态,那么就是shutdownstoptidying,terminated状态!(rs==SHUTDOWN&&firstTask==null&&!workQueue.isEmpty()))//第二个条件:当前线程池是shutdown状态且任务队列非空并且工作任务第一个任务是空的取反条件,这个含义是当除了SHUTDOWN状态且第一个任务为空且任务队列不为空//情况下,直接返回false,增加Work线程失败returnfalse;for(;;){intwc=workerCountOf(c);if(wc>=CAPACITY||wc>=(core?corePoolSize:maximumPoolSize))returnfalse;if(compareAndIncrementWorkerCount(c))breakretry;c=ctl.get();//Re-readctlif(runStateOf(c)!=rs)continueretry;//elseCASfailedduetoworkerCountchange;retryinnerloop}}booleanworkerStarted=false;booleanworkerAdded=false;Workerw=null;try{w=newWorker(firstTask);finalThreadt=w.thread;if(t!=null){finalReentrantLockmainLock=this.mainLock;mainLock.lock();try{//Recheckwhileholdinglock.//BackoutonThreadFactoryfailureorif//shutdownbeforelockacquired.intrs=runStateOf(ctl.get());if(rs<SHUTDOWN||//线程池是running状态(rs==SHUTDOWN&&firstTask==null)){//线程池处于shutdown状态并且第一个task为空if(t.isAlive())//precheckthattisstartablethrownewIllegalThreadStateException();//加入工作线程的集合workers.add(w);ints=workers.size();if(s>largestPoolSize)//记录最大线程数largestPoolSize=s;workerAdded=true;}}finally{-mainLock.unlock();-}-if(workerAdded){-t.start();workerStarted=true;}}}finally{if(!workerStarted)addWorkerFailed(w);}returnworkerStarted;}

添加工作线程主要步骤

检查线程池的运行状态以及队列是否是空,增加线程。为什么增加这个判断,主要是因为线程池是多线程的随便可能另外调用shutdown等方法关闭线程池,所以做每一步之前都要再次check线程池的状态,其中比较重要的点是线程池在除了Running状态,其他的只有shutdow状态,且队列任务非空的情况,才能增加work线程处理任务。

判断线程池的线程是核心线程数,然后就判断大于核心线程数, 如果不是增加的核心线程数,然后通过 CAS增加线程数加1,然后re-read的ctl的现在的状态是否刚开始进入循环的状态保持一致。

创建Worker对象,它的第一个参数Runable就是执行的第一个task,然后获取mainLock的重入锁,然后再次判断线程池的状态是否是shutdown状态,然后将Worker对象加入工作线程的Set集合中,判断是大于largePoolSize,则将workSet的size赋值largePoolSize,然后赋值workerAdded为true,接下来在finnally中workerAdded为true,则调用Worker的start方法启动该Worker线程,

如果WorkerAdded失败,则从Worder的Set移除刚才加入Worker线程,并将线程池的线程数减1,

工作线程Worker的执行流程

首先来看下Work的类的成员变量的构造函数,从下面的Work的代码,可以看到它是实现了RUnnable接口,上一节Worker启动是调用了它的start方法,真正由操作系统调度执行的其run方法,那么接下来重点看下run的工作流程。

privatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnable{/***Thisclasswillneverbeserialized,butweprovidea*serialVersionUIDtosuppressajavacwarning.*/privatestaticfinallongserialVersionUID=6138294804551838833L;/**Threadthisworkerisrunningin.Nulliffactoryfails.*/finalThreadthread;/**Initialtasktorun.Possiblynull.*/RunnablefirstTask;/**Per-threadtaskcounter*/volatilelongcompletedTasks;/***CreateswithgivenfirsttaskandthreadfromThreadFactory.*@paramfirstTaskthefirsttask(nullifnone)*/Worker(RunnablefirstTask){//初始化状态为-1,表示不能被中断setState(-1);//inhibitinterruptsuntilrunWorkerthis.firstTask=firstTask;this.thread=getThreadFactory().newThread(this);}

下面代码中Work的run直接调用runWork,并传入自身对象, 开始一个循环判断第一个任务后者从任务队列中取任务不为空,就开始上锁,然后执行任务,如果任务队列为空了,则处理Work的退出。

/**DelegatesmainrunlooptoouterrunWorker*/publicvoidrun(){//直接调用runWorker函数runWorker(this);}finalvoidrunWorker(Workerw){//Wokder当前线程Threadwt=Thread.currentThread();Runnabletask=w.firstTask;w.firstTask=null;//将state值赋值为0,这样就运行中断w.unlock();//allowinterruptsbooleancompletedAbruptly=true;try{//循环判断第一个Task获取从获取任务while(task!=null||(task=getTask())!=null){//获取当前Work的锁,处理任务,也就是当前Work线程处理是同步处理任务的w.lock();//Ifpoolisstopping,ensurethreadisinterrupted;//ifnot,ensurethreadisnotinterrupted.This//requiresarecheckinsecondcasetodealwith//shutdownNowracewhileclearinginterrupt//线程池的状态至少是stop,即使stop,tidying.terminated状态if((runStateAtLeast(ctl.get(),STOP)//检查线程是否中断且清楚中断||(Thread.interrupted()&&//再次检查线程池的状态至少是STOPrunStateAtLeast(ctl.get(),STOP)))&&//再次判断是否中断!wt.isInterrupted())//中断线程wt.interrupt();try{//执行业务任务前处理(钩子函数)beforeExecute(wt,task);Throwablethrown=null;try{//这里就是执行提交线程池的Runnable的任务的run方法task.run();}catch(RuntimeExceptionx){thrown=x;throwx;}catch(Errorx){thrown=x;throwx;}catch(Throwablex){thrown=x;thrownewError(x);}finally{//执行业务任务后处理(钩子函数)afterExecute(task,thrown);}}finally{//执行结束重置为空,回到while循环拿下一个task=null;//处理任务加1w.completedTasks++;//释放锁,处理下一个任务w.unlock();}}//代码执行到这里,代表业务的任务没有异常,不然不会走到这里,//因为上一层try没有catch异常的,而业务执行出现异常,最里层//虽然catch了异常,但是也都通过throw向外抛出completedAbruptly=false;}finally{//如果循环结束,则处理Work退出工作,代表任务拿不到任务,即任务队列没有任务了processWorkerExit(w,completedAbruptly);}}

下面就来看下getTask获取任务队列的处理逻辑 、如果这里返回null,即runWorker循环退出,则会处理finnaly中processWorkExit,处理Work线程的退出,下面是getWork返回null的情况:

如果线程池状态值至少是SHUTDOWN状态,并且 线程池状态值至少是STOP状态,或者是任务队列是空,则将线程池的workcout减1,并返回null,

计算线程池中线程池的数量,如果线程数量大于最大线程数量,或者 allowCoreThreadTimeOut参数为true或者 线程数大于并且任务队列为空,则将线程池减1,并返回null,

privateRunnablegetTask(){//超时标志booleantimedOut=false;//Didthelastpoll()timeout?for(;;){//获取线程状态intc=ctl.get();//线程状态intrs=runStateOf(c);//Checkifqueueemptyonlyifnecessary.//如果线程池状态值至少是SHUTDOWN状态,if(rs>=SHUTDOWN线程池状态值至少是STOP状态,或者是任务队列是空&&(rs>=STOP||workQueue.isEmpty())){//CAS将worker线程数减1decrementWorkerCount();returnnull;}//计算线程池线程数量intwc=workerCountOf(c);//Areworkerssubjecttoculling?//allowCoreThreadTimeOut参数设置为true,或则线程池的线程数大于corePoolSize,表示需要超时的Worker需要退出,booleantimed=allowCoreThreadTimeOut||wc>corePoolSize;//线程数大于最大线程数||已经超时if((wc>maximumPoolSize||(timed&&timedOut))//线程数大于1或者任务队列为空&&(wc>1||workQueue.isEmpty())){//CAS将线程数减1if(compareAndDecrementWorkerCount(c))returnnull;continue;}try{//需要处理超时的Worker,则获取任务队列中任务等待的时间//就是线程池构造函数中keepAliveTime时间,如果不处理超时的Worker//则直接调用take一直阻塞等待任务队列中有任务,拿到就返回Runnale任务Runnabler=timed?workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS):workQueue.take();if(r!=null)returnr;timedOut=true;}catch(InterruptedExceptionretry){timedOut=false;}}}

Worker的退出处理:1 从上面分析知道completedAbruptly是任务执行时是否出现异常标志,如果任务执行过程出错,则将线程池的线程数量减12.加线程池的mainLock的全局锁,这里主要区分Worker执行任务中,拿的是Worker内部的锁,完成任务加1,将worker从Worker的集合移除,3. 执行tryTerminate函数,是否线程池线程池是否关闭4. 根据线程池状态是否补充非核心的Worker线程去处理

privatevoidprocessWorkerExit(Workerw,booleancompletedAbruptly){//任务执行时出现异常,则减去工作if(completedAbruptly)//Ifabrupt,thenworkerCountwasn'tadjusteddecrementWorkerCount();//拿到线程池的主锁finalReentrantLockmainLock=this.mainLock;//加锁mainLock.lock();try{//完成任务加1completedTaskCount+=w.completedTasks;//将worker从Worker的集合移除workers.remove(w);}finally{mainLock.unlock();}//尝试线程池关闭tryTerminate();//获取线程池的ctlintc=ctl.get();//如果线程池的状态值小于STOP,即使SHUTDOWNRUNNINGif(runStateLessThan(c,STOP)){//任务执行没有异常if(!completedAbruptly){//allowCoreThreadTimeOut参数true,则min=0,表示不需要线程常驻。//负责是有corePoolSize个线程常驻线程池intmin=allowCoreThreadTimeOut?0:corePoolSize;if(min==0&&!workQueue.isEmpty())min=1;//如果线程池数大于最小,也就是不需要补充线程执行任务队列的任务if(workerCountOf(c)>=min)return;//replacementnotneeded}//走到这里表示线程池的线程数为0,而任务队列又不为空,得补充一个线程处理任务addWorker(null,false);}}

tryTerminate的逻辑是处理线程池关闭的场景

finalvoidtryTerminate(){for(;;){intc=ctl.get();//线程池是RUNNING状态if(isRunning(c)||//线程池状态至少是TIDYINGrunStateAtLeast(c,TIDYING)||//线程池状态是SHUTDOWN但是队列不为空(runStateOf(c)==SHUTDOWN&&!workQueue.isEmpty()))return;if(workerCountOf(c)!=0){//Eligibletoterminate//中断一个空闲线程interruptIdleWorkers(ONLY_ONE);return;}//只有最后一个线程才能走到这里,处理线程池从TIDYIING状态//到TERMINATED状态finalReentrantLockmainLock=this.mainLock;mainLock.lock();try{if(ctl.compareAndSet(c,ctlOf(TIDYING,0))){try{//钩子函数terminated();}finally{//设置线程池TERMINATED状态ctl.set(ctlOf(TERMINATED,0));//唤醒调用awaitTermination的线程termination.signalAll();}return;}}finally{mainLock.unlock();}//elseretryonfailedCAS}}

线程池的拒绝策略 RejectedExecutionHandler

当线程池无法处理任务时的处理策略:1.默认拒绝策略是AbortPolicy 直接抛出RejectedExecutionException异常 2.DiscardPolicy 直接丢弃任务 3.DiscardOldestPolicy 丢弃任务队列中最老的任务,这里之前理解是直接丢弃,其实看了源码之后,其实它还是当线程池还咩有关闭时,尝试去提交该任务到线程池去执行

publicstaticclassDiscardOldestPolicyimplementsRejectedExecutionHandler{/***Createsa{@codeDiscardOldestPolicy}forthegivenexecutor.*/publicDiscardOldestPolicy(){}/***Obtainsandignoresthenexttaskthattheexecutor*wouldotherwiseexecute,ifoneisimmediatelyavailable,*andthenretriesexecutionoftaskr,unlesstheexecutor*isshutdown,inwhichcasetaskrisinsteaddiscarded.**@paramrtherunnabletaskrequestedtobeexecuted*@parametheexecutorattemptingtoexecutethistask*/publicvoidrejectedExecution(Runnabler,ThreadPoolExecutore){if(!e.isShutdown()){e.getQueue().poll();e.execute(r);}}}

CallerRunsPolicy 直接调用方去执行这个任务,也就是直接Runnable的run函数。

privatefinalAtomicIntegerctl=newAtomicInteger(ctlOf(RUNNING,0));privatestaticfinalintCOUNT_BITS=Integer.SIZE-3;privatestaticfinalintCAPACITY=(1<<COUNT_BITS)-1;//线程容量//runStateisstoredinthehigh-orderbitsprivatestaticfinalintRUNNING=-1<<COUNT_BITS;privatestaticfinalintSHUTDOWN=0<<COUNT_BITS;privatestaticfinalintSTOP=1<<COUNT_BITS;privatestaticfinalintTIDYING=2<<COUNT_BITS;privatestaticfinalintTERMINATED=3<<COUNT_BITS;0

总结

本文主要就线程池的状态转换、工作线程Worker创建以及执行任务队列中任务的流程、拒绝策略的详细分析。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/33.html