别了 Kafka!拉卡拉的 Apache Pulsar 最佳实践

关于 Apache Pulsar

Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。 
GitHub 地址:http://github.com/apache/pulsar/

项目背景介绍

拉卡拉支付成立于 2005 年,是国内领先的第三方支付企业,致力于整合信息科技,服务线下实体,从支付切入,全维度为中小微商户的经营赋能。2011 年成为首批获得《支付业务许可证》企业的一员,2019 年上半年服务商户超过 2100 万家。2019 年 4 月 25 日,登陆创业板。

功能需求

由于拉卡拉的项目组数量较多,各个项目在建设时,分别根据需要选择了自己的消息系统。这就导致一方面很多系统的业务逻辑和具体的消息系统之间存在耦合,为后续系统维护和升级带来麻烦;另一方面业务团队成员对消息系统的管理和使用水平存在差异,从而使得整体系统服务质量和性能不稳定;此外,同时维护多套系统,物理资源利用率和管理成本都比较高。

因此,我们计划建设一套分布式基础消息平台,同时为各个团队提供服务。该平台需要具备以下特性:高可靠、低耦合、租户隔离、易于水平扩展、易于运营维护、统一管理、按需申请使用,同时支持传统的消息队列和流式队列。表 1 展示了这两类服务应该具备的特性。

表 2. Kafka、RocketMQ 和 Pulsar 性能、功能对比

Pulsar 的架构优势

Pulsar 是云原生的分布式消息流平台,源于 Yahoo!,支持 Yahoo! 应用,服务 140 万个 topic,日处理超过 1000 亿条消息。2016 年 Yahoo! 开源 Pulsar 并将其捐赠给 Apache 软件基金会,2018 年 Pulsar 成为 Apache 软件基金会的顶级项目。

作为一种高性能解决方案,Pulsar 具有以下特性:支持多租户,通过多租户可为每个租户单独设置认证机制、存储配额、隔离策略等;高吞吐、低延迟、高容错;原生支持多集群部署,集群间支持无缝数据复制;高可扩展,能够支撑上百万个 topic;支持多语言客户端,如 Java、Go、Python、C++ 等;支持多种消息订阅模式(独占、共享、灾备、Key_Shared)。

架构合理

Kafka 采用计算与存储一体的架构,当 topic 数量较多时,Kafka 的存储机制会导致缓存污染,降低性能。Pulsar 采用计算与存储分离的架构(如图 1)。无状态计算层由一组接收和投递消息的 broker 组成,broker 负责与业务系统进行通信,承担协议转换,序列化和反序列化、选主等功能。有状态存储层由一组 bookie 存储节点组成,可以持久存储消息。

图 2. Broker 架构图

持久层(BookKeeper)架构

图 3 为 Pulsar 中持久层的架构图。Bookie 是 BookKeeper 的存储节点,提供独立的存储服务。ZooKeeper 为元数据存储系统,提供服务发现以及元数据管理服务。BookKeeper 架构属于典型的 slave-slave 架构,所有 bookie 节点的角色都是 slave,负责持久化数据,每个节点的处理逻辑都相同;BookKeeper 客户端为 leader 角色,承担协调工作,由于其本身无状态,所以可以快速实现故障转移。

图 4. Pulsar 隔离架构图

对比总结

左侧为 Kafka、RabbitMQ 等消息系统采用的架构设计,broker 节点同时负责计算与存储,在某些场景中使用这种架构,可以实现高吞吐;但当 topic 数量增加时,缓存会受到污染,影响性能。

右侧为 Pulsar 的架构,Pulsar 对 broker 进行了拆分,增加了 BookKeeper 持久层,虽然这样会增加系统的设计复杂性,但可以降低系统的耦合性,更易实现扩缩容、故障转移等功能。表 3 总结了分区架构和分片架构的主要特性。

表 3. 分区架构与分片架构特性

基于对 Pulsar 的架构和功能特点,我们对 Pulsar 进行了测试。在操作系统层面使用 NetData 工具进行监控,使用不同大小的数据包和频率进行压测,测试的几个重要指标是磁盘、 络带宽等的波动情况。

图 7. 基于 Pulsar 构建的基础消息平台架构图

场景 1:流式队列

1. OGG For Pulsar 适配器

源数据存储在 Oracle 中,我们希望实时抓取 Oracle 的变更数据,进行实时计算、数据分析、提供给下游业务系统查询等场景。

我们使用 Oracle 的 OGG(Oracle Golden Gate) 工具进行实时抓取,它包含两个模块:源端 OGG 和目标 OGG。由于 OGG 官方没有提供 Sink 到 Pulsar 的组件,我们根据需要开发了 OGG For Pulsar 组件。下图为数据处理过程图,OGG 会抓取到表中每条记录的增删改操作,并且把每次操作作为一条消息推送给 OGG For Pulsar 组件。OGG For Pulsar 组件会调用 Pulsar 客户端的 producer 接口,进行消息投递。投递过程中,需要严格保证消息顺序。我们使用数据库表的主键作为消息的 key,数据量大时,可以根据 key 对 topic 进行分区,将相同的 key 投递到同一分区,从而保证对数据库表中主键相同的记录所进行的增删改操作有序。

