1. 引用计数实现无锁并发栈

1.1 简介

在C++并发编程中提出了两种计数,一种是外部计数,一种是内部计数,二者加起来就是有效的引用计数,下面提出一种新的方法,利用引用计数实现无锁并发的栈。

1.2 代码实现

栈结构函数:栈函数里面实现了栈的默认构造函数、析构函数和创建了一个原子类型的头部节点,同时也定义好了栈的内部节点类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
template<typename T>
class single_ref_stack {
public:
single_ref_stack():head(nullptr) {} //默认构造函数
~single_ref_stack() { //析构函数
while (pop()); //循环出栈
}

private:
struct ref_node { //引用节点(栈的内部节点)
std::shared_ptr<T> _data; //1 数据域智能指针
std::atomic<int> _ref_count; //2 引用计数
ref_node* _next; //3 下一个节点
//构造函数,接收外部传来的数据,根据数据构造一个节点
ref_node(T const& data_):_data(std::make_shared<T>(data_)), _ref_count(1), _next(nullptr) {}
};
//头部节点
std::atomic<ref_node*> head; //会被多个线程操作,设为原子类型的头部节点
};

push操作函数:这段程序就是解决两个线程同时push的情况,但同一时刻也只有一个线程会成功,那么另一个线程在while进行比较的时候,就会发现它的new_node的next与head不相等,那么它就会继续循环,而且将新的head值赋值给new_node的next,这样在没有其它线程抢先指向while的话,该线程就会更新head的值,将head指向了它的new_node,并退出循环,这样该线程就完成了节点的push。

1
2
3
4
5
void push(T const& data) {              //参数是要push的T类型的数据
auto new_node = new ref_node(data); //通过data创建出一个新的ref_node类型节点,并得到其指针
new_node->next = head.load(); //新节点的next要指向头部节点
while (!head.compare_exchange_weak(new_node->next, new_node)); //通过重试的方式更新头部节点
}

pop操作函数:在下面程序中,是通过引用计数为0或1来判断该节点是否可以直接delete。比如说当线程1进入2处的if,线程2这时候就只能进入7处else,如果线程1在4处,将引用计数-2后,发现还等于1,那么线程1是回收不了该节点的,此时如果线程2发现该节点的引用计数为1,则此时只有它在访问该节点,所以线程2就可以进行回收。但当线程1执行4处时,引用计数为0,说明只有它在访问,这样线程1就可以对该节点进行回收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
std::shared_ptr<T> pop() {
ref_node* old_head = head.load(); //先取出头部元素
for (;;) {
if (!old_head) {
return std::shared_ptr<T>(); //如果为空就放回空指针
}
//1 只要执行pop就对引用计数+1
++(old_head->_ref_count);
//2 比较head和old_head想等则交换,更新head,返回true;否则说明head已经被其他线程更新,返回false
if (head.compare_exchange_strong(old_head, old_head->_next)) {
//此时head已经为old_head的next了
auto cur_count = old_head->_ref_count.load(); //取出原来头部节点的引用计数
auto new_count;
//3 循环重试保证引用计数安全更新
do {
//4 减去本线程增加的1次和初始的1次,如果为0,说明没有其它线程访问该节点
new_count = cur_count - 2;
} while (!old_head->_ref_count.compare_exchange_weak(cur_count, new_count));
std::shared_ptr<T> res; //返回头部数据
res.swap(old_head->_data); //5 交换数据,取出要删除节点old_head的值,此时old_head的值为0了
//6
if (old_head->_ref_count == 0) { //继续判断,如果此时old_head的引用计数还为0,就delete掉
delete old_head;
}
return res;
}
else {
//7
if (old_head->_ref_count.fetch_sub(1) == 1) {
delete old_head; //如果判断引用计数等于1的,说明也只有它应该在使用该节点,也可以直接delete
}
}
}
}

以上pop操作函数的大体流程如下:

