java 并发编程学习笔记(六)之 AQS (AbstractQueuedSynchronizer)

网友投稿 563 2022-05-30

AQS

(1)aqs

使用node实现fifo队列,可以用于构建锁或者其他的同步装置的基础框架

利用了一个int类型表示状态

使用方法是继承

子类通过继承并通过实现它的方法管理其状态{acquire 和 release}

可以同时实现排他锁和共享锁模式(独占,共享)

(2)CountDownLatch

/**

* 一般用于 当主线程需要等待 子线程执行完成之后 再执行的场景

* * 线程名称 子线程结束

* * thread1 ---------------------- end |

* * thread2 ---------------------- end | 主线程 主线程结束

* * thread3 ---------------------- end | --------------------- end

* * thread4 ---------------------- end |

* * thread5 ---------------------- end |

* *

*

*/

@Slf4j

public class CountDownLatchExample1 {

private static int threadCount = 200;

public static void main(String[] args) throws InterruptedException {

ExecutorService service = Executors.newCachedThreadPool();

final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

for (int i = 0; i < threadCount; i++) {

final int threadNum = i;

service.execute(() -> {

try {

test(threadNum);

} catch (InterruptedException e) {

log.error("exception",e);

} finally {

countDownLatch.countDown();

}

});

}

// countDownLatch.await();

countDownLatch.await(10, TimeUnit.MILLISECONDS); //超时等待

log.info("finish");

service.shutdown();

}

private static void test(int threadNum) throws InterruptedException {

log.info("{}", threadNum);

Thread.sleep(100);

}

}

(3)Semaphore

/**

* 一般用于控制同一时刻运行的线程数量

* | |

* ---------|----- |---------

* | |

* ---------|----- |---------

* | |

*/

@Slf4j

public class SemaphoreExample1 {

private static int threadCount = 20;

public static void main(String[] args) throws InterruptedException {

ExecutorService service = Executors.newCachedThreadPool();

final Semaphore semaphore = new Semaphore(3);

for (int i = 0; i < threadCount; i++) {

final int threadNum = i;

service.execute(() -> {

try {

//semaphore.acquire(); 每次拿一个许可证

//semaphore.acquire(3); 每次拿三个许可证

semaphore.tryAcquire(3);

semaphore.tryAcquire(1,TimeUnit.SECONDS); //尝试一秒之内获取许可

semaphore.tryAcquire(3,1,TimeUnit.SECONDS);

if(semaphore.tryAcquire()) { //尝试获取许可证 ,没有获取到直接将当前线程丢弃

test(threadNum);

semaphore.release();

}else {

log.info(Thread.currentThread().getName()+"我没有拿到许可证,┭┮﹏┭┮");

}

} catch (InterruptedException e) {

log.error("exception", e);

}

});

}

service.shutdown();

}

private static void test(int threadNum) throws InterruptedException {

log.info("{}", threadNum);

Thread.sleep(1000);

}

}

(4)CyclicBarrier

/**

* 一般用于多个线程之间相互等待,当全部都到达某个屏障点的时候在,继续执行每个线程,并且可以重复循环使用

* 线程名称 某个屏障点 终点

* thread1 ------------| ---------- end

* thread2 ------------| ---------- end

* thread3 ------------| ---------- end

* thread4 ------------| ---------- end

* thread5 ------------| ---------- end

*

*/

@Slf4j

public class CyclicBarrierExample1 {

// private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);

private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5,() -> {

log.info("五个线程都已经准备就绪");

});

public static void main(String[] args) throws InterruptedException {

ExecutorService service = Executors.newCachedThreadPool();

for (int i = 0; i < 10; i++) {

final int threadNum = i;

Thread.sleep(1000);

service.execute(() -> {

try {

race(threadNum);

} catch (InterruptedException | BrokenBarrierException e) {

e.printStackTrace();

}

});

}

service.shutdown();

}

private static void race(int threadNum) throws InterruptedException, BrokenBarrierException {

Thread.sleep(1000);

log.info("{} is ready", threadNum);

cyclicBarrier.await();

try {

// cyclicBarrier.await(1, TimeUnit.SECONDS);

} catch (Exception e) {

// e.printStackTrace();

}

log.info("{} continue", threadNum);

}

}

(5)锁

锁的简单使用:

