1. 内存顺序和内存模型使用实现

1.1 sequencial consistent模型(最严格的)

memory_order_seq_cst代表全局一致性顺序,可以用于 storeloadread-modify-write 操作实现 sequencial consistent的顺序模型。在这个模型下, 所有线程看到的所有操作都有一个一致的顺序,即使这些操作可能针对不同的变量,运行在不同的线程。

在下面程序中,线程1修改完x和y的值,线程2立刻就能见到x和y修改的值,所以当线程2读到y为true时就会退出循环,然后读到的x也一定为true。因为需要保证与线程1修改顺序是一致的,线程1修改了x再修改y,当线程1修改完y的时候,x早就被修改为true了,所以当线程2读到了y为true时,接下来读到的x也一定为true。这样的话,z就是从0变为1,就不会发生断言。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
std::atomic<bool> x, y;       //定义两个bool类型的原子变量
std::atomic<int> z; //定义一个int类型的原子变量

void write_x_then_y() {
//先写x再写y,采用的顺序是全局一次性顺序
x.store(true, std::memory_order_seq_cst); // 1
y.store(true, std::memory_order_seq_cst); // 2
}

void read_y_then_x() {
//先读y再读x,采用的顺序也是全局一次性顺序,
while (!y.load(std::memory_order_seq_cst)) { // 3.当y得到的是true时,就退出循环
std::cout << "y load false" << std::endl;
}
if (x.load(std::memory_order_seq_cst)) { //4.如果x为true的情况,就++z
++z;
}
}

void TestOrderRelaxed() {
std::thread t1(write_x_then_y);
std::thread t2(read_y_then_x);
t1.join();
t2.join();
assert(z.load() != 0); // 5.当z还为0的时候,就会触发断言
}

实现sequencial consistent模型有一定的开销,现代 CPU 通常有多核,每个核心还有自己的缓存。为了做到全局顺序一致,每次写入操作都必须同步给其他核心(也就是一个核心写的时候,其它核心读都不能读,必须等该核心写完后同步到memory中,其它核心才能读)。为了减少性能开销,如果不需要全局顺序一致,我们应该考虑使用更加宽松的顺序模型,比如Acquire-releaseRelaxed模型。

1.2 relaxed模型(最宽松的)

memory_order_relaxed可以用于storeloadread-modify-write操作, 实现relaxed的顺序模型。在这种模型下只能保证操作的原子性和修改顺序(modification order)一致性(但改完了不一定立刻能见到),无法实现synchronizes-with的关系。

在下面程序中,是存在隐患的,即可能会发生断言,但因为现在c++尽可能的帮外面规避了这些问题,所以执行结果很难会出现断言崩溃,但还是尽量不这样编写程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void TestOrderRelaxed() {
std::atomic<bool> rx, ry;

std::thread t1([&]() {
rx.store(true, std::memory_order_relaxed); // 1
ry.store(true, std::memory_order_relaxed); // 2
});
std::thread t2([&]() {
while (!ry.load(std::memory_order_relaxed)); //3.当ry被修改为true时,会执行循环里面的语句
assert(rx.load(std::memory_order_relaxed)); //4.虽然ry被修改为true,但rx因为某些原因还是可能为false
});
t1.join();
t2.join();
}

1.3 Acquire-Release模型(最常用的)

acquire-release模型中, 会使用memory_order_acquirememory_order_releasememory_order_acq_rel这三种内存顺序。它们的具体用法:

  • 对原子变量的 load 可以使用 memory_order_acquire 内存顺序,这称为 acquire 操作

  • 对原子变量的 store 可以使用 memory_order_release 内存顺序,这称为 release 操作

  • read-modify-write 操作即读(load)又写(store),它可以使用memory_order_acquire, memory_order_release 和 memory_order_acq_rel:

    • 如果使用 memory_order_acquire,则作为 acquire 操作
    • 如果使用 memory_order_release,则作为 release 操作
    • 如果使用 memory_order_acq_rel,则同时为两者

Acquire-release可以实现synchronizes-with的关系。如果一个 acquire 操作在同一个原子变量上读取到了一个 release 操作写入的值,则这个 release 操作 “synchronizes-with” 这个 acquire 操作。

在下面的程序中,对rx进行story和load使用的都是宽松的内存顺序,但对ry采用的是同步的内存顺序。如果线程2先运行,由于ry还是false,所以会一直执行循环,直到ry为true。当ry为true时,3处会退出循环,执行4处,由于线程1中ry都已经修改,所以rx也被修改,则4处的断言就不会发生。由于release在acquire之前,就构成了同步关系,2同步给了3,即2在3之前执行。

从cpu指令角度理解,如果编译器发现了一个store操作,且是release内存顺序,也发现了之前操作也是store,不管该store操作是什么内存顺序,编译器都会先把该store写入内存,再把该release的store写入内存。所以当线程2中把ry读到为true时,rx也一定被修改为了true,这样就不会发生断言的情况了。

1
2
3
4
5
6
7
8
9
10
11
12
13
void TestReleaseAcquire() {
std::atomic<bool> rx, ry;
std::thread t1([&]() {
rx.store(true, std::memory_order_relaxed); // 1.写,对rx进行releaxed操作
ry.store(true, std::memory_order_release); // 2.写,对ry进行release操作
});
std::thread t2([&]() {
while (!ry.load(std::memory_order_acquire)); //3读,进行的是acquire操作
assert(rx.load(std::memory_order_relaxed)); //4读,进行的是relaxed操作
});
t1.join();
t2.join();
}

Acquire-release的开销比sequencial consistent小。在 x86 架构下,memory_order_acquirememory_order_release的操作不会产生任何其他的指令(它们两个操作的中间是原子化的),只会影响编译器的优化。任何指令都不能重排到acquire操作的前面,且不能重排到release操作的后面。否则会违反 acquire-release 的语义。因此很多需要实现synchronizes-with关系的场景都会使用 acquire-releas

还有一种情况,多个线程对同一个变量release操作,另一个线程对这个变量acquire,那么只有一个线程的release操作和这个acquire线程构成同步关系。在下面程序中,线程1对xd和yd都做了存储操作,线程2只对yd进行存储操作,而在线程3中,断言语句是可能被触发的,因为操作2和操作4是构成同步关系的,操作3和操作4也是构成同步关系的,这种情况下,只有一个会与操作4构成同步关系。如果是操作3和操作4构成同步关系,断言就会触发;如果是操作2和操作4构成同步关系,断言就不会触发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void ReleasAcquireDanger2() {
std::atomic<int> xd{0}, yd{0};
std::atomic<int> zd;

std::thread t1([&]() {
xd.store(1, std::memory_order_release); // (1)
yd.store(1, std::memory_order_release); // (2)
});
std::thread t2([&]() {
yd.store(2, std::memory_order_release); // (3)
});
std::thread t3([&]() {
while (!yd.load(std::memory_order_acquire)); //(4)
assert(xd.load(std::memory_order_acquire) == 1); // (5)
});

t1.join();
t2.join();
t3.join();
}

1.4 release sequence

针对一个原子变量M的release操作A完成后,接下来M上可能还会有一连串的其他操作。如果这一连串操作是由同一线程上的写操作或任意线程上的read-modify-write操作这两种构成的,则称这一连串的操称为以release操作A为首的release sequence。 这里的写操作和read-modify-write操作可以使用任意内存顺序。