从流程上来看,该程序存在着很大的问题,在刚开没有pop前,所有节点的引用计数都为1;如果两个线程进入pop函数后,head的引用计数进行++,变为了3,此时线程1的old_head和线程2的old_head都指向head;当线程1先执行操作2时,它会更新head的指向,并且进入if程序里面,而线程2再来执行操作2时,因为此时的head被更改了,所以它会将它的old_head执行新的head,并执行else里面的程序,这时线程1将引用计数减2后为1,它就负责只将要删除的节点值返回,而没有delete掉要删除的节点(原头节点),以为线程2会delete掉该节点,而线程2判断当前old_head指向的节点引用计数为1,它以为指向的是原节点,所以就会直接delete,但指向的是要删除节点的下一个节点,这样系统就会崩溃。

所以为了解决这些问题,可以将引用计数提出来,不放在指针里,和指针解耦。下面程序是将原来的节点结构拆成两个,并且新增_dec_count表示减少的引用计数,放在node结构里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
struct ref_node;
struct node {
std::shared_ptr<T> _data; //1 数据域智能指针
ref_node _next; //2 下一个节点
node(T const& data_) : _data(std::make_shared<T>(data_)) {}
std::atomic<int> _dec_count; //减少的数量
};

struct ref_node {
int _ref_count; // 引用计数
node* _node_ptr;
ref_node( T const & data_):_node_ptr(new node(data_)), _ref_count(1){}
ref_node():_node_ptr(nullptr),_ref_count(0){}
};
//头部节点
std::atomic<ref_node> head; //将栈中的head结构变为ref_node类型的原子变量。

std::shared_ptr<T> pop() {
ref_node old_head = head.load();
for (;;) {
ref_node new_head; //1 只要执行pop就对引用计数+1并更新到head中
//2
do {
new_head = old_head;
new_head._ref_count += 1;
} while (!head.compare_exchange_weak(old_head, new_head));
old_head = new_head;
auto* node_ptr = old_head._node_ptr; //3
if (node_ptr == nullptr) {
return std::shared_ptr<T>(); //为空就放回空指针
}
//4 比较head和old_head相等则交换否则说明head已经被其他线程更新
if (head.compare_exchange_strong(old_head, node_ptr->_next)) {
std::shared_ptr<T> res; //要返回的值
res.swap(node_ptr->_data); //交换智能指针
int increase_count = old_head._ref_count - 2; //5 增加的数量
if (node_ptr->_dec_count.fetch_add(increase_count) == -increase_count) { //6
delete node_ptr;
}
return res;
}else {
if (node_ptr->_dec_count.fetch_sub(1) == 1) { //7
delete node_ptr;
}
}
}
}

2. 内存模型回顾

2.1 简介

之前实现的那些无锁并发的栈结构,它们对于原子变量的读,写以及读改写操作默认采用的是memory_order_seq_cstmemory_order_seq_cst为全局顺序模型,即所有线程看到的执行顺序是一致的。这种模型对性能消耗较大,所以可以在无锁栈的基础上通过更为宽松的模型提升性能。

2.2 release-acquire同步

在之前了解的6中内存顺序,其中可以通过releaseacquire的方式实现同步的效果。也就是说,线程A执行store操作,采用memory_order_release顺序模型,线程B执行load操作采用memory_order_acquire顺序模型。如果线程B的load操作读取到线程A的store操作的数值,就可以称线程A的store操作 synchronizes-with(同步) 线程B的load操作。

2.3 happens-before先行

如果 a->store 同步于 b->load, 则 a->store 先行于 b->load。只要同步就能推出先行,所谓先行就是逻辑执行的顺序,一定是a->store 先于 b->load。先行还包括一种情况,sequenced-before(顺序执行), 所谓顺序执行就是单线程中执行的顺序为从上到下的顺序。

先行具有传递性 操作1 happens-before 操作2,操作2 happens-before 操作3,则操作1 happens-before 操作3

在下面程序中, 操作2处使用了release内存序,保证操作1会排在操作2之前。 操作3采用了acquire内存序,保证操作4排在操作3之后,且如果操作3能读到操作2的写入值,则保证操作1已经先于操作3执行完。因为while重试的机制,保证操作2同步于操作3,即操作2先于操作3执行,又因为操作1先于操作2执行,而操作3先于操作4执行,所以得出操作1先于操作4执行,那么操作4处断言正确就不会崩溃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void TestReleaseSeq() {
int data = 0;
std::atomic<int> flag = 0;
std::thread t1([&]() {
data = 42; //1
flag.store(1, std::memory_order_release); //2
});
std::thread t2([&]() {
while (!flag.load(std::memory_order_acquire)); //3
assert(data == 42); //4
});

t1.join();
t2.join();
}

