lock-free 编程是什么?
当谈及 lock-free 编程时,我们常将其概念与 mutex 或 lock 联系在一起,描述要在编程中尽量少使用这些锁结构,降低线程间互相阻塞的机会,以提高应用程序的性能。类同的概念还有 "lockless" 和 "non-blocking" 等。实际上,这样的描述只涵盖了 lock-free 编程的一部分内容。本质上说,lock-free 编程仅描述了代码所表述的性质,而没有限定或要求代码该如何编写。
基本上,如果程序中的某一部分符合下面的条件判定描述,则我们称这部分程序是符合 lock-free 的。反过来说,如果某一部分程序不符合下面的条件描述,则称这部分程序是不符合 lock-free 的。
从这个意义上来说,lock-free 中的 "lock" 并没有直接涉及 mutex 或 lock 等互斥量结构,而是描述了应用程序因某种原因被锁定的可能性,例如可能因为死锁(deadlock)、活锁(livelock)或线程调度(thread scheduling)导致优先级被抢占等。
lock-free 编程的一个重要效果就是,在一系列访问 lock-free 操作的线程中,如果某一个线程被挂起,那么其绝对不会阻止其他线程继续运行(non-blocking)。
下面的这段简单的程序片段中,没有使用任何互斥量结构,但却不符合 lock-free 的性质要求。如果用两个线程同时执行这段代码,在线程以某种特定的调度方式运行时,非常有可能两个线程同时陷入死循环,也就是互相阻塞了对方。
while (x == 0)
{
x = 1 - x;
}
所以说,lock-free 编程所带来的挑战不仅来自于其任务本身的复杂性,还要始终着眼于对事物本质的洞察。
通常,应该没有人会期待一个大型的应用程序中全部采用 lock-free 技术,而都是在有特定需求的类的设计上采用 lock-free 技术。例如,如果需要一个 stack 类应对多线程并发访问的场景,可以使用 lock-free 相关技术实现 concurrentstack 类,在其 push 和 pop 操作中进行具体的实现。所以,在使用 lock-free 技术前,需要预先考虑一些软件工程方面的成本:
- lock-free 技术很容易被错误的使用,代码后期的维护中也不容易意识到,所以非常容易引入 bug,而且这样的 bug 还非常难定位。
- lock-free 技术的细节上依赖于内存系统模型、编译器优化、cpu架构等,而这在使用 lock 机制时是不相关的,所以也增加了理解和维护的难度。
lock-free 编程技术
当我们准备要满足 lock-free 编程中的非阻塞条件时,有一系列的技术和方法可供使用,如原子操作(atomic operations)、内存栅栏(memory barrier)、避免 aba 问题(avoiding aba problem)等。那么我们该如何抉择在何时使用哪种技术呢?可以根据下图中的引导来判断。
读改写原子操作(atomic read-modify-write operations)
原子操作(atomic operations)在操作内存时可以被看做是不可分割的(indivisible),其他线程不会打断该操作,没有操作只被完成一部分之说。在现代的 cpu 处理器上,很多操作已经被设计为原子的,例如对齐读(aligned read)和对齐写(aligned write)等。
read-modify-write(rmw)操作的设计则让执行更复杂的事务操作变成了原子的,使得当有多个写入者想对相同的内存进行修改时,保证一次只执行一个操作。
例如,常见的对整型值进行加法操作的 rmw 操作:
- 在 win32 中有 _interlockedincrement
- 在 ios 中有 osatomicadd32
- 在 c 11 中有 std::atomic
::fetch_add
rmw 操作在不同的 cpu 家族中是通过不同的方式来支持的。
- x86/64 和 itanium 架构通过 compare-and-swap (cas) 方式来实现;
- powerpc、mips 和 arm 架构通过 load-link/store-conditional (ll/sc) 方式来实现;
例如在 x86 架构下,通过 lock 指令前缀可以使许多指令操作(add, adc, and, btc, btr, bts, cmpxchg, cmpxch8b, dec, inc, neg, not, or, sbb, sub, xor, xadd, and xchg)变成原子操作,其中 cmpxchg 指令可用于实现 cas 操作。
下面是使用 lock 和 cmpxchg 来实现 cas 操作的代码示例。
__inline int cas(volatile int & destination, int exchange, int comperand)
{
__asm {
mov eax, comperand
mov ecx, exchange
mov edx, destination
lock cmpxchg[edx], ecx /* 如果eax与edx相等, 则ecx送edx且zf置1;
否则edx送ecx, 且zf清0.*/
}
}
/* accumulator = al, ax, eax, or rax depending on
whether a byte, word, doubleword, or
quadword comparison is being performed */
if accumulator = dest
then
zf ← 1;
dest ← src;
else
zf ← 0;
accumulator ← dest;
fi;
compare-and-swap 循环(cas loops)
在 win32 平台上,cas 操作有一组原生的实现,例如 _interlockedcompareexchange 等。对 rmw 操作最常见的讨论可能就是,如何通过 cas loops 来完成对事务的原子处理。
通常,开发人员会设计在一个循环中重复地执行 cas 操作以试图完成一个事务操作。这个过程分为 3 步:
- 从指定的内存位置读取原始的值;
- 根据读取到的原始的值计算出新的值;
- 检测如果内存位置仍然是原始的值时,则将新值写入该内存位置;
例如,向 lockfreestack 中压入新的节点:
1 void lockfreestack::push(node* newhead)
2 {
3 for (;;)
4 {
5 // read the original value from a memory location.
6 // copy a shared variable (m_head) to a local.
7 node* oldhead = m_head;
8
9 // compute the new value to be set.
10 // do some speculative work, not yet visible to other threads.
11 newhead->next = oldhead;
12
13 // set the new value only if the memory location is still the original value.
14 // next, attempt to publish our changes to the shared variable.
15 // if the shared variable hasn't changed, the cas succeeds and we return.
16 // otherwise, repeat.
17 if (_interlockedcompareexchange(&m_head, newhead, oldhead) == oldhead)
18 return;
19 }
20 }
上面代码中的循环操作仍然符合 lock-free 条件要求,因为如果 _interlockedcompareexchange 条件测试失败,也就意味着另外的线程已经成功修改了值,而当前线程可以再下一个循环周期内继续判断以完成操作。
aba 问题(aba problem)
在实现 cas loops 时,当存在多个线程交错地对共享的内存地址进行处理时,如果实现设计的不正确,将有可能遭遇 aba 问题。
若线程对同一内存地址进行了两次读操作,而两次读操作得到了相同的值,通过判断 "值相同" 来判定 "值没变"。然而,在这两次读操作的时间间隔之内,另外的线程可能已经修改了该值,这样就相当于欺骗了前面的线程,使其认为 "值没变",实际上值已经被篡改了。
下面是 aba 问题发生的过程:
- t1 线程从共享的内存地址读取值 a;
- t1 线程被抢占,线程 t2 开始运行;
- t2 线程将共享的内存地址中的值由 a 修改成 b,然后又修改回 a;
- t1 线程继续执行,读取共享的内存地址中的值仍为 a,认为没有改变然后继续执行;
由于 t1 并不知道在两次读取的值 a 已经被 "隐性" 的修改过,所以可能产生无法预期的结果。
例如,使用 list 来存放 item,如果将一个 item 从 list 中移除并释放了其内存地址,然后重新创建一个新的 item,并将其添加至 list 中,由于优化等因素,有可能新创建的 item 的内存地址与前面删除的 item 的内存地址是相同的,导致指向新的 item 的指针因此也等同于指向旧的 item 的指针,这将引发 aba 问题。
举个更生活化的例子:
土豪拿了一个装满钱的 hermes 黑色钱包去酒吧喝酒,将钱包放到吧台上后,转头和旁边的朋友聊天,小偷趁土豪转头之际拿起钱包,将钱包里的钱取出来并放入餐巾纸保持钱包厚度,然后放回原处,小偷很有职业道德,只偷钱不偷身份证,土豪转过头后发现钱包还在,并且还是他熟悉的 hermes 黑色钱包,厚度也没什么变化,所以土豪认为什么都没发生,继续和朋友聊天,直到结账时发现钱包中的钱已经被调包成餐巾纸。
所以,我觉得 aba 问题还可以被俗称为 "调包问题"。那么怎么解决 "调包问题" 呢?土豪开始想办法了。
土豪想的第一个办法是,找根绳子将钱包绑在手臂上,要打开钱包就得先把绳子割断,割绳子就会被发现。这种做法实际上就是 load-link/store-conditional (ll/sc) 架构中所做的工作。
土豪想的另一个办法是,在钱包上安个显示屏,每次打开钱包显示屏上的数字都会 1,这样当土豪在转头之前可以先记录下显示屏上的数字,在转过头后可以确认数字是否有变化,也就知道钱包是否被打开过。这种做法实际上就是 x86/64 架构中 double-word cas tagging 所做的工作。
土豪还担心小偷下次会不会买一个一模一样的钱包,直接调包整个钱包,这样连银行卡和身份证都丢了怎么办,土豪决定买一个宇宙独一无二的钱包,除非把它给烧了,否则就不会再有相同的钱包出现。这种做法实际上就是 garbage collection (gc) 所做的工作。
内存模型(memory model)对细粒度锁的影响
在多线程系统中,当多个线程同时访问共享的内存时,就需要一个规范来约束不同的线程该如何与内存交互,这个规范就称之为内存模型(memory model)。
顺序一致性内存模型(sequential consistency memory model)则是内存模型规范中的一种。在这个模型中,内存与访问它的线程保持独立,通过一个控制器(memory controller)来保持与线程的联系,以进行读写操作。在同一个线程内的,读写操作的顺序也就是代码指定的顺序。但多个线程时,读写操作就会与其他线程中的读写操作发生交错。
如上图中所示,thread 1 中在写入 value 和 inited 的值,而 thread 2 中在读取 inited 和 value 的值到 ri 和 rv 中。由于在内存控制器中发生重排(memory reordering),最终的结果可能有很多种情况,如下表所示。
顺序一致性内存模型非常的直观,也易于理解。但实际上,由于该模型在内存硬件实现效率上的限制,导致商用的 cpu 架构基本都没有遵循该模型。一个更贴近实际的多处理器内存模型更类似于下图中的效果。
也就是说,每个 cpu 核都会有其自己的缓存模型,例如上图中的 level 1 cache 和 level 2 cache,用以缓存最近使用的数据,以提升存取效率。同时,所有的写入数据都被缓冲到了 write buffer 缓冲区中,在数据在被刷新至缓存前,处理器可以继续处理其他指令。这种架构提升了处理器的效率,但同时也意味着我们不仅要关注 memory,同时也要关注 buffer 和 cache,增加了复杂性。
上图所示为缓存不一致问题(incoherent caches),当主存(main memory)中存储着 value=5,inited=0 时,processor 1 就存在着新写入 cache 的值没有被及时刷新至 memory 的问题,而 processor 2 则存在着读取了 cache 中旧值的问题。
显然,上面介绍着内存重排和缓存机制会导致混乱,所以实际的内存模型中会引入锁机制(locking protocol)。通常内存模型会遵循以下三个规则:
- rule 1:当线程在隔离状态运行时,其行为不会改变;
- rule 2:读操作不能被移动到获取锁操作之前;
- rule 3:写操作不能被移动到释放锁操作之后;
rule 3 保证了在释放锁之前,所有写入操作已经完成。rule 2 保证要读取内存就必须先获取锁,不会再有其他线程修改内存。rule 1 则保证了获得锁之后的操作行为是顺序的。
在体现锁机制(locking protocol)的价值的同时,我们也会意识到它所带来的限制,也就是限制了编译器和 cpu 对程序做优化的自由。
我们知道,.net framework 遵循 ecma 标准,而 ecma 标准中则定义了较为宽松的内存访问模型,将内存访问分为两类:
- 常规内存访问(ordinary memory access)
- 易变内存访问(volatile memory access)
其中,易变内存访问是特意为 "volatile" 设计,它包含如下两个规则:
- 读和写操作不能被移动到 volatile-read 之前;
- 读和写操作不能被移动到 volatile-write 之后;
对于那些没有使用 "lock" 和 "volatile" 的程序片段,编译器和硬件可以对常规内存访问做任何合理的优化。反过来讲,内存系统仅需在应对 "lock" 和 "volatile" 时采取缓存失效和刷新缓冲区等措施,这极大地提高了性能。
顺序一致性(sequential consistency)的要求描述了程序代码描述的顺序与内存操作执行的顺序间的关系。多数编程语言都提供顺序一致性的支持,例如在 c# 中可以将变量标记为 volatile。
a volatile read has "acquire semantics" meaning that the read is guaranteed to occur prior to any references to memory that occur after the read instruction in the cil instruction sequence.
a volatile write has "release semantics" meaning that the write is guaranteed to happen after any memory references prior to the write instruction in the cil instruction sequence.
下面的列表展示了 .net 中内存读写操作的效果。
|
construct |
refreshes thread cache before? |
flushes thread cache after? |
notes |
|
ordinary read |
no |
no |
read of a non-volatile field |
|
ordinary write |
no |
yes |
write of a non-volatile field |
|
volatile read |
yes |
no |
read of volatile field, or thread.volitelread |
|
volatile write |
no |
yes |
write of volatile field |
|
thread.memorybarrier |
yes |
yes |
special memory barrier method |
|
interlocked operations |
yes |
yes |
increment, add, exchange, etc. |
|
lock acquire |
yes |
no |
monitor.enter or entering a lock {} region |
|
lock release |
no |
yes |
monitor.exit or exiting a lock {} region |
代码实践
我们需要在实践中体会 lock-free 编程,方能洞察机制的本质,加深理解。下面用实现栈 stack 类的过程来完成对 lock-free 编程的探索。
栈结构实际上就是 filo 先入后出队列,通常包括两个操作:
- push:向栈顶压入一个元素(item);
- pop:从栈顶弹出一个元素(item);
这里我们选用单链表结构(singly linked list)来实现 filo 栈,每次入栈的都是新的链表头,每次出栈的也是链表头。
实现普通的栈 simplestack 类
构建一个内部类 node 用于存放 item,并包含 next 引用以指向下一个节点。
1 private class node2 { 3 public node next; 4 public tnode item; 5 public override string tostring() 6 { 7 return string.format("{0}", item); 8 } 9 }
这样,实现 push 操作就是用新压入的节点作为新的链表头部,而实现 pop 操作则是将链表头部取出后将所指向的下一个节点作为新的链表头。
1 public class simplestack2 { 3 private class node 4 { 5 public node next; 6 public tnode item; 7 public override string tostring() 8 { 9 return string.format("{0}", item); 10 } 11 } 12 13 private node _head; 14 15 public simplestack() 16 { 17 _head = new node (); 18 } 19 20 public void push(t item) 21 { 22 node node = new node (); 23 node.item = item; 24 25 node.next = _head.next; 26 _head.next = node; 27 } 28 29 public t pop() 30 { 31 node node = _head.next; 32 if (node == null) 33 return default(t); 34 35 _head.next = node.next; 36 37 return node.item; 38 } 39 }
使用如下代码,先 push 入栈 1000 个元素,然后在多线程中 pop 元素。
1 class program
2 {
3 static void main(string[] args)
4 {
5 simplestack stack = new simplestack();
6
7 for (int i = 1; i <= 1000; i )
8 {
9 stack.push(i);
10 }
11
12 bool[] poppeditems = new bool[10000];
13
14 parallel.for(0, 1000, (i) =>
15 {
16 int item = stack.pop();
17 if (poppeditems[item])
18 {
19 console.writeline(
20 "thread [{0:00}] : item [{1:0000}] was popped before!",
21 thread.currentthread.managedthreadid, item);
22 }
23 poppeditems[item] = true;
24 });
25
26 console.writeline("done.");
27 console.readline();
28 }
29 }
运行效果如下图所示。
由上图运行结果可知,当多个线程同时 pop 数据时,可能发生看起来像同一个数据项 item 被 pop 出两次的现象。
实现普通的加锁的栈 simplelockedstack 类
那么为了保持一致性和准确性,首先想到的办法就是加锁。lock 不仅可以保护代码区域内的指令不会被重排,还能在获取锁之后阻止其他线程修改数据。
1 public class simplelockedstack2 { 3 private class node 4 { 5 public node next; 6 public tnode item; 7 public override string tostring() 8 { 9 return string.format("{0}", item); 10 } 11 } 12 13 private node _head; 14 private object _sync = new object(); 15 16 public simplelockedstack() 17 { 18 _head = new node (); 19 } 20 21 public void push(t item) 22 { 23 lock (_sync) 24 { 25 node node = new node (); 26 node.item = item; 27 28 node.next = _head.next; 29 _head.next = node; 30 } 31 } 32 33 public t pop() 34 { 35 lock (_sync) 36 { 37 node node = _head.next; 38 if (node == null) 39 return default(t); 40 41 _head.next = node.next; 42 43 return node.item; 44 } 45 } 46 }
加锁之后,显然运行结果就不会出错了。
实现 lock-free 的栈 lockfreestack 类
但显然我们更关注性能问题,当有多个线程在交错 push 和 pop 操作时,
- 首先我们不希望发生等待锁现象,如果线程取得锁后被更高优先级的操作调度抢占,则所有等待锁的线程都被阻塞;
- 其次我们不希望线程等待锁的时间过长;
所以准备采用 lock-free 技术,通过引入 cas 操作通过细粒度锁来实现。此处 cas 使用 c# 中的 interlocked.compareexchange 方法,该操作是原子的,并且有很多重载方法可供使用。
1 private static bool cas( 2 ref nodelocation, node newvalue, node comparand) 3 { 4 return comparand == 5 interlocked.compareexchange >( 6 ref location, newvalue, comparand); 7 }
在实现 cas loops 时,我们使用 do..while.. 语法来完成。
1 public void push(t item)
2 {
3 node node = new node();
4 node.item = item;
5
6 do
7 {
8 node.next = _head.next;
9 }
10 while (!cas(ref _head.next, node, node.next));
11 }
这样,新的 lockfreestack 类就诞生了。
1 public class lockfreestack2 { 3 private class node 4 { 5 public node next; 6 public tnode item; 7 public override string tostring() 8 { 9 return string.format("{0}", item); 10 } 11 } 12 13 private node _head; 14 15 public lockfreestack() 16 { 17 _head = new node (); 18 } 19 20 public void push(t item) 21 { 22 node node = new node (); 23 node.item = item; 24 25 do 26 { 27 node.next = _head.next; 28 } 29 while (!cas(ref _head.next, node, node.next)); 30 } 31 32 public t pop() 33 { 34 node node; 35 36 do 37 { 38 node = _head.next; 39 40 if (node == null) 41 return default(t); 42 } 43 while (!cas(ref _head.next, node.next, node)); 44 45 return node.item; 46 } 47 48 private static bool cas( 49 ref node location, node newvalue, node comparand) 50 { 51 return comparand == 52 interlocked.compareexchange >( 53 ref location, newvalue, comparand); 54 } 55 }
这个新的类的测试结果正如我们想象也是正确的。
实现 concurrentstack 类
那实现 lockfreestack 类之后,实际已经满足了 lock-free 的条件要求,我们还能不能做的更好呢?
我们来观察下上面实现的 push 方法:
1 public void push(t item)
2 {
3 node node = new node();
4 node.item = item;
5
6 do
7 {
8 node.next = _head.next;
9 }
10 while (!cas(ref _head.next, node, node.next));
11 }
发现当 cas 操作判定失败时,立即进入下一次循环判定。而在实践中,当 cas 判定失败时,是因为其他线程正在更改相同的内存数据,如果立即再进行 cas 判定则失败几率会更高,我们需要给那些正在修改数据的线程时间以完成操作,所以这里当前线程最好能 "休息" 一会。
"休息" 操作我们选用 .net 中提供的轻量级(4 bytes)线程等待机制 spinwait 类。
1 public void push(t item)
2 {
3 node node = new node();
4 node.item = item;
5
6 spinwait spin = new spinwait();
7
8 while (true)
9 {
10 node oldhead = _head;
11 node.next = oldhead.next;
12
13 if (interlocked.compareexchange(ref _head, node, oldhead) == oldhead)
14 break;
15
16 spin.spinonce();
17 }
18 }
实际上 spinonce() 方法调用了 thread.spinwait() 等若干操作,那么这些操作到底做了什么并且耗时多久呢?
首先,thread.spinwait(n) 会在当前 cpu 上紧凑的循环 n 个周期,每个周期都会发送 pause 指令给 cpu,告诉 cpu 当前正在执行等待,不要做其他工作了。所以,重点是 n 的值是多少。在 .net 中实现的 spinone 中根据统计意义的度量,将此处的 n 根据调用次数来变化。
- 第一次调用,n = 4;
- 第二次调用,n = 8;
- ...
- 第十次调用,n = 2048;
那么在 10 次调用之后呢?
10 次之后 spinonce 就不再进行 spin 操作了,它根据情况选择进入不同的 yield 流程。
- thread.yield:调用静态方法 thread.yield(),如果在相同的 cpu core 上存在相同或较低优先级的线程正在等待执行,则当前线程让出时间片。如果没有找到这样的线程,则当前线程继续运行。
- thread.sleep(0):将 0 传递给 thread.sleep(),产生的行为与 thread.yield() 类似,唯一的区别就是要在所有的 cpu core 上查找的相同或较低优先级的线程,而不仅限于当前的 cpu core。如果没有找到这样的线程,则当前线程继续运行。
- thread.sleep(1):当前线程此时真正的进入了睡眠状态(sleep state)。虽然指定的是 1 毫秒,但依据不同系统的时间精度不同,这个操作可能花费 10-15 毫秒。
上面三种情况在 spinonce 中是根据如下的代码来判断执行的。
1 int yieldssofar = (m_count >= 10 ? m_count - 10 : m_count);
2
3 if ((yieldssofar % 20) == (20 - 1))
4 {
5 thread.sleep(1);
6 }
7 else if ((yieldssofar % 5) == (5 - 1))
8 {
9 thread.sleep(0);
10 }
11 else
12 {
13 thread.yield();
14 }
这样,我们就可以通过添加失败等待来进一步优化,形成了新的 concurrentstack 类。
1 // a stack that uses cas operations internally to maintain 2 // thread-safety in a lock-free manner. attempting to push 3 // or pop concurrently from the stack will not trigger waiting, 4 // although some optimistic concurrency and retry is used, 5 // possibly leading to lack of fairness and/or live-lock. 6 // the stack uses spinning and back-off to add some randomization, 7 // in hopes of statistically decreasing the possibility of live-lock. 8 // 9 // note that we currently allocate a new node on every push. 10 // this avoids having to worry about potential aba issues, 11 // since the clr gc ensures that a memory address cannot be 12 // reused before all references to it have died. 13 14 ///15 /// represents a thread-safe last-in, first-out collection of objects. 16 /// 17 public class concurrentstack18 { 19 // a volatile field should not normally be passed using a ref or out parameter, 20 // since it will not be treated as volatile within the scope of the function. 21 // there are exceptions to this, such as when calling an interlocked api. 22 // as with any warning, you may use the #pragma warning to disable this warning 23 // in those rare cases where you are intentionally using a volatile field 24 // as a reference parameter. 25 #pragma warning disable 420 26 27 /// 28 /// a simple (internal) node type used to store elements 29 /// of concurrent stacks and queues. 30 /// 31 private class node 32 { 33 internal readonly t m_value; // value of the node. 34 internal node m_next; // next pointer. 35 36 ///37 /// constructs a new node with the specified value and no next node. 38 /// 39 /// the value of the node. 40 internal node(t value) 41 { 42 m_value = value; 43 m_next = null; 44 } 45 } 46 47 // the stack is a singly linked list, and only remembers the head. 48 private volatile node m_head; 49 50 ///51 /// inserts an object at the top of the stack. 52 /// 53 /// the object to push onto the stack. 54 /// the value can be a null reference for reference types. 55 /// 56 public void push(t item) 57 { 58 // pushes a node onto the front of the stack thread-safely. 59 // internally, this simply swaps the current head pointer 60 // using a (thread safe) cas operation to accomplish lock freedom. 61 // if the cas fails, we add some back off to statistically 62 // decrease contention at the head, and then go back around and retry. 63 64 node newnode = new node(item); 65 newnode.m_next = m_head; 66 if (interlocked.compareexchange( 67 ref m_head, newnode, newnode.m_next) == newnode.m_next) 68 { 69 return; 70 } 71 72 // if we failed, go to the slow path and loop around until we succeed. 73 spinwait spin = new spinwait(); 74 75 // keep trying to cas the existing head with 76 // the new node until we succeed. 77 do 78 { 79 spin.spinonce(); 80 // reread the head and link our new node. 81 newnode.m_next = m_head; 82 } 83 while (interlocked.compareexchange( 84 ref m_head, newnode, newnode.m_next) != newnode.m_next); 85 } 86 87 ///88 /// attempts to pop and return the object at the top of the stack. 89 /// 90 /// 91 /// when this method returns, if the operation was successful, 92 /// result contains the object removed. 93 /// if no object was available to be removed, the value is unspecified. 94 /// 95 ///true if an element was removed and returned 96 /// from the top of the stack successfully; otherwise, false. 97 public bool trypop(out t result) 98 { 99 // capture the original value from memory 100 node head = m_head; 101 102 // is the stack empty? 103 if (head == null) 104 { 105 result = default(t); 106 return false; 107 } 108 109 if (interlocked.compareexchange( 110 ref m_head, head.m_next, head) == head) 111 { 112 result = head.m_value; 113 return true; 114 } 115 116 // fall through to the slow path. 117 spinwait spin = new spinwait(); 118 119 // try to cas the head with its current next. 120 // we stop when we succeed or when we notice that 121 // the stack is empty, whichever comes first. 122 int backoff = 1; 123 124 // avoid the case where tickcount could return int32.minvalue 125 random r = new random(environment.tickcount & int32.maxvalue); 126 127 while (true) 128 { 129 // capture the original value from memory 130 head = m_head; 131 132 // is the stack empty? 133 if (head == null) 134 { 135 result = default(t); 136 return false; 137 } 138 139 // try to swap the new head. if we succeed, break out of the loop. 140 if (interlocked.compareexchange( 141 ref m_head, head.m_next, head) == head) 142 { 143 result = head.m_value; 144 return true; 145 } 146 147 // we failed to cas the new head. spin briefly and retry. 148 for (int i = 0; i < backoff; i ) 149 { 150 spin.spinonce(); 151 } 152 153 // arbitrary number to cap back-off. 154 backoff = spin.nextspinwillyield ? r.next(1, 8) : backoff * 2; 155 } 156 } 157 158 ///159 /// gets a value that indicates whether the stack is empty. 160 /// 161 ///true if the stack is empty; otherwise, false. 162 public bool isempty 163 { 164 // checks whether the stack is empty. clearly the answer 165 // may be out of date even prior to 166 // the function returning (i.e. if another thread 167 // concurrently adds to the stack). it does 168 // guarantee, however, that, if another thread 169 // does not mutate the stack, a subsequent call 170 // to trypop will return true 171 // -- i.e. it will also read the stack as non-empty. 172 get { return m_head == null; } 173 } 174 175 ///176 /// gets the number of elements contained in the stack. 177 /// 178 ///the number of elements contained in the stack. 179 public int count 180 { 181 // counts the number of entries in the stack. 182 // this is an o(n) operation. the answer may be out of date before 183 // returning, but guarantees to return a count that was once valid. 184 // conceptually, the implementation snaps a copy of the list and 185 // then counts the entries, though physically this is not 186 // what actually happens. 187 get 188 { 189 int count = 0; 190 191 // just whip through the list and tally up the number of nodes. 192 // we rely on the fact that 193 // node next pointers are immutable after being en-queued 194 // for the first time, even as 195 // they are being dequeued. if we ever changed this 196 // (e.g. to pool nodes somehow), 197 // we'd need to revisit this implementation. 198 199 for (node curr = m_head; curr != null; curr = curr.m_next) 200 { 201 // we don't handle overflow, to be consistent with existing 202 // generic collection types in clr 203 count ; 204 } 205 206 return count; 207 } 208 } 209 210 ///211 /// removes all objects from the this stack. 212 /// 213 public void clear() 214 { 215 // clear the list by setting the head to null. 216 // we don't need to use an atomic operation for this: 217 // anybody who is mutating the head by pushing or popping 218 // will need to use an atomic operation to guarantee they 219 // serialize and don't overwrite our setting of the head to null. 220 m_head = null; 221 } 222 223 #pragma warning restore 420 224 }
实际上,上面的 concurrentstack