[Dataflow 教學] Dataflow 是什麼?跟 Apache Beam 有什麼關係?

Dataflow 是 GCP 資料三兄弟的老二 (依知名度排行),你的資料要放進 BigQuery 分析之前,要先整理好資料的格式,所以 GCP 強力推薦使用 Dataflow。

為什麼不用 VM 就好?你可以直接跳到最後一段!

一、Dataflow 是什麼?

想像你是一間工廠的管理者,每天都要處理大量的原物料(資料)。這些原物料需要經過各種加工程序,最後變成成品。

GCP 的 Dataflow 就像是一個全自動的生產線系統,你只要設定好要怎麼處理這些原物料,它就會自動幫你完成所有工序,而且還會自動調整工人(運算資源)的數量。

二、Dataflow 舉例說明

當然這樣子講還是有點模糊,舉個實際的例子來說:假設你需要處理每天的銷售資料,Dataflow 可以自動幫你:

  1. 收集各個分店的銷售資料
  2. 清理不正確的資料
  3. 計算各種統計數字
  4. 把結果存到資料庫

三、Dataflow 跟 Apache Beam 有什麼關係?

Apache Beam 是 Dataflow 的原型,GCP 只是把跟 Apache Beam 做一些加值的優化,放在 GCP 上,這樣你就不用自己在地端安裝和設定 Apache Beam,節省很多準備環境 (尤其是機器的安裝設定) 的時間。

Apache Beam 就像是一套標準的工廠作業規範。它定義了一些基本的處理方式,讓你可以用同一種方式來描述你想要的資料處理流程,不管最後是要在哪裡執行。

四、Apache Beam 的組成元件

讓我們來看看它的主要組成元件:

1. Pipeline(管線): 這就像是整個工廠的生產線規劃圖,定義了資料要如何從原料變成成品,經過哪些步驟。

2. PCollection(資料集合): 把它想像成工廠裡的輸送帶,上面放著要處理的原料或半成品。每次資料經過一道處理程序,就會形成新的一條輸送帶。

3. Transform(轉換): 這就是工廠裡的各種加工設備,例如:

  • ParDo:像是一個工作站,可以對每件原料進行客製化處理
  • GroupByKey:就像是分類站,把相同種類的產品集中在一起
  • Combine:類似於組裝線,把多個零件組合成一個成品

4. I/O Transforms(輸入輸出轉換): 就像工廠的原料進貨口和成品出貨口,決定資料要從哪裡讀入,最後要存到哪裡去。

5. Window(時間窗口): 假設你想要每小時統計一次生產數量,Window 就是幫你把資料按照時間切分的工具。

6. Trigger(觸發器): 就像是工廠的警報系統,當某些條件達成時(比如收集到足夠的數據),就會觸發特定的處理程序。

最重要的是,這整套系統的特色是:

  • 全自動化:設定好後就會自動運作
  • 可擴展性:需要處理更多資料時,會自動增加處理能力
  • 容錯能力:即使某部分出問題,系統也能自動恢復
  • 即時處理:能夠處理即時流入的資料,也能處理已存在的資料

關於 Apache Beam 詳細說明可以參考這份文件

五、從具體的角度再解釋一次 Apache Beam 的組成元件

1. Pipeline(管線)

從技術角度來看,Pipeline 是整個數據處理的骨幹,它包含了所有的處理步驟和邏輯。

# Pipeline 的基本架構
pipeline = beam.Pipeline()
result = (pipeline 
    | "讀取資料" >> ReadFromText('input.csv')
    | "處理資料" >> Process()
    | "寫入結果" >> WriteToText('output.txt'))

例子:處理網站日誌文件,從讀取原始日誌、解析內容、到產生報表的整個流程。

2. PCollection(資料集合)

這是 Beam 中最基本的資料結構,可以存放任何型別的資料,而且是不可變的(immutable)。每次轉換都會產生新的 PCollection。

