C++:并发编程入门笔记

标准库

下面介绍一些常用的标准库,包括 C 的库,涵盖了多线程及其通信方式。

请勿试图记住他们,先粗略看一下有哪些 API,然后马上开始实战环节,用到再上来查。

pthread(POSIX 线程库)

考虑到 std::thread 是对 POSIX 标准的 pthread 库的封装。故从后者讲起。

头文件:#include <pthread.h>

链接参数:-pthread

创建线程

  • pthread_create
    • 作用:创建一个线程并开始执行

    • 签名:

      1int pthread_create(pthread_t *restrict thread,
      2                   const pthread_attr_t *restrict attr,
      3                   void *(*start_routine)(void *),
      4                   void *restrict arg);
      

      注:restrict 告诉编译器只有这个指针,或者从它派生的指针(指针+ 1)才能访问指针指向的内容。

    • thread 是线程句柄。我们不需要自己生成,调用完会填充进去。

    • attr 是线程属性。一般填 NULL

    • start_routine 是线程所执行的函数地址

    • arg 是线程所执行的函数的参数

等待线程

  • pthread_join
    • 作用:阻塞当前线程,直到指定的线程结束

    • 签名:

      1int pthread_join(pthread_t thread, void **retval);
      
    • thread 是被等待线程句柄。

    • retval 是被等待线程返回值的指针。填 NULL 表示不需要此返回值。

    • 返回值:0 成功,非 0 失败

退出线程

  • pthread_exit
    • 作用:结束当前线程

    • 签名:

      1void pthread_exit(void *retval);
      
    • retval 是线程返回值。

自取线程句柄

  • pthread_self
    • 作用:获取当前线程句柄

    • 签名:

      1pthread_t pthread_self(void);
      
    • 返回值:当前线程句柄

让线程脱管

  • pthread_detach
    • 作用:让线程自动结束

    • (join 可以近似理解为 wait)线程默认的状态是 joinable,如果一个线程结束运行但没有被 join,则它的状态类似僵尸进程,因为退出状态码尚未被回收,所以创建线程者应该调用 pthread_join 来等待线程运行结束,并可得到线程的退出代码,回收其资源(类似于wait, waitpid)

    • 如果不希望子线程阻塞当前线程,可以在当前线程调用 pthread_detach(child_thread_id) 将子线程设置为 detached 状态,子线程将在运行后自动释放所有资源。(也可以在子线程中调用 pthread_detach(pthread_self()),效果等同)

    • 签名:

      1int pthread_detach(pthread_t thread);
      
    • thread 是线程句柄。

    • 返回值:0 成功,非 0 失败

清理

void pthread_cleanup_push(void (*callback)(void *), void *arg); void pthread_cleanup_pop(int execute);

shm(共享内存)

创建共享内存

1#include <sys/ipc.h>
2#include <sys/shm.h>
3
4int shmget(key_t key, size_t size, int shmflg);
  • 作用:创建共享内存区域

  • 参数:

    • key 是共享内存的 key。输出参数。

    • size 是共享内存的大小,单位是字节。

    • shmflg 是共享内存的标志,可以是 0 或者 IPC_CREAT。

  • 返回值:如果成功,返回共享内存的 id,如果失败,返回 -1。

解除共享内存

1#include <sys/types.h>
2#include <sys/shm.h>
3
4int shmdt(const void *shmaddr);
  • 作用:解除共享内存区域

  • 参数:

    • shmaddr 是共享内存的起始地址。

mmap(共享内存)

mmap 除了可以映射文件,也可以将内存区域映射。实际上它是 shm 的底层实现。

1void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
  • 作用:映射内存区域

  • 参数:

    • addr 是映射区域的起始地址。如果为 NULL,系统将自动分配

    • length 是映射区域的大小,单位是字节。

    • prot 是映射区域的保护属性。可以是 PROT_READ、PROT_WRITE、PROT_EXEC。(读、写、执行)

      • 也可以设置为 PROT_NONE。(不可访问)
    • flags 是映射区域的标志。

      flags参数 说明
      MAP_SHARED 进程间共享内存
      MAP_PRIVATE 调用进程所私有。对该内存段的修改不会反映到映射文件
      MAP_ANNOYMOUS 匿名映射
      MAP_FIXED 内存段必须位于start参数指定的地址处,start必须是页大小的整数倍(4K整数倍)
      MAP_HUGETLB 按照大内存页面来分配内存空间
    • fd 是映射区域所在的文件的描述符。如果为 -1,则表示映射区域为内存区域。

    • offset 是映射区域所在的文件的偏移量。

匿名映射可以表示为全 0 的虚拟文件。匿名映射只是大型的、零填充的内存块,随时可供使用。这些映射驻留在堆之外,因此不会造成数据段碎片。详见 linux - What is the purpose of MAP_ANONYMOUS flag in mmap system call? - Stack Overflow

