Akka的事务STM
649
2022-05-29
Akka Dispatcher是维持Akka Actor动作的核心组件,是整个Akka框架的引擎。它是基于Java的Executor框架来实现的。Dispatcher控制和协调消息并将其分发给运行在底层线程上的Actor,由它来负责调度资源的优化,并保证任务以最快的速度执行。
Akka的高稳定性是建立在“Let It Crash”模型之上的,该模型是基于Supervision和Monitoring实现的。通过定义Supervision和监管策略,实现系统异常处理。
Akka为了保证事务的一致,引入了STM的概念。STM使用的是“乐观锁”,执行临界区代码后,会检测是否产生冲突,如果产生冲突,将回滚修改,重新执行临界区代码。
Akka中,Dispatcher基于Java Executor框架来实现,提供了异步执行任务的能力。Executor是基于生产者——消费者模型来构建的。这意味着任务的提交和任务的执行是在不同的线程中隔离执行的,即提交任务的线程与执行任务的线程是不同的。
Executor框架有两个重要实现:
ThreadPoolExecutor:该实现从预定义的线程池中选取线程来执行任务。
ForkJoinPool:使用相同的线程池模型,提供了工作窃取的支持。
Dispatcher运行在线程之上,负责分发其邮箱里面的Actors和Messages到executor中的线程上运行。在Akka中,提供了4种类型的Dispatcher:
Dispatcher
Pinned Dispatcher
Balancing Dispatcher
Calling Thread Dispatcher
对应的也有4种默认的邮箱:
Unbounded mailbox
Bounded mailbox
Unbounded priority mailbox
Bounded priority mailbox
为Actor指定派发器
一般Actor都会有缺省的派发器,如果要指定派发器,要做两件事:
1)在实例化Actor时,指定派发器:
val myActor = context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"),"myActor")
1
2)创建Actor时,使用withDispatcher指定派发器,如my-dispatcher,然后在applicaction.conf配置文件中配置派发器
使用Dispatcher派发器
my-dispatcher{ # Dispatcher是基于事件的派发器名称 type = Dispatcher # 使用何种ExecutionService executor = "fork-join-executor" # 配置fork join池 fork-join-executor{ # 容纳基于倍数的并行数的线程数下限 parallelism-min = 2 # 并行数(线程)(CPU核数*2) parallelism-factor = 2.0 # 容纳基于倍数的并行数量的线程数上限 parallelism-max = 10 } # throughput定义了线程切换到另一个Actor之前处理的消息数上限 # 设置为1表示尽可能公平 throughput = 100 }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
使用PinnedDispatcher派发器
my-dispatcher{ # Dispatcher是基于事件的派发器名称 type = PinnedDispatcher # 使用何种ExecutionService executor = "thread-pool-executor" # 配置fork join池 thread-pool-executor{ # 容纳基于倍数的并行数的线程数下限 parallelism-min = 2 # 并行数(线程)(CPU核数*2) parallelism-factor = 2.0 # 容纳基于倍数的并行数量的线程数上限 parallelism-max = 10 } # throughput定义了线程切换到另一个Actor之前处理的消息数上限 # 设置为1表示尽可能公平 throughput = 100 }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
不同派发器的介绍
Dispatcher
Dispatcher是Akka中默认的派发器,它是基于事件的分发器,该派发器绑定一组Actor到线程池中。该派发器有如下特点:
1)每一个Actor都有自己的邮箱
2)该派发器都可以被任意数量的Actor共享
3)该派发器可以由ThreadPoolExecutor或ForkJoinPool提供支持
4)该派发器是非阻塞的。
Balancing Dispatcher
该派发器是基于事件的分发器,它会将任务比较多的Actor的任务重新分发到比较闲的Actor上运行。该派发器有如下特点:
1)所有Actor共用一个邮箱
2)该派发器只能被同一种类型的Actor共享
3)该派发器可以由ThreadPoolExecutor或ForkJoinPool提供支持
Pinned Dispatcher
该派发器为每一个Actor提供一个单一的、专用的线程。这种做法在I/O操作或者长时间运行的计算中很有用。该派发器有如下特点:
1)每一个Actor都有自己的邮箱
2)每一个Actor都有专用的线程,该线程不能和其他Actor共享
3)该派发器有一个Executor线程池
4)该派发器在阻塞上进行了优化,如:如果程序正在进行I/O操作,那么这个Actor将会等到任务执行完成。这种阻塞型的操作在性能上要比默认的Dispatcher要好。
Calling Thread Dispatcher
该派发器主要用于测试,并且在当前线程运行任务,不会创建新线程,该派发器有如下特点:
1)每一个Actor都有自己的邮箱
2)该派发器都可以被任意数量的Actor共享
3)该派发器由调用线程支持
邮箱
邮箱用于保存接收的消息,在Akka中除使用BalancingDispather分发器的Actor以外,每个Actor都拥有自己的邮箱。使用同一个BalancingDispather的所有Actor共享同一个邮箱实例。
邮箱是基于Java concurrent中的队列来实现的,它有如下特点:
1)阻塞队列,直到队列空间可用,或者队列中有可用元素
2)有界队列,它的大小是被限制的
缺省的邮箱实现
UnboundedMailbox
底层是一个java.util.concurrent.ConcurrentLinkedQueue
是否阻塞:No
是否有界:No
BoundedMailbox
底层是一个java.util.concurrent.LinkedBlockingQueue
是否阻塞:Yes
是否有界:Yes
UnboundedPriorityMailbox
底层是一个java.util.concurrent.PriorityBlockingQueue
是否阻塞:Yes
是否有界:No
BoundedPriorityMailbox
底层是一个java.util.PriorityBlockingQueue
是否阻塞:Yes
是否有界:Yes
还有一些缺省的持久邮箱。
Router
当处理到来的消息流时,我们需要一个actor来引导消息路由到目标actor,从而提高消息的分配效率。在Akka中这个 actor就是Router。它所管理的一些目标actor叫做routees
Akka定义好的一些Router:
akka.routing.RoundRobinRouter:轮转路由器将消息按照轮转顺序发送给routers
akka.routing.RandomRouter:随机路由器随机选择一个router,并将消息发送给这个router
akka.routing.SmallestMailboxRouter:最小邮箱路由器会在routers中选择邮箱里信息最少的router,然后把消息发送给它。
akka.routing.BroadcastRouter:广播路由器将相同的消息发送给所有的routers
akka.routing.ScatterGatherFirstCompletedRouter:敏捷路由器先将消息广播到所有routers,返回最先完成任务的router的结果给调用者。
路由器的使用
RoundRobinPool 和 RoundRobinGroupRouter对routees使用轮询机制
RandomPool 和 RandomGroupRouter随机选择routees发送消息
BalancingPool尝试从繁忙的routee重新分配任务到空闲routee,所有的routee共享一个mailbox
SmallestMailboxPoolRouter创建的所有routees中谁邮箱中的消息最少发给谁
BroadcastPool 和 BroadcastGroup广播的路由器将接收到的消息转发到它所有的routee。
ScatterGatherFirstCompletedPool 和 ScatterGatherFirstCompletedGroup将消息发送给所有的routees,然后等待到收到第一个回复,将结果发送回原始发送者。其他的回复将被丢弃
TailChoppingPool 和 TailChoppingGroup将首先发送消息到一个随机挑取的routee,短暂的延迟后发给第二个routee(从剩余的routee中随机挑选),以此类推。它等待第一个答复,并将它转回给原始发送者。其他答复将被丢弃。此Router的目标是通过查询到多个routee来减少延迟,假设其他的actor可能比第一个actor更快响应。
ConsistentHashingPool 和 ConsistentHashingGroup对消息使用一致性哈希(consistent hashing)选择routee
有三种方式定义哪些数据作为一致性哈希键
定义路由的hashMapping,将传入的消息映射到它们一致哈希键。这使决策对发送者透明。·
这些消息可能会实现ConsistentHashable。键是消息的一部分,并很方便地与消息定义一起定义。·
消息可以被包装在一个ConsistentHashableEnvelope中,来定义哪些数据可以用来做一致性哈希。发送者知道要使用的键。
路由器的使用要先创建路由器后使用。 AKKA的路由由router和众多的routees组成,router和routees都是actor.Router即路由,是负责负载均衡和路由的抽象,有两种方法来创建router:
1.Actor Group
2.Actor Pool
当处理到来的消息流时,我们需要一个actor来引导消息路由到目标actor,从而提高消息的分配效率。在Akka中这个 actor就是Router。它所管理的一些目标actor叫做routees
根据不同的情况需要,Akka提供了几种路由策略。当然也可以创建自己的路由及策略。Akka提供的路由策略如下:
akka.routing.RoundRobinRoutingLogic 轮询
akka.routing.RandomRoutingLogic 随机
akka.routing.SmallestMailboxRoutingLogic 空闲
akka.routing.BroadcastRoutingLogic 广播
akka.routing.ScatterGatherFirstCompletedRoutingLogic 分散聚集
akka.routing.TailChoppingRoutingLogic 尾部断续
akka.routing.ConsistentHashingRoutingLogic 一致性哈希
创建Router Actor
创建router actor 有两种方式:
Pool(池)——routees都是router 的子actor,如果routees终止,router将把它们移除
Group(群组)——routees都创建在router的外部,router通过使用actor来选择将消息发送到指定路径,但不监管routees是否终止。Router actor 向 routees 发送消息,与向普通actor发送消息一样通过其ActorRef。Router actor 不会改变消息的发送人,routees 回复消息时发送回原始发件人,而不是Router actor。
Pool(池)可以通过配置并使用代码在配置中获取的方法来实现 (例如创建一个轮询Router向5个routees发送消息)
Group(群组)有时我们需要单独地创建routees,然后提供一个Router来供其使用。可以通过将routees的路径传递给Router的配置,消息将通过ActorSelection来发送到这些路径。
有两种方式创建路由器:
Pool(池)
import akka.actor._ import akka.routing.{ActorRefRoutee, FromConfig, RoundRobinGroup, RoundRobinPool, RoundRobinRoutingLogic, Router} object HelloScala { def main(args: Array[String]): Unit = { // 创建router val _system = ActorSystem("testRouter") // 通知代码来实现路由器 val hahaRouter = _system.actorOf(RoundRobinPool(5).props(Props[WorkerRoutee]),"router111") hahaRouter ! RouteeMsg(333) val myRouter = _system.actorOf(Props[WorkerRoutee].withRouter(RoundRobinPool(nrOfInstances = 5))) myRouter ! RouteeMsg(22) val masterRouter = _system.actorOf(Props[MasterRouter],"masterRouter") masterRouter ! RouteeMsg(100) } } class MasterRouter extends Actor{ var masterRouter = { val routees = Vector.fill(3){ val r = context.actorOf(Props[WorkerRoutee]) context watch r ActorRefRoutee(r) } Router(RoundRobinRoutingLogic(),routees) } override def receive: Receive = { case w: RouteeMsg => masterRouter.route(w,sender()) case Terminated(a) => masterRouter = masterRouter.removeRoutee(a) val r = context.actorOf(Props[WorkerRoutee]) context watch r masterRouter = masterRouter.addRoutee(r) } } // 定义routee对应的actor类型 case class RouteeMsg(s: Int) class WorkerRoutee extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case _ => println(s"${self.path}") } } class WorkerRoutee2 extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#@@@@@$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case _ => println(s"${self.path}") } } class Cale extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case _ => println(s"${self.path}") } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
Group(群组)
import akka.actor._ import akka.routing.{ RoundRobinGroup} object HelloScala { def main(args: Array[String]): Unit = { val _system = ActorSystem("AkkaTestActor") val tActor = _system.actorOf(Props[TestActor],"testActor") tActor ! RouteeMsg(13333) } } class TestActor extends Actor{ val routee1 = context.actorOf(Props[WorkerRoutee],"w1") val routee2 = context.actorOf(Props[WorkerRoutee],"w2") val routee3 = context.actorOf(Props[WorkerRoutee],"w3") val paths: Array[String] = Array(routee1.path.toString,routee2.path.toString,routee3.path.toString) val testRouter = context.actorOf(RoundRobinGroup(paths).props(),"testRouter") override def receive = { case RouteeMsg(s) => testRouter ! RouteeMsg(s) case _ => } } // 定义routee对应的actor类型 case class RouteeMsg(s: Int) class WorkerRoutee extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case _ => println(s"${self.path}") } } class Cale extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case _ => println(s"${self.path}") } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
特殊消息
Broadcast消息用于向Router所有的routee发送一条消息,不管该Router通常是如何路由消息的。
PoisonPill消息无论哪个actor收到PosionPill消息都会被停止。但是对于PoisonPill消息Router不会将其传给routees。但仍然能影响到routees,因为Router停止时它的子actor也会停止,就可能会造成消息未处理。因此我们可以将PoisonPill包装到Broadcast消息中。这样Router所管理的所有routees将会处理完消息后再处理PoisonPill并停止。
Kill消息当Kill消息被发送到Router,Router将内部处理该消息,并且不会将它发送到其routee。Router将抛出ActorKilledException并失败,然后Router根据监管的策略,被恢复、重启或终止。Router的子routee也将被暂停,也受Router监管的影响,但是独立在Router外部创建的routee将不会被影响。
import akka.actor._ import akka.routing.{Broadcast, RoundRobinGroup} object HelloScala { def main(args: Array[String]): Unit = { val _system = ActorSystem("AkkaTestActor") val tActor = _system.actorOf(Props[TestActor],"testActor") tActor ! PoisonPill } } class TestActor extends Actor{ val routee1 = context.actorOf(Props[WorkerRoutee],"w1") val routee2 = context.actorOf(Props[WorkerRoutee],"w2") val routee3 = context.actorOf(Props[WorkerRoutee],"w3") val paths: Array[String] = Array(routee1.path.toString,routee2.path.toString,routee3.path.toString) val testRouter = context.actorOf(RoundRobinGroup(paths).props(),"testRouter") override def receive = { case RouteeMsg(s) => testRouter ! RouteeMsg(s) case RouteeBroadcast => testRouter ! Broadcast // 用于向Router所有的routee发送一条消息,不管该Router通常是如何路由消息的。 case Broadcast => println("TestActor receive a broadcast message") case Kill => testRouter ! Kill// 当Kill消息被发送到Router,Router将内部处理该消息,并且不会将它发送到其routee。 case PoisonPill => testRouter ! PoisonPill // 无论哪个actor收到PosionPill消息都会被停止。但是对于PoisonPill消息Router不会将其传给routees。 case _ => } } // 定义routee对应的actor类型 case class RouteeMsg(s: Int) // 定义广播信息 case object RouteeBroadcast class WorkerRoutee extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case Broadcast => println("WorkerRoutee receive a broadcast message") case Kill => println("WorkerRoutee receive a Kill message") case PoisonPill => println("WorkerRoutee receive a PoisonPill message") case _ => println(s"${self.path}") } } class Cale extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case Broadcast => println("Cale receive a broadcast message") case _ => println(s"${self.path}") } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
远程部署Router
既可以创建本地actor来作为Router,也可以命令Router在任一远程主机上部署子actor。需要将路由配置放在RemoteRouterConfig下,在远程部署的路径类中要添加akka-remote模块:
import akka.actor._ import akka.remote.routing.{RemoteRouterConfig} import akka.routing.{Broadcast,RoundRobinPool} object HelloScala { def main(args: Array[String]): Unit = { val _system = ActorSystem("AkkaTestActor") val addresses = Seq( Address("akka.tcp","remotesys","otherhost",6666), AddressFromURIString("akka.tcp://othersys@anotherhost:6666") ) // WorkerRoutee 路由部署到远程的主机上 val routerRemote = _system.actorOf(RemoteRouterConfig(RoundRobinPool(5),addresses).props(Props[WorkerRoutee])) } } // 定义routee对应的actor类型 case class RouteeMsg(s: Int) class WorkerRoutee extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case Broadcast => println("WorkerRoutee receive a broadcast message") case Kill => println("WorkerRoutee receive a Kill message") case PoisonPill => println("WorkerRoutee receive a PoisonPill message") case _ => println(s"${self.path}") } } class Cale extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case Broadcast => println("Cale receive a broadcast message") case _ => println(s"${self.path}") } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
NAT 任务调度
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。