java并发编程艺术

网友投稿 531 2022-05-30

参考博客 :https://blog.csdn.net/LQL_King/article/details/77146647

Volatile

理解volatile特性的一个好方法是把对volatile变量的单个读/写,看成是使用同一个锁对这些单个读/写操作做了同步。下面通过具体的示例来说明,示例代码如下。

class VolatileFeaturesExample { volatile long vl = 0L; // 使用volatile声明64位的long型变量 public void set(long l) { vl = l; // 单个volatile变量的写 } public void getAndIncrement () { vl++; // 复合(多个)volatile变量的读/写 } public long get() { return vl; // 单个volatile变量的读 } }

假设有多个线程分别调用上面程序的3个方法,这个程序在语义上和下面程序等价。

class VolatileFeaturesExample { long vl = 0L; // 64位的long型普通变量 public synchronized void set(long l) { // 对单个的普通变量的写用同一个锁同步 vl = l; } public void getAndIncrement () { // 普通方法调用 long temp = get(); // 调用已同步的读方法 temp += 1L; // 普通写操作 set(temp); // 调用已同步的写方法 } public synchronized long get() { // 对单个的普通变量的读用同一个锁同步 return vl; } }

volatile变量自身具有下列特性。

·可见性。对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写

入。

·原子性:对任意单个volatile变量的读/写具有原子性,但类似于volatile++这种复合操作不

具有原子性。

对于普通同步方法,锁是当前实例对象。

·对于静态同步方法,锁是当前类的Class对象。

·对于同步方法块,锁是Synchonized括号里配置的对象。

当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁

Java支持多个线程同时访问一个对象或者对象的成员变量,由于每个线程可以拥有这个变量的拷贝(虽然对象以及成员变量分配的内存是在共享内存中的,但是每个执行的线程还是可以拥有一份拷贝,这样做的目的是加速程序的执行,这是现代多核处理器的一个显著特性),所以程序在执行过程中,一个线程看到的变量并不一定是最新的。关键字volatile可以用来修饰字段(成员变量),就是告知程序任何对该变量的访问均需要从共享内存中获取,而对它的改变必须同步刷新回共享内存,它能保证所有线程对变量访问的可见性。

举个例子,定义一个表示程序是否运行的成员变量boolean on=true,那么另一个线程可能对它执行关闭动作(on=false),这里涉及多个线程对变量的访问,因此需要将其定义成为volatile boolean on=true,这样其他线程对它进行改变时,可以让所有线程感知到变化,因为所有对on变量的访问和修改都需要以共享内存为准。但是,过多地使用volatile是不必要的,因为它会降低程序执行的效率。关键字synchronized可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性。

wait()  /  notify()

调用wait()、notify()以及notifyAll()时需要注意的细节,如下。

1)使用wait()、notify()和notifyAll()时需要先对调用对象加锁。

2)调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列。

3)notify()或notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()或notifAll()的线程释放锁之后,等待线程才有机会从wait()返回。

4)notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll()方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为BLOCKED。

5)从wait()方法返回的前提是获得了调用对象的锁。

管道输入/输出流

管道输入/输出流和普通的文件输入/输出流或者网络输入/输出流不同之处在于,它主要用于线程之间的数据传输,而传输的媒介为内存。

管道输入/输出流主要包括了如下4种具体实现:PipedOutputStream、PipedInputStream、PipedReader和PipedWriter,前两种面向字节,而后两种面向字符。

在代码清单4-12所示的例子中,创建了printThread,它用来接受main线程的输入,任何main线程的输入均通过PipedWriter写入,而printThread在另一端通过PipedReader将内容读出并打印。

