Java技术栈中间件优雅停机方案设计与实现全景图下
前言
紧接上文,那么接下来笔者就从一些知名框架源码实现角度,为大家详细阐述一下优雅关闭是如何实现的?4。Spring的优雅关闭机制
前面两个小节中我们从支持优雅关闭最底层的内核信号机制开始聊起然后到JVM进程实现优雅关闭的ShutdwonHook原理,经过这一系列的介绍,我们现在对优雅关闭在内核层和JVM层的相关机制原理有了一定的了解。
那么在真实Java应用中,我们到底该如何基于上述机制实现一套优雅关闭方案呢?本小节我们来从Spring源码中获取下答案!!
在介绍Spring优雅关闭机制源码实现之前,笔者先来带大家回顾下,在Spring的应用上下文关闭的时候,Spring究竟给我们提供了哪些关闭时的回调机制,从而可以让我们在这些回调中编写Java应用的优雅关闭逻辑。4。1发布ContextClosedEvent事件
在Spring上下文开始关闭的时候,首先会发布ContextClosedEvent事件,注意此时Spring容器的Bean还没有开始销毁,所以我们可以在该事件回调中执行优雅关闭的操作。ComponentpublicclassShutdownListenerimplementsApplicationListenerContextClosedEvent{OverridepublicvoidonApplicationEvent(ContextClosedEventevent){。。。。。。。。优雅关闭逻辑。。。。。}}4。2Spring容器中的Bean销毁前回调
当Spring开始销毁容器中管理的Bean之前,会回调所有实现DestructionAwareBeanPostProcessor接口的Bean中的postProcessBeforeDestruction方法。ComponentpublicclassDestroyBeanPostProcessorimplementsDestructionAwareBeanPostProcessor{OverridepublicvoidpostProcessBeforeDestruction(Objectbean,StringbeanName)throwsBeansException{。。。。。。。。Spring容器中的Bean开始销毁前回调。。。。。。。}}4。3回调标注PreDestroy注解的方法ComponentpublicclassShutdown{PreDestroypublicvoidpreDestroy(){。。。。。。释放资源。。。。。。。}}4。4回调DisposableBean接口中的destroy方法ComponentpublicclassShutdownimplementsDisposableBean{Overridepublicvoiddestroy()throwsException{。。。。。。释放资源。。。。。。}}4。5回调自定义的销毁方法beanidShutdownclasscom。test。netty。ShutdowndestroymethoddoDestroypublicclassShutdown{publicvoiddoDestroy(){。。。。。自定义销毁方法。。。。}}4。6Spring优雅关闭机制的实现
Spring相关应用程序本质上也是一个JVM进程,所以Spring框架想要实现优雅关闭机制也必须依托于我们在本文第三小节中介绍的JVM的ShutdownHook机制。
在Spring启动的时候,需要向JVM注册ShutdownHook,当我们执行kill15pid命令时,随后Spring会在ShutdownHook中触发上述介绍的五种回调。
下面我们来看一下Spring中ShutdownHook的注册逻辑:4。6。1Spring中ShutdownHook的注册publicabstractclassAbstractApplicationContextextendsDefaultResourceLoaderimplementsConfigurableApplicationContext,DisposableBean{OverridepublicvoidregisterShutdownHook(){if(this。shutdownHooknull){Noshutdownhookregisteredyet。this。shutdownHooknewThread(){Overridepublicvoidrun(){synchronized(startupShutdownMonitor){doClose();}}};Runtime。getRuntime()。addShutdownHook(this。shutdownHook);}}}
在Spring启动的时候,我们需要调用AbstractApplicationContextregisterShutdownHook方法向JVM注册Spring的ShutdownHook,从这段源码中我们看出,Spring将doClose()方法封装在ShutdownHook线程中,而doClose()方法里边就是Spring优雅关闭的逻辑。
这里需要强调的是,当我们在一个纯Spring环境下,Spring框架是不会为我们主动调用registerShutdownHook方法去向JVM注册ShutdownHook的,我们需要手动调用registerShutdownHook方法去注册。publicclassSpringShutdownHook{publicstaticvoidmain(String〔〕args)throwsIOException{GenericApplicationContextcontextnewGenericApplicationContext();。。。。。。。。注册ShutdownHookcontext。registerShutdownHook();。。。。。。。。}}
而在SpringBoot环境下,SpringBoot在启动的时候会为我们调用这个方法去主动注册ShutdownHook。我们不需要手动注册。publicclassSpringApplication{publicConfigurableApplicationContextrun(String。。。args){。。。。。。。。。。。。。。。省略。。。。。。。。。。。。。。。。。ConfigurableApplicationCcontextcreateApplicationContext();refreshContext(context);。。。。。。。。。。。。。。。省略。。。。。。。。。。。。。。。。。}privatevoidrefreshContext(ConfigurableApplicationContextcontext){refresh(context);if(this。registerShutdownHook){try{context。registerShutdownHook();}catch(AccessControlExceptionex){Notallowedinsomeenvironments。}}}}4。6。2Spring中的优雅关闭逻辑protectedvoiddoClose(){更新上下文状态if(this。active。get()this。closed。compareAndSet(false,true)){if(logger。isInfoEnabled()){logger。info(Closingthis);}取消JMX托管LiveBeansView。unregisterApplicationContext(this);try{发布ContextClosedEvent事件publishEvent(newContextClosedEvent(this));}catch(Throwableex){logger。warn(ExceptionthrownfromApplicationListenerhandlingContextClosedEvent,ex);}回调Lifecyclebeans,相关stop方法if(this。lifecycleProcessor!null){try{this。lifecycleProcessor。onClose();}catch(Throwableex){logger。warn(ExceptionthrownfromLifecycleProcessoroncontextclose,ex);}}销毁bean,触发前面介绍的几种回调destroyBeans();Closethestateofthiscontextitself。closeBeanFactory();Letsubclassesdosomefinalcleanupiftheywish。。。onClose();Switchtoinactive。this。active。set(false);}}
在这里我们可以看出最终是在AbstractApplicationContextdoClose方法中触发本小节开始介绍的五种回调:发布ContextClosedEvent事件。注意这里是一个同步事件,也就是说Spring的ShutdownHook线程在这里发布完事件之后会继续同步执行事件的处理,等到事件处理完毕后,才会去执行后面的destroyBeans()方法对IOC容器中的Bean进行销毁。
所以在ContextClosedEvent事件监听类中,可以放心地去做优雅关闭相关的操作,因为此时Spring容器中的Bean还没有被销毁。destroyBeans()方法中依次触发剩下的四种回调。
最后结合前边小节中介绍的内容,总结Spring的整个优雅关闭流程如下图所示:
Spring优雅关闭机制。png5。Dubbo的优雅关闭
本小节优雅关闭部分源码基于apachedubbo2。7。7版本,该版本中的优雅关闭是有Bug的,下面让我们一起来ShootingBug!
在前边几个小节的内容中,我们从内核提供的底层技术支持开始聊到了JVM的ShutdonwHook,然后又从JVM聊到了Spring框架的优雅关闭机制。
在了解了这些内容之后,本小节我们就来看下dubbo中的优雅关闭实现,由于现在几乎所有Java应用都会采用Spring作为开发框架,所以dubbo一般是集成在Spring框架中供我们使用的,它的优雅关闭和Spring有着紧密的联系。5。1Dubbo在Spring环境下的优雅关闭
在本文第四小节《4。Spring的优雅关闭机制》的介绍中,我们知道在Spring的优雅关闭流程中,Spring的ShutdownHook线程会首先发布ContextClosedEvent事件,该事件是一个同步事件,ShutdownHook线程发布完该事件紧接着就会同步执行该事件的监听器,当在事件监听器中处理完ContextClosedEvent事件之后,在回过头来执行destroyBeans()方法并依次触发剩下的四种回调来销毁IOC容器中的Bean。
Spring优雅关闭流程。png
由于在处理ContextClosedEvent事件的时候,Dubbo所依赖的一些关键bean这时还没有被销毁,所以dubbo定义了一个DubboBootstrapApplicationListener用来监听ContextClosedEvent事件,并在onContextClosedEvent事件处理方法中调用dubboBootstrap。stop()方法开启dubbo的优雅关闭流程。publicclassDubboBootstrapApplicationListenerextendsOneTimeExecutionApplicationContextEventListenerimplementsOrdered{OverridepublicvoidonApplicationContextEvent(ApplicationContextEventevent){这里是Spring的同步事件,publishEvent和处理Event是在同一个线程中if(eventinstanceofContextRefreshedEvent){onContextRefreshedEvent((ContextRefreshedEvent)event);}elseif(eventinstanceofContextClosedEvent){onContextClosedEvent((ContextClosedEvent)event);}}privatevoidonContextClosedEvent(ContextClosedEventevent){spring在shutdownhook中会先触发ContextClosedEvent,然后在销毁springbeans所以这里dubbo开始优雅关闭时,依赖的springbeans并未销毁dubboBootstrap。stop();}}
当服务提供者ServiceBean和服务消费者ReferenceBean被初始化时,会将DubboBootstrapApplicationListener注册到Spring容器中。并开始监听ContextClosedEvent事件和ContextRefreshedEvent事件。publicclassServiceClassPostProcessorimplementsBeanDefinitionRegistryPostProcessor,EnvironmentAware,ResourceLoaderAware,BeanClassLoaderAware{OverridepublicvoidpostProcessBeanDefinitionRegistry(BeanDefinitionRegistryregistry)throwsBeansException{since2。7。5注册spring启动关闭事件的listener在事件回调中中调用启动类DubboBootStrap的startstop来启动关闭dubbo应用registerBeans(registry,DubboBootstrapApplicationListener。class);。。。。。。。。省略。。。。。。。}}5。2Dubbo优雅关闭流程简介
由于本文的主题是介绍优雅关闭的一整条流程主线,所以这里笔者只是简要介绍Dubbo优雅关闭的主流程,相关细节部分笔者会在后续的dubbo源码解析系列里为大家详细介绍Dubbo优雅关闭的细节。为了避免本文发散太多,我们这里还是聚焦于流程主线。publicclassDubboBootstrapextendsGenericEventListener{publicDubboBootstrapstop()throwsIllegalStateException{destroy();}}
这里的核心逻辑其实就是我们在《1。2优雅关闭》小节中介绍的两大优雅关闭主题:从当前正在关闭的应用实例上切走现有生产流量。保证业务无损。
这里大家只需要了解Dubbo优雅关闭的主流程即可,相关细节笔者后续会有一篇专门的文章详细为大家介绍。publicvoiddestroy(){if(destroyLock。tryLock()){try{DubboShutdownHook。destroyAll();if(started。compareAndSet(true,false)destroyed。compareAndSet(false,true)){取消注册unregisterServiceInstance();取消元数据服务unexportMetadataService();停止暴露服务unexportServices();取消订阅服务unreferServices();注销注册中心destroyRegistries();关闭服务DubboShutdownHook。destroyProtocols();销毁注册中心客户端实例destroyServiceDiscoveries();清除应用配置类以及相关应用模型clear();关闭线程池shutdown();释放资源release();}}finally{destroyLock。unlock();}}}
从以上内容可以看出,Dubbo的优雅关闭依托于SpringContextClosedEvent事件的发布,而ContextClosedEvent事件的发布又依托于SpringShutdownHook的注册。
dubbospring环境优雅关闭。png
从《4。6。1Spring中ShutdownHook的注册》小节的介绍中我们知道,在SpringBoot环境下,SpringBoot在启动的时候会为我们调用ApplicationContextregisterShutdownHook方法去主动注册ShutdownHook。我们不需要手动注册。
而在一个纯Spring环境下,Spring框架并不会为我们主动调用registerShutdownHook方法去向JVM注册ShutdownHook的,我们需要手动调用registerShutdownHook方法去注册。
所以Dubbo这里为了兼容SpringBoot环境和纯Spring环境下的优雅关闭,引入了SpringExtensionFactory类,只要在Spring环境下都会调用registerShutdownHook去向JVM注册Spring的ShutdownHook。publicclassSpringExtensionFactoryimplementsExtensionFactory{privatestaticfinalLoggerloggerLoggerFactory。getLogger(SpringExtensionFactory。class);privatestaticfinalSetCONTEXTSnewConcurrentHashSet();publicstaticvoidaddApplicationContext(ApplicationContextcontext){CONTEXTS。add(context);if(contextinstanceofConfigurableApplicationContext){在spring启动成功之后设置shutdownHook(兼容非SpringBoot环境)((ConfigurableApplicationContext)context)。registerShutdownHook();}}}
当服务提供者ServiceBean和服务消费者ReferenceBean在初始化完成之后,会回调SpringExtensionFactoryaddApplicationContext方法注册ShutdownHook。publicclassServiceBeanTextendsServiceConfigTimplementsInitializingBean,DisposableBean,ApplicationContextAware,BeanNameAware,ApplicationEventPublisherAware{OverridepublicvoidsetApplicationContext(ApplicationContextapplicationContext){this。applicationContextapplicationCSpringExtensionFactory。addApplicationContext(applicationContext);}}publicclassReferenceBeanTextendsReferenceConfigTimplementsFactoryBean,ApplicationContextAware,InitializingBean,DisposableBean{OverridepublicvoidsetApplicationContext(ApplicationContextapplicationContext){this。applicationContextapplicationCSpringExtensionFactory。addApplicationContext(applicationContext);}}
以上就是Dubbo在Spring集成环境下的优雅关闭全流程,下面我们来看下Dubbo在非Spring环境下的优雅关闭流程。5。3Dubbo在非Spring环境下的优雅关闭
在上小节的介绍中我们知道Dubbo在Spring环境下依托Spring的ShutdownHook,通过监听ContextClosedEvent事件,从而触发Dubbo的优雅关闭流程。
而到了非Spring环境下,Dubbo就需要定义自己的ShutdownHook,从而引入了DubboShutdownHook,直接将优雅关闭流程封装在自己的ShutdownHook中执行。publicclassDubboBootstrapextendsGenericEventListener{privateDubboBootstrap(){configManagerApplicationModel。getConfigManager();environmentApplicationModel。getEnvironment();DubboShutdownHook。getDubboShutdownHook()。register();ShutdownHookCallbacks。INSTANCE。addCallback(newShutdownHookCallback(){Overridepublicvoidcallback()throwsThrowable{DubboBootstrap。this。destroy();}});}}publicclassDubboShutdownHookextendsThread{publicvoidregister(){if(registered。compareAndSet(false,true)){DubboShutdownHookdubboShutdownHookgetDubboShutdownHook();Runtime。getRuntime()。addShutdownHook(dubboShutdownHook);dispatch(newDubboShutdownHookRegisteredEvent(dubboShutdownHook));}}Overridepublicvoidrun(){if(logger。isInfoEnabled()){logger。info(Runshutdownhooknow。);}callback();doDestroy();}privatevoidcallback(){callbacks。callback();}}
从源码中我们看到,当我们的Dubbo应用程序接收到kill15pid信号时,JVM捕获到SIGTERM(15)信号之后,就会触发DubboShutdownHook线程运行,从而通过callback()又回调了上小节中介绍的DubboBootstrapdestroy方法(dubbo的整个优雅关闭逻辑全部封装在这里)。
dubbo非Spring环境下优雅关闭流程。pngpublicclassDubboBootstrapextendsGenericEventListener{publicvoiddestroy(){if(destroyLock。tryLock()){try{DubboShutdownHook。destroyAll();if(started。compareAndSet(true,false)destroyed。compareAndSet(false,true)){。。。。。。。。取消注册。。。。。。。。。。。。。。取消元数据服务。。。。。。。。。。。。。。。。停止暴露服务。。。。。。。。。。。。。。。。取消订阅服务。。。。。。。。。。。。。。。。注销注册中心。。。。。。。。。。。。。。。。关闭服务。。。。。。。。。。。。。。。。销毁注册中心客户端实例。。。。。。。。。。。。。。。。清除应用配置类以及相关应用模型。。。。。。。。。。。。。。。。关闭线程池。。。。。。。。。。。。。。。。释放资源。。。。。。。。}}finally{destroyLock。unlock();}}}}5。4啊哈!Bug!
前边我们在《5。1Dubbo在Spring环境下的优雅关闭》小节和《5。3Dubbo在非Spring环境下的优雅关闭》小节中介绍的这两个环境的下的优雅关闭方案,当它们在各自的场景下运行的时候是没有任何问题的。
但是当这两种方案结合在一起运行,就出大问题了
还记得笔者在《3。2使用ShutdownHook的注意事项》小节中特别强调的一点:ShutdownHook其实本质上是一个已经被初始化但是未启动的Thread,这些通过Runtime。getRuntime()。addShutdownHook方法注册的ShutdownHooks,在JVM进程关闭的时候会被启动并发执行,但是并不会保证执行顺序。
所以在编写ShutdownHook中的逻辑时,我们应该确保程序的线程安全性,并尽可能避免死锁。最好是一个JVM进程只注册一个ShutdownHook。
Dubbo在Spring环境下的优雅关闭Bug。png
那么现在JVM中我们注册了两个ShutdownHook线程,一个Spring的ShutdownHook,另一个是Dubbo的ShutdonwHook。那么这会引出什么问题呢?
经过前边的内容介绍我们知道,无论是在Spring的ShutdownHook中触发的ContextClosedEvent事件还是在Dubbo的ShutdownHook中执行的CallBack。最终都会调用到DubboBootstrapdestroy方法执行真正的优雅关闭逻辑。publicclassDubboBootstrapextendsGenericEventListener{privatefinalLockdestroyLocknewReentrantLock();publicvoiddestroy(){if(destroyLock。tryLock()){try{DubboShutdownHook。destroyAll();if(started。compareAndSet(true,false)destroyed。compareAndSet(false,true)){。。。。。。。dubbo应用的优雅关闭。。。。。。。}}finally{destroyLock。unlock();}}}}
让我们来设想一个这种的场景:当Spring的ShutdownHook线程和Dubbo的ShutdownHook线程同时执行并且在同一个时间点来到DubboBootstrapdestroy方法中争夺destroyLock。Dubbo的ShutdownHook线程获得destroyLock进入destroy()方法体开始执行优雅关闭逻辑。Spring的ShutdownHook线程没有获得destroyLock,退出destroy()方法。
Dubbo优雅关闭Bug。png
在Spring的ShutdownHook线程退出destroy()方法之后紧接着就会执行destroyBeans()方法销毁IOC容器中的Bean,这里边肯定涉及到一些关键业务Bean的销毁,比如:数据库连接池,以及Dubbo相关的核心Bean。
于此同时Dubbo的ShutdownHook线程开始执行优雅关闭逻辑,《1。2优雅关闭》小节中我们提到,优雅关闭要保证业务无损。所以需要将剩下正在进行中的业务流程继续处理完毕并将业务处理结果响应给客户端。但是这时依赖的一些业务关键Bean已经被销毁,比如数据库连接池,这时执行数据库操作就会抛出CannotGetJdbcConnectionException。导致优雅关闭失败,对业务造成了影响。5。5Bug的修复
该Bug最终在apachedubbo2。7。15版本中被修复
详情可查看Issue:https:github。comapachedubboissues7093
经过上小节的分析,我们知道既然这个Bug产生的原因是由于Spring的ShutdownHook线程和Dubbo的ShutdownHook线程并发执行所导致的。
那么当我们处于Spring环境下的时候,就将Dubbo的ShutdownHook注销掉即可。publicclassSpringExtensionFactoryimplementsExtensionFactory{privatestaticfinalLoggerloggerLoggerFactory。getLogger(SpringExtensionFactory。class);privatestaticfinalSetCONTEXTSnewConcurrentHashSet();publicstaticvoidaddApplicationContext(ApplicationContextcontext){CONTEXTS。add(context);if(contextinstanceofConfigurableApplicationContext){注册Spring的ShutdownHook((ConfigurableApplicationContext)context)。registerShutdownHook();在Spring环境下将Dubbo的ShutdownHook取消掉DubboShutdownHook。getDubboShutdownHook()。unregister();}}}
而在非Spring环境下,我们依然保留Dubbo的ShutdownHook。publicclassDubboBootstrap{privateDubboBootstrap(){configManagerApplicationModel。getConfigManager();environmentApplicationModel。getEnvironment();DubboShutdownHook。getDubboShutdownHook()。register();ShutdownHookCallbacks。INSTANCE。addCallback(DubboBootstrap。this::destroy);}}
以上内容就是Dubbo的整个优雅关闭主线流程,以及优雅关闭Bug产生的原因和修复方案。
在Dubbo的优雅关闭流程中最终会通过DubboShutdownHook。destroyProtocols()关闭底层服务。publicclassDubboBootstrapextendsGenericEventListener{privatefinalLockdestroyLocknewReentrantLock();publicvoiddestroy(){if(destroyLock。tryLock()){try{DubboShutdownHook。destroyAll();if(started。compareAndSet(true,false)destroyed。compareAndSet(false,true)){。。。。。。。dubbo应用的优雅关闭。。。。。。。关闭服务DubboShutdownHook。destroyProtocols();。。。。。。。dubbo应用的优雅关闭。。。。。。。}}finally{destroyLock。unlock();}}}}
在Dubbo服务的销毁过程中,会通过调用server。close关闭底层的Netty服务。publicclassDubboProtocolextendsAbstractProtocol{Overridepublicvoiddestroy(){for(Stringkey:newArrayList(serverMap。keySet())){ProtocolServerprotocolServerserverMap。remove(key);RemotingServerserverprotocolServer。getRemotingServer();server。close(ConfigurationUtils。getServerShutdownTimeout());。。。。。。。。。。。省略。。。。。。。。}。。。。。。。。。。。省略。。。。。。。。}
最终触发Netty的优雅关闭。publicclassNettyServerextendsAbstractServerimplementsRemotingServer{OverrideprotectedvoiddoClose()throwsThrowable{。。。。。。。。。。关闭底层Channel。。。。。。try{if(bootstrap!null){关闭Netty的主从Reactor线程组bossGroup。shutdownGracefully();workerGroup。shutdownGracefully();}}catch(Throwablee){logger。warn(e。getMessage(),e);}。。。。。。。。。清理缓存Channel数据。。。。。。。}}6。Netty的优雅关闭
通过上小节介绍dubbo优雅关闭的相关内容,我们很自然的引出了Netty的优雅关闭触发时机,那么在本小节中笔者将为大家详细介绍下Netty是如何优雅地装。。。。。。。。。。优雅地谢幕的
image。png
在之前的系列文章中,我们围绕下图所展示的Netty整个核心框架的运转流程介绍了主从ReactorGroup的创建,启动,运行,接收网络连接,接收网络数据,发送网络数据,以及如何在pipeline中处理相关IO事件的整个源码实现。
netty中的reactor。png
本小节就到了Netty优雅谢幕的时刻了,在这谢幕的过程中,Netty会对它的主从ReactorGroup,以及对应ReactorGroup中的Reactor进行优雅的关闭。下面让我们一起来看下这个优雅关闭的过程6。1ReactorGroup的优雅谢幕publicabstractclassAbstractEventExecutorGroupimplementsEventExecutorGroup{staticfinallongDEFAULTSHUTDOWNQUIETPERIOD2;staticfinallongDEFAULTSHUTDOWNTIMEOUT15;OverridepublicF?shutdownGracefully(){returnshutdownGracefully(DEFAULTSHUTDOWNQUIETPERIOD,DEFAULTSHUTDOWNTIMEOUT,TimeUnit。SECONDS);}}
在Netty进行优雅关闭的整个过程中,这里涉及到了两个非常重要的控制参数:gracefulShutdownQuietPeriod:优雅关闭静默期,默认为2s。这个参数主要来保证Netty整个关闭过程中的优雅。在关闭流程开始后,如果Reactor中还有遗留的异步任务需要执行,那么Netty就不能关闭,需要把所有异步任务执行完毕才可以。当所有异步任务执行完毕后,Netty为了实现更加优雅的关闭操作,一定要保障业务无损,这时候就引入了静默期这个概念,如果在这个静默期内,用户没有新的任务向Reactor提交那么就开始关闭。如果在这个静默期内,还有用户继续提交异步任务,那么就不能关闭,需要把静默期内用户提交的异步任务执行完毕才可以放心关闭。gracefulShutdownTimeout:优雅关闭超时时间,默认为15s。这个参数主要来保证Netty整个关闭过程的可控。我们知道一个生产级的优雅关闭方案既要保证优雅做到业务无损,更重要的是要保证关闭流程的可控,不能无限制的优雅下去。导致长时间无法完成关闭动作。于是Netty就引入了这个参数,如果优雅关闭超时,那么无论此时有无异步任务需要执行都要开始关闭了。
这两个控制参数是非常重要核心的两个参数,我们在后面介绍Netty关闭细节的时候还会为大家详细剖析,这里大家先从概念上大概理解一下。
在介绍完这两个重要核心参数之后,我们接下来看下ReactorGroup的关闭流程:
我们都知道Netty为了保证整个系统的吞吐量以及保证Reactor可以线程安全地,有序地处理各个Channel上的IO事件。基于这个目的Netty将其承载的海量连接分摊打散到不同的Reactor上处理。
ReactorGroup中包含多个Reactor,每个Channel只能注册到一个固定的Reactor上,由这个固定的Reactor负责处理该Channel上整个生命周期的事件。
一个Reactor上注册了多个Channel,负责处理注册在其上的所有Channel的IO事件以及异步任务。
ReactorGroup的结构如下图所示:
image。png
ReactorGroup的关闭流程本质上其实是ReactorGroup中包含的所有Reactor的关闭,当ReactorGroup中的所有Reactor完成关闭后,ReactorGroup才算是真正的关闭。publicabstractclassMultithreadEventExecutorGroupextendsAbstractEventExecutorGroup{Reactor线程组中的Reactor集合privatefinalEventExecutor〔〕关闭futureprivatefinalP?terminationFuturenewDefaultPromise(GlobalEventExecutor。INSTANCE);OverridepublicF?shutdownGracefully(longquietPeriod,longtimeout,TimeUnitunit){for(EventExecutorl:children){l。shutdownGracefully(quietPeriod,timeout,unit);}returnterminationFuture();}OverridepublicF?terminationFuture(){returnterminationF}}EventExecutor〔〕children:数组中存放的是当前ReactorGroup中包含的所有Reactor,类型为EventExecutor。P?terminationFuture:ReactorGroup中的关闭Future,用户线程通过这个terminationFuture可以知道ReactorGroup完成关闭的时机,也可以向terminationFuture注册一些listener。当ReactorGroup完成关闭动作后,会回调用户注册的这些listener。大家可以根据各自的业务场景灵活运用。
在ReactorGroup的关闭过程中,会挨个触发它所包含的所有Reactor的关闭流程。并返回terminationFuture给用户线程。
当ReactorGroup中的所有Reactor完成关闭之后,这个terminationFuture会被设置为success,这样一来用户线程可以感知到ReactorGroup已经完成关闭了。
这一点笔者也在《Reactor在Netty中的实现(创建篇)》一文中的第四小节《4。向Reactor线程组中所有的Reactor注册terminated回调函数》强调过。
在ReactorGroup创建的最后一步,会定义Reactor关闭的terminationListener。在Reactor的terminationListener中会判断当前ReactorGroup中的Reactor是否全部关闭,如果已经全部关闭,则会设置ReactorGroup的terminationFuture为success。记录关闭的Reactor个数,当Reactor全部关闭后,ReactorGroup才可以认为关闭成功privatefinalAtomicIntegerterminatedChildrennewAtomicInteger();ReactorGroup的关闭futureprivatefinalP?terminationFuturenewDefaultPromise(GlobalEventExecutor。INSTANCE);protectedMultithreadEventExecutorGroup(intnThreads,Executorexecutor,EventExecutorChooserFactorychooserFactory,Object。。。args){。。。。。。。。挨个创建Reactor。。。。。。。。。。。。finalFutureListenerObjectterminationListenernewFutureListenerObject(){OverridepublicvoidoperationComplete(FutureObjectfuture)throwsException{if(terminatedChildren。incrementAndGet()children。length){当所有Reactor关闭后ReactorGroup才认为是关闭成功terminationFuture。setSuccess(null);}}};for(EventExecutore:children){向每个Reactor注册terminationListenere。terminationFuture()。addListener(terminationListener);}}
从以上ReactorGroup的关闭流程我们可以看出,ReactorGroup的关闭逻辑只是挨个去触发它所包含的所有Reactor的关闭,Netty的整个优雅关闭核心其实是在单个Reactor的关闭逻辑上。毕竟Reactor才是真正驱动Netty运转的核心引擎。6。2Reactor的优雅谢幕
Reactor的优雅谢幕流程。png
Reactor的状态特别重要,从《一文聊透Netty核心引擎Reactor的运转架构》一文中我们知道Reactor是在一个for(;;){。。。。}死循环中996不停地工作。比如轮询Channel上的IO就绪事件,处理IO就绪事件,执行异步任务就是在这个死循环中完成的。
而Reactor在每一次循环任务结束之后,都会先去判断一下当前Reactor的状态,如果状态变为准备关闭状态STSHUTTINGDOWN后,Reactor就会开启优雅关闭流程。
所以在介绍Reactor的关闭流程之前,笔者先来为大家捋一捋Reactor中的各种状态。STNOTSTARTED1:Reactor的初始状态。在Reactor刚被创建出来的时候,状态为STNOTSTARTED。STSTARTED2:Reactor的启动状态。当向Reactor提交第一个异步任务的时候会触发Reactor的启动。启动之后状态变为STSTARTED。
相关细节可在回顾下《详细图解NettyReactor启动全流程》一文。STSHUTTINGDOWN3:Reactor准备开始关闭状态。当Reactor的shutdownGracefully方法被调用的时候,Reactor的状态就会变为STSHUTTINGDOWN。在这个状态下,用户仍然可以向Reactor提交任务。STSHUTDOWN4:Reactor停止状态。表示Reactor的优雅关闭流程已经结束,此时用户不能在向Reactor提交任务,Reactor会在这个状态下最后一次执行剩余的异步任务。STTERMINATED5:Reactor真正的终结状态,该状态表示Reactor已经完全关闭了。在这个状态下Reactor会设置自己的terminationFuture为Success。进而开始回调上小节末尾提到的terminationListener。
在我们了解了Reactor的各种状态之后,下面就该来正式开始介绍Reactor的关闭流程了:publicabstractclassSingleThreadEventExecutorextendsAbstractScheduledEventExecutorimplementsOrderedEventExecutor{Reactor的状态初始为未启动状态privatevolatileintstateSTNOTSTARTED;Reactor的初始状态,未启动privatestaticfinalintSTNOTSTARTED1;Reactor启动后的状态privatestaticfinalintSTSTARTED2;准备正在进行优雅关闭,此时用户仍然可以提交任务,Reactor仍可以执行任务privatestaticfinalintSTSHUTTINGDOWN3;Reactor停止状态,表示优雅关闭结束,此时用户不能在提交任务,Reactor最后一次执行剩余的任务privatestaticfinalintSTSHUTDOWN4;Reactor中的任务已被全部执行完毕,且不在接受新的任务,真正的终止状态privatestaticfinalintSTTERMINATED5;优雅关闭的静默期privatevolatilelonggracefulShutdownQuietP优雅关闭超时时间privatevolatilelonggracefulShutdownTReactor的关闭FutureprivatefinalP?terminationFuturenewDefaultPromiseVoid(GlobalEventExecutor。INSTANCE);OverridepublicF?shutdownGracefully(longquietPeriod,longtimeout,TimeUnitunit){。。。。。。省略参数校验。。。。。。。此时Reactor的状态为STSTARTEDif(isShuttingDown()){returnterminationFuture();}booleaninEventLoopinEventLoop();intoldSfor(;;){if(isShuttingDown()){returnterminationFuture();}intnewS需要唤醒Reactor去执行关闭流程oldSif(inEventLoop){newStateSTSHUTTINGDOWN;}else{switch(oldState){caseSTNOTSTARTED:caseSTSTARTED:newStateSTSHUTTINGDOWN;default:Reactor正在关闭或者已经关闭newStateoldS}}if(STATEUPDATER。compareAndSet(this,oldState,newState)){}}优雅关闭静默期,在该时间内,用户还是可以向Reactor提交任务并且执行,只要有任务在Reactor中,就不能进行关闭每隔100ms检测是否有任务提交进来,如果在静默期内没有新的任务提交,那么才会进行关闭保证关闭行为的优雅gracefulShutdownQuietPeriodunit。toNanos(quietPeriod);优雅关闭的最大超时时间,优雅关闭行为不能超过该时间,如果超过的话不管当前是否还有任务都要进行关闭保证关闭行为的可控gracefulShutdownTimeoutunit。toNanos(timeout);这里需要保证Reactor线程是在运行状态,如果已经停止,那么就不在进行后续关闭行为,直接返回terminationFutureif(ensureThreadStarted(oldState)){returnterminationF}将正在监听IO事件的Reactor从Selector上唤醒,表示要关闭了,开始执行关闭流程if(wakeup){确保Reactor线程在执行完任务之后不会在selector上停留taskQueue。offer(WAKEUPTASK);if(!addTaskWakesUp){如果此时Reactor正在Selector上阻塞,则可以确保Reactor被及时唤醒wakeup(inEventLoop);}}returnterminationFuture();}OverridepublicF?terminationFuture(){returnterminationF}}
首先在开启关闭流程之前,需要调用isShuttingDown()判断一下当前Reactor是否已经开始关闭流程或者已经完成关闭。如果已经开始关闭了,这里会直接返回Reactor的terminationFuture。OverridepublicbooleanisShuttingDown(){returnstateSTSHUTTINGDOWN;}
剩下的逻辑就是不停的在一个for循环中通过CAS不停的尝试将Reactor的当前STSTARTED状态改为STSHUTTINGDOWN正在关闭状态。
如果通过inEventLoop()判断出当前执行线程是Reactor线程,那么表示当前Reactor的状态只会是STSTARTED运行状态,那么就可以直接将newState设置为STSHUTTINGDOWN。因为只有Reactor处于STSTARTED状态的时候才会运行到这里。否则在前边就直接返回terminationFuture了。
如果当前执行线程为用户线程并不是Reactor线程的话,那么此时Reactor的状态可能是正在关闭状态或者已经关闭状态,用户线程在重复发起Reactor的关闭流程。所以这些异常场景的处理会在switch(oldState){。。。。}语句中完成。switch(oldState){caseSTNOTSTARTED:caseSTSTARTED:newStateSTSHUTTINGDOWN;default:Reactor正在关闭或者已经关闭newStateoldS当前Reactor已经处于关闭流程中,则无需在唤醒Reactor了}
如果当前Reactor还未发起关闭流程,比如状态为STNOTSTARTED或者STSTARTED,那么直接可以放心的将newState设置为STSHUTTINGDOWN。
如果当前Reactor已经处于关闭流程中或者已经完成关闭,比如状态为STSHUTTINGDOWN,STSHUTDOWN或者STTERMINATED。则没有必要在唤醒Reactor重复执行关闭流程了wakeupfalse。Reactor的状态维持当前状态不变。
当Reactor的状态确定完毕后,则在for循环中不断的通过CAS修改Reactor的当前状态。此时oldStateSTSTARTED,newStateSTSHUTTINGDOWN。if(STATEUPDATER。compareAndSet(this,oldState,newState)){}
随后在Reactor中设置我们在《6。1ReactorGroup的优雅谢幕》小节开始处介绍的控制Netty优雅关闭的两个非常重要的核心参数:gracefulShutdownQuietPeriod:优雅关闭静默期,默认为2s。当Reactor中已经没有异步任务需要在执行时,该静默期开始触发,Netty在这里会每隔100ms检测一下是否有任务提交进来,如果在静默期内没有新的任务提交,那么才会进行关闭,保证关闭行为的优雅。gracefulShutdownTimeout:优雅关闭超时时间,默认为15s。优雅关闭行为不能超过该时间,如果超过的话不管当前是否还有任务都要进行关闭,保证关闭行为的可控。
流程走到这里,Reactor就开始准备执行关闭流程了,那么在进行关闭操作之前,我们需要确保Reactor线程此时应该是运行状态,如果此时Reactor线程还未开始运行那么就需要让它运行起来执行关闭操作。这里需要保证Reactor线程是在运行状态,如果已经停止,那么就不在进行后续关闭行为,直接返回terminationFutureif(ensureThreadStarted(oldState)){returnterminationF}privatebooleanensureThreadStarted(intoldState){if(oldStateSTNOTSTARTED){try{doStartThread();}catch(Throwablecause){STATEUPDATER。set(this,STTERMINATED);terminationFuture。tryFailure(cause);if(!(causeinstanceofException)){AlsorethrowasitmaybeanOOMEforexamplePlatformDependent。throwException(cause);}}}}
如果此时Reactor线程刚刚执行完异步任务或者正在Selector上阻塞,那么我们需要确保Reactor线程被及时的唤醒,从而可以直接进入关闭流程。wakeuptrue。
这里的addTaskWakesUp默认为false。表示并不是只有addTask方法才能唤醒Reactor线程还有其他方法可以唤醒Reactor线程,比如SingleThreadEventExecutorexecute方法还有本小节介绍的SingleThreadEventExecutorshutdownGracefully方法都会唤醒Reactor线程。
关于addTaskWakesUp字段的详细含义和作用,大家可以回顾下《一文聊透Netty核心引擎Reactor的运转架构》一文中的《1。2。2Reactor开始轮询IO就绪事件》小节。将正在监听IO事件的Reactor从Selector上唤醒,表示要关闭了,开始执行关闭流程if(wakeup){确保Reactor线程在执行完任务之后不会在selector上停留taskQueue。offer(WAKEUPTASK);if(!addTaskWakesUp){如果此时Reactor正在Selector上阻塞,则可以确保Reactor被及时唤醒wakeup(inEventLoop);}}通过taskQueue。offer(WAKEUPTASK)向Reactor中添加WAKEUPTASK,可以确保Reactor在执行完异步任务之后不会在Selector上做停留,直接执行关闭操作。如果此时Reactor线程正在Selector上阻塞,那么直接调用wakeup(inEventLoop)唤醒Reactor线程,直接来到关闭流程。publicfinalclassNioEventLoopextendsSingleThreadEventLoop{Overrideprotectedvoidwakeup(booleaninEventLoop){if(!inEventLoopnextWakeupNanos。getAndSet(AWAKE)!AWAKE){selector。wakeup();}}}6。3Reactor线程的优雅关闭
我们先来通过一张Reactor优雅关闭整体流程图来从总体上俯撼一下关闭流程:
Reactor线程优雅关闭流程。png
通过《一文聊透Netty核心引擎Reactor的运转架构》一文的介绍,我们知道Reactor是在一个for循环中996不停地处理IO事件以及执行异步任务。如下面笔者提取的Reactor运行框架所示:publicfinalclassNioEventLoopextendsSingleThreadEventLoop{Overrideprotectedvoidrun(){for(;;){try{。。。。。。。1。监听Channel上的IO事件。。。。。。。。。。。。。。2。处理Channel上的IO事件。。。。。。。。。。。。。。3。执行异步任务。。。。。。。。。。}finally{try{if(isShuttingDown()){关闭Reactor上注册的所有Channel,停止处理IO事件,触发unActive以及unRegister事件closeAll();注销掉所有Channel停止处理IO事件之后,剩下的就需要执行Reactor中剩余的异步任务了if(confirmShutdown()){}}}catch(Errore){throw(Error)e;}catch(Throwablet){handleLoopException(t);}}}}}
在Reactor在每次for循环的末尾finally{。。。。}语句块中都会通过isShuttingDown()方法去检查当前Reactor的状态是否是关闭状态,如果是关闭状态则开始正式进入Reactor的优雅关闭流程。
我们在本文前边《1。2优雅关闭》小节中在讨论优雅关闭方案的时候提到,我们要着重从以下两个方面来实施优雅关闭:首先需要切走程序承担的现有流量。保证现有剩余的任务可以执行完毕,保证业务无损。
Netty这里实现的优雅关闭同样也遵从这两个要点。在优雅关闭流程开始之前首先会调用closeAll()方法,将Reactor上注册的所有Channel全部关闭掉,切掉现有流量。随后会调用confirmShutdown()方法,将剩余的异步任务执行完毕。在该方法中只要有异步任务需要执行,就不能关闭,保证业务无损。该方法返回值为true时表示可以进行关闭。返回false时表示不能马上关闭。6。3。1切走流量privatevoidcloseAll(){这里的目的是清理selector中的一些无效keyselectAgain();获取Selector上注册的所有ChannelSetSelectionKeykeysselector。keys();CollectionchannelsnewArrayList(keys。size());for(SelectionKeyk:keys){获取NioSocketChannelObjectak。attachment();if(ainstanceofAbstractNioChannel){channels。add((AbstractNioChannel)a);}else{。。。。。。。。。省略。。。。。。}}for(AbstractNioChannelch:channels){关闭Reactor上注册的所有Channel,并在pipeline中触发unActive事件和unRegister事件ch。unsafe()。close(ch。unsafe()。voidPromise());}}
首先会通过selectAgain()最后一次在Selector上执行一次非阻塞轮询操作,目的是清除Selector上的一些无效Key。
关于无效Key的清除,详细细节大家可以回看下《一文聊透Netty核心引擎Reactor的运转架构》一文中的《3。1。3从Selector中移除失效的SelectionKey》小节。
随后通过selector。keys()获取在Selector上注册的所有SelectionKey。进而获取到Netty中的NioSocketChannel。SelectionKey与NioSocketChannel的对应关系如下图所示:
channel与SelectionKey对应关系。png
最后将注册在Reactor上的这些NioSocketChannel挨个进行关闭。
Channel的关闭流程可以回看下笔者的这篇文章《且看Netty如何应对TCP连接的正常关闭,异常关闭,半关闭场景》6。3。2保证业务无损
该方法中的逻辑是保证Reactor进行优雅关闭的核心,Netty这里为了保证业务无损,采取的是只要有异步任务Task或者ShutdwonHooks需要执行,就不能关闭,需要等待所有tasks或者ShutdownHooks执行完毕,才会考虑关闭的事情。protectedbooleanconfirmShutdown(){if(!isShuttingDown()){}if(!inEventLoop()){thrownewIllegalStateException(mustbeinvokedfromaneventloop);}取消掉所有的定时任务cancelScheduledTasks();if(gracefulShutdownStartTime0){获取优雅关闭开始时间,相对时间gracefulShutdownStartTimeScheduledFutureTask。nanoTime();}这里判断只要有task任务需要执行就不能关闭if(runAllTasks()runShutdownHooks()){if(isShutdown()){Executorshutdownnonewtasksanymore。}gracefulShutdownQuietPeriod表示在这段时间内,用户还是可以继续提交异步任务的,Reactor在这段时间内是会保证这些任务被执行到的。gracefulShutdownQuietPeriod0表示没有这段静默时期,当前Reactor中的任务执行完毕后,无需等待静默期,执行关闭if(gracefulShutdownQuietPeriod0){}避免Reactor在Selector上阻塞,因为此时已经不会再去处理IO事件了,专心处理关闭流程taskQueue。offer(WAKEUPTASK);}此时Reactor中已经没有任务可执行了,是时候考虑关闭的事情了finallongnanoTimeScheduledFutureTask。nanoTime();当Reactor中所有的任务执行完毕后,判断是否超过gracefulShutdownTimeout如果超过了则直接关闭if(isShutdown()nanoTimegracefulShutdownStartTimegracefulShutdownTimeout){}即使现在没有任务也还是不能进行关闭,需要等待一个静默期,在静默期内如果没有新的任务提交,才会进行关闭如果在静默期内还有任务继续提交,那么静默期将会重新开始计算,进入一轮新的静默期检测if(nanoTimelastExecutionTimegracefulShutdownQuietPeriod){taskQueue。offer(WAKEUPTASK);try{gracefulShutdownQuietPeriod内每隔100ms检测一下是否有任务需要执行Thread。sleep(100);}catch(InterruptedExceptione){Ignore}}在整个gracefulShutdownQuietPeriod期间内没有任务需要执行或者静默期结束则无需等待gracefulShutdownTimeout超时,直接关闭}
在关闭流程开始之前,Netty首先会调用cancelScheduledTasks()方法将Reactor中剩余需要执行的定时任务全部取消掉。
记录优雅关闭开始时间gracefulShutdownStartTime,这是为了后续判断优雅关闭流程是否超时。
调用runAllTasks()方法将Reactor中TaskQueue里剩余的异步任务全部取出执行。
运行剩余tasks和hooks。png
调用runShutdownHooks()方法将用户注册在Reactor上的ShutdownHook取出执行。
我们可以在用户线程中通过如下方式向Reactor中注册ShutdownHooks:NioEventLoopreactor(NioEventLoop)ctx。channel()。eventLoop();reactor。addShutdownHook(newRunnable(){Overridepublicvoidrun(){。。。。。关闭逻辑。。。。}});
在Reactor进行关闭的时候,会取出用户注册的这些ShutdownHooks进行运行。publicabstractclassSingleThreadEventExecutorextendsAbstractScheduledEventExecutorimplementsOrderedEventExecutor{可以向Reactor添加shutdownHook,当Reactor关闭的时候会被调用privatefinalSetRunnableshutdownHooksnewLinkedHashSetRunnable();privatebooleanrunShutdownHooks(){while(!shutdownHooks。isEmpty()){ListRunnablecopynewArrayListRunnable(shutdownHooks);shutdownHooks。clear();for(Runnabletask:copy){try{Reactor线程挨个顺序同步执行task。run();}catch(Throwablet){logger。warn(Shutdownhookraisedanexception。,t);}finally{}}}if(ran){lastExecutionTimeScheduledFutureTask。nanoTime();}}}
需要注意的是这里的ShutdownHooks是Netty提供的一种机制并不是我们在《3。JVM中的ShutdownHook》小节中介绍的JVM中的ShutdownHooks。
JVM中的ShutdownHooks是一个Thread,JVM在关闭之前会并发无序地运行。而Netty中的ShutdownHooks是一个Runnable,Reactor在关闭之前,会由Reactor线程同步有序地执行。这里需要注意的是只要有tasks和hooks需要执行Netty就会一直执行下去直到这些任务全部执行完为止。
当Reactor没有任何任务需要执行时,这时就会判断当前关闭流程所用时间是否超过了我们前边设定的优雅关闭最大超时时间gracefulShutdownTimeout。nanoTimegracefulShutdownStartTimegracefulShutdownTimeout
如果关闭流程因为前边这些任务的执行导致已经超时,那么就直接关闭Reactor,退出Reactor的工作循环。
如果没有超时,那么这时就会触发前边介绍的优雅关闭的静默期gracefulShutdownQuietPeriod。
在静默期中Reactor线程会每隔100ms检查一下是否有用户提交任务请求,如果有的话,就需要保证将用户提交的这些任务执行完毕。然后静默期将会重新开始计算,进入一轮新的静默期检测。
如果在整个静默期内,没有任何任务提交,则无需等待gracefulShutdownTimeout超时,直接关闭Reactor,退出Reactor的工作循环。
从以上过程我们可以看出Netty的优雅关闭至少需要等待一个静默期的时间。还有一点是Netty优雅关闭的时间可能会超出gracefulShutdownTimeout,因为Netty需要保证遗留剩余的任务被执行完毕。当所有任务执行完毕之后,才会去检测是否超时。6。4Reactor的最终关闭流程
当在静默期内没有任何任务提交或者关闭流程超时时,上小节中介绍的confirmShutdown()就会返回true。随即Reactor线程就会退出工作循环。publicfinalclassNioEventLoopextendsSingleThreadEventLoop{Overrideprotectedvoidrun(){for(;;){try{。。。。。。。1。监听Channel上的IO事件。。。。。。。。。。。。。。2。处理Channel上的IO事件。。。。。。。。。。。。。。3。执行异步任务。。。。。。。。。。}finally{try{if(isShuttingDown()){关闭Reactor上注册的所有Channel,停止处理IO事件,触发unActive以及unRegister事件closeAll();注销掉所有Channel停止处理IO事件之后,剩下的就需要执行Reactor中剩余的异步任务了if(confirmShutdown()){}}}catch(Errore){throw(Error)e;}catch(Throwablet){handleLoopException(t);}}}}}
我们在《详细图解NettyReactor启动全流程》一文中的《1。3。3Reactor线程的启动》小节中的介绍中提到,Reactor线程的启动是通过第一个异步任务被提交到Reactor中的时候被触发的。在向Reactor提交任务的方法SingleThreadEventExecutorexecute(java。lang。Runnable,boolean)中会触发下面doStartThread()方法的调用,在这里会调用前边提到的Reactor工作循环run()方法。
在doStartThread()方法的finally{。。。}语句块中会完成Reactor的最终关闭流程,也就是Reactor在退出run方法中的for循环之后的后续收尾流程。
最终Reactor的优雅关闭完整流程如下图所示:
Reactor优雅关闭全流程。pngpublicabstractclassSingleThreadEventExecutorextendsAbstractScheduledEventExecutorimplementsOrderedEventExecutor{privatevoiddoStartThread(){executor。execute(newRunnable(){Overridepublicvoidrun(){。。。。。。。。。。省略。。。。。。。。。try{Reactor线程开始轮询处理IO事件,执行异步任务SingleThreadEventExecutor。this。run();后面的逻辑为用户调用shutdownGracefully关闭Reactor退出循环走到这里}catch(Throwablet){logger。warn(Unexpectedexceptionfromaneventexecutor:,t);}finally{走到这里表示在静默期内已经没有用户在向Reactor提交任务了,或者达到优雅关闭超时时间,开始对Reactor进行关闭如果当前Reactor不是关闭状态则将Reactor的状态设置为STSHUTTINGDOWNfor(;;){intoldSif(oldStateSTSHUTTINGDOWNSTATEUPDATER。compareAndSet(SingleThreadEventExecutor。this,oldState,STSHUTTINGDOWN)){}}try{for(;;){此时Reactor线程虽然已经退出,而此时Reactor的状态为shuttingdown,但任务队列还在用户在此时依然可以提交任务,这里是确保用户在最后的这一刻提交的任务可以得到执行。if(confirmShutdown()){}}for(;;){当Reactor的状态被更新为SHUTDOWN后,用户提交的任务将会被拒绝intoldSif(oldStateSTSHUTDOWNSTATEUPDATER。compareAndSet(SingleThreadEventExecutor。this,oldState,STSHUTDOWN)){}}这里Reactor的状态已经变为SHUTDOWN了,不会在接受用户提交的新任务了但为了防止用户在状态变为SHUTDOWN之前,也就是Reactor在SHUTTINGDOWN的时候提交了任务所以此时Reactor中可能还会有任务,需要将剩余的任务执行完毕confirmShutdown();}finally{try{SHUTDOWN状态下,在将全部的剩余任务执行完毕后,则将Selector关闭cleanup();}finally{清理Reactor线程中的threadLocal缓存,并通知相应future。FastThreadLocal。removeAll();STTERMINATED状态为Reactor真正的终止状态STATEUPDATER。set(SingleThreadEventExecutor。this,STTERMINATED);使得awaitTermination方法返回threadLock。countDown();统计一下当前reactor任务队列中还有多少未执行的任务,打出ahrefhttps:www。bs178。comrizhitargetblankclassinfotextkey日志aintnumUserTasksdrainTasks();if(numUserTasks0logger。isWarnEnabled()){logger。warn(Aneventexecutorterminatedwithnonemptytaskqueue(numUserTasks));}通知Reactor的terminationFuture成功,在创建Reactor的时候会向其terminationFuture添加Listener在listener中增加terminatedChildren个数,当所有Reactor关闭后ReactorGroup关闭成功terminationFuture。setSuccess(null);}}}}});}}
流程走到doStartThread方法中的finally{。。。}语句块中的时候,这个时候表示在优雅关闭的静默期内,已经没有任务继续向Reactor提交了。或者关闭耗时已经超过了设定的优雅关闭最大超时时间。
现在正式来到了Reactor的关闭流程。在流程开始之前需要确保当前Reactor的状态为STSHUTTINGDOWN正在关闭状态。
注意此刻用户线程依然可以向Reactor提交任务。当Reactor的状态变为STSHUTDOWN或者STTERMINATED时,用户向Reactor提交的任务就会被拒绝,但是此时Reactor的状态为STSHUTTINGDOWN,依然可以接受用户提交过来的任务。publicabstractclassSingleThreadEventExecutorextendsAbstractScheduledEventExecutorimplementsOrderedEventExecutor{OverridepublicbooleanisShutdown(){returnstateSTSHUTDOWN;}privatevoidexecute(Runnabletask,booleanimmediate){booleaninEventLoopinEventLoop();addTask(task);if(!inEventLoop){startThread();当Reactor的状态为STSHUTDOWN时,拒绝用户提交的异步任务,但是在优雅关闭STSHUTTINGDOWN状态时还是可以接受用户提交的任务的if(isShutdown()){try{if(removeTask(task)){}}catch(UnsupportedOperationExceptione){}if(reject){reject();}}}。。。。。。。。。省略。。。。。。。。}}
所以Reactor从工作循环run方法中退出随后流程一路走到这里来的这段时间,用户仍然有可能向Reactor提交任务,为了确保关闭流程的优雅,这里会在for循环中不停的执行confirmShutdown()方法直到所有的任务全部执行完毕。
随后会将Reactor的状态改为STSHUTDOWN状态,此时用户就不能在向Reactor提交任务了。如果此时在提交任务就会收到RejectedExecutionException异常。大家这里可能会有疑问,Netty在Reactor的状态变为STSHUTDOWN之后,又一次调用了confirmShutdown()方法,这是为什么呢?
其实这样做的目的是为了防止Reactor状态在变为SHUTDOWN之前,在这个极限的时间里,用户又向Reactor提交了任务,所以还需要最后一次调用confirmShutdown()将在这个极限时间内提交的任务执行完毕。
以上逻辑步骤就是真正优雅关闭的精髓所在,确保任务全部执行完毕,保证业务无损。
在我们优雅处理流程介绍完了之后,下面就是关闭Reactor的流程了:
Reactor会在SHUTDOWN状态下,将Selector进行关闭。Overrideprotectedvoidcleanup(){try{selector。close();}catch(IOExceptione){logger。warn(Failedtocloseaselector。,e);}}
清理Reactor线程中遗留的所有ThreadLocal缓存。FastThreadLocal。removeAll();
将Reactor的状态由SHUTDOWN改为STTERMINATED状态。此时Reactor就算真正的关闭了。STATEUPDATER。set(SingleThreadEventExecutor。this,STTERMINATED);
用户线程可能会调用Reactor的awaitTermination方法阻塞等待Reactor的关闭,当Reactor关闭之后会调用threadLock。countDown()使得用户线程从awaitTermination方法返回。publicabstractclassSingleThreadEventExecutorextendsAbstractScheduledEventExecutorimplementsOrderedEventExecutor{privatefinalCountDownLatchthreadLocknewCountDownLatch(1);OverridepublicbooleanawaitTermination(longtimeout,TimeUnitunit)throwsInterruptedException{。。。。。。。。省略。。。。。。。等待Reactor关闭threadLock。await(timeout,unit);returnisTerminated();}OverridepublicbooleanisTerminated(){returnstateSTTERMINATED;}}
当这一切处理完毕之后,最后就会设置Reactor的terminationFuture为success。此时注册在Reactor的terminationFuture上的listener就会被回调。
这里还记得我们在《Reactor在Netty中的实现(创建篇)》一文中介绍的,在ReactorGroup中的所有Reactor被挨个全部创建成功之后,会向所有Reactor的terminationFuture注册一个terminationListener。
在terminationListener中检测当前ReactorGroup中的所有Reactor是否全部完成关闭,如果已经全部关闭,则设置ReactorGroup的terminationFuture为Success。此刻ReactorGroup关闭流程结束,Netty正式优雅谢幕完毕publicabstractclassMultithreadEventExecutorGroupextendsAbstractEventExecutorGroup{Reactor线程组中的Reactor集合privatefinalEventExecutor〔〕记录关闭的Reactor个数,当Reactor全部关闭后,才可以认为关闭成功privatefinalAtomicIntegerterminatedChildrennewAtomicInteger();ReactorGroup关闭futureprivatefinalP?terminationFuturenewDefaultPromise(GlobalEventExecutor。INSTANCE);protectedMultithreadEventExecutorGroup(intnThreads,Executorexecutor,EventExecutorChooserFactorychooserFactory,Object。。。args){。。。。。。。。挨个创建Reactor。。。。。。。。finalFutureListenerObjectterminationListenernewFutureListenerObject(){OverridepublicvoidoperationComplete(FutureObjectfuture)throwsException{if(terminatedChildren。incrementAndGet()children。length){当所有Reactor关闭后才认为是关闭成功terminationFuture。setSuccess(null);}}};for(EventExecutore:children){e。terminationFuture()。addListener(terminationListener);}。。。。。。。。省略。。。。。。。。}}
到现在为止,Netty的整个优雅关闭流程,笔者就为大家详细介绍完了,下图为整个优雅关闭的完整流程图,大家可以对照下面这副总体流程图在回顾下我们前面介绍的源码逻辑。
Reactor优雅关闭总流程。png6。5Reactor的状态变更流转
在本文的最后,笔者再来带着大家回顾下Reactor的状态变更流程。
Reactor的状态变更。png在Reactor被创建出来之后状态为STNOTSTARTED。随着第一个异步任务的提交Reactor开始启动随后状态为STSTARTED。当调用shutdownGracefully方法之后,Reactor的状态变为STSHUTTINGDOWN。表示正在进行优雅关闭。此时用户仍可向Reactor提交异步任务。当Reactor中遗留的任务全部执行完毕之后,Reactor的状态变为STSHUTDOWN。此时如果用户继续向Reactor提交异步任务,会被拒绝,并收到RejectedExecutionException异常。当Selector完成关闭,并清理掉Reactor线程中所有的TheadLocal缓存之后,Reactor的状态变为STTERMINATED。总结
到这里关于优雅关闭的前世今生笔者就位大家全部交代完毕了,信息量比较大,需要好好消化一下,很佩服大家能够一口气看到这里。
本文我们从进程优雅启停方案开始聊起,以优雅关闭的实现方案为起点,先是介绍了优雅关闭的底层基石内核的信号量机制,从内核又聊到了JVM的ShutdownHook原理以及执行过程,最后通过三个知名的开源框架为案例,分别从Spring的优雅关闭机制聊到了Dubbo的优雅关闭,最后通过Dubbo的优雅关闭引出了Netty优雅关闭的详细实现方案,前后呼应。
好了,本文的内容就到这里了,大家辛苦了,相信大家认真看完之后一定会收获很大,我们下篇文章见