当前位置: 首页 >Java技术 > Rx:4-[编外篇] .NET4里的Concurrent Collections

Rx:4-[编外篇] .NET4里的Concurrent Collections

前面说了,Rx可以在3.5里用,他带给了3.5可以使用的支持并发的集合类型,说白了,就是提供了Thread-Safe的Collection。

在.NET 4.0之前,是不直接支持线程安全的集合的。除非自己去做lock。而Lock带给我们的除了风险之外,系统的性能下降问题特别明显。在我之前的项目中,需要用到一个Thread-safe的Queue,我当时使用Compare-And-Swap方式实现的。但是这种方式并不能保证高效,个人能力有限,Google也没帮上多大忙。所以.net 4.0一出来的时候,宣布支持ConcurrentQueue<T>,我立刻就抄起Reflector去看看它的实现方式。

当然,除了ConcurrentQueue<T>外,还有其他几个支持线程安全的并发集合类型,看完之后,才发现他们并不全都是Lock-free的,这点还是跟我的判断不同的。

我们常说的Lock-free,对于.net来讲,其实是说就是不用lock(),也就是Monitor这个类。在Team的技术分享中我多次建议大家不要直接写lock,而是写Monitor.TryEnter(object, 200)这样的方式,这样的好处是避免整个程序面临尴尬的状况,毕竟对于一个互联网来说,绝大多数应用并不是追求完整性的,牺牲整个站点的代价太高了。在.net里,我们可以通过memory barrier或者使用CPU的CAS(就是上面说的Compare-And-Swap,那个Interlocked类就是)指令来进行一些粒度锁,这些都是很轻量级的,我们认为他是不会损坏系统性能的。

注:(感谢@Kavin Yang的提醒)

使用lock()的时候,.net会一直等待能够lock才行。而lock()的实现就是调用Monitor.Enter。但是Monitor还有一个叫做TryEnter的方法,参数除了要lock的object外,还提供一个时间参数作为该操作的超时时间。

可以参见:http://msdn.microsoft.com/en-us/library/42h9d380(v=VS.80).aspx

那看几个实现:

ConcurrentQueue<T>和ConcurrentStack<T>

这两个是使用CAS的方式来完成任务的,实现原理是一样的:

1: private void PushCore(Node<T> head, Node<T> tail)
2: {
3: SpinWait wait = new SpinWait();
4: do
5: {
6: wait.SpinOnce();
7: tail.m_next = this.m_head;
8: }
9: while (Interlocked.CompareExchange<Node<T>>(ref this.m_head, head, tail.m_next) != tail.m_next);
  10: }

注意到do…while。整个实现都没有使用lock的地方,唯独在CAS失败的时候使用SpinWait的SpinOnce方法。

ConcurrentDictionary<TKey, TValue>

这个类也在mscorlib.dll里,同样位于System.Collections.Concurrent命名空间下。但是它的add、update等方法都是使用了lock来完成的(当然不是完全依赖lock,所以有性能提升)

1: try
2:{
3:if (acquireLock)
4:{
5:Monitor.Enter(this.m_locks[num3], ref lockTaken);
6:}
7:if (nodeArray != this.m_buckets)
8:{
9:goto Label_000D;
  10:}
  11:Node<TKey, TValue> node = null;
  12: ....

但是对于他的读操作:

1: public bool TryGetValue(TKey key, out TValue value)
2: {
3: int num;
4: int num2;
5: if (key == null)
6: {
7: throw new ArgumentNullException("key");
8: }
9: Node<TKey, TValue>[] buckets = this.m_buckets;
  10: this.GetBucketAndLockNo(this.m_comparer.GetHashCode(key), out num, out num2, buckets.Length);
  11: Node<TKey, TValue> next = buckets[num];
  12: Thread.MemoryBarrier();
  13: while (next != null)
  14: {
  15: if (this.m_comparer.Equals(next.m_key, key))
  16: {
  17: value = next.m_value;
  18: retu true;
  19: }
  20: next = next.m_next;
  21: }
  22: value = default(TValue);
  23: retu false;
  24: }

能看到这里的处理方式还是很巧妙的,Dictionary这个数据结构本身就是一个设计为读多的类型,read的次数肯定要远远超过write的次数。所以这个ConcrruentDictionary的设计并不是追求lock-free的改/写操作,而是将read操作做到了lock-free!

ConcrruntBat<T>

这个类设计的更有意思,思路很像是采用了Pool的方式(Pool一般有3种常见的使用方式),将pool对应到应用的thread上,他们之间就没有锁的我问题了(或者无损耗的锁),比如说Add操作:

1: public void Add(T item)
2: {
3: ThreadLocalList<T> threadList = this.GetThreadList(true);
4: this.AddInteal(threadList, item);
5: }

 

1: private void AddInteal(ThreadLocalList<T> list, T item)
2: {
3: bool taken = false;
4: try
5: {
6: Interlocked.Exchange(ref list.m_currentOp, 1);
7: if ((list.Count < 2) || this.m_needSync)
8: {
9: list.m_currentOp = 0;
  10: Monitor2.Enter(list, ref taken);
  11: }
  12: list.Add(item, taken);
  13: }
  14: finally
  15: {
  16: list.m_currentOp = 0;
  17: if (taken)
  18: {
  19: Monitor.Exit(list);
  20: }
  21: }
  22: }

 

Rx系列:

Rx:1-Observable

Rx:2-Observable more

Rx:3-System.CoreEx.dll

Rx:4-[编外篇] .NET4里的Concurrent Collections

作者:new 维生素C.net()
来源链接:https://www.cnblogs.com/fanweixiao/archive/2010/06/02/Rx.html

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。





本文链接:https://www.javaclub.cn/java/112980.html

标签:Collections
分享给朋友:

“Rx:4-[编外篇] .NET4里的Concurrent Collections” 的相关文章