張宗耀:bilibili每天100T+的數(shù)據(jù)導入是如何實現(xiàn)的|DataFunTalk

導讀:B站千億級數(shù)據(jù)同步,每天100T+數(shù)據(jù)導入是如何實現(xiàn)的?本文將介紹Apache SeaTunnel在嗶哩嗶哩的實踐。包括以下幾方面內(nèi)容:

  • 工具選擇
  • 日志
  • 提速/限流
  • 監(jiān)控自理

01
工具選擇

數(shù)據(jù)集成和數(shù)據(jù)出倉的大體流程如下圖所示,主要以數(shù)倉為中心,從HTTP、MySQL等外部數(shù)據(jù)源抽數(shù)入倉,在數(shù)倉內(nèi)做相應業(yè)務處理后,出倉到對應的Clickhouse、MySQL等存儲,供業(yè)務使用。

張宗耀:bilibili每天100T+的數(shù)據(jù)導入是如何實現(xiàn)的|DataFunTalk

B站數(shù)據(jù)平臺在離線出入倉工具上目前主要有兩類。一類是基于DataX二次開發(fā)的Rider項目,另一類是基于Seatunnel 1.1.3二次開發(fā)的AlterEgo項目。

張宗耀:bilibili每天100T+的數(shù)據(jù)導入是如何實現(xiàn)的|DataFunTalk

上圖展示了Rider的架構(gòu)。

Rider在使用上支持T+1、H+1定時調(diào)度,在數(shù)據(jù)源上支持HTTP、MySQL、BOSS等作為數(shù)據(jù)源,此外Rider在使用上主要原生讀寫Hdfs文件。

DataX雖然在單個進程下已經(jīng)足夠優(yōu)秀,但是不支持分布式,另外在大數(shù)據(jù)量下表現(xiàn)不是很好,Seatunnel在分布式場景下表現(xiàn)優(yōu)秀,一方面構(gòu)建在spark之上,天然分布式,另外且自帶了很多插件,非常適合二次開發(fā)。我們調(diào)研后,在Seatunnel基礎上二次開發(fā)了AlterEgo項目。

張宗耀:bilibili每天100T+的數(shù)據(jù)導入是如何實現(xiàn)的|DataFunTalk

AlterEgo的工作流程:Input[數(shù)據(jù)源輸入] -> Filter[數(shù)據(jù)處理] -> Output[結(jié)果輸出]。

集群規(guī)模方面,由于歷史原因,目前離線集群與出入倉工具集群是分開部署的。出入倉工具集群節(jié)點數(shù)20+,CPU核數(shù)750+,內(nèi)存1.8T+;每日出入倉調(diào)度任務同步數(shù)據(jù)方面,日均記錄數(shù)上千億,日均數(shù)據(jù)量在100T以上。

在落地方面,主要提供了界面化操作,對任務做了抽象和封裝,平臺化后提供給用戶使用。平臺會根據(jù)用戶不同選擇,把任務封裝為Seatunnel的配置格式或DataX的Json格式配置。目前平臺的功能如圖,用戶可以選擇數(shù)倉數(shù)據(jù)源的庫名、表名和出倉后存儲的目標數(shù)據(jù)源的信息。這里為了管理接入血緣考慮,和安全規(guī)范使用流程,用戶在界面內(nèi)不被允許填寫用戶名密碼,可以選擇對應創(chuàng)建的數(shù)據(jù)源。在存儲個性上,比如為兼容MySQL協(xié)議的數(shù)據(jù)源的導入上,支持了Insert Ignore、Insert Update方式導入數(shù)據(jù)。字段映射可以通過界面上拖拽完成配置。

張宗耀:bilibili每天100T+的數(shù)據(jù)導入是如何實現(xiàn)的|DataFunTalk

在任務運維界面,通過DAG查看調(diào)度任務上下文,出倉和入倉的整個過程中,任務是互相依賴的,前面的任務出問題會導致后面的任務產(chǎn)出慢、數(shù)據(jù)延遲等。因此排查問題的過程中,往往需要在任務DAG中找到上游依賴最長的鏈路或是未完成鏈路并排查問題。