使用方法例子:

  1. open 打开一个文件,然后 mmap 得到共享内存。这样进程间可以通过相同的文件名得到相同的共享内存。

  2. 匿名映射,用于有直接或间接父子关系的进程间。

pipe

对于父子进程,可以使用匿名管道通信。对于不相干的进程,可以采用命名管道的通信。

1#include <unistd.h>
2int pipe(int pipefd[2]);
3#define _GNU_SOURCE             /* See feature_test_macros(7) */
4#include <fcntl.h>              /* Definition of O_* constants */
5#include <unistd.h>
6int pipe2(int pipefd[2], int flags);
  • 作用:创建管道

  • 参数:

    • pipefd 是管道的句柄。输出参数。

    • flags 是管道的标志

      • O_CLOEXEC:设置管道的文件描述符为 close-on-exec 标志。意思是,当进程 exec 为另一个进程后,管道的文件描述符将被关闭。

      • O_NONBLOCK:设置管道的文件描述符为 non-blocking (非阻塞)标志。意思是,当进程读取管道时,如果管道中没有数据,则读取将立即返回,而不会等待。

      • O_RDONLY:设置管道的文件描述符为只读标志。

#include <sys/types.h>
#include <sys/stat.h>

int mkfifo(const char *pathname, mode_t mode);
  • 作用:创建 FIFO 管道(命名管道)

  • 参数:

    • pathname 是管道的路径名。

    • mode 是管道的权限。

  • 返回值:0 成功,非 0 失败

此外,也可以用万能的 mknod(创建 inode)来创建 FIFO 管道。

需要注意,close 并不能关闭管道,而是关闭管道的文件描述符。必须通过 unlink 删除管道。

管道的读写:用 read 和 write 函数即可读写管道。其中,读写端分别是 fd[0]fd[1]

管道的关闭:用 close 函数即可关闭管道。

管道的缺点:

  • 只能承载字节流,需要自行设计协议来实现高级的通信。

  • 管道默认大小比较小。不过新的 Linux 内核已经允许很大的容量。

消息队列

创建和获取

1#include <sys/msg.h>
2int msgget(key_t key, int msgflg);
  • 作用:创建/获取消息队列

  • 参数:

    • key 是消息队列的 key。若作为输出参数,则会生成一个新的消息队列。若作为输入参数,则会获取已有的消息队列。

    • msgflg 是消息队列的标志。

      • IPC_CREAT 表示如果给定的 key 不存在,则创建一个新的消息队列。存在则忽略。
  • 返回值:返回一个以 key 命名的消息队列句柄

  • 例子:

    1auto msgqid = msgget(IPC_PRIVATE, IPC_CREAT | 0600);
    

收发消息

1#include <sys/msg.h>
2int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
  • 作用:发送消息

  • 参数:

    • msqid 是消息队列的句柄。

    • msgp 是消息的内容。

    • msgsz 是消息的大小。

    • msgflg 是消息的标志。

      • 0:阻塞式发送。(如队满)

      • IPC_NOWAIT:非阻塞式发送。(如队满,直接返回错误)

      • IPC_NOERROR:消息超过 msgsz 直接截断。

  • 返回值:0 成功,非 0 失败

1ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
2               int msgflg);
  • 作用:接收消息

  • 参数:同上。

    • msgflg
      • 0:阻塞式接收。直到等待的类型消息到来。

      • IPC_NOWAIT:如果队列中没有请求类型的消息,则立即返回。系统调用失败,错误设置为 ENOMSG。

      • IPC_EXCEPT:与 msgtyp 一起使用,用于读取队列中消息类型与 msgtyp 不同的第一条消息。

      • IPC_NOERROR:如果长度超过 msgsz 字节,则截断消息文本。

  • 返回值:0 成功,非 0 失败

msgp 的通行格式:

1struct msgbuf {
2    long mtype;       /* message type, must be > 0 */
3    char mtext[1];    /* message data */
4};

删除消息

下面的代码删除一个消息队列:

1msgctl(msqid, IPC_RMID, 0)

函数原型:

1int msgctl(int msgid, int command, struct msgid_ds *buf);
  • 作用:控制消息队列

  • 参数:

    • msgid 是消息队列的句柄。

    • command 是控制命令。

      • IPC_RMID 表示删除消息队列。
    • buf 是控制命令的参数。指向 msgid_ds 结构的指针,它指向消息队列模式和访问权限的结构。msgid_ds 结构至少包括以下成员:

  • 返回值:0 成功,非 0 失败

1struct msgid_ds
2{
3    uid_t shm_perm.uid;
4    uid_t shm_perm.gid;
5    mode_t shm_perm.mode;
6};

线程同步

Mutex

1int pthread_mutex_init(
2  pthread_mutex_t *restrict mutex,
3  const pthread_mutexattr_t *restrict attr
4);
  • 作用:初始化互斥锁

  • 参数:

    • mutex 是互斥锁的指针。

    • attr 是互斥锁的属性。

  • 返回值:0 成功,非 0 失败

