.NET多线程缓冲池

#IT教育##程序员##软件开发##软件工程师##.NET#

背景

最近开发项目要缓冲数据,让数据库保存缓冲数据,涉及到多线程,从codeproject找了个例子太复杂,自己开发只需要缓冲跨线程传递,不过抱着学习态度,翻译学习下。要点是生产者消费者多线程模型多任务模型的思路,对不知道怎么锻炼多线程能力的朋友帮助极大。

多核时代——每个人都听说过——Microsoft .NET、Intel TBB 中的并行扩展,甚至新的 C++ 标准都包括对线程的基本支持。现在我们的小型台式机(笔记本电脑)或小型服务器拥有多核可能性——软件将探索这些优势。

提出解决方案的职责

  • 管理数据项池(可能保留或不保留项目顺序)
  • 管理空项目池——重用项目(当这些是大缓冲区时很有用)
  • 特征

  • 多生产者-消费者模式的有界缓冲区问题解决方案。也可以归类为共享内存异步消息传递
  • 易于使用的解决方案——所有锁定机制都隐藏在里面
  • 版本:
  • unordered – 获取物品的顺序无关紧要 – 多个消费者(一个或多个消费者)
  • ordered– 应保留物品的取用顺序 – 一位消费者(一位或多位生产者)
  • 限制内存使用
  • 避免频繁的内存分配和释放——更可预测的运行时间(内存分配时间不可预测)
  • 使用 Parallel Extensions 运行线程的数量可以轻松自动地适应硬件(内核数量)
  • 数据和任务并行性——管道处理。数据被分成块,在多个阶段并行处理。每个阶段可以由多个正在运行的线程执行。这个概念类似于流水线、超标量处理器架构。每个处理阶段可以存在任意数量的任务。

    当任务耗时并且可以轻松并行化(任务袋模式)时,该解决方案是可用的。当数据从外部(文件、数据库)到达并且读取不是处理器负荷操作时,它也可以使用。在这种情况下,额外的线程可以对已经读取的数据块执行处理(当下一个块读取正在进行时)。

    主意

    每个项目都分配了数据和 ID。ID 应是唯一且连续的(对于OrderedItemsPool 类)。这是某种票证算法。下面的代码展示了 3 个阶段处理(一个生产者,多个处理任务,一个消费者)的示意性解决方案。

    
    
    
    
    
    
    
    
    
    
    
    
    
    

    这个怎么运作

    一般使用重锁。:) 解决方案是基于监视器。我们这里有 2 个监视器——第一个用于对空项堆栈的操作,第二个用于对数据项队列的操作。

    当生产者线程需要新的空项目(DequeueEmpty 方法)时,它可以从空项目池中返回或创建,但对要创建的项目的最大数量有限制(内存限制)。当没有空闲的空缓冲区并且无法创建更多项目时,线程必须等待空监视器的条件变量。当不再需要 item 时,必须将其返回到空池(EnqueueEmpty 方法)或显式释放(DropEmpty 方法)。返回或释放由消费者线程完成,而由生产者线程出队变成空。

    当生产者用数据填充空项时,应将其放入数据队列(EnqueueData 方法)。另一方面,消费者线程使用DequeueData 方法获取项目。当不存在数据或读取已订购(OrderedItemsPool 类)并且适当的项目尚未到达时,读取可能会被阻塞。在这种情况下,线程等待完整监视器的条件变量。

    在OrderedItemsPool 类的情况下,项目存储在优先级队列(SortedList 使用的类)中,对于UnorderedItemsPool 类使用普通Queue 类。

    敲定

    当所有项目都产生后,线程应标记数据结束。这是使用 EnqueueEndOfData 方法完成的。信息在所有任务之间传播,它们可以停止处理——当没有更多要处理的项目时DequeueData 方法返回false

    EndOfData 是id 最大的特殊物品。EndOfData存储后 ,无法存储具有较大 id 的项目(EndOfDataException 将出现)。

    错误

    在多个线程中完成的工作会遇到错误传播问题。当一个线程出现异常并且该线程是处理链的一部分时,应通知其他线程(任务)。否则会出现死锁。因此,应使用SetError 方法捕获异常并将其传递到通信链中。之后,
    ErrorInProcessingException 将出现在所有处理数据的任务中作为一个问题的信 。原始异常存储为内部异常。

    IsError 可以将使用池的错误和Error 属性通知主线程。属性的值应从主线程正在等待的线程使用的池中获取(参见下面的代码):

    
    

    死锁

    当任务位于处理链的中间时,重要的是先获取输出项,然后再输入项,以避免死锁。当没有可用的输出项时会出现死锁(并且在处理链中的顺序应保留 -OrderedItemsPool 类)。当输入池的项目多于输出池时,这种情况很容易看到。添加了帮助类ItemsPoolsInOut 作为此类案例(DequeueEmptyAndData 方法)的示例解决方案。

    此外,有序项目池(OrderedItemsPool 类)永远不会出现在无序项目池(UnorderedItemsPool 类)之后,否则可能会发生死锁。

    问题

  • 未返回池(或显式删除)的项目 – 可能导致死锁。通过从空池中赋值可以轻松解决该问题(不幸的是内存限制)。
  • 重锁 – .NET 监视器比进程间互斥锁或信 量更轻,但仍然有很多同步 – 是否有更无锁的解决方案可用?
  • 发生异常时后台(分离)线程的问题
  • 处理停止未实现
  • 例子

    所提出的解决方案的用途是处理大文件,即加密和压缩。加密是用RijndaelManaged 类实现的,而压缩是用DeflateStream 类实现的。数据以 512 KB 的块读取、处理并写入输出文件。第二个程序允许对数据进行解压缩和解密。

    工作分为 4 组:读取器(一个线程)、加密器(多个任务、多个线程)、压缩器(多个任务、多个线程)和写入器(一个线程)。第二个程序(解压缩器、解密器)看起来类比。

    注意每组任务使用不同的TaskManager 对象,否则可能会死锁。此外,读取器和写入器是单独的线程。

    当然,将压缩(解压缩)和加密(解密)放在一个任务中(没有额外的缓冲区和同步)可能更明智,但这是一个示例,只是为了展示可能性。

    此外,使用 deflate 的压缩是废话——我知道——这只是一个例子——如果需要真正的压缩,请替换算法。:)

    该代码使用 Microsoft Parallel Extensions Jun08 CTP,因此需要下载并安装此库。

    实现的附加说明 -DeflateStream 类构造函数具有类leaveOpen 中不存在的参数CryptoStream ,因此streams 不能重用。这在设计上似乎很不一致——也许一些微软架构师可以解释为什么?

    声明:本站部分文章及图片源自用户投稿,如本站任何资料有侵权请您尽早请联系jinwei@zod.com.cn进行处理,非常感谢!

    上一篇 2022年8月22日
    下一篇 2022年8月22日

    相关推荐