Netflix如何在Kappa架構下做回填(backfill)

Netflix的首頁是經過客製化的,會根據使用者的喜好有不同的外觀,當然,推薦影片的種類也會完全根據使用者的偏好。要做到這種程度,是因為在系統背後有一個應用程式會根據各種演算法將好的UX遞送給使用者。

這個應用程式稱為RMI (Real-time Merched Impression),其架構如下。

graph TD
    imp[Impression Source] --> kb1[KeyBy] --> join[Join] --> rmi[RMI Sink]
    pb[Playback Source] --> kb2[KeyBy] --> join

這是一個實時串流(real-time streaming)的Flink應用,他會將播放事件和觸及事件即時結合並產生準實時的首頁推送。這背後會經過許多機器學習演算法、資料分析以及A/B測試等機制,最後產生結果。

這個RMI也是Netflix內數一數二大的有狀態Flink應用。

為什麼需要回填?

一個串流應用可能會因為諸多原因而掛點。

  • 上下游失效,也就是無法存取上下游的資料存儲。
  • 依賴服務失效,這會造成要往下游寫入的資料無法經過豐富化(enrich)。
  • 上游資料改變,我相信這是身為資料工程師最大的痛點。

發生以上種種原因就會需要做資料回填。

而回填有以下幾種類型。

  • 修正錯誤資料
  • 回填缺失資料
  • 重新建立狀態

既有的解決方案

那麼要如何做回填?有以下三種方法。

方案一

最簡單的回填做法就是讓串流應用重新跑一次問題區間的所有事件。

聽起來很簡單對吧?但這有點不現實,因為資料留存的時間是有限的。也許你會說,我們可以加長資料留存的時間,但這樣會造成中間件(Kafka, Pulsar, etc.)的硬體成本大幅增加。

此外,留存這些原始事件的效益很差,因為在中間件停留的資料格式大多是純文字型態,頂多是Apache Avro的datum。這些壓縮率都比不上一些列式存儲格式,例如Parquet。

而且,中間件的存儲媒介是SSD,這也遠比一些物件存儲系統例如S3等高了不少。若是將Netflix所有應用的事件都延長至30天,那麼一年需要額外支出$93M。

那麼,可以把這些原始事件存到別的地方嗎?答案是肯定的,這也是Netflix自研的Keystone的價值。

Keystone是Netflix自研的一個資料繞送平台,透過簡單的設定就可以讓Kafka內的事件去到任何想去的地方,可以是另一個Kafka的topic,可以是某一個資料庫,或者是資料湖倉(Data lakehouse)。

什麼是資料湖倉?

在上一篇一文看懂 Netflix 處理 hourly pipeline 延遲中有快介紹了一下,簡單的說就是在物件存儲上(例如S3)提供可以做SQL操作的能力。

那資料湖倉有什麼優勢?

首先,資料可以用壓縮率較佳的存儲格式,例如Parquet,以大幅減少存儲成本。此外,資料湖倉還提供許多額外功能。

  • 資料剪枝(pruning),這可以極大的提升讀取效能。
  • Schema演化。為了應付上游不斷的資料改變,下游要能夠有對應的schea修改才行。
  • 無額外的依賴套件,提供所有資料應用都能夠讀寫原始事件的能力,這點尤其重要。

舉例來說。

假設我們在Kafka的播放事件如下。

1
2
3
4
{"account_id": 123, "show_id": 123, "view_duration_sec": 123},
{"account_id": 456, "show_id": 456, "view_duration_sec": 456},
{"account_id": 789, "show_id": 789, "view_duration_sec": 789},
...

那麼在資料湖倉就會建立對應的資料表。

account_id show_id view_duration metadata
123 123 123 {kafka_ingestion_ts:…}
456 456 456 {kafka_ingestion_ts:…}
789 789 789 {kafka_ingestion_ts:…}

這樣看起來,我們已經在資料湖倉有完整的事件資料,我們可以直接拿這些來做回填嗎?絕對可以,事實上這的確是解決方案的一部分。

方案二

建立一個批次處理的應用,例如Spark,讓他有和實時應用一樣的能力,只是是從資料湖倉讀取資料而不是從Kafka。

這其實就是一個Lambda架構,整個架構圖如下。

graph LR
    kafka[[Kafka]] --> flink[Flink App: prod] --> out[Output]
    kafka -.-> lake[(Lakehouse)] --> spark[Spark App: backfill] --> out

透過另一個批次應用來做回填的確可行,但這樣的方案有顯著的維護成本,任何修改都必須同步修改兩邊的應用。除此之外,因為涉及兩個應用,所以如何進行資料驗證也是一大挑戰。

而這些也都是Lambda架構天生的缺陷。

方案三

既然額外造一個批次應用會有顯著的維護成本,那何不做一個批流合一的應用?

的確,有些框架的確有支援批流合一個能力,例如:Apache Flink和Apache Beam。

但無論哪一個框架,都有他的限制。

  • Apache Flink:雖然有批流合一的能力,但實際要用批次模式運行還是需要修改一部分的程式碼。
  • Apache Beam:Beam也有批流合一的能力,但對於串流應用的一些重要特性僅有部分支援,例如狀態和流水號(watermark)。