1int pthread_mutex_destroy(pthread_mutex_t *mutex);
  • 作用:销毁互斥锁

  • 参数:

    • mutex 是互斥锁的指针。
  • 返回值:0 成功,非 0 失败

1int pthread_mutex_lock(pthread_mutex_t *mutex);
  • 作用:锁定互斥锁

  • 参数:

    • mutex 是互斥锁的指针。
  • 返回值:0 成功,非 0 失败

1int pthread_mutex_unlock(pthread_mutex_t *mutex);
  • 作用:解锁互斥锁

  • 参数:

    • mutex 是互斥锁的指针。
  • 返回值:0 成功,非 0 失败

1int pthread_mutex_trylock(pthread_mutex_t *mutex);
  • 作用:尝试锁定互斥锁然后解锁

  • 参数:

    • mutex 是互斥锁的指针。
  • 返回值:0 成功,非 0 失败

RWLock (SXLock)

读写锁,也称共享-独占锁。允许多个线程同时读取,但是只允许一个线程写入。

递归锁(Recursive Lock)也称为可重入互斥锁(reentrant mutex),是互斥锁的一种,同一线程对其多次加锁不会产生死锁。

1int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
2int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
3int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
4int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
5int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
6int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
7int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
1enum
2{
3  PTHREAD_RWLOCK_PREFER_READER_NP, // 读者优先,即必须所有读者都解锁,才能写入。且会被信新来的写者插队。(只要有读者,写者就会饥饿)
4  PTHREAD_RWLOCK_PREFER_WRITER_NP, // 写者优先,和读者优先基本一样,只不过新来的写者不能插队。
5  PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP, // 写者优先,和读者优先基本一样,只不过新来的写者不能插队,并且不能重入。(一般设置为此值避免饥饿)
6  PTHREAD_RWLOCK_DEFAULT_NP = PTHREAD_RWLOCK_PREFER_READER_NP
7};

spinlock

在申请加锁的时候,会使得线程阻塞,阻塞的过程又分两个阶段,第一阶段是会先自旋,即不断地去申请锁,失败一定次数(spin count),线程会进入等待队列,并休眠。当锁可用时被唤醒。

适合实时性较高的场合。

1    int pthread_spin_init (__pthread_spinlock_t *__lock, int __pshared);
2    int pthread_spin_destroy (__pthread_spinlock_t *__lock);
3    int pthread_spin_trylock (__pthread_spinlock_t *__lock);
4    int pthread_spin_unlock (__pthread_spinlock_t *__lock);
5    int pthread_spin_lock (__pthread_spinlock_t *__lock);

Conditional Variable

条件变量用于等待变量满足条件后唤醒线程。

1    int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
2    int pthread_cond_destroy(pthread_cond_t *cond);
3
4    int pthread_cond_signal(pthread_cond_t *cond);
5    int pthread_cond_broadcast(pthread_cond_t *cond);
6
7    int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
8    int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);

条件信号必须在临界区中抛出。

Semaphore

1    int sem_destroy(sem_t *sem);
2    int sem_init(sem_t *sem, int pshared, unsigned int value);
3
4    int sem_wait(sem_t *sem); // sem -= 1
5    int sem_post(sem_t *sem); // sem += 1
6
7    int sem_getvalue(sem_t *sem, int *valp); // valp 是输出参数

Barrier

1int pthread_barrier_init(pthread_barrier_t *barrier, const pthread_barrierattr_t *restrict attr, unsigned count);
2int pthread_barrier_destroy(pthread_barrier_t *barrier);
3int pthread_barrier_wait(pthread_barrier_t *barrier);

std::thread

线程对象

std::thread 表示一个线程对象。一旦构造就启动线程并执行:

1  std::thread t1(
2      [](const char* name) {
3        std::cout << "Hello, " << name << "!" << std::endl;
4      },
5      "world");
6  t1.join();

除了可以传入 lambda 表达式,还可以传入:

  • 仿函数对象。

  • 函数指针。

  • 成员函数。(则第一个参数必须是对象指针)

推荐阅读 C++并发:从std::thread()开始 | xinlu’s blog

如何获取线程的ID?

1  std::thread::id id = std::this_thread::get_id();

如何获取线程的返回值?

这就需要用 future 了

future

1  std::future<int> f = std::async(std::launch::async, []() {
2    return 42;
3  });
4  int result = f.get();
  • std::async 用于异步执行任务,并返回一个 std::future 对象。

  • std::lauch::async 表示异步启动策略

  • std::launch::deferred 表示延迟启动策略,即等待调用 std::future::get 才启动线程。

  • future_obj.get 用于获取异步执行的结果。

  • future_obj.wait 用于等待异步执行的结果。

mutex

C++ 提供这几种互斥锁:

  1. std::mutex: 不可重入的互斥锁。

  2. std::recursive_mutex: 可重入的互斥锁。

  3. std::timed_mutex: 带有超时的互斥锁。

  • 例子:
