1. 线程基础

1.1 线程发起

线程发起就是指启动一个线程,C++11标准统一了线程操作,可以在定义一个线程变量后,该变量启动线程执行回调逻辑。

1
2
3
4
5
6
7
8
void thead_work1(std::string str) {
std::cout << "str is " << str << std::endl;
}
int main(){
std::string hellostr = "hello world!"; //定义一个字符串
std::thread t1(thead_work1, hellostr); //通过()初始化并启动一个线程
t1.join(); //主线程等待子线程退出
}

在上面程序中,如果没有t1.join();或用std::this_thread::sleep_for(std::chrono::seconds(1));代替,执行该程序都会报错。需要调用t1.join();来等t1子线程执行完了,主线程才往下执行。

1.2 仿函数作为参数

用仿函数作为参数传递给线程时,也可以当作线程的回调函数来使用。下面程序中,第1种执行方法,编译器会将t1当成一个函数对象, 返回一个std::thread类型的值, 即函数的参数被视为一个函数指针了,所以程序是有问题的,可以通过第2和第3种方式执行,用()或{}避免将background_task()视为一个函数指针。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class background_task {
public:
void operator()() { //重载了一个()运算符
std::cout << "background_task called" << std::endl;
}
};
int main(){
//1.t1被当作函数对象的定义,其类型为返回std::thread, 参数为background_task
//std::thread t1(background_task());
//t1.join();

//2.可多加一层()
std::thread t2((background_task()));
t2.join();

//3.可使用{}方式初始化
std::thread t3{ background_task() };
t3.join();
}

1.3 lambda表达式

lambda表达式也可以作为线程的参数传递给thread

1
2
3
4
5
6
7
int main{
std::string hellostr = "hello world!"; //定义一个字符串
std::thread t4([](std::string str) {
std::cout << "str is " << str << std::endl;
}, hellostr); //第1个参数是lambda回调函数,第2个参数是所要用的字符串
t4.join();
}

1.4 detach使用

detach允许子线程采用分离的方式在后台独自运行,但下面的程序仍然有隐患。因为some_local_state是局部变量, 当oops调用结束后局部变量some_local_state就可能被释放了,而子线程还在detach后台运行,容易出现崩溃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
struct func {   //结构体,相当于类,区别在于c++中结构体中所以的成员都是公有的
int& _i; //引用的成员变量
func(int & i): _i(i){} //将传进来的局部变量通过引用赋值
void operator()() {
for (int i = 0; i < 3; i++) {
_i = i; //模拟一直在使用局部变量的情况
std::cout << "_i is " << _i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1)); //执行完一次,睡
}
}
};
void oops() {
int some_local_state = 0; //定义了一个局部变量
func myfunc(some_local_state); //将局部变量传给了结构体对象myfunc
std::thread functhread(myfunc); //子线程
//隐患,访问局部变量,局部变量可能会随着}结束而回收或随着主线程退出而回收
functhread.detach();
}
int main(){
// detach 注意事项
oops();
//防止主线程退出过快,需要停顿一下,让子线程跑起来detach
std::this_thread::sleep_for(std::chrono::seconds(1));
}

所以当我们在子线程中使用主线程的一些局部变量,或通过引用或指针的方式使用了一些函数的局部变量,一定要关注这些局部变量是否会被释放掉。解决该方面问题的一些方法:

  • 通过智能指针传递参数,因为引用计数会随着赋值增加,可保证局部变量在使用期间不被释放,这也就是伪闭包策略。
  • 将局部变量的值作为参数传递,这么做需要局部变量有拷贝复制的功能,而且拷贝耗费空间和效率。
  • 将线程运行的方式用join代替detach,这样能保证局部变量被释放前线程已经运行结束。但是这么做可能会影响运行逻辑。

1.5 异常处理

当我们启动一个线程后,如果主线程产生崩溃,会导致子线程也会异常退出,就是调用terminate,如果子线程在进行的是一些重要的操作,那么丢失这些信息是很危险的。所以常用的做法是捕获异常,即在主线程出现异常情况下需要保证子线程稳定运行结束后,主线程才抛出异常结束运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
void catch_exception() {
int some_local_state = 0;
func myfunc(some_local_state);
std::thread functhread{ myfunc };
try {
//本线程做一些事情,可能引发崩溃
std::this_thread::sleep_for(std::chrono::seconds(1));
}catch (std::exception& e) {
functhread.join(); //主线程出现异常了,也要等待子线程执行完,才抛出异常
throw;
}
functhread.join();
}

2. 线程管控

2.1 线程归属权

众所周知线程可以通过detach在后台运行或者通过join让开辟这个线程的父线程等待该线程完成。但每个线程都应该有其归属权,也就是归属给某个变量管理:

下面程序启动了一个线程,t1是线程的变量,这里可以理解为在定义完线程变量,系统就会帮我们去分配线程资源,让线程运行起来,线程就归属t1变量管理。

