Akka DispatchersRouters

网友投稿 649 2022-05-29

Akka Dispatcher是维持Akka Actor动作的核心组件,是整个Akka框架的引擎。它是基于Java的Executor框架来实现的。Dispatcher控制和协调消息并将其分发给运行在底层线程上的Actor,由它来负责调度资源的优化,并保证任务以最快的速度执行。

Akka的高稳定性是建立在“Let It Crash”模型之上的,该模型是基于Supervision和Monitoring实现的。通过定义Supervision和监管策略,实现系统异常处理。

Akka为了保证事务的一致,引入了STM的概念。STM使用的是“乐观锁”,执行临界区代码后,会检测是否产生冲突,如果产生冲突,将回滚修改,重新执行临界区代码。

Akka中,Dispatcher基于Java Executor框架来实现,提供了异步执行任务的能力。Executor是基于生产者——消费者模型来构建的。这意味着任务的提交和任务的执行是在不同的线程中隔离执行的,即提交任务的线程与执行任务的线程是不同的。

Executor框架有两个重要实现:

ThreadPoolExecutor:该实现从预定义的线程池中选取线程来执行任务。

ForkJoinPool:使用相同的线程池模型,提供了工作窃取的支持。

Dispatcher运行在线程之上,负责分发其邮箱里面的Actors和Messages到executor中的线程上运行。在Akka中,提供了4种类型的Dispatcher:

Dispatcher

Pinned Dispatcher

Balancing Dispatcher

Calling Thread Dispatcher

对应的也有4种默认的邮箱:

Unbounded mailbox

Bounded mailbox

Unbounded priority mailbox

Bounded priority mailbox

为Actor指定派发器

一般Actor都会有缺省的派发器,如果要指定派发器,要做两件事:

1)在实例化Actor时,指定派发器:

val myActor = context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"),"myActor")

1

2)创建Actor时,使用withDispatcher指定派发器,如my-dispatcher,然后在applicaction.conf配置文件中配置派发器

使用Dispatcher派发器

my-dispatcher{ # Dispatcher是基于事件的派发器名称 type = Dispatcher # 使用何种ExecutionService executor = "fork-join-executor" # 配置fork join池 fork-join-executor{ # 容纳基于倍数的并行数的线程数下限 parallelism-min = 2 # 并行数(线程)(CPU核数*2) parallelism-factor = 2.0 # 容纳基于倍数的并行数量的线程数上限 parallelism-max = 10 } # throughput定义了线程切换到另一个Actor之前处理的消息数上限 # 设置为1表示尽可能公平 throughput = 100 }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

使用PinnedDispatcher派发器

my-dispatcher{ # Dispatcher是基于事件的派发器名称 type = PinnedDispatcher # 使用何种ExecutionService executor = "thread-pool-executor" # 配置fork join池 thread-pool-executor{ # 容纳基于倍数的并行数的线程数下限 parallelism-min = 2 # 并行数(线程)(CPU核数*2) parallelism-factor = 2.0 # 容纳基于倍数的并行数量的线程数上限 parallelism-max = 10 } # throughput定义了线程切换到另一个Actor之前处理的消息数上限 # 设置为1表示尽可能公平 throughput = 100 }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

不同派发器的介绍

Dispatcher

Dispatcher是Akka中默认的派发器,它是基于事件的分发器,该派发器绑定一组Actor到线程池中。该派发器有如下特点:

1)每一个Actor都有自己的邮箱

2)该派发器都可以被任意数量的Actor共享

3)该派发器可以由ThreadPoolExecutor或ForkJoinPool提供支持

4)该派发器是非阻塞的。

Balancing Dispatcher

该派发器是基于事件的分发器,它会将任务比较多的Actor的任务重新分发到比较闲的Actor上运行。该派发器有如下特点:

1)所有Actor共用一个邮箱

2)该派发器只能被同一种类型的Actor共享

3)该派发器可以由ThreadPoolExecutor或ForkJoinPool提供支持

Pinned Dispatcher

该派发器为每一个Actor提供一个单一的、专用的线程。这种做法在I/O操作或者长时间运行的计算中很有用。该派发器有如下特点:

1)每一个Actor都有自己的邮箱