public class Piped {  public static void main(String[] args) throws Exception {   PipedWriter out = new PipedWriter();   PipedReader in = new PipedReader();   // 将输出流和输入流进行连接,否则在使用时会抛出IOException   out.connect(in);      Thread printThread = new Thread(new Print(in), "PrintThread");   printThread.start();   int receive = 0;   try {    while ((receive = System.in.read()) != -1) {     out.write(receive);    }   } finally {    out.close();   }  }  static class Print implements Runnable {   private PipedReader in;   public Print(PipedReader in) {    this.in = in;   }   public void run() {    int receive = 0;    try {     while ((receive = in.read()) != -1) {      System.out.print((char) receive);     }    } catch (IOException ex) {    }   }  } }

对于Piped类型的流,必须先要进行绑定,也就是调用connect()方法,如果没有将输入/输出流绑定起来,对于该流的访问将会抛出异常。

LOCK(重入锁与读写锁)

Lock lock = new ReentrantLock(); //重入锁(可多次上锁) lock.lock(); try { } finally { lock.unlock(); }

在finally块中释放锁,目的是保证在获取到锁之后,最终能够被释放。

不要将获取锁的过程写在try块中,因为如果在获取锁(自定义锁的实现)时发生了异常,异常抛出的同时,也会导致锁无故释放。

表5-1 Lock接口提供的synchronized关键字不具备的主要特性

之前提到锁(如Mutex和ReentrantLock)基本都是排他锁,这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。

除了保证写操作对读操作的可见性以及并发性的提升之外,读写锁能够简化读写交互场景的编程方式。假设在程序中定义一个共享的用作缓存数据结构,它大部分时间提供读服务(例如查询和搜索),而写操作占有的时间很少,但是写操作完成之后的更新需要对后续的读服务可见。

在没有读写锁支持的(Java 5之前)时候,如果需要完成上述工作就要使用Java的等待通知机制,就是当写操作开始时,所有晚于写操作的读操作均会进入等待状态,只有写操作完成并进行通知之后,所有等待的读操作才能继续执行(写操作之间依靠synchronized关键进行同步),这样做的目的是使读操作能读取到正确的数据,不会出现脏读。改用读写锁实现上述功

能,只需要在读操作时获取读锁,写操作时获取写锁即可。当写锁被获取到时,后续(非当前写操作线程)的读写操作都会被阻塞,写锁释放之后,所有操作继续执行,编程方式相对于使用等待通知机制的实现方式而言,变得简单明了。

一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。Java并发包提供读写锁的实现是ReentrantReadWriteLock,它提供的特性如表5-8所示。

当需要阻塞或唤醒一个线程的时候,都会使用LockSupport工具类来完成相应工作。LockSupport定义了一组的公共静态方法,这些方法提供了最基本的线程阻塞和唤醒功能,而LockSupport也成为构建同步组件的基础工具。

LockSupport定义了一组以park开头的方法用来阻塞当前线程,以及unpark(Thread thread)方法来唤醒一个被阻塞的线程。Park有停车的意思,假设线程为车辆,那么park方法代表着停车,而unpark方法则是指车辆启动离开,这些方法以及描述如表5-10所示。

表5-10 LockSupport提供的阻塞和唤醒方法

在Java 6中,LockSupport增加了park(Object blocker)、parkNanos(Object blocker,long nanos)和parkUntil(Object blocker,long deadline)3个方法,用于实现阻塞当前线程的功能,其中参数blocker是用来标识当前线程在等待的对象(以下称为阻塞对象),该对象主要用于问题排查和系统监控。

下面的示例中,将对比parkNanos(long nanos)方法和parkNanos(Object blocker,long nanos)方法来展示阻塞对象blocker的用处,代码片段和线程dump(部分)如表5-11所示。

从表5-11的线程dump结果可以看出,代码片段的内容都是阻塞当前线程10秒,但从线程dump结果可以看出,有阻塞对象的parkNanos方法能够传递给开发人员更多的现场信息。这是由于在Java 5之前,当线程阻塞(使用synchronized关键字)在一个对象上时,通过线程dump能够查看到该线程的阻塞对象,方便问题定位,而Java 5推出的Lock等并发工具时却遗漏了这一

点,致使在线程dump时无法提供阻塞对象的信息。因此,在Java 6中,LockSupport新增了上述3个含有阻塞对象的park方法,用以替代原有的park方法。

Condition

public class BoundedQueue { private Object[] items; // 添加的下标,删除的下标和数组当前数量 private int addIndex, removeIndex, count; private Lock lock = new ReentrantLock(); private Condition notEmpty = lock.newCondition(); private Condition notFull = lock.newCondition(); public BoundedQueue(int size) { items = new Object[size]; } // 添加一个元素,如果数组满,则添加线程进入等待状态,直到有"空位" public void add(T t) throws InterruptedException { lock.lock();  try { while (count == items.length) notFull.await(); items[addIndex] = t; if (++addIndex == items.length) addIndex = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } // 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素 @SuppressWarnings("unchecked") public T remove() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[removeIndex]; if (++removeIndex == items.length) removeIndex = 0; --count; notFull.signal(); return (T) x; } finally { lock.unlock(); } } }

上述示例中,BoundedQueue通过add(T t)方法添加一个元素,通过remove()方法移出一个元素。以添加方法为例。

首先需要获得锁,目的是确保数组修改的可见性和排他性。当数组数量等于数组长度时,表示数组已满,则调用notFull.await(),当前线程随之释放锁并进入等待状态。如果数组数量不等于数组长度,表示数组未满,则添加元素到数组中,同时通知等待在notEmpty上的线程,数组中已经有新元素可以获取。在添加和删除方法中使用while循环而非if判断,目的是防止过早或意外的通知,只有条件符合才能够退出循环。回想之前提到的等待/通知的经典范式,二者是非常类似的。

Java里的阻塞队列

JDK 7提供了7个阻塞队列,如下。

·ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。

·LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。

·PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。

·DelayQueue:一个使用优先级队列实现的无界阻塞队列。

·SynchronousQueue:一个不存储元素的阻塞队列。

·LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。

·LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

1.ArrayBlockingQueue

ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。

默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。为了保证公平性,通常会降低吞吐量。我们可以使用以下代码创建一个公平的阻塞队列。

ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true); 访问者的公平性是使用可重入锁实现的,代码如下。 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }

2.LinkedBlockingQueue

LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。

3.PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。

4.DelayQueue

DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

DelayQueue非常有用,可以将DelayQueue运用在以下应用场景。

·缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询

DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。

·定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从

DelayQueue中获取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的。

(1)如何实现Delayed接口

DelayQueue队列的元素必须实现Delayed接口。我们可以参考ScheduledThreadPoolExecutor

里ScheduledFutureTask类的实现,一共有三步。

第一步:在对象创建的时候,初始化基本数据。使用time记录当前对象延迟到什么时候可

以使用,使用sequenceNumber来标识元素在队列中的先后顺序。代码如下。

private static final AtomicLong sequencer = new AtomicLong(0); ScheduledFutureTask(Runnable r, V result, long ns, long period) { ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); }

