Akka事务STM

网友投稿 552 2022-05-29

一个事务必须具有以下四个特点,即所谓的ACID特性:

原子性:所有的操作要么全部成功,要么全部失败。

一致性:在事务完成后,系统保持一致性状态。

隔离性:在一个事务成功或失败前,产生的数据对于系统中的其他事务是不见的。

持久性:事务操作的结果要持久化保存。

Akka使用(Software Transactional Memory)软件事务内存来实现事务。这是一种多线程之间数据共享的同步机制。对于并行计算编程而言,只要将线程中需要 访问共享内存的关键逻辑 部分划分出来封装到一个事务中即可。

传统的保护共享数据的方法就是加同步锁。在java或Scala中以synchronized同步代码块的形式来实现:

var seats = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) val reservedSeat = seats.synchronized {// seats的synchronized构建临界区 val head = seats.head// 取出第一个元素 seats = seats.tail// 把除取出的第一个元素外的元素重新赋值给synchronized head }

1

2

3

4

5

6

7

所有的线程在synchronized块上面是依次执行的,保证了在同一时刻只有一个线程访问共享变量,确保了共享变量的一致性。但是如果有线程只是想去读取共享变量,而不是要去修改时,遇到synchronized块也是要等待的,这样会降低了系统的整体性能,这种锁,叫做“悲观锁”,这种锁它会假设在任何时候都可能会有线程去修改共享变量。

相应的就会有**“乐观锁”**,乐观锁认为在访问和修改共享变量时,都不会产生任何问题。因此在执行代码时不会有任何锁。在乐观锁的实现中,当线程离开了临界区时,系统会检测可能的更新冲突,如果检测不到更新冲突,那么就直接提交事务,如果检测到有冲突发生,那么所有的改变都会回滚并尝试重新执行临界区代码。

STM使用的是乐观锁。Akka通过将共享变量包装到STM的引用中,检测共享数据在事务中是否已经发生改变,从而能防止因多线程访问共享变量造成的数据不一致的问题。

要使用STM必须在build.sbt中加入:

libraryDependencies += "com.typesafe.akka" %% "akka-agent" % "2.5.23"

1

未例代码:

import concurrent.stm._ object HelloScala { def main(args: Array[String]): Unit = { // 使用Ref包裹变量 val mySeq = Seq[Int](1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) val seats = Ref(mySeq) // 在atomic块中使用Ref变量示例 val getSeat = atomic{implicit txt => val head = seats().head// 取出第一个值 // 共享变量 seats() = seats().tail// 重新赋值 head } println(getSeat) } }

1

2

3

4

5

6

7

Akka的事务STM

8

9

10

11

12

13

14

15

16

17

18

上面的seats变量,只能够中atomic块中使用,在atomic块的代码将被视为一个原子命令被执行。在编译时,atomic块需要一个隐式变量如上述的txt来为Ref中手冲突做检测。上述代码跟synchronized做的是同样的事情,但是工作机制完全不同。STM使用的是乐观锁,当atomic块执行完成后,有一个检查将会执行,这个操作就是去检查是否有冲突发生。ACID乐观锁实现了三个,没有实现持久化,因STM都是发生在内存中,内存中的事务永远都不会持久化。面使用synchronized的临界区只有执行一次,使用的是悲观锁。

有时候,我们只想读取共享变量,而不做任何改变。我们就可以使用Ref.View来读取共享变量来提高性能:

println(seats.single.get)// 得到seats视图,调用视图上的get方法来获取值 println(seats.single.get.head) println(seats.single.get)

1

2

3

读取Agent事务中的数据

Akka中的Agent和Actor都是基于STM来处理事务的。akka的Agent提供了一个独立于位置的异步操作,所有对Agent的操作都是异步的。

import akka.agent.Agent import scala.concurrent.{Future} import scala.concurrent.ExecutionContext.Implicits.global case class Seat(var a: Int) object HelloScala { def main(args: Array[String]): Unit = { val agent = Agent(50)// 创建Agent println(agent())// 使用agent()方式读取Agent数据 println(agent.get)// 使用get()方式读取Agent数据 agent send(888)// 使用send修改Agent的值 val future1 :Future[Int] = agent alter(_+100)// 修改Agent数据 println(future1 foreach println) } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

任务调度

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:虚拟机安装VMware Tools仍旧不能复制粘贴的解决方法--共享文件夹
下一篇:高并发编程-通过volatile重新认识CPU缓存 和 Java内存模型(JMM)
相关文章