如果一个acquire操作在同一个原子变量上读到了一个release操作写入的值,或者读到了以这个release操作为首的release sequence写入的值,那么这个release操作 “synchronizes-with” 这个acquire操作。

在下面程序中,可以知道操作2和操作3构成release sequence,而操作4也需要读到这个操作的最后一个结果(flag为2),则操作2和操作4构成同步关系。有因为操作1在操作2之前,操作5在操作4之后,所以最后不会触发断言问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void ReleaseSequence() {
std::vector<int> data;
std::atomic<int> flag{ 0 };

std::thread t1([&]() {
data.push_back(42); //(1)
flag.store(1, std::memory_order_release); //(2)对flag进行修改为1,以下是以操作2为首的release sequence
});
std::thread t2([&]() {
int expected = 1;
//对flasg进行读改写,如果flag不等于expected,就返回false,循环一直执行;相等的时候,就会把flag改为2,并返回true,退出循环
while (!flag.compare_exchange_strong(expected, 2, std::memory_order_relaxed)) // (3)
expected = 1;
});
std::thread t3([&]() {
while (flag.load(std::memory_order_acquire) < 2); // (4)当flag等于2时,循环会退出
assert(data.at(0) == 42); // (5)判断断言
});
t1.join();
t2.join();
t3.join();
}

1.5 memory_order_consume

memory_order_consume 其实是 acquire-release 模型的一部分,但是它比较特殊,它涉及到数据间相互依赖的关系,也可以理解为是同步关系。

memory_order_consume可以用于 load 操作,使用 memory_order_consume 的 load 称为 consume 操作。如果一个 consume 操作在同一个原子变量上读到了一个 release 操作写入的值,或以其为首的 release sequence 写入的值,则这个 release 操作 “dependency-ordered before” 这个 consume 操作。

在下面程序中,操作3和操作4构成了依赖关系,也就是操作3在操作4之前执行的,也可以理解为同步关系。因为操作1发生在操作3之前,操作5发生在操作6之后,所以操作5不会发生断言问题;而memory_order_consumer要考虑依赖关系,data并不依赖p,所以data可能不等于42,所以操作6可能触发断言。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void ConsumeDependency() {
std::atomic<std::string*> ptr;
int data;

std::thread t1([&]() {
std::string* p = new std::string("Hello World"); // (1)
data = 42; // (2)
ptr.store(p, std::memory_order_release); // (3)把p的值写入ptr
});
std::thread t2([&]() {
std::string* p2;
while (!(p2 = ptr.load(std::memory_order_consume))); // (4)对ptr进行读取,空就执行循环,不空就退出循环
assert(*p2 == "Hello World"); // (5)
assert(data == 42); // (6)
});

t1.join();
t2.join();
}

1.6 内存模式实现单例模式

之前用智能指针双重检测方式实现的单例模式是存在线程安全问题的,但可以通过内存模式来实现单例模式,这样可以解决改弊端。在底层,new一个对象再赋值给变量时会存在多个指令顺序。

第一种情况:

  1. 为对象allocate一块内存空间
  2. 调用construct构造对象
  3. 将构造到的对象地址返回

第二种情况:

  1. 为对象allocate一块内存空间
  2. 先将开辟的空间地址返回
  3. 调用construct构造对象

如果是第二种情况,当还未构造对象就将地址返回赋值给single,而此时有线程运行至1处判断single不为空直接返回单例实例,如果该线程调用这个单例的成员函数就会崩溃(结合代码看)。

下面就分别是通过智能指针和内存模型的方法实现单例模式:

在下面这个程序,在4处虽然allocate开辟了一块空间,并返回给single(不为空了),但还没有调用construct,如果其它线程此时在1处用了single,因为不为空,就直接返回了single,并且调用了成员函数,就会出现系统崩溃。所以,实际上,1处和4处是会出现线程安全的,即4处还没有完全结束,就会返回一个还没有构造好的对象把地址返回了,而其它线程判断不为空,就会使用这个没有构造好的对象,就会产生崩溃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//利用智能指针解决释放问题
class SingleAuto
{
private:
SingleAuto(){}
SingleAuto(const SingleAuto&) = delete;
SingleAuto& operator=(const SingleAuto&) = delete;
public:
~SingleAuto()
{
std::cout << "single auto delete success " << std::endl;
}
static std::shared_ptr<SingleAuto> GetInst()
{
// 1处
if (single != nullptr)
{
return single;
}
// 2处
s_mutex.lock();
// 3处
if (single != nullptr)
{
s_mutex.unlock();
return single;
}
// 4处
single = std::shared_ptr<SingleAuto>(new SingleAuto);
s_mutex.unlock();
return single;
}
private:
static std::shared_ptr<SingleAuto> single;
static std::mutex s_mutex;
};

在下面程序中,因为锁的权力是最大的,它能保证所有看到的线程是一致的,所以3处有宽松的模型没有问题的。最后,因为是在4处的代码完全执行后,_b_init才会被设置为true,其它线程才能通过1处进入函数执行(对1处有加锁)。而智能指针的方式就是因为没有这种方法对1处加锁,所以其它线程随时都可以判断single是否为空,这样4处还没有完全执行完时,其它线程就可能判断single不为空,而直接返回使用没有完全构造好的对象了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
//利用智能指针解决释放问题
class SingleMemoryModel
{
private:
SingleMemoryModel(){}
SingleMemoryModel(const SingleMemoryModel&) = delete;
SingleMemoryModel& operator=(const SingleMemoryModel&) = delete;
public:
~SingleMemoryModel()
{
std::cout << "single auto delete success " << std::endl;
}
static std::shared_ptr<SingleMemoryModel> GetInst()
{
// 1处 在这里,只有读到_b_init为true,才会返回single使用
if (_b_init.load(std::memory_order_acquire)) //如果为true,说明之前初始化过,直接返回使用即可
{
return single;
}
// 2处
s_mutex.lock(); //如果多个线程经过1都为false,在这里就只有一个线程会加锁,加锁成功就执行下面
// 3处 由于锁的保护(锁要求全局看到的都是一致的),在其它线程将_b_init设为true后,另外一个线程在这里不管用什么内存序,读到的都是true
if (_b_init.load(std::memory_order_relaxed)) //进来的线程判断,如果变为true了,就解锁返回,直接使用
{
s_mutex.unlock();
return single;
}
// 4处
single = std::shared_ptr<SingleMemoryModel>(new SingleMemoryModel);
_b_init.store(true, std::memory_order_release); //这里是只有在4处执行完后,才将_b_init设为true
s_mutex.unlock();
return single;
}
private:
static std::shared_ptr<SingleMemoryModel> single;
static std::mutex s_mutex;
static std::atomic<bool> _b_init ; //定义一个原子类型的成员变量
};

std::shared_ptr<SingleMemoryModel> SingleMemoryModel::single = nullptr;
std::mutex SingleMemoryModel::s_mutex;
std::atomic<bool> SingleMemoryModel::_b_init = false; //将改变量初始化为false,没有还没有实例化指针

2. 无锁队列实现