三方案比較

| | 重播事件 | 批次應用 | 批流合一 |
|作法|用既有的實時應用搭配事件重播|做一個等價的批次應用|使用批流合一框架|
|Pros|沒有額外開發成本|資料湖倉有低廉的儲存成本|同一個應用卻支援批模式|
|Cons|儲存成本非常昂貴|顯著的維護成本|還是需要修改程式|

既然三個既有方案都還是有缺點,那我們可以合併三者的優點產生一個新的方案嗎?

當然可以,也就是標題寫到的「在Kappa架構下做回填」。

利用資料湖倉在Kappa架構下做回填

這個新的解決方案有三個目標。

  1. 提供一個可以套用在各種場景的通用解決方案
  2. 盡可能減少程式碼修改
  3. 具備水平擴展能力以提升回填效能

以下是一個概覽。

graph LR
    kafka[[Kafka]] --> flink[Flink App] --> sink[(Sink)]
    lake[(Lakehouse)] --> flink

這個實時應用可以同時支援Kafka串流資料源(更好的資料新鮮度)以及資料湖倉(更高的吞吐量)。

在處理邏輯時,新資料以Kafka資料為主,而舊資料則從資料湖倉擷取。當在回填時,統一由資料湖倉作為資料源。

儘管如此,這樣的解法依然有數種實作方式。

策略一:根據回填日期直接從檔案讀取事件

在事件送入資料湖倉時會根據時間建立分區(partition),並且將相同分區的事件共同儲存在一個檔案,也因此檔案路徑上有時間標示且單一檔案會包含對應時間內的所有事件。那麼當要做回填時,就可以根據需要回填的時間區間直接找出對應的檔案並直接利用這些事件進行回填。

之所以要將相同分區的事件儲存在一個共同檔案有兩個目的。

  1. 一個檔案內放多個事件會有較佳的壓縮率
  2. 可以避免單一事件檔案產生的檔案破碎化

當在做回填時,只要找出所有需要回填的檔案就可以使用Flink的分流算子(Split Reader)平行處理。最理想的情況是每一個檔案都可以分配到一個算子。

這種作法的優勢顯而易見。

  1. 容易水平擴展,只要增加算子就可以平行加速。
  2. 實作簡單

儘管如此,若是回填操作需要依賴其他的數據源,這樣的做法就會很難實作,因此這個做法的適用性會有些問題。

除此之外,這樣的架構有一個明顯的挑戰,那就是要如何解決事件的順序性?

當算子都是平行處理,就無法確保事件的先後,那麼對於有順序性的運算就會產生錯誤的結果。

策略二:將檔案排序並循序處理

既然策略一會碰到事件順序性的問題,那麼只要協調算子確保能夠循序讀取檔案,就能夠解決順序問題。

確實這樣就可以解決事件順序造成的影響,但這做法卻犧牲掉水平擴展能力。

即便可以解決順序性,但這樣的代價過於龐大,而且不是所有的實時應用都依賴強順序保證。

對於實時應用來說,通常只需要確保「事件時間(Event Time)」的語意完整性即可。所謂的事件時間指的是事件發生的時間,而不是事件到達的時間。因為事件有可能會延遲到達,所以實時應用能容許一定延遲。

策略三:讓實時應用接受事件延遲

這個實作的方法是策略二的強化版,既然不需要確保強順序性,那麼就可以允許某種程度的平行回填。

假設,某個實時應用允許十分鐘的延遲事件,那也就是說,我們可以讓十分鐘內的算子一起運行。

只要透過一個流水號(watermark)紀錄已經處理完的時間區間,就可以通知之後的算子開跑。

舉例來說,在資料湖倉中以五分鐘為一個分區,但因為0-5以及5-10的事件量大,所以切分成兩個檔案。目前湖倉內的檔案共五個如下:

  • T=0..5
  • T=0..5
  • T=5..10
  • T=5..10
  • T=10..15

再假設,總共有三個算子,所以一開始運行的時候可以同時處理

  • T=0..5
  • T=0..5
  • T=5..10

當前兩個檔案處理完時,就可以更新流水號為5,那麼算子就可以接續處理後面的10-15。

透過流水號的機制就可以確實記錄處理進度,並協調算子之間的順序。這樣的解法既能兼顧順序性,又有部份水平擴展的能力,能夠盡可能加速回填的效率。

看起來策略三已經盡可能在兼顧順序的同時提升回填效率了,但還有一個問題沒有解決,當實時應用需要依賴多個數據源又該怎麼辦?

回到RMI的例子。

graph TD
    imp[Impression Source] --> kb1[KeyBy] --> join[Join] --> rmi[RMI Sink]
    pb[Playback Source] --> kb2[KeyBy] --> join

當兩個數據源有明顯的流速差異,那麼個別計算流水號會造成狀態爆炸(state explosion)。

狀態爆炸不只會佔用額外的系統資源,也會導致實時應用的快照緩慢甚至超時。要知道,快照對於實時應用來說是至關重要的,當快照生成失敗,最嚴重可能導致系統崩壞。