2.4 释放序列的扩展

如果存储操作的标记是memory_order_release、memory_order_acq_rel或memory_order_seq_cst,而载入操作则以memory_order_consume、memory_order_acquire或memory_order_seq_cst标记,这些操作前后相扣成链,每次载入的值都源自前面的存储操作,那么该操作链由一个释放序列组成。若最后的载入操作服从内存次序memory_order_acquire或memory_order_seq_cst,则最初的存储操作与它构成同步关系。但如果该载入操作服从的内存次序是memory_order_consume,那么两者构成前序依赖关系。操作链中,每个“读-改-写”操作都可选用任意内存次序,甚至也能选用memory_order_relaxed次序。

release-sequnece:针对一个原子变量M的release操作A完成后, 接下来M上可能还会有一连串的其他操作。如果这一连串操作是由

  1. 同一线程上的写操作
  2. 或者任意线程上的 read-modify-write(可以是任意内存顺序) 操作

这两种构成的,则称这一连串的操作为以release操作A为首的release sequence。这里的写操作和read-modify-write操作可以使用任意内存顺序。

同步:一个acquire操作在同一个原子变量上读到了一个release操作写入的值,或者读到了以这个release操作为首的release sequence写入的值,那么这个release操作 “synchronizes-with” 这个 acquire 操作。所以release-sequence不一定构成同步,只有acquire到release的值才算作同步。

下面程序中,在入队函数的操作1处使用的是memory_order_release内存序来记录入队的数量,这样一方面保证了它之前的代码都执行完了,另一方在出队函数的操作3处有memory_order_acquire的载入操作,这样它们就构成了同步关系。fetch_sub是一个原子操作函数,用于从原子变量 count 的当前值中减去指定的值,即1,并返回操作前的原始值。所以就可以通过该函数来完成队列数据的读取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
std::vector<int> queue_data;                //存储数据的队列
std::atomic<int> count; //记录入队的数量
std::atomic<bool> store_finish = false; //表示是否存储完成
//入队函数
void populate_queue()
{
unsigned const number_of_items = 20;
queue_data.clear(); //队列清空
for (unsigned i = 0; i < number_of_items; ++i)
{
queue_data.push_back(i); //从0到20,往队列放入数据
}
//1 最初的存储操作
count.store(number_of_items, std::memory_order_release); //记录入队数量,内存序用的是memory_order_release
store_finish.store(true, std::memory_order_release); //将store_finish置未true,不太关注时效性和同步性,所以就用宽松的内存序
}
//出队函数
void consume_queue_items()
{
while (true)
{
//2 循环等待存储完成,使用的是宽松的内存序
while (!store_finish.load(std::memory_order_acquire));
int item_index;
//3 读—改—写”操作
if ((item_index = count.fetch_sub(1, std::memory_order_acquire)) <= 0)
{
return; //当count为0的时候,会执行到这里
}
//4 从内部容器queue_data 读取数据项是安全行为
std::cout << "queue_data is " << queue_data[item_index-1] << std::endl;
}
}
//测试函数
void TestReleaseSeq2() {
std::thread a(populate_queue);
std::thread b(consume_queue_items);
std::thread c(consume_queue_items);
a.join();
b.join();
c.join();
}

执行结果:

从打印结果可以看到消费者线程b和c并没有打印重复的数据,说明他们互斥访问count,每个线程取到的count不一样进而访问queue_data中的不同数据。

如果单从线程角度考虑,b和c并不能构成同步,但是线程b和c必然有一个线程会先执行执行fetch_sub(原子变量的操作任何顺序模型都能保证操作的原子性)。假设b先执行,和a构成release-sequence关系,b读取到a执行的count.store的结果, b处于以a线程的release为首的释放序列中,则b的store操作会和c的读-改-写(fetch操作,只限这一段代码)构成同步。

如下图所示:实线表示先行关系,虚线标识释放序列

