1. 引用计数实现无锁并发栈 1.1 简介 在C++并发编程中提出了两种计数,一种是外部计数,一种是内部计数,二者加起来就是有效的引用计数,下面提出一种新的方法,利用引用计数实现无锁并发的栈。
1.2 代码实现 栈结构函数:栈函数里面实现了栈的默认构造函数、析构函数和创建了一个原子类型的头部节点,同时也定义好了栈的内部节点类型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 template <typename T>class single_ref_stack {public : single_ref_stack ():head (nullptr ) {} ~single_ref_stack () { while (pop ()); } private : struct ref_node { std::shared_ptr<T> _data; std::atomic<int > _ref_count; ref_node* _next; ref_node (T const & data_):_data(std::make_shared <T>(data_)), _ref_count(1 ), _next(nullptr ) {} }; std::atomic<ref_node*> head; };
1 2 3 4 5 void push (T const & data) { auto new_node = new ref_node (data); new_node->next = head.load (); while (!head.compare_exchange_weak (new_node->next, new_node)); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 std::shared_ptr<T> pop () { ref_node* old_head = head.load (); for (;;) { if (!old_head) { return std::shared_ptr <T>(); } ++(old_head->_ref_count); if (head.compare_exchange_strong (old_head, old_head->_next)) { auto cur_count = old_head->_ref_count.load (); auto new_count; do { new_count = cur_count - 2 ; } while (!old_head->_ref_count.compare_exchange_weak (cur_count, new_count)); std::shared_ptr<T> res; res.swap (old_head->_data); if (old_head->_ref_count == 0 ) { delete old_head; } return res; } else { if (old_head->_ref_count.fetch_sub (1 ) == 1 ) { delete old_head; } } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 struct ref_node ;struct node { std::shared_ptr<T> _data; ref_node _next; node (T const & data_) : _data(std::make_shared <T>(data_)) {} std::atomic<int > _dec_count; }; struct ref_node { int _ref_count; node* _node_ptr; ref_node ( T const & data_):_node_ptr(new node (data_)), _ref_count(1 ){} ref_node ():_node_ptr(nullptr ),_ref_count(0 ){} }; std::atomic<ref_node> head; std::shared_ptr<T> pop () { ref_node old_head = head.load (); for (;;) { ref_node new_head; do { new_head = old_head; new_head._ref_count += 1 ; } while (!head.compare_exchange_weak (old_head, new_head)); old_head = new_head; auto * node_ptr = old_head._node_ptr; if (node_ptr == nullptr ) { return std::shared_ptr <T>(); } if (head.compare_exchange_strong (old_head, node_ptr->_next)) { std::shared_ptr<T> res; res.swap (node_ptr->_data); int increase_count = old_head._ref_count - 2 ; if (node_ptr->_dec_count.fetch_add (increase_count) == -increase_count) { delete node_ptr; } return res; }else { if (node_ptr->_dec_count.fetch_sub (1 ) == 1 ) { delete node_ptr; } } } }
2. 内存模型回顾 2.1 简介 之前实现的那些无锁并发的栈结构,它们对于原子变量的读,写以及读改写操作默认采用的是memory_order_seq_cst
2.2 release-acquire同步 在之前了解的6中内存顺序,其中可以通过release
顺序模型。如果线程B的load操作读取到线程A的store操作的数值,就可以称线程A的store操作 synchronizes-with(同步) 线程B的load操作。
2.3 happens-before先行 如果 a->store 同步于 b->load, 则 a->store 先行于 b->load。只要同步就能推出先行,所谓先行就是逻辑执行的顺序,一定是a->store 先于 b->load。先行还包括一种情况,sequenced-before(顺序执行), 所谓顺序执行就是单线程中执行的顺序为从上到下的顺序。
先行具有传递性 操作1 happens-before
操作2,操作2 happens-before
操作3,则操作1 happens-before
在下面程序中, 操作2处使用了release内存序,保证操作1会排在操作2之前。 操作3采用了acquire内存序,保证操作4排在操作3之后,且如果操作3能读到操作2的写入值,则保证操作1已经先于操作3执行完。因为while重试的机制,保证操作2同步于操作3,即操作2先于操作3执行,又因为操作1先于操作2执行,而操作3先于操作4执行,所以得出操作1先于操作4执行,那么操作4处断言正确就不会崩溃。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 void TestReleaseSeq () { int data = 0 ; std::atomic<int > flag = 0 ; std::thread t1 ([&]() { data = 42 ; flag.store(1 , std::memory_order_release); }) ; std::thread t2 ([&]() { while (!flag.load(std::memory_order_acquire)); assert(data == 42 ); }) ; t1.join (); t2.join (); }
2.4 释放序列的扩展
如果存储操作的标记是memory_order_release、memory_order_acq_rel或memory_order_seq_cst,而载入操作则以memory_order_consume、memory_order_acquire或memory_order_seq_cst标记,这些操作前后相扣成链,每次载入的值都源自前面的存储操作,那么该操作链由一个释放序列 组成。若最后的载入操作服从内存次序memory_order_acquire或memory_order_seq_cst,则最初的存储操作与它构成同步关系。但如果该载入操作服从的内存次序是memory_order_consume,那么两者构成前序依赖关系。操作链中,每个“读-改-写”操作都可选用任意内存次序,甚至也能选用memory_order_relaxed次序。
release-sequnece:针对一个原子变量M的release操作A完成后, 接下来M上可能还会有一连串的其他操作。如果这一连串操作是由
或者任意线程上的 read-modify-write(可以是任意内存顺序) 操作
这两种构成的,则称这一连串的操作为以release操作A为首的release sequence。这里的写操作和read-modify-write操作可以使用任意内存顺序。
同步:一个acquire操作在同一个原子变量上读到了一个release操作写入的值,或者读到了以这个release操作为首的release sequence写入的值,那么这个release操作 “synchronizes-with” 这个 acquire 操作。所以release-sequence不一定构成同步,只有acquire到release的值才算作同步。
是一个原子操作函数,用于从原子变量 count
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 std::vector<int > queue_data; std::atomic<int > count; std::atomic<bool > store_finish = false ; void populate_queue () { unsigned const number_of_items = 20 ; queue_data.clear (); for (unsigned i = 0 ; i < number_of_items; ++i) { queue_data.push_back (i); } count.store (number_of_items, std::memory_order_release); store_finish.store (true , std::memory_order_release); } void consume_queue_items () { while (true ) { while (!store_finish.load (std::memory_order_acquire)); int item_index; if ((item_index = count.fetch_sub (1 , std::memory_order_acquire)) <= 0 ) { return ; } std::cout << "queue_data is " << queue_data[item_index-1 ] << std::endl; } } void TestReleaseSeq2 () { std::thread a (populate_queue) ; std::thread b (consume_queue_items) ; std::thread c (consume_queue_items) ; a.join (); b.join (); c.join (); }
如果单从线程角度考虑,b和c并不能构成同步,但是线程b和c必然有一个线程会先执行执行fetch_sub(原子变量的操作任何顺序模型都能保证操作的原子性)。假设b先执行,和a构成release-sequence关系,b读取到a执行的count.store的结果, b处于以a线程的release为首的释放序列中,则b的store操作会和c的读-改-写(fetch操作,只限这一段代码)构成同步。
这里要强调一点,如果a release-sequence b,a和b不一定构成同步,但是b sychronizes with c,则a synchronizes with c。也就是说处于release序列中的任意读改写操作和其他的线程构成同步,那么我们就能得出release-sequence为首的操作和其他线程同步。
3. 优化无锁栈 3.1 简介 对于想要更好的优化无锁栈,可以结合释放序列这一技术来完成。当有数据入栈时,那么pop时要读取最新入栈的数据。所以我们要让push操作同步给pop操作,想到的办法很简单,push对head的修改采用release内存序列,pop对head的读改写采用acquire内存序列。多个线程并发pop,执行读改写操作,这些线程本来是无法同步的,但是最先pop的线程会和push线程构成同步关系,且形成release-sequence。那之后的线程pop就会和第一个pop的线程的写操作形成同步。
3.2 代码实现 push操作函数:该函数就是创建一个新节点,将新节点的next指向head,最后操作1处执行比较交换操作,当head等于新节点的next时,使用的是release内存序,并返回true;当head不等于新节点的next时(head被其它线程更新),使用的是relaxed内存序(因为要重试,所以什么内存序都可以),并返回false。
1 2 3 4 5 6 7 8 void push (T const & data) { counted_node_ptr new_node; new_node.ptr = new count_node (data); new_node.external_count = 1 ; new_node.ptr->next = head.load (); while (!head.compare_exchange_weak (new_node.ptr->next, new_node, memory_order::memory_order_release, memory_order::memory_order_relaxed)); }
线程1执行if,线程2执行else if,在操作3处count_increase
线程1执行if,线程2执行else if,在操作3处count_increase
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 std::shared_ptr<T> pop () { counted_node_ptr old_head = head.load (); for (;;) { increase_head_count (old_head); count_node* const ptr = old_head.ptr; if (!ptr) { return std::shared_ptr <T>(); } if (head.compare_exchange_strong (old_head, ptr->next)) { std::shared_ptr<T> res; res.swap (ptr->data); int const count_increase = old_head.external_count - 2 ; if (ptr->internal_count.fetch_add (count_increase) == -count_increase) { delete ptr; } return res; } else if (ptr->internal_count.fetch_sub (1 ) == 1 ) { delete ptr; } } } void increase_head_count (counted_node_ptr& old_counter) { counted_node_ptr new_counter; do { new_counter = old_counter; ++new_counter.external_count; } while (!head.compare_exchange_strong (old_counter, new_counter, std::memory_order_acquire, std::memory_order_relaxed)); old_counter.external_count = new_counter.external_count; }
3.3 改进 虽然以上程序优化了无锁栈,但还需要保证ptr的data在被删除之前swap到res里。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 std::shared_ptr<T> pop () { counted_node_ptr old_head = head.load (); for (;;) { increase_head_count (old_head); count_node* const ptr = old_head.ptr; if (!ptr) { return std::shared_ptr <T>(); } if (head.compare_exchange_strong (old_head, ptr->next, std::memory_order_relaxed)) { std::shared_ptr<T> res; res.swap (ptr->data); int const count_increase = old_head.external_count - 2 ; if (ptr->internal_count.fetch_add (count_increase, std::memory_order_release) == -count_increase) { delete ptr; } return res; } else if (ptr->internal_count.fetch_add (-1 , std::memory_order_acquire) == 1 ) { delete ptr; } } }
4. 双引用实现无锁队列 4.1 简介 队列和栈容器它们的结构是不同的,对于队列结构,push()和pop()分别访问其不同部分,而在栈容器上,这两项操作都访问头节点,所以两种数据结构所需的同步操作相异。如果某线程在队列一端做出改动,而另一线程同时访问队列另一端,程序就要保证前者的改动过程能正确地为后者所见。
4.2 单线程队列 下面程序是一个单线程情况下实现的队列操作,在单线程情况下是不会出现问题的,但在多线程情况下,push和pop都会出现问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 template <typename T>class SinglePopPush { private : struct node { std::shared_ptr<T> data; node* next; node ():next (nullptr ){} }; std::atomic<node*> head; std::atomic<node*> tail; node* pop_head () { node* const old_head = head.load (); if (old_head == tail.load ()) { return nullptr ; } head.store (old_head->next); return old_head; } public : SinglePopPush ():head (new node), tail (head.load ()){} SinglePopPush (const SinglePopPush& other) = delete ; SinglePopPush& operator =(const SinglePopPush& other) = delete ; ~SinglePopPush (){ while (node* const old_head = head.load ()) { head.store (old_head->next); delete old_head; } } std::shared_ptr<T> pop () { node* old_head = pop_head (); if (!old_head) { return std::shared_ptr <T>(); } std::shared_ptr<T> const res (old_head->data) ; delete old_head; return res; } void push (T new_value) { std::shared_ptr<T> new_data (std::make_shared<T>(new_value)) ; node* p = new node; node* const old_tail = tail.load (); old_tail->data.swap (new_data); old_tail->next = p; tail.store (p); } };
4.3 多线程push 为了解决多线程push的竞争问题,可以采用以下程序来完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 void push (T new_value) { std::unique_ptr<T> new_data (new T(new_value)) ; counted_node_ptr new_next; new_next.ptr=new node; new_next.external_count=1 ; for (;;) { node* const old_tail=tail.load (); T* old_data=nullptr ; if (old_tail->data.compare_exchange_strong (old_data, new_data.get ())) { old_tail->next=new_next; tail.store (new_next.ptr); new_data.release (); break ; } } }