ベアメタルで実現するSpark&Trino on K8sなデータ基盤

-- Views

October 05, 23

スライド概要

2023/10/05 Trino/Presto Conference Tokyo 2023 (Online)
https://techplay.jp/event/907388

profile-image

嫁と子供と酒で出来ているインフラエンジニア。

シェア

またはPlayer版

埋め込む »CMSなどでJSが使えない場合

(ダウンロード不可)

関連スライド

各ページのテキスト
1.

2023/10/05 Trino/Presto Conference Tokyo 2023 (Online) ベアメタルで実現する Spark&Trino on K8sなデータ基盤 株式会社マイクロアド 永富 安和 ( @yassan168 ) #trinodb

2.

事業紹介(データプラットフォーム事業) DSP #trinodb SSP 広告を出したい「広告主」向け 広告主/代理店 広告 広告代理店 広告 A 広告主 広告 B : 広告出稿料 リアルタイムで取引 (RTB) AD : 広告を出して欲しい「Webメディア」向け ユーザー メディア ユーザーA ニュース ユーザーB グルメ : 広告表示 : データ紐づけ 位置情報 EC購買 提携企業DB 提携企業DB Data Management Platform Web行動 リアル購買 提携企業DB 5

3.

現行のデータ基盤の概要 #trinodb 分析クエリ実行 ト 値で 理論 タバイ 約2ペ ている し 保持 約13T B/day を処理 Impala は 流量 ーダー 均 オ 平 Gbit 間 秒 広告配信サーバ 各種サーバ 参照 参照 Streaming ログ転送 分析(CDH v6.3.2) Data Lake(CDH v5.16.2) ジョブ実行 ジョブ実行 Spark Streaming (リアルタイム加工) ワークフロー

4.

現行のデータ基盤の概要 #trinodb 分析クエリ実行 今日はここの話 参照 Impala 平均流量: 秒間Gbitオーダー 広告配信サーバ 各種サーバ 参照 Streaming ログ転送 分析(CDH v6.3.2) Data Lake(CDH v5.16.2) ジョブ実行 ジョブ実行 Spark Streaming (リアルタイム加工) ワークフロー

5.

現行のデータ基盤の課題 1. CDH無償版の提供が終了しているので継続して利用出来ない ○ 有償の後継版Cloudera CDPも検討したが費用面がクリア出来ず見送り (Google Cloudなども検討したが、費用や技術課題がクリア出来ず見送り。5年償却で見るとクラウドは高い。) 2. ComputeとStorageを分離してNode配置出来ないので サーバスペックが過剰になりがち ○ ○ YARNのNode ManegerとHDFSは分離して配置出来ない ComputeスケールさせたいだけなのにStorageもスケールするので非効率 3. Impalaの統計情報の運用が非常に煩雑かつ有効に利用出来ない ○ ○ 大規模テーブルの場合、ほぼ使えない 統計情報が利用できないので効率の悪いクエリになりがちでImpalaを活かしきれない 4. ETL/ELT処理で利用しているMapReduceベースのHiveが遅い ○ 本来はMapReduceではなく、Tez・LLAPを使うべきだがCDHが古くて利用できない 5. テーブル構造が複雑なので、SQLベースでETL/ELT処理するのが辛い ○ 複雑なクエリになりがちで、改修に難易度が高く手間がかかる #trinodb

6.

新しいデータ基盤に求める事 1. ComputeとStorageを分離したい 2. ETL/ELT処理は、SQLベースではなく、Programmableに処理したい 3. SQLエンジンは大規模なテーブルでも統計情報を更新・有効活用が出来ること 4. Hiveテーブルの様にオンラインで柔軟なスキーマ進化が可能であること #trinodb

7.