1{
2  std::timed_mutex m;
3  if(!m.try_lock_for(std::chrono::seconds(1)))
4    throw std::runtime_error("timeout");
5}
  1. std::recursive_timed_mutex: 带有超时的可重入的互斥锁。

相关辅助的类型:

  • std::lock_guard: 提供了基于作用域的上锁和自动释放。

    • 例子:
    1{
    2  std::mutex m;
    3  std::lock_guard<std::mutex> lk(m);
    4  // do something
    5}
    6// lk goes out of scope, m is unlocked
    
  • std::unique_lock: 允许延迟锁定、时间受限的锁定尝试、递归锁定、锁定所有权转移以及与条件变量一起使用。

    • 与 lock_guard 相比,unique_lock 必须指定泛型参数。
  • std::try_lock: 尝试锁定。如果锁定失败,则返回 false。

    • 例子:
    1{
    2  std::mutex m;
    3  std::unique_lock<std::mutex> lk(m);
    4  if(!lk.try_lock())
    5    throw std::runtime_error("failed");
    6  // do something
    7}
    
  • std::lock 尝试锁定,如果锁定失败,则抛出异常。

    • 例子:
    1{
    2  std::mutex m;
    3  std::unique_lock<std::mutex> lk(m);
    4  lk.lock();
    5  // do something
    6}
    
  • std::call_once: 只执行一次的函数。

    • 例子:
    1{
    2  std::once_flag flag;
    3  std::call_once(flag, []() {
    4    // do something
    5  });
    6}
    
    • once_flag: 一个辅助结构,相当于一个锁,共用这个标志的线程,只有其中一个能成功执行。

    • 如果一个线程异常,其它等待的线程会唤醒一个执行。

    • 实现原理:互斥锁+条件变量+广播唤醒

unique_lock + timed_mutex 的例子

1{
2  std::timed_mutex m;
3  std::unique_lock<std::timed_mutex> lk(m, std::chrono::seconds(1));
4  if (lk.owns_lock()) {
5    // do something
6  }
7}
8// lk goes out of scope, m is unlocked

condition_variable

  • std::condition_variable: 一个条件变量。

  • std::condition_variable_any: 一个可以包含多个条件变量的条件变量。

  • cv.wait (lk): 在 lk 上阻塞,直到有其它线程调用 cv.notify_one() 或 cv.notify_all()。

    • 例子:
    1{
    2  std::condition_variable cv;
    3  std::mutex m;
    4  std::unique_lock<std::mutex> lk(m);
    5  cv.wait(lk);
    6  // do something
    7}
    
  • cv.wait_for (lk, std::chrono::duration): 在 lk 上阻塞,直到有其它线程调用 cv.notify_one() 或 cv.notify_all()或超时。

  • cv.wait_until (lk, std::chrono::time_point): 在 lk 上阻塞,直到有其它线程调用 cv.notify_one() 或 cv.notify_all()或超时。

    • 例子:
    1{
    2  std::condition_variable cv;
    3  std::mutex m;
    4  std::unique_lock<std::mutex> lk(m);
    5  cv.wait_until(lk, std::chrono::system_clock::now() + std::chrono::seconds(1));
    6  // do something
    7}
    
  • cv.notify_one(): 唤醒一个线程。

    • 例子:
    1{
    2  std::condition_variable cv;
    3  std::mutex m;
    4  std::unique_lock<std::mutex> lk(m);
    5  cv.notify_one();
    6}
    
  • cv.notify_all(): 唤醒所有线程。

    • 例子:
    1{
    2  std::condition_variable cv;
    3  std::mutex m;
    4  std::unique_lock<std::mutex> lk(m);
    5  cv.notify_all();
    6}
    

semaphore

std::counting_semaphore: 一个计数信号量。信号量能够让最多 LeastMaxValue 个线程同时访问。

  • 例子:
 1{
 2  std::counting_semaphore s(1);
 3  std::thread t1([&s]() {
 4    s.wait();
 5    // do something
 6    s.signal();
 7  });
 8  std::thread t2([&s]() {
 9    s.wait();
10    // do something
11    s.signal();
12  });
13  t1.join();
14  t2.join();
15}

由于 C++ 11 没有标准库信号量,可以用 std::mutex + std::condition_variable 来实现一个:

 1#include <mutex>
 2#include <condition_variable>
 3
 4class semaphore {
 5    std::mutex mutex_;
 6    std::condition_variable condition_;
 7    unsigned long count_ = 0; // Initialized as locked.
 8
 9public:
10    void release() { // v
11        std::lock_guard<decltype(mutex_)> lock(mutex_);
12        ++count_;
13        condition_.notify_one();
14    }
15
16    void acquire() { // P
17        std::unique_lock<decltype(mutex_)> lock(mutex_);
18        while(!count_) // Handle spurious wake-ups.
19            condition_.wait(lock);
20        --count_;
21    }
22
23    bool try_acquire() {
24        std::lock_guard<decltype(mutex_)> lock(mutex_);
25        if(count_) {
26            --count_;
27            return true;
28        }
29        return false;
30    }
31};

