0%

浅谈Dataflow model

前言

最近在Clubhouse上参与一个《Designing Data-Intensive Application》(后文略称DDIA)的读书讨论。最直观的感受是输入加输出才是最好的学习模式,之前光是凭借阅读的输入,书上的东西的确是学到了,但是因为没有相应的实践机会来刷新这些知识,过一段时间就模糊不清了。所以想着借着这个机会重新开始更新博客(距离上一次更新似乎已经快一年了),一个是通过写写东西帮自己加深理解,另外是希望有机会通过讨论来进一步迭代。

前几天聊到DDIA中的batch processing章节,这章我之前看的时候是略过的(因为觉得对MapReduce模型派生出的batch processing已经相对了解了),这次完整看下来觉得还是很有帮助,特别是后一部分提到了dataflow model。而在我之前对《Streaming Systems》(后文略称SS)的理解里,只是把dataflow作为流处理的一个抽象模型。这次重新把Google Dataflow论文和SS过了一遍,有了一些新的理解。

正文

首先dataflow区别了bounded/unbounded data与batch/streaming。尽管这一对术语看起来是一一对应的,但是当我们在说batch和steaming processing的时候,我们实际上已经预设了将要使用的处理引擎(即流处理引擎还是批处理引擎)。正如《Clean Architecture》一书中提到的,我们在做design的时候应该尽可能地把这一决定往后延迟。因为无论选择什么引擎,这些都是实现细节,不应该过早地介入到做design的阶段。而bounded/unbounded的概念更倾向于描述数据的本质,属于业务逻辑和需求层面,不至于太早地引入细节进而导致太过紧密的耦合。

Dataflow提供了一个相对于batch/processing更高层的抽象。在模型中,unbounded data才是现实世界中更加宽泛自然的一个概念,而bounded data只是一个特殊情况。模型从What,Where,When,How四个问题展开描述。

  • “What”我们对于数据进行怎样的操作,这一部分与传统的batch processing概念中的各种操作(如map,filter,aggregate,join等)大体一致,就不展开了。

  • “Where”指的是划定数据处理发生的边界,在业务逻辑中,这个边界可能是每小时(fixed window or sliding window),也可能是一个用户在一次登录期间(session window)。且前文之所以说bounded data(以及batch processing)只是一种特殊情况,其中的一个原因是在这种情况下,窗口在处理开始时被预先划分了,且不会有新的数据加入产生新的窗口。

  • ”When“指的是每个窗口中的结果将如何被获取。这里需要引入一个叫Watermark的概念,它代表着当系统收到一个watermark的时候,发生在watermark之前的数据都已经被处理完了(理想情况,实际上由于分布式系统中的不可靠传输,会有数据晚到的情况)。我们可以想象这么一个坐标系,X轴是单条数据实际发生的时间,Y轴是我们处理该数据的时间。那么处理所有数据的过程实际上就是水位从最开始的处理时间不断上涨的过程(见动图)。而对于每一个window的处理会在到达watermark(可译作水位线)时停下(可以对晚到的数据做额外操作或者直接丢弃,由具体实现决定),我猜想这大概就是watermark这一概念的由来。那么我们可以想象一下在batch processing中,水位线将会是一条水平线,因为我们需要到所有数据都处理完成之后才能结束操作。而streaming process之所以能提供更好的latency也正是因为它提前释放了相对更早的window,但是相应的也会有completeness层面的风险,即数据丢失和延迟等等。

    处理结果也不单单只在watermark处发生,用户也可以自行定义trigger,如每隔多少时间或者每隔多少条数据获取一次结果。Trigger可以发生在watermark之前和之后。发生在watermark之后的trigger就是对于late event的额外操作。

  • “How”代表了当处理结果生成后,我们会怎样使用,即会丢弃原先的数据;在原先数据的基础上加上新数据;或者是将原先数据中的一部分减去之后再加上新数据(具体例子是当一条新数据会使两个session window合并的情况下,需要分别去除两个旧window的结果,并加上新window的结果)。这部分让我联想到了Hudi这些data lake文件格式的增量处理,也许这也是Hudi现在正在试图用Flink作为底层实现的一个原因吧,但是由于我对于这一块并不是很熟悉,也就不展开了。

此外SS中也提到了如何用查询catalog来dedup事件的id以实现exactly once,并且可以通过bloom filter对查询进行优化(bloom filter只会产生false postive的查询结果,因此在duplication的占比小的情况下能减少对于catalog的查询)。但这些实现是由引擎自身决定的,比如Flink是利用checkpoint来实现在节点失败并replay时的exactly once。Flink将checkpoint插入在事件之间以实现在不间断的情况下对系统进行快照,当一个节点挂掉时,会把所有被波及的节点回滚到上一个checkpoint。Checkpoint之后的数据虽然在系统内部会不止一次经过节点,但是节点只会在某个checkpoint发生后,将这个checkpoint之前的数据反应给外部系统,所以对于外部系统来说,我们完全可以认为在上一个checkpoint发生后,且下一个checkpoint未发生之前的处理是不可见的。这部分数据就不会在外部系统中出现两次。

另外一个有意思的点是SS提出了一个“表-流相对论”的概念:表的增量(类似WAL)是流,流的积累形成来表。无论一个系统是基于流还是基于表,中间的过程都是流和表在不停交替。系统只是做了将某部分抽象起来的决定。在一个基于表的系统中,当我们把数据从一个表提取出来并进行操作后写入另一个表,这一过程本身就包含了表->流->表的过程。而在一个基于流的系统中,那些被存储起来的状态实际上也就是表。而这些逻辑在这个系统的设计过程中被选择性地向用户隐藏了。