タスクスケジューラー「Apache Airflow」の使い方について備忘録

  •  
 
トビウオ2019年11月5日 - 13:41 に投稿

タグ

ソフトウェアの概要

  • ジョブ(シェルコマンド実行、Python実行など)をDAG(有向グラフ)として管理し、自動で直列/並列実行させることができるツールです
  • 1つのDAGはPythonコードで表現されており、所定のディレクトリにコードが書かれたファイルを置きます
  • DAGの実行、および各タスク/ジョブの可否や出力結果などについては、Webブラウザ上から確認することができます
  • ジョブの成功時や失敗時にメール送信する機能が標準搭載されています。定期タスクを実行するのに便利かと思われます

導入方法

  • まず公式のGitHubリポジトリをcloneします
  • 次に、docker-compose-CeleryExecutor.ymldocker-compose.ymlと見立てて、Docker Composeでコンテナ一式を展開します
  • 後は、Webブラウザ上でシステム起動を確認すればOKです。参考資料にも書かれている通り、標準では8080ポートをWebブラウザ(による管理画面)に使用しています
  • dagsディレクトリに、DAGの設定を記したファイルを置いておくと、システム上で認識され、DAGとして使用できるようになります

UIの説明

  • DAGs画面では、DAGの一覧を確認できます。また、各種操作やステータス確認を行えます
    • トグル列では、トグルによってDAG実行をON/OFFできます
    • 「DAG」列のリンクをクリックすると、DAG内のタスクの構造、実行統計、ガントチャートなどが確認できます
    • 「DAG Runs」列からDAGの実行状況を確認できます
    • 「Recent Tasks」列からタスクの実行状況(ログ含む)が確認できます
    • 「Links」列から手動実行や各種ページにジャンプすることができます

メイン画面

  • DAG」列のリンクをクリックした先のUIはこんな感じです。

DAG別の画面

使い方

参考資料を参照してください。

REST APIについて

例えばAirflow REST API PluginAirflow API Pluginを導入すれば使用できます。
以前業務で使用したのは前者ですが、Airflow REST API Pluginはかなり癖が強いプラグインでした。例えば、DAGの一覧を返すAPIを叩くと、

{
  "airflow_cmd": "airflow list_dags",
  "arguments": {
    "api": "list_dags"
  },
  "call_time": "Wed, 08 Aug 2018 01:04:35 GMT",
  "http_response_code": 200,
  "output": {
    "stderr": "b'/usr/local/lib/python3.6/site-packages/psycopg2/__init__.py:144: UserWarning: The
psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary
please use \"pip install psycopg2-binary\" instead. For details see: <http://initd.org/psycopg/docs/
install.html#binary-install-from-pypi>.\\n'b'  \"\"\")\\n'",
    "stdin": "",
    "stdout": "b'[2018-08-08 02:05:28,737] {configuration.py:206} WARNING - section/key [celery/
celery_ssl_active] not found in config\\n'b'[2018-08-08 02:05:28,737] {default_celery.py:41} WARNING
- Celery Executor will run without SSL\\n'b'[2018-08-08 02:05:28,738] {__init__.py:45} INFO - Using
executor CeleryExecutor\\n'b'[2018-08-08 02:05:28,784] {models.py:189} INFO - Filling up the
DagBag from /usr/local/airflow/dags\\n'b'[2018-08-08 02:05:28,800] {models.py:2191} WARNING -
schedule_interval is used for <Task(BashOperator): t1>, though it has been deprecated as a task
parameter, you need to specify it as a DAG parameter instead\\n'b'[2018-08-08 02:05:28,801]
{models.py:2191} WARNING - schedule_interval is used for <Task(BashOperator): t2>, though it has
been deprecated as a task parameter, you need to specify it as a DAG parameter instead\\n'b'[
2018-08-08 02:05:28,802] {models.py:2191} WARNING - schedule_interval is used for <Task(
BashOperator): t3>, though it has been deprecated as a task parameter, you need to specify it as a
DAG parameter instead\\n'b'[2018-08-08 02:05:28,802] {models.py:2191} WARNING -
schedule_interval is used for <Task(BashOperator): t4>, though it has been deprecated as a task
parameter, you need to specify it as a DAG parameter instead\\n'b'\\n'b'\\n'b'
-------------------------------------------------------------------\\n'b'DAGS\\n'b'
-------------------------------------------------------------------\\n'b'first_dag\\n'b'second_dag\\n'
b'\\n'"
  },
  "post_arguments": {},
  "response_time": "Wed, 08 Aug 2018 02:05:28 GMT",
  "status": "OK"
}

といった風に返ってきます。本来欲しいデータは

['first_dag', 'second_dag']

だけなのですが、このプラグインの場合、標準入出力の文字列がそのまま返ってきます
ゆえに、ちまちま文字列処理しないと所望の結果を取り出せません。stdoutに対して最低限の前処理を行った結果はこれで、

[2018-08-08 02:05:28,737] {configuration.py:206} WARNING - section/key [celery/celery_ssl_active] not found in config
[2018-08-08 02:05:28,737] {default_celery.py:41} WARNING - Celery Executor will run without SSL
[2018-08-08 02:05:28,738] {__init__.py:45} INFO - Using executor CeleryExecutor
[2018-08-08 02:05:28,784] {models.py:189} INFO - Filling up the DagBag from /usr/local/airflow/dags
[2018-08-08 02:05:28,800] {models.py:2191} WARNING - schedule_interval is used for <Task(BashOperator): t1>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead
[2018-08-08 02:05:28,801] {models.py:2191} WARNING - schedule_interval is used for <Task(BashOperator): t2>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead
[2018-08-08 02:05:28,802] {models.py:2191} WARNING - schedule_interval is used for <Task(BashOperator): t3>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead
[2018-08-08 02:05:28,802] {models.py:2191} WARNING - schedule_interval is used for <Task(BashOperator): t4>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead


-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
first_dag
second_dag

ここから更にパースしてくれといった雑な実装です。 あるだけマシですが

参考資料

コメントを追加

プレーンテキスト

  • HTMLタグは利用できません。
  • 行と段落は自動的に折り返されます。
  • ウェブページのアドレスとメールアドレスは自動的にリンクに変換されます。
CAPTCHA
この質問はあなたが人間の訪問者であるかどうかをテストし、自動化されたスパム送信を防ぐためのものです。
画像
参考画面-1
参考画面-2