Dataflow 是 GCP 資料三兄弟的老二 (依知名度排行),你的資料要放進 BigQuery 分析之前,要先整理好資料的格式,所以 GCP 強力推薦使用 Dataflow。
為什麼不用 VM 就好?你可以直接跳到最後一段!
一、Dataflow 是什麼?
想像你是一間工廠的管理者,每天都要處理大量的原物料(資料)。這些原物料需要經過各種加工程序,最後變成成品。
GCP 的 Dataflow 就像是一個全自動的生產線系統,你只要設定好要怎麼處理這些原物料,它就會自動幫你完成所有工序,而且還會自動調整工人(運算資源)的數量。
二、Dataflow 舉例說明
當然這樣子講還是有點模糊,舉個實際的例子來說:假設你需要處理每天的銷售資料,Dataflow 可以自動幫你:
- 收集各個分店的銷售資料
- 清理不正確的資料
- 計算各種統計數字
- 把結果存到資料庫
三、Dataflow 跟 Apache Beam 有什麼關係?
Apache Beam 是 Dataflow 的原型,GCP 只是把跟 Apache Beam 做一些加值的優化,放在 GCP 上,這樣你就不用自己在地端安裝和設定 Apache Beam,節省很多準備環境 (尤其是機器的安裝設定) 的時間。
Apache Beam 就像是一套標準的工廠作業規範。它定義了一些基本的處理方式,讓你可以用同一種方式來描述你想要的資料處理流程,不管最後是要在哪裡執行。
四、Apache Beam 的組成元件
讓我們來看看它的主要組成元件:
1. Pipeline(管線): 這就像是整個工廠的生產線規劃圖,定義了資料要如何從原料變成成品,經過哪些步驟。
2. PCollection(資料集合): 把它想像成工廠裡的輸送帶,上面放著要處理的原料或半成品。每次資料經過一道處理程序,就會形成新的一條輸送帶。
3. Transform(轉換): 這就是工廠裡的各種加工設備,例如:
- ParDo:像是一個工作站,可以對每件原料進行客製化處理
- GroupByKey:就像是分類站,把相同種類的產品集中在一起
- Combine:類似於組裝線,把多個零件組合成一個成品
4. I/O Transforms(輸入輸出轉換): 就像工廠的原料進貨口和成品出貨口,決定資料要從哪裡讀入,最後要存到哪裡去。
5. Window(時間窗口): 假設你想要每小時統計一次生產數量,Window 就是幫你把資料按照時間切分的工具。
6. Trigger(觸發器): 就像是工廠的警報系統,當某些條件達成時(比如收集到足夠的數據),就會觸發特定的處理程序。
最重要的是,這整套系統的特色是:
- 全自動化:設定好後就會自動運作
- 可擴展性:需要處理更多資料時,會自動增加處理能力
- 容錯能力:即使某部分出問題,系統也能自動恢復
- 即時處理:能夠處理即時流入的資料,也能處理已存在的資料
關於 Apache Beam 詳細說明可以參考這份文件。
五、從具體的角度再解釋一次 Apache Beam 的組成元件
1. Pipeline(管線)
從技術角度來看,Pipeline 是整個數據處理的骨幹,它包含了所有的處理步驟和邏輯。
# Pipeline 的基本架構
pipeline = beam.Pipeline()
result = (pipeline
| "讀取資料" >> ReadFromText('input.csv')
| "處理資料" >> Process()
| "寫入結果" >> WriteToText('output.txt'))
例子:處理網站日誌文件,從讀取原始日誌、解析內容、到產生報表的整個流程。
2. PCollection(資料集合)
這是 Beam 中最基本的資料結構,可以存放任何型別的資料,而且是不可變的(immutable)。每次轉換都會產生新的 PCollection。
# 一個包含用戶訪問記錄的 PCollection
visits = pipeline | beam.Create([
{'user': 'A', 'page': '/home', 'time': '2024-01-01 10:00'},
{'user': 'B', 'page': '/products', 'time': '2024-01-01 10:05'}
])
例子:存放使用者的點擊記錄,每筆記錄包含用戶ID、訪問頁面、時間戳等資訊。
3. Transform(轉換)
轉換是對 PCollection 進行操作的處理單元。主要類型包括:
(1) ParDo(並行處理):
# 解析每一行日誌並提取重要資訊
class ParseLog(beam.DoFn):
def process(self, element):
user, action, timestamp = element.split(',')
return [{
'user': user,
'action': action,
'timestamp': timestamp
}]
logs | beam.ParDo(ParseLog())
例子:將原始的日誌文字轉換成結構化的資料格式。
(2) GroupByKey(分組):
# 依照用戶ID分組,統計每個用戶的訪問次數
user_visits = (visits
| beam.Map(lambda x: (x['user'], 1))
| beam.GroupByKey()
| beam.Map(lambda x: {x[0]: sum(x[1])})
)
例子:統計每個用戶在不同頁面的停留時間。
(3) Combine(合併):
# 計算每個頁面的總訪問次數
page_visits = (visits
| beam.Map(lambda x: (x['page'], 1))
| beam.CombinePerKey(sum)
)
例子:計算網站每個頁面的總瀏覽量。
4. I/O Transforms(輸入輸出轉換)
# 從多個來源讀取資料
logs = pipeline | beam.io.ReadFromText('logs/*.txt')
# 寫入到資料庫
results | beam.io.WriteToMongoDB(uri='mongodb://localhost:27017')
例子:從 S3 讀取日誌檔案,處理後寫入到 BigQuery。
5. Window(時間窗口)
# 每5分鐘統計一次訪問量
windowed_counts = (visits
| beam.WindowInto(window.FixedWindows(300)) # 5分鐘
| beam.GroupByKey()
| beam.Map(count_visits)
)
例子:統計每小時的活躍用戶數,或計算每分鐘的交易金額。
6. Trigger(觸發器)
# 當收集到100筆資料或等待時間超過1分鐘時觸發處理
early_results = (data
| beam.WindowInto(
window.GlobalWindows(),
trigger=trigger.Repeatedly(
trigger.AfterCount(100) |
trigger.AfterProcessingTime(60)
),
accumulation_mode=trigger.AccumulationMode.DISCARDING
)
)
例子:即時監控系統,當檢測到異常訪問模式時立即觸發警報。
六、Apache Beam 實際應用整合範例
# 完整的網站訪問分析pipeline
def analyze_website_logs():
with beam.Pipeline() as pipeline:
results = (pipeline
| "讀取日誌" >> beam.io.ReadFromText('logs/*.txt')
| "解析日誌" >> beam.ParDo(ParseLog())
| "加上時間窗口" >> beam.WindowInto(window.FixedWindows(3600))
| "依頁面分組" >> beam.Map(lambda x: (x['page'], 1))
| "計算訪問量" >> beam.CombinePerKey(sum)
| "格式化輸出" >> beam.Map(format_output)
| "寫入結果" >> beam.io.WriteToText('results.txt')
)
這個例子展示了如何使用這些組件來建立一個完整的數據處理流程:
- 讀取多個日誌文件
- 解析每條日誌記錄
- 設定每小時的分析窗口
- 按頁面分組並計算訪問量
- 格式化輸出結果
- 將結果寫入文件
這樣的資料處理管道可以處理任意大小的資料集,而且能夠自動擴展和容錯。
需要注意的是,這些程式碼都是在本地端執行,如果要在 GCP Dataflow 上運行,只需要修改 Pipeline 的執行器設定即可,程式碼邏輯不需要改變。
七、為什麼不自己在 VM 上寫 Python 來處理資料就好?為何要用 Dataflow?到底差在哪裡?
讓我用實際的例子來說明在 VM 上自己寫 Python 處理資料,與使用 Dataflow 的差異。
情境:處理每日 100GB 的使用者點擊日誌 Log
(一) 在 VM 上自己寫 Python 的程式碼大概長這樣子:
# 在單一 VM 上運行的 Python 程式碼
def process_logs():
with open('huge_logs.txt', 'r') as file:
for line in file:
user_data = parse_log(line)
# 處理記憶體不足的問題
if memory_usage > threshold:
save_temporary_results()
clear_memory()
# 處理單機運算慢的問題
processed_data = complex_calculation(user_data) # 可能要跑很久
# 處理機器當機的問題
try:
save_to_database(processed_data)
except ConnectionError:
retry_save(processed_data) # 需要自己寫重試機制 (retry)
可能會遇到的問題:
- 記憶體限制
- VM 記憶體用完就崩潰
- 需要自己寫程式分批處理
- 要自己管理暫存資料
- 運算效能
- 單機處理 100GB 可能要跑一整天,花錢
- CPU 滿載可能影響其他程式
- 擴充需要自己開更多 VM、寫分散式程式,管機器太累了
- 錯誤處理
- 程式崩潰要從頭重跑
- 網路斷線要自己處理
- VM 當機就前功盡棄
- 監控和維護
- 要自己寫 Log
- 要自己監控 CPU/記憶體
- 要自己處理備份
(二) 使用 Dataflow 的情況
# Dataflow 程式碼
with beam.Pipeline(options=PipelineOptions()) as pipeline:
results = (pipeline
| 'ReadLogs' >> ReadFromText('gs://bucket/huge_logs.txt')
| 'ParseLogs' >> beam.ParDo(ParseLogFn())
| 'CalculateResults' >> beam.ParDo(ComplexCalculationFn())
| 'WriteToDB' >> WriteToDatabase()
)
不覺得看起來簡單很多嗎?接下來看一下 Dataflow 自動處理的部分:
- 自動擴展(Auto-scaling)
- 自動偵測資料量
- 自動增加/減少運算資源
- 不用擔心記憶體爆掉
- 平行處理(Parallel Processing)
- 自動分散工作到多台機器
- 100GB 資料可能 1 小時就處理完
- 有效利用 Google 的運算資源
- 錯誤恢復(Fault Tolerance)
- 某台機器掛掉會自動重試
- 網路問題自動處理
- 不會從頭重跑,從上次中斷點繼續
- 監控和維護
- 完整的監控儀表板
- 即時查看處理進度
- 系統層級的 Log
(三) 何時用 Dataflow 或 VM?
什麼時候選擇 Dataflow:
- 資料量大(>10GB)
- 需要即時處理(Streaming)
- 處理邏輯複雜
- 需要可靠的錯誤處理
- 需要自動擴展
- 預算允許(雖然比 VM 貴,但省下很多人力跟時間)
什麼時候用 VM 就夠:
- 資料量小(<1GB)
- 單次處理就好
- 處理邏輯簡單
- 不急著要結果 (你可以使用 Spot VM 享受 60%~90% Off 的折扣)
- 預算有限
重點是:Dataflow 不只是一個執行環境,而是一個完整的資料處理平台。雖然前期學習成本較高,但在處理大量資料時,會省下更多開發和維護的時間。
這就像是比較「自己蓋房子」和「請建商蓋房子」的差異 – 雖然請建商比較貴,但他們有完整的團隊、標準流程和品質保證,最終反而更有效率。
以上先簡單介紹一下,之後再深入介紹技術細節和實際操作。