以及使用 pthread 提供的信号量:

1int sem_init(sem_t *sem, int pshared, unsigned int value);
  • sem_init: 初始化信号量。

    • 例子:
    1{
    2  sem_t s;
    3  sem_init(&s, 0, 1);
    4  sem_wait(&s);
    5  // do something
    6  sem_post(&s);
    7}
    
    • pshared 我们填 0 就行。true 表示进程间共享,false 表示进程内线程间共享。
  • sem_post:用于唤醒一个等待的线程。(对应 PV 操作中的 V)

  • sem_wait:用于阻塞当前线程,直到获得访问机会。(对应 PV 操作中的 P)

锁的原理

任务 1:写两个线程,分别能打印一次 a 和 b。

参考代码:

 1#include <pthread.h>
 2#include <stdio.h>
 3struct args_t {
 4  char out;
 5};
 6int print_thread(struct args_t* args) {
 7  printf("%c", args->out);
 8  return 0;
 9}
10int main() {
11  pthread_t ha, hb;
12  struct args_t a = {.out = 'a'};
13  struct args_t b = {.out = 'b'};
14  pthread_create(&ha, NULL, (void* (*)(void*))(print_thread), (void*)(&a));
15  pthread_create(&hb, NULL, (void* (*)(void*))(print_thread), (void*)(&b));
16  pthread_join(ha, NULL);
17  pthread_join(hb, NULL);
18  return 0;
19}

参考代码2:

 1#include <algorithm>
 2#include <iostream>
 3#include <thread>
 4#include <vector>
 5using namespace std;
 6
 7int main() {
 8  vector<thread> ts;
 9  ts.push_back(thread{[]() { cout << 'a'; }});
10  ts.push_back(thread{[]() { cout << 'b'; }});
11  for (auto&& t : ts) {
12    t.join();
13  }
14  return 0;
15}

Peterson 算法的原理

 1int turn, x = 0, y = 0;
 2
 3void thread1() {
 4  [1] x = 1;
 5  [2] turn = T2;
 6  [3] while (y && turn == T2) ;
 7  [4] // critical section
 8  [5] x = 0;
 9}
10
11void thread2() {
12  [1] y = 1;
13  [2] turn = T1;
14  [3] while (x && turn == T1) ;
15  [4] // critical section
16  [5] y = 0;
17}

方括号表示 PC 指针。peterson 算法可以理解为:两个同学(x, y)想上厕所。

每个同学的策略都是:

  1. 先举旗子(表明自己要上厕所),然后在厕所门上写上对方的名字。

  2. 只要对方想上厕所,且门上写者对方的名字,就一直等待。

假设每行代码的执行都是顺序、原子的,则这个算法不会有任何问题。

可以用状态树来证明。此程序的状态初始状态是 <pc1, pc2, x, y, turn>。穷举所有调度序列可知不可能存在 pc1 = pc2 = 4 的情况。详见 jyy 视频。

而我提供一个自己的理解:

Peterson 算法之所以有效,举个例子,假设 T2 进入临界区,则一定有

  • turn != T1,则说明 y = 1 被执行,则说明 T1 未进入临界区。

  • 者 x = 0,则 thread1 不位于临界区。

实现互斥的根本困难

实现互斥的根本困难是:无法做到同时读、写共享内存。

LOAD:读取时,只能看到一瞬间的状态。而此状态可能看到之后就马上变了。

STORE:写入时,无法知道写入之前里面是什么。

Peterson 算法之所以有效,其根本原因在于 LOAD 时通过 x 判断 t1 是否在临界区。如果 x = 0,则绝无可能在临界区。这是对 LOAD 前瞬间的保证。

同时如果 turn != T1,也能判断出 T2 绝无可能在 LOAD 之后改变 turn 的值。因为被 while 循环堵住了。这是对 LOAD 后瞬间的保证。

因此,要解决互斥,硬件层面最好能提供同时结合 LOAD 和 STORE 功能的指令。

自旋锁的实现原理

x86 提供了 lock 前缀。

  • lock addq 实现对某个值原子增加一个值。

    • L步骤:读取旧值 val

    • S步骤:存放 val + incr

  • (lock) xchg 实现原子交换两个状态。

    • L步骤:读取旧值 val

    • S步骤:存放 newval

    • 返回 val

其它原子指令

  • tas(test and set) 实现用 1 交换旧值

    • L步骤:读取旧值 val

    • S步骤:存放 1

    • 返回 val

  • cas(compare and swap) 实现条件交换

    • L步骤:读取旧值 val

    • S步骤:比较 val == expect,如果相等,存放 newval

    • 返回 val

用 XCHG 实现互斥的原理:

  1. 初始状态:桌子上一把钥匙

  2. 大家去抢钥匙,具体方式是用手上的东西换

  3. 如果换出来一看是把钥匙,那就得到了锁。

  4. 如果换出来一看是不是要是,说明没抢到,重新抢。

  5. 用好钥匙的同学把钥匙还回去。

