Akka DispatchersRouters

网友投稿 600 2022-05-29

Akka Dispatcher是维持Akka Actor动作的核心组件,是整个Akka框架的引擎。它是基于Java的Executor框架来实现的。Dispatcher控制和协调消息并将其分发给运行在底层线程上的Actor,由它来负责调度资源的优化,并保证任务以最快的速度执行。

Akka的高稳定性是建立在“Let It Crash”模型之上的,该模型是基于Supervision和Monitoring实现的。通过定义Supervision和监管策略,实现系统异常处理。

Akka为了保证事务的一致,引入了STM的概念。STM使用的是“乐观锁”,执行临界区代码后,会检测是否产生冲突,如果产生冲突,将回滚修改,重新执行临界区代码。

Akka中,Dispatcher基于Java Executor框架来实现,提供了异步执行任务的能力。Executor是基于生产者——消费者模型来构建的。这意味着任务的提交和任务的执行是在不同的线程中隔离执行的,即提交任务的线程与执行任务的线程是不同的。

Executor框架有两个重要实现:

ThreadPoolExecutor:该实现从预定义的线程池中选取线程来执行任务。

ForkJoinPool:使用相同的线程池模型,提供了工作窃取的支持。

Dispatcher运行在线程之上,负责分发其邮箱里面的Actors和Messages到executor中的线程上运行。在Akka中,提供了4种类型的Dispatcher:

Dispatcher

Pinned Dispatcher

Balancing Dispatcher

Calling Thread Dispatcher

对应的也有4种默认的邮箱:

Unbounded mailbox

Bounded mailbox

Unbounded priority mailbox

Bounded priority mailbox

为Actor指定派发器

一般Actor都会有缺省的派发器,如果要指定派发器,要做两件事:

1)在实例化Actor时,指定派发器:

val myActor = context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"),"myActor")

1

2)创建Actor时,使用withDispatcher指定派发器,如my-dispatcher,然后在applicaction.conf配置文件中配置派发器

使用Dispatcher派发器

my-dispatcher{ # Dispatcher是基于事件的派发器名称 type = Dispatcher # 使用何种ExecutionService executor = "fork-join-executor" # 配置fork join池 fork-join-executor{ # 容纳基于倍数的并行数的线程数下限 parallelism-min = 2 # 并行数(线程)(CPU核数*2) parallelism-factor = 2.0 # 容纳基于倍数的并行数量的线程数上限 parallelism-max = 10 } # throughput定义了线程切换到另一个Actor之前处理的消息数上限 # 设置为1表示尽可能公平 throughput = 100 }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

使用PinnedDispatcher派发器

my-dispatcher{ # Dispatcher是基于事件的派发器名称 type = PinnedDispatcher # 使用何种ExecutionService executor = "thread-pool-executor" # 配置fork join池 thread-pool-executor{ # 容纳基于倍数的并行数的线程数下限 parallelism-min = 2 # 并行数(线程)(CPU核数*2) parallelism-factor = 2.0 # 容纳基于倍数的并行数量的线程数上限 parallelism-max = 10 } # throughput定义了线程切换到另一个Actor之前处理的消息数上限 # 设置为1表示尽可能公平 throughput = 100 }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

不同派发器的介绍

Dispatcher

Dispatcher是Akka中默认的派发器,它是基于事件的分发器,该派发器绑定一组Actor到线程池中。该派发器有如下特点:

1)每一个Actor都有自己的邮箱

2)该派发器都可以被任意数量的Actor共享

3)该派发器可以由ThreadPoolExecutor或ForkJoinPool提供支持

4)该派发器是非阻塞的。

Balancing Dispatcher

该派发器是基于事件的分发器,它会将任务比较多的Actor的任务重新分发到比较闲的Actor上运行。该派发器有如下特点:

1)所有Actor共用一个邮箱

2)该派发器只能被同一种类型的Actor共享

3)该派发器可以由ThreadPoolExecutor或ForkJoinPool提供支持

Pinned Dispatcher

该派发器为每一个Actor提供一个单一的、专用的线程。这种做法在I/O操作或者长时间运行的计算中很有用。该派发器有如下特点:

1)每一个Actor都有自己的邮箱