1
2
3
4
5
6
7
8
void some_function() {
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
int main(){
std::thread t1(some_function);
}

我们知道t1是一个线程变量,管理一个线程,该线程执行some_function()。对于std::thread C++ 不允许其执行拷贝构造拷贝赋值,所以只能通过移动和局部变量返回的方式将线程变量管理的线程转移给其他变量管理。在c++中,像std::mutex, std::ifstream, std::unique_ptr都是这样的类型。

在下面程序中,定义了一个t1变量管理一个线程,将t1管理的线程通过move的方式转移给了t2,move过后,t1变量就没有绑定线程了,即无效变量了。虽然t1无效了,但还可以继续赋值,通过t1 = std::thread(some_other_function)t1绑定新的线程。为什么这种赋值方式可以?因为std::thread返回的是一个局部变量,且是右值,右值赋给t1,调用的就是thread的移动赋值函数,这种方式是可以的,t1就绑定了新的线程。如果这时再创建一个线程变量t3,将t2的线程转移给t3来用,t3再转移给t1,这时就会出现崩溃了。因为t1已经绑定了线程,再对起进行赋值,就会造成它原来的线程强行终止,触发线程内部terminate函数,该terminate就会引发程序的崩溃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void some_function() {
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
void some_other_function() {
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
//1 t1绑定some_function
std::thread t1(some_function);
//2 转移t1管理的线程给t2,转移后t1无效
std::thread t2 = std::move(t1);
//3 但t1可继续绑定其他线程,执行some_other_function
t1 = std::thread(some_other_function);
//4 创建一个线程变量t3
std::thread t3;
//5 转移t2管理的线程给t3
t3 = std::move(t2);
//6 转移t3管理的线程给t1
t1 = std::move(t3);
std::this_thread::sleep_for(std::chrono::seconds(2000));

2.2 局部变量返回值

对于返回一个局部变量给调用者,是当函数返回一个类类型的局部变量时会先调用移动构造,如果没有移动构造再调用拷贝构造。 所以对于一些没有拷贝构造但是实现了移动构造的类类型也支持通过函数返回局部变量。 在 C++11 之后,编译器会默认使用移动语义来提高性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class TestCopy {
public:
TestCopy(){} //默认构造
TestCopy(const TestCopy& tp) { //拷贝构造
std::cout << "Test Copy Copy " << std::endl;
}
TestCopy(TestCopy&& cp) { //移动构造
std::cout << "Test Copy Move " << std::endl;
}
};

TestCopy TestCp() {
//局部变量返回值会优先考虑移动构造
TestCopy tp; //默认构造
return tp; //移动构造(不考虑RVO的情况)
}

返回值优化(RVO):C++ 编译器通常会进行返回值优化(RVO),甚至在有移动构造函数时也可能跳过移动构造,直接在调用者的内存空间中构造对象,以进一步减少性能开销。

2.3 容器存储

容器存储线程时,比如vector,如果用push_back操作势必会调用std::thread,又因为std::thread没有拷贝构造函数,这样会引发编译错误。而采用emplace方式,可以直接根据线程构造函数需要的参数来构造(比如说threads容器里存的thread类型的变量,通过emplace方法,传入的参数就可以根据thread构造函数时需要的参数,参数就会生成一个右值,存到emplace里),这样就避免了调用thread的拷贝构造函数。

1
2
3
4
5
6
7
8
9
10
11
12
void use_vector() {
std::vector<std::thread> threads;
for (unsigned i = 0; i < 10; ++i) {
//auto t = std::thread(param_function,i);
//threads.push_back(std::move(t));
//emplace_back内部相当于执行的就是上面两句代码
threads.emplace_back(param_function, i);
}
for (auto& entry : threads) {
entry.join();
}
}

2.4 其它函数

std::thread::hardware_concurrency()函数:它的返回值是一个指标,表示程序在各次运行中可真正并发的线程数量。

get_id()函数:获取线程id。

3. 互斥与死锁

3.1 锁的使用

对于一些共享的数据,可以通过mutex对共享数据进行加锁,防止多线程访问共享区造成数据不一致问题。

在下面程序中,初始化了一个共享变量shared_data,然后定义了一个互斥量std::mutex,接下来启动了两个线程,一个是use_lock函数增加数据,另一个lambda表达式减少数据。从最后执行的结果可以看到两个线程对于共享数据的访问是独占的,单位时间片只有一个线程访问并输出日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
std::mutex mtx1;          //定义的互斥量
int shared_data = 100; //共享数据

void use_lock() {
while (true) {
mtx1.lock(); //加锁
shared_data++; //对共享数据执行++操作
std::cout << "current thread is " << std::this_thread::get_id() << std::endl; //输出当前的线程id
std::cout << "share data is " << shared_data << std::endl; //打印修改的数据
mtx1.unlock(); //解锁
std::this_thread::sleep_for(std::chrono::microseconds(10)); //睡眠一会,释放时间片。不睡眠的话,这是一个死循环,会一直抢占cpu的时间片(死循环对时间片的抢占严重)
}
}

void test_lock() {
std::thread t1(use_lock); //启动了线程t1,参数是写好的函数
std::thread t2([]() { //启动线程t2,参数是lambda表达式
while (true) {
mtx1.lock(); //加锁
shared_data--; //对共享数据执行--操作
std::cout << "current thread is " << std::this_thread::get_id() << std::endl; //输出当前的线程id
std::cout << "share data is " << shared_data << std::endl; //打印修改的数据
mtx1.unlock(); //解锁
std::this_thread::sleep_for(std::chrono::microseconds(10)); //睡眠一会,释放时间片
}
});
t1.join();
t2.join();
}
int main()
{
test_lock();
}

3.2 lock_guard的使用

lock_guard可以自动地加锁和解锁,参数只需要传入互斥量即可,就没有必要手动的去加锁和解锁了。因为lock_guard解锁是要遇到右括号才自动解锁,所以需要把与访问共享区域有关的代码放入一个新的{}里面执行。

1
2
3
4
5
6
7
8
9
10
11
void use_lock() {
while (true) {
{
std::lock_guard<std::mutex> lock(mtx1);
shared_data++;
std::cout << "current thread is " << std::this_thread::get_id() << std::endl;
std::cout << "sharad data is " << shared_data << std::endl;
}
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
}

3.3 保证数据安全

有时候我们可以将对共享数据的访问和修改聚合到一个函数,在函数内加锁保证数据的安全性。但是对于读取类型的操作,即使读取函数是线程安全的,但是返回值抛给外边使用,可能就会存在不安全性。

对于下面这个程序,empty()函数内部对栈进行访问的时候,用了加锁,是安全的,但当将是否为空的结果传到外部后,外部将结果存到队列中,没有及时用。如果这段期间栈有所变化,则会造成程序崩溃。总的来说,就是empty()在返回true或false时,使用这个bool时机的一个问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
template<typename T>
class threadsafe_stack1 {
private:
std::stack<T>data;
mutable std::mutex m; //mutable可变的
public:
threadsafe_stack1() {} //无参构造
threadsafe_stack1(const threadsafe_stack1& other) { //拷贝构造
std::lock_guard<std::mutex>lock(other.m); //加锁,这里参数用的是参数的m,因为自己的m还没有产生
data = other.data; //data等于参数的data
}
threadsafe_stack1& operator = (const threadsafe_stack1&) = delete; //去除赋值构造

void push(T new_value) {
std::lock_guard<std::mutex>lock(m); //对互斥量加锁
data.push(std::move(new_value)); //把元素添加进来(通过移动构造,添加进去,可以减少一次拷贝)
}

//问题代码
T pop() {
std::lock_guard<std::mutex>mutex(m);
auto element = data.top();
data.pop();
return element;
}

//危险
bool empty() const { //这里是const,如果上面的锁没有声明为mutable,就不能在这个函数进行加锁
std::lock_guard<std::mutex>lock(m);
return data.empty();
}
};

//模拟的一个例子
void test_threadsafe_stack1() {
threadsafe_stack1<int>safe_stack;
safe_stack.push(1);

std::thread t1([&safe_stack]() {
if (!safe_stack.empty()) {
std::this_thread::sleep_for(std::chrono::seconds(1)); //当判断不为空后,睡眠一会
safe_stack.pop();
}
});

std::thread t2([&safe_stack]() {
if (!safe_stack.empty()) {
std::this_thread::sleep_for(std::chrono::seconds(1)); //当判断不为空后,睡眠一会
safe_stack.pop();
}
});
t1.join();
t2.join();
}

3.4 死锁

死锁一般是由于调运顺序不一致导致的,比如两个线程循环调用。当线程1先加锁A,再加锁B,而线程2先加锁B,再加锁A。那么在某一时刻就可能造成死锁,因为它们彼此都想要占用对方的锁,又不释放自己占有的锁,这样就导致了死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
std::mutex t_lock1;
std::mutex t_lock2;
int m_1 = 0;
int m_2 = 1;

void dead_lock1() {
while (true) {
std::cout << "dead_lock1 begin " << std::endl;
t_lock1.lock(); //对m1加锁
m_1 = 1024; //修改m1的值
t_lock2.lock(); //对m2加锁
m_2 = 2048; //修改m2的值
t_lock2.unlock();
t_lock1.unlock();
std::cout << "dead_lock1 end" << std::endl;
}
}

void dead_lock2() {
while (true) {
std::cout << "dead_lock2 begin " << std::endl;
t_lock1.lock(); //对m1加锁
m_1 = 1024; //修改m1的值
t_lock2.lock(); //对m2加锁
m_2 = 2048; //修改m2的值
t_lock2.unlock();
t_lock1.unlock();
std::cout << "dead_lock2 end" << std::endl;
}
}

void test_dead_lock() {
std::thread t1(dead_lock1);
std::thread t2(dead_lock2);
t1.join();
t2.join();
}

3.5 层级锁

现实开发中常常很难规避同一个函数内部加多个锁的情况,我们要尽可能避免循环加锁。所以可以自定义一个层级锁,它是一种用于避免死锁的策略,它通过为锁分配优先级来减少死锁的可能性。在这种策略中,每个锁都被赋予一个唯一的优先级,并且规定只能按照优先级顺序获取锁,这样就保证实际项目中对多个互斥量加锁时是有序的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
//层级锁
class hierarchical_mutex {
public:
explicit hierarchical_mutex(unsigned long value) :_hierarchy_value(value),
_previous_hierarchy_value(0) {}

hierarchical_mutex(const hierarchical_mutex&) = delete; //去除拷贝构造
hierarchical_mutex& operator=(const hierarchical_mutex&) = delete; //去除赋值构造

void lock() { //加锁
check_for_hierarchy_violation(); //检测锁
_internal_mutex.lock();
update_hierarchy_value(); //更新锁
}
void unlock() { //解锁
if (_this_thread_hierarchy_value != _hierarchy_value) { //判断线程当前的层级值是否等于要解锁的互斥量的层级值
throw std::logic_error("mutex hierarchy violated"); //抛出错误
}
//如果对了,就将之前存的层级值,赋值给当前线程的层级值
_this_thread_hierarchy_value = _previous_hierarchy_value;
_internal_mutex.unlock();
}
bool try_lock() { //尝试锁:尝试一下,加不到锁也可以马上退出,不会阻塞
check_for_hierarchy_violation(); //先检测锁
if (!_internal_mutex.try_lock()) { //尝试锁失败,就直接返回false
return false;
}
//如果尝试锁成功的话就执行下面
update_hierarchy_value(); //更新锁,然后返回true
return true;
}
private:
std::mutex _internal_mutex;
unsigned long const _hierarchy_value; //当前层级值
unsigned long _previous_hierarchy_value; //上一次层级值
static thread_local unsigned long _this_thread_hierarchy_value; //本线程记录的层级值
//检测锁
void check_for_hierarchy_violation() {
if (_this_thread_hierarchy_value <= _hierarchy_value) { //如果线程的层级比要加的锁的层级低
throw std::logic_error("mutex hierarchy violated"); //抛出逻辑错误
}
}
//更新锁
void update_hierarchy_value() {
_previous_hierarchy_value = _this_thread_hierarchy_value; //将当前线程的层级值赋值到上一次层级值
_this_thread_hierarchy_value = _hierarchy_value; //将要加的锁的值赋值给线程
}
};
thread_local unsigned long hierarchical_mutex::_this_thread_hierarchy_value(ULONG_MAX);
//下面模拟这个是有问题的
void test_hierarchy_lock() {
hierarchical_mutex hmtx1(1000); //第1个互斥量的层级值是1000
hierarchical_mutex hmtx2(500); //第2个互斥量的层级值是500
std::thread t1([&hmtx1, &hmtx2]() { //通过引用的方式来捕获,因为拷贝已经被去除了
hmtx1.lock();
hmtx2.lock();
hmtx2.unlock();
hmtx1.unlock();
});
std::thread t2([&hmtx1, &hmtx2]() {
hmtx2.lock();
hmtx1.lock();
hmtx1.unlock();
hmtx2.unlock();
});
t1.join();
t2.join();
}

主要原理就是将当前锁的权重保存在线程变量中,这样该线程再次加锁时判断线程变量的权重和锁的权重是否大于,如果满足条件则继续加锁。

3.6 unique_lock

unique_locklock_guard基本用法相同,构造时默认加锁,析构时默认解锁,但unique_lock有个好处就是可以手动解锁。这一点尤为重要,方便我们控制锁住区域的粒度(加锁的范围大小),也能支持和条件变量配套使用。

1
2
3
4
5
6
7
8
9
10
//unique_lock 基本用法
std::mutex mtx;
int shared_data = 0;
void use_unique() {
//lock可自动解锁,也可手动解锁
std::unique_lock<std::mutex> lock(mtx);
std::cout << "lock success" << std::endl;
shared_data++;
lock.unlock(); //这行可以加也可以不加
}

同时,还可以通过unique_lockowns_lock来判断是否持有锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//可判断是否占有锁
void owns_lock() {
//lock可自动解锁,也可手动解锁
std::unique_lock<std::mutex> lock(mtx); //这里加锁了
shared_data++;
if (lock.owns_lock()) {
std::cout << "owns lock" << std::endl; //所以会打印这一行
}
else {
std::cout << "doesn't own lock" << std::endl;
}
lock.unlock(); //解锁了
if (lock.owns_lock()) {
std::cout << "owns lock" << std::endl;
}
else {
std::cout << "doesn't own lock" << std::endl; //所以会打印这行
}
}

unique_lock也可以完成延迟加锁。这里的延迟加锁是指在 std::unique_lock 对象创建时不立即锁定互斥量,而是在之后显式调用 lock.lock() 时才锁定。因此,这个延迟加锁的时间并不是指一个具体的时间段,而是指在 std::unique_lock 对象创建后到显式调用 lock.lock() 之间的这段时间。

1
2
3
4
5
void defer_lock() {
std::unique_lock<std::mutex> lock(mtx, std::defer_lock); //这里还没有进行加锁
lock.lock(); //进行加锁
lock.unlock(); //解锁了
}

下面综合运用owns_lockdefer_lock来实现一个例子,下面这个程序中会发生阻塞,阻塞在24行的lock.lock()。因为主线程在初始化时就加锁了,未释放,等待子线程执行完了自动释放;而子线程虽然是延迟加锁,但到了下面显示加锁时,加不了锁,就会阻塞在这里。因此就导致整个程序卡住。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void use_own_defer() {
std::unique_lock<std::mutex> lock(mtx); //因为没有使用延迟加锁,所以这里初始化时,就进行了加锁
// 判断是否拥有锁
if (lock.owns_lock())
{
std::cout << "Main thread has the lock." << std::endl; //会打印这一行
}
else
{
std::cout << "Main thread does not have the lock." << std::endl;
}
//子线程再主线程里运行
std::thread t([]() {
std::unique_lock<std::mutex> lock(mtx, std::defer_lock); //这里设置了延迟加锁,所以等到下面显示加锁才算加锁
// 判断是否拥有锁
if (lock.owns_lock())
{
std::cout << "Thread has the lock." << std::endl;
}
else
{
std::cout << "Thread does not have the lock." << std::endl; //此时子线程还没有拥有锁,所以打印这行
}
lock.lock(); // 这里进行加锁是不行的,会导致程序阻塞在这里
// 判断是否拥有锁
if (lock.owns_lock())
{
std::cout << "Thread has the lock." << std::endl;
}
else
{
std::cout << "Thread does not have the lock." << std::endl;
}
// 解锁
lock.unlock();
});
t.join();
}

lock_guard一样,unique_lock也支持领养锁。下面的程序中,首先通过mtx互斥量进行加锁,然后创建了一个lock对象来接管这个锁,此时mtx就无效了,后面就可以通过手动的lock对象来释放锁,但通过mtx.unlock()来释放锁就会报错。如果开始没有先通过mtx来进行加锁,后面通过lock对象来接管锁也会报错,因为锁还没有加锁,接管不了。

1
2
3
4
5
6
7
8
9
10
11
12
std::mutex mtx;                //定义的互斥量
void use_own_adopt() {
mtx.lock(); //通过互斥量进行了加锁
std::unique_lock<std::mutex> lock(mtx, std::adopt_lock); //创建了一个lock对象来接管锁
if (lock.owns_lock()) {
std::cout << "owns lock" << std::endl; //会打印这行
}
else {
std::cout << "does not have the lock" << std::endl;
}
lock.unlock(); //通过lock对象来释放锁
}

需要注意的是,一旦mutexunique_lock管理,加锁和释放的操作就交给unique_lock,不能调用mutex加锁和解锁,因为锁的使用权已经交给unique_lock了。

1
2
3
4
5
6
7
8
9
std::mutex mtx1;                //定义的互斥量
std::mutex mtx2; //定义的互斥量
void safe_swap2() {
std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock);
std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock);
//需用lock1,lock2加锁
std::lock(lock1, lock2);
//std::lock(mtx1, mtx2); //错误用法
}

众所周知的mutex是不支持移动和拷贝的,但是unique_lock支持移动,当一个mutex被转移给unique_lock后,可以通过unique_ptr转移其归属权。

在下面程序中,get_lock()首先定义了一个局部的lock,执行相应操作后,返回该lock,因为到达了作用域末尾,所以自动解锁了。虽然解锁了,但该lock已经作为局部变量被返回了,在use_return函数中,因为返回的不是引用类型,所以先考虑拷贝构造和拷贝赋值,因为unique_lock没有这两种拷贝,所以就只能用移动构造,会把lock里所有的管理权交给use_return里的lock对象。这意味着互斥量mtx在 use_return 函数中的lock对象被销毁之前不会被解锁。

1
2
3
4
5
6
7
8
9
10
11
//转移互斥量所有权,互斥量本身不支持move操作,但是unique_lock支持
std::mutex mtx; //定义的互斥量
std::unique_lock <std::mutex> get_lock() {
std::unique_lock<std::mutex> lock(mtx);
shared_data++;
return lock;
}
void use_return() {
std::unique_lock<std::mutex> lock(get_lock()); //不是引用类型,所以先考虑拷贝构造和拷贝赋值
shared_data++;
}

4. 线程安全的单例模式

4.1 局部静态变量

众所周知当一个函数中定义一个局部静态变量,那么这个局部静态变量只会初始化一次,就是在这个函数第一次调用的时候,以后无论调用几次这个函数,函数内的局部静态变量都不再初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Single2 {
private:
Single2(){} //无参构造是私有的
Single2(const Single2&) = delete; //去除拷贝构造
Single2& operator=(const Single2&) = delete; //去除拷贝赋值
public:
static Single2& GetInst() //公有的,让外部可以获取它的一个引用实例
{
static Single2 single; //定义了一个静态局部变量
return single; //返回该变量
}
};
//c++11之后,这种方式是可以的,它们打印的地址都是一样的。
void test_single2(){
std::cout<<"s1 addr is "<<&Single2::GetInst()<<std::endl; //打印地址
std::cout<<"s2 addr is "<<&Single2::GetInst()<<std::endl; //打印地址
}

上述版本的单例模式在C++11 以前存在多线程不安全的情况,编译器可能会初始化多个静态变量。但是C++11推出以后,各厂商优化编译器,能保证线程安全。尽管是多个线程,多核机器实现,只要调用GetInst()接口,里面调用生成的局部变量都是统一的。

在C++11 推出以前,局部静态变量的方式实现单例存在线程安全问题,所以部分人推出了以下方案实现:

4.2 饿汉模式

饿汉模式就是指定义类的时候就创建了单例对象,创建出来后,什么时候用,时候什么就调用静态的成员函数(得到单例对象),且在多线程的场景下,饿汉模式是没有线程安全问题的(多线程可以同时访问这个单例的对象)。但这种方法的缺点就是浪费空间资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class TaskQueue {      //通过类名来得到的对象是静态的,通过类名访问类里面的属性和方法,这个属性和方法是静态的
public:
//堵死在外部创建对象的方法,防止通过拷贝构造或赋值操作来创建多个实例
TaskQueue(const TaskQueue& t) = delete; //删除了拷贝构造函数
TaskQueue& operator = (const TaskQueue& t) = delete; //删除了拷贝赋值函数
//静态的公共成员函数,把唯一的单例对象返回给调用者
static TaskQueue* getInstance() {
return m_taskQ;
}
void print() {
cout << "我是单例对象的一个成员函数..." << endl;
}

private:
TaskQueue() = default; //定义了一个无参构造,它与默认的无参构造具有相同的行为
//只能通过类名访问静态属性或方法
static TaskQueue* m_taskQ; //该静态成员指针只能通过静态成员方法访问
};
//类里面的静态成员变量使用前,必须在类的外部对其进行初始化(不能在内部进行初始化)
TaskQueue* TaskQueue::m_taskQ = new TaskQueue; //类的作用域下,可以访问类的私有成员

4.3 懒汉模式

懒汉模式就是指什么时候使用这个单例对象,在使用的时候再去创建对应的实例,这样就解决了饿汉模式下的空间资源浪费的问题。但在多线程的场景下,懒汉模式是有线程安全问题的(在多线程情况下,若干个线程同时访问单例的实例,会出问题)。
解决方式:通过互斥锁,阻塞线程,依次访问这个单例对象,就可以避免在懒汉模式下多线程同时访问单例对象而创建出多个类的实例这种问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class SinglePointer
{
private:
SinglePointer(){}
SinglePointer(const SinglePointer&) = delete;
SinglePointer& operator=(const SinglePointer&) = delete;
public:
static SinglePointer* GetInst()
{
if (single != nullptr) //如果single已经被初始化
{
return single; //直接返回
}
s_mutex.lock(); //如果还没有被初始化
if (single != nullptr) //再确定一遍,
{
s_mutex.unlock(); //如果被初始化了,就解锁,返回
return single;
}
//还是没有被初始化,就初始化该类
single = new SinglePointer();
s_mutex.unlock(); //解锁返回
return single;
}
private:
static SinglePointer* single;
static std::mutex s_mutex;
};
//类里面的静态成员变量使用前,必须在类的外部对其进行初始化(不能在内部进行初始化)
SinglePointer* SinglePointer::single = nullptr;
std::mutex SinglePointer::s_mutex;

5. 条件变量

在下面程序中,线程1负责打印1后++,线程2负责打印2后–,但当线程1打印完1后,如果2还没有打印,那么它将睡眠,线程2也是相同的情况,所以就导致打印时虽然是12…12这样打印的,但中间互发生停顿,这样就浪费就cpu的时间片。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
int num = 1;
std::mutex mtx_num;
void PoorImpleman() {
std::thread t1([]() {
for (;;) {
{
std::lock_guard<std::mutex> lock(mtx_num); //先加锁
if (num == 1) { //如果num=1,就执行下面
std::cout << "thread A print 1....." << std::endl;
num++;
continue;
}
}
//如果num不等于1,就睡一会
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
});
std::thread t2([]() {
for (;;) {
{
std::lock_guard<std::mutex> lock(mtx_num); //先加锁
if (num == 2) { //如果num=2,就执行下面
std::cout << "thread B print 2....." << std::endl;
num--;
continue;
}
}
//如果num不等于2,就睡一会
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
});
t1.join();
t2.join();
}

下面就可以通过条件变量的方式实现,就是当线程1在执行打印完1后,会将线程2唤醒;而当线程2打印完线程2后,有会唤醒线程1。这样执行的结果就不会出现中间停顿的现象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
int num = 1;
std::mutex mtx_num;
std::condition_variable cvA; //第1个条件变量
std::condition_variable cvB; //第2个条件变量
void ResonableImplemention() {
std::thread t1([]() {
for (;;) {
std::unique_lock<std::mutex> lock(mtx_num);
//写法1
cvA.wait(lock, []() {
return num == 1; //如果num不等于1,就会在这里挂起,把锁解开,交给其它线程,不会消耗cpu的资源
});
//如果num等于1,就执行下面
num++;
std::cout << "thread A print 1....." << std::endl;
cvB.notify_one(); //唤醒线程2
}
});
std::thread t2([]() {
for (;;) {
std::unique_lock<std::mutex> lock(mtx_num);
//写法2
while(num!=2){ //如果num不等于2,就会发生阻塞
cvB.wait(lock);
}
num--;
std::cout << "thread B print 2....." << std::endl;
cvA.notify_one(); //唤醒线程1
//cvA.notify_all(); //唤醒多个线程
}
});
t1.join();
t2.join();
}

6. 并发三剑客future, promise和async

6.1 async与future

std::async 是一个用于异步执行函数的模板函数,它返回一个 std::future 对象,该对象用于获取函数的返回值。

在下面这个程序中,定义了一个需要耗费一定时间的任务函数,在主函数中,先启动一个异步执行的任务,通过future定义的变量来接收异步返回的一个结果,并可以使用get()来获取该结果。如果该异步执行的函数还没有完成,get()就会一直阻塞着,但由于任务函数是子线程异步执行的,所以在执行get()这期间,可以执行其它内容的程序,当需要调用异步任务的结果时,再直接通过get()获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 模拟一个异步任务函数,比如从数据库中获取数据
std::string fetchDataFromDB(std::string query) {
std::this_thread::sleep_for(std::chrono::seconds(5)); //睡眠5秒,模拟查询
return "Data: " + query;
}
int main() {
//使用std::async 异步调用 fetchDataFromDB()函数。参数:一种启动策略(表示单独去启动一个子线程,去执行第二个参数函数);任务函数;任务函数的参数
std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data");
// 在主线程中做其他事情(在异步执行和获取到值之间这段时间)
std::cout << "Doing something else..." << std::endl;
// 从 future 对象中获取数据
std::string dbData = resultFromDB.get(); //只有在上面启动子线程执行完,这里才能get到
std::cout << dbData << std::endl;
return 0;
}

启动策略:std::async函数可以接受几个不同的启动策略,这些策略在std::launch枚举中定义。除了上述使用的std::launch::async异步执行之外,还有以下启动策略:

  • std::launch::deferred:延迟执行,这种策略意味着任务将在调用std::future::get()std::future::wait()函数时才执行。即任务将在需要结果时同步执行

  • std::launch::async | std::launch::deferred:这种策略是上面两个策略的组合。任务可以在一个单独的线程上异步执行,也可以延迟执行,具体取决于实现。默认情况下,std::async使用的是这种策略,但需要注意的是,不同的编译器和操作系统可能会有不同的默认行为。

6.2 future的wait和get

std::future::get()std::future::wait() 是 C++ 中用于处理异步任务的两个方法。

  • std::future::get() 是一个阻塞调用,用于获取 std::future 对象表示的值或异常。如果异步任务还没有完成,get() 会阻塞当前线程,直到任务完成。如果任务已经完成,get() 会立即返回任务的结果。重要的是,get() 只能调用一次,因为它会移动或消耗掉 std::future 对象的状态。一旦 get() 被调用,std::future 对象就不能再被用来获取结果。
  • std::future::wait() 也是一个阻塞调用,但它与 get() 的主要区别在于 wait() 不会返回任务的结果。它只是等待异步任务完成。如果任务已经完成,wait() 会立即返回。如果任务还没有完成,wait() 会阻塞当前线程,直到任务完成。与 get() 不同,wait() 可以被多次调用,它不会消耗掉 std::future 对象的状态。

这两个方法的主要区别:

  1. std::future::get() 用于获取并返回任务的结果,而 std::future::wait() 只是等待任务完成。

  2. get() 只能调用一次,而 wait() 可以被多次调用。

  3. 如果任务还没有完成,get()wait() 都会阻塞当前线程,但 get() 会一直阻塞直到任务完成并返回结果,而 wait() 只是在等待任务完成。

补:可以使用std::futurewait_for()wait_until()方法来检查异步操作是否已完成。这些方法返回一个表示操作状态的std::future_status值。

6.3 任务和future关联

std::packaged_taskstd::future是C++11中引入的两个类,它们用于处理异步任务的结果。

std::packaged_task是一个可调用目标,它包装了一个任务,该任务可以在另一个线程上运行。它可以捕获任务的返回值或异常,并将其存储在std::future对象中,以便以后使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int my_task() {              //模拟一个任务的函数
std::this_thread::sleep_for(std::chrono::seconds(5));
std::cout << "my task run 5 s" << std::endl;
return 42;
}
void use_package() {
// 创建一个包装了任务的 std::packaged_task 对象
std::packaged_task<int()> task(my_task); //将my_task函数封装到task里,<int()>表示返回值是int,参数是空
// 获取与任务关联的 std::future 对象
std::future<int> result = task.get_future();
// 在另一个线程上执行任务
std::thread t(std::move(task)); //将task传入到线程里时,必须要用移动构造,packaged_task只支持移动构造
t.detach(); // 将线程与主线程分离,以便主线程可以等待任务完成。将在执行任务的子线程与t解开了(t被回收,任务也不会中断)
// 等待任务完成并获取结果
int value = result.get();
std::cout << "The result is: " << value << std::endl;
}

6.4 promise 用法

C++11引入了std::promisestd::future两个类,用于实现异步编程。std::promise用于在某一线程中设置某个值或异常,而std::future则用于在另一线程中获取这个值或异常。

在下面的程序中,首先创建了一个std::promise<int>对象,然后通过调用get_future()方法获取与之相关联的std::future<int>对象。然后,我们在新线程中通过调用set_value()方法设置promise的值,并在主线程中通过调用fut.get()方法获取这个值。注意,在调用fut.get()方法时,如果promise的值还没有被设置,则该方法会阻塞当前线程,直到值被设置为止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void set_value(std::promise<int> prom) {
std::this_thread::sleep_for(std::chrono::seconds(5));
// 设置 promise 的值
prom.set_value(10); //设置主线程需要的值,下面内容无论执行多久,都问题不大了,外部可以通过方法得到这个值
std::cout<<"promise set value success"<<std::endl;
}
int main() {
// 创建一个 promise 对象
std::promise<int> prom;
// 获取与 promise 相关联的 future 对象
std::future<int> fut = prom.get_future();
// 在新线程中设置 promise 的值
std::thread t(set_value, std::move(prom)); //必须是移动构造的方式
// 在主线程中获取 future 的值
std::cout << "Waiting for the thread to set the value...\n";
std::cout << "Value set by the thread: " << fut.get() << '\n'; //获取子线程中的promise设置的值10
t.join();
return 0;
}

除了set_value()方法外,std::promise还有一个set_exception()方法,用于设置异常。该方法接受一个std::exception_ptr参数,该参数可以通过调用std::current_exception()方法获取。在下面这个程序中,子线程是抛出异常,然后自己又捕获异常,捕获异常就是将异常值写入到prom里,这样主线程就可以通过fut.get()方法得到子线程的异常值了。但要注意的是,主线程必须要用try-catch来捕获该异常,不然系统会崩溃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void set_exception(std::promise<void> prom) {
try {
// 抛出一个异常
throw std::runtime_error("An error occurred!");
} catch(...) {
//子线程在这里捕获抛出的异常
prom.set_exception(std::current_exception()); //设置promise的异常,将异常的值写入到了prom里
}
}
int main() {
// 创建一个 promise 对象
std::promise<void> prom;
// 获取与 promise 相关联的 future 对象
std::future<void> fut = prom.get_future();
// 在新线程中设置 promise 的异常
std::thread t(set_exception, std::move(prom));
// 在主线程中获取 future 的异常
try {
std::cout << "Waiting for the thread to set the exception...\n";
fut.get(); //主线程获取子线程抛出来的异常
} catch(const std::exception& e) {
std::cout << "Exception set by the thread: " << e.what() << '\n';
}
t.join();
return 0;
}

6.5 共享类型的future

当多个线程需要等待同一个执行结果时,可以使用std::shared_future,也就是多个线程去等待同一个执行结果,不会出现资源竞争的问题,也不用考虑是否加锁(天生线程安全的)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void myFunction(std::promise<int>&& promise) {
// 模拟一些工作
std::this_thread::sleep_for(std::chrono::seconds(1));
promise.set_value(42); // 设置 promise 的值
}
//该线程负责获取promise设置的值
void threadFunction(std::shared_future<int> future) {
try {
int result = future.get();
std::cout << "Result: " << result << std::endl;
}
catch (const std::future_error& e) {
std::cout << "Future error: " << e.what() << std::endl;
}
}
void use_shared_future() {
std::promise<int> promise;
std::shared_future<int> future = promise.get_future(); //std::future类型会自动转换为std::shared_future类型

std::thread myThread1(myFunction, std::move(promise)); // 将promise移动到线程1中
// 使用share()方法获取新的shared_future对象,它们都可以获取promise设置的值
//shared_future是支持拷贝的,所以可以直接传future,但这里也只能这样传值,不能移动构造,因为线程2用了,线程3还要继续用future
std::thread myThread2(threadFunction, future);
std::thread myThread3(threadFunction, future);
myThread1.join();
myThread2.join();
myThread3.join();
}

补:std::shared_future是支持拷贝的,std::future不支持拷贝,只能移动构造

7. 并发设计模式Actor和CSP

7.1 简介

在并发设计中有两种常用的设计模式,即Actor和CSP模式。传统的并发设计经常都是通过共享内存加锁保证逻辑安全,这种模式有几个缺点:如频繁加锁影响性能、 耦合度高等。而Actor和CSP设计模式恰好可以解决这些缺陷。

7.2 Actor设计模式

actor通过消息传递的方式与外界通信。消息传递是异步的。每个Actor都有一个邮箱,用于接收其他Actor发送的消息,Actor可以按顺序处理邮箱中的消息。actor一次只能同步处理一个消息,处理消息过程中,除了可以接收消息,不能做任何其他操作。 每一个类独立在一个线程里称作Actor,Actor之间通过队列通信,比如Actor1 发消息给Actor2, Actor2 发消息给Actor1都是投递到对方的队列中。好像给对方发邮件,对方从邮箱中取出一样。

Actor模型的另一个好处就是可以消除共享状态,因为它每次只能处理一条消息,所以actor内部可以安全的处理状态,而不用考虑锁机制。一些网络编程中对于逻辑层的处理就采用了该模式,就是将要处理的逻辑消息封装成包投递给逻辑队列,逻辑类通过单线程的方式从队列中一条一条取出数据消费的思想。

7.3 CSP模式

CSP称为通信顺序进程,是一种并发编程模型,也是一个很强大的并发数据模型,用于描述两个独立的并发实体通过共享的通讯 channel(管道)进行通信的并发模型。相对于Actor模型,CSP中channel是第一类对象,它不关注发送消息的实体,而关注与发送消息时使用的channel。

CSP和Actor类似,只不过CSP将消息投递给channel,不关注谁从channel中取数据和发送的一方是谁。而Actor在发送消息前是知道接收方是谁,接受方收到消息后也知道发送方是谁,更像是邮件的通信模式。而csp是完全解耦合的。

但它们有一个共同的特性:不要通过共享内存来通信;相反,通过通信来共享内存

7.4 CSP模式的实现

因为go是天然支持csp模式的语言,所以实现起来简单。但c++是一种比较全能的的语言,也可以实现出一种类似于go的channel。下面实现的是类似于生产者和消费者问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
template <typename T>     //因为channel可以存放任意类型的数据,所以这里用模板
class Channel {
private:
std::queue<T> queue_; //存放数据的队列
std::mutex mtx_; //互斥量,用来维护队列的安全性
std::condition_variable cv_producer_; //生产者条件变量
std::condition_variable cv_consumer_; //消费者条件变量
size_t capacity_; //最大容量
bool closed_ = false; //Channel是否被关闭

public:
Channel(size_t capacity = 0) : capacity_(capacity) {} //初始化时,可以先显示的设置容量为0

bool send(T value) { //向Channel去投递数据
std::unique_lock<std::mutex> lock(mtx_); //对互斥量进行加锁操作
//等待锁,判断条件是否满足,不满足就挂起在这里
cv_producer_.wait(lock, [this]() {
// 对于无缓冲的channel,我们应该等待直到有消费者准备好
return (capacity_ == 0 && queue_.empty()) || queue_.size() < capacity_ || closed_;
});
//当队列没有数据并且初始化了一个无缓冲的channel 或 初始化的容量不为0并且队列长度还没有达到容量 或 close 都不挂起
if (closed_) {
return false; //如果是关闭了,说明要退出,直接返回false
}
//没有关闭,就往队列里放数据
queue_.push(value);
cv_consumer_.notify_one(); //通知消费者
return true;
}

bool receive(T& value) { //向Channel去取数据
std::unique_lock<std::mutex> lock(mtx_); //对互斥量加速
//等待锁,判断条件是否满足,不满足就挂起在这里
cv_consumer_.wait(lock, [this]() { return !queue_.empty() || closed_; });
//当队列不为空 或 关闭了 都不挂起
if (closed_ && queue_.empty()) { //如果关闭了,并且队列为空,就直接退出,返回false
return false;
}
//如果没有关闭,就直接从队列取数据消费
value = queue_.front();
queue_.pop();
cv_producer_.notify_one(); //提醒生产者
return true;
}

void close() {
std::unique_lock<std::mutex> lock(mtx_);
closed_ = true; //将关闭置为true
//通知所有的生产者和消费者,要退出了
cv_producer_.notify_all();
cv_consumer_.notify_all();
}
};

// 示例使用
int main() {
Channel<int> ch(10); // 10缓冲的channel,如果是0的话,即无容量,就是同步应用了,生产者放,消费者马上就取

std::thread producer([&]() { //启动一个生产者线程
for (int i = 0; i < 5; ++i) {
ch.send(i);
std::cout << "Sent: " << i << std::endl;
}
ch.close(); //向channel投递数据,投递完了就关闭
});

std::thread consumer([&]() { //启动一个消费者线程
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 故意延迟消费者开始消费
int val;
//当消费者生成完指定数据后就不生产了,而close为true了,所以receive会返回false,退出循环、消费者线程
while (ch.receive(val)) {
std::cout << "Received: " << val << std::endl;
}
});
producer.join();
consumer.join();
return 0;
}

8. 原子操作和内存模型

8.1 改动序列

在一个C++程序中,每个对象都具有一个改动序列,它由所有线程在对象上的全部写操作构成,其中第一个写操作即为对象的初始化。 大部分情况下,这个序列会随程序的多次运行而发生变化,但是在程序的任意一次运行过程中,所含的全部线程都必须形成相同的改动序列(要么都从头开始修改,要么都从尾开始修改)。

改动序列基本要求如下:

  • 只要某线程看到过某个对象,则该线程的后续读操作必须获得相对最近的值,并且,该线程就同一对象的后续写操作,必然出现在改动序列后方(基于上一次的改动)。
  • 如果某线程先向一个对象写数据,过后再读取它,那么必须读取前面写的值。
  • 若在改动序列中,上述读写操作之间还有别的写操作,则必须读取最后写的值。
  • 在程序内部,对于同一个对象,全部线程都必须就其形成相同的改动序列,并且在所有对象上都要求如此。
  • 多个对象上的改动序列只是相对关系,线程之间不必达成一致。

8.2 原子类型

标准原子类型的定义位于头文件<atomic>内。可以通过atomic<>定义一些原子类型的变量,如atomic<bool>,atomic<int> 这些类型的操作全是原子化的。

从C++17开始,所有的原子类型都包含一个静态常量表达式成员变量std::atomic::is_always_lock_free,它能够返回一个结构在任意给定的目标硬件上是否支持不加锁(无锁结构形式实现)。如果在所有支持该程序运行的硬件上,原子类型X都以无锁结构形式实现,那么这个成员变量的值就为true;否则为false。

std::atomic_flag 是唯一一个不提供is_lock_free()成员函数的原子类型。因为 std::atomic_flag 本身就是设计为一个简单的、无锁的原子操作。它的操作(设置和清除)总是无锁的,因此不需要 is_lock_free() 函数来查询是否可以无锁执行。

类型std::atomic_flag的对象在初始化时清零,随后即可通过成员函数test_and_set()查值并设置成立,或者由clear()清零。整个过程只有这两个操作。其他的atomic<>的原子类型都可以基于其实现。

总的来说,std::atomic_flagtest_and_set成员函数是一个原子操作,他会先检查std::atomic_flag当前的状态是否被设置过。如果没有被设置过(比如初始状态或者清除后),将std::atomic_flag当前的状态设置为true,并返回false;如果被设置过则直接返回true

对于std::atomic<T>类型的原子变量,还支持load()store()exchange()compare_exchange_weak()compare_exchange_strong()等操作。

8.3 内存次序

对于原子类型上的每一种操作,我们都可以提供额外的参数,从枚举类std::memory_order中取值,用于设定所需的内存次序语义。枚举类std::memory_order具有6个可能的值,包括std::memory_order_relaxedstd:: memory_order_acquirestd::memory_order_consumestd::memory_order_acq_relstd::memory_order_releasestd::memory_order_seq_cst

  1. 存储(store)操作,可选用的内存次序有:

    • std::memory_order_relaxedstd::memory_order_releasestd::memory_order_seq_cst
  2. 载入(load)操作,可选用的内存次序有:

    • std::memory_order_relaxedstd::memory_order_consumestd::memory_order_acquirestd::memory_order_seq_cst
  3. 读-改-写(read-modify-write)操作,可选用的内存次序有:

    • std::memory_order_relaxedstd::memory_order_consumestd::memory_order_acquirestd::memory_order_releasestd::memory_order_acq_relstd::memory_order_seq_cst

原子操作默认使用的是std::memory_order_seq_cst次序。这六种内存顺序相互组合可以实现三种顺序模型:

  • Sequencial consistent ordering:实现同步, 且保证全局顺序一致的模型,是一致性最强的模型, 也是默认的顺序模型

  • Acquire-release ordering:实现同步, 但不保证保证全局顺序一致的模型

  • Relaxed ordering :不能实现同步, 只保证原子性的模型

8.4 自旋锁的实现

自旋锁是一种在多线程环境下保护共享资源的同步机制。它的基本思想是,当一个线程尝试获取锁时,如果锁已经被其他线程持有,那么该线程就会不断地循环检查锁的状态,直到成功获取到锁为止。

下面程序是使用std:atomic_flag实现的一个自旋锁,flag初始化为0,当线程1先获得锁后,它会将flag的值设为1,并返回false,这样就不会进入循环,表示加锁成功。当线程2准备进行加锁时,发现flag的值已经被设置为1,所以就返回true,进入循环,不断的检查flag的值来判断。只有当线程1执行完任务后,解锁将flag的值重新设为0时,线程2检查到flag的值为0后,将其设为1,并返回false,才退出循环,表示它加锁成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class SpinLock {
public:
void lock() {
//test_and_set相当于是读改写,可以通过memory_order_acquire来设置
//逻辑:先检查当前的一个值flag,如果是初始状态,就设置为成立状态(1),并返回false(不循环);如果是成立状态,就不用设置1了,直接返回true(要循环)
while (flag.test_and_set(std::memory_order_acquire)); //自旋等待,直到成功获取到锁
}
void unlock() {
flag.clear(std::memory_order_release); //释放锁,将flag清0
}
private:
//定义了一个最基础的原子类型flag,并初始化为一个无效的状态(0),
std::atomic_flag flag = ATOMIC_FLAG_INIT;
};

//下面是测试函数
void TestSpinLock() {
SpinLock spinlock;
std::thread t1([&spinlock]() {
spinlock.lock();
for (int i = 0; i < 3; i++) {
std::cout << "*";
}
std::cout << std::endl;
spinlock.unlock();
});
std::thread t2([&spinlock]() {
spinlock.lock();
for (int i = 0; i < 3; i++) {
std::cout << "?";
}
std::cout << std::endl;
spinlock.unlock();
});
t1.join();
t2.join();
}

8.5 宽松内存序(memory_order_relaxed)

下面是CPU的一个内存结构图:

其中StoreBuffer就是一级Cache, Catche是二级Cache,Memory是三级Cache。每个CPU都有一个专属的StoreBuffer,其对StoreBuffer的操作对其它的CPU是不可见的。

每个标识CPU的块就是core,上图画的就是4核结构。每两个core构成一个bank,共享一个cache。四个core共享memory。

每个CPU会在任何时刻将StoreBuffer中结果写入到cache或者memory中。如果CPU1往Catche里面写数据,CPU2也往Catche里面写数据,并且写的是同一个变量,数据就乱了。所以就得通过MESI一致性协议来保证数据一致性。

MESI协议,是一种叫作写失效的协议。在写失效协议里,只有一个 CPU 核心负责写入数据,其他的核心,只是同步读取到这个写入。在这个 CPU 核心写入 cache 之后,它会去广播一个“失效”请求告诉所有其他的 CPU 核心。

MESI 协议对应的四个不同的标记,分别是:

  • M:代表已修。用来告诉其他CPU已经修改完成,其他CPU可以向cache中写入数据

  • E:代表独占。表示数据只是从Catche加载到当前CPU核的storebuffer中,其他的CPU核,并没有加载对应的数据到自己的storebuffer里。这个时候,如果要向独占的storebuffer写入数据,我们可以自由地写入数据,而不需要告知其他CPU核。

  • S:代表共享。就是在多核中同时加载了同一份数据到自己的StoreBuffer。所以在共享状态下想要修改数据要先向所有的其他CPU核心广播一个请求,要求先把其他CPU核心里面的cache,都变成无效的状态,然后再更新当前cache里面的数据。

  • I:代表已失效。如果变量a此刻在各个cpu的StoreBuffer中,那么CPU1核修改这个a的值,放入cache时通知其他CPU核写失效,因为同一时刻仅有一个CPU核可以写数据,但是其他CPU核是可以读数据的,那么其他核读到的数据可能是CPU1核修改之前的。这就涉及我们提到的改动序列了。

关于两个改动序列的术语:

  1. synchronizes-with“ : 同步, “A synchronizes-with B” 的意思就是 A和B同步,简单来说如果多线程环境下,有一个线程先修改了变量m,我们将这个操作叫做A,之后有另一个线程读取变量m,我们将这个操作叫做B,那么B一定读取A修改m之后的最新值。也可以称作 A “happens-before“ B,即A操作的结果对B操作可见。

  2. happens-before“ : 先行,”A happens-before B” 的意思是如果A操作先于B操作,那么A操作的结果对B操作可见。”happens-before“包含很多种境况,不仅仅是上面提到的”synchronizes-with“。

关于std::memory_order_relaxed具备如下几个功能:

  1. 作用于原子变量
  2. 不具有synchronizes-with关系,即如果线程1对m进行写,由0变成1,线程2读到的很有可能是0
  3. 对于同一个原子变量,在同一个线程中具有happens-before关系;在同一线程中不同的原子变量不具有happens-before关系,可以乱序执行
  4. 多线程情况下不具有happens-before关系

在下面程序中,write_x_then_y()是先写x再写y,都写为true;read_y_then_x()是先读y,再读x,最后通过TestOrderRelaxed()来调用实现,看是否会出现断言。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
std::atomic<bool> x, y;     //定义bool类型的原子变量
std::atomic<int> z; //定义int类型的原子变量

void write_x_then_y() { //存储x和y
//对于store的处理,为什么对x的处理不能先于y,因为设置的内存序是宽松的,不能保证线程按照这个顺序依次去执行,在cpu底层会对这两条代码进行编排,很有可能y会先被执行
x.store(true, std::memory_order_relaxed); //1.先把x值存为true
y.store(true, std::memory_order_relaxed); //2.再把y值存为true
}

void read_y_then_x() { //读取y和x
while (!y.load(std::memory_order_relaxed)) { //3.取出y值
std::cout << "y load false" << std::endl;
}

if (x.load(std::memory_order_relaxed)) { //4.取出x值
++z;
}
}
//下面是函数调用实现
void TestOrderRelaxed() {
x = false; //初始化为false
y = false; //初始化为false
z = 0;
std::thread t1(write_x_then_y); //启动线程1,表示先写x再写y
std::thread t2(read_y_then_x); //启动线程2,表示先读y再度x
t1.join();
t2.join();
assert(z.load() != 0); //5.()里面等于false时,就会崩掉,弹出一个断言
}

从CPU架构分析:假设线程t1运行在CPU1上,t2运行在CPU3上,那么t1对x和y的操作,t2是看不到的。比如当线程t1运行至1处将x设置为true,t1运行至2处将y设置为true。这些操作仅在CPU1的storebuffer中,还未放入cache和memory中,CPU2和CPU3自然不知道。如果CPU1先将y放入memory,那么CPU3就会读取y的值为true。那么t2就会运行至3处从while循环退出,进而运行至4处,此时CPU1还未将x的值写入memory,t2读取的x值为false,进而线程t2运行结束,然后CPU1将x写入true,t1结束运行,最后主线程运行至5处,因为z为0,所以触发断言。

从宽松内存序分析:因为memory_order_relaxed是宽松的内存序列,它只保证操作的原子性,并不能保证多个变量之间的顺序性,也不能保证同一个变量在不同线程之间的可见顺序。可以理解为不能保证可见的及时性,但如果有修改,迟早是可见的。

在下面程序中,线程1和线程2往里面去写值,线程3和线程4往a里面去读值分别存到v3和v4中。需要注意的是,虽然线程1和线程2是抢着去修改a,但它们的操作一定是满足原子性的,即两个线程去修改,只有其中一个线程修改完,另一个线程才会去修改。而读也能保证一个原子性,即线程3和线程4读出来的数据a,一定是某一时刻线程1或线程2对a修改的一个值,但读到的可能不是最新的,所以就可能是一堆乱序的效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void TestOderRelaxed2() {
std::atomic<int> a{0}; //定义一个原子变量a
std::vector<int> v3, v4; //用来读取存储到的值
std::thread t1([&a]() { //线程1往a存0,2,4,6,8
for (int i = 0; i < 10; i += 2) {
a.store(i, std::memory_order_relaxed);
}
});
std::thread t2([&a]() { //线程2往a存1,3,5,7,9
for (int i = 1; i < 10; i += 2)
a.store(i, std::memory_order_relaxed);
});
std::thread t3([&v3, &a]() { //线程3是从a里面取10个值,存到v3里
for (int i = 0; i < 10; ++i)
v3.push_back(a.load(std::memory_order_relaxed));
});
std::thread t4([&v4, &a]() { //线程4是从a里面取10个值,存到v4里
for (int i = 0; i < 10; ++i)
v4.push_back(a.load(std::memory_order_relaxed));
});
t1.join();
t2.join();
t3.join();
t4.join();

for (int i : v3) {
std::cout << i << " ";
}
std::cout << std::endl;
for (int i : v4) {
std::cout << i << " ";
}
std::cout << std::endl;
}

在上面程序,如果v3中7先于6,8,9等,那么v4中也是7先于6,8,9。总的来说,memory_order_relaxed保证了多线程对同一个变量的原子操作的安全性,只是可见性会有延迟。

8.6 先行(Happens-before)

Happens-before 是一个非常重要的概念。如果操作 a “happens-before” 操作 b,则操作 a 的结果对于操作 b 可见。happens-before 的关系可以建立在用一个线程的两个操作之间,也可以建立在不同的线程的两个操作之间。

顺序先行(sequenced-before):单线程情况下前面的语句先执行,后面的语句后执行。操作a先于操作b,那么操作b可以看到操作a的结果。我们称操作a顺序先行于操作b。也就是”a sequenced-before b”。

线程间先行:

依赖关系:单线程情况下a “sequenced-before” b, 且 b 依赖 a 的数据, 则 a “carries a dependency into” b. 称作 a 将依赖关系带给 b, 也理解为b依赖于a。