我们使用 Akka 开发并行程序时,可以使用层级结构组织 Actors。层次结构不仅比较符合人类直觉,还为容错提供了机制保障。
(资料图片)
Akka 是用层次结构组织 Actors 的。
1. Akka 的层次结构
我们需要实现一个翻译模块,其功能是输入中文输出多国语言。我们可以让一个 Master Actor 负责接收外界输入,多个 Worker Actor 负责将输入翻译成特定语言,Master Actor 和 Worker Actor 之间是上下级层次关系。下图展示了这种层级结构。
具体代码实现如下所示。
classMasterextendsActorwithActorLogging{valenglish2chinese=context.actorOf(Props[English2Chinese],"English2Chinese")valenglish2cat=context.actorOf(Props[English2Cat],"English2Cat")defreceive={caseeng1:String=>{english2chinese!eng1english2cat!eng1}}}classEnglish2ChineseextendsActorwithActorLogging{defreceive={caseeng:String=>{println("我翻译不出来!")}}}classEnglish2CatextendsActorwithActorLogging{defreceive={caseeng:String=>{println("喵喵喵!")}}}objectMain{defmain(args:Array[String])={valsys=ActorSystem("system")valmaster=sys.actorOf(Props[Master],"Master")master!"Hello,world!"}}
我们在 Master Actor 中使用 context.actorOf 实例化 English2Chinese 和 English2Cat,便可以在它们之间形成层次关系。这点通过它们的 actor 地址得到证实。
上面的 Actors 层次结构是我们程序里 Actor 的层次结构。这个层次结构是 Actor System 层次结构的一部分。Actor System 层次结构从根节点出来有两个子节点:UserGuardian 和 SystemGuardian。用户程序产生的所有 Actor 都在 UserGuardian 节点下,SystemGuardian 节点则包含系统中的一些 Actor,比如 deadLetterListener。如果一个 Actor 已经 stop 了,发送给这个 Actor 的消息就会被转送到 deadLetterListener。因此完整的 Actor 层次结构如下所示。
2. Akka 的容错机制
对于分布式系统来说,容错机制是很重要的指标。那么 Akka 是怎么实现容错的呢?Akka 的容错机制是基于层次结构: Akka 在 Actor 加一个监控策略,对其子 Actor 进行监控。下面的代码是给 Actor 加了一个监控策略,其监控策略内容:如果子 Actor 在运行过程中抛出 Exception,对该子 Actor 执行停止动作 (即停止该子 Actor)。
overridevalsupervisorStrategy=OneForOneStrategy(){case_:Exception=>Stop}
Akka 的监控策略一共支持四种动作:Stop, Resume, Restart 和 Escalate。
Stop:子 Actor 停止。Resume:子 Actor 忽略引发异常的消息,继续处理后续消息。Restart:子 Actor 停止,重新初始化一个子 Actor 处理后续消息Escalate:错误太严重,自己已经无法处理,将错误信息上报给父 Actor。Akka 的监控策略分为两种。一种是 OneForOne。这种策略只对抛出 Exception 的子 Actor 执行相应动作。还是拿上面的翻译模块做例子,我们加入一个 OneForOne 的 Stop 的监控策略。
classMaster1extendsActorwithActorLogging{valenglish2Chinese=context.actorOf(Props[English2Chinese1],"English2Chinese")valenglish2Cat=context.actorOf(Props[English2Cat1],"English2Cat")overridevalsupervisorStrategy=OneForOneStrategy(){case_:Exception=>Stop}overridedefreceive={caseeng:String=>{english2Cat!eng;english2Chinese!eng;}}}classEnglish2Chinese1extendsActorwithActorLogging{overridedefreceive={caseeng:String=>{println("翻译不出来")}}}classEnglish2Cat1extendsActorwithActorLogging{overridedefreceive={caseeng:String=>{thrownewException("ExceptioninEnglish2Cat1")}}}objecthierarchy1{defmain(args:Array[String])={valsystem=ActorSystem("system")valmaster=system.actorOf(Props[Master1],"Master")master!"Hello,world"Thread.sleep(1000)master!"Hello,world"}}
运行这段代码,我们得到下面结果。从下面的结果,我们可以看出:***轮 English2Cat1 抛出了 Exception, English2Chinese1 正常工作;第二轮,English2Cat1 已经死了,English2Chinese1 也已经死亡了。这个结果说明监控策略已经将 MasterActor 的所有子 Actor 停止了。
另一种是 AllForOne。如果有子 Actor 抛出 Exception,这种监控策略对所有子 Actor 执行动作。
classMaster2extendsActorwithActorLogging{valenglish2Chinese=context.actorOf(Props[English2Chinese2],"English2Chinese")valenglish2Cat=context.actorOf(Props[English2Cat2],"English2Cat")overridevalsupervisorStrategy=AllForOneStrategy(){case_:Exception=>Stop}overridedefreceive={caseeng:String=>{english2Cat!eng;english2Chinese!eng;}}}
运行这段代码,我们得到下面结果。从下面的结果,我们可以看出:***轮 English2Cat1 抛出了 Exception, English2Chinese1 正常工作;第二轮,English2Cat1 已经死了,English2Chinese1 也已经死亡了。这个结果说明监控策略已经将 MasterActor 的所有子 Actor 停止了。
3. 总结
我们使用 Akka 开发并行程序时,可以使用层级结构组织 Actors。层次结构不仅比较符合人类直觉,还为容错提供了机制保障。
【本文为清一色专栏作者“李立”的原创稿件,转载请通过清一色获取联系和授权】
戳这里,看该作者更多好文
Copyright © 2015-2022 世界公司网版权所有 备案号:琼ICP备2022009675号-1 联系邮箱:435 227 67@qq.com