2)每一个Actor都有专用的线程,该线程不能和其他Actor共享

3)该派发器有一个Executor线程池

4)该派发器在阻塞上进行了优化,如:如果程序正在进行I/O操作,那么这个Actor将会等到任务执行完成。这种阻塞型的操作在性能上要比默认的Dispatcher要好。

Calling Thread Dispatcher

该派发器主要用于测试,并且在当前线程运行任务,不会创建新线程,该派发器有如下特点:

1)每一个Actor都有自己的邮箱

2)该派发器都可以被任意数量的Actor共享

3)该派发器由调用线程支持

邮箱

邮箱用于保存接收的消息,在Akka中除使用BalancingDispather分发器的Actor以外,每个Actor都拥有自己的邮箱。使用同一个BalancingDispather的所有Actor共享同一个邮箱实例。

邮箱是基于Java concurrent中的队列来实现的,它有如下特点:

1)阻塞队列,直到队列空间可用,或者队列中有可用元素

2)有界队列,它的大小是被限制的

缺省的邮箱实现

UnboundedMailbox

底层是一个java.util.concurrent.ConcurrentLinkedQueue

是否阻塞:No

是否有界:No

BoundedMailbox

底层是一个java.util.concurrent.LinkedBlockingQueue

是否阻塞:Yes

是否有界:Yes

UnboundedPriorityMailbox

底层是一个java.util.concurrent.PriorityBlockingQueue

是否阻塞:Yes

是否有界:No

BoundedPriorityMailbox

底层是一个java.util.PriorityBlockingQueue

是否阻塞:Yes

是否有界:Yes

还有一些缺省的持久邮箱。

Router

当处理到来的消息流时,我们需要一个actor来引导消息路由到目标actor,从而提高消息的分配效率。在Akka中这个 actor就是Router。它所管理的一些目标actor叫做routees

Akka定义好的一些Router:

akka.routing.RoundRobinRouter:轮转路由器将消息按照轮转顺序发送给routers

akka.routing.RandomRouter:随机路由器随机选择一个router,并将消息发送给这个router

akka.routing.SmallestMailboxRouter:最小邮箱路由器会在routers中选择邮箱里信息最少的router,然后把消息发送给它。

akka.routing.BroadcastRouter:广播路由器将相同的消息发送给所有的routers

akka.routing.ScatterGatherFirstCompletedRouter:敏捷路由器先将消息广播到所有routers,返回最先完成任务的router的结果给调用者。

路由器的使用

RoundRobinPool 和 RoundRobinGroupRouter对routees使用轮询机制

RandomPool 和 RandomGroupRouter随机选择routees发送消息

BalancingPool尝试从繁忙的routee重新分配任务到空闲routee,所有的routee共享一个mailbox

SmallestMailboxPoolRouter创建的所有routees中谁邮箱中的消息最少发给谁

BroadcastPool 和 BroadcastGroup广播的路由器将接收到的消息转发到它所有的routee。

ScatterGatherFirstCompletedPool 和 ScatterGatherFirstCompletedGroup将消息发送给所有的routees,然后等待到收到第一个回复,将结果发送回原始发送者。其他的回复将被丢弃

TailChoppingPool 和 TailChoppingGroup将首先发送消息到一个随机挑取的routee,短暂的延迟后发给第二个routee(从剩余的routee中随机挑选),以此类推。它等待第一个答复,并将它转回给原始发送者。其他答复将被丢弃。此Router的目标是通过查询到多个routee来减少延迟,假设其他的actor可能比第一个actor更快响应。

ConsistentHashingPool 和 ConsistentHashingGroup对消息使用一致性哈希(consistent hashing)选择routee

有三种方式定义哪些数据作为一致性哈希键

定义路由的hashMapping,将传入的消息映射到它们一致哈希键。这使决策对发送者透明。·

这些消息可能会实现ConsistentHashable。键是消息的一部分,并很方便地与消息定义一起定义。·

消息可以被包装在一个ConsistentHashableEnvelope中,来定义哪些数据可以用来做一致性哈希。发送者知道要使用的键。

路由器的使用要先创建路由器后使用。 AKKA的路由由router和众多的routees组成,router和routees都是actor.Router即路由,是负责负载均衡和路由的抽象,有两种方法来创建router:

