盘一盘晕头转向的RunnableCallableFuture
今天我们来一起盘一盘应用场景很多的实现类,虽然我们不经常直接使用这些类,但是在各种地方都有它们的身影(比如线程池),且它们很容易搞混,我称之为可知结果的未来任务FutureTask(叫法有点拗口哈,因为我实在想不出更贴切的名字了)
由于涉及到的接口定义和实现比较多(看标题就知道了),我先让它们各自做个自我介绍,然后我们最后来看看所谓的可知结果的未来任务的源码实现。逐个介绍Runnable
在之前介绍线程的时候,我们就知道了,创建线程的很多方法,其中实现Runnable接口就是其中的方式,其实就是定义了这个线程run方法中做了哪些事情:
源码中的注释还是比较好理解的,但是我们会发现,这个run方法没有任何返回值,也就是说我们根本就不知道,run方法的执行结果到底怎么样,这时DougLea大神就定义了另外一套接口Callable系列接口。Callable
要想run方法有结果,最直接的方法就是将返回值类型void改成特定的类,好在Java有泛型编程,如下源码实现:Ataskthatreturnsaresultandmaythrowanexception。Implementorsdefineasinglemethodwithnoargumentscalled{codecall}。pThe{codeCallable}interfaceissimilarto{linkjava。lang。Runnable},inthatbotharedesignedforclasseswhoseinstancesarepotentiallyexecutedbyanotherthread。A{codeRunnable},however,doesnotreturnaresultandcannotthrowacheckedexception。pThe{linkExecutors}classcontainsutilitymethodstoconvertfromothercommonformsto{codeCallable}classes。seeExecutorsince1。5authorDougLeaparamVtheresulttypeofmethod{codecall}FunctionalInterfacepublicinterfaceCallableV{Computesaresult,orthrowsanexceptionifunabletodoso。returncomputedresultthrowsExceptionifunabletocomputearesultVcall()throwsException;}复制代码
我们和Runnable简单比较下:Callable有返回值Callable可以抛出异常。除了我们想要的有返回值之外,大神还帮我们想到了异常处理的问题,这样子线程任务里发生了异常,我们就可以做对应的处理了,而Runnable是不行的,丧失了一定的灵活和自定义的特性。如下demo演示下:publicclassCallableDemo{publicstaticvoidmain(String〔〕args)throwsException{CallableStringcallablenewCallableString(){OverridepublicStringcall()throwsException{for(inti0;i5;i){Thread。sleep(1000);System。out。println(Thread。currentThread()。getName()处理业务中i);}return122333333;}};System。out。println(Thread。currentThread()。getName()结果:callable。call());}}运行效果main处理业务中0main处理业务中1main处理业务中2main处理业务中3main处理业务中4main结果:122333333复制代码
用法是不是很简单呢。特点如下:想要有结果,就必须等到业务处理完成,就像是线程卡住了自己,无法异步调用从打印效果来看,不像Runnable重新开启了另外一个线程,是主线程自己调用,然后就卡住了
这时,大神又定义了另一个接口,它可以把call方法交给另一个线程异步执行,叫做未来接口Future,如果有更牛逼的名字,请告知我哈Future
官方的解释如下:Future表示异步计算的结果。方法用于检查计算是否完成、等待计算完成以及检索计算结果。只有在计算完成后,才能使用get方法检索结果,在必要时阻塞直到它准备好。取消是由cancel方法执行的。还提供了其他方法来确定任务是正常完成还是被取消。一旦计算完成,就不能取消计算。如果为了可取消性而使用Future,但不提供可用的结果,可以声明Future并返回null作为底层任务的结果。
简单来讲如下:此接口用来代表一个异步计算的结果。可以用来获取一个异步结果,可以取消正在执行的任务,可以判断任务是否取消,可以判断任务的是否完成。只有等任务完成,才会有结果返回。总的来看,Future接口只是提供了获取异步结果的方法以及一系列操控异步执行任务的方法,但我感觉还没真正的是把任务异步交给了另一个线程。于是,大神在这基础之上又组合了一个类,叫未来任务接口RunnableFutureRunnableFutureA{linkFuture}thatis{linkRunnable}。Successfulexecutionofthe{coderun}methodcausescompletionofthe{codeFuture}andallowsaccesstoitsresults。一个可运行的未来。run方法的成功执行将导致Future的完成,并允许访问其结果。seeFutureTaskseeExecutorsince1。6authorDougLeaparamVTheresulttypereturnedbythisFutures{codeget}methodpublicinterfaceRunnableFutureVextendsRunnable,FutureV{SetsthisFuturetotheresultofitscomputationunlessithasbeencancelled。除非已取消,否则将此Future设置为其计算的结果。voidrun();}复制代码
从源码的继承关系来看,它即继承了Runnable,又继承了Future,这样子的一个既可以创建异步线程来执行异步任务,又可以对异步任务进行操控的接口就这样诞生了。至于里面的run方法,一开始我也觉得很懵,为啥Runnable接口里面有了,这边又定义了一次?结合下图官方API,你没看错,它其实就是Runnable的run方法,我猜它这边又定义了一次,只是用来强调下,这是这个类的特有异步执行可操控未来的一个异步过程(就是强调这是属于该未来任务的执行方法,只不过是借用了Runnable,感觉有点像是应用了Adapter适配器设计模式)
好了,上面的介绍其实都是铺垫,下面的才是本篇的主角,它实现了以上接口的所有特性,它才是真正的未来任务实现类FutureTask。
在此,我们画一个UML类图,来对上面的各个接口定义做如下清晰展示:
正主:FutureTask
下面我们就来看看我们的正主:FutureTask。重要属性Revisionnotes:ThisdiffersfrompreviousversionsofthisclassthatreliedonAbstractQueuedSynchronizer,mainlytoavoidsurprisingusersaboutretaininginterruptstatusduringcancellationraces。SynccontrolinthecurrentdesignreliesonastatefieldupdatedviaCAStotrackcompletion,alongwithasimpleTreiberstacktoholdwaitingthreads。Stylenote:Asusual,webypassoverheadofusingAtomicXFieldUpdatersandinsteaddirectlyuseUnsafeintrinsics。大概意思如下:修订注意:这与依赖于AbstractQueuedSynchronizer的该类的以前版本不同,主要是为了避免在取消竞赛期间保留中断状态让用户感到意外。当前设计中的同步控制依赖于通过CAS更新的state字段来跟踪完成情况,以及一个简单的Treiber堆栈来保存等待的线程。样式注意:与往常一样,我们绕过了使用AtomicXFieldUpdaters的开销,而是直接使用Unsafeintrinsic。请注意:本篇基于JDK8源码讲解,这段话的意思其实就是在这版本之前,应该是依赖AQS实现的现在改成了通过stateCASTreiber堆栈来实现的。有兴趣的小伙伴可以看下JDK8之前的实现。Therunstateofthistask,initiallyNEW。Therunstatetransitionstoaterminalstateonlyinmethodsset,setException,andcancel。Duringcompletion,statemaytakeontransientvaluesofCOMPLETING(whileoutcomeisbeingset)orINTERRUPTING(onlywhileinterruptingtherunnertosatisfyacancel(true))。Transitionsfromtheseintermediatetofinalstatesusecheaperorderedlazywritesbecausevaluesareuniqueandcannotbefurthermodified。Possiblestatetransitions:NEWCOMPLETINGNORMALNEWCOMPLETINGEXCEPTIONALNEWCANCELLEDNEWINTERRUPTINGINTERRUPTEDprivatevolatileintstate;privatestaticfinalintNEW0;privatestaticfinalintCOMPLETING1;privatestaticfinalintNORMAL2;privatestaticfinalintEXCEPTIONAL3;privatestaticfinalintCANCELLED4;privatestaticfinalintINTERRUPTING5;privatestaticfinalintINTERRUPTED6;Theunderlyingcallable;nulledoutafterrunningprivateCallableVcallable;Theresulttoreturnorexceptiontothrowfromget()privateObjectoutcome;nonvolatile,protectedbystatereadswritesThethreadrunningthecallable;CASedduringrun()privatevolatileThreadrunner;TreiberstackofwaitingthreadsprivatevolatileWaitNodewaiters;复制代码state:任务状态,有代码中表示的七种状态,从上面的注释可以看出,可能的状态转换有四种:NEWCOMPLETINGNORMALNEWCOMPLETINGEXCEPTIONALNEWINTERRUPTINGINTERRUPTEDNEWCANCELLED其中,NEW(新建)是初始状态;COMPLETING(完成中)和INTERRUPTING(中断中)属于中间状态;NORMAL(正常结束)、EXCEPTIONAL(发生异常)、INTERRUPTED(中断了)和CANCELLED(被取消了)属于结束状态。Callablecallable:所谓的要执行的真正的任务Objectoutcome:任务执行后的结果Threadrunner:真正执行任务的线程WaitNodewaiters:看注释有:Treiberstack,一种无锁并发栈,其特性是基于CAS算法进行栈的操作。所以是一个线程安全的栈(可以保证入栈出栈的线程安全,即同一时刻只有一个线程操作成功)。观其定义:等待节点栈,难道这里又是一个类似等待队列的实现?如下是内部类WaitNode的定义:SimplelinkedlistnodestorecordwaitingthreadsinaTreiberstack。SeeotherclassessuchasPhaserandSynchronousQueueformoredetailedexplanation。staticfinalclassWaitNode{volatileThreadthread;volatileWaitNodenext;WaitNode(){threadThread。currentThread();}}复制代码
果不其然,和我们之前一起学习过的AQS差不多,这里是一个单向的等待列表(因为有个next属性),内部也是当前线程。
根据以上源码注释和属性,总结:在当前版本(JDK8,之前的版本实现方式不太一样)FutureTask的同步控制实现,是通过对状态的CAS的更新,以及用一个线程安全的Treiber堆栈来保存等待的线程的方式来进行的。构造器
FutureTask提供了两个构造器,如下:Createsa{codeFutureTask}thatwill,uponrunning,executethegiven{codeCallable}。paramcallablethecallabletaskthrowsNullPointerExceptionifthecallableisnullpublicFutureTask(CallableVcallable){if(callablenull)thrownewNullPointerException();this。callablecallable;this。stateNEW;ensurevisibilityofcallable}Createsa{codeFutureTask}thatwill,uponrunning,executethegiven{codeRunnable},andarrangethat{codeget}willreturnthegivenresultonsuccessfulcompletion。paramrunnabletherunnabletaskparamresulttheresulttoreturnonsuccessfulcompletion。Ifyoudontneedaparticularresult,considerusingconstructionsoftheform:{codeFuturelt;?fnewFutureTaskVoid(runnable,null)}throwsNullPointerExceptioniftherunnableisnullpublicFutureTask(Runnablerunnable,Vresult){this。callableExecutors。callable(runnable,result);this。stateNEW;ensurevisibilityofcallable}复制代码一个是直接传入Callable对象一个传入一个任务对象runnable,以及一个指定的任务结果,观其内部,是通过【Executors。callable(runnable,result)】转成的callable,我们来看看怎么转的,代码如下:publicstaticTCallableTcallable(Runnabletask,Tresult){if(tasknull)thrownewNullPointerException();returnnewRunnableAdapterT(task,result);}适配器模式来啦,和前面的猜测呼应上了主要目的就是将Runnable适配成Callable并指定传过来的结果作为异步任务的结果staticfinalclassRunnableAdapterTimplementsCallableT{finalRunnabletask;finalTresult;RunnableAdapter(Runnabletask,Tresult){this。tasktask;this。resultresult;}publicTcall(){task。run();returnresult;}}复制代码
通过构造器我们知道,最终就算传入的是Runnable对象,其实最后还是callable对象。具体接口实现
具体接口的实现,就是FutureTask的实现原理了,我们来看看它是如何实现异步任务的控制,并且可以获取到异步任务的结果的吧。主要是Runnable接口和Future接口对应的方法,首先来看Runnable接口的实现:run方法实现publicvoidrun(){如果state不为NEW或者将runner修改为当前线程不成功,注意此时的线程是真正执行任务的线程if(state!NEW!UNSAFE。compareAndSwapObject(this,runnerOffset,null,Thread。currentThread()))return;try{CallableVccallable;if(c!nullstateNEW){Vresult;booleanran;以下过程较简单,就不注释了try{resultc。call();rantrue;}catch(Throwableex){resultnull;ranfalse;setException(ex);}if(ran)set(result);}}finally{runnermustbenonnulluntilstateissettledtopreventconcurrentcallstorun()runnernull;statemustberereadafternullingrunnertopreventleakedinterruptsprivatestaticfinalintINTERRUPTING5;privatestaticfinalintINTERRUPTED6;intsstate;如果状态大于等于INTERRUPTING,说明在中断线程if(sINTERRUPTING)见下面方法的分析handlePossibleCancellationInterrupt(s);}}set(result)protectedvoidset(Vv){if(UNSAFE。compareAndSwapInt(this,stateOffset,NEW,COMPLETING)){outcomev;UNSAFE。putOrderedInt(this,stateOffset,NORMAL);finalstatefinishCompletion();}}setException(ex)protectedvoidsetException(Throwablet){if(UNSAFE。compareAndSwapInt(this,stateOffset,NEW,COMPLETING)){注意此处将异常结果给了结果outcomet;UNSAFE。putOrderedInt(this,stateOffset,EXCEPTIONAL);finalstatefinishCompletion();}}finishCompletion()Removesandsignalsallwaitingthreads,invokesdone(),andnullsoutcallable。其实就是个循环清空等待队列,并且唤醒各个节点线程的方法,也相当于一个结束后的优化清理工作我叫他动作完成的既定动作。privatevoidfinishCompletion(){assertstateCOMPLETING;for(WaitNodeq;(qwaiters)!null;){if(UNSAFE。compareAndSwapObject(this,waitersOffset,q,null)){for(;;){Threadtq。thread;if(t!null){q。threadnull;LockSupport。unpark(t);}WaitNodenextq。next;if(nextnull)break;q。nextnull;unlinktohelpgcqnext;}break;}}done();callablenull;toreducefootprint}handlePossibleCancellationInterruptprivatevoidhandlePossibleCancellationInterrupt(ints){Itispossibleforourinterruptertostallbeforegettingachancetointerruptus。Letsspinwaitpatiently。其实是个自旋,只要没有被中断,就不会退出,直到被中断退出(中断的过程在cancel方法里可以看到我们下面会分析到)if(sINTERRUPTING)while(stateINTERRUPTING)Thread。yield();waitoutpendinginterrupt}复制代码
过程如下:首先,先判断state状态是否是NEW状态,如果不是NEW,则直接返回,合情合理;如果是NEW状态,则通过CAS的方式将当前的runner修改为当前线程,修改成功的话,将继续执行下面的代码。由此我们可以知道的一点是,runner属性,其实是在具体运行的时候,才赋值的。即谁调用了run方法谁就是执行者。继续,取出任务callable,如果任务为空或者state状态不为NEW;则就直接退出了;如果都为true的话,则执行callable方法体对应的业务,并将结果拿到,如果中间出现的异常,则设置异常【setException(ex)】;如果执行成功,则设置结果【set(result)】set(result):此方法首先,将state通过CAS的方式设置为COMPLETING,然后将结果给属性outcome,然后再将state设置为NORMAL,最后调用【finishCompletion()】setException(ex):和上面一样,首先先设置状态COMPLETING,将异常结果赋给outcome,最后再将state置为EXCEPTIONAL,最后调用【finishCompletion()】从set(result)和setException(ex)两个方法可以看出:中间状态COMPLETING其实是一个很短暂的时间,其实在设置为这个状态之前,任务实际已经执行结束或者抛出异常了。finishCompletion():这段代码的作用其实就是将所有等待栈中的节点置为空,并且唤醒所有等待的线程。同时里面有个可以扩展的方法done(可以用来做些额外的事情)。我把这段代码称之为完成既定动作最后,执行finally体里代码:首先将runner置为空,并且判断state的状态,有没有被置为sINTERRUPTING,如果成立,说明有其他线程中断了操作,如果是这样的话,则调用方法【handlePossibleCancellationInterrupt(s)】handlePossibleCancellationInterrupt(s):该方法其实也很简单,就是不停看状态state有没有变为INTERRUPTED,直至变为INTERRUPTED才会退出。
简单总结:run方法其实就是将当前线程变成真正的runner,然后不停地判断状态是否正常,正常的话,就去执行call方法具体的业务,执行完成的话,就把结果返回,并且唤醒所有等待的线程;如果有异常,则设置异常;最后再次判断是否有中断,有的话,等待state变为已中断才退出。
我们再看Future的实现:其实主要就是看get和cancel两个方法就可以了。get方法publicVget()throwsInterruptedException,ExecutionException{intsstate;if(sCOMPLETING)sawaitDone(false,0L);returnreport(s);}awaitDone(false,0L)Awaitscompletionorabortsoninterruptortimeout。在中断或超时时等待完成或中止paramtimedtrueifusetimedwaitsparamnanostimetowait,iftimedreturnstateuponcompletionprivateintawaitDone(booleantimed,longnanos)throwsInterruptedException{finallongdeadlinetimed?System。nanoTime()nanos:0L;待封装的节点(主要用来将当前执行get方法的线程在未来可能封装为等待节点,注意是可能,仔细看下面的分析)WaitNodeqnull;是否排队booleanqueuedfalse;此处是一个自旋操作,什么时候退出循环呢,其实在方法的注释上已经告诉我们了1、被中断2、等待超出设定的时间3、任务完成被唤醒for(;;){如果在反复循环过程中线程被中断了,此时有可能线程被放到队列(Treiber栈),则先移除,然后抛异常;如果没有放入等待队列,则直接抛出异常if(Thread。interrupted()){removeWaiter(q);thrownewInterruptedException();}我把状态值移过来,方便大家分析privatestaticfinalintNEW0;privatestaticfinalintCOMPLETING1;privatestaticfinalintNORMAL2;privatestaticfinalintEXCEPTIONAL3;privatestaticfinalintCANCELLED4;privatestaticfinalintINTERRUPTING5;privatestaticfinalintINTERRUPTED6;intsstate;继续走到这里,判断状态是否大于COMPLETING(NORMALEXCEPTIONALCANCELLEDINTERRUPTINGINTERRUPTED),就是目前处于上面那些状态,要么完成,要么异常了,要么被取消了等结束状态,则直接返回当前任务的状态即可。if(sCOMPLETING){if(q!null)此处的置空,其实是方便GC垃圾回收q。threadnull;returns;}如果状态为正在执行任务中,则yield等等,进入下一次的循环elseif(sCOMPLETING)cannottimeoutyetThread。yield();如果再进来一次循环,代码走到这里(sCOMPLETING,说明任务还没正式开整),如果q还是null,说明当前线程还没封装为等待节点,则先包装为等待节点,再次进入下一次的循环(PS:到底什么时候sCOMPLETING,请回头到上面看看run方法的过程就知道了)elseif(qnull)qnewWaitNode();再进来,queuedfalse,如果还没入队,则将包装好的q通过CAS头插入(查到头节点之前)的方式入队,然后再次进入下一次的循环elseif(!queued)queuedUNSAFE。compareAndSwapObject(this,waitersOffset,q。nextwaiters,q);如果是超时进来的,则里面的就是等待超时的操作,和之前讲过的AQS超时操作都是一样的elseif(timed){nanosdeadlineSystem。nanoTime();if(nanos0L){removeWaiter(q);returnstate;}LockSupport。parkNanos(this,nanos);}再进来循环,走到这里,说明任务还未结束,且当前线程已经包装成节点进入等待队列(Treiber栈)了,则阻塞线程elseLockSupport。park(this);}}report(s)Returnsresultorthrowsexceptionforcompletedtask。paramscompletedstatevalue我把状态值移过来,方便大家分析privatestaticfinalintNEW0;privatestaticfinalintCOMPLETING1;privatestaticfinalintNORMAL2;privatestaticfinalintEXCEPTIONAL3;privatestaticfinalintCANCELLED4;privatestaticfinalintINTERRUPTING5;privatestaticfinalintINTERRUPTED6;SuppressWarnings(unchecked)privateVreport(ints)throwsExecutionException{Objectxoutcome;if(sNORMAL)return(V)x;if(sCANCELLED)thrownewCancellationException();除了上面两种情况外,就是执行抛异常了thrownewExecutionException((Throwable)x);}复制代码
get流程总结如下:取出state,判断是否小于等于COMPLETING,即看任务是不是还在运行或者还没开始;如果是的话,则调用【awaitDone(false,0L)】方法awaitDone(false,0L):此方法是个for自旋,大概意思就是逐步地(逐步的意思就是通过自旋的形式,反复往下执行,注意里面的每个if分支其实互斥发生的,且每次循环,都会从头到尾再来一次,请仔细看代码里的for循环过程分析)将当前线程包装成等待节点从头部放入等待栈并且最后park阻塞等待的过程。这里面一直在自旋检测是否被中断和判断当前state的状态值,大于COMPLETING说明要么任务完成,要么任务失败(被取消,发生异常等)。其实这个方法的过程,如果看过我之前的AQS的分析,其实看这些代码很容易理解的,无非就是自旋CAS操作而已。上面执行awaitDone的最终线程被阻塞,直到线程被唤醒(什么时候唤醒?就在上面的run方法当中执行的finishCompletion方法中唤醒的),继续往下执行【report(s)】report(s):代码就很简单了,状态正常结束的话,就返回结果;状态被取消的话,就抛出取消异常;如果在任务执行过程中发生异常,也会包装成ExecutionException抛出异常。cancel方法Future接口,cancel方法的注释Attemptstocancelexecutionofthistask。Thisattemptwillfailifthetaskhasalreadycompleted,hasalreadybeencancelled,orcouldnotbecancelledforsomeotherreason。Ifsuccessful,andthistaskhasnotstartedwhen{codecancel}iscalled,thistaskshouldneverrun。Ifthetaskhasalreadystarted,thenthe{codemayInterruptIfRunning}parameterdetermineswhetherthethreadexecutingthistaskshouldbeinterruptedinanattempttostopthetask。试图取消此任务的执行。如果任务已经完成、已经取消或由于其他原因无法取消,则此尝试将失败。如果成功,并且在调用cancel时这个任务还没有启动,那么这个任务就不应该运行。如果任务已经开始,那么参数mayInterruptIfRunning决定执行该任务的线程是否应该在试图停止该任务时被中断。pAfterthismethodreturns,subsequentcallsto{linkisDone}willalwaysreturn{codetrue}。Subsequentcallsto{linkisCancelled}willalwaysreturn{codetrue}ifthismethodreturned{codetrue}。该方法返回后,对isDone的后续调用将始终返回true。如果该方法返回true,则对iscancelled后续调用将始终返回true。parammayInterruptIfRunning{codetrue}ifthethreadexecutingthistaskshouldbeinterrupted;otherwise,inprogresstasksareallowedtocompletemayInterruptIfRunningtrue:当前的任务会被中断false:允许当前的任务继续执行return{codefalse}ifthetaskcouldnotbecancelled,typicallybecauseithasalreadycompletednormally;{codetrue}otherwisepublicbooleancancel(booleanmayInterruptIfRunning){if(!(stateNEWUNSAFE。compareAndSwapInt(this,stateOffset,NEW,mayInterruptIfRunning?INTERRUPTING:CANCELLED)))returnfalse;try{incasecalltointerruptthrowsexceptionif(mayInterruptIfRunning){如果mayInterruptIfRunning为true,则中断线程,并且将状态改为INTERRUPTEDtry{Threadtrunner;if(t!null)t。interrupt();}finally{finalstateUNSAFE。putOrderedInt(this,stateOffset,INTERRUPTED);}}}finally{最后做完成既定动作,上面分析过finishCompletion();}最后返回true,取消成功returntrue;}复制代码
根据代码和注释分析如下:如果state状态不为NEW,即当前任务已经进行了,即【stateNEW】为false,则直接返回false,取消失败如果state为NEW,则通过CAS将状态设置为INTERRUPTING(如果mayInterruptIfRunning为true)或者CANCELLED,如果修改成功,将继续,否则也失败返回哪些情况会取消失败:(1)任务已经开始进行或者完成了(2)任务已经被取消了(3)其他原因无法取消的此方法说是取消了,但其实并不代表任务真正地被取消了,通过源码分析可以知道,它只是改变了状态state,这时就看你当前任务执行到什么程度了,如果还没开始,那么任务就不会执行(run方法中在判断【c!nullstateNEW】后就不会继续往下执行任务了);如果任务已经在执行了,就看你有没有指定中断(即mayInterruptIfRunning为true),如果指定中断,那会中断操作;如果没有,则继续运行。回头再看一次run方法的过程就知道了:run方法最终会到finally代码块,这时【sINTERRUPTING】成立,如果sINTERRUPTING,说明cancel方法还没有执行【t。interrupt()】,所以在【handlePossibleCancellationInterrupt】方法里会一直等待直到变成INTERRUPTED。
以上呢,便是FutureTask主要操作的过程分析了。FutureTask总结1、FutureTask分别实现了Runnable和Future的接口,其中的各种操作都离不开对状态state的操作和判断。state状态的中间状态(COMPLETING,INTERRUPTING)存在时间很短(通过set(Vv)setException和cancel方法可知),state的值要么就是初始状态NEW,要么就很快变成了结束状态(NORMAL,EXCEPTIONAL,CANCELLED,INTERRUPTED)此外,在我们分析完rungetcancel方法之后,我们发现这三个方法之间都需要前后联系起来看,他们之间其实是通过state这个状态值,来作为中间的一个协调者这样的一个角色来实现最终的效果的。2、FutureTask三大方法的调用是不同种类的线程在调用,大体归为三类:第一类是任务执行者,就是用来当作runner的那个线程;第二类是调用get获取结果的;第三类则是调用cancel方法的。当然可能还有其他调用isDone和isCancel这些工具方法的线程。由于是多线程并发编程,以上三类线程有时几乎是同时进行的,所以针对状态以及一些特殊值的设定,都采取了自旋CAS的方式进行的(包括等待队列【Treiber栈】的实现),这样就保证了各个操作的线程安全性。3、FutureTask等待队列的实现和我们之前一起学习过的AQS差不多,但比AQS简单很多,同样是线程如果拿不到自己想要的任务结果就会逐步的通过自旋CAS的方式把线程包装成节点,然后头压栈的形式放到等待队列里;当任务执行完成,再进行唤醒通知它们取结果。在这过程中:如果任务执行发生了异常(其中也包括中断异常):则直接将state改为EXCEPTIONAL,然后退出并唤醒等待结果的线程(并将异常信息抛出,回头看report方法的分析就会知道)如果被其他线程取消,两种情况:中断式取消和非中断式中断式:先将状态改为中间状态INTERRUPTING,然后中断线程,如果这时任务正在执行,就会抛出中断异常,然后如果有线程get的话,在被唤醒后,执行report的时候直接报错结束(这时虽然报CancellationException,其根本就是中断异常);非中断式:先将状态改为结束状态CANCELLED,然后就看任务自己的执行情况了,如果任务还未开始,则run方法中在判断【c!nullstateNEW】后就不会继续往下执行任务了;如果任务已经进行,则任务会继续执行完成,只不过会在【set(result)】方法里修改状态为COMPLETING会失败,结果也会是空的。如果有调用get的线程,则被唤醒后,执行report的时候也会直接抛出异常。DEMO分享
最后呢,我再提供一个demo的用法示例,注意看代码中的注释,按照最后的方法试试运行效果:publicclassDEmo{publicstaticvoidmain(String〔〕args)throwsInterruptedException,ExecutionException{System。out。println(mainstart);FutureTaskIntegertasknewFutureTaskInteger((){System。out。println(thread:Thread。currentThread()。getName()异步任务开始。。。);Thread。sleep(5000);System。out。println(thread:Thread。currentThread()。getName()异步任务结束。。);return1024;});Threadt1newThread(task);开启FutureTask任务的执行,由t1线程执行t1。start();t2为取消线程Threadt2newThread(newRunnable(){Overridepublicvoidrun(){t2线程2s后取消,两种形式,中断式取消和非中断式try{Thread。sleep(2000);}catch(InterruptedExceptione){}1task。cancel(false);}});t2。start();此处主线程等待获取结果2System。out。println(task。get());上面的demo,我标出了1和2两处位置小伙伴们可以试着切换1处的truefalse看看效果也可以将2处的代码进行注释和不注释切换看看效果}}复制代码
PS:其实提供这段代码的主要目的是供小伙伴们打断点调试,来印证我文末最后的总结,如下方我在源码中打断点的代码位置(看代码行数即可)
FutureTask扩展
以为到这里本篇就结束了吗?还没有,我们再来仔细看下FutureTask的方法,如下:
我们仔细看这三个方法的修饰符,都是protected,这说明什么,说明这三个方法可以在子类中被重写
在此,我们也引出一个问题:FutureTask虽然为我们提供了获取任务结果的方法get,但是呢,最终还是没有达到我想要的异步效果,即我调用了get方法之后,会阻塞在那,而且发生异常的话,异常只能被动抛出。我现在想要实现一种我不需要get方法阻塞,也能获取到结果,且以通知的形式告诉主线程,如果任务异常的话,可以主动地获取到异常信息。
来看看怎么实现的吧定义结果监听器publicinterfaceTaskListenser{exception:true,结果异常,value即异常详情信息false,无异常,value即正常结果值publicvoidonResultOk(booleanexception,Objectvalue);}复制代码自定义Future,重写set和setExceptionpublicclassMyFutureTaskVextendsFutureTaskV{设置监听privateTaskListensertaskListenser;publicMyFutureTask(CallableVcallable){super(callable);}Overrideprotectedvoidset(Vv){super。set(v);try{任务执行完成,通知taskListenser。onResultOk(false,get());}catch(InterruptedExceptionExecutionExceptione){e。printStackTrace();}}OverrideprotectedvoidsetException(Throwablet){super。setException(t);taskListenser。onResultOk(true,getExceptionResult());}把异常信息以返回值的形式给到前台returnprivateThrowablegetExceptionResult(){try{get();}catch(InterruptedExceptionExecutionExceptione){returne;}returnnull;}publicvoidsetTaskListenser(TaskListensertaskListenser){this。taskListensertaskListenser;}}复制代码运行类DEMOpublicclassMyDemoimplementsTaskListenser,CallableString{OverridepublicvoidonResultOk(booleanexception,Objectvalue){if(!exception){System。out。println(我收到結果了:(String)value);}else{System。out。println(执行任务发生异常了:((Throwable)value)。getMessage());}}OverridepublicStringcall()throwsException{System。out。println(Thread。currentThread()。getName()异步任务开始。。。);模拟执行任务for(inti0;i5;i){System。out。println(Thread。currentThread()。getName()执行任务中。。。i);此段代码可以用来测试任务异常的情况,注释打开即可if(i3){thrownewException(发生异常啦!!!);}Thread。sleep(1000);}System。out。println(Thread。currentThread()。getName()异步任务结束。。);return1024;}publicstaticvoidmain(String〔〕args){MyDemomyDemonewMyDemo();MyFutureTaskStringtasknewMyFutureTask(myDemo);task。setTaskListenser(myDemo);Threadt1newThread(task);开启FutureTask任务的执行,由t1线程执行t1。start();}}复制代码运行效果正常Thread0异步任务开始。。。Thread0执行任务中。。。0Thread0执行任务中。。。1Thread0执行任务中。。。2Thread0执行任务中。。。3Thread0执行任务中。。。4Thread0异步任务结束。。我收到結果了:1024发生异常的效果Thread0异步任务开始。。。Thread0执行任务中。。。0Thread0执行任务中。。。1Thread0执行任务中。。。2Thread0执行任务中。。。3执行任务异常了:java。lang。Exception:发生异常啦!!!复制代码
例子比较简单哈,大家可以试试当然,大家如果能够举一反三的话就更好了!!比如我这边的通知只是利用了观察者模式,能不能用我们之前学的阻塞队列呢?切忌眼高手低啊!
本篇到此就结束了,你学会了吗?文字偏多,感谢阅读,认可的话,请点关注吧哈哈
原文链接:https:juejin。cnpost7120469977634701319
43岁殷桃也太会穿了吧!老爹西装也穿出潮流范,大婶年纪少女状春季来临,休闲百搭的老爹西装也重出了江湖,作为西装的一种,老爹西装时尚简洁,不会有商务西装的正式与沉闷,反而注入了几分帅气潇洒的气质,是大女人的最爱与首选,它流畅的线条与宽松的……
总决赛MVP榜单出炉郭艾伦彻底无缘争夺,辽宁付豪弗格希望最大在辽宁男篮以9990击败浙江广厦的比赛里,坐在场边的郭艾伦一脸若有所思,这是他职业生涯,第二次有机会捧起总冠军奖杯,也是他作为辽宁男篮的绝对核心以来,球队整体表现最出色的一个赛……
贵圈4大狗仔,他们曝出来的瓜,毁掉了娱乐圈的半边天一、卓伟内地第一狗仔众所周知的卓伟,本名叫韩炳江,卓伟是他的笔名,除此之外,他还有过很多笔名,卓伟最出名。卓伟非常努力,他原本读的是中专,硬是凭借努力考上了汉语言文……
2022又流行工装裤?但杨幂赵丽颖同款过时了,今年要这么选有没有人和我一样,觉得今年的冬天好冷好漫长但也相信春天已经在派送的路上啦,所以也是蠢蠢欲动想买新衣服在各大网站上搜寻剁手灵感的时候,我发现今年有一条裤子的出镜率好高,各位时尚博……
为了奖牌不择手段,为什么韩国运动员对冠军这样执着?这几天,大家是不是都被冰墩墩、谷爱凌给刷屏了。很多的奥运选手可以说是频繁地登上热搜。今年的北京冬奥会可以说是引来了全国人民极高的关注。但这里面,还充斥着一些非常不和谐的声音。……
广东第二阶段兵强马壮,外援回归,新人加盟联赛第一阶段结束,广东取得11胜2负,排名第二的战绩。在大量练兵的情况下取得这样的成绩算很不错了,再说输北京那场主要还是伤病原因。第一阶段的一些数据和去年相比,因为外援不……
自驾游新疆深一度石榴云新疆日报记者姚刚一场别开生面的自驾旅行,除了体验在城市里开车感受不到的动力和性能,欣赏沿线美景、品尝当地美食外,还能关注自驾游的文化味、参与感。地域辽阔的新疆……
500元价位的路由器怎么选这个价位首选小米,TPLINK这些品牌,可以买到中高端配置的路由器了。三大品牌的路由器虽然不错,但500元只能买到入门级的产品,性价比不高。TPLINKXDR5480……
杨振宁宇宙结构不是偶然的,有造物者宇宙到底是如何诞生的?这是人类一直都在探讨的问题,现代科学认为,我们的宇宙诞生于138亿年前,在138亿年前,有一颗奇点发生了爆炸,奇点是一个质量无限大、能量无限大、密度无限大……
从冬奥会奖牌性别比谈中国男女相处之道今天冬奥会结束,中国以9枚金牌的成绩位居奖牌榜第三名,创造了历史。我仔细看了获奖队员的男女分布比例,联想到之前男足出局、女足夺冠的情况,不由得长舒一口气。毕竟,我们冬奥会男女获……
踉踉跄跄的亚马逊新手之路最近的打击有些多,多到让卖家喘不过气来。先是受疫情影响,供应链无法按时开工,很多产品面临断货风险。然后好不容易供应链开始复工,结果又赶上空运价格的扶摇直上,产品的一……
双外援正式确定!李楠连签两名大牌外援,要率领球队重返季后赛江苏男篮上赛季全华班的阵容输得很惨,在没有外援的情况下,李楠指导也是有心无力,只能以培养年轻球员位置。第三阶段,吴冠希和史鸿飞等主力大将全部受伤,只有赵率舟等人带队支撑,整支球……