1int xchg(volatile int *addr, int newval) {
2  int result;
3  asm volatile ("lock xchg %0, %1"
4    : "+m"(*addr), "=a"(result) : "1"(newval));
5  return result;
6}
 1int table = YES;
 2
 3void lock() {
 4retry:
 5  int got = xchg(&table, NOPE);
 6  if (got == NOPE)
 7    goto retry;
 8  assert(got == YES);
 9}
10
11void unlock() {
12  xchg(&table, YES)
13}

自旋锁的优点:

  • xchg 成功,立即进入临界区,低开销

自旋锁的缺陷:

  1. 从硬件层面:xchg 指令自旋,会强制处理期间的缓存同步

  2. 僧多肉少:太多空转线程,争抢锁的处理器越多,利用率越低

  3. 调度灾难:获得自旋锁的线程可能被操作系统切换出去

自旋锁的使用场景

  1. 只有少量情况需要少量线程抢锁

  2. 持有锁时不会被切出

典型场景:操作系统内核的并发数据结构 (短临界区)

改进思路——睡眠锁

  • 方法:把锁的管理放到内核。

  • 原理:让没有获得锁的人进入等待队列。

  • 效果:

    • 上锁失败就休眠,不再浪费 CPU

    • 缺点:即使上锁成功也需要进出内核

改进思路——Futex(Fast Userspace muTexes)

  • 效果:

    • 上锁时成功立即返回

    • 失败时休眠

  • 原理:

    • 在用户空间抢锁

    • 未获得锁时,把当前线程放到等待队列

锁的用途

生产者-消费者问题

一个线程生产左括号,另一个线程消费右括号。

一旦生成的括号序列不是合法括号序列的前缀,说明出了问题。

检验程序:

 1#include <iostream>
 2#include <stack>
 3
 4int main() {
 5  std::stack<int> pairs;
 6  for (int ch; (ch = std::getchar()) != EOF;) {
 7    if (ch == '(') {
 8      pairs.push(ch);
 9    } else if (ch == ')') {
10      if (pairs.size() == 0) {
11        std::cerr << "failed!" << std::endl;
12        exit(1);
13      }
14      pairs.pop();
15    }
16  }
17  return 0;
18}

使用互斥锁:

 1#include <iostream>
 2#include <mutex>
 3#include <thread>
 4#include <vector>
 5
 6int count = 0;
 7int N = 100;
 8// 注意,mutex 不能 const
 9std::mutex m;
10void t1() {
11  std::cout << "t1";
12  for (;;) {
13    std::lock_guard<std::mutex> lk(m);
14    if (count != N) {
15      count++;
16      std::cout << "(";
17    }
18  }
19}
20void t2() {
21  std::cout << "t1";
22  for (;;) {
23    std::lock_guard<std::mutex> lk(m);
24    if (count != 0) {
25      std::cout << ")";
26      count--;
27    }
28  }
29}
30
31int main() {
32  std::vector<std::thread> ts;
33  ts.push_back(std::thread(t1));
34  ts.push_back(std::thread(t2));
35  for (auto&& t : ts) {
36    t.join();
37  }
38  return 0;
39}

问题:有大量的时间浪费在不停询问锁是否可用的过程中。

分析:T1 一直在等待 count != N 的条件。T2 一直在等待 count != 0 的条件。

所以我们可以把这种等待条件的自旋变成睡眠。

 1void t1() {
 2  std::cout << "t1";
 3  for (;;) {
 4    std::lock_guard<std::mutex> lk(m);
 5    while(count == N) {
 6      std::cout << ".";
 7      unlock_and_sleep();
 8    }
 9    count++;
10    wake_up();
11    std::cout << "(";
12  }
13}
14void t2() {
15  std::cout << "t1";
16  for (;;) {
17    std::lock_guard<std::mutex> lk(m);
18    if(count == 0) {
19      unlock_and_sleep();
20    }
21    std::cout << ")";
22    count--;
23    wake_up();
24  }
25}

这正是条件变量的思想:

 1std::mutex m;
 2std::condition_variable cv;
 3void t1() {
 4  std::cout << "t1";
 5  for (;;) {
 6    std::unique_lock<std::mutex> lk(m);
 7    if(count == N) {
 8      cv.wait(lk);
 9    }
10    std::cout << "(";
11    count++;
12    cv.notify_all();
13  }
14}
15void t2() {
16  std::cout << "t1";
17  for (;;) {
18    std::unique_lock<std::mutex> lk(m);
19    if(count == 0) {
20      cv.wait(lk);
21    }
22    std::cout << ")";
23    count--;
24    cv.notify_all();
25
26  }
27}