第二步:实现getDelay方法,该方法返回当前元素还需要延时多长时间,单位是纳秒,代码

如下。

public long getDelay(TimeUnit unit) { return unit.convert(time - now(), TimeUnit.NANOSECONDS); }

通过构造函数可以看出延迟时间参数ns的单位是纳秒,自己设计的时候最好使用纳秒,因

为实现getDelay()方法时可以指定任意单位,一旦以秒或分作为单位,而延时时间又精确不到

纳秒就麻烦了。使用时请注意当time小于当前时间时,getDelay会返回负数。

第三步:实现compareTo方法来指定元素的顺序。例如,让延时时间最长的放在队列的末

尾。实现代码如下。

public int compareTo(Delayed other) { if (other == this) // compare zero ONLY if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<> x = (ScheduledFutureTask<>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) 0 : ((d < 0) -1 : 1); }

(2)如何实现延时阻塞队列

延时阻塞队列的实现很简单,当消费者从队列里获取元素时,如果元素没有达到延时时

间,就阻塞当前线程。

public int compareTo(Delayed other) { if (other == this) // compare zero ONLY if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<> x = (ScheduledFutureTask<>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) 0 : ((d < 0) -1 : 1); }

代码中的变量leader是一个等待获取队列头部元素的线程。如果leader不等于空,表示已

经有线程在等待获取队列的头元素。所以,使用await()方法让当前线程等待信号。如果leader

等于空,则把当前线程设置成leader,并使用awaitNanos()方法让当前线程等待接收信号或等

待delay时间。

5.SynchronousQueue

SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,

否则不能继续添加元素。

它支持公平访问队列。默认情况下线程采用非公平性策略访问队列。使用以下构造方法

可以创建公平性访问的SynchronousQueue,如果设置为true,则等待的线程会采用先进先出的

顺序访问队列。

public SynchronousQueue(boolean fair) { transferer = fair new TransferQueue() : new TransferStack(); }

SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费

者线程。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于

LinkedBlockingQueue和ArrayBlockingQueue。

6.LinkedTransferQueue

LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻

塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。

(1)transfer方法

如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法

时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等

待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返

回。transfer方法的关键代码如下。

Node pred = tryAppend(s, haveData);

return awaitMatch(s, pred, e, (how == TIMED), nanos);

第一行代码是试图把存放当前元素的s节点作为tail节点。第二行代码是让CPU自旋等待

消费者消费元素。因为自旋会消耗CPU,所以自旋一定的次数后使用Thread.yield()方法来暂停

当前正在执行的线程,并执行其他线程。

(2)tryTransfer方法

tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等

待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法

立即返回,而transfer方法是必须等到消费者消费了才返回。

对于带有时间限制的tryTransfer(E e,long timeout,TimeUnit unit)方法,试图把生产者传入

的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超

时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。

java并发编程的艺术

7.LinkedBlockingDeque

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以

从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队

时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque多了addFirst、

addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、

获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双

端队列的最后一个元素。另外,插入方法add等同于addLast,移除方法remove等效于

removeFirst。但是take方法却等同于takeFirst,不知道是不是JDK的bug,使用时还是用带有First

和Last后缀的方法更清楚。

在初始化LinkedBlockingDeque时可以设置容量防止其过度膨胀。另外,双向阻塞队列可以

运用在“工作窃取”模式中。

线程池

java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,

还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用线程池,必须对其实现原理了如指掌。

ThreadPoolExecutor执行execute方法分下面4种情况。

1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。

2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。

3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。

4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用

RejectedExecutionHandler.rejectedExecution()方法。

想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

·任务的性质:CPU密集型任务、IO密集型任务和混合型任务。

·任务的优先级:高、中和低。

·任务的执行时间:长、中和短。

·任务的依赖性:是否依赖其他系统资源,如数据库连接。

性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的线程,如配置N cpu +1个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*N cpu 。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。

建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。有一次,我们系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里。如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然,我们的系统所有的任务是用单独的服务器部署的,我们使用不同规模的线程池完成不同类型的任务,但是出现这样问题时也会影响到其他任务.

可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。

只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。

如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根据线程池的使用状况快速定位问题。可以通过线程池提供的参数进行监控,在监控线程池的时候可以使用以下属性。

·taskCount:线程池需要执行的任务数量。

·completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount。

·largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是

否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。

·getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销

毁,所以这个大小只增不减。

·getActiveCount:获取活动的线程数。

多线程 Java

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:【云驻共创】华为云内容审核服务
下一篇:【云驻共创】华为云数据库之走进GaussDB数据库
相关文章