新しいデータ基盤に求める事 1. ComputeとStorageを分離したい 😆 2. Sparkを使ってスクリプトベースに処理することで、複雑なSQLでの処理が不要になった SQLエンジンは大規模なテーブルでも統計情報を更新・有効活用が出来ること 😆 4. HiveテーブルからIcebergテーブルに変更し、HDFSからS3互換のアプライアンスに置き換えること でYARN・Zookeeperに依存しなくなり分離が可能になった(構成要素も減ったので構築も楽になった) ETL/ELT処理は、SQLベースではなく、Programmableに処理したい 😆 3. #trinodb Trino&Icebergを使うことで、Hive・Impalaに依存せずに、柔軟に統計情報の更新・利用する Hiveテーブルの様にオンラインで柔軟なスキーマ進化が可能であること 😆 Iceberg特有のスキーマ進化(orパーティション進化)により、以前より柔軟な運用が可能になる

8.

新しいデータ基盤の概要 #trinodb

9.

新しいデータ基盤の概要 ォーカ 今日はここにフ ス #trinodb

10.

どうやって切り替えていくか? #trinodb 予算やデータセンタの設備の問題、技術的な課題などなど、いろんな理由から、 分析用クラスタ→Data Lake用クラスタの順に切り替えていきます。 詳しい話を始めると枠にまったく収まらないので、 マイクロアドの技術ブログなどで発信して行きます! 1 に tup # /296440/ e e M nt kyo m/eve SG To o F c T . O s の as 。 10/12 connp . は o ださい y く k く o し t て も g捕まえ /otfs / で : s こ p そ htt ので、 る す 参加

11.

アドホック分析用としてのTrinoで工夫したこと ● #trinodb Kubernetes(RKE2)を使うことでクラスタの構築やアップデートを楽にした ○ ○ Trino自体の構成がCoordinator・Workerと構成がシンプルでPersistentVolume (PV)が不要。そ の為、マニフェストをシンプルに保てるのでK8sでの運用はさほど辛くない。 RKE2のsystem-upgrade-controllerがあるので設定書いてapplyするとローリングアップグレー ドしてくれるので便利 https://docs.rke2.io/upgrade/automated_upgrade/ (もしくはRancher Web UIからポチーがもっと簡単) ● Rancherを使ってK8sクラスタ管理することで管理コストを下げ利便性を向上 ● Helm ChartにはTrino公式のものよりもこなれている github.com/valeriano-manassero/helm-charts を使用した ○ ○ JVMのヒープサイズの指定を-Xmx/-Xmsではな く-XX:MaxRAMPercentage/-XX:InitialRAMPercentageを使って使用可能なメモリーに対する 割合で指定するように変更 Affinityを利用して、CoordinatorとWorkerポッドの同居を禁止し、 Workerポッドはなるべく同一Nodeに2個以上配置しないようにする

12.

アドホック分析用としてのTrinoで工夫したこと ● Fault-tolerant executionを使い、 搭載メモリ以上のクエリを利用できるようにした ○ ○ ○ 搭載メモリの10倍以上のクエリでも実行可能になった ※ただし、実行時間は延びる Exhange managerを有効にしてTASKリトライポリシーを 使用しました。 中間データ用ストレージはS3以外にHDFSにも対応。超便利。 ージ ストレ 散 分 ュして タを シ ー ッ デ の中間 kがクラ 間データ Stage間 、途中でTas ずに中 せ し ュ 存 シ ッ に保 はクラ リ エ る もク 復旧す て っ を使 #trinodb

13.

アドホック分析用としてのTrinoで工夫したこと ● #trinodb Spill-to-diskを使って、OOMを起きにくくするようにした ○ Helm Chartを改修して、Coordinator・WorkerポッドにemptyDirボリュームをマウントすること で、OOMで落ちても復帰時に再利用出来るようにした ● Icebergテーブルで利用するカタログにはRESTカタログを用うことで、 TrinoやSparkなどからIcebergテーブルを利用しやすくした ● Hive→Icebergテーブル移管の際は、IcebergのSparkのadd_filesプロシージャを 使うことで、Icebergテーブル側に過去分のデータをコピーを不要にした ○ HiveテーブルはHDFS上にあるので、add_filesプロシージャで出来たIcebergテーブルの データは、S3とHDFSの両方を参照することになるので、カタログ(REST)の io-implプロパティにorg.apache.iceberg.io.ResolvingFileIOを利用することで両方に対応した