结论如下:

  1. a线程和b线程构成release-sequence的释放序列。
  2. 即使b线程和c线程不构成同步,但是b线程的读改写操作处于release-sequence中,且c线程采用acquire方式读改写,则b的读改写和c线程的读改写构成同步,以a线程的release为首的sequence序列和c线程的读改写构成同步。
  3. 这里要强调一点,如果a release-sequence b,a和b不一定构成同步,但是b sychronizes with c,则a synchronizes with c。也就是说处于release序列中的任意读改写操作和其他的线程构成同步,那么我们就能得出release-sequence为首的操作和其他线程同步。

3. 优化无锁栈

3.1 简介

对于想要更好的优化无锁栈,可以结合释放序列这一技术来完成。当有数据入栈时,那么pop时要读取最新入栈的数据。所以我们要让push操作同步给pop操作,想到的办法很简单,push对head的修改采用release内存序列,pop对head的读改写采用acquire内存序列。多个线程并发pop,执行读改写操作,这些线程本来是无法同步的,但是最先pop的线程会和push线程构成同步关系,且形成release-sequence。那之后的线程pop就会和第一个pop的线程的写操作形成同步。

如果没有元素入栈,这时多个线程pop也不会产生问题,可以根据head内部的ptr指向为空判断空栈直接返回空指针。

总的来说,就是以下两方面含义:

  1. 因为要保证pop操作时节点的数据是有效的。push和pop要构成同步关系,即push采用release内存序修改head;pop采用acquire内存序修改head
  2. 第一个pop的线程的写操作和之后的pop线程读操作要构成同步关系

3.2 代码实现

push操作函数:该函数就是创建一个新节点,将新节点的next指向head,最后操作1处执行比较交换操作,当head等于新节点的next时,使用的是release内存序,并返回true;当head不等于新节点的next时(head被其它线程更新),使用的是relaxed内存序(因为要重试,所以什么内存序都可以),并返回false。

1
2
3
4
5
6
7
8
void push(T const& data) {
counted_node_ptr new_node; //定义一个新的节点指针
new_node.ptr = new count_node(data); //通过yao创建的data数据,来构造一个节点
new_node.external_count = 1; //将该节点的引用计数置1
new_node.ptr->next = head.load(); //另新节点的next指向head
//1 比较交换操作
while (!head.compare_exchange_weak(new_node.ptr->next, new_node, memory_order::memory_order_release, memory_order::memory_order_relaxed));
}

pop操作函数:在increase_head_count函数中,操作7的比较交换成功时,使用的是acquire的内存序,这样也是为了与push操作函数中操作1处的比较交换的release内存序构成同步关系。如果考虑两个线程并发执行,有两种情况发生:

  1. 线程1执行if,线程2执行else if,在操作3处count_increase就为1,而内部引用计数internal_count初始化是为0的,如果线程2先执行完操作5,则内部引用计数就为-1(线程2因为不满足条件,不能进入释放ptr,留给线程1释放ptf),这时操作2就不会再用ptr了,线程1的操作4就可以进入if释放ptr了。
  2. 线程1执行if,线程2执行else if,在操作3处count_increase就为1,而内部引用计数internal_count初始化是为0的,如果线程1先执行操作4,那么它会将内部引用计数变为1,并且返回0,不满足条件就不能释放ptr,就退出了。线程2这时执行操作5就能满足条件,从而释放ptr。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
std::shared_ptr<T> pop() {
counted_node_ptr old_head = head.load(); //取出head
for (;;) {
increase_head_count(old_head); //增加引用计数
count_node* const ptr = old_head.ptr; //获取old_head的指向
//1 判断为空责直接返回
if (!ptr) {
return std::shared_ptr<T>(); //如果为空,返回空指针即可
}
//开始头部更新,如果线程1的old_head与head相等,说明还没有其它线程抢占到head,线程1就负责回收数据和将外部引用计数转为内部引用计数
if (head.compare_exchange_strong(old_head, ptr->next)) {
std::shared_ptr<T> res; //返回头部数据
res.swap(ptr->data); //交换数据
//3 因为外部引用计数初始化为1,该进行进来时又+1,所以减2,可以统计到目前为止增加了多少外部引用
int const count_increase = old_head.external_count - 2;
//4 将内部引用计数添加
if (ptr->internal_count.fetch_add(count_increase) == -count_increase)
{
delete ptr;
}
return res;
}
else if (ptr->internal_count.fetch_sub(1) == 1) { //5
//如果old_head与head不相等,有两种情况:其它线程把head更新了;还有就是该线程先将head引用计数++,后面又被其它线程将head引用计数++了,因为该线程的old_head是局部变量的一个缓存,所以head引用计数更新了,old_head还是旧数据
//该线程负责减少内部引用计数(该进程进pop的时候,加的是外部引用计数),最后是判断,外部的引用计数和内部引用计数相加为0,就做回收操作
delete ptr; //当前线程减少内部引用计数,返回之前值为1说明指针仅被当前线程引用
}
}
}