下面主要是通过无锁队列的方式来实现无锁并发。

2.1 无锁队列

要实现无锁并发,经常会用到一种结构无锁队列,而无锁队列和在数据结构经常使用的队列颇有不同,它采用的是环状的队列结构,至于为什么成环?主要有两个好处,一个是成环的队列大小是固定的,另外一个我们通过移动头和尾就能实现数据的插入和取出。

2.2 用锁实现环形队列

下面就是用锁实现环形队列,在push和pop时分别加锁,并通过head和tail计算队列是否为满或者空。最后通过测试代码,在添加完5个元素后,已经满了,再添加,是添加不进去的;弹出完5个元素后,再弹出,也是弹不出来的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
template<typename T, size_t Cap>
//继承自 std::allocator<T>,这意味着可以管理内存分配和释放
class CircularQueLk :private std::allocator<T> { //定义一个模板类 CircularQueLk,接受类型参数T和大小参数Cap
public:
//初始化最大大小、使用分配器的allocate函数分配队列的内存、将头指针和尾指针设置为0。
CircularQueLk() :_max_size(Cap + 1),_data(std::allocator<T>::allocate(_max_size)), _head(0), _tail(0) {}
//删除拷贝构造和拷贝赋值运算符
CircularQueLk(const CircularQueLk&) = delete;
CircularQueLk& operator = (const CircularQueLk&) volatile = delete;
CircularQueLk& operator = (const CircularQueLk&) = delete;

~CircularQueLk() {
//循环销毁
std::lock_guard<std::mutex> lock(_mtx); //获取锁,以确保线程安全
//调用内部元素的析构函数
while (_head != _tail) { //依次销毁队列中的元素
std::allocator<T>::destroy(_data + _head);
_head = (_head+1)% _max_size; //销毁完当前元素,head++
}
//调用回收操作
std::allocator<T>::deallocate(_data, _max_size); //释放内存
}

//先实现一个可变参数列表版本的插入函数最为基准函数
template <typename ...Args>
bool emplace(Args && ... args) { //emplace 是一个模板函数,接受可变参数列表 Args,允许插入不同类型的参数
std::lock_guard<std::mutex> lock(_mtx); //加锁
//判断队列是否满了
if ((_tail + 1) % _max_size == _head) { //判断队列是否满了
std::cout << "circular que full ! " << std::endl;
return false; //满了就直接退出
}
//在尾部位置构造一个T类型的对象,构造参数为args...
std::allocator<T>::construct(_data + _tail, std::forward<Args>(args)...);
//更新尾部元素位置
_tail = (_tail + 1) % _max_size;
return true; //插入成功就返回true
}

//push 实现两个版本,一个接受左值引用,一个接受右值引用
//接受左值引用版本
bool push(const T& val) {
std::cout << "called push const T& version" << std::endl;
return emplace(val); //左值引用的插入
}

//接受右值引用版本,当然也可以接受左值引用,T&&为万能引用
// 但是因为我们实现了const T&
bool push(T&& val) {
std::cout << "called push T&& version" << std::endl;
return emplace(std::move(val)); //右值引用的插入
}

//出队函数
bool pop(T& val) {
std::lock_guard<std::mutex> lock(_mtx); //加锁
//判断头部和尾部指针是否重合,如果重合则队列为空
if (_head == _tail) { //判断是否为空
std::cout << "circular que empty ! " << std::endl;
return false; //为空就直接返回
}
//取出头部指针指向的数据
val = std::move(_data[_head]); //这行不添加也可以
//更新头部指针
_head = (_head + 1) % _max_size; //出队了,头部指针++
return true;
}
private:
size_t _max_size; //存储队列的最大容量
T* _data; //是一个指向队列元素的指针
std::mutex _mtx; //是一个互斥锁,用于同步访问队列
size_t _head = 0; //头部指针
size_t _tail = 0; //尾部指针
};
//-------------------下面是测试-------------------------
void TestCircularQue() {
//创建了一个CircularQueLk类型的实例cq_lk,模板参数MyClass表示队列存储的元素类型,5 表示队列的最大容量。
CircularQueLk<MyClass, 5> cq_lk;
//创建了两个 MyClass 类型的对象 mc1 和 mc2,分别初始化为值 1 和 2。
MyClass mc1(1);
MyClass mc2(2);
cq_lk.push(mc1); //通过左值引用的方式添加元素
cq_lk.push(std::move(mc2)); //通过右值引用的方式添加元素
for (int i = 3; i <= 5; i++) {
MyClass mc(i);
auto res = cq_lk.push(mc); //通过左值引用的方式添加元素
if (res == false) {
break;
}
}
cq_lk.push(mc2); //左值添加元素,但满了,添加不进行
for (int i = 0; i < 5; i++) {
MyClass mc1;
auto res = cq_lk.pop(mc1); //弹出元素5次
if (!res) {
break;
}
std::cout << "pop success, " << mc1 << std::endl;
}
auto res = cq_lk.pop(mc1); //继续弹出元素,但空了,弹不成功
}

2.3 无锁队列

如果用原子变量而不是用锁实现环形队列,那就是无锁并发的队列了。下面是之前用过的原子变量的读改写操作,compare_exchange_strong会比较原子变量atomic<T>的值和expected的值是否相等,如果相等则执行交换操作,将atomic<T>的值换为desired并且返回true,否则将expected的值修改为atomic<T>的值,并且返回false。

1
2
bool std::atomic<T>::compare_exchange_weak(T &expected, T desired);
bool std::atomic<T>::compare_exchange_strong(T &expected, T desired);

用代码来理解就是:

1
2
3
4
5
6
7
8
9
10
11
12
template <typename T>
bool atomic<T>::compare_exchange_strong(T &expected, T desired) {
std::lock_guard<std::mutex> guard(m_lock);
if (m_val == expected){
m_val = desired;
return true;
}
else{
expected = m_val;
return false;
}
}

compare_exchange_weak功能比compare_exchange_strong弱一些,他不能保证atomic<T>的值和expected的值相等时也会做交换,很可能原子变量和预期值相等也会返回false,所以使用要多次循环使用。

下面程序中,将类的成员变量mutex换成atomic类型的原子变量,利用自旋锁的思路将锁替换为原子变量循环检测的方式,进而达到锁住互斥逻辑的效果。在析构函数中,因为_atomic_using初始化时为false,当线程1到到操作1处时,_atomic_using会被赋值为use_desired的值(true),并且返回false,退出循环,完成队列类CircularQueSeq的销毁和空间回收。在这个过程中,如果有其它线程要进入,当来到操作1处时,因为_atomic_using已经为true了,与use_expected不相等,就会让use_expected赋值为_atomic_using的值(这是不希望的,所以do里面会每次都重新设置,就是防止这种情况),并返回false,继续循环,只有当线程1执行完2处后,_atomic_using为false时,其它线程才能够退出循环1处,进入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
template<typename T, size_t Cap>
class CircularQueSeq :private std::allocator<T> {
public:
CircularQueSeq() :_max_size(Cap + 1), _data(std::allocator<T>::allocate(_max_size)), _atomic_using(false),_head(0), _tail(0) {}
//删除拷贝构造和拷贝赋值
CircularQueSeq(const CircularQueSeq&) = delete;
CircularQueSeq& operator = (const CircularQueSeq&) volatile = delete;
CircularQueSeq& operator = (const CircularQueSeq&) = delete;

~CircularQueSeq() {
//循环销毁
bool use_expected = false;
bool use_desired = true;
do
{
use_expected = false;
use_desired = true;
}
while (!_atomic_using.compare_exchange_strong(use_expected, use_desired)); // 1处
//调用内部元素的析构函数
while (_head != _tail) {
std::allocator<T>::destroy(_data + _head);
_head = (_head+1)% _max_size;
}
//调用回收操作
std::allocator<T>::deallocate(_data, _max_size);

do
{
use_expected = true;
use_desired = false;
}
while (!_atomic_using.compare_exchange_strong(use_expected, use_desired)); //2处
}

//先实现一个可变参数列表版本的插入函数最为基准函数
template <typename ...Args>
bool emplace(Args && ... args) {

bool use_expected = false;
bool use_desired = true;
do
{
use_expected = false;
use_desired = true;
}
while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));