02
日志

平臺化落地其實難度不大,套個皮就可以了,但是很多時候我們要考慮的是面向運維開發(fā),細節(jié)都是對用戶是封裝好的,需要為用戶提供足夠的運維工具支持。這里以日志為例,排錯是很常見的場景,當用戶排錯時,用戶并不希望看到密密麻麻且無用的Yarn日志,但如果使用spark日志,由于Spark環(huán)境配置繁瑣,直接暴露Spark UI給用戶也會讓用戶的使用體驗不佳。此外,在后期我們整合入離線大集群后,集群節(jié)點數(shù)目有四五千個節(jié)點,集群規(guī)模大就導致日志聚合會慢,日志響應時間長。

張宗耀:bilibili每天100T+的數(shù)據(jù)導入是如何實現(xiàn)的|DataFunTalk

為解決日志查詢困難的問題,我們對日志層做了優(yōu)化,在Spark上使用LogAgent把業(yè)務日志轉(zhuǎn)發(fā)到我們的日志服務上。為方便查詢,且讓日志歷史信息可追溯,LogAgent在日志中追加了jobId、jobHistoryId、priority等信息。這樣采集日志后,我們會根據(jù)日志內(nèi)信息做各類告警,例如當任務出倉條數(shù)為0時發(fā)送告警等。此外,當日志有報錯打出,用戶可以直接在日志界面展示的日志里定位程序的問題,所有操作直接在平臺就可以完成,而不需要其他復雜的配置。

03
提速/限流

  1. ClickHouse出倉

Clickhouse數(shù)據(jù)出倉方式有三種:

寫分布式表:寫入性能偏低,代碼比較簡單,不需要依賴RDD Repartition。

寫Local表:需要在本地做一次repartition,會有性能壓力。但寫入性能會更高,和寫分布式表一樣,主要用Jdbc協(xié)議。

BulkLoad:BulkLoad將寫壓力前置到Spark層,寫入速度快,降低了Clickhouse側(cè)壓力,寫入不影響讀性能,做到讀寫分離,更加安全。依賴的是文件復制。

Clickhouse出倉任務調(diào)度記錄達到60億以上,數(shù)據(jù)量達到13T以上;手動補數(shù)據(jù)數(shù)據(jù)量在70T,數(shù)據(jù)量和記錄數(shù)都在不斷增長。

  1. 創(chuàng)作中心-出倉加速

簡單介紹下我們對創(chuàng)作中心的數(shù)據(jù)出倉做了一系列優(yōu)化,加速了數(shù)據(jù)出倉過程。

創(chuàng)作中心使用大量使用TiDB,我們利用jdbc協(xié)議批量寫數(shù)據(jù)。當寫得快會導致TiDB-Server IO高、壓力大。另外在數(shù)據(jù)出倉過程中,可能有新建分區(qū)表的需求,當出現(xiàn)DDL操作時,寫入很容易出現(xiàn)Information schema is changed導致失敗。如果存在更新數(shù)據(jù)場景,也會由于Insert update時需要把數(shù)據(jù)全部讀出,當任務出現(xiàn)失敗后重試時任務耗時會增長,性能降低。此外,TIDB集群是有限的,多個業(yè)務同時寫入TiDB時候會出現(xiàn)多實例競爭寫入資源,導致寫入時間耗時增加。

以上問題,我們的應對方案主要是基于業(yè)務大都為KV查詢,用自研的分布式KV存儲TaiShan替代TiDB。創(chuàng)作中心業(yè)務主要集中在點查和Range查詢,比較適合KV類存儲。TaiShan是B站自研的分布式KV存儲,經(jīng)過多次出倉實際壓測,TaiShan的Batch寫入方式和TiDB性能接近,實際使用并沒有多少性能提升,寫入多時也會影響到讀的業(yè)務。我們最終采用正在做的Bulk Load方式寫入TaiShan。Bulk Load優(yōu)化和前面介紹的Clickhouse的優(yōu)化類似,將寫入壓力前置,放到SeaTunnel層來生成數(shù)據(jù)文件;對于業(yè)務庫能實現(xiàn)簡單的讀寫分離,但可能會存在一些熱點問題,需要前置一次repartion。

