This is my reading notes of “Concurrent Programming on Windows”, as I said before, I am quite a newbie on this area, there maybe errors in my understanding and summary, please kindly point them out if there is any.
tbb::concurrent_vector
和stl::vector相比,tbb::concurrent_vector不支持insert和erase操作,且不保证内存是连续的。因为不支持insert操作,故而无法实现由一个线程进行插入排序,并同时由另一个线程去遍历的场景。TBB中也没有平衡树的支持,估计是因为,插入/删除时的节点移动操作很多,细粒度锁不容易带来更佳性能的原因。
template<typename T, class A>
class concurrent_vector: protected internal::allocator_base<T, A>,
private internal::concurrent_vector_base;
typedef concurrent_vector_base_v3 concurrent_vector_base;
// VC10 defines its own v4 template
struct concurrent_vector_base_v3::segment_t {
void* array;
};
atomic<segment_t*> my_segment;
segment_t my_storage[pointers_per_short_table];
// pointers_per_short_table = 3
concurrent_vector_base_v3::concurrent_vector_base_v3():
将指针数组my_storage的元素赋值为NULL, 然后将my_storage赋值给my_segment。
concurrent_vector_base_v3::segment_index_of(index):
即 log2(index|1)
concurrent_vector_base_v3::segment_base (k):
(1 << k & ~1),和2**k(即1<<k)不同,当k为0时,结果为0而不是1
concurrent_vector_base_v3::segment_base_index_of(index):
将index调整为segment内的索引,并返回是第几个segment。
concurrent_vector_base_v3::segment_size(k):
返回第k个segment的大小。1 << k,k==0时应该返回2才对呀?
从逻辑上看,concurrent_vector内部的segments布局如下,总会是2,2,4,8,16,32…
但是实际分配的空间,则可能是16,16,32,64,取决于第一次的reservation, growth 或 assignment操作。如果第一次要求分配10个元素,那么heap上分配的空间是,比10大的最小的2的幂数,即16。
(插图来自http://blogs.msdn.com/b/nativeconcurrency,VC10中的concurrent_vector和TBB中的是同出一脉的。)
concurrent_vector::push_back (item)
调用internal_push_back(sizeof(T), k)去分配一段空间,然后调用internal_loop_guide(ntrails为1)将item拷贝到分配的空间上,最后返回iterator
concurrent_vector_base_v3::internal_push_back(element_size, &index)
使用__TBB_FetchAndIncrementWacquire将m_early_size加1,并将原来的size赋值给tmp和index。调用helper::extend_table_if_necessary(*this, k_old, tmp),在必要时扩展segment table,调用helper::acquire_segment获取一个segment的基地址,计算偏移并返回对应的地址。
libittnotify是一些profiling的函数,在编译时可以打开或关闭。ITT=>Intel Threading Tools。
因为工作的关系,现在对C++的并发编程十分有兴趣,TBB无疑是这一领域的佼佼者。我主要对并发容器、scalable allocator比较有兴趣。现在刚刚开始读TBB的代码,这里是一些粗浅的笔记,欢迎大家多指正 …
tbb::machine
tbb_machine.h中引入了所有支持平台的头文件,如果对应平台没有定义__TBB_CompareAndSwap4,__TBB_CompareAndSwap8,__TBB_Yield,或__TBB_release_consistency_helpler,就会报错,无法支持。如果没有定义__TBB_load_with_acquire或__TBB_store_with_release,就会通过__TBB_release_consistency_helpler来实现它们;如果没有实现__TBB_Pause,就通过__TBB_Yield来实现。
__TBB_rel_acq_fence和__TBB_release_consistency_helper不同,前者是一个真正的硬件的fence,后者只是为了告诉编译器不要进行重排。Intel的编译器就将__TBB_release_consistency_helper定义为空了,但是GCC需要特殊的定义。但是奇怪的是,MSDN上说_ReadWriteBarrier是包括硬件fence的。
内存语义(memory semantics):
load_with_acquire: 栅栏之后的内存操作不能移动到load之前;
store_with_release: 栅栏之前的内存操作不能移动到store之后;
__TBB_machine_fetchadd1等方法,通常定义在src/<arch>/atomic_support.asm中的,ibm_aix51中是一个C文件。
某些term和Windows API的对应关系:
fetch-and-add => InterlockedExchangeAdd
fetch-and-store => InterlockedExchange
compare-and-swap => InterlockedCompareExchange
atomic_backoff: pause()方法,在前5以内,是指数级的backoff,之后调用__TBB_Yield()来交出CPU。bounded_pause()会在5次尝试之后,返回false。
还定义了原子的OR和AND操作,以及__TBB_TryLockByte,__TBB_LockByte等辅助函数(都是基于CAS来实现的)。
tbb::atomic
enum memory_semantics: {__TBB_full_fence, acquire, release};
后面两个应该只是为支持IA64的,只有在linux_ia64.h中定义了__TBB_DECL_FENCED_ATOMICS这个宏,并实现了型如__TBB_CompareAndSwap##S##M(其中S是size,M是memory_semantic,例如__TBB_CompareAndSwap4acquire)等方法。如果这些方法没有被定义的话,tbb_machine.h中会用__TBB_full_fence语义的版本重定义之。
atomic_rep: 对长度为[2, 4, 8]字节的类型,都要求对齐;即加入了一些编译器属性。
template struct atomic_traits;
包括compare_and_swap, fetch_and_add, fetch_and_store等几个基本的方法。
atomic_impl实际上是对atomic_rep和atomic_traits的一个包装。而atomic模板从atomic_impl中派生得到的,对指针类型来说,是从atomic_impl_with_arithmetic中派生的(void *除外),以对指针的移动操作(例如++, –)进行memory barrier的保护。而atomic_impl_with_arithmetic也是从atomic_impl中派生的。相比atomic_impl,atomic对赋值操作operator=加入了fence的保护。
tbb::blocked_range
包括1维、2维、3维的实现。模板参数是iterator类型。表示一个半闭半开的区间,[my_begin, my_end)。当区间的size<=my_grainsize时,区间就不可再分了。其中一个构造函数,可以将当前的range(*this)分为两半,新构造的range是后一半,旧的range被修改为前一半。
RCU较RWL有更好的性能,其读操作几乎是free的;writer需要先对原始数据做一份拷贝,再进行修改(在同一时刻只能有一个writer),完成之后替换掉原来的指针(如果swap不是原子性操作,需要critical section的保护);然后,由reclaimer在适当的时机将原始数据占用的内存释放掉。这样,writer不像RWL会因为有reader而被阻塞。RCU和RWL一样,比较适用于读多写少的情景。
RCU多见于操作系统的内核中,也有user-space的实现可供参考(liburcu)。个人觉得,如果将reclaimer的回收工作分摊到某个reader或writer上,借用tr1::shared_ptr,应该可以很方便的实现。下面是实现的代码,我对并发编程经验尚浅,一直拿不太准,是否正确实现了,贴出来请大家多指正!
基本思路是这样的,reader可以通过get_reading_copy()获得当前数据的shared_ptr(记为Generation 1,G1);不过当m_is_swapping为1时,要阻塞等待。writer通过get_updating_copy()得到当前数据的一个副本(记为Generation 2,G2),由m_is_writing进行保护,只允许有一个写者进入;在更新完成之后,writer调用update()将这个副本的shared_ptr传回来,然后通过swap操作令m_data_ptr指向新的数据(G2),然后打开m_is_writing和m_is_swapping。writer持有的那个shared_ptr在调用update()之后指向了原始数据(G1),之前reader(s)持有的shared_ptr(s)也同样指向的是原始数据(G1),当这些shared_ptr(s)统统被析构时,会释放掉原始数据所占用的内存(可能发生在writer或reader上)。新的reader则会获得新数据的shared_ptr。如果有另外一个writer进入,得到的是新数据的副本(记为Generation 3,G3)。因此,如果指向G1的那些shared_ptr(s)还没有被完全析构时,有可能存在多个不同代(generations)的数据副本。
template <typename T>
class rcu_protected
{
public:
typedef T type;
typedef const T const_type;
typedef std::tr1::shared_ptr<type> rcu_pointer;
typedef std::tr1::shared_ptr<const_type> rcu_const_pointer;
rcu_protected() : m_data_ptr (new type()) {}
rcu_protected(T* data) : m_data_ptr (data) {}
rcu_const_pointer get_reading_copy ()
{
LockGuard< CRWLock,
&CRWLock::read_lock,
&CRWLock::read_unlock> l_guard (m_ptr_lock);
return m_data_ptr;
}
rcu_pointer get_updating_copy ()
{
m_writer_lock.write_lock();
LockGuard< CRWLock,
&CRWLock::read_lock,
&CRWLock::read_unlock> l_guard (m_ptr_lock);
return rcu_pointer(new type(*m_data_ptr));
}
void update (rcu_pointer new_data_ptr)
{
LockGuard< CRWLock,
&CRWLock::write_lock,
&CRWLock::write_unlock> l_guard (m_ptr_lock);
m_data_ptr.swap (new_data_ptr);
m_writer_lock.write_unlock();
}
private:
CRWLock m_ptr_lock;
CRWLock m_writer_lock;
rcu_pointer m_data_ptr;
};
自注:
- 在VC2005之后(包括2005),编译器对volatile变量的访问会自动加fence,所以那两个barrier(s)可以省去。
- 其实在update()中,直接将new_data_ptr赋值给m_data_ptr应该也是可以的,但是可能导致在update()中去析构m_data_ptr指向的数据,而我们应该让update()能尽可能快的完成。
- 感谢Dmitry Vyukov的review,我遗漏了一种简单的情况,当reader等到m_is_swapping为0,准备拷贝指针时,突然又有writer线程进入要update指针,这就可能导致crash了… 应该用一个rw_lock将读和写保护起来 … 教训啊,任何关键数据的读写,都应该是全程保护的 …
CAS(CompareAndSwap),是用来实现lock-free编程的重要手段之一,多数处理器都支持这一原子操作。其用伪码描述如下,
template bool CAS(T* addr, T expected, T value)
{
if (*addr == expected) {
*addr = value;
return true;
}
return false;
}
Herb在其一篇文章中提到,看到类如if(variable.compare_exchange(x,y))这样的表达式时, 你应该去习惯将其解读为,“是否我就是那个要把variable从x改为y的线程?”。
使用CAS实现lock-free的一般过程可描述为:
- 拷贝目标对象
- 对拷贝进行修改
- 用CAS对目标对象进行修改,如果失败跳转到#1
你可能觉得这个过程有些眼熟,和C++里异常安全编码的规则有些类似,都是先得到目标对象的一份拷贝,然后对拷贝进行修改,最后通过一个不会抛出异常的swap(),将拷贝和目标对象进行置换。只不过这里多了一个CAS,判断目标对象是否已经被其他线程修改了。如果没有被修改过,则commit;否则,就要重做并重试。
这个过程有一个很实际的风险,就是人们常说的ABA问题,即在当前线程得到目标对象的拷贝并进行修改时,目标对象可能被另一个线程从A修改到B、进而又从B又修改到A了。解决ABA问题的方法是,为操作的对象加上一个版本号。大部分的处理器都支持CAS2,支持对两个连续的WORD进行比较和交换。对Intel/AMD的x86和x64处理器来说,CAS2分别对应的是8个byte和16个byte。
我一直在想,这个过程有没有一个比较直观、容易让人理解的类比。后来想到,作为程序员的我们,在使用版本管理系统进行合作开发时,其实经常遇到类似的问题。例如,我们可以把团队里的多个成员想象为多个线程,我们可能同时对一个源文件进行修改。作为个人,修改代码并最终提交的过程大概是这个样子的,
- 从代码仓库中检出或更新某个源代码文件
- 对本地的文件拷贝进行修改
- 在提交之前从代码仓库中update一下该文件,如果没有冲突,就可以提交了;否则,就要解决冲突再尝试提交
在步骤iii中,存在冲突的原因是,一定有其他同事(另一个线程)已经对这个文件进行了修改并提交;如果我们把解决冲突的过程描述为,重新拷贝目标源文件,并apply/merge步骤ii中得到的patch,这个过程和前面的那个过程非常接近了。
对于ABA问题,可以想象:另一个同事在我提交之前对文件进行了修改,然后又revert了自己的修改(A->B->A),虽然没有冲突,我们依然可以将步骤iii设想为重新拷贝新版本并apply patch的方式(虽然大多数版本管理工具是根据文件的checksum进行相等性比较的)。
如果在这个时间段,某同事快速不停地对该文件进行修改提交的话,我可能就一直处于“redo-retry”的状态而无法提交自己的代码了。这时,索性我就先把这个修改放到一边,去冲杯咖啡,溜达溜达,然后回来继续尝试。这个就是所谓“back-off”操作了,即如果CAS一直失败,就空闲一会儿,甚至将线程切换出去。
似乎基于CAS的lock-free结构和spin-lock也很相像,不都是一直自旋等待么,不也都是基于乐观性假设么?也对,spin-lock也有可能是用CAS来实现的呢。不过,spin-lock毕竟还是一个锁,因此也就沿袭了锁的一些问题,例如优先级反转、死锁等。而基于CAS的lock-free结构,这不会有这样的问题(虽然运行比较慢的、优先级比较低的线程依然可能被饿死),并且有更好的可伸缩性。
Recent Comments