1.Actor Group

2.Actor Pool

当处理到来的消息流时,我们需要一个actor来引导消息路由到目标actor,从而提高消息的分配效率。在Akka中这个 actor就是Router。它所管理的一些目标actor叫做routees

根据不同的情况需要,Akka提供了几种路由策略。当然也可以创建自己的路由及策略。Akka提供的路由策略如下:

akka.routing.RoundRobinRoutingLogic 轮询

akka.routing.RandomRoutingLogic 随机

akka.routing.SmallestMailboxRoutingLogic 空闲

akka.routing.BroadcastRoutingLogic 广播

akka.routing.ScatterGatherFirstCompletedRoutingLogic 分散聚集

akka.routing.TailChoppingRoutingLogic 尾部断续

akka.routing.ConsistentHashingRoutingLogic 一致性哈希

创建Router Actor

创建router actor 有两种方式:

Pool(池)——routees都是router 的子actor,如果routees终止,router将把它们移除

Group(群组)——routees都创建在router的外部,router通过使用actor来选择将消息发送到指定路径,但不监管routees是否终止。Router actor 向 routees 发送消息,与向普通actor发送消息一样通过其ActorRef。Router actor 不会改变消息的发送人,routees 回复消息时发送回原始发件人,而不是Router actor。

Pool(池)可以通过配置并使用代码在配置中获取的方法来实现 (例如创建一个轮询Router向5个routees发送消息)

Group(群组)有时我们需要单独地创建routees,然后提供一个Router来供其使用。可以通过将routees的路径传递给Router的配置,消息将通过ActorSelection来发送到这些路径。

有两种方式创建路由器:

Pool(池)