/**

* class Point {

* * private double x, y;

* * private final StampedLock sl = new StampedLock();

* *

* * void move(double deltaX, double deltaY) { // an exclusively locked method

* * long stamp = sl.writeLock();

* * try {

* * x += deltaX;

* * y += deltaY;

* * } finally {

* * sl.unlockWrite(stamp);

* * }

* * }

* *

* * double distanceFromOrigin() { // A read-only method

* * long stamp = sl.tryOptimisticRead();

* * double currentX = x, currentY = y;

* * if (!sl.validate(stamp)) {

* * stamp = sl.readLock();

* * try {

* * currentX = x;

* * currentY = y;

* * } finally {

* * sl.unlockRead(stamp);

* * }

* * }

* * return Math.sqrt(currentX * currentX + currentY * currentY);

* * }

* *

* * void moveIfAtOrigin(double newX, double newY) { // upgrade

* * // Could instead start with optimistic, not read mode

* * long stamp = sl.readLock();

* * try {

* * while (x == 0.0 && y == 0.0) {

* * long ws = sl.tryConvertToWriteLock(stamp);

* * if (ws != 0L) {

* * stamp = ws;

* * x = newX;

* * y = newY;

* * break;

* * }

* * else {

* * sl.unlockRead(stamp);

* * stamp = sl.writeLock();

* * }

* * }

* * } finally {

* * sl.unlock(stamp);

* * }

* * }

* * }}

* *

*/

@Slf4j

public class LockExample1 {

private static int clientTotal = 5000;

private static int threadTotal = 200;

private static int count = 0;

//重入锁

private final static Lock lock = new ReentrantLock();

//重入读写锁

private final static Map map = new TreeMap();

private final static ReentrantReadWriteLock reenLock = new ReentrantReadWriteLock();

private final static Lock readLock = reenLock.readLock();

private final static Lock writeLock = reenLock.writeLock();

//stamped锁

private final static StampedLock stampedLock = new StampedLock();

//condition

private final static ReentrantLock REENTRANT_LOCK = new ReentrantLock();

private final static Condition condition = REENTRANT_LOCK.newCondition();

//重入读写锁的使用

public Integer getValue(Integer key) {

readLock.lock();

Integer value = null;

try {

value = map.get(key);

} catch (Exception e) {

e.printStackTrace();

} finally {

readLock.unlock();

}

return value;

}

//重入读写锁的使用

public Set getSet() {

Set set = null;

readLock.lock();

try {

set = map.keySet();

} catch (Exception e) {

e.printStackTrace();

} finally {

readLock.unlock();

}

return set;

}

/**

* 重入读写锁的使用

* @param key

* @param value

* @return

*/

public Integer put(Integer key, Integer value) {

writeLock.lock();

try {

map.put(key, value);

} catch (Exception e) {

e.printStackTrace();

} finally {

writeLock.unlock();

}

return value;

}

/**

* condition的使用

*/

public static void testCond(){

new Thread(() -> {

try {

REENTRANT_LOCK.lock();

log.info("运动员获取锁");

condition.await();

log.info("运动员接收到信号,比赛开始~~~~");

}catch (Exception e){

e.printStackTrace();

}finally {

REENTRANT_LOCK.unlock();

}

}).start();

new Thread(() -> {

try {

REENTRANT_LOCK.lock();

log.info("裁判获取锁");

Thread.sleep(3000);

log.info("裁判发送信号");

condition.signalAll();

}catch (Exception e){

e.printStackTrace();

}finally {

REENTRANT_LOCK.unlock();

}

}).start();

}

public static void main(String[] args) throws InterruptedException {

ExecutorService service = Executors.newCachedThreadPool();

final Semaphore semaphore = new Semaphore(threadTotal);

final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

for (int i = 0; i < clientTotal; i++) {

service.execute(() -> {

try {

semaphore.acquire();

add();

semaphore.release();

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

countDownLatch.countDown();

}

});

}

countDownLatch.await();

service.shutdown();

log.info("count {}", count);

testCond();

}

/**

* stampedLock的使用方式

*/

private static void add() {

long stamp = stampedLock.writeLock();

try {

count++;

} catch (Exception e) {

} finally {

stampedLock.unlock(stamp);

}

}

}

同步锁synchronized不是JUC中的锁但也顺便提下,它是由synchronized 关键字进行同步,实现对竞争资源互斥访问的锁。

