什么是线程安全?
答:线程安全是多线程编程时的计算机程序代码中的一个概念。在拥有共享数据的多条线程并行执行的程序中,线程安全的代码会通过同步机制保证各个线程都可以正常且正确的执行,不会出现数据污染等意外情况。
前面几篇写的线性结构,在多线程并行的情况下会出现共享数据会线程间读取与写入不一直的情况,为了解决这种情况,通常会使用锁来解决,也就是将并行改为串行。但是在使用穿行违背了使用多线程并发的初衷,这种情况下我们可以考虑采用线程安全结构。
先看下线程安全队列的用法:
ConcurrentQueue<int> ts = new System.Collections.Concurrent.ConcurrentQueue<int>(); ts.Enqueue(1); ts.Enqueue(234); foreach (var r in ts) { Console.Write($"data:{r} "); } Console.WriteLine(); ts.TryPeek(out int pk); Console.WriteLine($peek:{pk}); ts.TryDequeue( ck); ts.Enqueue(56); Console.WriteLine(); ); } Console.WriteLine(); Console.ReadLine();
现在我们看下线程安全队列的实现方式:(参考自:.net framework 4.8),核心代码全部做了注释。
总的来说,(总结语放到前面,防止代码篇幅太大,同志们没有耐心翻到最底下~)
1、线程安全队列通过SpinWait自旋类来实现等待并行线程完成与Interlocked原子操作类计数实现的。
2、线程安全队列通过单向链表实现的,链的节点为长度32的数组,通过记录链的头节点与尾节点、以及队列的头尾实现队列的存储与入队、出队操作的。
public class MyConcurrentQueue<T> : IProducerConsumerCollection<T> { [NonSerialized] private volatile Segment m_head; [NonSerialized] Segment m_tail; private T[] m_serializationArray; const int SEGMENT_SIZE = 32; [NonSerialized] internal volatile int m_numSnapshotTakers = 0; /// <summary> /// 链尾部节点 </summary> public MyConcurrentQueue() { m_head = m_tail = new Segment(0,this); } //尝试添加 bool IProducerConsumerCollection<T>.TryAdd(T item) { Enqueue(item); return true; } 尝试从中移除并返回对象 </summary> <param name="item"> </remarks> bool IProducerConsumerCollection<T>.TryTake(out T item) { return TryDequeue( item); } 判断当前链是否为空 bool IsEmpty { get { Segment head = m_head; if (!head.IsEmpty) 如果头不为空,则链非空 false; else if (head.Next == null) 如果头节点的下一个节点为空,且为链尾, else 如果头节点为空且不是最后一个节点 ,则标识另一个线程正在写入该数组 等待中.. { SpinWait spin = new SpinWait(); while (head.IsEmpty) { 此时为空 ) ; 否则标识正在有线程占用写入 线程循环一次 spin.SpinOnce(); head = m_head; } ; } } } 用来判断链是否在变化 <param name="head"></param> <param name="tail"></param> <param name="headLow"></param> <param name="tailHigh"></param> void GetHeadTailPositions(out Segment head,1)"> Segment tail,int headLow,1)"> tailHigh) { head = m_head; tail = m_tail; headLow = head.Low; tailHigh = tail.High; SpinWait spin = SpinWait(); Console.WriteLine($head.Low:{head.Low},tail.High:{tail.High},head.m_index:{head.m_index},tail.m_index:{tail.m_index}); 通过循环来保证值不再更改(也就是说并行线程操作结束) 保证线程串行核心的判断逻辑 ( 头尾发生变化 head != m_head || tail != m_tail 如果队列头、尾索引发生变化 || headLow != head.Low || tailHigh != tail.High || head.m_index > tail.m_index) { spin.SpinOnce(); head = m_head; tail = m_tail; headLow = head.Low; tailHigh = tail.High; } } 获取总数 Count { { Segment head,tail; headLow,tailHigh; GetHeadTailPositions(out head,1)">out tail,1)">out headLow,1)"> tailHigh); if (head == tail) { return tailHigh - headLow + ; } 头节点长度 int count = SEGMENT_SIZE - headLow; 加上中间其他节点长度 count += SEGMENT_SIZE * ((int)(tail.m_index - head.m_index - )); 加上尾节点长度 count += tailHigh + return count; } } object SyncRoot => throw NotImplementedException(); bool IsSynchronized => void CopyTo(T[] array,1)"> index) { } 暂未实现 <returns></returns> public IEnumerator<T> GetEnumerator() { 添加 <param name="item"></param> void Enqueue(T item) { SpinWait spin = SpinWait(); while () { Segment tail = m_tail; if (tail.TryAppend(item)) ; spin.SpinOnce(); } } 尝试删除节点 <param name="result"></param> bool TryDequeue( T result) { while (!IsEmpty) { Segment head =if (head.TryRemove( result)) ; } result = default(T); 查看最后一个添加入的元素 bool TryPeek(原子增加值 Interlocked.Increment(ref m_numSnapshotTakers); IsEmpty) { 首先从头节点看一下第一个节点是否存在 Segment head =if (head.TryPeek( result)) { Interlocked.Decrement( m_numSnapshotTakers); ; } } result = (T); Interlocked.Decrement( m_numSnapshotTakers); ; } void CopyTo(Array array,1)"> index) { NotImplementedException(); } IEnumerator IEnumerable.GetEnumerator() { NotImplementedException(); } T[] ToArray() { NotImplementedException(); } 为线程安全队列提供一个 单向链表, 链表的每个节点存储长度为32的数组 class Segment { <summary> 定义一个数组,用于存储每个节点的内容 </summary> T[] m_array; 定义一个结构数组,用于标识数组中每个节点是否有效(是否存储内容) VolatileBool[] m_state; 指针,指向下一个节点数组 如果是最后一个节点,则节点为空 Segment m_next; 索引,用来存储链表的长度 readonly long m_index; 用来标识队列头-数组弹出索引 m_low; 用来标识队列尾-数组最新存储位置 m_high; 用来标识队列 volatile MyConcurrentQueue<T> m_source; 实例化链节点 internal Segment(long index,MyConcurrentQueue<T> source) { m_array = T[SEGMENT_SIZE]; m_state = new VolatileBool[SEGMENT_SIZE]; all initialized to false m_high = -; m_index = index; m_source = source; } 链表的下一个节点 internal Segment Next { get { m_next; } } 如果当前节点数组为空返回true, IsEmpty { return (Low > High); } } 非安全添加方法(无判断数组长度) </summary> <param name="value"></param> UnsafeAdd(T value) { m_high++; m_array[m_high] = value; m_state[m_high].m_value = ; } Segment UnsafeGrow() { Segment newSegment = new Segment(m_index + ,m_source); m_next = newSegment; newSegment; } 如果当前数组满了 >=32,则链扩展节点。 Grow() { 重新船舰数组 Segment newSegment = 赋值给next指针 m_next =将节点添加到链 m_source.m_tail = m_next; } 在末尾添加元素 <param name="value">元素</param> <param name="tail">The tail.<returns>如果附加元素,则为true;如果当前数组已满,则为false</returns> <remarks>如果附加指定的元素成功,并且在此之后数组满了,在链上添加新节点(节点为32长度数组) </remarks> TryAppend(T value) { 如果数组已满则跳出方法 if (m_high >= SEGMENT_SIZE - ) { 局部变量初始化 int newhigh = SEGMENT_SIZE; try { } finally { 原子递增 newhigh = Interlocked.Increment( m_high); if (newhigh <= SEGMENT_SIZE - ) { m_array[newhigh] = value; m_state[newhigh].m_value = ; } 如果数组满了,则扩展链节点。 if (newhigh == SEGMENT_SIZE - ) { Grow(); } } 如果 newhigh <= SEGMENT_SIZE-1,这意味着当前线程成功地占据了一个位置 return newhigh <= SEGMENT_SIZE - ; } 尝试从链的头部数组删除节点 <param name="result"></param> <returns></returns> bool TryRemove( T result) { SpinWait spin = SpinWait(); int lowLocal = Low,highLocal = High; while (lowLocal <= highLocal) { 获取队头索引 if (Interlocked.CompareExchange(ref m_low,lowLocal + 1,lowLocal) == lowLocal) { 如果要弹出队列的值不可用,说明这个位置被并行线程获取到了权限,但是值还未写入。 通过线程自旋等待值写入 SpinWait spinLocal = SpinWait(); m_state[lowLocal].m_value) { spinLocal.SpinOnce(); } 取出值 result = m_array[lowLocal]; 如果没有其他线程读取(GetEnumerator()、ToList()) 执行删除 如 TryPeek 的时候m_numSnapshotTakers会在进入方法体时++,在出方法体-- 清空该索引下的值 if (m_source.m_numSnapshotTakers <= ) m_array[lowLocal] = (T); 如果说lowLocal+1 = 32 说明当前链节点的数组已经全部出队 if (lowLocal + 1 >= SEGMENT_SIZE) { 由于lowLocal <= highLocal成立 lowLocal + 1 >= SEGMENT_SIZE 如果成立,且m_next == null 成立, 说明在此时有其他线程正在做扩展链结构 那么当前线程需要等待其他线程完成扩展链表,再做出队操作。 spinLocal = SpinWait(); while (m_next == ) { spinLocal.SpinOnce(); } m_source.m_head = m_next; } else { 此时说明 当前线程竞争资源失败,做短暂自旋后继续竞争资源 spin.SpinOnce(); lowLocal = Low; highLocal = High; } } 失败的情况下返回空值 result = (T); ; } 尝试获取队列头节点元素 T result) { result = int lowLocal = Low; 校验当前队列是否正确 if (lowLocal > High) ; SpinWait spin = 如果头节点无效,则说明当前节点被其他线程占用,并在做写入操作, 需要等待其他线程写入后再执行读取操作 m_state[lowLocal].m_value) { spin.SpinOnce(); } result = m_array[lowLocal]; 返回队列首位置 Low { Math.Min(m_low,SEGMENT_SIZE); } } 获取队列长度 High { 如果m_high>SEGMENT_SIZE,则表示超出范围,我们应该返回 SEGMENT_SIZE-1 return Math.Min(m_high,SEGMENT_SIZE - ); } } } }
<summary> 结构-用来存储整数组每个索引上是否存储值 </summary> struct VolatileBool { public VolatileBool( value) { m_value = value; } m_value; }
代码通篇看下来有些长(已经精简了很多,只实现入队、出队、与查看下一个出队的值),不知道有多少人能翻到这里~
说明:
1、TryAppend方法通过Interlocked.Increment()原子递增方法获取下一个数组存储点,通过比对32判断链是否需要增加下一个链节点,也就是说,链的存储空间每次扩展为32个存储位置。
2、TryRemove方法通过 Interlocked.CompareExchange()方法来判断当前是否有并行线程在写入,如果有则通过 while循环 SpinWait类的SpinOnce()方法实现等待写入完成后,再做删除;特别说明,判断是否写入是靠VolatileBool结构来实现的,每个链表的每个节点在存储值的同时每个存储都对应一个VolatileBool结构用来标识当前写入点是否成功写入。特殊情况,如果当前链节点的数组已经空了,则需要pinWait类的SpinOnce()简短的自旋等待并行的写入方法完成扩展链后,再做删除。
3、TryPeek方法,同样会判断要获取的元素是否已经成功写入(不成功则说明并行线程还未完成写入),如果未完成,则通过 while pinWait类的SpinOnce()来等待写入完成后,再读取元素内容。
现在代码已经看完了,来试下:
MyConcurrentQueue<string> myConcurrentQueue = new MyConcurrentQueue<string>(); for (int i = 0; i < 67; i++) { myConcurrentQueue.Enqueue($第{i}位); Console.WriteLine($总数:{myConcurrentQueue.Count}); } myConcurrentQueue.TryPeek(string rs); Console.WriteLine($TryPeek 总数:{myConcurrentQueue.Count}34; i++) { myConcurrentQueue.TryDequeue( result0); Console.WriteLine($TryDequeue 总数:{myConcurrentQueue.Count}); } Console.ReadKey();
打印:
head.Low:0,head.m_index:0,tail.m_index: 总数: head.Low:1,head.m_index:2,head.m_index:3,head.m_index:4,head.m_index:5,head.m_index:6,head.m_index:77,head.m_index:88,head.m_index:99,head.m_index:1010,head.m_index:1111,head.m_index:1212,head.m_index:1313,head.m_index:1414,head.m_index:1515,head.m_index:1616,head.m_index:1717,head.m_index:1818,head.m_index:1919,head.m_index:2020,head.m_index:2121,head.m_index:2222,head.m_index:2323,head.m_index:2424,head.m_index:2525,head.m_index:2626,head.m_index:2727,head.m_index:2828,head.m_index:2929,head.m_index:3030,head.m_index:313334353637383940414243444546474849505152535455565758596061626364656667 TryPeek 总数: TryDequeue 总数:2,1)">3,1)">4,1)">5,1)">6,1)">7,1)">8,1)">9,1)">10,1)">11,1)">12,1)">13,1)">14,1)">15,1)">16,1)">17,1)">18,1)">19,1)">20,1)">21,1)">22,1)">23,1)">24,1)">25,1)">26,1)">27,1)">28,1)">29,1)">30,1)">31,1)">1,tail.m_index:33
有时间希望大家能将代码跑一下,相信会更明白其中的原理。