張宗耀:bilibili每天100T+的數(shù)據(jù)導入是如何實現(xiàn)的|DataFunTalk

我們對jdbc協(xié)議寫入TiDB和BulkLoad的方案分別做了壓測,TiDB寫入3、4億條數(shù)據(jù)多實例寫入的情況下,壓測任務要運行兩小時以上,TaiShan只需要十幾分鐘即可跑完壓測,從結(jié)果來看Bulkload簡直不要太好,但有個無法回避的問題是TiDB集群是多個業(yè)務同時寫入的,分散到單個任務看起來寫入時間長。

我們也在嘗試繞過TiDB直接將數(shù)據(jù)寫入TiKV,這個方案我們也在調(diào)研和實踐中,感興趣的小伙伴可以看下:

https://github.com/tidb-incubator/TiBigData

張宗耀:bilibili每天100T+的數(shù)據(jù)導入是如何實現(xiàn)的|DataFunTalk
  1. 限速

在出倉場景,實際上還要考慮限流以及熔斷,沒有限速可能導致業(yè)務庫有一些問題,畢竟服務器能力有限,寫的太快將導致讀有影響。最開始我們使用的方法是在代碼內(nèi)Sleep,簡單實現(xiàn)就是假設數(shù)據(jù)寫入很快,可以在一毫秒內(nèi)完成,那么寫入的耗時就是代碼中sleep的時間,假如為例限速1w或5k,我們會通過sleep的時長就可以得出Spark需要的Executor數(shù),達到間接的、不準確的限流。漏斗桶和令牌桶一個限制流入一個限制流出,我們用的不是很多。分布式的話我們小范圍使用了Sentinel,但分布式限流如果觸發(fā)熔斷可能由于寫入資源有限而寫入一直處于熔斷狀態(tài),導致寫入時間長、數(shù)據(jù)任務破線。BBR算法是個很好的工具,有很多種實現(xiàn),依賴的參數(shù)很多,甚至可以有對端的CPU及內(nèi)存水位,不斷嘗試得到最佳寫入量,在使用上可以很好的改善峰值問題。

04
監(jiān)控自理

  1. 監(jiān)控

在使用上,我們承接了幾乎所有的離線出倉和入倉任務,作為數(shù)倉的零層和尾層,在入倉和出倉時需要及時感知可能存在的問題,一方面任務打優(yōu)先級,方便分級處理,在運行時,基于歷史指標預測當前任務的指標,當出現(xiàn)問題時及時告警接入檢查。

AlterEgo中,我們在Spark的Application Job內(nèi)定時上報寫入速度和寫入的數(shù)據(jù)量。Rider是常駐的可以運行多個Job,在Job中以Job為單位上報監(jiān)控數(shù)據(jù)。AlterEgo和Rider的數(shù)據(jù)全部接入到消息隊列里,消息最后被Aulick消費。Aulick內(nèi)設計了多種指標監(jiān)聽器,用于任務的監(jiān)控,包括運行時間、起止時間、速度、數(shù)據(jù)量、失敗重試次數(shù)和TiDB和MySQL特有的插入/更新數(shù)據(jù)量?;谶@些采集的指標數(shù)據(jù),可以做到任務實例頁,方便用戶查看,另外匯總信息可以通過Grafana工具以及其他BI工具功能展示,異常告警交由Sensor做告警觸發(fā)。

張宗耀:bilibili每天100T+的數(shù)據(jù)導入是如何實現(xiàn)的|DataFunTalk

數(shù)據(jù)采集上,Rider方式實現(xiàn)有的內(nèi)存Channel可以拿到同步的數(shù)據(jù)量等指標信息。AlterEgo方式由于是分布式,會有多個Executor進程同時上報,目前我們主要是通過自定義了累加器完成指標的上報,Executor端在使用累加器時實例化定時采集線程,由于各個Executor進程啟動時間不同,所以在上報時的時間點是不準確的,在使用上我們把時間按照10秒一個窗口進行規(guī)整,如在0-10秒上報的數(shù)據(jù)會全部規(guī)整到0秒上進行匯總。

