幼儿饰品瑜伽美体用品微软
投稿投诉
微软创意
爱情通信
用品婚姻
爱好看病
美体软件
影音星座
瑜伽周边
星座办公
饰品塑形
搞笑减肥
幼儿两性
智家潮品

scalaAkka并发编程定时任务简易通信框架

  1。Akka并发编程框架简介1。1Akka概述
  Akka是一个用于构建高并发、分布式和可扩展的基于事件驱动的应用工具包。Akka是使用scala开发的库,同时可以使用scala和Java语言来开发基于Akka的应用程序。1。2Akka特性提供基于异步非阻塞、高性能的事件驱动编程模型内置容错机制,允许Actor在出错时进行恢复或者重置操作超级轻量级的事件处理(每GB堆内存几百万Actor)使用Akka可以在单机上构建高并发程序,也可以在网络中构建分布式程序。1。3Akka通信过程
  以下图片说明了AkkaActor的并发编程模型的基本流程:学生创建一个ActorSystem通过ActorSystem来创建一个ActorRef(老师的引用),并将消息发送给ActorRefActorRef将消息发送给MessageDispatcher(消息分发器)MessageDispatcher将消息按照顺序保存到目标Actor的MailBox中MessageDispatcher将MailBox放到一个线程中MailBox按照顺序取出消息,最终将它递给TeacherActor接受的方法中2。创建Actor
  Akka中,也是基于Actor来进行编程的。类似于之前学习过的Actor。但是Akka的Actor的编写、创建方法和之前有一些不一样。2。1API介绍ActorSystem:它负责创建和监督Actor1。在Akka中,ActorSystem是一个重量级的结构,它需要分配多个线程。2。在实际应用中,ActorSystem通常是一个单例对象,可以使用它创建很多Actor。3。直接使用context。system就可以获取到管理该Actor的ActorSystem的引用实现Actor类1。定义类或者单例对象继承Actor(注意:要导入akka。actor包下的Actor)2。实现receive方法,receive方法中直接处理消息即可,不需要添加loop和react方法调用。Akka会自动调用receive来接收消息。3。【可选】还可以实现preStart()方法,该方法在Actor对象构建后执行,在Actor生命周期中仅执行一次。加载Actor1。要创建Akka的Actor,必须要先获取创建一个ActorSystem。需要给ActorSystem指定一个名称,并可以去加载一些配置项(后面会使用到)2。调用ActorSystem。actorOf(Props(Actor对象),Actor名字)来加载Actor。2。2ActorPath
  每一个Actor都有一个Path,这个路径可以被外部引用。路径的格式如下:
  Actor类型
  路径
  示例
  本地Actor
  akka:actorSystem名称userActor名称
  akka:SimpleAkkaDemousersenderActor
  远程Actor
  akka。tcp:mysysip地址:portuserActor名称
  akka。tcp:192。168。10。17:5678userserviceb2。3入门案例2。3。1需求
  基于Akka创建两个Actor,Actor之间可以互相发送消息。2。3。2实现步骤创建Maven模块创建并加载Actor发送接收消息2。3。3创建Maven模块
  使用Akka需要导入Akka库,这里我们使用Maven来管理项目,具体步骤如下:创建Maven模块。打开pom。xml文件,导入akkaMaven依赖和插件。dependenciesdependencygroupIdorg。scalalanggroupIdscalalibraryartifactIdversion{scala。version}versiondependencydependencygroupIdcom。typesafe。akkagroupIdakkaactor2。11artifactIdversion2。3。14versiondependencydependencygroupIdcom。typesafe。akkagroupIdakkaremote2。11artifactIdversion2。3。14versiondependencydependencygroupIdcom。itheimagroupIdsparkdemocommonartifactIdversion1。0SNAPSHOTversiondependencydependenciesbuildsourceDirectorysrcmainscalasourceDirectorytestSourceDirectorysrctestscalatestSourceDirectorypluginsplugingroupIdnet。alchim31。mavengroupIdscalamavenpluginartifactIdversion3。2。2versionexecutionsexecutiongoalsgoalcompilegoalgoaltestCompilegoalgoalsconfigurationdependencyfilearg{project。build。directory}。scaladependenciesargargsconfigurationexecutionexecutionspluginplugingroupIdorg。apache。maven。pluginsgroupIdmavenshadepluginartifactIdversion2。4。3versionexecutionsexecutionphasepackagephasegoalsgoalshadegoalgoalsconfigurationfiltersfilter:artifactexcludesexcludeMETAINF。SFexcludeexcludeMETAINF。DSAexcludeexcludeMETAINF。RSAexcludeexcludesfilterfilterstransformerstransformerimplementationorg。apache。maven。plugins。shade。resource。AppendingTransformerresourcereference。confresourcetransformertransformerimplementationorg。apache。maven。plugins。shade。resource。ManifestResourceTransformermainClassmainClasstransformertransformersconfigurationexecutionexecutionspluginpluginsbuild2。3。4创建并加载Actor
  到这,我们已经把Maven项目创建起来了,后续我们都会采用Maven来管理我们的项目。接下来,我们来实现:
  创建并加载Actor,这里,我们要创建两个Actor:SenderActor:用来发送消息ReceiverActor:用来接收,回复消息
  具体步骤在srcmainscala文件夹下创建包:com。itheima。akka。demo在该包下创建两个Actor(注意:用object修饰的单例对象)。SenderActor:表示发送消息的Actor对象。ReceiverActor:表示接收消息的Actor对象。在该包下创建单例对象Entrance,并封装main方法,表示整个程序的入口。把程序启动起来,如果不报错,说明代码是没有问题的。
  参考代码objectSenderActorextendsActor{细节:在Actor并发编程模型中,需要实现act方法,想要持续接收消息,可通过loopreact实现。在Akka编程模型中,需要实现receive方法,直接在receive方法中编写偏函数处理消息即可。重写receive()方法overridedefreceive:Receive{casexprintln(x)}}objectReceiverActorextendsActor{重写receive()方法overridedefreceive:Receive{casexprintln(x)}}objectEntrance{defmain(args:Array〔String〕){1。实现一个ActorTrait,其实就是创建两个Actor对象(上述步骤已经实现)。2。创建ActorSystem两个参数的意思分别是:ActorSystem的名字,加载配置文件(此处先不设置)valactorSystemActorSystem(actorSystem,ConfigFactory。load())3。加载ActoractorOf方法的两个参数意思是:1。具体的Actor对象。2。该Actor对象的名字valsenderActoractorSystem。actorOf(Props(SenderActor),senderActor)valreceiverActoractorSystem。actorOf(Props(ReceiverActor),receiverActor)}}2。3。5发送接收消息
  思路分析使用样例类封装消息SubmitTaskMessage提交任务消息SuccessSubmitTaskMessage任务提交成功消息使用!发送异步无返回消息。
  参考代码MessagePackage。scala文件中的代码记录发送消息的样例类。parammsg具体的要发送的信息。caseclassSubmitTaskMessage(msg:String)记录回执信息的样例类。parammsg具体的回执信息。caseclassSuccessSubmitTaskMessage(msg:String)Entrance。scala文件中的代码程序主入口。objectEntrance{defmain(args:Array〔String〕):Unit{1。创建ActorSystem,用来管理所有用户自定义的Actor。valactorSystemActorSystem(actorSystem,ConfigFactory。load())2。通过ActorSystem,来管理我们自定义的Actor(SenderActor,ReceiverActor)valsenderActoractorSystem。actorOf(Props(SenderActor),senderActor)valreceiverActoractorSystem。actorOf(Props(ReceiverActor),receiverActor)3。由ActorSystem给SenderActor发送一句话start。senderActor!start}}SenderActor。scala文件中的代码objectSenderActorextendsActor{overridedefreceive:Receive{1。接收Entrance发送过来的:startcasestart{2。打印接收到的数据。println(SenderActor接收到:Entrance发送过来的start信息。)3。获取ReceiverActor的具体路径。参数:要获取的Actor的具体路径。格式:akka:actorSystem的名字user要获取的Actor的名字。valreceiverActorcontext。actorSelection(akka:actorSystemuserreceiverActor)4。给ReceiverActor发送消息:采用样例类SubmitTaskMessagereceiverActor!SubmitTaskMessage(我是SenderActor,我在给你发消息!。。。)}5。接收ReceiverActor发送过来的回执信息。caseSuccessSubmitTaskMessage(msg)println(sSenderActor接收到回执信息:{msg})}}ReceiverActor。scala文件中的代码objectReceiverActorextendsActor{overridedefreceive:Receive{1。接收SenderActor发送过来的消息。caseSubmitTaskMessage(msg){2。打印接收到的消息。println(sReceiverActor接收到:{msg})3。给出回执信息。sender!SuccessSubmitTaskMessage(接收任务成功!。我是ReceiverActor)}}}
  输出结果SenderActor接收到:Entrance发送过来的start信息。ReceiverActor接收到:我是SenderActor,我在给你发消息!。。。SenderActor接收到回执信息:接收任务成功!。我是ReceiverActor3。Akka定时任务
  需求:如果我们想要使用Akka框架定时的执行一些任务,该如何处理呢?
  答:在Akka中,提供了一个scheduler对象来实现定时调度功能。使用ActorSystem。scheduler。schedule()方法,就可以启动一个定时任务。3。1schedule()方法的格式方式一:采用发送消息的形式实现。defschedule(initialDelay:FiniteDuration,延迟多久后启动定时任务interval:FiniteDuration,每隔多久执行一次receiver:ActorRef,给哪个Actor发送消息message:Any)要发送的消息(implicitexecutor:ExecutionContext)隐式参数:需要手动导入方式二:采用自定义方式实现。defschedule(initialDelay:FiniteDuration,延迟多久后启动定时任务interval:FiniteDuration每隔多久执行一次)(f:Unit)定期要执行的函数,可以将逻辑写在这里(implicitexecutor:ExecutionContext)隐式参数:需要手动导入注意:不管使用上述的哪种方式实现定时器,都需要导入隐式转换和隐式参数,具体如下:
  导入隐式转换,用来支持定时器。
  importactorSystem。dispatcher
  导入隐式参数,用来给定时器设置默认参数。
  importscala。concurrent。duration。3。2案例
  需求定义一个ReceiverActor,用来循环接收消息,并打印接收到的内容。创建一个ActorSystem,用来管理所有用户自定义的Actor。关联ActorSystem和ReceiverActor。导入隐式转换和隐式参数。通过定时器,定时(间隔1秒)给ReceiverActor发送一句话。方式一:采用发送消息的形式实现。方式二:采用自定义方式实现。
  参考代码案例:演示Akka中的定时器。objectMainActor{1。定义一个Actor,用来循环接收消息,并打印。objectReceiverActorextendsActor{overridedefreceive:Receive{casexprintln(x)不管接收到的是什么,都打印。}}defmain(args:Array〔String〕):Unit{2。创建一个ActorSystem,用来管理所有用户自定义的Actor。valactorSystemActorSystem(actorSystem,ConfigFactory。load())3。关联ActorSystem和ReceiverActor。valreceiverActoractorSystem。actorOf(Props(ReceiverActor),receiverActor)4。导入隐式转换和隐式参数。导入隐式转换,用来支持定时器。importactorSystem。dispatcher导入隐式参数,用来给定时器设置默认参数。importscala。concurrent。duration。5。通过定时器,定时(间隔1秒)给ReceiverActor发送一句话。方式一:通过定时器的第一种方式实现,传入四个参数。actorSystem。scheduler。schedule(3。seconds,2。seconds,receiverActor,你好,我是种哥,我有种子你买吗?。。。)方式二:通过定时器的第二种方式实现,传入两个时间,和一个函数。actorSystem。scheduler。schedule(0seconds,2seconds)(receiverActor!新上的种子哟,你没见过!嘿嘿嘿。。。)实际开发写法actorSystem。scheduler。schedule(0seconds,2seconds){receiverActor!新上的种子哟,你没见过!嘿嘿嘿。。。}}}4。实现两个进程之间的通信4。1案例介绍
  基于Akka实现在两个进程间发送、接收消息。WorkerActor启动后去连接MasterActor,并发送消息给MasterActor。MasterActor接收到消息后,再回复消息给WorkerActor。
  4。2Worker实现
  步骤创建一个Maven模块,导入依赖和配置文件。创建Maven模块创建启动WorkerActor。在srcmainscala文件夹下创建包,在该包下创建WorkerActor(单例对象的形式创建)。在该包下创建Entrance单例对象,里边定义main方法发送setup消息给WorkerActor,WorkerActor接收打印消息。启动测试。
  参考代码WorkerActor。scala文件中的代码1。创建WorkActor,用来接收和发送消息。objectWorkerActorextendsActor{overridedefreceive:Receive{2。接收消息。casexprintln(x)}}Entrance。scala文件中的代码程序入口。当前ActorSystem对象的路径akka。tcp:actorSystem127。0。0。1:9999objectEntrance{defmain(args:Array〔String〕):Unit{1。创建ActorSystem。valactorSystemActorSystem(actorSystem,ConfigFactory。load())2。通过ActorSystem,加载自定义的WorkActor。valworkerActoractorSystem。actorOf(Props(WorkerActor),workerActor)3。给WorkActor发送一句话。workerActor!setup}}启动测试:右键,执行,如果打印结果出现setup,说明程序执行没有问题。4。3Master实现
  步骤创建一个Maven模块,导入依赖和配置文件。创建Maven模块。创建启动MasterActor。在srcmainscala文件夹下创建包,在该包下创建MasterActor(单例对象的形式创建)。在该包下创建Entrance单例对象,里边定义main方法WorkerActor发送connect消息给MasterActorMasterActor回复success消息给WorkerActorWorkerActor接收并打印接收到的消息启动Master、Worker测试
  参考代码MasterActor。scala文件中的代码MasterActor:用来接收WorkerActor发送的数据,并给其返回回执信息。负责管理MasterActor的ActorSystem的地址:akka。tcp:actorSystem127。0。0。1:8888objectMasterActorextendsActor{overridedefreceive:Receive{1。接收WorkerActor发送的数据caseconnect{println(MasterActor接收到:connect!。。。)2。给WorkerActor回执一句话。sender!success}}}Entrance。scala文件中的代码Master模块的主入口objectEntrance{defmain(args:Array〔String〕):Unit{1。创建ActorSystem,用来管理用户所有的自定义Actor。valactorSystemActorSystem(actorSystem,ConfigFactory。load())2。关联ActorSystem和MasterActor。valmasterActoractorSystem。actorOf(Props(MasterActor),masterActor)3。给masterActor发送一句话:测试数据,用来测试。masterActor!测试数据}}WorkerActor。scala文件中的代码(就修改了第3步)WorkerActor:用来接收ActorSystem发送的消息,并发送消息给MasterActor,然后接收MasterActor的回执信息。负责管理WorkerActor的ActorSystem的地址:akka。tcp:actorSystem127。0。0。1:9999objectWorkerActorextendsActor{overridedefreceive:Receive{1。接收Entrance发送过来的:setup。casesetup{println(WorkerActor接收到:Entrance发送过来的指令setup!。)2。获取MasterActor的引用。valmasterActorcontext。system。actorSelection(akka。tcp:actorSystem127。0。0。1:8888usermasterActor)3。给MasterActor发送一句话。masterActor!connect}4。接收MasterActor的回执信息。casesuccessprintln(WorkerActor接收到:success!)}}5。案例:简易版spark通信框架5。1案例介绍
  模拟Spark的Master与Worker通信。一个Master管理多个Worker若干个Worker(Worker可以按需添加)向Master发送注册信息向Master定时发送心跳信息5。2实现思路构建Master、Worker阶段构建MasterActorSystem、Actor构建WorkerActorSystem、ActorWorker注册阶段Worker进程向Master注册(将自己的ID、CPU核数、内存大小(M)发送给Master)Worker定时发送心跳阶段Worker定期向Master发送心跳消息Master定时心跳检测阶段Master定期检查Worker心跳,将一些超时的Worker移除,并对Worker按照内存进行倒序排序多个Worker测试阶段启动多个Worker,查看是否能够注册成功,并停止某个Worker查看是否能够正确移除5。3工程搭建
  需求
  本项目使用Maven搭建工程。
  步骤分别搭建以下几个项目,GroupID统一都为:com。yueda,具体工程名如下:
  工程名
  说明
  sparkdemocommon
  存放公共的消息、实体类
  sparkdemomaster
  AkkaMaster节点
  sparkdemoworker
  AkkaWorker节点导入依赖分别在三个项目下的srcmain,srctest下,创建scala目录。导入配置文件(资料包中的application。conf)修改Master的端口为7000修改Worker的端口为80005。4构建Master和Worker
  需求
  分别构建Master和Worker,并启动测试
  步骤创建并加载MasterActor创建并加载WorkerActor测试是否能够启动成功
  参考代码完成master模块中的代码,即:在srcmainscala下创建包:com。itheima。spark。master,包中代码如下:MasterActor。scala文件中的代码Master:用来管理多个Worker的。MasterActor的路径:akka。tcp:actorSystem127。0。0。1:7000objectMasterActorextendsActor{overridedefreceive:Receive{casexprintln(x)}}Master。scala文件中的代码程序入口:相当于我们以前写的MainActorobjectMaster{defmain(args:Array〔String〕):Unit{1。创建ActorSystem。valactorSystemActorSystem(actorSystem,ConfigFactory。load())2。通过ActorSystem,关联MasterActor。valmasterActoractorSystem。actorOf(Props(MasterActor),masterActor)3。启动程序,如果不报错,说明代码没有问题。}}
  完成worker模块中的代码,即:在srcmainscala下创建包:com。itheima。spark。worker,包中代码如下:WorkerActor。scala文件中的代码WorkerActor的地址:akka。tcp:actorSystem127。0。0。1:7100objectWorkerActorextendsActor{overridedefreceive:Receive{casexprintln(x)}}
  Worker。scala文件中的代码程序入口objectWorker{defmain(args:Array〔String〕):Unit{1。创建ActorSystem。valactorSystemActorSystem(actorSystem,ConfigFactory。load())2。通过ActorSystem,关联MasterActor。valworkerActoractorSystem。actorOf(Props(WorkerActor),workerActor)3。启动程序,如果不报错,说明代码没有问题。workerActor!hello}}5。5Worker注册阶段实现
  需求
  在Worker启动时,发送注册消息给Master。
  思路分析Worker向Master发送注册消息(workerid、cpu核数、内存大小)随机生成CPU核(1、2、3、4、6、8)随机生成内存大小(512、1024、2048、4096)(单位M)Master保存Worker信息,并给Worker回复注册成功消息启动测试
  具体步骤在sparkdemocommon项目的srcmainscala文件夹下创建包在WorkerActor单例对象中定义一些成员变量,分别表示:masterActorRef:表示MasterActor的引用。workerid:表示当前WorkerActor对象的id。cpu:表示当前WorkerActor对象的CPU核数。mem:表示当前WorkerActor对象的内存大小。cuplist:表示当前WorkerActor对象的CPU核心数的取值范围。memlist:表示当前WorkerActor对象的内存大小的取值范围。在WorkerActor的preStart()方法中,封装注册信息,并发送给MasterActor。在MasterActor中接收WorkerActor提交的注册信息,并保存到双列集合中。。MasterActor给WorkerActor发送回执信息(注册成功信息。)。在WorkerActor中接收MasterActor回复的注册成功信息。
  参考代码WorkerActor。scala文件中的代码WorkerActor的地址:akka。tcp:actorSystem127。0。0。1:7100objectWorkerActorextendsActor{1定义成员变量,记录MasterActor的引用,以及WorkerActor提交的注册参数信息。privatevarmasterActorRef:ActorSelection表示MasterActor的引用。privatevarworkerid:String表示WorkerActor的idprivatevarcpu:Int表示WorkerActor的CPU核数privatevarmem:Int表示WorkerActor的内存大小。privatevalcpulistList(1,2,3,4,6,8)CPU核心数的取值范围privatevalmemlistList(512,1024,2048,4096)内存大小取值范围2。重写preStart()方法,里边的内容:在Actor启动之前就会执行。overridedefpreStart():Unit{3。获取Master的引用。masterActorRefcontext。actorSelection(akka。tcp:actorSystem127。0。0。1:7000usremasterActor)4。构建注册消息。workeridUUID。randomUUID()。toString设置workerActor的idvalrnewRandom()cpucpulist(r。nextInt(cpulist。length))memmemlist(r。nextInt(memlist。length))5。将WorkerActor的提交信息封装成WorkerRegisterMessage对象。varregisterMessageWorkerRegisterMessage(workerid,cpu,mem)6。发送消息给MasterActor。masterActorRef!registerMessage}overridedefreceive:Receive{casexprintln(x)}}MasterActor。scala文件中的代码Master:用来管理多个Worker的。MasterActor的路径:akka。tcp:actorSystem127。0。0。1:7000objectMasterActorextendsActor{1。定义一个可变的Map集合,用来保存注册成功好的Worker信息。privatevalregWorkerMapcollection。mutable。Map〔String,WorkerInfo〕()overridedefreceive:Receive{caseWorkerRegisterMessage(workId,cpu,mem){2。打印接收到的注册信息println(sMasterActor:接收到worker注册信息,{workId},{cpu},{mem})3。把注册成功后的保存信息保存到:workInfo中。regWorkerMapworkIdWorkerInfo(workId,cpu,mem)4。回复一个注册成功的消息。sender!RegisterSuccessMessage}}}修改WorkerActor。scala文件中receive()方法的代码overridedefreceive:Receive{caseRegisterSuccessMessageprintln(WorkerActor:注册成功!)}5。6Worker定时发送心跳阶段
  需求
  Worker接收到Master返回的注册成功信息后,定时给Master发送心跳消息。而Master收到Worker发送的心跳消息后,需要更新对应Worker的最后心跳时间。
  思路分析编写工具类读取心跳发送时间间隔创建心跳消息Worker接收到注册成功后,定时发送心跳消息Master收到心跳消息,更新Worker最后心跳时间启动测试
  具体步骤在worker的srcmainresources文件夹下的application。conf文件中添加一个配置。worker。heartbeat。interval5配置worker发送心跳的周期(单位是s)在worker项目的com。itheima。spark。work包下创建一个新的单例对象:ConfigUtils,用来读取配置文件信息。在WorkerActor的receive()方法中,定时给MasterActor发送心跳信息。Master接收到心跳消息,更新Worker最后心跳时间。。
  参考代码worker项目的ConfigUtils。scala文件中的代码objectConfigUtils{1。获取配置信息对象。privatevalconfigConfigFactory。load()2。获取worker心跳的具体周期valworker。heartbeat。intervalconfig。getInt(worker。heartbeat。interval)}修改WorkerActor。scala文件的receive()方法中的代码overridedefreceive:Receive{caseRegisterSuccessMessage{1。打印接收到的注册成功消息println(WorkerActor:接收到注册成功消息!)2。导入时间单位隐式转换和隐式参数importscala。concurrent。duration。importcontext。dispatcher3。定时给Master发送心跳消息。context。system。scheduler。schedule(0seconds,ConfigUtil。worker。heartbeat。intervalseconds){3。1采用自定义的消息的形式发送心跳信息。masterActorRef!WorkerHeartBeatMessage(workerId,cpu,mem)}}}MasterActor。scala文件中的代码objectMasterActorextendsActor{1。定义一个可变的Map集合,用来保存注册成功好的Worker信息。privatevalregWorkerMapcollection。mutable。Map〔String,WorkerInfo〕()overridedefreceive:Receive{接收注册信息。caseWorkerRegisterMessage(workId,cpu,mem){2。打印接收到的注册信息println(sMasterActor:接收到worker注册信息,{workId},{cpu},{mem})3。把注册成功后的保存信息保存到:workInfo中。regWorkerMapworkIdWorkerInfo(workId,cpu,mem,newDate()。getTime)4。回复一个注册成功的消息。sender!RegisterSuccessMessage}接收心跳消息caseWorkerHeartBeatMessage(workId,cpu,mem){1。打印接收到的心跳消息。println(sMasterActor:接收到{workId}的心跳信息)2。更新指定Worker的最后一次心跳时间。regWorkerMapworkIdWorkerInfo(workId,cpu,mem,newDate()。getTime)3。为了测试代码逻辑是否OK,我们可以打印下regWorkerMap的信息println(regWorkerMap)}}}5。7Master定时心跳检测阶段
  需求
  如果某个worker超过一段时间没有发送心跳,Master需要将该worker从当前的Worker集合中移除。可以通过Akka的定时任务,来实现心跳超时检查。
  思路分析编写工具类,读取检查心跳间隔时间间隔、超时时间定时检查心跳,过滤出来大于超时时间的Worker移除超时的Worker对现有Worker按照内存进行降序排序,打印可用Worker
  具体步骤修改Master的application。conf配置文件,添加两个配置配置检查Worker心跳的时间周期(单位:秒)master。check。heartbeat。interval6master。check。heartbeat。timeout15在Master项目的com。itheima。spark。master包下创建:ConfigUtils工具类(单例对象),用来读取配置文件信息。在MasterActor中开始检查心跳(即:修改MasterActorpreStart中的代码。)。开启Master,然后开启Worker,进行测试。
  参考代码Master项目的ConfigUtils。scala文件中的代码针对Master的工具类。objectConfigUtil{1。获取到配置文件对象。privatevalconfig:ConfigConfigFactory。load()2。获取检查Worker心跳的时间周期(单位:秒)valmaster。check。heartbeat。intervalconfig。getInt(master。check。heartbeat。interval)3。获取worker心跳超时的时间(秒)valmaster。check。heartbeat。timeoutconfig。getInt(master。check。heartbeat。timeout)}MasterActor。scala文件的preStart()方法中的代码5。定时检查worker的心跳信息overridedefpreStart():Unit{5。1导入时间转换隐式类型和定时任务隐式变量importscala。concurrent。duration。importcontext。dispatcher5。2启动定时任务。context。system。scheduler。schedule(0seconds,ConfigUtil。master。check。heartbeat。intervalseconds){5。3过滤大于超时时间的Worker。valtimeOutWorkerMapregWorkerMap。filter{keyval5。3。1获取最后一次心跳更新时间。vallastHeatBeatTimekeyval。2。lastHeartBeatTime5。3。2超时公式:当前系统时间最后一次心跳时间超时时间(配置文件信息1000)if(newDate()。getTimelastHeatBeatTimeConfigUtil。master。check。heartbeat。timeout1000)trueelsefalse}5。4移除超时的Workerif(!timeOutWorkerMap。isEmpty){如果要被移除的Worker集合不为空,则移除此timeOutWorkerMap注意:双列集合是根据键移除元素的,所以最后的。1是在获取键。regWorkerMaptimeOutWorkerMap。map(。1)}5。5对worker按照内存大小进行降序排序,打印Worker。2获取所有的WorkInfo对象。valworkerListregWorkerMap。map(。2)。toList5。6按照内存进行降序排序。valsortedWorkerListworkerList。sortBy(。mem)。reverse5。7打印结果println(按照内存的大小降序排列的Worker列表:)println(sortedWorkerList)}}5。8多个Worker测试阶段
  需求
  修改配置文件,启动多个worker进行测试。
  大白话:启动一个Worker,就修改一次Worker项目下的application。conf文件中记录的端口号,然后重新开启Worker即可。
  步骤测试启动新的Worker是否能够注册成功停止Worker,测试是否能够从现有列表删除

