曹操出行创立于 2015 年 5 月 21 日,是吉利控股集团布局“新能源汽车共享生态”的战略性投资业务,以“科技重塑绿色共享出行”为使命,将全球领先的互联网、车联网、自动驾驶技术以及新能源科技,创新应用于共享出行领域,以“用心服务国民出行”为品牌主张,致力于打造服务口碑最好的出行品牌。
作为一家互联网出行平台,曹操主要提供了网约车、顺风车和专车等多种出行服务。其中,打车是核心业务之一。整体业务过程大致如下: 首先,用户在我们的平台上下单,然后曹操平台会给司机进行订单的派发,司机接到订单后,会进行履约服务。结束一次订单服务后,乘客会在平台上进行支付。
在整个流程中,涉及到的数据将会在业务系统中流转,主要包括有营销、订单、派单、风控、支付、履约等。这些系统产生的数据存储在 RDS 中,并进一步流入实时数仓中以进行分析和处理。最终数据会进入到不同的使用场景中,比如实时的标签,实时大屏、多维 BI 分析,还有实时业务监控以及实时算法决策。
面对曹操出行不断增长的数据生产成本和研发需求,传统 lambda 架构已无力支撑现有业务,并呈现以下痛点:
数据组件过多:为了满足多样化应用场景,架构中引入了大量数据处理组件,增加了架构的复杂性。
研发成本高:不仅在实时流处理链路上投入大量研发资源,还需额外构建一条离线数据链路,导致成本加剧。
运维效率低:架构基于 Kafka 搭建,数据审核和数据订正变得非常困难。
资源开销大:组件众多,加大了运维和管理成本;在需要精准一致性的场景下,双链路数据同步增加了业务的复杂度;在某些计算场景中,需要 Flink 维护大状态进行处理,引发了性能问题和导致了资源浪费。
市面上主流的数据湖产品通常采用 LSM(Log-Structured Merge)架构,主流数据主键模型更新模式有 CopyOnWrite 和 MergeOnRead。这两种场景都有各自的问题:CopyOnWrite 具有写放大的问题,数据的延迟会比较高;MergeOnRead(读时合并)模式在读取数据时需要进行大量的数据合并操作,因此读取性能可能较差。
Hologres 的存储架构采用分布式存储系统,并在其上构建了存储引擎。在底层,Hologres 使用了分布式存储系统来管理数据的存储和分布。存储引擎包括一些关键组件,如 Block Cache、Shard。每个 Shard 中包含了多个 Tablet 和 Write-Ahead Log(WAL)。在 Hologres 中,行存使用 MergeOnRead 方式,列存采用 MergeOnWrite。
在 MergeOnWrite 模式下,一条数据进入 Hologres 时,首先到达 WAL Manager(Write-Ahead Log 管理器),同时也会进入到 Memtable(内存表)。在 Memtable 中,主要存储三类数据:数据文件、删除标志文件(例如基于 RoaringBitmap 的文件)和索引文件。当 Memtable 数据积累到一定阶段后,会生成不可变的 Memtable,并通过异步线程定期将其刷新(flush)到 Data File(数据文件)中。通过这种架构,Hologres 能够兼顾行存和列存的优势,并通过适当的数据合并策略来提高性能和存储效率。
Hologres 还支持 Binlog,Binlog 也是一种物理表,其跟原表的主要区别是内置的几种自身结构,包含自身递增序列,数据修改类型以及数据修改时间,Binlog 本质上也是分 shard 进行存储,所以也是一种分布式表,并且在 WAL 之前生成,因此在数据上可以与原表保证强一致性。
此外,Hologres Binlog 修改类型也还原了 Flink 中的四种 RowKind 类型。在数据更新过程中会产生两条更新记录(update_before,update_after),并且保证了更新记录是连续的存储。
如下图所示,写入一个数据一个 pk1,然后再写入一个 pk2 数据,pk2 的数据再做一次更新,那么在 Binlog 中会产生 4 条数据结果。
dwd 宽表构建实践
Hologres 列更新能力能很好地实现宽表 Join。在整个生产过程中,还需重点关注维表的应用场景,其应用场景包含几种情况:一种是维表是不变的,或者缓慢的变化,另一种是维表频繁变化的。为了保证数据最终的一致,通常的设计是像离线的方式去构建一个维表拉链的数据,通过用过 Start Time 和 End Time 的方式去存储维度状态有效的一个周期。
其次需要关注维表延迟问题。在实际生产过程中,维表链路与主表的链路通常是异步的,可能会出现维表延迟导致主表关联数据为空或关联到过时的维度状态。为处理这种情况,需要在 Hologres 中实施维度缺失记录的过滤,并采取补偿机制进行维度补偿处理。同时,还需要定时调度进行维度字段和维表对比检查,以增量方式修正不一致的维度状态。
聚合计算场景优化
针对许多预聚合计算场景,统一收敛到 Rollup 计算模型中,主要解决以下问题:在 Flink 聚合场景中经常会出现状态兼容性的问题;数据复用性非常差的问题,例如研发人员收到需求需要新增指标或者维度粒度时,为了不影响生产数据的稳定性,新增需求需要构建新任务,导致任务管理混乱。
针对上述问题,曹操出行主要进行了两点优化:
构建 MapSumAgg 算子,MapSum 主要通过对 SumAgg 算子做了重新设计,使之能够支持 Map 内部结构的求和逻辑。
对 Grouping Sets 进行动态配置化,这样 Grouping Sets 动态增加维度粒度,使整个任务在不重启的情况下也能自动去做自适应。
结合这两点,把已有的指标放入 map 结构中进行封装,这样在不改变原有的算子状态,也可以得到很好的处理。在下游中可以针对不同维度组合,指标集合做好选择,然后由同步工具做实时的数据路由,为下游提供服务。
对于第二个聚合场景的优化,是对精确去重场景的拆分。在前面例子中,我们把 Count Distinct 的精确去重做了剥离,主要解决两个问题:
链路中吞吐能力调优
整个流链路中吞吐能力的调优主要涉及两个部分:
数据写入侧:在将数据写入 Hologres 之前,针对字段状态频繁变更的场景进行了优化。引入了一个 Union 层,在 Union 层和 ODS 层中,数据根据主键进行分区。在 Union 层中,通过一个小窗口进行预聚合计算,以减少对 Hologres 的写入压力,从而提高整体数据吞吐量。然而,这种方式的缺点是无法捕获中间状态的数据。
数据读取侧。在使用 Binlog 更新数据时,会产生连续的变更前后数据。在这种场景下,可以采用 lag 开窗的方式来获取一次变更中连续的上下游数据。通过比较这两个数据之间的差异,可以过滤掉冗余的变更数据,从而减轻整个处理下游数据的压力。这种方式可以提高读取数据的效率和吞吐量,减少不必要的数据处理。
元数据血缘的改造
曹操出行主要采取以下措施来进行元数据血缘的改造:
Flink Catalog 集成,在元数据中去整合 Hologres 的 Catalog,也支持 Kafka Topic 表中自定义 Catalog,支持多版本 schema 和任务数据的多版本,提供更灵活的数据处理能力。
Kafka Source 和 Kafka Sink 的改造。结合整个上线发布的流程,对于数据的版本信息,是通过 Kafka Sink 对 Header 进行记录,Kafka Source 对 header 的版本信息进行过滤,从而把数据版本引入到整个上下游的链路,提供上下游数据灵活的迭代。这种做法的好处是,在整个链路中可以感知到下游数据的使用情况,帮助用户快速定位是否还有任务依赖于某个版本的数据,右边的图片主要是展示一个开发流程中元数据的集成。
元数据血缘的改造主要解决了以下问题:
Schema 的演进提供了一个更便利的管控。
解决实时链路发布流程中的依赖链问题。
对任务元数据信息进行有效的管理。
搭建链路保障体系
在日常开发过程中,对于任务健康以及任务出现异常后的判断和检测,都是通过异常检测诊断工具去做支持。主要体现四个方面:
对于基础信息采集,通过采集工具,把 Flink 内置 Metric、Yarn 的 Metric 以及 Kafka 信息进行采集,提供基础数据,包括作业信息,Kafka 一些 Topic 信息,作业最新指标情况。
对于异常的判断,通过内存以及 Topic 增长情况,包括 CPU 使用情况,以及任务有无出现反压,任务有无倾斜做出异常的判断。
对于异常原因的诊断-内部原因,内部原因主要会看 CheckPoint 的失败情况,Kafka LAG 具体是什么算子造成的反压,Restart 的次数,attempt 的次数。
对于异常原因的诊断-外部原因,外部原因主要是看 Job Manager 以及 Task Manager 所在节点自身的情况,包括 CPU 使用率、IO 利用率、内存情况等,然后做出综合判断,帮助用户去快速定位具体问题的原因。
在链路保障体系中,全链路的感知能力是非常重要的。曹操出行主要通过流量监控和延迟监控来实现全链路的感知能力:
流量监控层面:通过 Kafka Cueernt Offset 以及 Hologres 内置的 Offset 信息做定时的采集,从而推算出 Kafka 以及 Hologres 表的生产速率。
Latency 监控层面:主要采集 Kafka Offset 以及 Flink Source 的 Offset 情况,结合 Kafka Massage Timestamp 去推算出每个任务自身延迟情况,再结合整个数据血缘进行一个串联,可以得出端到任务自身整体的延迟时间。
通过任务上下游生产速率比,以及任务自身延迟情况,在整个生产链路中可以快速定位出具体异常和问题发生的节点,以便及时处理和优化,提高系统的性能和稳定性。
建设数据订正能力
在传统的 Streaming 链路中,数据订正方案一直是个复杂工程,主要涉及以下两个方面的挑战:
如何知晓订正的数据为正确数据?验证其具有一定困难。
在整个验证过程中,如何保证对下游的透明?如果丢状态去做重启的订正,肯定会对下游造成很大的影响。
因此我们主要思路是基于 Hologres 去做实现。首先对于原始任务进行代码修正后,并维持原有状态去做重启。第二步将对 Hologres 表做 Schema 的拷贝,然后新建一个订正的临时表。第三步会将任务进行拷贝,并将 Sink 调整到订正临时表,去做无状态从头消费的重启。这样可以把订正的结果数据订正进 Hologres 订正表中。等待消费结束后停止订正任务,然后通过修正脚本去对比原表以及订正表中关键信息,去做数据的订正。由于数据的订正,它处于数据终态,对于下游来说,不会造成大起大落。并且在整个链路中,因为正确数据可以通过整个数据链路做回撤的传导,因此整个下游就可以自动完成数据的订正。
架构清晰简单:
对比原有 Lamada 架构,Hologres+Flink 整体架构更加清晰,使用数据组件大大减少。
整体技术复杂难度降低,原先为了解决数据一致性问题,数据需要在不同的异构存储和异构链路中来回传输和计算,整个技术复杂度较高。
开发效率提高:
整个开发模式变得简单易用,大大缩短开发人力投入和周期。数据实时模型分层非常清晰,提升了整体下游的复用性,且使用门槛大幅度降低。
运维体验提升:
由于数据存储在 Hologres 上,因此数据探查更加便捷,数据订正难度大幅降低。
成本减少:
组件维护成本减少。数据的离线存储和实时存储,从双份存储降低到一份存储,以及降低了数据在异构存储之间的同步与计算成本。解决了 Flink 中各类计算场景中大状态的资源成本,减少了计算开销并提升了处理性能。