同步锁的原理:对于每一个对象,有且仅有一个同步锁;不同的线程能共同访问该同步锁。在同一个时间点该同步锁能且只能被一个线程获取到,其他线程都得等待。

另外:synchronized是Java中的关键字且是内置的语言实现;它是在JVM层面上实现的,不但可以通过一些监控工具监控synchronized的锁定,而且在代码执行时出现异常,JVM会自动释放锁定;synchronized等待的线程会一直等待下去,不能响应中断。

重入锁ReentrantLock,顾名思义:就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。另外该锁孩纸获取锁时的公平和非公平性选择,所以它包含公平锁与非公平锁(它们两也可以叫可重入锁)。首先提出两个疑问:它怎么实现重进入呢?释放逻辑还跟AQS中一样吗?

ReentrantLock 独有的功能

可指定是公平锁还是非公共锁,公平锁就是 先等待的线程先获得锁

提供了一个condition类,可以分组唤醒需要的线程,

提供了能够中断等待锁的线程机制,lock.lockInterruptibly()

非公平锁

final boolean nonfairTryAcquire(int acquires) {

final Thread current = Thread.currentThread();

int c = getState();

if (c == 0) {

if (compareAndSetState(0, acquires)) {

setExclusiveOwnerThread(current);

return true;

}

}

// 同步状态已经被其他线程占用,则判断当前线程是否与被占用的线程是同一个线程,如果是同一个线程则允许获取,并state+1

else if (current == getExclusiveOwnerThread()) {

int nextc = c + acquires;

if (nextc < 0) // overflow

throw new Error("Maximum lock count exceeded");

setState(nextc);

return true;

}

return false;

}

该方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来决定获取操作是否成功。如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。

protected final boolean tryRelease(int releases) {

int c = getState() - releases;

if (Thread.currentThread() != getExclusiveOwnerThread())

throw new IllegalMonitorStateException();

boolean free = false;

if (c == 0) {

free = true;

setExclusiveOwnerThread(null);

}

setState(c);

return free;

}

上面代码是释放锁的代码。如果该锁被获取了n次,那么前(n-1)次都是返回false,直至state=0,将占有线程设置为null,并返回true,表示释放成功。

公平锁

公平锁与非公平锁有啥区别呢? 还是从源码中分析吧。

protected final boolean tryAcquire(int acquires) {

final Thread current = Thread.currentThread();

int c = getState();

if (c == 0) {

// 区别:增加判断同步队列中当前节点是否有前驱节点的判断

if (!hasQueuedPredecessors() &&

compareAndSetState(0, acquires)) {

setExclusiveOwnerThread(current);

return true;

}

}

// 一样支持重入

else if (current == getExclusiveOwnerThread()) {

int nextc = c + acquires;

if (nextc < 0)

throw new Error("Maximum lock count exceeded");

setState(nextc);

return true;

}

return false;

}

与非公平锁的唯一不同就是增加了一个判断条件:判断同步队列中当前节点是否有前驱节点的判断,如果方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。

公平锁与非公平锁的区别

从上面源码中得知,公平性锁保证了锁的获取按照FIFO原则,但是代价就是进行大量的线程切换。而非公平性锁,可能会造成线程“饥饿”(不会保证先进来的就会先获取),但是极少线程的切换,保证了更大的吞吐量。下面我们看下案例:

import org.junit.Test;

import java.util.*;

import java.util.concurrent.*;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

