#IT教育##程序员##软件开发##软件工程师##.NET#
背景
最近开发项目要缓冲数据,让数据库保存缓冲数据,涉及到多线程,从codeproject找了个例子太复杂,自己开发只需要缓冲跨线程传递,不过抱着学习态度,翻译学习下。要点是生产者消费者多线程模型多任务模型的思路,对不知道怎么锻炼多线程能力的朋友帮助极大。
多核时代——每个人都听说过——Microsoft .NET、Intel TBB 中的并行扩展,甚至新的 C++ 标准都包括对线程的基本支持。现在我们的小型台式机(笔记本电脑)或小型服务器拥有多核可能性——软件将探索这些优势。
提出解决方案的职责
特征
数据和任务并行性——管道处理。数据被分成块,在多个阶段并行处理。每个阶段可以由多个正在运行的线程执行。这个概念类似于流水线、超标量处理器架构。每个处理阶段可以存在任意数量的任务。
当任务耗时并且可以轻松并行化(任务袋模式)时,该解决方案是可用的。当数据从外部(文件、数据库)到达并且读取不是处理器负荷操作时,它也可以使用。在这种情况下,额外的线程可以对已经读取的数据块执行处理(当下一个块读取正在进行时)。
主意
每个项目都分配了数据和 ID。ID 应是唯一且连续的(对于OrderedItemsPool 类)。这是某种票证算法。下面的代码展示了 3 个阶段处理(一个生产者,多个处理任务,一个消费者)的示意性解决方案。
这个怎么运作
一般使用重锁。:) 解决方案是基于监视器。我们这里有 2 个监视器——第一个用于对空项堆栈的操作,第二个用于对数据项队列的操作。
当生产者线程需要新的空项目(DequeueEmpty 方法)时,它可以从空项目池中返回或创建,但对要创建的项目的最大数量有限制(内存限制)。当没有空闲的空缓冲区并且无法创建更多项目时,线程必须等待空监视器的条件变量。当不再需要 item 时,必须将其返回到空池(EnqueueEmpty 方法)或显式释放(DropEmpty 方法)。返回或释放由消费者线程完成,而由生产者线程出队变成空。
当生产者用数据填充空项时,应将其放入数据队列(EnqueueData 方法)。另一方面,消费者线程使用DequeueData 方法获取项目。当不存在数据或读取已订购(OrderedItemsPool 类)并且适当的项目尚未到达时,读取可能会被阻塞。在这种情况下,线程等待完整监视器的条件变量。
在OrderedItemsPool 类的情况下,项目存储在优先级队列(SortedList 使用的类)中,对于UnorderedItemsPool 类使用普通Queue 类。
敲定
当所有项目都产生后,线程应标记数据结束。这是使用 EnqueueEndOfData 方法完成的。信息在所有任务之间传播,它们可以停止处理——当没有更多要处理的项目时DequeueData 方法返回false
EndOfData 是id 最大的特殊物品。EndOfData存储后 ,无法存储具有较大 id 的项目(EndOfDataException 将出现)。
错误
在多个线程中完成的工作会遇到错误传播问题。当一个线程出现异常并且该线程是处理链的一部分时,应通知其他线程(任务)。否则会出现死锁。因此,应使用SetError 方法捕获异常并将其传递到通信链中。之后,
ErrorInProcessingException 将出现在所有处理数据的任务中作为一个问题的信 。原始异常存储为内部异常。
IsError 可以将使用池的错误和Error 属性通知主线程。属性的值应从主线程正在等待的线程使用的池中获取(参见下面的代码):
死锁
当任务位于处理链的中间时,重要的是先获取输出项,然后再输入项,以避免死锁。当没有可用的输出项时会出现死锁(并且在处理链中的顺序应保留 -OrderedItemsPool 类)。当输入池的项目多于输出池时,这种情况很容易看到。添加了帮助类ItemsPoolsInOut 作为此类案例(DequeueEmptyAndData 方法)的示例解决方案。
此外,有序项目池(OrderedItemsPool 类)永远不会出现在无序项目池(UnorderedItemsPool 类)之后,否则可能会发生死锁。
问题
例子
所提出的解决方案的用途是处理大文件,即加密和压缩。加密是用RijndaelManaged 类实现的,而压缩是用DeflateStream 类实现的。数据以 512 KB 的块读取、处理并写入输出文件。第二个程序允许对数据进行解压缩和解密。
工作分为 4 组:读取器(一个线程)、加密器(多个任务、多个线程)、压缩器(多个任务、多个线程)和写入器(一个线程)。第二个程序(解压缩器、解密器)看起来类比。
注意每组任务使用不同的TaskManager 对象,否则可能会死锁。此外,读取器和写入器是单独的线程。
当然,将压缩(解压缩)和加密(解密)放在一个任务中(没有额外的缓冲区和同步)可能更明智,但这是一个示例,只是为了展示可能性。
此外,使用 deflate 的压缩是废话——我知道——这只是一个例子——如果需要真正的压缩,请替换算法。:)
该代码使用 Microsoft Parallel Extensions Jun08 CTP,因此需要下载并安装此库。
实现的附加说明 -DeflateStream 类构造函数具有类leaveOpen 中不存在的参数CryptoStream ,因此streams 不能重用。这在设计上似乎很不一致——也许一些微软架构师可以解释为什么?
声明:本站部分文章及图片源自用户投稿,如本站任何资料有侵权请您尽早请联系jinwei@zod.com.cn进行处理,非常感谢!