現在位置: ホーム / ビッグデータ ブログ / Airflowを使ってワークフロー管理を行う

Airflowを使ってワークフロー管理を行う

過去に「Azkaban」を取り上げましたが、今回は「Airflow」を取り上げます。 サンプルは、TreasureDataと連携するワークフローを作成してみます。

Airflowとは

Airbnb社がオープンソースで公開しているツールで、依存関係のある複数のタスクを実行するワークフローエンジンです。
JOBのスケジューリングとモニタリングができます。
pythonスクリプトで定義していくため、pythonのプログラミング能力が必要です。

ワークフロエンジンに必要な、以下の機能が備わっています。

  • スケジューリング
  • タスクの依存関係
  • 実行ログ出力
  • エラー含むメール通知
  • エラーハンドリング
  • リトライ
  • ジョブを構成するタスクの単体実行

Github: https://github.com/apache/incubator-airflow
Document: https://pythonhosted.org/airflow/index.html

 導入方法 

今回もインストールからジョブを設定するところまでをみていきます。

1.動作環境 

動作環境は下記になります。

  • Amazon Linux
  • Python 2.7.10
  • airflow 1.7.1.2
  • td-client 0.4.2

2.インストールと起動

$ sudo su -

# yum install gcc-c++

# pip install airflow

# pip install td-client

# airflow initdb

3.Web UIへのアクセス

Web UIがみえるように、AirflowのWebサーバを起動します。
ここでは、80ポートを指定して起動します。
# airflow webserver -p 80
下記URLでアクセスできます。
http://<webサーバホスト名>/

WebUIにアクセスをすることができましたが、JOBの実行状況を表示するのみのため、

次にワークフローを作成していきます。

 4.ワークフローの作成

ワークフローはpythonスクリプトで定義します。
次のような、ワークフローをpythonで作成します。

  1. TreasureDataにあるテーブルのカウントを取得するジョブクエリを発行し、そのジョブIDをファイルに保存する
  2. TreasureDataに対し、そのジョブIDからジョブの実行結果を取得してファイルに保存する
  3. ジョブ実行結果を保管したファイルが存在すれば、ワークフローの処理が完了したことを示すテキストファイルを生成する

Airflowのワークフロー実装例「spike_my_tasks.py」

from __future__ import unicode_literals
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
import time
import os
import json
import tdclient
import logging

days_ago = datetime.combine(datetime.today() - timedelta(1),
datetime.min.time())

# "start_date" = ジョブをいつから実行するかの実行開始日を指定します
# "email" = airflow.cfgの[smtp]セクションにて、smtpサーバの設定が必要です
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": days_ago,
"provide_context": True,
"email": ["<宛先メールアドレス>"],
"email_on_failure": True,
"email_on_retry": False,
"retries": 0,
"retry_delay": timedelta(minutes=5)
}

# "schedule_interval" = バッチ実行間隔を指定
dag = DAG("spike_my_tasks", default_args=default_args, schedule_interval=timedelta(hours=18, minutes=02))

DEFAULT_ENDPOINT = "https://api.treasuredata.com/"
API_KEY = "<API KEY>"

def doMyTask1(**kwargs):
client = tdclient.Client(apikey=API_KEY, endpoint=DEFAULT_ENDPOINT)
job = client.query("sample_datasets", "SELECT COUNT(referer) FROM www_access", type="presto")
job.wait()
job.update()
if not job.success():
raise RuntimeError("job {0} {1}\n\nOutput:\n{2}".format(job.job_id, job.status(), job.debug["cmdout"]))

path_task1="MyTask1.job"
state_dir = os.path.dirname(path_task1)
if state_dir != "" and not os.path.exists(state_dir):
os.makedirs(state_dir)
with open(path_task1, "w") as f:
state = {"job_id": job.job_id, "status": job.status()}
json.dump(state, f)

return path_task1

def doMyTask2(**kwargs):
ti = kwargs["ti"]
retval1 = ti.xcom_pull(task_ids="myTask1")
logging.info(retval1)

loadData = ""
with open(retval1) as f:
loadData = json.load(f)
logging.info(loadData)

client = tdclient.Client(apikey=API_KEY, endpoint=DEFAULT_ENDPOINT)
job = client.job(loadData["job_id"])

path_task2="MyTask2.csv"
with open(path_task2, "w") as f:
f.write(",".join([c[0] for c in job.result_schema]))
f.write("\n")
for row in job.result():
f.write(",".join([str(c) if c else "" for c in row]) + "\n")

return path_task2

def doMyTask3(**kwargs):
ti = kwargs["ti"]
retval2 = ti.xcom_pull(task_ids="myTask2")
logging.info(retval2)

path_task3="MyTask3.txt"
with open(retval2) as f:
logging.info(f.read())
with open(path_task3, "w") as f:
f.write("done")

return path_task3

t1 = PythonOperator(
task_id="myTask1", dag=dag, python_callable=doMyTask1)
t2 = PythonOperator(
task_id="myTask2", dag=dag, python_callable=doMyTask2)
t3 = PythonOperator(
task_id="myTask3", dag=dag, python_callable=doMyTask3)

t2.set_upstream(t1)
t3.set_upstream(t2)
こちらの作成したワークフローのモジュールファイルを、デフォルトの配置先フォルダ「~/airflow/dag」に配置します。
配置しましたら、ワークフローがAirflowのスケジューラーとWeb UIに登録されるよう、スケジューラーとWeb UIのデーモンを、以下のように、起動もしくは再起動します。
# pgrep -f 'gunicorn: master' | xargs kill -s SIGKILL
# pgrep -f 'gunicorn: worker' | xargs kill -s SIGKILL
# airflow webserver -p 80
# pgrep -f '/usr/local/bin/airflow scheduler' | xargs kill -s SIGKILL
# airflow scheduler
登録されると、以下の赤枠部分のように、Web UIに表示されます。
赤枠で、新規登録されたワークフローがOFFとなっていたら、実行されるよう、以下のコマンドでONにします。
# airflow unpause spike_my_tasks
ワークフローがスケジューリングされた時間に実行された後、Web UI の「Recent Statuses」には、一連のタスクの成功した数や失敗した数が表示されます。
また、画面を辿っていくと、それぞれのタスクの実行された時間や実行ログなどが表示されます。
 

まとめ

Airflowは、以下の特徴があります。

  • タスク毎の実行時間などの情報が可視化される
  • エラーなどのログが詳細に表示されるので追跡しやすい
  • ワークフローを登録するのにはAirflowの再起動が必要


弊社では、本記事のようなビッグデータ分析基盤導入など、ビッグデータ活用を総合的にご支援するサービス「SIOS BigData One Stop Solution」として提供させて頂いております
ビッグデータのお困りごとについてのご相談は、以下リンクにあるお問合せフォームからお問合せください。
SIOS BigData One Stop Solution


無料ビッグデータハンズオンセミナーのご案内

8/10(水)にTreasure DataとTableauでビッグデータ分析基盤を作ってみる”ハンズオンセミナーを開催致します。基本的な内容ではございますが、ビッグデータ分析基盤づくりを体感いただけるイベントとなっております。ご興味お持ちいただいた方は是非ビッグデータハンズオンセミナー申込ページよりお申込みください。



斎藤@SSTD