14.

補足:add_filesプロシージャって? 移行元のデータをIcebergテーブルにコピー せずに参照出来るようにするIcebergの Sparkのプロシージャ。 #trinodb Iceberg Catalog db1.tbl1 現Metadata Path S3 数万パーティションある様なテーブルの場 合、一度にコピーするには時間がかかるが、 これなら移行元の更新を止めずに移行が可 能。 Metadata File Metadata File Manifest List Manifest List Manifest File Manifest File Manifest File 参照 して るだ コピ けな ーが ので いら ない /db1/tbl1 パーティション単位で実行が可能なので、 移行&検証が終わるまでの期間はadd_fileを 使い追加分を更新し、準備が終わったら Icebergテーブルにデータ書いていけば良い ので移行作業の効率が良い。 /db1/tbl1/k1=A /db1/tbl1/k1=A/k2=1 File File File /db1/tbl1/k1=B /db1/tbl1/k1=A/k2=2 File File File Hive Metastore db1.tbl1 /db1/tbl1/k1=B/k2=1 File File File

15.

補足:TrinoはどうやってHiveやIcebergテーブルを参照するの? Hiveテーブルの場合 Icebergテーブルの場合 thrift経由でHive Metastoreに対し て、メタデータとデータの格納先ディ レクトリを取得するだけで、 実際のデータ処理はTrino側で実施。 カタログから欲しいFileの全パスとメタデータを取 得し(取得方法はカタログ実装によりけり)、その 情報を元にTrino側で処理を実施。 その為、YARNなどは一切関与しない。 Icebergは欲しいFileパスが①の段階で確認出来る (HiveはHDFSディレクトリまでなので②のタイミングで探 す必要がある) Iceberg Catalog /db1/tbl1 トリ ィレク デ が o n eを ②Tri いFil し 欲 ら 情報か 理する 処 探して ト ィレク デ の タ ①デー ータを取得 デ とメタ リ情報 /db1/tbl1/k1=A db1.tbl1 現Metadata Path ①欲しいFileのパス とメタデータを取得 /db1/tbl1/k1=A/k2=1 S3 File File File Hive Metastore db1.tbl1 Metadata File ②Trinoだけで Fileを処理する Manifest List Manifest File File File File Storege(S3)

16.

現状、困っていること #trinodb 1. Icebergのadd_filesプロシージャが TrinoのIcebergコネクターから利用できない ○ Add `add_files` procedure in Iceberg connector · Issue #11744 · trinodb/trino 2. Icebergのadd_filesプロシージャを使ったテーブルにて、 参照元のHiveテーブルにtimestamp型があった場合、 そのままでは以下のエラーが出て参照出来ない ○ ○ ○ エラー文:Query 20231002_061128_00067_tqdd2 failed: Unsupported Trino column type (timestamp(6) with time zone) for Parquet column ([update_time] optional int96 update_time = 8) Trino Iceberg not honoring existing timestamp column type name of the table created outside Trino (e.g. Spark) stored in HMS · Issue #11442 Hive/Impalaで利用しているTIMESTAMP型はINT96でParquetファイルで書き込みしている がTrinoで利用しているParquetライブラリは新しくINT96に対応していない事が影響

17.

今後の予定 #trinodb 1. 9月末にリリースのあったTrino Gateway trinodb/trino-gateway を利用する ● ● ● Trinoクラスタを2系統用意してTrino Gateway経由で利用する Trinoの設定反映やアップグレードの際に片系ずつ実施する事が可能になるのでサービスの ダウンタイムをなくすことが出来る TrinoのCoordinatorはHA構成が取れない(補足を参照)ので、 Trinoサービスとしての可用性向上の目的の意味もある 2. 利用状況に合わせたResource groupsとSession property managerの設計 ● ● ● クエリ実行時間の制限(連続XX時間まで) ユーザに合わせたクエリ種別の制限 ○ XXXユーザは特定のテーブルにはSELECTのみに限定 分析クラスタ内でのバッチへのリソース割当てを最優先にする 3. Icebergの統計情報を使ったパフォーマンス改善と運用整備