# 一個包含用戶訪問記錄的 PCollection
visits = pipeline | beam.Create([
    {'user': 'A', 'page': '/home', 'time': '2024-01-01 10:00'},
    {'user': 'B', 'page': '/products', 'time': '2024-01-01 10:05'}
])

例子:存放使用者的點擊記錄,每筆記錄包含用戶ID、訪問頁面、時間戳等資訊。

3. Transform(轉換)

轉換是對 PCollection 進行操作的處理單元。主要類型包括:

(1) ParDo(並行處理):

# 解析每一行日誌並提取重要資訊
class ParseLog(beam.DoFn):
    def process(self, element):
        user, action, timestamp = element.split(',')
        return [{
            'user': user,
            'action': action,
            'timestamp': timestamp
        }]

logs | beam.ParDo(ParseLog())

例子:將原始的日誌文字轉換成結構化的資料格式。

(2) GroupByKey(分組):

# 依照用戶ID分組,統計每個用戶的訪問次數
user_visits = (visits 
    | beam.Map(lambda x: (x['user'], 1))
    | beam.GroupByKey()
    | beam.Map(lambda x: {x[0]: sum(x[1])})
)

例子:統計每個用戶在不同頁面的停留時間。

(3) Combine(合併):

# 計算每個頁面的總訪問次數
page_visits = (visits
    | beam.Map(lambda x: (x['page'], 1))
    | beam.CombinePerKey(sum)
)

例子:計算網站每個頁面的總瀏覽量。

4. I/O Transforms(輸入輸出轉換)

# 從多個來源讀取資料
logs = pipeline | beam.io.ReadFromText('logs/*.txt')
# 寫入到資料庫
results | beam.io.WriteToMongoDB(uri='mongodb://localhost:27017')

例子:從 S3 讀取日誌檔案,處理後寫入到 BigQuery。

5. Window(時間窗口)

# 每5分鐘統計一次訪問量
windowed_counts = (visits
    | beam.WindowInto(window.FixedWindows(300))  # 5分鐘
    | beam.GroupByKey()
    | beam.Map(count_visits)
)

例子:統計每小時的活躍用戶數,或計算每分鐘的交易金額。

6. Trigger(觸發器)

# 當收集到100筆資料或等待時間超過1分鐘時觸發處理
early_results = (data
    | beam.WindowInto(
        window.GlobalWindows(),
        trigger=trigger.Repeatedly(
            trigger.AfterCount(100) | 
            trigger.AfterProcessingTime(60)
        ),
        accumulation_mode=trigger.AccumulationMode.DISCARDING
    )
)

例子:即時監控系統,當檢測到異常訪問模式時立即觸發警報。

六、Apache Beam 實際應用整合範例

# 完整的網站訪問分析pipeline
def analyze_website_logs():
    with beam.Pipeline() as pipeline:
        results = (pipeline
            | "讀取日誌" >> beam.io.ReadFromText('logs/*.txt')
            | "解析日誌" >> beam.ParDo(ParseLog())
            | "加上時間窗口" >> beam.WindowInto(window.FixedWindows(3600))
            | "依頁面分組" >> beam.Map(lambda x: (x['page'], 1))
            | "計算訪問量" >> beam.CombinePerKey(sum)
            | "格式化輸出" >> beam.Map(format_output)
            | "寫入結果" >> beam.io.WriteToText('results.txt')
        )

這個例子展示了如何使用這些組件來建立一個完整的數據處理流程:

  1. 讀取多個日誌文件
  2. 解析每條日誌記錄
  3. 設定每小時的分析窗口
  4. 按頁面分組並計算訪問量
  5. 格式化輸出結果
  6. 將結果寫入文件

這樣的資料處理管道可以處理任意大小的資料集,而且能夠自動擴展和容錯。

需要注意的是,這些程式碼都是在本地端執行,如果要在 GCP Dataflow 上運行,只需要修改 Pipeline 的執行器設定即可,程式碼邏輯不需要改變。