//增加头部节点引用数量
void increase_head_count(counted_node_ptr& old_counter) {
counted_node_ptr new_counter;
do {
new_counter = old_counter;
++new_counter.external_count;
}//7 循环判断保证head和old_counter相等时做更新,多线程情况保证引用计数原子递增。
while (!head.compare_exchange_strong(old_counter, new_counter, std::memory_order_acquire, std::memory_order_relaxed));
//8 走到此处说明head的external_count已经被更新了
old_counter.external_count = new_counter.external_count; //因为old_counter是引用传进来的,所以external_count更新了想要传给它
}

总结:

  • 对于一个原子变量M,其释放序列中的读改写操作无论采用何种模型都能读取M的最新值(原子变量来保证的)。
  • 为了保证原子变量上下程序的操作能和其它线程同步,可以利用内存顺序模型用来保证数据在多个线程的可见顺序。

3.3 改进

虽然以上程序优化了无锁栈,但还需要保证ptr的data在被删除之前swap到res里。

改进后的pop函数:在下面程序中,线程1如果是进入了操作4处执行delete,那么是需要保证swap操作先于fetch_add之后的delete操作,所以fetch_add可以采用release模型;而对于线程2执行操作5,内部delete操作之前,也需要保证其它线程执行的swap操作完成,所以操作5处的fetch_sub要采用acquire内存序,这样它就和操作4构成同步关系,即操作4先于操作5,而操作4完成之前可以保证它上面的程序执行完,所以swap就可以先于操作5了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
std::shared_ptr<T> pop() {
counted_node_ptr old_head = head.load();
for (;;) {
increase_head_count(old_head);
count_node* const ptr = old_head.ptr;
//1 判断为空责直接返回
if (!ptr) {
return std::shared_ptr<T>();
}
//2 本线程如果抢先完成head的更新
if (head.compare_exchange_strong(old_head, ptr->next, std::memory_order_relaxed)) {
//返回头部数据
std::shared_ptr<T> res;
//交换数据
res.swap(ptr->data);
//3 减少外部引用计数,先统计到目前为止增加了多少外部引用
int const count_increase = old_head.external_count - 2;
//4 将内部引用计数添加
if (ptr->internal_count.fetch_add(count_increase, std::memory_order_release) == -count_increase) {
delete ptr;
}
return res;
} else if (ptr->internal_count.fetch_add(-1, std::memory_order_acquire) == 1) { //5
//ptr->internal_count.load(std::memory_order_acquire);
delete ptr; //6
}
}
}

当然,操作5处使用内存序relaxed也可以的,只要保证swap操作相较于delete先执行完即可,所以在操作6的delete上面添加内部引用计数的acquire内存序加载,即可和前面的释放序列构成同步关系。

4. 双引用实现无锁队列

4.1 简介

队列和栈容器它们的结构是不同的,对于队列结构,push()和pop()分别访问其不同部分,而在栈容器上,这两项操作都访问头节点,所以两种数据结构所需的同步操作相异。如果某线程在队列一端做出改动,而另一线程同时访问队列另一端,程序就要保证前者的改动过程能正确地为后者所见。

4.2 单线程队列

下面程序是一个单线程情况下实现的队列操作,在单线程情况下是不会出现问题的,但在多线程情况下,push和pop都会出现问题。