//判断队列是否满了
if ((_tail + 1) % _max_size == _head) {
std::cout << "circular que full ! " << std::endl;
do
{
use_expected = true;
use_desired = false;
}
while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));
return false;
}
//在尾部位置构造一个T类型的对象,构造参数为args...
std::allocator<T>::construct(_data + _tail, std::forward<Args>(args)...);
//更新尾部元素位置
_tail = (_tail + 1) % _max_size;

do
{
use_expected = true;
use_desired = false;
}
while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));

return true;
}

//push 实现两个版本,一个接受左值引用,一个接受右值引用

//接受左值引用版本
bool push(const T& val) {
std::cout << "called push const T& version" << std::endl;
return emplace(val);
}

//接受右值引用版本,当然也可以接受左值引用,T&&为万能引用
// 但是因为我们实现了const T&
bool push(T&& val) {
std::cout << "called push T&& version" << std::endl;
return emplace(std::move(val));
}

//出队函数
bool pop(T& val) {

bool use_expected = false;
bool use_desired = true;
do
{
use_desired = true;
use_expected = false;
} while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));
//判断头部和尾部指针是否重合,如果重合则队列为空
if (_head == _tail) {
std::cout << "circular que empty ! " << std::endl;
do
{
use_expected = true;
use_desired = false;
}
while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));
return false;
}
//取出头部指针指向的数据
val = std::move(_data[_head]);
//更新头部指针
_head = (_head + 1) % _max_size;

do
{
use_expected = true;
use_desired = false;
}while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));
return true;
}
private:
size_t _max_size;
T* _data;
std::atomic<bool> _atomic_using;
size_t _head = 0;
size_t _tail = 0;
};

上面这种方式虽然通过原子变量的方式解决线程安全问题,但当某一线程线程通过do-while后,其它线程都得像自旋锁一样一直轮询检查,就造成了不必要的开销,下面是对这种方法的一种改进。

对于pop逻辑代码:先在1处取出头部的下标索引,进行判断,如果头部的下标与尾部下标相等,说明为空,就直接返回false退出,否则就通过头部的下标取出数据,然后在3处,如果线程1先到达这里,因为_head等于h,所以就将_head进行+1,这样其它线程来到这里的时候,由于_head不等于h了,就返回false,继续循环。这时继续循环时,由于_head已经等于h+1了,所以改线程在1处获得的下标就是h+1,那么它要pop的是h+1,不是h,就不会出现多次弹出同一个数据的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
bool pop(T& val) {
size_t h;
do
{
h = _head.load(); //1处 先获得头部的一个下标索引
//判断头部和尾部指针是否重合,如果重合则队列为空
if(h == _tail.load())
{
return false; //相等就返回false
}
val = _data[h]; // 2处 就从头部取出要pop的数据
} while (!_head.compare_exchange_strong(h, (h+1)% _max_size)); //3处 !_head等于h的时候就+1
return true;
}

push逻辑代码:push逻辑与pop逻辑处理在这里是一样的,但push的这种处理是存在线程安全问题的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
bool push(T& val){
size_t t;
do
{
t = _tail.load(); //1
//判断队列是否满
if( (t+1)%_max_size == _head.load())
{
return false;
}
_data[t] = val; //2
} while (!_tail.compare_exchange_strong(t, (t + 1) % _max_size)); //3
return true;
}

3. 利用栅栏实现同步模型

3.1 线程可见顺序

有时候我们线程1对A的store操作采用release内存序,而线程2对B的load采用acquire内存序,并不能保证A 一定比 B先执行。因为两个线程并行执行无法确定先后顺序,我们指的先行不过是说如果B读取了A操作的结果,则称A先行于B。

在下面这段程序中,看似断言是不会触发,但在一些极端的情况下,还是可能触发断言的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <iostream>
#include <atomic>
#include <thread>
#include <cassert>
std::atomic<bool> x, y;
std::atomic<int> z;

void write_x()
{
x.store(true, std::memory_order_release); //1
}
void write_y()
{
y.store(true, std::memory_order_release); //2
}
void read_x_then_y()
{
while (!x.load(std::memory_order_acquire));
if (y.load(std::memory_order_acquire)) //3
++z;
}
void read_y_then_x()
{
while (!y.load(std::memory_order_acquire));
if (x.load(std::memory_order_acquire)) //4
++z;
}
//--------------------下面是测试--------------------------
void TestAR()
{
x = false;
y = false;
z = 0;
std::thread a(write_x);
std::thread b(write_y);
std::thread c(read_x_then_y);
std::thread d(read_y_then_x);
a.join();
b.join();
c.join();
d.join();
assert(z.load() != 0); //5
std::cout << "z value is " << z.load() << std::endl;
}

比如在一个4核CPU结构的主机上,a,b,c,d分别运行在不同的CPU内核上,如下图所示。线程a执行x.store(true)先被线程c读取,而此时线程b对y的store还没有被线程c读取到新的值,所以此时c读取的x为true,y为false。同样的道理,d可以读取b修改y的最新值,但是没来的及读取x的最新值,那么读取到y为true,x为false。这样在上面程序中就会出现断言的情况。

所以即使采用release和acquire方式也不能保证全局顺序一致。如果一个线程对变量执行release内存序的store操作,另一个线程不一定会马上读取到。

3.2 栅栏

其实也可以通过栅栏机制保证指令的写入顺序,栅栏的机制和memory_order_release类似。

下面程序中,在1和3中间添加了一个release内存序的栅栏2,这样可以确保该指令2之前的指令(什么内存序都包括)的编排不会编排到该指令2之后。在4和6之间又定义了一个acquire内存序的栅栏5,那么如果操作2被操作5读取到了,它们就构成了一个同步关系,所有执行操作6时,就一定可以读到操作1执行后的数据,也就不会发生断言。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void write_x_then_y_fence()
{
x.store(true, std::memory_order_relaxed); //1 对x存储为true
std::atomic_thread_fence(std::memory_order_release); //2 添加一个栅栏,执行了release的内存序
y.store(true, std::memory_order_relaxed); //3
}

void read_y_then_x_fence()
{
while (!y.load(std::memory_order_relaxed)); //4
std::atomic_thread_fence(std::memory_order_acquire); //5
if (x.load(std::memory_order_relaxed)) //6
++z;
}