上面代码是有 BUG 的。主要问题,就是虚假唤醒。只有一个条件百度,它并不区分我唤醒的是生产者还是消费者。唤醒之后,如果不重新判断条件是否满足,就会产生错误的结果。比如假设 1 个生产者,两个消费者,则很可能一个消费者唤醒了另一个消费者。

不能同类唤醒。应带采用两个条件变量的方式。

一个常见的错误如下(为什么?):

 1std::mutex m;
 2std::condition_variable can_produce;
 3std::condition_variable can_comsume;
 4void t1() {
 5  std::cout << "t1";
 6  for (;;) {
 7    std::unique_lock<std::mutex> lk(m);
 8    if (count == N) {
 9      can_produce.wait(lk);
10    }
11    count++;
12    std::cout << "(";
13    can_comsume.notify_all();
14  }
15}
16void t2() {
17  std::cout << "t1";
18  for (;;) {
19    std::unique_lock<std::mutex> lk(m);
20    if (count == 0) {
21      can_comsume.wait(lk);
22    }
23    std::cout << ")";
24    count--;
25    can_produce.notify_all();
26  }
27}

我们可以用 gdb 的线程调试(详见我的另一篇文章)找出 bug。我们发现 count 竟然会变成负数。

原来,生产者唤醒了多个消费者,导致消费者不加检查就进行 count--

一个解决方法是醒来之后检查:

 1std::mutex m;
 2std::condition_variable can_produce;
 3std::condition_variable can_comsume;
 4void t1() {
 5  std::cout << "t1";
 6  for (;;) {
 7    std::unique_lock<std::mutex> lk(m);
 8    while (count == N) {
 9      can_produce.wait(lk);
10    }
11    count++;
12    std::cout << "(";
13    can_comsume.notify_all();
14  }
15}
16void t2() {
17  std::cout << "t1";
18  for (;;) {
19    std::unique_lock<std::mutex> lk(m);
20    while (count == 0) {
21      can_comsume.wait(lk);
22    }
23    std::cout << ")";
24    count--;
25    can_produce.notify_all();
26  }
27}

条件变量的含义是:

  • wait 表示:把自己加入等待队列,当条件满足时自己被唤醒,然后重新尝试取得锁,并继续执行。

  • notify_all 表示:唤醒所有等待的线程。让它们争抢锁,得到锁的执行。

信号量

信号量是一个令牌,得到令牌的可以进入临界区。

P 表示得到一把钥匙,信号量计数器减 1,如果计数器为 0,则不能进入临界区,在队列等待。

V 表示释放一把钥匙,信号量计数器加 1,如果有线程等待,则相当于把令牌直接交给等待的一个线程。

 1void producer(){
 2  for(;;){
 3    P(&empty);
 4    print("(");
 5    V(&fill);
 6  }
 7}
 8
 9void consumer(){
10  for(;;){
11    P(&fill);
12    print(")");
13    V(&empty);
14  }
15}

上面的代码定义了两个线程,一个生产者,一个消费者。两个信号量,empty 表示空计数器。fill 表示有计数器。

P(&empty) 的意思是,如果空闲计数器为 0,则不能进入临界区,在队列等待。进入后消耗一个空位。 V(&fill) 的意思是,将满计数器加 1。增加一个满位。

(可以结合上面我们自己实现的信号量的代码来理解)

哲学家吃饭问题

BUG 的复现

 1#include <semaphore.h>
 2#include <iostream>
 3#include <thread>
 4#include <vector>
 5
 6auto thread_phi(int N, int id, std::vector<sem_t*>* sems) {
 7  int lhs = (id - 1) % N;
 8  int rhs = id % N;
 9  std::cout << "phi " << id << " started, left " << lhs << " right " << rhs
10            << std::endl;
11  std::this_thread::sleep_for(std::chrono::microseconds(10));
12  for (;;) {
13    sem_wait((*sems)[lhs]);
14    std::cout << "T" << id << "got" << lhs << std::endl;
15    sem_wait((*sems)[rhs]);
16    std::cout << "T" << id << "got" << rhs << std::endl;
17    // eat
18    sem_post((*sems)[lhs]);
19    sem_post((*sems)[rhs]);
20  }
21}
22int main(int argc, char const* argv[]) {
23  if (argc != 2) {
24    std::cerr << "need number of philosopher" << std::endl;
25    exit(EXIT_FAILURE);
26  }
27  int N = atoi(argv[1]);
28  if (N < 2) {
29    std::cerr << "need positive number" << std::endl;
30    exit(EXIT_FAILURE);
31  }
32  std::vector<std::thread> ts;
33  std::vector<sem_t*> sems;
34  for (auto i = 0; i < N; ++i) {
35    auto sem = new sem_t;
36    sem_init(sem, false, 1);
37    sems.push_back(sem);
38  }
39  for (auto i = 0; i < N; ++i) {
40    // 注意 id 从 1 开始,否则 lhs 会出现负数
41    ts.push_back(std::thread(thread_phi, N, i + 1, &sems));
42  }
43  for (auto&& t : ts) {
44    t.join();
45  }
46  for (auto&& s : sems) {
47    delete s;
48  }
49
50  return 0;
51}

