1. 内存顺序和内存模型使用实现 1.1 sequencial consistent模型(最严格的) memory_order_seq_cst
代表全局一致性顺序,可以用于 store
、 load
和 read-modify-write
操作实现 sequencial consistent
的顺序模型。在这个模型下, 所有线程看到的所有操作都有一个一致的顺序,即使这些操作可能针对不同的变量,运行在不同的线程。
在下面程序中,线程1修改完x和y的值,线程2立刻就能见到x和y修改的值,所以当线程2读到y为true时就会退出循环,然后读到的x也一定为true。因为需要保证与线程1修改顺序是一致的,线程1修改了x再修改y,当线程1修改完y的时候,x早就被修改为true了,所以当线程2读到了y为true时,接下来读到的x也一定为true。这样的话,z就是从0变为1,就不会发生断言。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 std::atomic<bool > x, y; std::atomic<int > z; void write_x_then_y () { x.store (true , std::memory_order_seq_cst); y.store (true , std::memory_order_seq_cst); } void read_y_then_x () { while (!y.load (std::memory_order_seq_cst)) { std::cout << "y load false" << std::endl; } if (x.load (std::memory_order_seq_cst)) { ++z; } } void TestOrderRelaxed () { std::thread t1 (write_x_then_y) ; std::thread t2 (read_y_then_x) ; t1.join (); t2.join (); assert (z.load () != 0 ); }
实现sequencial consistent
模型有一定的开销,现代 CPU 通常有多核,每个核心还有自己的缓存。为了做到全局顺序一致,每次写入操作都必须同步给其他核心(也就是一个核心写的时候,其它核心读都不能读,必须等该核心写完后同步到memory中,其它核心才能读)。为了减少性能开销,如果不需要全局顺序一致,我们应该考虑使用更加宽松的顺序模型,比如Acquire-release
和Relaxed
模型。
1.2 relaxed模型(最宽松的) memory_order_relaxed
可以用于store
、load
和read-modify-write
操作, 实现relaxed
的顺序模型。在这种模型下只能保证操作的原子性和修改顺序(modification order)一致性(但改完了不一定立刻能见到),无法实现synchronizes-with
的关系。
在下面程序中,是存在隐患的,即可能会发生断言,但因为现在c++尽可能的帮外面规避了这些问题,所以执行结果很难会出现断言崩溃,但还是尽量不这样编写程序。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void TestOrderRelaxed () { std::atomic<bool > rx, ry; std::thread t1 ([&]() { rx.store(true , std::memory_order_relaxed); ry.store(true , std::memory_order_relaxed); }) ; std::thread t2 ([&]() { while (!ry.load(std::memory_order_relaxed)); assert(rx.load(std::memory_order_relaxed)); }) ; t1.join (); t2.join (); }
1.3 Acquire-Release模型(最常用的) 在 acquire-release
模型中, 会使用memory_order_acquire
、memory_order_release
和memory_order_acq_rel
这三种内存顺序。它们的具体用法:
对原子变量的 load 可以使用 memory_order_acquire 内存顺序,这称为 acquire 操作
对原子变量的 store 可以使用 memory_order_release 内存顺序,这称为 release 操作
read-modify-write 操作即读(load)又写(store),它可以使用memory_order_acquire, memory_order_release 和 memory_order_acq_rel:
如果使用 memory_order_acquire,则作为 acquire 操作
如果使用 memory_order_release,则作为 release 操作
如果使用 memory_order_acq_rel,则同时为两者
Acquire-release
可以实现synchronizes-with
的关系。如果一个 acquire 操作在同一个原子变量上读取到了一个 release 操作写入的值,则这个 release 操作 “synchronizes-with” 这个 acquire 操作。
在下面的程序中,对rx进行story和load使用的都是宽松的内存顺序,但对ry采用的是同步的内存顺序。如果线程2先运行,由于ry还是false,所以会一直执行循环,直到ry为true。当ry为true时,3处会退出循环,执行4处,由于线程1中ry都已经修改,所以rx也被修改,则4处的断言就不会发生。由于release在acquire之前,就构成了同步关系,2同步给了3,即2在3之前执行。
从cpu指令角度理解,如果编译器发现了一个store操作,且是release内存顺序,也发现了之前操作也是store,不管该store操作是什么内存顺序,编译器都会先把该store写入内存,再把该release的store写入内存。所以当线程2中把ry读到为true时,rx也一定被修改为了true,这样就不会发生断言的情况了。
1 2 3 4 5 6 7 8 9 10 11 12 13 void TestReleaseAcquire () { std::atomic<bool > rx, ry; std::thread t1 ([&]() { rx.store(true , std::memory_order_relaxed); ry.store(true , std::memory_order_release); }) ; std::thread t2 ([&]() { while (!ry.load(std::memory_order_acquire)); assert(rx.load(std::memory_order_relaxed)); }) ; t1.join (); t2.join (); }
Acquire-release
的开销比sequencial consistent
小。在 x86 架构下,memory_order_acquire
和memory_order_release
的操作不会产生任何其他的指令(它们两个操作的中间是原子化的),只会影响编译器的优化。任何指令都不能重排到acquire操作的前面,且不能重排到release操作的后面 。否则会违反 acquire-release 的语义。因此很多需要实现synchronizes-with
关系的场景都会使用 acquire-releas
。
还有一种情况,多个线程对同一个变量release操作,另一个线程对这个变量acquire,那么只有一个线程的release操作和这个acquire线程构成同步关系。在下面程序中,线程1对xd和yd都做了存储操作,线程2只对yd进行存储操作,而在线程3中,断言语句是可能被触发的,因为操作2和操作4是构成同步关系的,操作3和操作4也是构成同步关系的,这种情况下,只有一个会与操作4构成同步关系。如果是操作3和操作4构成同步关系,断言就会触发;如果是操作2和操作4构成同步关系,断言就不会触发。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 void ReleasAcquireDanger2 () { std::atomic<int > xd{0 }, yd{0 }; std::atomic<int > zd; std::thread t1 ([&]() { xd.store(1 , std::memory_order_release); yd.store(1 , std::memory_order_release); }) ; std::thread t2 ([&]() { yd.store(2 , std::memory_order_release); }) ; std::thread t3 ([&]() { while (!yd.load(std::memory_order_acquire)); assert(xd.load(std::memory_order_acquire) == 1 ); }) ; t1.join (); t2.join (); t3.join (); }
1.4 release sequence 针对一个原子变量M的release
操作A完成后,接下来M上可能还会有一连串的其他操作。如果这一连串操作是由同一线程上的写操作或任意线程上的read-modify-write
操作这两种构成的,则称这一连串的操称为以release
操作A为首的release sequence
。 这里的写操作和read-modify-write
操作可以使用任意内存顺序。
如果一个acquire
操作在同一个原子变量上读到了一个release
操作写入的值,或者读到了以这个release
操作为首的release sequence
写入的值,那么这个release操作 “synchronizes-with” 这个acquire操作。
在下面程序中,可以知道操作2和操作3构成release sequence,而操作4也需要读到这个操作的最后一个结果(flag为2),则操作2和操作4构成同步关系。有因为操作1在操作2之前,操作5在操作4之后,所以最后不会触发断言问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 void ReleaseSequence () { std::vector<int > data; std::atomic<int > flag{ 0 }; std::thread t1 ([&]() { data.push_back(42 ); flag.store(1 , std::memory_order_release); }) ; std::thread t2 ([&]() { int expected = 1 ; while (!flag.compare_exchange_strong(expected, 2 , std::memory_order_relaxed)) expected = 1 ; }) ; std::thread t3 ([&]() { while (flag.load(std::memory_order_acquire) < 2 ); assert(data.at(0 ) == 42 ); }) ; t1.join (); t2.join (); t3.join (); }
1.5 memory_order_consume memory_order_consume
其实是 acquire-release
模型的一部分,但是它比较特殊,它涉及到数据间相互依赖的关系,也可以理解为是同步关系。
memory_order_consume
可以用于 load 操作,使用 memory_order_consume 的 load 称为 consume 操作。如果一个 consume 操作在同一个原子变量上读到了一个 release 操作写入的值,或以其为首的 release sequence 写入的值,则这个 release 操作 “dependency-ordered before” 这个 consume 操作。
在下面程序中,操作3和操作4构成了依赖关系,也就是操作3在操作4之前执行的,也可以理解为同步关系。因为操作1发生在操作3之前,操作5发生在操作6之后,所以操作5不会发生断言问题;而memory_order_consumer要考虑依赖关系,data并不依赖p,所以data可能不等于42,所以操作6可能触发断言。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void ConsumeDependency () { std::atomic<std::string*> ptr; int data; std::thread t1 ([&]() { std::string* p = new std::string("Hello World" ); data = 42 ; ptr.store(p, std::memory_order_release); }) ; std::thread t2 ([&]() { std::string* p2; while (!(p2 = ptr.load(std::memory_order_consume))); assert(*p2 == "Hello World" ); assert(data == 42 ); }) ; t1.join (); t2.join (); }
1.6 内存模式实现单例模式 之前用智能指针双重检测方式实现的单例模式是存在线程安全问题的,但可以通过内存模式来实现单例模式,这样可以解决改弊端。在底层,new一个对象再赋值给变量时会存在多个指令顺序。
第一种情况:
为对象allocate一块内存空间
调用construct构造对象
将构造到的对象地址返回
第二种情况:
为对象allocate一块内存空间
先将开辟的空间地址返回
调用construct构造对象
如果是第二种情况,当还未构造对象就将地址返回赋值给single,而此时有线程运行至1处判断single不为空直接返回单例实例,如果该线程调用这个单例的成员函数就会崩溃(结合代码看)。
下面就分别是通过智能指针和内存模型的方法实现单例模式:
在下面这个程序,在4处虽然allocate开辟了一块空间,并返回给single(不为空了),但还没有调用construct,如果其它线程此时在1处用了single,因为不为空,就直接返回了single,并且调用了成员函数,就会出现系统崩溃。所以,实际上,1处和4处是会出现线程安全的,即4处还没有完全结束,就会返回一个还没有构造好的对象把地址返回了,而其它线程判断不为空,就会使用这个没有构造好的对象,就会产生崩溃。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 class SingleAuto { private : SingleAuto (){} SingleAuto (const SingleAuto&) = delete ; SingleAuto& operator =(const SingleAuto&) = delete ; public : ~SingleAuto () { std::cout << "single auto delete success " << std::endl; } static std::shared_ptr<SingleAuto> GetInst () { if (single != nullptr ) { return single; } s_mutex.lock (); if (single != nullptr ) { s_mutex.unlock (); return single; } single = std::shared_ptr <SingleAuto>(new SingleAuto); s_mutex.unlock (); return single; } private : static std::shared_ptr<SingleAuto> single; static std::mutex s_mutex; };
在下面程序中,因为锁的权力是最大的,它能保证所有看到的线程是一致的,所以3处有宽松的模型没有问题的。最后,因为是在4处的代码完全执行后,_b_init才会被设置为true,其它线程才能通过1处进入函数执行(对1处有加锁)。而智能指针的方式就是因为没有这种方法对1处加锁,所以其它线程随时都可以判断single是否为空,这样4处还没有完全执行完时,其它线程就可能判断single不为空,而直接返回使用没有完全构造好的对象了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 class SingleMemoryModel { private : SingleMemoryModel (){} SingleMemoryModel (const SingleMemoryModel&) = delete ; SingleMemoryModel& operator =(const SingleMemoryModel&) = delete ; public : ~SingleMemoryModel () { std::cout << "single auto delete success " << std::endl; } static std::shared_ptr<SingleMemoryModel> GetInst () { if (_b_init.load (std::memory_order_acquire)) { return single; } s_mutex.lock (); if (_b_init.load (std::memory_order_relaxed)) { s_mutex.unlock (); return single; } single = std::shared_ptr <SingleMemoryModel>(new SingleMemoryModel); _b_init.store (true , std::memory_order_release); s_mutex.unlock (); return single; } private : static std::shared_ptr<SingleMemoryModel> single; static std::mutex s_mutex; static std::atomic<bool > _b_init ; }; std::shared_ptr<SingleMemoryModel> SingleMemoryModel::single = nullptr ; std::mutex SingleMemoryModel::s_mutex; std::atomic<bool > SingleMemoryModel::_b_init = false ;
2. 无锁队列实现 下面主要是通过无锁队列的方式来实现无锁并发。
2.1 无锁队列 要实现无锁并发,经常会用到一种结构无锁队列,而无锁队列和在数据结构经常使用的队列颇有不同,它采用的是环状的队列结构,至于为什么成环?主要有两个好处,一个是成环的队列大小是固定的,另外一个我们通过移动头和尾就能实现数据的插入和取出。
2.2 用锁实现环形队列 下面就是用锁实现环形队列,在push和pop时分别加锁,并通过head和tail计算队列是否为满或者空。最后通过测试代码,在添加完5个元素后,已经满了,再添加,是添加不进去的;弹出完5个元素后,再弹出,也是弹不出来的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 template <typename T, size_t Cap>class CircularQueLk :private std::allocator<T> { public : CircularQueLk () :_max_size(Cap + 1 ),_data(std::allocator<T>::allocate (_max_size)), _head(0 ), _tail(0 ) {} CircularQueLk (const CircularQueLk&) = delete ; CircularQueLk& operator = (const CircularQueLk&) volatile = delete ; CircularQueLk& operator = (const CircularQueLk&) = delete ; ~CircularQueLk () { std::lock_guard<std::mutex> lock (_mtx) ; while (_head != _tail) { std::allocator<T>::destroy (_data + _head); _head = (_head+1 )% _max_size; } std::allocator<T>::deallocate (_data, _max_size); } template <typename ...Args> bool emplace (Args && ... args) { std::lock_guard<std::mutex> lock (_mtx) ; if ((_tail + 1 ) % _max_size == _head) { std::cout << "circular que full ! " << std::endl; return false ; } std::allocator<T>::construct (_data + _tail, std::forward<Args>(args)...); _tail = (_tail + 1 ) % _max_size; return true ; } bool push (const T& val) { std::cout << "called push const T& version" << std::endl; return emplace (val); } bool push (T&& val) { std::cout << "called push T&& version" << std::endl; return emplace (std::move (val)); } bool pop (T& val) { std::lock_guard<std::mutex> lock (_mtx) ; if (_head == _tail) { std::cout << "circular que empty ! " << std::endl; return false ; } val = std::move (_data[_head]); _head = (_head + 1 ) % _max_size; return true ; } private : size_t _max_size; T* _data; std::mutex _mtx; size_t _head = 0 ; size_t _tail = 0 ; }; void TestCircularQue () { CircularQueLk<MyClass, 5 > cq_lk; MyClass mc1 (1 ) ; MyClass mc2 (2 ) ; cq_lk.push (mc1); cq_lk.push (std::move (mc2)); for (int i = 3 ; i <= 5 ; i++) { MyClass mc (i) ; auto res = cq_lk.push (mc); if (res == false ) { break ; } } cq_lk.push (mc2); for (int i = 0 ; i < 5 ; i++) { MyClass mc1; auto res = cq_lk.pop (mc1); if (!res) { break ; } std::cout << "pop success, " << mc1 << std::endl; } auto res = cq_lk.pop (mc1); }
2.3 无锁队列 如果用原子变量而不是用锁实现环形队列,那就是无锁并发的队列了。下面是之前用过的原子变量的读改写操作,compare_exchange_strong
会比较原子变量atomic<T>
的值和expected
的值是否相等,如果相等则执行交换操作,将atomic<T>
的值换为desired
并且返回true,否则将expected的值修改为atomic<T>
的值,并且返回false。
1 2 bool std::atomic<T>::compare_exchange_weak (T &expected, T desired);bool std::atomic<T>::compare_exchange_strong (T &expected, T desired);
用代码来理解就是:
1 2 3 4 5 6 7 8 9 10 11 12 template <typename T>bool atomic<T>::compare_exchange_strong (T &expected, T desired) { std::lock_guard<std::mutex> guard (m_lock) ; if (m_val == expected){ m_val = desired; return true ; } else { expected = m_val; return false ; } }
compare_exchange_weak
功能比compare_exchange_strong
弱一些,他不能保证atomic<T>
的值和expected
的值相等时也会做交换,很可能原子变量和预期值相等也会返回false,所以使用要多次循环使用。
下面程序中,将类的成员变量mutex
换成atomic
类型的原子变量,利用自旋锁的思路将锁替换为原子变量循环检测的方式,进而达到锁住互斥逻辑的效果。在析构函数中,因为_atomic_using
初始化时为false,当线程1到到操作1处时,_atomic_using
会被赋值为use_desired
的值(true),并且返回false,退出循环,完成队列类CircularQueSeq
的销毁和空间回收。在这个过程中,如果有其它线程要进入,当来到操作1处时,因为_atomic_using
已经为true了,与use_expected
不相等,就会让use_expected
赋值为_atomic_using
的值(这是不希望的,所以do里面会每次都重新设置,就是防止这种情况),并返回false,继续循环,只有当线程1执行完2处后,_atomic_using
为false时,其它线程才能够退出循环1处,进入。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 template <typename T, size_t Cap>class CircularQueSeq :private std::allocator<T> {public : CircularQueSeq () :_max_size(Cap + 1 ), _data(std::allocator<T>::allocate (_max_size)), _atomic_using(false ),_head(0 ), _tail(0 ) {} CircularQueSeq (const CircularQueSeq&) = delete ; CircularQueSeq& operator = (const CircularQueSeq&) volatile = delete ; CircularQueSeq& operator = (const CircularQueSeq&) = delete ; ~CircularQueSeq () { bool use_expected = false ; bool use_desired = true ; do { use_expected = false ; use_desired = true ; } while (!_atomic_using.compare_exchange_strong (use_expected, use_desired)); while (_head != _tail) { std::allocator<T>::destroy (_data + _head); _head = (_head+1 )% _max_size; } std::allocator<T>::deallocate (_data, _max_size); do { use_expected = true ; use_desired = false ; } while (!_atomic_using.compare_exchange_strong (use_expected, use_desired)); } template <typename ...Args> bool emplace (Args && ... args) { bool use_expected = false ; bool use_desired = true ; do { use_expected = false ; use_desired = true ; } while (!_atomic_using.compare_exchange_strong (use_expected, use_desired)); if ((_tail + 1 ) % _max_size == _head) { std::cout << "circular que full ! " << std::endl; do { use_expected = true ; use_desired = false ; } while (!_atomic_using.compare_exchange_strong (use_expected, use_desired)); return false ; } std::allocator<T>::construct (_data + _tail, std::forward<Args>(args)...); _tail = (_tail + 1 ) % _max_size; do { use_expected = true ; use_desired = false ; } while (!_atomic_using.compare_exchange_strong (use_expected, use_desired)); return true ; } bool push (const T& val) { std::cout << "called push const T& version" << std::endl; return emplace (val); } bool push (T&& val) { std::cout << "called push T&& version" << std::endl; return emplace (std::move (val)); } bool pop (T& val) { bool use_expected = false ; bool use_desired = true ; do { use_desired = true ; use_expected = false ; } while (!_atomic_using.compare_exchange_strong (use_expected, use_desired)); if (_head == _tail) { std::cout << "circular que empty ! " << std::endl; do { use_expected = true ; use_desired = false ; } while (!_atomic_using.compare_exchange_strong (use_expected, use_desired)); return false ; } val = std::move (_data[_head]); _head = (_head + 1 ) % _max_size; do { use_expected = true ; use_desired = false ; }while (!_atomic_using.compare_exchange_strong (use_expected, use_desired)); return true ; } private : size_t _max_size; T* _data; std::atomic<bool > _atomic_using; size_t _head = 0 ; size_t _tail = 0 ; };
上面这种方式虽然通过原子变量的方式解决线程安全问题,但当某一线程线程通过do-while后,其它线程都得像自旋锁一样一直轮询检查,就造成了不必要的开销,下面是对这种方法的一种改进。
对于pop逻辑代码:先在1处取出头部的下标索引,进行判断,如果头部的下标与尾部下标相等,说明为空,就直接返回false退出,否则就通过头部的下标取出数据,然后在3处,如果线程1先到达这里,因为_head
等于h,所以就将_head
进行+1,这样其它线程来到这里的时候,由于_head
不等于h了,就返回false,继续循环。这时继续循环时,由于_head
已经等于h+1了,所以改线程在1处获得的下标就是h+1,那么它要pop的是h+1,不是h,就不会出现多次弹出同一个数据的情况。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 bool pop (T& val) { size_t h; do { h = _head.load (); if (h == _tail.load ()) { return false ; } val = _data[h]; } while (!_head.compare_exchange_strong (h, (h+1 )% _max_size)); return true ; }
push逻辑代码:push逻辑与pop逻辑处理在这里是一样的,但push的这种处理是存在线程安全问题的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 bool push (T& val) { size_t t; do { t = _tail.load (); if ( (t+1 )%_max_size == _head.load ()) { return false ; } _data[t] = val; } while (!_tail.compare_exchange_strong (t, (t + 1 ) % _max_size)); return true ; }
3. 利用栅栏实现同步模型 3.1 线程可见顺序 有时候我们线程1对A的store操作采用release内存序,而线程2对B的load采用acquire内存序,并不能保证A 一定比 B先执行。因为两个线程并行执行无法确定先后顺序,我们指的先行不过是说如果B读取了A操作的结果,则称A先行于B。
在下面这段程序中,看似断言是不会触发,但在一些极端的情况下,还是可能触发断言的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 #include <iostream> #include <atomic> #include <thread> #include <cassert> std::atomic<bool > x, y; std::atomic<int > z; void write_x () { x.store (true , std::memory_order_release); } void write_y () { y.store (true , std::memory_order_release); } void read_x_then_y () { while (!x.load (std::memory_order_acquire)); if (y.load (std::memory_order_acquire)) ++z; } void read_y_then_x () { while (!y.load (std::memory_order_acquire)); if (x.load (std::memory_order_acquire)) ++z; } void TestAR () { x = false ; y = false ; z = 0 ; std::thread a (write_x) ; std::thread b (write_y) ; std::thread c (read_x_then_y) ; std::thread d (read_y_then_x) ; a.join (); b.join (); c.join (); d.join (); assert (z.load () != 0 ); std::cout << "z value is " << z.load () << std::endl; }
比如在一个4核CPU结构的主机上,a,b,c,d分别运行在不同的CPU内核上,如下图所示。线程a执行x.store(true)
先被线程c读取,而此时线程b对y的store还没有被线程c读取到新的值,所以此时c读取的x为true,y为false。同样的道理,d可以读取b修改y的最新值,但是没来的及读取x的最新值,那么读取到y为true,x为false。这样在上面程序中就会出现断言的情况。
所以即使采用release和acquire方式也不能保证全局顺序一致。如果一个线程对变量执行release内存序的store操作,另一个线程不一定会马上读取到。
3.2 栅栏 其实也可以通过栅栏机制保证指令的写入顺序,栅栏的机制和memory_order_release
类似。
下面程序中,在1和3中间添加了一个release内存序的栅栏2,这样可以确保该指令2之前的指令(什么内存序都包括)的编排不会编排到该指令2之后。在4和6之间又定义了一个acquire内存序的栅栏5,那么如果操作2被操作5读取到了,它们就构成了一个同步关系,所有执行操作6时,就一定可以读到操作1执行后的数据,也就不会发生断言。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void write_x_then_y_fence () { x.store (true , std::memory_order_relaxed); std::atomic_thread_fence (std::memory_order_release); y.store (true , std::memory_order_relaxed); } void read_y_then_x_fence () { while (!y.load (std::memory_order_relaxed)); std::atomic_thread_fence (std::memory_order_acquire); if (x.load (std::memory_order_relaxed)) ++z; }
4. 基于锁实现线程安全的栈 下面程序实现了基于锁控制push和pop操作的一个栈。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 struct empty_stack : std::exception { const char * what () const throw () ; }; template <typename T>class threadsafe_stack { private : std::stack<T> data; mutable std::mutex m; public : threadsafe_stack () {} threadsafe_stack (const threadsafe_stack& other) { std::lock_guard<std::mutex> lock (other.m) ; data = other.data; } threadsafe_stack& operator =(const threadsafe_stack&) = delete ; void push (T new_value) { std::lock_guard<std::mutex> lock (m) ; data.push (std::move (new_value)); } std::shared_ptr<T> pop () { std::lock_guard<std::mutex> lock (m) ; if (data.empty ()) throw empty_stack (); std::shared_ptr<T> const res (std::make_shared<T>(std::move(data.top()))) ; data.pop (); return res; } void pop (T& value) { std::lock_guard<std::mutex> lock (m) ; if (data.empty ()) throw empty_stack (); value = std::move (data.top ()); data.pop (); } bool empty () const { std::lock_guard<std::mutex> lock (m) ; return data.empty (); } };
上面代码虽然不会出现线程安全问题,但依然存在着不足,比如说在操作2处,当栈为空的时候,也会抛出异常,这是不会合理的。下面程序是解决栈为空就抛出异常的一个问题优化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 template <typename T>class threadsafe_stack_waitable { private : std::stack<T> data; mutable std::mutex m; std::condition_variable cv; public : threadsafe_stack_waitable () {} threadsafe_stack_waitable (const threadsafe_stack_waitable& other) { std::lock_guard<std::mutex> lock (other.m) ; data = other.data; } threadsafe_stack_waitable& operator =(const threadsafe_stack_waitable&) = delete ; void push (T new_value) { std::lock_guard<std::mutex> lock (m) ; data.push (std::move (new_value)); cv.notify_one (); } std::shared_ptr<T> wait_and_pop () { std::unique_lock<std::mutex> lock (m) ; cv.wait (lock, [this ]() { if (data.empty ()) { return false ; } return true ; }); std::shared_ptr<T> const res (std::make_shared<T>(std::move(data.top()))) ; data.pop (); return res; } void wait_and_pop (T& value) { std::unique_lock<std::mutex> lock (m) ; cv.wait (lock, [this ]() { if (data.empty ()) { return false ; } return true ; }); value = std::move (data.top ()); data.pop (); } bool empty () const { std::lock_guard<std::mutex> lock (m) ; return data.empty (); } bool try_pop (T& value) { std::lock_guard<std::mutex> lock (m) ; if (data.empty ()) { return false ; } value = std::move (data.top ()); data.pop (); return true ; } std::shared_ptr<T> try_pop () { std::lock_guard<std::mutex> lock (m) ; if (data.empty ()) { return std::shared_ptr <T>(); } std::shared_ptr<T> res (std::make_shared<T>(std::move(data.top()))) ; data.pop (); return res; } };
5. 实现线程安全的查找表 5.1 简介 下面实现了一个线程安全的查找结构,类似线程安全的map结构,但由于map基于红黑树的,实现难度就较大,而且需要加锁地方的粒度也不是很精细,所以这里就考虑用散列表实现。
散列表,它是根据键(Key)而直接访问在存储器存储位置的数据结构。 也就是说,它通过计算出一个键值的函数,将所需查询的数据映射到表中一个位置来让人访问,这加快了查找速度。 这个映射函数称做散列函数,存放记录的数组称做散列表。
5.2 代码实现 在下面程序中,将查找表封装为threadsafe_lookup_table
类,该类将hash值存入到一个vector容器中,每个hash值都又一个属于自己的桶,,因此vector容器里存了指定数量的桶,可以理解为一条链表。在threadsafe_lookup_table
类中,又封装了一个桶的类bucket_type
,它相当于是一条链表,用list容器存储,里面存储的类型是pair<Key,Value>
。然后bucket_type
类也实现了一系列的增删改查的功能(需要加锁)供外部调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 template <typename Key, typename Value, typename Hash = std::hash<Key>>class threadsafe_lookup_table {private : class bucket_type { friend class threadsafe_lookup_table; private : typedef std::pair<Key, Value> bucket_value; typedef std::list<bucket_value> bucket_data; typedef typename bucket_data::iterator bucket_iterator; bucket_data data; mutable std::shared_mutex mutex; bucket_iterator find_entry_for (const Key& key) { return std::find_if (data.begin (), data.end (), [&](bucket_value const & item) { return item.first == key; }); } public : Value value_for (Key const & key, Value const & default_value) { std::shared_lock<std::shared_mutex> lock (mutex) ; bucket_iterator const found_entry = find_entry_for (key); return (found_entry == data.end ()) ? default_value : found_entry->second; } void add_or_update_mapping (Key const & key, Value const & value) { std::unique_lock<std::shared_mutex> lock (mutex) ; bucket_iterator const found_entry = find_entry_for (key); if (found_entry == data.end ()) { data.push_back (bucket_value (key, value)); } else { found_entry->second = value; } } void remove_mapping (Key const & key) { std::unique_lock<std::shared_mutex> lock (mutex) ; bucket_iterator const found_entry = find_entry_for (key); if (found_entry != data.end ()) { data.erase (found_entry); } } }; private : std::vector<std::unique_ptr<bucket_type>> buckets; Hash hasher; bucket_type& get_bucket (Key const & key) const { std::size_t const bucket_index = hasher (key) % buckets.size (); return *buckets[bucket_index]; } public : threadsafe_lookup_table (unsigned num_buckets = 19 , Hash const & hasher_ = Hash ()) :buckets (num_buckets), hasher (hasher_) { for (unsigned i = 0 ; i < num_buckets; ++i) { buckets[i].reset (new bucket_type); } } threadsafe_lookup_table (threadsafe_lookup_table const & other) = delete ; threadsafe_lookup_table& operator =(threadsafe_lookup_table const & other) = delete ; Value value_for (Key const & key, Value const & default_value = Value()) { return get_bucket (key).value_for (key, default_value); } void add_or_update_mapping (Key const & key, Value const & value) { return get_bucket (key).add_or_update_mapping (key, value); } void remove_mapping (Key const & key) { return get_bucket (key).remove_mapping (key); } std::map<Key, Value> get_map () { std::vector<std::unique_lock<std::shared_mutex>> locks; for (unsigned i = 0 ; i < buckets.size (); ++i) { locks.push_back (std::unique_lock <std::shared_mutex>(buckets[i]->mutex)); } std::map<Key, Value> res; for (unsigned i = 0 ; i < buckets.size (); ++i) { typename bucket_type::bucket_iterator it = buckets[i]->data.begin (); for (; it != buckets[i]->data.end (); ++it) { res.insert (*it); } } return res; } };
自定义了一个MyClass类,用来表示Value类型值
1 2 3 4 5 6 7 8 9 10 11 12 class MyClass { public : MyClass (int i) :_data(i) {} friend std::ostream& operator << (std::ostream& os, const MyClass& mc) { os << mc._data; return os; } private : int _data; };
用于测试的函数:创建了一个查找表,并定义了三个线程,线程1负责向查找表添加0到99;线程2负责从查找表删除0到99;线程3负责向从查找表添加100到199。三个线程并发执行,执行完后,通过removeSet打印删除的那些数据,和打印出查找表里还剩下的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 void TestThreadSafeHash () { std::set<int > removeSet; threadsafe_lookup_table<int , std::shared_ptr<MyClass>> table; std::thread t1 ([&]() { for (int i = 0 ; i < 100 ; i++){ auto class_ptr = std::make_shared<MyClass>(i); table.add_or_update_mapping(i, class_ptr); } }) ; std::thread t2 ([&]() { for (int i = 0 ; i < 100 ; ){ auto find_res = table.value_for(i, nullptr ); if (find_res) { table.remove_mapping(i); removeSet.insert(i); i++; } std::this_thread::sleep_for(std::chrono::milliseconds(10 )); } }) ; std::thread t3 ([&]() { for (int i = 100 ; i < 200 ; i++) { auto class_ptr = std::make_shared<MyClass>(i); table.add_or_update_mapping(i, class_ptr); } }) ; t1.join (); t2.join (); t3.join (); for (auto & i : removeSet){ std::cout << "remove data is " << i << std::endl; } auto copy_map = table.get_map (); for (auto & i : copy_map) { std::cout << "copy data is " << *(i.second) << std::endl; } } int main () { TestThreadSafeHash (); }
5.3 缺陷
首先我们的查找表可以支持并发读,并发写,并发读的时候不会阻塞其他线程。但是并发写的时候会卡住其他线程。基本的并发读写没有问题。
但是对于bucket_type中链表的操作加锁精度并不精细,因为我们采用的是std提供的list容器,所以增删改查等操作都要加同一把锁,导致锁过于粗糙。
6. 基于锁实现线程安全的链表 6.1 简介 对于做一个支持多线程并发访问的链表,首先想到的是用一个互斥量控制整个链表,达到多线程访问时串行的效果。但是这么做精度不够,需要分化互斥量的功能。所以下面让每个节点都维护一个互斥量,这样能保证多个线程操作不同节点时加不同的锁,减少耦合性。
首先需要将head独立为一个虚节点,即不存储数据,只做头部标记。这样每次从头部插入就只需要修将新的节点的next指针指向原来head的next指向的节点,再将head的next指针指向新的节点。具体图像如下:
6.2 代码实现 头节点函数:定义了一个智能指针指向的数据域和unique智能指针指向下一个节点的地址,同时定义了默认构造函数和拷贝构造函数。
1 2 3 4 5 6 7 8 9 10 template <typname T>struct node { std::mutex m; std::shared_ptr<T> data; std::unique_ptr<node> next; node () :next (){} node (T const & value):data (std::make_shared <T>(value)){} };
定义链表的函数:链表类中将拷贝构造和拷贝赋值函数删除,同时链表中初始状态包含了一个头节点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 template <typename T>class threadsafe_list { struct node { std::mutex m; std::shared_ptr<T> data; std::unique_ptr<node> next; node ():next (){} node (T const & value):data (std::make_shared <T>(value)){} }; node head; public : threadsafe_list (){} ~threadsafe_list (){} threadsafe_list (threadsafe_list const & other) = delete ; threadsafe_list& operator =(threadsafe_list const & other) = delete ; }
析构函数:在remove_if函数中,将Predicate p中的p可以理解为一个lambda表达式或者是一个函数的功能,通过p来判断,如果p返回了true,说明满足条件,把满足条件的节点删除掉;如果p返回的是false,就不满足条件,就不删除该节点。而析构函数调用remove_if,p谓词就是一个lambda表达式,始终返回true,这样就可以保证删除链表所有的节点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 template <typename Predicate>void remove_if (Predicate p) { node* current = &head; std::unique_lock<std::mutex> lk (head.m) ; while (node* const next = current->next.get ()) { std::unique_lock<std::mutex> next_lk (next->m) ; if (p (*next->data)) { std::unique_ptr<node> old_next = std::move (current->next); current->next = std::move (next->next); next_lk.unlock (); } else { lk.unlock (); current = next; lk = std::move (next_lk); } } } ~threadsafe_list () { remove_if ([](node const &) {return true ; }); }
插入节点函数:通过头插法的方式插入新的节点
1 2 3 4 5 6 7 void push_front (T const & value) { std::unique_ptr<node> new_node (new node(value)) ; std::lock_guard<std::mutex> lk (head.m) ; new_node->next = std::move (head.next); head.next = std::move (new_node); }
查找对应节点的函数:该函数是结合谓词p来在链表中一个一个寻找第一个满足条件的节点,找到了就直接返回该data的一个指针,没有找到就返回空指针。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 template <typename Predicate>std::shared_ptr<T> find_first_if (Predicate p) { node* current = &head; std::unique_lock<std::mutex> lk (head.m) ; while (node* const next = current->next.get ()){ std::unique_lock<std::mutex> next_lk (next->m) ; lk.unlock (); if (p (*next->data)){ return next->data; } current = next; lk = std::move (next_lk); } return std::shared_ptr <T>(); }
遍历所有节点函数:该函数是结合谓词函数f来完成的,从头节点开始遍历,到最后空时,就退出循环,停止遍历。
1 2 3 4 5 6 7 8 9 10 11 12 13 template <typename Function>void for_each (Function f) { node* current = &head; std::unique_lock<std::mutex> lk (head.m) ; while (node* const next = current->next.get ()){ std::unique_lock<std::mutex> next_lk (next->m) ; lk.unlock (); f (*next->data); current = next; lk = std::move (next_lk); } }
7. 线程安全的无锁栈实现 7.1 简介 之前通过锁的互斥机制实现了并发安全的栈,队列,查找表,以及链表等结构。接下来是通过无锁的原子变量的方式实现栈。
单线程情况下添加节点步骤:
创建新节点
将元素入栈,将新节点的next指针指向现在的head节点。
将head节点更新为新节点的值。
而在多线程情况下,假设线程1执行到第2步,即将元素入栈后,没来得及更新head节点的值为新节点的值。此时线程2也执行完第2步,将head更新为线程2插入的新节点,之后线程1又将head更新为线程1的新插入节点,那么此时head的位置就是错的。
单线程情况下删除节点步骤:
取出头节点元素
更新head为下一个节点。
返回取出头节点元素的数据域。
而在多线程情况下,第1,2步同样存在线程安全问题。此外我们返回节点数据域时会进行拷贝赋值,如果出现异常会造成数据丢失,这一点也要考虑。
所以为了解决这些问题,可以通过原子变量的compare_exchange
(比较交换操作)来控制更新head节点,以此来达到线程安全的目的。
7.2 代码实现 头节点函数:创建了一个模板类,在该类中,定义了一个T类型的数据域、node*类型的指针,指向下一个节点的地址和一个拷贝构造的函数。
1 2 3 4 5 6 7 template <typename T>struct node { T data; node* next; node (T const & data_):data (data_){} };
无锁栈结构:下面程序定义了一个无锁栈的结构,结构中定义了一个采用原子变量表示的头部节点。需要注意的是,对于栈和队列,如果想要实现线程安全,都不能进行拷贝,不然管理不了。所以得删除拷贝构造和拷贝赋值,采用默认构造即可。
1 2 3 4 5 6 7 8 9 10 template <typename T>class lock_free_stack { private : lock_free_stack (const lock_free_stack&) = delete ; lock_free_stack& operator = (const lock_free_stack&) = delete ; std::atomic<node*> head; public : lock_free_stack () {} }
入栈函数:在多线程情况下,当线程1创建新节点node1并指向了下一个头节点,但没有来得及更新head,这时线程2也创建了新节点node2执行到1处,如下图所示:
然后线程2更新了head节点,即head指向了node2。如下图所示:
当时间片回到线程1,线程1执行2处代码时,发现head的值与node1的next的值不一样,就将head的值赋值给node1的next,并返回false,继续执行循环,那么node1的next就指向了head节点(node2),再执行2处代码时,head与node1->next值相等了,就将node1的值赋值给head了(head进行了更新),然后返回ture,退出循环。如下图所示;
1 2 3 4 5 6 7 template <typename T>void push (const T& value) { auto new_node = new Node (value) do { new_node->next = head.load (); }while (!head.compare_exchange_strong (new_node->next, new_node)); }
出栈函数:下面程序是一个单线程的版本,当线程1执行完1处时,即线程1的old_head指向了头节点,但还没有来得及执行2处代码,线程2就将它的old_head指向了头节点,并将头节点往下移动了,此时线程1再来执行2处代码时,就会出现问题。
1 2 3 4 5 6 template <typename T>void pop (T& value) { node* old_head = head.load (); head = head->next; value = old_head->data; }
所以为了解决上面问题,对代码进行了改进,当线程2执行完2处代码后,即head指向了线程2old_head
的next,此时线程1再执行到2处时,因为head指向的节点和线程1old_head
指向的不一样了,就将线程1old_head
指向head节点(修改删除节点),当在执行2处代码时,就能取出当前head所值的节点(要弹出的节点),并将head的指向往下移,退出线程。
1 2 3 4 5 6 7 template <typename T>void pop (T& value) { do { node* old_head = head.load (); }while (!head.compare_exchange_weak (old_head, old_head->next)); value = old_head->data; }
上面代码存在的问题:
未判断空栈的情况,当old_head为空时,在2处还有old_head->next操作,就很危险。
将数据域赋值给引用类型的value时存在拷贝赋值(3处),我们都知道拷贝赋值会存在异常的情况,当异常发生时元素已经从栈定移除了,破坏了栈的结构,这一点和锁处理时不一样,锁处理的时候是先将元素数据域取出赋值再出栈,所以不会有问题,但是无锁的方式就会出现栈被破坏的情况。解决方式也比较简单,数据域不再存储T类型数据,而是存储std::shared_ptr<T>
类型的数据。智能指针在赋值的时候不会产生异常。
未释放弹出的节点的内存。
先解决未判断栈未空的问题,如下程序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 class lock_free_stack { private : struct node { std::shared_ptr<T> data; node* next; node (T const & data_) : data (std::make_shared <T>(data_)) {} }; lock_free_stack (const lock_free_stack&) = delete ; lock_free_stack& operator = (const lock_free_stack&) = delete ; std::atomic<node*> head; public : lock_free_stack () {} std::shared_ptr<T> pop () { node* old_head = nullptr ; do { old_head = head.load (); if (old_head == nullptr ) { return nullptr ; } } while (!head.compare_exchange_weak (old_head, old_head->next)); return old_head->data; } };
对于资源回收问题,可以先实现一个简单的回收处理逻辑,通过临时智能指针与要删除的数据进行交换的方法来回收资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 template <typename T>std::shared_ptr<T> pop () { node* old_head = nullptr ; do { old_head = head.load (); if (old_head == nullptr ) { return nullptr ; } } while (!head.compare_exchange_weak (old_head, old_head->next)); std::shared_ptr<T> res; res.swap (old_head->data); delete old_head; return res; }
上面程序虽然解决了资源回收的问题,但还是存在很大的问题,当线程1走到1处时,时间片被分配到线程2,然后线程2直接执行到第5处,把old_head删除了,但此时线程1的old_head与线程2删除的old_head是一样的,这样就会导致线程1继续执行,到执行第2处时,取old_head的next时,就会导致系统崩溃。所以就不能把old_head直接删除,其实能不能删除old_head,取决于其它线程是否还用old_head,就像如果是单线程的,就完全可以删除。所以考虑多线程的情况,就需要引入延迟删除节点的机制来解决该问题。
设计思路:
如果head已经被更新,且旧head不会被其他线程引用,那旧head就可以被删除。否则放入待删列表。
如果仅有一个线程执行pop操作,那么待删列表可以被删除,如果有多个线程执行pop操作,那么待删列表不可被删除。
我们需要用一个原子变量threads_in_pop记录有几个线程执行pop操作。在pop结束后再减少threads_in_pop。 我们需要一个原子变量to_be_deleted记录待删列表的首节点。程序如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 std::shared_ptr<T> pop () { ++threads_in_pop; node* old_head = nullptr ; do { old_head = head.load (); if (old_head == nullptr ) { --threads_in_pop; return nullptr ; } } while (!head.compare_exchange_weak (old_head, old_head->next)); std::shared_ptr<T> res; if (old_head) { res.swap (old_head->data); } try_reclaim (old_head); return res; } void try_reclaim (node* old_head) { if (threads_in_pop == 1 ) { node* nodes_to_delete = to_be_deleted.exchange (nullptr ); if (!--threads_in_pop) { delete_nodes (nodes_to_delete); }else if (nodes_to_delete) { chain_pending_nodes (nodes_to_delete); } delete old_head; } else { chain_pending_node (old_head); --threads_in_pop; } } static void delete_nodes (node* nodes) { while (nodes) { node* next = nodes->next; delete nodes; nodes = next; } } void chain_pending_node (node* n) { chain_pending_nodes (n, n); } void chain_pending_nodes (node* first, node* last) { last->next = to_be_deleted; while (!to_be_deleted.compare_exchange_weak ( last->next, first)); } void chain_pending_nodes (node* nodes) { node* last = nodes; while (node* const next = last->next) { last = next; } chain_pending_nodes (nodes, last); }