4. 基于锁实现线程安全的栈

下面程序实现了基于锁控制push和pop操作的一个栈。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
struct empty_stack : std::exception         //定义了员工异常的结构(类)
{
const char* what() const throw(); //出问题直接抛出异常
};
template<typename T>
class threadsafe_stack //模板类
{
private:
std::stack<T> data; //存放T数据类型的员工栈
mutable std::mutex m; //互斥量
public:
threadsafe_stack() {} //默认构造

threadsafe_stack(const threadsafe_stack& other) //拷贝构造
{
std::lock_guard<std::mutex> lock(other.m); //先对other对象加锁,防止其它线程还在使用other对象
data = other.data;
}

threadsafe_stack& operator=(const threadsafe_stack&) = delete; //栈不支持赋值操作

void push(T new_value)
{
std::lock_guard<std::mutex> lock(m); //先加锁
data.push(std::move(new_value)); //1.用移动构造来push一个元素,这样就免去了再构造一次
}

std::shared_ptr<T> pop() //pop返回智能指针的版本
{
std::lock_guard<std::mutex> lock(m); //加锁
if (data.empty()) throw empty_stack(); //2.判断栈是否为空,为空就抛出异常
std::shared_ptr<T> const res(std::make_shared<T>(std::move(data.top()))); //3.将栈顶元素取出来,构造了一个智能指针
data.pop(); //4.弹出栈定元素
return res; //返回栈顶元素
}

void pop(T& value) //pop返回引用的版本
{
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
value = std::move(data.top()); //5.直接移动操作即可,存到传进来的参数中
data.pop();
}

bool empty() const
{
std::lock_guard<std::mutex> lock(m); //加锁
return data.empty(); //返回栈是否为空
}
};

上面代码虽然不会出现线程安全问题,但依然存在着不足,比如说在操作2处,当栈为空的时候,也会抛出异常,这是不会合理的。下面程序是解决栈为空就抛出异常的一个问题优化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
template<typename  T>
class threadsafe_stack_waitable
{
private:
std::stack<T> data;
mutable std::mutex m;
std::condition_variable cv; //条件变量,当栈为空的时候,就挂起
public:
threadsafe_stack_waitable() {} //无参构造

threadsafe_stack_waitable(const threadsafe_stack_waitable& other) //拷贝构造
{
std::lock_guard<std::mutex> lock(other.m); //先锁住other对象,防止其它线程再访问
data = other.data;
}

threadsafe_stack_waitable& operator=(const threadsafe_stack_waitable&) = delete; //无拷贝赋值

void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value)); //1.通过移动构造的方式push一个元素
cv.notify_one(); //添加完一个元素后,唤醒通知其它线程栈里有元素了
}

std::shared_ptr<T> wait_and_pop() //pop智能指针版本
{
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, [this]()
{
if(data.empty()) //如果栈为空,返回fasle,就在这里挂起,释放锁,其它线程就可以往栈放数据
{
return false;
}
return true; //如果不为空,就返回true,继续往下执行(还是加锁状态)
}); // ⇽-- - 2
std::shared_ptr<T> const res(std::make_shared<T>(std::move(data.top()))); // ⇽-- - 3
data.pop(); // ⇽-- - 4
return res;
}

void wait_and_pop(T& value) //pop引用版本
{
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, [this]()
{
if (data.empty())
{
return false;
}
return true;
});
value = std::move(data.top()); // ⇽-- - 5
data.pop(); // ⇽-- - 6
}

bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}

bool try_pop(T& value) //pop引用的另一个版本
{
std::lock_guard<std::mutex> lock(m);
if(data.empty())
{
return false;
}
value = std::move(data.top());
data.pop();
return true;
}

std::shared_ptr<T> try_pop() //pop智能指针的另一个版本
{
std::lock_guard<std::mutex> lock(m);
if(data.empty())
{
return std::shared_ptr<T>(); //为空,就返回一个空的智能指针
}
std::shared_ptr<T> res(std::make_shared<T>(std::move(data.top())));
data.pop();
return res;
}

};

5. 实现线程安全的查找表

5.1 简介

下面实现了一个线程安全的查找结构,类似线程安全的map结构,但由于map基于红黑树的,实现难度就较大,而且需要加锁地方的粒度也不是很精细,所以这里就考虑用散列表实现。

散列表,它是根据键(Key)而直接访问在存储器存储位置的数据结构。 也就是说,它通过计算出一个键值的函数,将所需查询的数据映射到表中一个位置来让人访问,这加快了查找速度。 这个映射函数称做散列函数,存放记录的数组称做散列表。

5.2 代码实现