張宗耀:bilibili每天100T+的數(shù)據(jù)導入是如何實現(xiàn)的|DataFunTalk
  1. 自理

在數(shù)據(jù)同步過程中,在數(shù)據(jù)維度上,需要發(fā)現(xiàn)異常讀寫速度、異常讀寫流量和異常走勢,出現(xiàn)問題及時監(jiān)控和報警,報警會有電話報警到對應負責人,對數(shù)據(jù)異常進行處理,防止由于上游數(shù)據(jù)導入任務異常導致下游數(shù)據(jù)產(chǎn)出問題。這類其實也可以通過DQC去做,但側(cè)重點不同,這里更關注事中觸發(fā)。

在時間維度上,基于任務歷史預測,數(shù)據(jù)同步任務到時間未啟動,或者任務已提交到Y(jié)arn但是由于資源不足沒有啟動,數(shù)據(jù)同步任務執(zhí)行時間過長,也是需要及時處理。

在診斷方面,在任務失敗后需要解析Error日志進行失敗歸因以及跟蹤,方便用戶自理,有一定量以后,還可以做任務的統(tǒng)計,以及資源優(yōu)化。

05
精彩問答

Q1:3-4億的BulkLoad壓測性能提升是如何實現(xiàn)的?

A:BulkLoad是先寫數(shù)據(jù)到本地磁盤,然后推到存儲系統(tǒng),由存儲系統(tǒng)加載到內(nèi)存,首先是Seatunnel分布式的,在執(zhí)行時是分布式多進程生成各自的數(shù)據(jù)文件,最后再把數(shù)據(jù)文件推到存儲系統(tǒng), 實際的性能就看開多少并發(fā)度了,并且整個過程不會太消耗存儲系統(tǒng)的CPU及IO壓力,對讀也是非常友好。

Q2:限速階段如何感知下游壓力?

A:感知下游壓力,目前簡單做法可以通過后端返回的RT時間或者失敗來感知到存儲端壓力,目前熔斷可以簡單這么做,但這種不精細,無法處理好峰值問題,高級玩法可以參考BBR算法,依賴的參數(shù)很多,可以有存儲段的CPU及內(nèi)存水位,可以很好的改善峰值問題。

Q3:能否監(jiān)控精細到Byte字節(jié)數(shù)?

A:大部分細節(jié)指標需要自己試下,B站這邊是通過自定義d累加器實的現(xiàn),在寫入數(shù)據(jù)時記錄數(shù)據(jù)字節(jié)數(shù)、條數(shù)等,簡單點可以自己getBytes拿到,我們這邊累加器會定時上報到消息隊列,然后由Aulick消費數(shù)據(jù)后再進行相應地做報警動作。比如如果傳輸?shù)淖止?jié)量,字節(jié)速率存在異常,就可以及時的發(fā)下,找相應的同學協(xié)同排查問題。

Q4:DataX和Seatunnel是否可以互相替代?

A:工具平臺在落地上,互相替換是很有必要的,出去性能差別外,在執(zhí)行上也只是配置文件的區(qū)別。在集群規(guī)模很大后會收到很多環(huán)境問題、異常及各類問題影響,在工具使用上有個降級方案還是很有必要的。比如我們最近遇到了JDK的Bug,在kerberos認證時認證隊列可能拉長,引起寫入速度慢,事故當時,DataX在寫數(shù)據(jù)時需要經(jīng)過認證,無法及時定位問題,大部分集成任務運行出現(xiàn)速度慢、以及超時。為了防止事故再次出現(xiàn),我們已經(jīng)實現(xiàn)了大部分任務的可互相替換運行數(shù)據(jù)同步任務,實現(xiàn)任務的高可用。

今天的分享就到這里,謝謝大家。

本文經(jīng)授權(quán)發(fā)布,不代表增長黑客立場,如若轉(zhuǎn)載,請注明出處:http://gptmaths.com/quan/60849.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
上一篇 2022-03-15 12:34
下一篇 2022-03-15 14:05

增長黑客Growthhk.cn薦讀更多>>

發(fā)表回復

登錄后才能評論