import akka.actor._ import akka.routing.{ActorRefRoutee, FromConfig, RoundRobinGroup, RoundRobinPool, RoundRobinRoutingLogic, Router} object HelloScala { def main(args: Array[String]): Unit = { // 创建router val _system = ActorSystem("testRouter") // 通知代码来实现路由器 val hahaRouter = _system.actorOf(RoundRobinPool(5).props(Props[WorkerRoutee]),"router111") hahaRouter ! RouteeMsg(333) val myRouter = _system.actorOf(Props[WorkerRoutee].withRouter(RoundRobinPool(nrOfInstances = 5))) myRouter ! RouteeMsg(22) val masterRouter = _system.actorOf(Props[MasterRouter],"masterRouter") masterRouter ! RouteeMsg(100) } } class MasterRouter extends Actor{ var masterRouter = { val routees = Vector.fill(3){ val r = context.actorOf(Props[WorkerRoutee]) context watch r ActorRefRoutee(r) } Router(RoundRobinRoutingLogic(),routees) } override def receive: Receive = { case w: RouteeMsg => masterRouter.route(w,sender()) case Terminated(a) => masterRouter = masterRouter.removeRoutee(a) val r = context.actorOf(Props[WorkerRoutee]) context watch r masterRouter = masterRouter.addRoutee(r) } } // 定义routee对应的actor类型 case class RouteeMsg(s: Int) class WorkerRoutee extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case _ => println(s"${self.path}") } } class WorkerRoutee2 extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#@@@@@$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case _ => println(s"${self.path}") } } class Cale extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case _ => println(s"${self.path}") } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

Group(群组)

import akka.actor._ import akka.routing.{ RoundRobinGroup} object HelloScala { def main(args: Array[String]): Unit = { val _system = ActorSystem("AkkaTestActor") val tActor = _system.actorOf(Props[TestActor],"testActor") tActor ! RouteeMsg(13333) } } class TestActor extends Actor{ val routee1 = context.actorOf(Props[WorkerRoutee],"w1") val routee2 = context.actorOf(Props[WorkerRoutee],"w2") val routee3 = context.actorOf(Props[WorkerRoutee],"w3") val paths: Array[String] = Array(routee1.path.toString,routee2.path.toString,routee3.path.toString) val testRouter = context.actorOf(RoundRobinGroup(paths).props(),"testRouter") override def receive = { case RouteeMsg(s) => testRouter ! RouteeMsg(s) case _ => } } // 定义routee对应的actor类型 case class RouteeMsg(s: Int) class WorkerRoutee extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case _ => println(s"${self.path}") } } class Cale extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case _ => println(s"${self.path}") } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

特殊消息

Broadcast消息用于向Router所有的routee发送一条消息,不管该Router通常是如何路由消息的。

PoisonPill消息无论哪个actor收到PosionPill消息都会被停止。但是对于PoisonPill消息Router不会将其传给routees。但仍然能影响到routees,因为Router停止时它的子actor也会停止,就可能会造成消息未处理。因此我们可以将PoisonPill包装到Broadcast消息中。这样Router所管理的所有routees将会处理完消息后再处理PoisonPill并停止。

Kill消息当Kill消息被发送到Router,Router将内部处理该消息,并且不会将它发送到其routee。Router将抛出ActorKilledException并失败,然后Router根据监管的策略,被恢复、重启或终止。Router的子routee也将被暂停,也受Router监管的影响,但是独立在Router外部创建的routee将不会被影响。

import akka.actor._ import akka.routing.{Broadcast, RoundRobinGroup} object HelloScala { def main(args: Array[String]): Unit = { val _system = ActorSystem("AkkaTestActor") val tActor = _system.actorOf(Props[TestActor],"testActor") tActor ! PoisonPill } } class TestActor extends Actor{ val routee1 = context.actorOf(Props[WorkerRoutee],"w1") val routee2 = context.actorOf(Props[WorkerRoutee],"w2") val routee3 = context.actorOf(Props[WorkerRoutee],"w3") val paths: Array[String] = Array(routee1.path.toString,routee2.path.toString,routee3.path.toString) val testRouter = context.actorOf(RoundRobinGroup(paths).props(),"testRouter") override def receive = { case RouteeMsg(s) => testRouter ! RouteeMsg(s) case RouteeBroadcast => testRouter ! Broadcast // 用于向Router所有的routee发送一条消息,不管该Router通常是如何路由消息的。 case Broadcast => println("TestActor receive a broadcast message") case Kill => testRouter ! Kill// 当Kill消息被发送到Router,Router将内部处理该消息,并且不会将它发送到其routee。 case PoisonPill => testRouter ! PoisonPill // 无论哪个actor收到PosionPill消息都会被停止。但是对于PoisonPill消息Router不会将其传给routees。 case _ => } } // 定义routee对应的actor类型 case class RouteeMsg(s: Int) // 定义广播信息 case object RouteeBroadcast class WorkerRoutee extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case Broadcast => println("WorkerRoutee receive a broadcast message") case Kill => println("WorkerRoutee receive a Kill message") case PoisonPill => println("WorkerRoutee receive a PoisonPill message") case _ => println(s"${self.path}") } } class Cale extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case Broadcast => println("Cale receive a broadcast message") case _ => println(s"${self.path}") } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

远程部署Router

既可以创建本地actor来作为Router,也可以命令Router在任一远程主机上部署子actor。需要将路由配置放在RemoteRouterConfig下,在远程部署的路径类中要添加akka-remote模块:

import akka.actor._ import akka.remote.routing.{RemoteRouterConfig} import akka.routing.{Broadcast,RoundRobinPool} object HelloScala { def main(args: Array[String]): Unit = { val _system = ActorSystem("AkkaTestActor") val addresses = Seq( Address("akka.tcp","remotesys","otherhost",6666), AddressFromURIString("akka.tcp://othersys@anotherhost:6666") ) // WorkerRoutee 路由部署到远程的主机上 val routerRemote = _system.actorOf(RemoteRouterConfig(RoundRobinPool(5),addresses).props(Props[WorkerRoutee])) } } // 定义routee对应的actor类型 case class RouteeMsg(s: Int) class WorkerRoutee extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case Broadcast => println("WorkerRoutee receive a broadcast message") case Kill => println("WorkerRoutee receive a Kill message") case PoisonPill => println("WorkerRoutee receive a PoisonPill message") case _ => println(s"${self.path}") } } class Cale extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case Broadcast => println("Cale receive a broadcast message") case _ => println(s"${self.path}") } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

Akka Dispatchers和Routers

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

NAT 任务调度

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:H5微应用JSAPI We码小程序跳转
下一篇:微服务API设计的实践与思考总结
相关文章