2)每一个Actor都有专用的线程,该线程不能和其他Actor共享

3)该派发器有一个Executor线程池

4)该派发器在阻塞上进行了优化,如:如果程序正在进行I/O操作,那么这个Actor将会等到任务执行完成。这种阻塞型的操作在性能上要比默认的Dispatcher要好。

Calling Thread Dispatcher

该派发器主要用于测试,并且在当前线程运行任务,不会创建新线程,该派发器有如下特点:

1)每一个Actor都有自己的邮箱

2)该派发器都可以被任意数量的Actor共享

3)该派发器由调用线程支持

邮箱

邮箱用于保存接收的消息,在Akka中除使用BalancingDispather分发器的Actor以外,每个Actor都拥有自己的邮箱。使用同一个BalancingDispather的所有Actor共享同一个邮箱实例。

邮箱是基于Java concurrent中的队列来实现的,它有如下特点:

1)阻塞队列,直到队列空间可用,或者队列中有可用元素

2)有界队列,它的大小是被限制的

缺省的邮箱实现

UnboundedMailbox

底层是一个java.util.concurrent.ConcurrentLinkedQueue

是否阻塞:No

是否有界:No

BoundedMailbox

底层是一个java.util.concurrent.LinkedBlockingQueue

是否阻塞:Yes

是否有界:Yes

UnboundedPriorityMailbox

底层是一个java.util.concurrent.PriorityBlockingQueue

是否阻塞:Yes

是否有界:No

BoundedPriorityMailbox

底层是一个java.util.PriorityBlockingQueue

是否阻塞:Yes

是否有界:Yes

还有一些缺省的持久邮箱。

Router

当处理到来的消息流时,我们需要一个actor来引导消息路由到目标actor,从而提高消息的分配效率。在Akka中这个 actor就是Router。它所管理的一些目标actor叫做routees

Akka定义好的一些Router:

akka.routing.RoundRobinRouter:轮转路由器将消息按照轮转顺序发送给routers

akka.routing.RandomRouter:随机路由器随机选择一个router,并将消息发送给这个router

akka.routing.SmallestMailboxRouter:最小邮箱路由器会在routers中选择邮箱里信息最少的router,然后把消息发送给它。

akka.routing.BroadcastRouter:广播路由器将相同的消息发送给所有的routers

akka.routing.ScatterGatherFirstCompletedRouter:敏捷路由器先将消息广播到所有routers,返回最先完成任务的router的结果给调用者。

路由器的使用

RoundRobinPool 和 RoundRobinGroupRouter对routees使用轮询机制

RandomPool 和 RandomGroupRouter随机选择routees发送消息

BalancingPool尝试从繁忙的routee重新分配任务到空闲routee,所有的routee共享一个mailbox

SmallestMailboxPoolRouter创建的所有routees中谁邮箱中的消息最少发给谁

BroadcastPool 和 BroadcastGroup广播的路由器将接收到的消息转发到它所有的routee。

ScatterGatherFirstCompletedPool 和 ScatterGatherFirstCompletedGroup将消息发送给所有的routees,然后等待到收到第一个回复,将结果发送回原始发送者。其他的回复将被丢弃

TailChoppingPool 和 TailChoppingGroup将首先发送消息到一个随机挑取的routee,短暂的延迟后发给第二个routee(从剩余的routee中随机挑选),以此类推。它等待第一个答复,并将它转回给原始发送者。其他答复将被丢弃。此Router的目标是通过查询到多个routee来减少延迟,假设其他的actor可能比第一个actor更快响应。

ConsistentHashingPool 和 ConsistentHashingGroup对消息使用一致性哈希(consistent hashing)选择routee

有三种方式定义哪些数据作为一致性哈希键

定义路由的hashMapping,将传入的消息映射到它们一致哈希键。这使决策对发送者透明。·

这些消息可能会实现ConsistentHashable。键是消息的一部分,并很方便地与消息定义一起定义。·

消息可以被包装在一个ConsistentHashableEnvelope中,来定义哪些数据可以用来做一致性哈希。发送者知道要使用的键。

路由器的使用要先创建路由器后使用。 AKKA的路由由router和众多的routees组成,router和routees都是actor.Router即路由,是负责负载均衡和路由的抽象,有两种方法来创建router:

