大厂实践|Apache Pulsar 消息队列在拉卡拉的应用

关于 Apache Pulsar

Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。 
GitHub 地址:http://github.com/apache/pulsar/

表 1. 对消息队列和流式队列的要求

为什么选择 Apache Pulsar

大厂开源背书

现在可供用户选择的大厂开源消息平台有很多,架构设计大多类似,比如 Kafka 和 RocketMQ 都采用存储与计算一体的架构,只有 Pulsar 采用存储与计算分离的多层架构。我们比较选型的消息系统有三个:Kafka、RocketMQ 和 Pulsar。测试之前,我们通过 上的公开数据,对三者的性能和功能进行了简单的对比,表 2 为对比结果。从中可以看出 Pulsar 更符合我们的需求。

图 1. Pulsar 架构图

Broker 架构

Broker 主要由四个模块组成。我们可以根据实际需求对相应的功能进行二次开发。

?Dispatcher:调度分发模块,承担协议转换、序列化反序列化等。?Load balancer:负载均衡模块,对访问流量进行控制管理。?Global replicator:跨集群复制模块,承担异步的跨集群消息同步功能。?Service discovery:服务发现模块,为每个 topic 选择无状态的主节点。

图 3. Pulsar 持久层架构图隔离架构

隔离架构

保证了 Pulsar 的优良性能,主要体现在以下几个方面:

?IO 隔离:写入、追尾读和追赶读隔离。?利用 络流入带宽和磁盘顺序写入的特性实现高吞吐写:传统磁盘在顺序写入时,带宽很高,零散读写导致磁盘带宽降低,采取顺序写入方式可以提升性能。?利用 络流出带宽和多个磁盘共同提供的 IOPS 处理能力实现高吞吐读:收到数据后,写到性能较好的 SSD 盘里,进行一级缓存,然后再使用异步线程,将数据写入到传统的 HDD 硬盘中,降低存储成本。?利用各级缓存机制实现低延迟投递:生产者发送消息时,将消息写入 broker 缓存中;实时消费时(追尾读),首先从 broker 缓存中读取数据,避免从持久层 bookie 中读取,从而降低投递延迟。读取历史消息(追赶读)场景中,bookie 会将磁盘消息读入 bookie 读缓存中,从而避免每次都读取磁盘数据,降低读取延时。

图 5. 分区架构与分片架构对比图

图 6. Pulsar 测试过程

测试结论如下:

?部署方式:混合部署优于分开部署。broker 和 bookie 可以部署在同一个节点上,也可以分开部署。节点数量较多时,分开部署较好;节点数量较少或对性能要求较高时,将二者部署在同一个节点上较好,可以节省 络带宽,降低延迟。?负载大小:随着测试负载的增大,tps 降低,吞吐量稳定。?刷盘方式:异步刷盘优于同步刷盘。?压缩算法:压缩算法推荐使用 LZ4 方式。我们分别测试了 Pulsar 自带的几种压缩方式,使用 LZ4 压缩算法时,CPU 使用率最低。使用压缩算法可以降低 络带宽使用率,压缩比率为 82%。?分区数量:如果单 topic 未达到单节点物理资源上限,建议使用单分区;由于 Pulsar 存储未与分区耦合,可以根据业务发展情况,随时调整分区数量。?主题数量:压测过程中,增加 topic 数量,性能不受影响。?资源约束:如果 络带宽为千兆, 络会成为性能瓶颈, 络 IO 可以达到 880 MB/s;在 络带宽为万兆时,磁盘会成为瓶颈,磁盘 IO 使用率为 85% 左右。?内存与线程:如果使用物理主机,需注意内存与线程数目的比例。默认配置参数为 IO 线程数等于 CPU 核数的 2 倍。这种情况下,实体机核数为 48 核,如果内存设置得较小,比较容易出现 OOM 的问题。

除了上述测试以外,我们还复测了 Jack Vanlightly(RabbitMQ 的测试工程师)的破坏性测试用例,得到如下结论:

1.所有测试场景中,没有出现消息丢失与消息乱序;2.开启消息去重的场景中,没有出现消息重复。

支持团队专业

