写出正确程序第一步是掌握这些并发编程缺陷

网友投稿 700 2022-05-29

多年来,研究人员花了大量的时间和精力研究并发编程的缺陷。很多早期的工作是关于死锁的,之前的章节也有提及,本章会深入学习[C+71]。最近的研究集中在一些其他类型的常见并发缺陷(即非死锁缺陷)。在本章中,我们会简要了解一些并发问题的例子,以便更好地理解要注意什么问题。因此,本章的关键问题就是:

关键问题:如何处理常见的并发缺陷

并发缺陷会有很多常见的模式。了解这些模式是写出健壮、正确程序的第一步。

32.1 有哪些类型的缺陷

第一个最明显的问题就是:在复杂并发程序中,有哪些类型的缺陷呢?一般来说,这个问题很难回答,好在其他人已经做过相关的工作。具体来说,Lu等人[L+08]详细分析了一些流行的并发应用,以理解实践中有哪些类型的缺陷。

研究集中在4个重要的开源应用:MySQL(流行的数据库管理系统)、Apache(著名的Web服务器)、Mozilla(著名的Web浏览器)和OpenOffice(微软办公套件的开源版本)。研究人员通过检查这几个代码库已修复的并发缺陷,将开发者的工作变成量化的缺陷分析。理解这些结果,有助于我们了解在成熟的代码库中,实际出现过哪些类型的并发问题。

表32.1是Lu及其同事的研究结论。可以看出,共有105个缺陷,其中大多数是非死锁相关的(74个),剩余31个是死锁缺陷。另外,可以看出每个应用的缺陷数目,OpenOffice只有8个,而Mozilla有接近60个。

表32.1 现代应用程序的缺陷统计

我们现在来深入分析这两种类型的缺陷。对于第一类非死锁的缺陷,我们通过该研究的例子来讨论。对于第二类死锁缺陷,我们讨论人们在阻止、避免和处理死锁上完成的大量工作。

32.2 非死锁缺陷

Lu的研究表明,非死锁问题占了并发问题的大多数。它们是怎么发生的?我们如何修复?我们现在主要讨论其中两种:违反原子性(atomicity violation)缺陷和错误顺序(order violation)缺陷。

违反原子性缺陷

第一种类型的问题叫作违反原子性。这是一个MySQL中出现的例子。读者可以先自行找出其中问题所在。

1    Thread 1::2    if (thd->proc_info) {3      ...4      fputs(thd->proc_info, ...);5      ...6    }78    Thread 2::9    thd->proc_info = NULL;

这个例子中,两个线程都要访问thd结构中的成员proc_info。第一个线程检查proc_info非空,然后打印出值;第二个线程设置其为空。显然,当第一个线程检查之后,在fputs()调用之前被中断,第二个线程把指针置为空;当第一个线程恢复执行时,由于引用空指针,导致程序奔溃。

根据Lu等人,更正式的违反原子性的定义是:“违反了多次内存访问中预期的可串行性(即代码段本意是原子的,但在执行中并没有强制实现原子性)”。在我们的例子中,proc_info的非空检查和fputs()调用打印proc_info是假设原子的,当假设不成立时,代码就出问题了。

这种问题的修复通常(但不总是)很简单。你能想到如何修复吗?

在这个方案中,我们只要给共享变量的访问加锁,确保每个线程访问proc_info字段时,都持有锁(proc_info_lock)。当然,访问这个结构的所有其他代码,也应该先获取锁。

1    pthread_mutex_t proc_info_lock = PTHREAD_MUTEX_INITIALIZER;23    Thread 1::4    pthread_mutex_lock(&proc_info_lock);5    if (thd->proc_info) {6      ...7      fputs(thd->proc_info, ...);8      ...9    }10    pthread_mutex_unlock(&proc_info_lock);1112   Thread 2::13   pthread_mutex_lock(&proc_info_lock);14   thd->proc_info = NULL;15   pthread_mutex_unlock(&proc_info_lock);

违反顺序缺陷

Lu等人提出的另一种常见的非死锁问题叫作违反顺序(order violation)。下面是一个简单的例子。同样,看看你是否能找出为什么下面的代码有缺陷。

