网易严选数据平台是如何做到T+0.0104天延迟的

达芬奇密码2018-08-02 11:48

在今年6月份的时候,我们开启了一个“严选数据平台实时离线融合计划”的项目。项目的目标很简单,就是加速严选数据中心数据产出的时效,提升数据中心产出数据的价值。把T+1的延迟改为T+0.0104(也就是15分钟)的延迟,达到业内一流水平。

为什么T+1不够好?当然,显而易见,慢总是不好的。T+1的数据时效更多是用于制作数据报表,事后分析等等时效性依赖不高的场景,无法支撑实时在线分析,在线决策,风控等需要强时效的场景。因此,在许多系统中,强时效的数据工作经常是业务系统研发团队自己完成的,这种模式存在工作成本高,不可复用,效果有限,大量异构系统需要维护,需要大量研发团队支撑等问题。

T+1的数据收集还有一个致命的问题:“窒息时刻”。由于数据需要统一产出,因此也需要统一收集,而数据的大规模统一收集,容易对交换机产生巨大的冲击,从而导致内网由于流量风暴,产生大量连锁反应,影响整个业务的稳定性。下图是某公司系统工程部团队给我们展示的高峰期交换机流量压力示意图。每个小时,整个集群的流量都会面临一场危机。想必,在这样的压力下工作,应该不是什么快乐的事情。

国内知名公司的大数据集群交换机流量示意图

 

那么如果我们把基础数据的时效从1天的延迟降低到30分钟,甚至15分钟呢?问题是不是就解决了?答案是肯定的。

当然这不是一件容易的事情,牵扯到大量基础系统的统一改造。业内有许多知名公司的离线基础数据时效控制在1个小时左右,已经非常难得。美团等技术能力雄厚的公司也在往30分钟延迟的目标进军(2017.7月交流,上万规模集群的情况下,30分钟的数据延迟目标要达到非常艰难)。在时效的问题上,严选数据平台有自己的优势:

1.     从零开始,没有历史包袱需要兼顾;

2.     后发优势,可以利用大量现代的架构和工具。

为了完成这个实时离线融合计划,我们制定了完成线路图:完成4个主要项目:

1.      DataHub

2.      ATOM Streaming Computing

3.      DBMirrorsbinlog Merge/oplog merge 算法),

4.      HolmesDB(统一SQL输出)。

这些项目各自解耦合,合在一起打通了数据从收集到输出的完整链路,我们依赖这些项目的成功实施来打通严选数据的任督二脉。

项目RoadMap如图23所示。

2 RoadMap

3 RoadMap

稍微展开说说这4个项目。

DataHub由我们的离线计算及工具链团队来负责实现,主要是负责收集MySQL Binlog/MongoDB OPLog/Log/MQ等数据,负责将这些实时生成的数据转成统一的数据格式,并提供给严选流计算平台。同时DataHub也负责了严选各个异构数据存储系统之间的数据同步工作,包括:MySQL, HIVE, ES, HBase, Redis, MongoDBExcel,外部http接口产生的数据之间的双向或单向同步。DataHub每天有近千张DB表在各个异构系统之间的同步。

ATOM StreamingComputing是由我们的流计算团队来负责研发,ATOM平台负责处理严选所有的实时任务,当然也包括数据的ETL,实时入HIVE库等工作。ATOM平台每天24小时RunningJOB150多个,日处理消息17亿次+,最高峰每秒处理30W次计算,支撑着严选的CRM,风控,仓配调度,实时预测,数据大屏,数据实时统计等大量业务。

DBMirrors 数据库的binlog记录进入hive后,会产生一张数据变更历史表。我们如何从这个数据变更历史表还原出原始的数据库镜像表,是DBMirrors项目要解决的问题。DBMirrors项目最关键的是性能。我们需要在1~2分钟左右的时间内完成这个数据还原的工作。

HIVE不支持RecordUpdate, 常见的Merge算法是遍历整个数据变更表(即Binlog表),根据数据库的操作(insert, update or delete)来计算最新的数据,从而还原出一个由最新数据组成的表,就是数据库Mirror表。比如:

Record_1:  id = 3 name = ’tubie’ type = ‘cat’ dbop=’insert’ timestamp=xx

Record_2:  id = 3 name = ’tubie2’ type = ‘cat’ dbop=’update’  timestamp=xx1

最终还原到镜像表的数据就是:

Recordid = 3 name = ’tubie2’ type = ‘cat’ timestamp=xx1

遍历历史表来还原的算法简单可靠,遗憾的是运行开销与表的记录数是正比关系。对于记录数上亿的表,我们要在1~2分钟内完成merge工作,就不大可能了。

直觉告诉我们,要优化这个Merge算法,算法本身的复杂度应该跟一段时间内新增的条目数成正比关系,或者在绝大部分情况下,算法的运行时间仅跟新增变更的条目数有关系。同时,我们注意到,数据库记录修改的往往是最近被修改过的数据。而10天,20天,1年前写入的记录,基本不大会被更改。根据这两点,我们研发了一个算法,将原始数据(Parquet格式,一种列数据存储格式),根据记录被修改的时间分类,分成多个不同的文件,使得有些文件基本不会被修改,而有些文件则会被频繁修改,从而确保如果要还原一段时间内的数据变更,我们只需要重新计算某个或者某几个Parquet文件就可以。我把这个算法成为Asymmetric Merge Algo. 具体实现的细节和技巧我就不详细解释了。

当然,会有一些DB表的修改模式未必符合“数据库记录修改的往往是最近被修改过的数据”,这个模式,没关系。我们总能根据他的修改历史找到一种数据修改的pattern. 从而基于这个pattern来调整算法,依然能达到这个目标。

Asymmetric Merge Algo.算法是我们能完成15分钟数据延迟承诺的重要依靠。

HolmesDB 也是由离线计算与工具链团队完成的。这个项目为我们数据平台提供了完整的数据输出能力。项目提供了一个JDBCDriver与一个Python SQL Interface,支持JavaPythonSQL的形式对数据平台进行访问。业务方可以直接使用HolmesDB与大数据平台直接沟通。HolmesDB封装了Spark SQLHIVEImpalaKylin等多种查询引擎,可以根据需要的并发能力,延迟等自主选择。HolmesDB同时也负责了数据的权限校验,数据查询控制,数据访问与修改审计等工作。

 

4是我们15分钟延迟的数据库表的一部分,目前已经完成了300余张表的近实时同步。

4  15分钟而非T+1延迟的数据库表

网易云新用户大礼包:https://www.163yun.com/gift

本文来自网易实践者社区,经作者陈炬授权发布。