ポスト・ラムダアーキテクチャの切り札? ー Apache Hudi NTTソフトウェアイノベーションセンタ 2020年10月16日 Zhai Hongjie, 研究員 大村 圭, 主任研究員
データを永続的に扱うデータレイクを起点としたとき、 軸①「データの取り回し」、軸②「活用のしやすさ」の2軸から課題を定義できる 2 データレイクの課題 既存のデータレイクはシンプルな機能しかなく、 多様な要件に対応するために高度化が必要 データレイク バッチデータ ストリーミング データ (Near)Real-time 分析 バッチ処理 新規データのみ (Incremental Read) 全データ ストリーミング データ ストリーミング データ バッチデータ バッチデータ バッチ処理 バッチ処理 (Near)Real-time 分析 (Near)Real-time 分析 データの更新・削除 ②分析や機械学習向けに 複雑な機能を利用する ①バッチとストリーム データを合わせて扱う
ラムダアーキテクチャ(*)のように バッチとストリーミングそれぞれで対応はできるが・・・ 3 データレイクの高度化 バッチデータ ストリーミング データ (Near)Real-time 分析 バッチ処理 新規データのみ (Incremental Read) 全データ ストリーミング データ ストリーミング データ バッチデータ バッチデータ バッチ処理 バッチ処理 (Near)Real-time 分析 (Near)Real-time 分析 データの更新・削除 ストリーミング 統合機能 バッチ統合機能 ストレージ (HDFS、S3、 etc.) ストリーミング 統合機能 ストリーミング 対応 バッチ統合機能 バッチ 差分 取得 Update/Delete対応 パイプラインがどんどん複雑になり、運用が困難 (*): http://lambda-architecture.net
HDFS・S3などはデファクトスタンダードになっているため、変更しづらい 必要な機能を全部まとめたストレージレイヤを用意 4 高度化・シンプル化の両立 今回はApache Hudiを紹介します バッチデータ ストリーミング データ (Near)Real-time 分析 バッチ処理 新規データのみ (Incremental Read) 全データ ストリーミング データ ストリーミング データ バッチデータ バッチデータ バッチ処理 バッチ処理 (Near)Real-time 分析 (Near)Real-time 分析 データの更新・削除 ストレージ (HDFS、S3、 etc.) ストレージレイヤ ソフトウェア 最近Apache Hudiや Delta Lakeなどの OSSが出ています
Apache Hudiの「Hudi」の由来はどれでしょう? A: 初期の名前Hoodieと同じ発音の単語 B: Hadoop Upsert Delete and Incrementalの略 C: Hive, Uber DEveloped略「HuDE」と同じ発音の単語 D: 開発者の名前 5 【問題】 Hudiは2016年からUberによって開発され、多様なワークロード (Read重視・Write重視)に対応できることが特徴です。 https://github.com/apache/hudi
Apache Hudiの「Hudi」の由来はどれでしょう? A: 初期の名前Hoodieと同じ発音の単語 B: Hadoop Upsert Delete and Incrementalの略 C: Hive, Uber DEveloped略「HuDE」と同じ発音の単語 D: 開発者の名前 6 【解答】 名前の通り、Apache HudiはHDFS・S3などにデータのUpsert(Update & Insert)、Delete、Incremental Read機能を実現するソフトウェア
7 Apache Hudiの概要 Copy-On-Write(CoW) Table: Read-intensiveなワークロード向き(書き込みが重い) ・新規データは既存の小さいParquetに統合する Merge-On-Read(MoR) Table: Write-intensiveなワークロード向き(読み出しが重い) ・新規データはAvroで一時保存してからParquetに統合(Compaction)する Apache Hudi ストレージ(HDFS,S3, etc.) Data (Parquet ,Avro) Spark API HudiWrite Client API HudiReadC lient API Spark API Metadat a IndexDB Copy-On-Write Table Merge-On-Read Table ・バッチRead バッチ分析向け ・Incremental Read リアルタイム分析向け ・Compaction ・Rollback データを指定の時間 まで巻き戻す ・バッチUpsert ・バッチDelete ・ストリーミング Write
8 Apache Hudiの概要 Copy-On-Write(CoW) Table: Read-intensiveなワークロード向き(書き込みが重い) ・新規データは既存の小さいParquetに統合する Merge-On-Read(MoR) Table: Write-intensiveなワークロード向き(読み出しが重い) ・新規データはAvroで一時保存してからParquetに統合(Compaction)する Apache Hudi ストレージ(HDFS,S3, etc.) Data (Parquet ,Avro) Spark API HudiWrite Client API HudiReadC lient API Spark API Metadat a IndexDB Copy-On-Write Table Merge-On-Read Table ・バッチRead バッチ分析向け ・Incremental Read リアルタイム分析向け ・Compaction ・Rollback データを指定の時間 まで巻き戻す ・バッチUpsert ・バッチDelete ・ストリーミング Write
9 Apache Hudiの概要 Copy-On-Write(CoW) Table: Read-intensiveなワークロード向き(書き込みが重い) ・新規データは既存の小さいParquetに統合する Merge-On-Read(MoR) Table: Write-intensiveなワークロード向き(読み出しが重い) ・新規データはAvroで一時保存してからParquetに統合(Compaction)する Apache Hudi ストレージ(HDFS,S3, etc.) Data (Parquet ,Avro) Spark API HudiWrite Client API HudiReadC lient API Spark API Metadat a IndexDB Copy-On-Write Table Merge-On-Read Table ・バッチRead バッチ分析向け ・Incremental Read リアルタイム分析向け ・Compaction ・Rollback データを指定の時間 まで巻き戻す ・バッチUpsert ・バッチDelete ・ストリーミング Write
10 Apache Hudiの概要 Apache Hudi ストレージ(HDFS,S3, etc.) Data (Parquet ,Avro) Spark API HudiWrite Client API HudiReadC lient API Spark API Matadat a IndexDB Copy-On-Write Table Merge-On-Read Table ・バッチRead バッチ分析向け ・Incremental Read リアルタイム分析向け ・Compaction ・Rollback データを指定の時間 まで巻き戻す ・バッチUpsert ・バッチDelete ・ストリーミング Write 今回はApache Hudiのベンチマークからわかる 得意分野・落し穴をピックアップして紹介 ※ベンチマークは0.5.2のものです。最新版では仕様が変更されている可能性があります
11 1:テーブルタイプーMoR vs CoW 0 10000 20000 30000 40000 50000 60000 70000 80000 Create Append Delete 時間(ms) Upsert性能比較 (Partitionなし) COW MOR AppendではMoRはCoW より19.4%速くなった 遅い 速い 設計通りUpsert(Update & Insert)に関してはMoRのほうが速い ただし例外も・・・ Benchmark Environment: ・Master: AWS m5.2xlarge * 1 ・Worker: AWS m5.2xlarge * 3 ・Hadoop 3.2.1 & Spark 2.4.5 ・Apache Hudi 0.5.2 ・Data: TPC-DS / store_sales
Upsert系の性能は1ファイルのサイズに左右される • ベンチマークのように初期データを細かく分割して書き込んだ直後、 MoRとCoWは変わらない • 長期運用だと設計上MoRが速くなる 12 MoRは常に速いわけではない 0 200000 400000 600000 800000 1000000 1200000 1400000 1 10 20 30 40 50 60 70 80 90 100 時間(ms) Scale Factor Insert性能(Partitionあり) Insert(COW) with Partition Insert(MOR) wth Partition 700000 750000 800000 850000 900000 950000 1000000 1050000 70 80 データをPartitionで細かく分割すると、 MoRは逆に1.5%遅くなった 遅い 速い
Upsert系の性能は1ファイルのサイズに左右される • ベンチマークのように初期データを細かく分割して書き込んだ直後、 MoRとCoWは変わらない • 長期運用だと設計上MoRが速くなる 13 MoRは常に速いわけではない Copy-On-Write Table Merge-On-Read Table Partitionありのテーブル A, B C A, B C E,F D Partitionありの場合、1 ファイルサイズは小さいた め、CoWのファイルコピー は速い。 場合によってMoRのファイ ル内容書き換えより速くな る。 D E,F Update: E→E' E',F Copy&Update E',F Update 1KB 1KB 1KB 1KB 1KB 1KB 1KB 1KB
Upsert系の性能は1ファイルのサイズに左右される • ベンチマークのように初期データを細かく分割して書き込んだ直後、 MoRとCoWは変わらない • 長期運用だと設計上MoRが速くなる 14 MoRは常に速いわけではない Copy-On-Write Table Merge-On-Read Table 書き込みし続けると A, B, G,H,I . . C,M, N,U,V ,... A, B C E,F,K, L,X... D 長期運用(書き込みし続け る)と、CoWのファイルは 大きくなり、コピーも遅く なる。 MoRは差分毎ファイル作っ ているため(*)、操作が速い D,O,Q ,R,S,T . .. E,F Update: E→E' E',F Copy&Update E',F Update 120MB 120MB 120MB 120MB X Y Z 1KB * Compactionされる前のデータに限る
ベンチマーク中予想外のことが発生していた ・元データに対して、Hudiの書き込み量は2倍ぐらい ・CoWに対するWriteで大量のSmall File(i.e. < 120MB)が作られた ー 小さいファイルが埋める前に新しいファイル作らないはず 色々と調査した結果・・・ 15 2:Hudiの隠し前提条件 HudiはRecordサイズを1KBと仮定している approxRecordSize(Default = 1024) Recordサイズの平均値。データを既に書き込んだ場合データから計算 することもできるが、それ以外はこの仮定値を使う。 この設定はあらゆるところに影響が出る。
Hudiは1レコード1KB前提で設計されている ー Incremental Readなどの機能を実現するために必要 ー e.g. メタデータの設計、レコード書き込みアルゴリズム 16 書き込み量問題とHudiメタデータ 生のデータ Hudi メタデータ Index データ HudiのParquet ・生のParquetに相当 ・Hudiはこの部分を1KBと仮定 ・今回のデータは約130bytes ・Record毎に付与する ・約130 bytes、サイズ調整不可 ・ファイル毎付与する ・BloomFilterを使う場合のみ存在 ・データ量と関係なく422KB ・サイズは調整可能 - bloomFilterNumEntries - bloomFilterFPP 1ファイル約100Kレコードの場合、最大メタデータ量: 130𝑏𝑦𝑡𝑒𝑠 × 100,000 + 422𝐾𝐵 = 13.42𝑀𝐵 Hudiの仮定通りRecordサイズ1KBなら、1ファイル約+13.4% 今回の場合は、+100%の書き込み量となった
17 ファイルサイズへの影響 20 40 60 80 100 120 140 0 500 1000 1500 2000 2500 3000 3500 ファイルサイズ(MB) レコードサイズ(Byte) ファイルサイズとレコードサイズの関係 Averge File Size(Copy On… ファイル最大サイズ (PARQUET_FILE_MAX_SIZE) 40 90 140 900 950 1000 1050 1100 1150 1024bytes レコードサイズが小さすぎると、出来上がったファイル もSmall File(i.e. <120MB)になる。 小さいレコードは 特に苦手 レコードサイズ仮定値の手前 でのファイルサイズが一番大きい
• 実はIncrement Queryの実装は全然速くない – MoRだと速いはずだが、まだ対応してない • スケーラビリティー重視のBulk Insertはパラメータによって遅くな ることもある • ファイルDelete/Updateしても、ファイルにあるレコードの順番は 変わらない • レコードサイズが時間によって大きく変動する場合、Small Fileが できたり、巨大ファイルできたりするかもしれない 18 その他気づいたところ 興味ある方はぜひAsk the Speakerで質問していただければ
19 本講演のまとめ • データレイクへの期待が多様化しており、それを受けて Apache Hudiが開発された • バッチ・ストリーミングを同じレイヤで処理するポスト・ラムダ アーキテクチャ • 多様なワークロードに対応するテーブルタイプ(CoW・MoR) • まだ1.0に向けて開発中のため、対応しきれてないところ もある • 現在はUber自身のユースケース(レコードサイズの仮定など)を優先して いる • 予想通りにならない時、隠し仕様あるかを一回調べましょう
20 以下、参考資料
Hudiは1レコード1KB前提で設計されている ー Incremental Readなどの機能を実現するために必要 ー e.g. メタデータの設計、レコード書き込みアルゴリズム 21 Small File問題 private void assignInserts(WorkloadProfile profile) { ... long averageRecordSize = averageBytesPerRecord(metaClient.getActiveTimeline().getCo mmitTimeline().filterCompletedInstants(), config.getCopyOnWriteRecordSizeEstimate()); List<SmallFile> smallFiles = getSmallFiles(partitionPath); ... for (SmallFile smallFile : smallFiles) { long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize, totalUnassignedInserts); src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java, ファイルに書き込むレコード数 はRecordサイズで算出される。 実際のレコードは1KBより小さ い場合、Small Fileになる 初回書き込みなどレコードサ イズ計算できない場合、 Hudiの仮定値を使う ファイルに書き込 むべきレコード数 を計算する関数
Insert: 一般的な書き込み Bulk_insert:スケーラアップしやすいInsert 22 2:書き込みーInsert vs Bulk_Insert 0 0.005 0.01 0.015 0.02 0.025 0.03 0.035 0.04 0.045 0.05 0 20 40 60 80 100 120 bulk_insert vs insert性能比較 Bulk Insert(COW) Bulk Insert(MOR) Insert(COW) Insert(MOR) ScaleFactor=50(約10GB)からInsertの効率 が落ち始め、Bulk_Insertの効率は上がり続け る Time(ms)/Record Scale Factor 遅い 速い 0.004 0.0045 0.005 0.0055 0.006 40 60 80 100 120 ただし、どんなサイズのデータでも速いわけではない 小さいデータでは、 Insertは圧倒的に速い
23 Bulk Insertのターゲットデータサイズ Bulk Insertが速いのは一定のサイズまで – このサイズは、以下2つのパラメータに影響される BulkInsertParallelism(Default = 1500) ParquetFileMaxSize(Default = 120MB) 参考ページ:https://hudi.apache.org/docs/configurations.html 予め用意するファイルの数。このファイルを すべて使い切る前に新しいファイル作る必要 はない。 1つのファイルの最大サイズ。 このサイズを越えたファイルには 新しいデータを書き込めない。 初期状態のターゲットサイズは1500 × 120𝑀𝐵 = 175𝐺𝐵 このサイズ越えたら、普通のInsertと変わらなくなる 違うサイズを想定しているならば、BulkInsertParallelismを優先的に調整する (ParquetFileMaxSizeは副作用出るかもしれないので、調整は慎重に)

ポスト・ラムダアーキテクチャの切り札? Apache Hudi(NTTデータ テクノロジーカンファレンス 2020 発表資料)