1    Thread 1::2    void init() {3        ...4        mThread = PR_CreateThread(mMain, ...);5        ...6    }78    Thread 2::9    void mMain(...) {10       ...11       mState = mThread->State;12       ...13   }

你可能已经发现,线程2的代码中似乎假定变量mThread已经被初始化了(不为空)。然而,如果线程1并没有首先执行,线程2就可能因为引用空指针奔溃(假设mThread初始值为空;否则,可能会产生更加奇怪的问题,因为线程2中会读到任意的内存位置并引用)。

违反顺序更正式的定义是:“两个内存访问的预期顺序被打破了(即A应该在B之前执行,但是实际运行中却不是这个顺序)”[L+08]。

我们通过强制顺序来修复这种缺陷。正如之前详细讨论的,条件变量(condition variables)就是一种简单可靠的方式,在现代代码集中加入这种同步。在上面的例子中,我们可以把代码修改成这样:

1    pthread_mutex_t mtLock = PTHREAD_MUTEX_INITIALIZER;2    pthread_cond_t mtCond = PTHREAD_COND_INITIALIZER;3    int mtInit            = 0;45    Thread 1::6    void init() {7       ...8       mThread = PR_CreateThread(mMain, ...);910      // signal that the thread has been created...11      pthread_mutex_lock(&mtLock);12      mtInit = 1;13      pthread_cond_signal(&mtCond);14      pthread_mutex_unlock(&mtLock);15      ...16   }1718   Thread 2::19   void mMain(...) {20      ...21      // wait for the thread to be initialized...22      pthread_mutex_lock(&mtLock);23      while (mtInit == 0)24          pthread_cond_wait(&mtCond,  &mtLock);25      pthread_mutex_unlock(&mtLock);2627      mState = mThread->State;28      ...29   }

在这段修复的代码中,我们增加了一个锁(mtLock)、一个条件变量(mtCond)以及状态的变量(mtInit)。初始化代码运行时,会将mtInit设置为1,并发出信号表明它已做了这件事。如果线程2先运行,就会一直等待信号和对应的状态变化;如果后运行,线程2会检查是否初始化(即mtInit被设置为1),然后正常运行。请注意,我们可以用mThread本身作为状态变量,但为了简洁,我们没有这样做。当线程之间的顺序很重要时,条件变量(或信号量)能够解决问题。

非死锁缺陷:小结

Lu等人的研究中,大部分(97%)的非死锁问题是违反原子性和违反顺序这两种。因此,程序员仔细研究这些错误模式,应该能够更好地避免它们。此外,随着更自动化的代码检查工具的发展,它们也应该关注这两种错误,因为开发中发现的非死锁问题大部分都是这两种。

然而,并不是所有的缺陷都像我们举的例子一样,这么容易修复。有些问题需要对应用程序的更深的了解,以及大量代码及数据结构的调整。阅读Lu等人的优秀(可读性强)的论文,了解更多细节。

32.3 死锁缺陷

除了上面提到的并发缺陷,死锁(deadlock)是一种在许多复杂并发系统中出现的经典问题。例如,当线程1持有锁L1,正在等待另外一个锁L2,而线程2持有锁L2,却在等待锁L1释放时,死锁就产生了。以下的代码片段就可能出现这种死锁:

Thread 1:    Thread 2:lock(L1);    lock(L2);lock(L2);    lock(L1);

这段代码运行时,不是一定会出现死锁的。当线程1占有锁L1,上下文切换到线程2。线程2锁住L2,试图锁住L1。这时才产生了死锁,两个线程互相等待。如图32.1所示,其中的圈(cycle)表明了死锁。

图32.1 死锁依赖图

该图应该有助于描述清楚问题。程序员在编写代码中应该如何处理死锁呢?

关键问题:如何对付死锁

我们在实现系统时,如何避免或者能够检测、恢复死锁呢?这是目前系统中的真实问题吗?

为什么发生死锁

你可能在想,上文提到的这个死锁的例子,很容易就可以避免。例如,只要线程1和线程2都用相同的抢锁顺序,死锁就不会发生。那么,死锁为什么还会发生?

其中一个原因是在大型的代码库里,组件之间会有复杂的依赖。以操作系统为例。虚拟内存系统在需要访问文件系统才能从磁盘读到内存页;文件系统随后又要和虚拟内存交互,去申请一页内存,以便存放读到的块。因此,在设计大型系统的锁机制时,你必须要仔细地去避免循环依赖导致的死锁。

另一个原因是封装(encapsulation)。软件开发者一直倾向于隐藏实现细节,以模块化的方式让软件开发更容易。然而,模块化和锁不是很契合。Jula等人指出[J+08],某些看起来没有关系的接口可能会导致死锁。以Java的Vector类和AddAll()方法为例,我们这样调用这个方法:

Vector v1, v2; v1.AddAll(v2);

在内部,这个方法需要多线程安全,因此针对被添加向量(v1)和参数(v2)的锁都需要获取。假设这个方法,先给v1加锁,然后再给v2加锁。如果另外某个线程几乎同时在调用v2.AddAll(v1),就可能遇到死锁。

产生死锁的条件

死锁的产生需要如下4个条件[C+71]。

写出正确程序的第一步是掌握这些并发编程缺陷

互斥:线程对于需要的资源进行互斥的访问(例如一个线程抢到锁)。

持有并等待:线程持有了资源(例如已将持有的锁),同时又在等待其他资源(例如,需要获得的锁)。

非抢占:线程获得的资源(例如锁),不能被抢占。

循环等待:线程之间存在一个环路,环路上每个线程都额外持有一个资源,而这个资源又是下一个线程要申请的。

如果这4个条件的任何一个没有满足,死锁就不会产生。因此,我们首先研究一下预防死锁的方法;每个策略都设法阻止某一个条件,从而解决死锁的问题。

预防

也许最实用的预防技术(当然也是经常采用的),就是让代码不会产生循环等待。最直接的方法就是获取锁时提供一个全序(total ordering)。假如系统共有两个锁(L1和L2),那么我们每次都先申请L1然后申请L2,就可以避免死锁。这样严格的顺序避免了循环等待,也就不会产生死锁。

当然,更复杂的系统中不会只有两个锁,锁的全序可能很难做到。因此,偏序(partial ordering)可能是一种有用的方法,安排锁的获取并避免死锁。Linux中的内存映射代码就是一个偏序锁的好例子[T+94]。代码开头的注释表明了10组不同的加锁顺序,包括简单的关系,比如i_mutex早于i_mmap_mutex,也包括复杂的关系,比如i_mmap_mutex早于private_lock,早于swap_lock,早于mapping->tree_lock。

你可以想到,全序和偏序都需要细致的锁策略的设计和实现。另外,顺序只是一种约定,粗心的程序员很容易忽略,导致死锁。最后,有序加锁需要深入理解代码库,了解各种函数的调用关系,即使一个错误,也会导致“D”字[1]

提示:通过锁的地址来强制锁的顺序

当一个函数要抢多个锁时,我们需要注意死锁。比如有一个函数:do_something(mutex t *m1, mutex t *m2),如果函数总是先抢m1,然后m2,那么当一个线程调用do_something(L1, L2),而另一个线程调用do_something(L2, L1)时,就可能会产生死锁。

为了避免这种特殊问题,聪明的程序员根据锁的地址作为获取锁的顺序。按照地址从高到低,或者从低到高的顺序加锁,do_something()函数就可以保证不论传入参数是什么顺序,函数都会用固定的顺序加锁。具体的代码如下:

if (m1 > m2) { // grab locks in high-to-low address order    pthread_mutex_lock(m1);   pthread_mutex_lock(m2); } else {    pthread_mutex_lock(m2);    pthread_mutex_lock(m1);   } // Code assumes that m1 != m2 (it is not the same lock)

>在获取多个锁时,通过简单的技巧,就可以确保简单有效的无死锁实现。 #### 持有并等待 死锁的持有并等待条件,可以通过原子地抢锁来避免。实践中,可以通过如下代码来实现:

1    lock(prevention); 2    lock(L1); 3    lock(L2); 4    ... 5    unlock(prevention);

先抢到prevention这个锁之后,代码保证了在抢锁的过程中,不会有不合时宜的线程切换,从而避免了死锁。当然,这需要任何线程在任何时候抢占锁时,先抢到全局的prevention锁。例如,如果另一个线程用不同的顺序抢锁L1和L2,也不会有问题,因为此时,线程已经抢到了prevention锁。 注意,出于某些原因,这个方案也有问题。和之前一样,它不适用于封装:因为这个方案需要我们准确地知道要抢哪些锁,并且提前抢到这些锁。因为要提前抢到所有锁(同时),而不是在真正需要的时候,所以可能降低了并发。 #### 非抢占 在调用unlock之前,都认为锁是被占有的,多个抢锁操作通常会带来麻烦,因为我们等待一个锁时,同时持有另一个锁。很多线程库提供更为灵活的接口来避免这种情况。具体来说,trylock()函数会尝试获得锁,或者返回−1,表示锁已经被占有。你可以稍后重试一下。 可以用这一接口来实现无死锁的加锁方法:

1    top: 2      lock(L1); 3      if (trylock(L2) == -1) { 4        unlock(L1); 5        goto top; 6    }

注意,另一个线程可以使用相同的加锁方式,但是不同的加锁顺序(L2然后L1),程序仍然不会产生死锁。但是会引来一个新的问题:活锁(livelock)。两个线程有可能一直重复这一序列,又同时都抢锁失败。这种情况下,系统一直在运行这段代码(因此不是死锁),但是又不会有进展,因此名为活锁。也有活锁的解决方法:例如,可以在循环结束的时候,先随机等待一个时间,然后再重复整个动作,这样可以降低线程之间的重复互相干扰。 关于这个方案的最后一点:使用trylock方法可能会有一些困难。第一个问题仍然是封装:如果其中的某一个锁,是封装在函数内部的,那么这个跳回开始处就很难实现。如果代码在中途获取了某些资源,必须要确保也能释放这些资源。例如,在抢到L1后,我们的代码分配了一些内存,当抢L2失败时,并且在返回开头之前,需要释放这些内存。当然,在某些场景下(例如,之前提到的Java的vector方法),这种方法很有效。#### 互斥最后的预防方法是完全避免互斥。通常来说,代码都会存在临界区,因此很难避免互斥。那么我们应该怎么做呢? Herlihy提出了设计各种无等待(wait-free)数据结构的思想[H91]。想法很简单:通过强大的硬件指令,我们可以构造出不需要锁的数据结构。举个简单的例子,假设我们有比较并交换(compare-and-swap)指令,是一种由硬件提供的原子指令,做下面的事:

1    int CompareAndSwap(int *address, int expected, int new) { 2      if (*address == expected) { 3        *address = new; 4        return 1; // success 5      } 6      return 0;   // failure 7    }

假定我们想原子地给某个值增加特定的数量。我们可以这样实现:

1    void AtomicIncrement(int *value, int amount) { 2      do { 3        int old = *value; 4      } while (CompareAndSwap(value, old, old + amount) == 0); 5    }

无须获取锁,更新值,然后释放锁这些操作,我们使用比较并交换指令,反复尝试将值更新到新的值。这种方式没有使用锁,因此不会有死锁(有可能产生活锁)。 我们来考虑一个更复杂的例子:链表插入。这是在链表头部插入元素的代码:

1    void insert(int value) { 2      node_t *n = malloc(sizeof(node_t)); 3      assert(n != NULL); 4      n->value = value; 5      n->next = head; 6      head     = n; 7    }

这段代码在多线程同时调用的时候,会有临界区(看看你是否能弄清楚原因)。当然,我们可以通过给相关代码加锁,来解决这个问题:

1    void insert(int value) { 2      node_t *n = malloc(sizeof(node_t)); 3      assert(n != NULL); 4      n->value = value; 5      lock(listlock);    // begin critical section 6      n->next = head; 7      head    = n; 8      unlock(listlock); // end of critical section 9    }

上面的方案中,我们使用了传统的锁[2]。这里我们尝试用比较并交换指令(compare-and-swap)来实现插入操作。一种可能的实现是:

1    void insert(int value) { 2      node_t *n = malloc(sizeof(node_t)); 3      assert(n != NULL); 4      n->value = value; 5      do { 6        n->next = head; 7      } while (CompareAndSwap(&head, n->next, n) == 0); 8    }

文章转自异步社区

任务调度 开发者

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:你有一份2019运维技能风向标 请查收
下一篇:5G 时代,华为云的互联网生意经
相关文章