18.

補足

19.

#trinodb ● Fault Tolerant Executionに関する情報 ○ wikiにある公式ドキュメントには無い詳細な説明 https://github.com/trinodb/trino/wiki/Fault-Tolerant-Execution ○ Trino | Using Trino as a batch processing engine https://trino.io/blog/2022/06/24/trino-meetup-extract-trino-load.html ○ Trino | Project Tardigrade delivers ETL at Trino speeds to early users https://trino.io/blog/2022/05/05/tardigrade-launch.html ● TrinoのHAに関連する情報 ○ Can you set up Trino in HA mode? - Trino - Starburst forum https://www.starburst.io/community/forum/t/can-you-set-up-trino-in-ha-mode/31 ○ High Availability · Issue #391 · trinodb/trino https://github.com/trinodb/trino/issues/391 ● Icebergについて深く知る事が出来る良い記事 ○ Apache Iceberg: An Architectural Look Under the Covers | Dremio https://www.dremio.com/resources/guides/apache-iceberg-an-architectural-look-under-the-covers/

20.

#trinodb ● Hive→Iceberg移管に関して参考になるブログ記事 ○ How to Migrate a Hive Table to an Iceberg Table | Dremio https://www.dremio.com/blog/how-to-migrate-a-hive-table-to-an-iceberg-table/ ○ Migrating a Hive Table to an Iceberg Table Hands-on Tutorial | Dremio https://www.dremio.com/blog/migrating-a-hive-table-to-an-iceberg-table-hands-on-tutorial/ ● 利用しているIcebergのREST Catalog実装 ○ ○ https://github.com/tabular-io/iceberg-rest-image Iceberg's REST Catalog: A Spark Demo • Tabular https://tabular.io/blog/rest-catalog-docker/ ● Icebergを知りたいならここから始めると参考になる記事 ○ Apache Iceberg 101 - Your Guide to Learning Apache Iceberg Concepts and Practices | Dremio https://www.dremio.com/blog/apache-iceberg-101-your-guide-to-learning-apache-iceberg-concept s-and-practices/ ○ Apache Iceberg FAQ | Dremio https://www.dremio.com/blog/apache-iceberg-faq/#h-what-is-a-data-lakehouse

21.

#trinodb ● ParquetファイルのINT96関連情報 ○ parquet-format/LogicalTypes.md https://github.com/xhochy/parquet-format/blob/cb4727767823ae201fd567f67825cc22834c20e9 /LogicalTypes.md#int96-timestamps-also-called-impala_timestamp ○ Parquet: Support filter operations on int96 timestamps by thesquelched · Pull Request #2563 · apache/iceberg https://github.com/apache/iceberg/pull/2563 ○ 'NOT_SUPPORTED: Unsupported Trino column type (date) for Parquet column ([today] optional int64 today (TIMESTAMP(MICROS,false))) · Issue #17733 · trinodb/trino https://github.com/trinodb/trino/issues/17733 ● S3互換ストレージと言えばMinIO以外にもApache Ozoneもあるよ(宣伝) ○ S3互換のオブジェクトストレージ Apache Ozoneに関する情報(随時更新) - Qiita https://qiita.com/yassan168/items/1e3c000284ae6fc8448c

22.

#trinodb ● RKE2 ○ ● Rancherを利用したモニタリング&アラート ○ ● https://docs.rke2.io/ https://ranchermanager.docs.rancher.com/pages-for-subheaders/monitoring-and-aler ting Rancher ○ ○ https://www.rancher.com/ 日本のユーザコミュニティもあるのでよろしくです。 ■ https://rancherjp.connpass.com/