1.Actor Group

2.Actor Pool

当处理到来的消息流时,我们需要一个actor来引导消息路由到目标actor,从而提高消息的分配效率。在Akka中这个 actor就是Router。它所管理的一些目标actor叫做routees

根据不同的情况需要,Akka提供了几种路由策略。当然也可以创建自己的路由及策略。Akka提供的路由策略如下:

akka.routing.RoundRobinRoutingLogic 轮询

akka.routing.RandomRoutingLogic 随机

akka.routing.SmallestMailboxRoutingLogic 空闲

akka.routing.BroadcastRoutingLogic 广播

akka.routing.ScatterGatherFirstCompletedRoutingLogic 分散聚集

akka.routing.TailChoppingRoutingLogic 尾部断续

akka.routing.ConsistentHashingRoutingLogic 一致性哈希

创建Router Actor

创建router actor 有两种方式:

Pool(池)——routees都是router 的子actor,如果routees终止,router将把它们移除

Group(群组)——routees都创建在router的外部,router通过使用actor来选择将消息发送到指定路径,但不监管routees是否终止。Router actor 向 routees 发送消息,与向普通actor发送消息一样通过其ActorRef。Router actor 不会改变消息的发送人,routees 回复消息时发送回原始发件人,而不是Router actor。

Pool(池)可以通过配置并使用代码在配置中获取的方法来实现 (例如创建一个轮询Router向5个routees发送消息)

Group(群组)有时我们需要单独地创建routees,然后提供一个Router来供其使用。可以通过将routees的路径传递给Router的配置,消息将通过ActorSelection来发送到这些路径。

有两种方式创建路由器:

Pool(池)