对于push操作,当两个线程都执行push,如果线程1在操作3先创建了新节点p1,然后数据交换,并让尾指针指向了p1,正准备执行操作7,更新尾指针位置,这时时间片被线程2抢到了,线程2就会覆盖线程1执行的操作,令尾指针指next向p2,并执行了操作7,移动尾指针,让尾指针指向了p2,这时线程1在执行操作7,就令尾指针指向p1。这样就导致数据混乱了,即队列中最后连接的是p2,但尾指针指向p1去了。

对于pop操作,当两个线程都执行pop时,当线程1通过old_head取出头节点时,正准备执行操作1,这时轮到线程2执行了,线程2也通过old_head取出头节点后,并且先执行了操作1,将head头指针更新为了原头节点的next,这时线程1有执行操作1,由于它的old_head是原头节点,所以它执行完操作7,还是将head头指针更新为原头节点的next。这样就导致,两个进行调用pop,结果head头指针只移动一个位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
template<typename T>
class SinglePopPush
{
private:
struct node //节点的结构类型
{
std::shared_ptr<T> data; //节点的数据域
node* next;
node():next(nullptr){}
};
std::atomic<node*> head; //头指针
std::atomic<node*> tail; //尾指针
//弹出头部节点
node* pop_head()
{
node* const old_head = head.load(); //接收头指针
if (old_head == tail.load()) //如果头指针和尾指针相等,则是空的,直接返回空指针
{
return nullptr;
}
//1 没有空,就直接更新头指针,指向删除节点的next
head.store(old_head->next);
return old_head; //返回要删除的节点指针
}
public:
SinglePopPush():head(new node), tail(head.load()){} //默认构造函数
SinglePopPush(const SinglePopPush& other) = delete;
SinglePopPush& operator=(const SinglePopPush& other) = delete;
~SinglePopPush(){
while (node* const old_head = head.load())
{
//循环取出头指针,为空就返回,不为空就delete掉
head.store(old_head->next);
delete old_head;
}
}
std::shared_ptr<T> pop() //出队函数(消费者)
{
node* old_head = pop_head(); //先取出要删除的头节点
if (!old_head)
{
return std::shared_ptr<T>(); //为空,就返回头节点
}
//不为空,就取出要删除节点的数据值
std::shared_ptr<T> const res(old_head->data);
delete old_head; //delete要删除的头节点
return res; //返回数据值
}
void push(T new_value) //入队函数(生产者)
{
std::shared_ptr<T> new_data(std::make_shared<T>(new_value)); //先构造好数据
node* p = new node; //3 创建一个新节点
node* const old_tail = tail.load(); //4 获取尾节点指针
old_tail->data.swap(new_data); //5 数据交换(old_tail->data原来是没有数据的空指针)
old_tail->next = p; //6 令尾节点的next指向p
tail.store(p); //7 更新尾指针,令尾指针存储新节点p
}
};

4.3 多线程push

为了解决多线程push的竞争问题,可以采用以下程序来完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void push(T new_value)
{
std::unique_ptr<T> new_data(new T(new_value)); //先创建处要push的数据
counted_node_ptr new_next; //定义一个可计数的节点指针,有两个参数
new_next.ptr=new node; //ptr指向新建的一个节点
new_next.external_count=1; //外部引用计数为1
for(;;)
{
node* const old_tail=tail.load(); //1 将尾部节点加载出来(它的data和next开始都是空的)
T* old_data=nullptr; //定义并初始化T类型的old_data为空
//2 尾节点的data如果还是空的,就更新尾节点的data为new_data的数据data,并返回true
if(old_tail->data.compare_exchange_strong(old_data, new_data.get()))
{
old_tail->next=new_next; //将尾节点的next指向一个空的可计数的节点
tail.store(new_next.ptr); //3 更新尾指针为最后连接的一个节点
new_data.release(); //解绑new_data的裸指针,因为该指针已经交给old_tail的data管理
break;
}
}
}

该方法是将data指针原子化,通过比较-交换操作来设置它的值。如果比较-交换操作成功,所操作的节点即为真正的尾节点,我们便可安全地设定next指针,使之指向新节点。若比较-交换操作失败,就表明有另一线程同时存入了数据,我们应该进行循环,重新读取tail指针并从头开始操作。