public class FairAndUnfairTest {

private static Lock fairLock = new ReentrantLock2(true);

private static Lock unFairLock = new ReentrantLock2(false);

@Test

public void fair() throws Exception{

testLock(fairLock);

}

@Test

public void unFairLock() throws Exception{

testLock(unFairLock);

}

private static void testLock(Lock lock) throws InterruptedException, ExecutionException {

java 并发编程学习笔记(六)之 AQS (AbstractQueuedSynchronizer)

ExecutorService threadPool = Executors.newFixedThreadPool(5);

List> list = new ArrayList<>();

for (int i = 0 ; i < 5; i++) {

Future future = threadPool.submit(new Job(lock));

list.add(future);

}

long cost = 0;

for (Future future : list) {

cost += future.get();

}

// 查看五个线程所需耗时的时间

System.out.println("cost:" + cost + " ms");

}

private static class Job implements Callable {

private Lock lock;

public Job(Lock lock) {

this.lock = lock;

}

@Override

public Long call() throws Exception {

long st = System.currentTimeMillis();

// 同一线程获取100锁

for (int i =0; i < 100; i ++) {

lock.lock();

try {

System.out.println("Lock by[" + Thread.currentThread().getId() + "]," +

"Waiting by[" + printThread(((ReentrantLock2)lock).getQueuedThreads()) + "]");

} catch (Exception e) {

e.printStackTrace();

} finally {

lock.unlock();

}

}

// 返回100次所需的时间

return System.currentTimeMillis() - st;

}

private String printThread(Collection list) {

StringBuilder ids = new StringBuilder();

for (Thread t : list) {

ids.append(t.getId()).append(",");

}

return ids.toString();

}

}

private static class ReentrantLock2 extends ReentrantLock {

public ReentrantLock2(boolean fair) {

super(fair);

}

public Collection getQueuedThreads() {

List arrayList = new ArrayList<>(super.getQueuedThreads());

Collections.reverse(arrayList);

return arrayList;

}

}

}

非公平性锁的测试结果,cost:117 ms

公平性锁的测试结果,cost:193 ms

读写锁

读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁(同一时刻只允许一个线程进行访问)有了很大的提升。

下面我们看下它有啥特性:

特性

说明

公平性选择

可重入

该锁支持可重进入。

读线程在获取了读锁之后能够再次获取读锁。

写线程在获取了写锁之后能够再次获取写锁。

锁降级

遵循获取写锁、获取读锁在释放写锁的次序,写锁能够降级成读锁。

排他性

当写线程访问时,其他读写线程均被阻塞

另外读写锁是采取一个整型变量来维护多种状态。高16位表示读,低16位表示写。

// 偏移位

static final int SHARED_SHIFT = 16;

static final int SHARED_UNIT = (1 << SHARED_SHIFT);

// 读写线程允许占用的最大数

static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;

// 独占标志

static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

下面从源码中找出这些特性,先看下写锁的实现:

1 protected final boolean tryAcquire(int acquires) {

2

3 Thread current = Thread.currentThread();

4 int c = getState();

5 // 表示独占个数,也就是与低16位进行与运算。

6 int w = exclusiveCount(c);

7 if (c != 0) {

8 // c!=0 且 w==0表示不存在写线程,但存在读线程

9 if (w == 0 || current != getExclusiveOwnerThread())

10 return false;

11 if (w + exclusiveCount(acquires) > MAX_COUNT)

12 throw new Error("Maximum lock count exceeded");

13 /**

14 * 获取写锁的条件:

15 * 不能存在读线程且当前线程是当前占用锁的线程(这里体现可重入性和排他性);

16 * 当前占用锁的次数不能超过最大数

17 */

18 setState(c + acquires);

19 return true;

20 }

21 if (writerShouldBlock() ||

22 !compareAndSetState(c, c + acquires))

23 return false;

24 setExclusiveOwnerThread(current);

25 return true;

26 }

27 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

获取读锁源码如下:

protected final int tryAcquireShared(int unused) {

Thread current = Thread.currentThread();

int c = getState();

/**

* exclusiveCount(c) != 0: 表示有写线程在占用

* getExclusiveOwnerThread() != current : 当前占用锁的线程不是当前线程。

* 如果上面两个条件同时满足,则获取失败。

* 上面表明如果当前线程是拥有写锁的线程可以获取读锁(体现可重入和锁降级)。

*/

if (exclusiveCount(c) != 0 &&

getExclusiveOwnerThread() != current)

return -1;

int r = sharedCount(c);

if (!readerShouldBlock() &&

r < MAX_COUNT &&

compareAndSetState(c, c + SHARED_UNIT)) {

if (r == 0) {

firstReader = current;

firstReaderHoldCount = 1;

} else if (firstReader == current) {

firstReaderHoldCount++;

} else {

HoldCounter rh = cachedHoldCounter;

if (rh == null || rh.tid != getThreadId(current))

cachedHoldCounter = rh = readHolds.get();

else if (rh.count == 0)

readHolds.set(rh);

rh.count++;

}

return 1;

}

return fullTryAcquireShared(current);

}

Java 任务调度

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:《MySQL必懂系列》全局锁、表级锁、行锁
下一篇:【云驻共创】华为云数据库之大数据入门与应用(下)
相关文章