在下面程序中,将查找表封装为threadsafe_lookup_table类,该类将hash值存入到一个vector容器中,每个hash值都又一个属于自己的桶,,因此vector容器里存了指定数量的桶,可以理解为一条链表。在threadsafe_lookup_table类中,又封装了一个桶的类bucket_type,它相当于是一条链表,用list容器存储,里面存储的类型是pair<Key,Value>。然后bucket_type类也实现了一系列的增删改查的功能(需要加锁)供外部调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
//这是一个模板类,接受三个模板参数:键的类型(Key)、值的类型(value)、哈希函数类型,默认使用std::hash<Key>
template<typename Key, typename Value, typename Hash = std::hash<Key>>
class threadsafe_lookup_table {
private:
class bucket_type { //桶
friend class threadsafe_lookup_table; //将threadsafe_lookup_table定义为bucket_type的友缘
private:
typedef std::pair<Key, Value> bucket_value; //一个键值对别名,类型为 std::pair<Key, Value>。
typedef std::list<bucket_value> bucket_data; //一个链表别名,用于存储键值对
typedef typename bucket_data::iterator bucket_iterator; //链表的迭代器类型别名
bucket_data data; //实际存储键值对的链表
mutable std::shared_mutex mutex; //一个共享互斥锁,用于保证线程安全
//在桶中查找键,返回迭代器。
bucket_iterator find_entry_for(const Key& key) {
//从begin()开始找,找到end(),传递一个谓词(用lambda表示)
return std::find_if(data.begin(), data.end(), [&](bucket_value const& item) { //从桶里面开始找
return item.first == key; //在桶中等于key就返回
}); //找到了就返回迭代器
}

public:
//查找key值,找到返回对应的value,未找到则返回默认值
Value value_for(Key const& key, Value const& default_value)
{
std::shared_lock<std::shared_mutex> lock(mutex); //共享锁,都来查找的话,可以并发查找,不用造成互斥的效果
bucket_iterator const found_entry = find_entry_for(key); //返回一个迭代器,用auto也可以
return (found_entry == data.end()) ? default_value : found_entry->second; //如果找到了,就放回找到的结果value值,没有找到就返回默认值
}
//添加key和value,找到则更新,没找到则添加
void add_or_update_mapping(Key const& key, Value const& value)
{
std::unique_lock<std::shared_mutex> lock(mutex); //加锁
bucket_iterator const found_entry = find_entry_for(key);
if (found_entry == data.end()) //如果没有找到
{
data.push_back(bucket_value(key, value)); //向data里面添加
}
else
{
found_entry->second = value; //通过key找到了,就更新该key的value值
}
}
//删除对应的key
void remove_mapping(Key const& key)
{
std::unique_lock<std::shared_mutex> lock(mutex); //加锁
bucket_iterator const found_entry = find_entry_for(key);
if (found_entry != data.end())
{
data.erase(found_entry); //找到了就直接通过迭代器来删除该key和value
}
}
};

private:
std::vector<std::unique_ptr<bucket_type>> buckets; //用vector存储桶类型,每个vector的值都是一个桶类型
Hash hasher; //hash<Key> 哈希表 用来根据key生成哈希值
//根据key生成数字,并对桶的大小取余得到下标,根据下标返回对应的桶智能指针
bucket_type& get_bucket(Key const& key) const
{
//将key值传给hasher,它会自动计算,返回一个散列值,然后取余,让其映射在bucket上
std::size_t const bucket_index = hasher(key) % buckets.size();
return *buckets[bucket_index]; //解引用,得到该下标对应的一个值
}

public:
//桶的大小默认为19(最好取质数);哈希也默认是系统提供的哈希,这些都可以外部自己指定
threadsafe_lookup_table(unsigned num_buckets = 19, Hash const& hasher_ = Hash()) :buckets(num_buckets), hasher(hasher_) {
for (unsigned i = 0; i < num_buckets; ++i)
{
buckets[i].reset(new bucket_type); //创建好每一个桶
}
}

threadsafe_lookup_table(threadsafe_lookup_table const& other) = delete; //删除拷贝构造
threadsafe_lookup_table& operator=(threadsafe_lookup_table const& other) = delete; //删除拷贝赋值

Value value_for(Key const& key, Value const& default_value = Value())
{
return get_bucket(key).value_for(key, default_value); //提供key找到对应的桶,再在该桶下通过key找到对应的value。这里不需要加锁,因为value_for里有加锁
}

void add_or_update_mapping(Key const& key, Value const& value)
{
return get_bucket(key).add_or_update_mapping(key, value); //往对应桶里面添加
}

void remove_mapping(Key const& key) //删除对应桶里的key和value
{
return get_bucket(key).remove_mapping(key);
}
//将查找表里面的数据存储到map容器
std::map<Key, Value> get_map()
{
std::vector<std::unique_lock<std::shared_mutex>> locks;
for (unsigned i = 0; i < buckets.size(); ++i)
{
//里面查找表里面的数据时,需要对每一个桶都加锁
locks.push_back(std::unique_lock<std::shared_mutex>(buckets[i]->mutex));
}
std::map<Key, Value> res;
for (unsigned i = 0; i < buckets.size(); ++i) //遍历每一个桶
{
//需用typename告诉编译器bucket_type::bucket_iterator是一个类型,以后再实例化
//当然此处可简写成auto it = buckets[i]->data.begin();
typename bucket_type::bucket_iterator it = buckets[i]->data.begin();
for (; it != buckets[i]->data.end(); ++it)
{
res.insert(*it); //对于每一个桶,只要没有到最后一个元素,都往res里面存
}
}
return res; //最后返回res
}
};

自定义了一个MyClass类,用来表示Value类型值

1
2
3
4
5
6
7
8
9
10
11
12
class MyClass
{
public:
MyClass(int i) :_data(i) {}
//重写了<<
friend std::ostream& operator << (std::ostream& os, const MyClass& mc) {
os << mc._data;
return os; //输出参数的值
}
private:
int _data;
};