贾茂兴随笔闲说足球闲说足球作者:贾茂兴说起男子足球,真的让人一言难尽,也很让人难为情,可谓是爱之深,恨之切。多少年来,一直在说足球要从娃娃抓起,可是几代娃娃已经变成了娃娃他爸、他爷了……美露西号开启12年太空旅程首探木星特洛伊小行星群新华社华盛顿10月16日电美国航天局露西号太空探测器16日从美国佛罗里达州卡纳维拉尔角空军基地发射升空,开启为期12年的太空旅程,将首次探索在木星轨道内运行的特洛伊小行星群。……儿时记忆中的柿子,难以忘记1hr秋天一到,柿子就红,柿子一红,山村就红,我们的童年也就跟着红了起来。火红的柿子挂在枝头,红得像一个个小灯笼。我的目光,就随着树上的柿子转动着、摇曳着,一直等到……免费!北京一大公园,四个颐和园大,麋鹿成群,孔雀与人一同散步给大家介绍一个免费大公园,北京南海子郊野公园。南海子郊野公园是北京四大郊野公园之一,也是北京市最大的湿地公园,总面积超过11万平方公里,是辽、金、元、明、清五朝皇家猎场和……勇熊赛后!科尔批发挥不佳回应4后卫阵容,格林与狄龙互相嘲讽勇士客场110131不敌灰熊,赛后科尔提到为什么输球:他们只是打得更好,而我们打得更差。也就是说,科尔并不是很满意球队的发挥。当然说得轻巧,全队的防守确实让人绝望。不过就整个赛……2022年评选表彰4家枣庄市突出贡献企业和20名市优秀企业家大众网海报新闻记者田甜孙牧远枣庄报道10日上午,加力提速争先进位坚定不移推动工业经济高质量发展新闻发布会在枣庄会展中心举行。发布会上,市高端化工产业促进中心副主任张强介绍……浪漫不在于浪漫的得到而在于寻找浪漫的过程!马克思说:幸福不在于幸福的得到,而在于追求幸福的过程;我的妻子却说:浪漫不在于浪漫的得到。而在于寻找浪漫的过程。。。。弹指一挥间,我们已经结婚十余年了。情人节那天,……非凡营救上映,夺命狂飙火速营救,丁海峰血战人贩组织!剧情动作电影《非凡营救》已经在网络视频平台上映播出了,该片由戴维执导,丁海峰、伊米娜、卢婕、米瑞、帕维尔等人联合主演,主要讲述了安保公司培训教官意外卷入一场谋杀案,逃亡期间发现……湾转黑科技3D打印你的牙原标题:湾转黑科技3D打印你的牙编者按:2023年政府工作报告指出,过去五年我国科技创新成果丰硕,一些关键核心技术攻关取得新突破,创新支撑发展能力不断增强。为展现粤港澳大……产品组合效应抵御销量下滑,2022年大众集团销售收入增长11近日,大众汽车集团(下简称大众集团)正式对外公布2022年业绩情况:去年,大众集团销售收入达2792。32亿欧元,同比增长11。6;未计入特殊项目支出的营业利润达225。23亿……F1巴林站数据分析法拉利仅比2022年快0。003秒在一停前的长距离节奏方面,同样都是搭载红色软胎的勒克莱尔比维斯塔潘慢了0。7秒。但在一停后,仍然使用软胎的维斯塔潘比勒克莱尔快了0。6秒。法拉利在周六的排位赛中表现出良好……日打赏额不超100!代表建议严控平台打赏分成比例,限制高额打近年来,网络直播平台成为了青少年喜爱的娱乐方式,但同时也存在一些问题,如一些直播平台企业为牟取暴利,通过精心设计的玩法来引诱、误导、‘算计’青少年。面对这种情况,我们应该采取何……
少女写真6变白皮肤可以通过光子嫩肤、黑脸娃娃等方式,另外还可以通过食物调理改善。1、光子嫩肤:变白皮肤可以通过光子嫩肤的方式,利用特定仪器发射彩光光束,能够粉碎皮下色素细胞,并促使……利川梯田秋色有多美?去了才会知道湖北日报客户端(利川频道通讯员李传书杨翠)利川的梯田秋色很美丽,弯弯的梯田象无数个月牙儿排在山坡上,田里的稻子金黄金黄的,秋风吹起的时候,掀起层层稻浪,前面的浪头没跑远,后面的……废废丨持续大涨!今日铝价!铝锭价格(2022。09。30)长江有色铝18340hr2000930中原现货铝18340hr2000930广东现货铝18360hr2000930上海现货……iPhone14Plus首发即破发,回归机型为何不受欢迎?iPhone14Plus在今日正式发售,起售价为6999元,作为五年来苹果第一次回归Plus的机型,iPhone14Plus承担着苹果丰富iPhone产品线的重任,但市场表现并……教科书一直教错了,太阳系第九大行星可能是它2006年,国际天文学会的一条公告,彻底将冥王星踢出了太阳系,教科书教了几十年的事说变就变了,究竟是什么原因呢?这个事还要从100多年前说起。19世纪,美国有一位超级富翁……有多大的手,就端多大碗,有多大的承受力就干多大的事俗话说:有多大的手,就端多大的碗。小手端大腕,端不稳还容易把碗打碎。有多大的能耐,就做多大的事,超出你的能力,能力不足容易穷途末路,也容易跌落人生的低谷。身边有个朋……继张艺兴之后又迎来张韶涵?12。31梦幻西游嘉年华巨星云集距离12月31日梦幻西游嘉年华开启仅剩4日!随着日子越来越近,官方放出的嘉年华信息也越来越丰富。比如梦幻西游手游代言人张艺兴,近日发布了新的角色印象曲《悟》。想必张艺兴也会在嘉……王者英雄战力构成教你如何快速拿标王者有一套完善的游戏社交系统,让玩家与玩家之剑交流非常完善,并且为了激起玩家的胜负欲,展开了排位赛、巅峰赛和英雄战力系统。英雄战力系统在王者刚出之时非常混杂,而现在的王者……十美清东陵,您见过吗?春赏百花秋听雨,夏有繁星冬看雪。清东陵的美在四季,在朝暮,在宏大与细微之间。你是否看过云霞中恢弘的清东陵?你是否欣赏过晨雾中缥缈的清东陵?你……去申花?鲁媒恒大2主力即将转会!1人曾是鲁能的绯闻球员北京时间1月3日,在2021年中超联赛还剩下最后1轮就要落幕的日子里,山东媒体报道称出了一个令中超豪门广州队球迷关注的消息,那就是广州队的2位主力刘殿座和蒋光太即将转会离开的消……iPhone14系列曝光汇总可变挖孔屏卫星通话,苹果再放大招iPhone14系列离发布还有不到一周的时间,这也是苹果手机近年来做出改变最大的一次。像取消了难看的刘海屏,升级了祖传的1200万像素摄像头等,但是配置提高了,iPhone14……养生,如何养生?1、拒绝不良习气养生最要杜绝的就是不良的习惯,例如抽烟,喝酒。抽烟对人体的肺部功能造成损伤,呼吸道易发生感染。嗓子变得沙哑,咳嗽。喝酒对大脑有麻痹作用,长时间喝酒,……
友情链接:易事利快生活快传网聚热点七猫云快好知快百科中准网快好找文好找中准网快软网