現在位置: ホーム / ビッグデータ ブログ / Digdagを使ってワークフロー管理を行う

Digdagを使ってワークフロー管理を行う

過去に「Airflow」を取り上げましたが、今回は「Digdag」を取り上げます。 サンプルは、TreasureDataと連携するワークフローを作成してみます。
過去の記事は、「Airflowを使ってワークフロー管理を行う」となります。

Digdagとは

TreasureData社がオープンソースで公開しているツールで、依存関係のある複数のタスクを実行するワークフローエンジンです。
プログラマではなくてもわかりやすいよう、YAMLに対し、DSLにて、DAGの構造で、ワークフローを定義することができます。

ワークフロエンジンに必要な、以下の機能が備わっています。

  • タスクの依存関係
  • スケジューリング
  • エラー処理
    • リトライ
    • メール通知
  • タスクの並列実行
  • タスク実行ログの収集

Github: https://github.com/treasure-data/digdag/
Document: http://docs.digdag.io/

 導入方法 

今回もインストールからジョブを設定するところまでをみていきます。

1.動作環境 

動作環境は下記になります。

  • Azureu上仮想マシンでCentOS 6.8
  • jdk-8u112-linux-x64.rpm
  • Digdag 0.8.22

2.インストールと起動

Digdagをインストールします。
$ curl -o ~/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"
$ chmod +x ~/bin/digdag
$ echo 'export PATH="$HOME/bin:$PATH"' >> ~/.bashrc

次に、Digdagをサーバーとして起動できるよう、「~/.config/digdag/」直下に、以下の設定ファイルを2つ作成します。

「digdag.properties」の作成例

database.type=h2
database.path=/home/cent/bin/test.db
digdag.secret-access-policy-file=/home/cent/.config/digdag/secret-access-policy.yaml
#「http://docs.digdag.io/command_reference.html#secret-encryption-key」と同じ設定を指定
digdag.secret-encryption-key=MDEyMzQ1Njc4OTAxMjM0NQ==

「secret-access-policy.yaml」の作成

以下のリンク先と同じ設定を指定する

それから、作成後に以下のコマンドでDigdagのサーバを起動します。
cd ~/bin/
digdag server -c ~/.config/digdag/digdag.properties -L ~/bin/logs/server.log -l debug -O ~/bin/logs -A ~/bin/logs &

 3.ワークフローの作成

TreasureDataにある2つのテーブルにて、一方のテーブルに、他方のテーブルからクエリ実行で取得したレコードを挿入するワークフローを作成します。
ワークフローには、以下のタスクを定義します。

  1. レコード挿入前に、TreasureDataにあるテーブルのカウントを取得するジョブクエリを発行する
  2. 前に取得したカウントを、メール本文を定義したテンプレートを読込み、メール送信する
  3. TreasureDataに保存してあるジョブクエリを実行する(ジョブクエリは、一方のテーブルに、他方のテーブルにクエリを実行して取得したレコードを挿入するクエリ)
  4. レコード挿入後に、TreasureDataにあるテーブルのカウントを取得するジョブクエリを発行する
  5. 前に取得したカウントを、メール本文を定義したテンプレートを読込み、メール送信する

ワークフロー実装例「spike.dig」

timezone: Asia/Tokyo

schedule:
  cron>: 30 14 7 12 *

_export:
  td:
    database: spikedb
  mail:
    port: 465
    from: ""
    to: [""]
    debug: true

_error:
  mail>: ./tmpl/err.txt
  subject: 【digdag mail test】err mail

+step1:
  td>: ./sql/step1.sql
  store_last_results: true
  engine: hive

+step1-1:
  mail>: ./tmpl/info_step1.txt
  subject: 【digdag mail test】step1 info mail

+step2:
  td_run>: spikedb_spiketbl002_select _all

+step3:
  td>: ./sql/step3.sql
  store_last_results: true
  engine: hive
  
+step3-1:
  mail>: ./tmpl/info_step3.txt
  subject: 【digdag mail test】step3 info mail

クエリ作成例「step1.sql」

select count(*) AS cnt1 from spiketbl001

メール本文テンプレート作成例「info_step1.txt」

${moment().format("YYYYMMDD HH:mm:ss")}
-----------
information
-----------
step1 sql count=${td.last_results.cnt1}

なお、今回サンプルとして作成するワークフローのDigdagプロジェクト「spike」は、「~/bin/」直下に作成し、フォルダツリーは以下のようにします。
$ tree spike
spike/
├─spike.dig
│
├─sql
│ ├── step1.sql
│ └── step3.sql
│
└─tmpl
   ├── err.txt
   ├── info_step1.txt
   └── info_step3.txt

次に、以下のコマンドにて、サーバに今回作成したワークフローを登録します。コマンドは、カレントディレクトリをプロジェクトフォルダに移動してから実行します。
cd ~/bin/spike
digdag push spike

さらに、以下のコマンドにて、サーバに今回作成したワークフロー中で使用される「secrets」パラメータの設定を登録します。パラメータの設定は、json形式でファイルに定義できるため、作成した定義ファイル「secrets.json」をコマンド引数に指定しています。
cd ~/bin/
digdag secrets --project spike --set @secrets.json

スケジュールの登録状況は、以下のコマンドで確認できます。
cd ~/bin/spike
digdag schedules

当該ワークフローの実行状況は、以下のコマンドで確認できます。
digdag sessions spike

まとめ

Digdagは、以下の特徴があります。

  • DAG形式のワークフローが作成できる
  • フロー制御や外部のスクリプト呼出しができプログラマブルなワークフローにできる
  • Treasure Dataの機能が使える
  • ワークフローで使用する環境変数などのパラメータを埋込みではなく外出しにできる
  • ワークフローの実行時間超過・失敗時のエラー通知ができる
  • タスクの並列実行機能がある

弊社では、本記事のようなビッグデータ分析基盤導入など、ビッグデータ活用を総合的にご支援するサービス「SIOS BigData One Stop Solution」として提供させて頂いております。
ビッグデータのお困りごとについてのご相談は、以下リンクにあるお問合せフォームからお問合せください。
SIOS BigData One Stop Solution

斎藤@SSTD