舉個明確例子,假設觸及事件的流速遠大於播放事件,那麼處理觸及事件的算子拼命消化所有的事件,但其實播放事件根本不需要這麼多,這就會造成觸及事件的算子產生大量的多餘狀態。

解決方案也很單純,既然每個事件源的流速不同,那麼就定義一個全域的流水號來記錄最慢的那個算子流水號,就可以控制所有算子的處理速度。

繼續沿用之前的例子,假設RMI這個應用可以允許延遲10分鐘,我們定義記錄觸及事件的流水號是Wimp,而播放事件的流水號是Wpb。此外,還有一個紀錄全域的流水號Wg

當觸及事件處理完一些檔案後,Wimp=5,但播放事件還沒結束,這時的Wg = min(Wimp, Wpb),也就是Wg = 0,那麼觸及事件會被限制住無法繼續處理T > 10的檔案,直到Wpb也跟上。

當Wpb跟上後,例如Wpb=10,那麼這時的Wg會是5,因此處理觸及事件的算子就可以繼續下去。

透過一個全域流水號就可以協調不同算子之間的處理速度就可以顯著避免狀態爆炸。

有了這些基礎。最終的解法出爐了。

Kappa架構的回填

flowchart LR
    imp_kafka[[Impression Kafka]] --> prod[Production Stack] --> out_kafka[[RMI Out Kafka]] -.-> out_ice[(RMI Out Iceberg)]
    pb_kafka[[Playback Kafka]] --> prod
    imp_kafka -.-> imp_ice[(Impression Iceberg)] --> back --> out_kafka
    pb_kafka -.-> pb_ice[(Playback Iceberg)] --> back[Backfill Stack]
    subgraph RMI [RMI Flink App]
        prod
        back
    end

首先,RMI這個實時應用接受觸及事件以及播放事件作為資料源。產生結果後送入接收輸出事件的Kafka內。

在Kafka內的事件都會經由Keystone同步路由到對應的Iceberg資料湖倉。

至於回填,依然是由RMI這個應用來處理,只是資料源由Kafka改為Iceberg。值得一提的是,即便有分兩個堆疊(棧),但他們用的依然是同套程式碼。

以下是一個RMI的程式範例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@SpringBootApplication
class PersonlizationStreamApp {
@Bean
def flinkJob(
@Source("impression-source") impressionSource: SourceBuilder[Record[ImpressionEvent]],
@Source("playback-source") playbaclSource: SourceBuilder[Record[PlaybackEvent]],
@sink("summary-sink") summarySink: SinkBuilder[ImpressionPlaySummary]
) {...}

@Bean
def liveImpressionSourceConfigurer(): KafkaSourceConfigurer[Record[ImpressionEvent]] =
new KafkaSourceConfigurer("live-impression-source", KafkaCirceDeserializer[ImpressionEvent])

@Bean
def backfillImpressionSourceConfigurer(): IcebergSourceConfigurer[Record[ImpressionEvent]] =
new IcebergSourceConfigurer("backfill-impression-source", Avro.deserializer[ImpressionEvent])
}

這是一個Java的範例,但就算不會寫Java也應該看得懂。這裡有兩個重點:

  1. 無論是實時處理或回填,都使用相同的結構:ImpressionEvent,即便資料源分別是Kafka和Iceberg,但資料結構完全相同。
  2. 若是該應用需要支援回填,也只需要寫一個額外的函式提供設定,而其他的程式碼都不需要修改。

至於設定檔也有類似的特性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
nfflink:
job.name: rmi-app
connectors:
sources:
impression-source:
type: dynamic
selected: live-impression-source
candidates:
- live-impression-source
- backfill-impression-source
live-impression-source:
type: kafka
topics: impressions
cluster: impressions_cluster
backfill-impression-source:
type: iceberg
database: default
table: impression_table_name
max_misalignment_thresold: 10min

在設定檔內只需要提供需要回填的資訊即可,其中max_misalignment_thresold這項指的就是前面提到可以容許的最大延遲。

Conclusion

透過這樣的架構,Netflix確實取得了不錯的成效。

  • 容易建置回填功能
  • 高吞吐量:處理一整天的資料只需要約五小時。
  • 因為程式碼不需要修改,所以回填的資料一致性可以做到99.9%。
  • 透過Iceberg來取代Kafka,讓原本需要$93M/y的成本縮減成$2M/y。

資料一致性之所以是99.9%而不是100%是因為回填的時間線和實時事件不同,回填可能因為一些去重的算法或是時間戳的處理而導致資料與實時處理不一致。但對於Netflix來說,這個0.1%的損失完全可以接受。

從建制這個架構中,Netflix也了解到每個實時應用對於回填都有不一樣的需求,例如容許延遲、吞吐量或上述提到的一致性等,這些都與實時應用在處理及時事件是不太一樣。

因此對於回填的算子來說,也會需要有個別的微調,例如設定、運算資源(CPU或記憶體)等,不能統一都用實時應用的參數。


本文翻譯自DATA+AI SUMMIT 2022的 Backfill Streaming Data Pipelines in Kappa Architecture