图 9. Pulsar To TiDB 组件使用流程图

3. Pulsar 的消息持久化过程分析

Pulsar 的消息持久化过程包括以下四步:

1.OGG For Pulsar 组件调用 Pulsar 客户端的 producer 接口,投递消息。2.Pulsar 客户端根据配置文件中的 broker 地址列表,获取其中一个 broker 的地址,然后发送 topic 归属查询服务,获取服务该 topic 的 broker 地址(下图示例中为 broker2)。3.Pulsar 客户端将消息投递给 Broker2。4.Broker2 调用 BookKeeper 的客户端做持久化存储,存储策略包括本次存储可选择的 bookie 总数、副本数、成功存储确认回复数。

图 11. 表结构管理流程图

5. 一致性保障

要保证消息有序和去重,需要从 broker、producer、consumer 三方面进行设置。

Broker

?在 namespace 级别开启去重功能:bin/pulsar-admin namespaces set-deduplication namespace –enable?修复 / 优化 Pulsar 客户端死锁问题。2.7.1 版本已修复,详细信息可参考 PR 9552[1]

Producer

?pulsar.producer.batchingEnabled=false

在 producer 设置中,关闭批量发送。如果开启批量发送消息,则消息可能会乱序。

?pulsar.producer.blocklfQueueFull=true

为了提高效率,我们采用异步发送消息,需要开启阻塞队列处理,否则可能会出现消息丢失。

调用异步发送超时,发送至异常 topic。如果在异步超时重发消息时,出现消息重复,可以通过开启自动去重功能进行处理;其它情况下出现的消息发送超时,需要单独处理,我们将这些消息存储在异常 topic 中,后续通过对账程序从源库直接获取终态数据。

Consumer

实现拦截器:ConsumerInterceptorlmpl implements ConsumerInterceptor 配置确认超时:pulsarClient.ackTimeout(3000, TimeUnit.MILLISECONDS).ackTimeoutTickTime(500, TimeUnit.MILLISECONDS) 使用累积确认:consumer.acknowledgeCumulative(sendMessageID)

备注:配置确认超时参数,如果没有在 ackTimeout 时间内进行消费确认的话,消息将重新投递。为了严格保证一致性,我们需要使用累计确认方式进行确认。

6. 消息消费的确认方式

假如在 MessageID 为 1 的消息已确认消费成功,开始采用累积确认方式,此时正在确认 MessageID 为 3 的消息,则已消费但未确认的 MessageID 为 2 的消息也会被确认成功。假如在“确认超时”时间内一直未收到确认,则会按照原顺序重新投递 MessageID 为 2、3、4、5 的消息。

图 12. 消息确认流程图(2)

总结:队列消费模式建议使用单条确认方式,流式消费模式建议使用累积确认方式。

7. 消息确认超时(客户端)检测机制

确认超时机制中有两个参数,超时时间和轮询间隔。超时检测机制通过一个双向队列 + 多个 HashSet 实现。HashSet 的个数为(超时时间)除以(轮询间隔)后取整,因此每次轮询处理一个 HashSet,从而有效规避全局锁带来的性能损耗。

图 14. 透明层协议流程图

场景 3:流式队列:自定义 Kafka 0.8-Source(Source 开发)

Pulsar IO 可以轻松对接到各种数据平台。我们的部分业务系统使用的是 Kafka 0.8,官方没有提供对应的 Source,因此我们根据 Pulsar IO 的接口定义,开发了 Kafka 0.8 Source 组件。

图 16. Pulsar Functions 消息过滤示意图

场景 5:流式队列:Pulsar Flink Connector 流式计算(流式计算)

商户经营分析场景中,Flink 通过 Pulsar Flink Connector 连接到 Pulsar,对流水数据根据不同维度,进行实时计算,并且将计算结果再通过 Pulsar 持久化到 TiDB 中。从目前的使用情况来看,Pulsar Flink Connector 的性能和稳定性均表现良好。

图 18. TiDB CDC For Pulsar 组件示意图

未来规划

我们基于 Pulsar 构建的基础消息平台有效提高了物理资源的使用效率;使用一套消息平台简化了系统维护和升级等操作,整体服务质量也得以提升。我们对 Pulsar 的未来使用规划主要包括以下两点:

1.陆续下线其它消息系统,最终全部接入到 Pulsar 基础消息平台;2.深度使用 Pulsar 的资源隔离和流控机制。

在实践过程中,借助 Pulsar 诸多原生特性和基于 Pulsar 开发的组件,新消息平台完美实现了我们预期的功能需求。

 

 

 

  • 发送关键词“集锦”,立即领取“史上最全”消息系统干货文章、视频资料,限量免费发送 100 份,先到先得!

  • 发送关键词“入群”,加入 Pulsar 技术交流群,和 Pulsar PMC 成员和贡献者无障碍交流;

 

声明:本站部分文章及图片源自用户投稿,如本站任何资料有侵权请您尽早请联系jinwei@zod.com.cn进行处理,非常感谢!

上一篇 2021年4月22日
下一篇 2021年4月22日

相关推荐