用于测试的函数:创建了一个查找表,并定义了三个线程,线程1负责向查找表添加0到99;线程2负责从查找表删除0到99;线程3负责向从查找表添加100到199。三个线程并发执行,执行完后,通过removeSet打印删除的那些数据,和打印出查找表里还剩下的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
void TestThreadSafeHash() {
std::set<int> removeSet; //用来存储移出的数据
//创建模板类查找表,Key是int,Value是std::shared_ptr<MyClass>
threadsafe_lookup_table<int, std::shared_ptr<MyClass>> table;

std::thread t1([&]() { //往查找表里加元素
for (int i = 0; i < 100; i++){
auto class_ptr = std::make_shared<MyClass>(i);
table.add_or_update_mapping(i, class_ptr);
}
});

std::thread t2([&]() { //从查找表里移出元素
for (int i = 0; i < 100; ){
auto find_res = table.value_for(i, nullptr);
if (find_res)
{
table.remove_mapping(i);
removeSet.insert(i);
i++;
}
//因为是多线程一起运行,所以也有可能删除比添加快,导致要删除的元素,还没有添加进来,所以就先睡眠一会
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
});

std::thread t3([&]() { //往查找表里加元素
for (int i = 100; i < 200; i++)
{
auto class_ptr = std::make_shared<MyClass>(i);
table.add_or_update_mapping(i, class_ptr);
}
});
t1.join();
t2.join();
t3.join();

for (auto& i : removeSet){ //打印移出的元素
std::cout << "remove data is " << i << std::endl;
}

auto copy_map = table.get_map(); //将查找表里面的元素转为map
for (auto& i : copy_map)
{
std::cout << "copy data is " << *(i.second) << std::endl;
}
}

int main() {
TestThreadSafeHash();
}

5.3 缺陷

  1. 首先我们的查找表可以支持并发读,并发写,并发读的时候不会阻塞其他线程。但是并发写的时候会卡住其他线程。基本的并发读写没有问题。
  2. 但是对于bucket_type中链表的操作加锁精度并不精细,因为我们采用的是std提供的list容器,所以增删改查等操作都要加同一把锁,导致锁过于粗糙。

6. 基于锁实现线程安全的链表

6.1 简介

对于做一个支持多线程并发访问的链表,首先想到的是用一个互斥量控制整个链表,达到多线程访问时串行的效果。但是这么做精度不够,需要分化互斥量的功能。所以下面让每个节点都维护一个互斥量,这样能保证多个线程操作不同节点时加不同的锁,减少耦合性。

首先需要将head独立为一个虚节点,即不存储数据,只做头部标记。这样每次从头部插入就只需要修将新的节点的next指针指向原来head的next指向的节点,再将head的next指针指向新的节点。具体图像如下:

6.2 代码实现

头节点函数:定义了一个智能指针指向的数据域和unique智能指针指向下一个节点的地址,同时定义了默认构造函数和拷贝构造函数。

1
2
3
4
5
6
7
8
9
10
template<typname T>
struct node
{
std::mutex m; //每个节点独有的互斥量
std::shared_ptr<T> data; //data为智能指针,存储的是T类型的数据域
std::unique_ptr<node> next; //unique类型的智能指针,存储的是下一个节点的地址
//构造函数
node() :next(){}
node(T const& value):data(std::make_shared<T>(value)){}
};

定义链表的函数:链表类中将拷贝构造和拷贝赋值函数删除,同时链表中初始状态包含了一个头节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
template<typename T>
class threadsafe_list
{
struct node
{
std::mutex m;
std::shared_ptr<T> data;
std::unique_ptr<node> next;
node():next(){}
node(T const& value):data(std::make_shared<T>(value)){}
};
node head; //头节点
public:
threadsafe_list(){} //构造函数

~threadsafe_list(){} //析构函数

threadsafe_list(threadsafe_list const& other) = delete; //删除拷贝构造
threadsafe_list& operator=(threadsafe_list const& other) = delete; //删除拷贝赋值
}

析构函数:在remove_if函数中,将Predicate p中的p可以理解为一个lambda表达式或者是一个函数的功能,通过p来判断,如果p返回了true,说明满足条件,把满足条件的节点删除掉;如果p返回的是false,就不满足条件,就不删除该节点。而析构函数调用remove_if,p谓词就是一个lambda表达式,始终返回true,这样就可以保证删除链表所有的节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
template<typename Predicate>
void remove_if(Predicate p)
{
node* current = &head; //将head头节点(虚节点)赋值给当前节点
std::unique_lock<std::mutex> lk(head.m); //对头节点加锁
while (node* const next = current->next.get()) //循环取出当前节点的下一个节点(裸指针),为空就不执行循环
{
std::unique_lock<std::mutex> next_lk(next->m); //对下一个指针进行加锁
if (p(*next->data)) //用谓词p操作下一个节点,判断是否为true,如果为true,则条件满足
{
//将当前节点的下一个节点移动到old_next里,则其下一个节点已经失效
std::unique_ptr<node> old_next = std::move(current->next);
current->next = std::move(next->next); //将当前节点的下一个的下一个节点赋值给当前节点
next_lk.unlock(); //将当前节点的下一个节点解锁(已经要删除)
} //当局部作用域结束的时候,old_next就会被回收掉(unique_ptr类型),这样就删除了该节点
else{ //如果为false,则条件不满足,不用删除,跳过当前节点
lk.unlock(); //将当前节点解锁
current = next; //将下一个节点赋值给当前节点(相当于当前节点向右移)
lk = std::move(next_lk); //将下一个节点的锁移动给当前节点的锁,这样当前节点就可以保持一直锁着
}
}
}

~threadsafe_list()
{
remove_if([](node const&) {return true; });
}

插入节点函数:通过头插法的方式插入新的节点

1
2
3
4
5
6
7
void push_front(T const& value)             //头插法
{
std::unique_ptr<node> new_node(new node(value)); //创建了一个新节点
std::lock_guard<std::mutex> lk(head.m); //锁住头部节点
new_node->next = std::move(head.next); //将头部的下一个节点地址赋值给新节点的下一个节点
head.next = std::move(new_node); //头部节点的下一个节点地址改为新节点的地址
}

查找对应节点的函数:该函数是结合谓词p来在链表中一个一个寻找第一个满足条件的节点,找到了就直接返回该data的一个指针,没有找到就返回空指针。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
template<typename Predicate>
std::shared_ptr<T> find_first_if(Predicate p)
{
node* current = &head; //从头节点开始,赋值给当前节点
std::unique_lock<std::mutex> lk(head.m); //当前节点加锁
while (node* const next = current->next.get()){ //循环取当前节点的下一个节点,为空则退出循环
std::unique_lock<std::mutex> next_lk(next->m); //对当前节点的下一个节点加锁
lk.unlock();
if (p(*next->data)){ //将next的数据域传给p,判断是否为true,为true就说明找到了,返回next的数据源data
return next->data;
}
//如果改节点不满足,继续判断下一个
current = next; //将下一个节点地址赋值给当前节点
lk = std::move(next_lk); //next的锁也移动给当前节点,则lk永远锁定的是当前节点
}
return std::shared_ptr<T>(); //如果最后都没有找到对应的节点,就返回空
}

遍历所有节点函数:该函数是结合谓词函数f来完成的,从头节点开始遍历,到最后空时,就退出循环,停止遍历。

1
2
3
4
5
6
7
8
9
10
11
12
13
template<typename Function>
void for_each(Function f)
{
node* current = &head; //从头节点开始,赋值给当前节点
std::unique_lock<std::mutex> lk(head.m); //当前节点加锁
while (node* const next = current->next.get()){ //循环取当前节点的下一个节点,为空则退出循环
std::unique_lock<std::mutex> next_lk(next->m); //把下一个节点加锁
lk.unlock(); //当前节点解锁
f(*next->data); //执行谓词函数f,
current = next; //把下一个节点更新为当前节点
lk = std::move(next_lk); //把下一个节点的锁移动给当前节点的锁
}
}

7. 线程安全的无锁栈实现

7.1 简介

之前通过锁的互斥机制实现了并发安全的栈,队列,查找表,以及链表等结构。接下来是通过无锁的原子变量的方式实现栈。

单线程情况下添加节点步骤:

  1. 创建新节点
  2. 将元素入栈,将新节点的next指针指向现在的head节点。
  3. 将head节点更新为新节点的值。

而在多线程情况下,假设线程1执行到第2步,即将元素入栈后,没来得及更新head节点的值为新节点的值。此时线程2也执行完第2步,将head更新为线程2插入的新节点,之后线程1又将head更新为线程1的新插入节点,那么此时head的位置就是错的。

单线程情况下删除节点步骤:

  1. 取出头节点元素
  2. 更新head为下一个节点。
  3. 返回取出头节点元素的数据域。

而在多线程情况下,第1,2步同样存在线程安全问题。此外我们返回节点数据域时会进行拷贝赋值,如果出现异常会造成数据丢失,这一点也要考虑。

所以为了解决这些问题,可以通过原子变量的compare_exchange(比较交换操作)来控制更新head节点,以此来达到线程安全的目的。

7.2 代码实现

头节点函数:创建了一个模板类,在该类中,定义了一个T类型的数据域、node*类型的指针,指向下一个节点的地址和一个拷贝构造的函数。

1
2
3
4
5
6
7
template<typename T>
struct node
{
T data; //数据域
node* next; //下一个节点的地址
node(T const& data_):data(data_){} //拷贝构造
};

无锁栈结构:下面程序定义了一个无锁栈的结构,结构中定义了一个采用原子变量表示的头部节点。需要注意的是,对于栈和队列,如果想要实现线程安全,都不能进行拷贝,不然管理不了。所以得删除拷贝构造和拷贝赋值,采用默认构造即可。

1
2
3
4
5
6
7
8
9
10
template<typename T>
class lock_free_stack
{
private:
lock_free_stack(const lock_free_stack&) = delete; //删除拷贝构造
lock_free_stack& operator = (const lock_free_stack&) = delete; //删除拷贝赋值
std::atomic<node*> head; //node*类型的原子变量
public:
lock_free_stack() {}
}

入栈函数:在多线程情况下,当线程1创建新节点node1并指向了下一个头节点,但没有来得及更新head,这时线程2也创建了新节点node2执行到1处,如下图所示:

然后线程2更新了head节点,即head指向了node2。如下图所示:

当时间片回到线程1,线程1执行2处代码时,发现head的值与node1的next的值不一样,就将head的值赋值给node1的next,并返回false,继续执行循环,那么node1的next就指向了head节点(node2),再执行2处代码时,head与node1->next值相等了,就将node1的值赋值给head了(head进行了更新),然后返回ture,退出循环。如下图所示;

1
2
3
4
5
6
7
template<typename T>
void push(const T& value){
auto new_node = new Node(value) //创建一个新的节点
do{
new_node->next = head.load(); //1.将头节点的值赋值给新节点的next指针
}while(!head.compare_exchange_strong(new_node->next, new_node)); //2.当返回false时就相当于执行了1处的代码
}

出栈函数:下面程序是一个单线程的版本,当线程1执行完1处时,即线程1的old_head指向了头节点,但还没有来得及执行2处代码,线程2就将它的old_head指向了头节点,并将头节点往下移动了,此时线程1再来执行2处代码时,就会出现问题。

1
2
3
4
5
6
template<typename T>
void pop(T& value){
node* old_head = head.load(); //1
head = head->next; //2
value = old_head->data;
}

所以为了解决上面问题,对代码进行了改进,当线程2执行完2处代码后,即head指向了线程2old_head的next,此时线程1再执行到2处时,因为head指向的节点和线程1old_head指向的不一样了,就将线程1old_head指向head节点(修改删除节点),当在执行2处代码时,就能取出当前head所值的节点(要弹出的节点),并将head的指向往下移,退出线程。

1
2
3
4
5
6
7
template<typename T>
void pop(T& value){
do{
node* old_head = head.load(); //1. 将头部节点加载出来,old_head指向了头部节点
}while(!head.compare_exchange_weak(old_head, old_head->next)); //2
value = old_head->data; //3
}

上面代码存在的问题:

  1. 未判断空栈的情况,当old_head为空时,在2处还有old_head->next操作,就很危险。
  2. 将数据域赋值给引用类型的value时存在拷贝赋值(3处),我们都知道拷贝赋值会存在异常的情况,当异常发生时元素已经从栈定移除了,破坏了栈的结构,这一点和锁处理时不一样,锁处理的时候是先将元素数据域取出赋值再出栈,所以不会有问题,但是无锁的方式就会出现栈被破坏的情况。解决方式也比较简单,数据域不再存储T类型数据,而是存储std::shared_ptr<T>类型的数据。智能指针在赋值的时候不会产生异常。
  3. 未释放弹出的节点的内存。

先解决未判断栈未空的问题,如下程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class lock_free_stack
{
private:
struct node
{
std::shared_ptr<T> data; //用T类型的智能指针指向data
node* next;
node(T const& data_) : //⇽-- - 1
data(std::make_shared<T>(data_))
{}
};
lock_free_stack(const lock_free_stack&) = delete;
lock_free_stack& operator = (const lock_free_stack&) = delete;
std::atomic<node*> head;
public:
lock_free_stack() {}
std::shared_ptr<T> pop() {
node* old_head = nullptr; //1
do {
old_head = head.load(); //2
if (old_head == nullptr) {
return nullptr; //如果加载出来的节点是空节点,就返回空,退出
}
} while (!head.compare_exchange_weak(old_head, old_head->next)); //3

return old_head->data; //4
}
};

对于资源回收问题,可以先实现一个简单的回收处理逻辑,通过临时智能指针与要删除的数据进行交换的方法来回收资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
template<typename T>
std::shared_ptr<T> pop() {
node* old_head = nullptr;
do {
old_head = head.load(); //1
if (old_head == nullptr) {
return nullptr;
}
} while (!head.compare_exchange_weak(old_head, old_head->next)); //2
std::shared_ptr<T> res; //3 定义一个临时智能指针
res.swap(old_head->data); //4 将res与old_head的data做交换,这样old_head就会变成空指针
delete old_head; //5 直接去除掉old_head
return res; //6 返回删除的值
}

上面程序虽然解决了资源回收的问题,但还是存在很大的问题,当线程1走到1处时,时间片被分配到线程2,然后线程2直接执行到第5处,把old_head删除了,但此时线程1的old_head与线程2删除的old_head是一样的,这样就会导致线程1继续执行,到执行第2处时,取old_head的next时,就会导致系统崩溃。所以就不能把old_head直接删除,其实能不能删除old_head,取决于其它线程是否还用old_head,就像如果是单线程的,就完全可以删除。所以考虑多线程的情况,就需要引入延迟删除节点的机制来解决该问题。

设计思路:

  1. 如果head已经被更新,且旧head不会被其他线程引用,那旧head就可以被删除。否则放入待删列表。
  2. 如果仅有一个线程执行pop操作,那么待删列表可以被删除,如果有多个线程执行pop操作,那么待删列表不可被删除。

我们需要用一个原子变量threads_in_pop记录有几个线程执行pop操作。在pop结束后再减少threads_in_pop。 我们需要一个原子变量to_be_deleted记录待删列表的首节点。程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
std::shared_ptr<T> pop() {
++threads_in_pop; //1 计数器首先自增,然后才执行其他操作
node* old_head = nullptr;
do {
//2 加载head节点给旧head存储
old_head = head.load();
if (old_head == nullptr) {
--threads_in_pop;
return nullptr;
}
} while (!head.compare_exchange_weak(old_head, old_head->next)); // 3
//3处 比较更新head为旧head的下一个节点

std::shared_ptr<T> res;
if (old_head) //old_head不为空,就执行下面
{
// 4 只要有可能,就回收已删除的节点数据
res.swap(old_head->data); //将要删除的节点于res进行交换,
}
// 5 从节点提取数据,而非复制指针
try_reclaim(old_head); //old_head已经是空节点了,通过该函数来判断。old_head是否要删除
return res;
}

void try_reclaim(node* old_head)
{
if(threads_in_pop == 1) //1 原子变量判断仅有一个线程进入
{
//2 当前线程把待删列表取出
node* nodes_to_delete = to_be_deleted.exchange(nullptr);
//3 更新原子变量获取准确状态,判断pop是否仅仅正被当前线程唯一调用
if(!--threads_in_pop)
{
delete_nodes(nodes_to_delete); //4 如果唯一调用则将待删列表删除
}else if(nodes_to_delete)
{
//5 如果pop还有其他线程调用且待删列表不为空,则将待删列表首节点更新给to_be_deleted
chain_pending_nodes(nodes_to_delete); //将本地的待删列表还原成全局待删列表
}
delete old_head;
}
else {
//多个线程pop会访问该节点(不会用该节点的数据,因为数据值已经通过交换删除了,但之前存该数据的节点没有删)
chain_pending_node(old_head); //将其放入待删列表,此时不能删除old_head
--threads_in_pop; //线程用完了,计数器减1
}
}

static void delete_nodes(node* nodes)
{
while (nodes) //遍历链表删除即可
{
node* next = nodes->next;
delete nodes;
nodes = next;
}
}

void chain_pending_node(node* n){ //将单个节点放入待删列表
chain_pending_nodes(n, n);
}

void chain_pending_nodes(node* first, node* last){ //接受两个参数,分别为链表的头和尾
//1 先将last的next节点更新为待删列表的首节点
last->next = to_be_deleted;
//2 借循环保证 last->next指向正确
// 将待删列表的首节点更新为first节点
while (!to_be_deleted.compare_exchange_weak(
last->next, first));
}

void chain_pending_nodes(node* nodes){ //将nodes_to_delete为首的链表还原到待删列表中,接受一个参数为待还原的链表的首节点
node* last = nodes;
//1 沿着next指针前进到链表末端
while (node* const next = last->next)
{
last = next;
}
//2 将链表放入待删链表中
chain_pending_nodes(nodes, last);
}