运行后发现很快死锁。

为什么呢?举个例子:每个人同时举起左手,那么每个人都在等待别人的右手放下,从而卡住。

解决方法:

  1. 万能方法:使用条件变量。如果不满足左右都有空,就睡眠。

  2. 更好的方法:把资源管理放到 Supervisor 中。

  • 对于每个哲学家:

    1. 发送 EAT 信号,表示想吃饭

    2. 等待许可

    3. 吃饭

    4. 发送 DONE 信号

  • 而 Supervisor 则管理所有哲学家的这些信号。使用消息队列。

    • 这需要我们自己设计如何调度和管理资源,计划经济,非常复杂。

死锁检测和处理

静态分析/动态分析

AA-Deadlock:使用防御性编程,assert

ABBA-Deadlock:

  • Locker-ordering 确保所有锁的获得顺序都是一样的。在任意时刻,总有获得最后一个锁的进程可以继续前进。

使用状态空间模型检测。如果一个状态,被两条不同颜色的边进入,则此状态是非法的。

数据竞争:两种常见情况

  • 忘记上锁——原子性违反(AV)

  • 忘记同步——顺序违反(OV)

Lockdep:运行时死锁检查。我们有每一个锁的分配日志,从而构建有向图,一旦发现环就说明有可能出现死锁。

ThreadSanitizer:运行时数据竞争检查。记录内存访问和 lock/unlock 的日志,为所有事件建立 happens-before 关系图。

常用的动态分析工具:

  • AddressSanitizer:检查内存访问是否合法。

  • ThreadSanitizer:检查线程间的数据竞争。

  • MemorySanitizer:检查内存访问是否合法。

  • UBSanitizer:检查代码是否有未定义行为。

kasan ktsan

Canary:牺牲一些内存单元,用来预警memory error

各种锁的实现

互斥锁

要实现不饥饿的互斥锁需要操作系统的帮助(调度、队列化)。但如果只是玩玩,可以用原子变量。这样的话一般只能做成自旋锁,不适合竞争激烈的情景。

用 compare_exchange_strong 实现互斥锁:

 1class Mutex {
 2    std::atomic<int> _lock{0};
 3
 4  public:
 5    void lock() {
 6        int expect = 0;
 7        while (!_lock.compare_exchange_strong(expect, 1,
 8                                              std::memory_order_acquire)) {
 9            expect = false;
10        }
11    }
12    void unlock() { _lock.store(0, std::memory_order_release); }
13};

更加省事儿的做法:

 1class Mutex {
 2    std::atomic_flag _lock{ATOMIC_FLAG_INIT};
 3
 4  public:
 5    void lock() {
 6        while (_lock.test_and_set(std::memory_order_acquire)) {
 7        };
 8    }
 9    void unlock() { _lock.clear(std::memory_order_release); }
10};

读写锁

读写锁的特点是写与「读、写」都互斥。典型例子:计数器。

我们使用条件变量配合 mutex 实现一个读写锁:

 1class RWLock {
 2    std::mutex _mutex;
 3    std::condition_variable _cond;
 4    int _stat; // +N, for reader count. -N for writer count
 5  public:
 6    void RLock() {
 7        std::unique_lock lk(_mutex);
 8        while (_stat < 0) {
 9            _cond.wait(lk); // 暂时解锁,直到被通知条件满足后获得并上锁
10        }
11        // std::cout << "RLock" << std::endl;
12        _stat++;
13    }
14    void WLock() {
15        std::unique_lock lk(_mutex);
16        while (_stat != 0) {
17            _cond.wait(lk);
18        }
19        // std::cout << "WLock" << std::endl;
20        _stat++;
21    }
22    void RUnlock() {
23        std::unique_lock lk(_mutex);
24        // std::cout << "RUnlock" << std::endl;
25        _stat--;
26        _cond.notify_one(); // 由于只允许一个写,所以用 notify_one
27    }
28    void WUnlock() {
29        std::unique_lock lk(_mutex);
30        // std::cout << "WUnlock" << std::endl;
31        _stat--;
32        _cond.notify_all();
33    }
34};

另一种方式:

 1class RWLock {
 2    std::mutex _read_mtx;
 3    std::mutex _write_mtx;
 4    int _nread = 0;
 5
 6  public:
 7    void RLock() {
 8        std::lock_guard g(_read_mtx);
 9        _nread++;
10        if (_nread == 1) {
11            _write_mtx.lock();
12        }
13    }
14    void WLock() { _write_mtx.lock(); }
15    void RUnlock() {
16        std::lock_guard g(_read_mtx);
17        _nread--;
18        if (_nread == 0) {
19            _write_mtx.unlock();
20        }
21    }
22    void WUnlock() { _write_mtx.unlock(); }
23};

参考

感谢南京大小 jyy 老师的 Bilibili 开源课程。