import akka.actor._ import akka.routing.{ActorRefRoutee, FromConfig, RoundRobinGroup, RoundRobinPool, RoundRobinRoutingLogic, Router} object HelloScala { def main(args: Array[String]): Unit = { // 创建router val _system = ActorSystem("testRouter") // 通知代码来实现路由器 val hahaRouter = _system.actorOf(RoundRobinPool(5).props(Props[WorkerRoutee]),"router111") hahaRouter ! RouteeMsg(333) val myRouter = _system.actorOf(Props[WorkerRoutee].withRouter(RoundRobinPool(nrOfInstances = 5))) myRouter ! RouteeMsg(22) val masterRouter = _system.actorOf(Props[MasterRouter],"masterRouter") masterRouter ! RouteeMsg(100) } } class MasterRouter extends Actor{ var masterRouter = { val routees = Vector.fill(3){ val r = context.actorOf(Props[WorkerRoutee]) context watch r ActorRefRoutee(r) } Router(RoundRobinRoutingLogic(),routees) } override def receive: Receive = { case w: RouteeMsg => masterRouter.route(w,sender()) case Terminated(a) => masterRouter = masterRouter.removeRoutee(a) val r = context.actorOf(Props[WorkerRoutee]) context watch r masterRouter = masterRouter.addRoutee(r) } } // 定义routee对应的actor类型 case class RouteeMsg(s: Int) class WorkerRoutee extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case _ => println(s"${self.path}") } } class WorkerRoutee2 extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#@@@@@$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case _ => println(s"${self.path}") } } class Cale extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case _ => println(s"${self.path}") } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

Group(群组)

import akka.actor._ import akka.routing.{ RoundRobinGroup} object HelloScala { def main(args: Array[String]): Unit = { val _system = ActorSystem("AkkaTestActor") val tActor = _system.actorOf(Props[TestActor],"testActor") tActor ! RouteeMsg(13333) } } class TestActor extends Actor{ val routee1 = context.actorOf(Props[WorkerRoutee],"w1") val routee2 = context.actorOf(Props[WorkerRoutee],"w2") val routee3 = context.actorOf(Props[WorkerRoutee],"w3") val paths: Array[String] = Array(routee1.path.toString,routee2.path.toString,routee3.path.toString) val testRouter = context.actorOf(RoundRobinGroup(paths).props(),"testRouter") override def receive = { case RouteeMsg(s) => testRouter ! RouteeMsg(s) case _ => } } // 定义routee对应的actor类型 case class RouteeMsg(s: Int) class WorkerRoutee extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case _ => println(s"${self.path}") } } class Cale extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case _ => println(s"${self.path}") } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

特殊消息

Broadcast消息用于向Router所有的routee发送一条消息,不管该Router通常是如何路由消息的。

PoisonPill消息无论哪个actor收到PosionPill消息都会被停止。但是对于PoisonPill消息Router不会将其传给routees。但仍然能影响到routees,因为Router停止时它的子actor也会停止,就可能会造成消息未处理。因此我们可以将PoisonPill包装到Broadcast消息中。这样Router所管理的所有routees将会处理完消息后再处理PoisonPill并停止。

Kill消息当Kill消息被发送到Router,Router将内部处理该消息,并且不会将它发送到其routee。Router将抛出ActorKilledException并失败,然后Router根据监管的策略,被恢复、重启或终止。Router的子routee也将被暂停,也受Router监管的影响,但是独立在Router外部创建的routee将不会被影响。

import akka.actor._ import akka.routing.{Broadcast, RoundRobinGroup} object HelloScala { def main(args: Array[String]): Unit = { val _system = ActorSystem("AkkaTestActor") val tActor = _system.actorOf(Props[TestActor],"testActor") tActor ! PoisonPill } } class TestActor extends Actor{ val routee1 = context.actorOf(Props[WorkerRoutee],"w1") val routee2 = context.actorOf(Props[WorkerRoutee],"w2") val routee3 = context.actorOf(Props[WorkerRoutee],"w3") val paths: Array[String] = Array(routee1.path.toString,routee2.path.toString,routee3.path.toString) val testRouter = context.actorOf(RoundRobinGroup(paths).props(),"testRouter") override def receive = { case RouteeMsg(s) => testRouter ! RouteeMsg(s) case RouteeBroadcast => testRouter ! Broadcast // 用于向Router所有的routee发送一条消息,不管该Router通常是如何路由消息的。 case Broadcast => println("TestActor receive a broadcast message") case Kill => testRouter ! Kill// 当Kill消息被发送到Router,Router将内部处理该消息,并且不会将它发送到其routee。 case PoisonPill => testRouter ! PoisonPill // 无论哪个actor收到PosionPill消息都会被停止。但是对于PoisonPill消息Router不会将其传给routees。 case _ => } } // 定义routee对应的actor类型 case class RouteeMsg(s: Int) // 定义广播信息 case object RouteeBroadcast class WorkerRoutee extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case Broadcast => println("WorkerRoutee receive a broadcast message") case Kill => println("WorkerRoutee receive a Kill message") case PoisonPill => println("WorkerRoutee receive a PoisonPill message") case _ => println(s"${self.path}") } } class Cale extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case Broadcast => println("Cale receive a broadcast message") case _ => println(s"${self.path}") } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

远程部署Router

既可以创建本地actor来作为Router,也可以命令Router在任一远程主机上部署子actor。需要将路由配置放在RemoteRouterConfig下,在远程部署的路径类中要添加akka-remote模块:

import akka.actor._ import akka.remote.routing.{RemoteRouterConfig} import akka.routing.{Broadcast,RoundRobinPool} object HelloScala { def main(args: Array[String]): Unit = { val _system = ActorSystem("AkkaTestActor") val addresses = Seq( Address("akka.tcp","remotesys","otherhost",6666), AddressFromURIString("akka.tcp://othersys@anotherhost:6666") ) // WorkerRoutee 路由部署到远程的主机上 val routerRemote = _system.actorOf(RemoteRouterConfig(RoundRobinPool(5),addresses).props(Props[WorkerRoutee])) } } // 定义routee对应的actor类型 case class RouteeMsg(s: Int) class WorkerRoutee extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case Broadcast => println("WorkerRoutee receive a broadcast message") case Kill => println("WorkerRoutee receive a Kill message") case PoisonPill => println("WorkerRoutee receive a PoisonPill message") case _ => println(s"${self.path}") } } class Cale extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case Broadcast => println("Cale receive a broadcast message") case _ => println(s"${self.path}") } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

Akka Dispatchers和Routers

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

NAT 任务调度

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:H5微应用JSAPI We码小程序跳转
下一篇:微服务API设计的实践与思考总结
相关文章