另外,我们与 Apache Pulsar 项目的核心开发人员交流沟通时间较早,他们在 Yahoo! 和推特有过丰富的实践经验,预备成立公司在全世界范围内推广使用 Pulsar,并且会将中国作为最重要的基地,这为我们的使用提供了强有力的保障。现在大家也都知道,他们成立了 StreamNative 公司,并且已获得多轮融资,队伍也在不断壮大。

Pulsar 在基础消息平台的实践

我们基于 Pulsar 构建的基础消息平台架构如下图,图中绿色部分为基于 Pulsar 实现的功能或开发的组件。本节将结合实际使用场景,详细介绍我们如何在实际使用场景中应用 Pulsar 及基于 Pulsar 开发的组件。

图 8. OGG For Pulsar 组件示意图

2. Pulsar To TiDB 组件

我们通过 Pulsar To TiDB 组件将抓取到的变更消息存储到 TiDB 中,对下游系统提供查询服务。这一组件的处理逻辑为:

1.使用灾备订阅方式,消费 Pulsar 消息。2.根据消息的 key 进行哈希运算,将相同的 key 散列到同一持久化线程中。3.启用 Pulsar 的消息去重功能,避免消息重复投递。假设 MessageID2 重复投递,那么数据一致性将被破坏。

图 10. Pulsar 的消息持久化示意图

4. 数据库表结构动态传递

OGG 使用 AVRO 方式进行序列化操作时,如果将多个表投递到同一个 topic 中,AVRO Schema 为二级结构:wrapper schema 和 table schema。wrapper schema 结构始终不变,包含 table_name、schema_fingerprint、payload 三部分信息;OGG 在抓取数据时,会感知数据库表结构的变化并通知给 OGG For Pulsar,即表结构决定其 table schema,再由 table schema 生成对应的 schema_fingerprint。

我们将获取到的 table schema 发送并存储在指定的 Schema topic 中。Data topic 中的消息只包含 schema_fingerprint 信息,这样可以降低序列化后消息包的大小。Pulsar To TiDB 启动时,从 Schema topic 消费数据,使用 schema_fingerprint 为 Key 将 table schema 缓存在内存中。反序列化 Data Topic 中的消息时,从缓存中根据 schema_fingerprint 提取 table schema,对 payload 进行反序列化操作。

图 12. 消息确认流程图(1)

假如采用单条确认方式,图中 MessageID 为 1、3、4 的消息确认消费成功,而 MessageID 为 2 的消息“确认超时”。在这种情况下,如果应用程序处理不当,未按照消费顺序逐条确认,则出现消息“确认超时”时,只有发生超时的消息(即 MessageID 为 2 的消息)会被重新投递,导致消费顺序发生错乱。

图 13. 消息确认超时(客户端)检测机制示意图

场景 2:消息队列:OpenMessaging 协议实现(透明层协议)

我们过去使用的很多业务系统都和消息系统强耦合,导致后续升级和维护很麻烦,因此我们决定使用 OpenMessaging 协议作为中间层进行解耦。

1.通过 Pulsar 实现 OpenMessaging 协议。2.开发框架(基于 spring boot)调用 OpenMessaging 协议接口,发送和接收消息。

图 15. Kafka 0.8 Source 组件示意图

场景 4:流式队列:Function 消息过滤(消息过滤)

我们通过 Pulsar Functions 把 Pulsar IDC 集群消息中的敏感字段(比如身份证 ,手机 )脱敏后实时同步到云集群中,供云上应用消费。

图 17. Pulsar Flink Connector 流式计算示意图

场景 6:流式队列:TiDB CDC 适配(TiDB 适配)

我们需要基于 TiDB 数据变更进行实时抓取,但 TiDB CDC For Pulsar 序列化方式不支持 AVRO 方式,因此我们针对这一使用场景进行了定制化开发,即先封装从 TiDB 发出的数据,再投递到 Pulsar 中。TiDB CDC For Pulsar 组件的开发语言为 Go 语言。

BAT某大厂CodeReview规范

点个在看再走吧~

文章知识点与官方知识档案匹配,可进一步学习相关知识云原生入门技能树首页概览8930 人正在系统学习中

声明:本站部分文章及图片源自用户投稿,如本站任何资料有侵权请您尽早请联系jinwei@zod.com.cn进行处理,非常感谢!

上一篇 2021年4月6日
下一篇 2021年4月6日

相关推荐