七、為什麼不自己在 VM 上寫 Python 來處理資料就好?為何要用 Dataflow?到底差在哪裡?

讓我用實際的例子來說明在 VM 上自己寫 Python 處理資料,與使用 Dataflow 的差異。

情境:處理每日 100GB 的使用者點擊日誌 Log

(一) 在 VM 上自己寫 Python 的程式碼大概長這樣子:

# 在單一 VM 上運行的 Python 程式碼
def process_logs():
    with open('huge_logs.txt', 'r') as file:
        for line in file:
            user_data = parse_log(line)
            # 處理記憶體不足的問題
            if memory_usage > threshold:
                save_temporary_results()
                clear_memory()
            
            # 處理單機運算慢的問題
            processed_data = complex_calculation(user_data)  # 可能要跑很久
            
            # 處理機器當機的問題
            try:
                save_to_database(processed_data)
            except ConnectionError:
                retry_save(processed_data)  # 需要自己寫重試機制 (retry)

可能會遇到的問題:

  1. 記憶體限制
    • VM 記憶體用完就崩潰
    • 需要自己寫程式分批處理
    • 要自己管理暫存資料
  2. 運算效能
    • 單機處理 100GB 可能要跑一整天,花錢
    • CPU 滿載可能影響其他程式
    • 擴充需要自己開更多 VM、寫分散式程式,管機器太累了
  3. 錯誤處理
    • 程式崩潰要從頭重跑
    • 網路斷線要自己處理
    • VM 當機就前功盡棄
  4. 監控和維護
    • 要自己寫 Log
    • 要自己監控 CPU/記憶體
    • 要自己處理備份

(二) 使用 Dataflow 的情況

# Dataflow 程式碼
with beam.Pipeline(options=PipelineOptions()) as pipeline:
    results = (pipeline 
        | 'ReadLogs' >> ReadFromText('gs://bucket/huge_logs.txt')
        | 'ParseLogs' >> beam.ParDo(ParseLogFn())
        | 'CalculateResults' >> beam.ParDo(ComplexCalculationFn())
        | 'WriteToDB' >> WriteToDatabase()
    )

不覺得看起來簡單很多嗎?接下來看一下 Dataflow 自動處理的部分:

  1. 自動擴展(Auto-scaling)
    • 自動偵測資料量
    • 自動增加/減少運算資源
    • 不用擔心記憶體爆掉
  2. 平行處理(Parallel Processing)
    • 自動分散工作到多台機器
    • 100GB 資料可能 1 小時就處理完
    • 有效利用 Google 的運算資源
  3. 錯誤恢復(Fault Tolerance)
    • 某台機器掛掉會自動重試
    • 網路問題自動處理
    • 不會從頭重跑,從上次中斷點繼續
  4. 監控和維護
    • 完整的監控儀表板
    • 即時查看處理進度
    • 系統層級的 Log

(三) 何時用 Dataflow 或 VM?

什麼時候選擇 Dataflow:

  1. 資料量大(>10GB)
  2. 需要即時處理(Streaming)
  3. 處理邏輯複雜
  4. 需要可靠的錯誤處理
  5. 需要自動擴展
  6. 預算允許(雖然比 VM 貴,但省下很多人力跟時間)

什麼時候用 VM 就夠:

  1. 資料量小(<1GB)
  2. 單次處理就好
  3. 處理邏輯簡單
  4. 不急著要結果 (你可以使用 Spot VM 享受 60%~90% Off 的折扣)
  5. 預算有限

重點是:Dataflow 不只是一個執行環境,而是一個完整的資料處理平台。雖然前期學習成本較高,但在處理大量資料時,會省下更多開發和維護的時間。

這就像是比較「自己蓋房子」和「請建商蓋房子」的差異 – 雖然請建商比較貴,但他們有完整的團隊、標準流程和品質保證,最終反而更有效率。

以上先簡單介紹一下,之後再深入介紹技術細節和實際操作。

返回頂端