1. 引用计数实现无锁并发栈 1.1 简介 在C++并发编程中提出了两种计数,一种是外部计数,一种是内部计数,二者加起来就是有效的引用计数,下面提出一种新的方法,利用引用计数实现无锁并发的栈。
1.2 代码实现 栈结构函数:栈函数里面实现了栈的默认构造函数、析构函数和创建了一个原子类型的头部节点,同时也定义好了栈的内部节点类型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 template <typename T>class single_ref_stack {public : single_ref_stack ():head (nullptr ) {} ~single_ref_stack () { while (pop ()); } private : struct ref_node { std::shared_ptr<T> _data; std::atomic<int > _ref_count; ref_node* _next; ref_node (T const & data_):_data(std::make_shared <T>(data_)), _ref_count(1 ), _next(nullptr ) {} }; std::atomic<ref_node*> head; };
push操作函数:这段程序就是解决两个线程同时push的情况,但同一时刻也只有一个线程会成功,那么另一个线程在while进行比较的时候,就会发现它的new_node的next与head不相等,那么它就会继续循环,而且将新的head值赋值给new_node的next,这样在没有其它线程抢先指向while的话,该线程就会更新head的值,将head指向了它的new_node,并退出循环,这样该线程就完成了节点的push。
1 2 3 4 5 void push (T const & data) { auto new_node = new ref_node (data); new_node->next = head.load (); while (!head.compare_exchange_weak (new_node->next, new_node)); }
pop操作函数:在下面程序中,是通过引用计数为0或1来判断该节点是否可以直接delete。比如说当线程1进入2处的if,线程2这时候就只能进入7处else,如果线程1在4处,将引用计数-2后,发现还等于1,那么线程1是回收不了该节点的,此时如果线程2发现该节点的引用计数为1,则此时只有它在访问该节点,所以线程2就可以进行回收。但当线程1执行4处时,引用计数为0,说明只有它在访问,这样线程1就可以对该节点进行回收。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 std::shared_ptr<T> pop () { ref_node* old_head = head.load (); for (;;) { if (!old_head) { return std::shared_ptr <T>(); } ++(old_head->_ref_count); if (head.compare_exchange_strong (old_head, old_head->_next)) { auto cur_count = old_head->_ref_count.load (); auto new_count; do { new_count = cur_count - 2 ; } while (!old_head->_ref_count.compare_exchange_weak (cur_count, new_count)); std::shared_ptr<T> res; res.swap (old_head->_data); if (old_head->_ref_count == 0 ) { delete old_head; } return res; } else { if (old_head->_ref_count.fetch_sub (1 ) == 1 ) { delete old_head; } } } }
以上pop操作函数的大体流程如下:
从流程上来看,该程序存在着很大的问题,在刚开没有pop前,所有节点的引用计数都为1;如果两个线程进入pop函数后,head的引用计数进行++,变为了3,此时线程1的old_head和线程2的old_head都指向head;当线程1先执行操作2时,它会更新head的指向,并且进入if程序里面,而线程2再来执行操作2时,因为此时的head被更改了,所以它会将它的old_head执行新的head,并执行else里面的程序,这时线程1将引用计数减2后为1,它就负责只将要删除的节点值返回,而没有delete掉要删除的节点(原头节点),以为线程2会delete掉该节点,而线程2判断当前old_head指向的节点引用计数为1,它以为指向的是原节点,所以就会直接delete,但指向的是要删除节点的下一个节点,这样系统就会崩溃。
所以为了解决这些问题,可以将引用计数提出来,不放在指针里,和指针解耦。下面程序是将原来的节点结构拆成两个,并且新增_dec_count
表示减少的引用计数,放在node结构里。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 struct ref_node ;struct node { std::shared_ptr<T> _data; ref_node _next; node (T const & data_) : _data(std::make_shared <T>(data_)) {} std::atomic<int > _dec_count; }; struct ref_node { int _ref_count; node* _node_ptr; ref_node ( T const & data_):_node_ptr(new node (data_)), _ref_count(1 ){} ref_node ():_node_ptr(nullptr ),_ref_count(0 ){} }; std::atomic<ref_node> head; std::shared_ptr<T> pop () { ref_node old_head = head.load (); for (;;) { ref_node new_head; do { new_head = old_head; new_head._ref_count += 1 ; } while (!head.compare_exchange_weak (old_head, new_head)); old_head = new_head; auto * node_ptr = old_head._node_ptr; if (node_ptr == nullptr ) { return std::shared_ptr <T>(); } if (head.compare_exchange_strong (old_head, node_ptr->_next)) { std::shared_ptr<T> res; res.swap (node_ptr->_data); int increase_count = old_head._ref_count - 2 ; if (node_ptr->_dec_count.fetch_add (increase_count) == -increase_count) { delete node_ptr; } return res; }else { if (node_ptr->_dec_count.fetch_sub (1 ) == 1 ) { delete node_ptr; } } } }
2. 内存模型回顾 2.1 简介 之前实现的那些无锁并发的栈结构,它们对于原子变量的读,写以及读改写操作默认采用的是memory_order_seq_cst
,memory_order_seq_cst
为全局顺序模型,即所有线程看到的执行顺序是一致的。这种模型对性能消耗较大,所以可以在无锁栈的基础上通过更为宽松的模型提升性能。
2.2 release-acquire同步 在之前了解的6中内存顺序,其中可以通过release
和acquire
的方式实现同步的效果。也就是说,线程A执行store操作,采用memory_order_release
顺序模型,线程B执行load操作采用memory_order_acquire
顺序模型。如果线程B的load操作读取到线程A的store操作的数值,就可以称线程A的store操作 synchronizes-with(同步) 线程B的load操作。
2.3 happens-before先行 如果 a->store 同步于 b->load, 则 a->store 先行于 b->load。只要同步就能推出先行,所谓先行就是逻辑执行的顺序,一定是a->store 先于 b->load。先行还包括一种情况,sequenced-before(顺序执行), 所谓顺序执行就是单线程中执行的顺序为从上到下的顺序。
先行具有传递性 操作1 happens-before
操作2,操作2 happens-before
操作3,则操作1 happens-before
操作3
在下面程序中, 操作2处使用了release内存序,保证操作1会排在操作2之前。 操作3采用了acquire内存序,保证操作4排在操作3之后,且如果操作3能读到操作2的写入值,则保证操作1已经先于操作3执行完。因为while重试的机制,保证操作2同步于操作3,即操作2先于操作3执行,又因为操作1先于操作2执行,而操作3先于操作4执行,所以得出操作1先于操作4执行,那么操作4处断言正确就不会崩溃。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 void TestReleaseSeq () { int data = 0 ; std::atomic<int > flag = 0 ; std::thread t1 ([&]() { data = 42 ; flag.store(1 , std::memory_order_release); }) ; std::thread t2 ([&]() { while (!flag.load(std::memory_order_acquire)); assert(data == 42 ); }) ; t1.join (); t2.join (); }
2.4 释放序列的扩展
如果存储操作的标记是memory_order_release、memory_order_acq_rel或memory_order_seq_cst,而载入操作则以memory_order_consume、memory_order_acquire或memory_order_seq_cst标记,这些操作前后相扣成链,每次载入的值都源自前面的存储操作,那么该操作链由一个释放序列 组成。若最后的载入操作服从内存次序memory_order_acquire或memory_order_seq_cst,则最初的存储操作与它构成同步关系。但如果该载入操作服从的内存次序是memory_order_consume,那么两者构成前序依赖关系。操作链中,每个“读-改-写”操作都可选用任意内存次序,甚至也能选用memory_order_relaxed次序。
release-sequnece:针对一个原子变量M的release操作A完成后, 接下来M上可能还会有一连串的其他操作。如果这一连串操作是由
同一线程上的写操作
或者任意线程上的 read-modify-write(可以是任意内存顺序) 操作
这两种构成的,则称这一连串的操作为以release操作A为首的release sequence。这里的写操作和read-modify-write操作可以使用任意内存顺序。
同步:一个acquire操作在同一个原子变量上读到了一个release操作写入的值,或者读到了以这个release操作为首的release sequence写入的值,那么这个release操作 “synchronizes-with” 这个 acquire 操作。所以release-sequence不一定构成同步,只有acquire到release的值才算作同步。
下面程序中,在入队函数的操作1处使用的是memory_order_release
内存序来记录入队的数量,这样一方面保证了它之前的代码都执行完了,另一方在出队函数的操作3处有memory_order_acquire
的载入操作,这样它们就构成了同步关系。fetch_sub
是一个原子操作函数,用于从原子变量 count
的当前值中减去指定的值,即1,并返回操作前的原始值。所以就可以通过该函数来完成队列数据的读取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 std::vector<int > queue_data; std::atomic<int > count; std::atomic<bool > store_finish = false ; void populate_queue () { unsigned const number_of_items = 20 ; queue_data.clear (); for (unsigned i = 0 ; i < number_of_items; ++i) { queue_data.push_back (i); } count.store (number_of_items, std::memory_order_release); store_finish.store (true , std::memory_order_release); } void consume_queue_items () { while (true ) { while (!store_finish.load (std::memory_order_acquire)); int item_index; if ((item_index = count.fetch_sub (1 , std::memory_order_acquire)) <= 0 ) { return ; } std::cout << "queue_data is " << queue_data[item_index-1 ] << std::endl; } } void TestReleaseSeq2 () { std::thread a (populate_queue) ; std::thread b (consume_queue_items) ; std::thread c (consume_queue_items) ; a.join (); b.join (); c.join (); }
执行结果:
从打印结果可以看到消费者线程b和c并没有打印重复的数据,说明他们互斥访问count,每个线程取到的count不一样进而访问queue_data中的不同数据。
如果单从线程角度考虑,b和c并不能构成同步,但是线程b和c必然有一个线程会先执行执行fetch_sub(原子变量的操作任何顺序模型都能保证操作的原子性)。假设b先执行,和a构成release-sequence关系,b读取到a执行的count.store的结果, b处于以a线程的release为首的释放序列中,则b的store操作会和c的读-改-写(fetch操作,只限这一段代码)构成同步。
如下图所示:实线表示先行关系,虚线标识释放序列
结论如下:
a线程和b线程构成release-sequence的释放序列。
即使b线程和c线程不构成同步,但是b线程的读改写操作处于release-sequence中,且c线程采用acquire方式读改写,则b的读改写和c线程的读改写构成同步,以a线程的release为首的sequence序列和c线程的读改写构成同步。
这里要强调一点,如果a release-sequence b,a和b不一定构成同步,但是b sychronizes with c,则a synchronizes with c。也就是说处于release序列中的任意读改写操作和其他的线程构成同步,那么我们就能得出release-sequence为首的操作和其他线程同步。
3. 优化无锁栈 3.1 简介 对于想要更好的优化无锁栈,可以结合释放序列这一技术来完成。当有数据入栈时,那么pop时要读取最新入栈的数据。所以我们要让push操作同步给pop操作,想到的办法很简单,push对head的修改采用release内存序列,pop对head的读改写采用acquire内存序列。多个线程并发pop,执行读改写操作,这些线程本来是无法同步的,但是最先pop的线程会和push线程构成同步关系,且形成release-sequence。那之后的线程pop就会和第一个pop的线程的写操作形成同步。
如果没有元素入栈,这时多个线程pop也不会产生问题,可以根据head内部的ptr指向为空判断空栈直接返回空指针。
总的来说,就是以下两方面含义:
因为要保证pop操作时节点的数据是有效的。push和pop要构成同步关系,即push采用release内存序修改head;pop采用acquire内存序修改head
第一个pop的线程的写操作和之后的pop线程读操作要构成同步关系
3.2 代码实现 push操作函数:该函数就是创建一个新节点,将新节点的next指向head,最后操作1处执行比较交换操作,当head等于新节点的next时,使用的是release内存序,并返回true;当head不等于新节点的next时(head被其它线程更新),使用的是relaxed内存序(因为要重试,所以什么内存序都可以),并返回false。
1 2 3 4 5 6 7 8 void push (T const & data) { counted_node_ptr new_node; new_node.ptr = new count_node (data); new_node.external_count = 1 ; new_node.ptr->next = head.load (); while (!head.compare_exchange_weak (new_node.ptr->next, new_node, memory_order::memory_order_release, memory_order::memory_order_relaxed)); }
pop操作函数:在increase_head_count
函数中,操作7的比较交换成功时,使用的是acquire的内存序,这样也是为了与push操作函数中操作1处的比较交换的release内存序构成同步关系。如果考虑两个线程并发执行,有两种情况发生:
线程1执行if,线程2执行else if,在操作3处count_increase
就为1,而内部引用计数internal_count初始化是为0的,如果线程2先执行完操作5,则内部引用计数就为-1(线程2因为不满足条件,不能进入释放ptr,留给线程1释放ptf),这时操作2就不会再用ptr了,线程1的操作4就可以进入if释放ptr了。
线程1执行if,线程2执行else if,在操作3处count_increase
就为1,而内部引用计数internal_count初始化是为0的,如果线程1先执行操作4,那么它会将内部引用计数变为1,并且返回0,不满足条件就不能释放ptr,就退出了。线程2这时执行操作5就能满足条件,从而释放ptr。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 std::shared_ptr<T> pop () { counted_node_ptr old_head = head.load (); for (;;) { increase_head_count (old_head); count_node* const ptr = old_head.ptr; if (!ptr) { return std::shared_ptr <T>(); } if (head.compare_exchange_strong (old_head, ptr->next)) { std::shared_ptr<T> res; res.swap (ptr->data); int const count_increase = old_head.external_count - 2 ; if (ptr->internal_count.fetch_add (count_increase) == -count_increase) { delete ptr; } return res; } else if (ptr->internal_count.fetch_sub (1 ) == 1 ) { delete ptr; } } } void increase_head_count (counted_node_ptr& old_counter) { counted_node_ptr new_counter; do { new_counter = old_counter; ++new_counter.external_count; } while (!head.compare_exchange_strong (old_counter, new_counter, std::memory_order_acquire, std::memory_order_relaxed)); old_counter.external_count = new_counter.external_count; }
总结:
对于一个原子变量M,其释放序列中的读改写操作无论采用何种模型都能读取M的最新值(原子变量来保证的)。
为了保证原子变量上下程序的操作能和其它线程同步,可以利用内存顺序模型用来保证数据在多个线程的可见顺序。
3.3 改进 虽然以上程序优化了无锁栈,但还需要保证ptr的data在被删除之前swap到res里。
改进后的pop函数:在下面程序中,线程1如果是进入了操作4处执行delete,那么是需要保证swap操作先于fetch_add之后的delete操作,所以fetch_add可以采用release模型;而对于线程2执行操作5,内部delete操作之前,也需要保证其它线程执行的swap操作完成,所以操作5处的fetch_sub要采用acquire内存序,这样它就和操作4构成同步关系,即操作4先于操作5,而操作4完成之前可以保证它上面的程序执行完,所以swap就可以先于操作5了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 std::shared_ptr<T> pop () { counted_node_ptr old_head = head.load (); for (;;) { increase_head_count (old_head); count_node* const ptr = old_head.ptr; if (!ptr) { return std::shared_ptr <T>(); } if (head.compare_exchange_strong (old_head, ptr->next, std::memory_order_relaxed)) { std::shared_ptr<T> res; res.swap (ptr->data); int const count_increase = old_head.external_count - 2 ; if (ptr->internal_count.fetch_add (count_increase, std::memory_order_release) == -count_increase) { delete ptr; } return res; } else if (ptr->internal_count.fetch_add (-1 , std::memory_order_acquire) == 1 ) { delete ptr; } } }
当然,操作5处使用内存序relaxed也可以的,只要保证swap操作相较于delete先执行完即可,所以在操作6的delete上面添加内部引用计数的acquire内存序加载,即可和前面的释放序列构成同步关系。
4. 双引用实现无锁队列 4.1 简介 队列和栈容器它们的结构是不同的,对于队列结构,push()和pop()分别访问其不同部分,而在栈容器上,这两项操作都访问头节点,所以两种数据结构所需的同步操作相异。如果某线程在队列一端做出改动,而另一线程同时访问队列另一端,程序就要保证前者的改动过程能正确地为后者所见。
4.2 单线程队列 下面程序是一个单线程情况下实现的队列操作,在单线程情况下是不会出现问题的,但在多线程情况下,push和pop都会出现问题。
对于push操作,当两个线程都执行push,如果线程1在操作3先创建了新节点p1,然后数据交换,并让尾指针指向了p1,正准备执行操作7,更新尾指针位置,这时时间片被线程2抢到了,线程2就会覆盖线程1执行的操作,令尾指针指next向p2,并执行了操作7,移动尾指针,让尾指针指向了p2,这时线程1在执行操作7,就令尾指针指向p1。这样就导致数据混乱了,即队列中最后连接的是p2,但尾指针指向p1去了。
对于pop操作,当两个线程都执行pop时,当线程1通过old_head取出头节点时,正准备执行操作1,这时轮到线程2执行了,线程2也通过old_head取出头节点后,并且先执行了操作1,将head头指针更新为了原头节点的next,这时线程1有执行操作1,由于它的old_head是原头节点,所以它执行完操作7,还是将head头指针更新为原头节点的next。这样就导致,两个进行调用pop,结果head头指针只移动一个位置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 template <typename T>class SinglePopPush { private : struct node { std::shared_ptr<T> data; node* next; node ():next (nullptr ){} }; std::atomic<node*> head; std::atomic<node*> tail; node* pop_head () { node* const old_head = head.load (); if (old_head == tail.load ()) { return nullptr ; } head.store (old_head->next); return old_head; } public : SinglePopPush ():head (new node), tail (head.load ()){} SinglePopPush (const SinglePopPush& other) = delete ; SinglePopPush& operator =(const SinglePopPush& other) = delete ; ~SinglePopPush (){ while (node* const old_head = head.load ()) { head.store (old_head->next); delete old_head; } } std::shared_ptr<T> pop () { node* old_head = pop_head (); if (!old_head) { return std::shared_ptr <T>(); } std::shared_ptr<T> const res (old_head->data) ; delete old_head; return res; } void push (T new_value) { std::shared_ptr<T> new_data (std::make_shared<T>(new_value)) ; node* p = new node; node* const old_tail = tail.load (); old_tail->data.swap (new_data); old_tail->next = p; tail.store (p); } };
4.3 多线程push 为了解决多线程push的竞争问题,可以采用以下程序来完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 void push (T new_value) { std::unique_ptr<T> new_data (new T(new_value)) ; counted_node_ptr new_next; new_next.ptr=new node; new_next.external_count=1 ; for (;;) { node* const old_tail=tail.load (); T* old_data=nullptr ; if (old_tail->data.compare_exchange_strong (old_data, new_data.get ())) { old_tail->next=new_next; tail.store (new_next.ptr); new_data.release (); break ; } } }
该方法是将data指针原子化,通过比较-交换操作来设置它的值。如果比较-交换操作成功,所操作的节点即为真正的尾节点,我们便可安全地设定next指针,使之指向新节点。若比较-交换操作失败,就表明有另一线程同时存入了数据,我们应该进行循环,重新读取tail指针并从头开始操作。