ソフトウェアの概要
- ジョブ(シェルコマンド実行、Python実行など)をDAG(有向グラフ)として管理し、自動で直列/並列実行させることができるツールです
- 1つのDAGはPythonコードで表現されており、所定のディレクトリにコードが書かれたファイルを置きます
- DAGの実行、および各タスク/ジョブの可否や出力結果などについては、Webブラウザ上から確認することができます
- ジョブの成功時や失敗時にメール送信する機能が標準搭載されています。定期タスクを実行するのに便利かと思われます
- まず公式のGitHubリポジトリをcloneします
- 次に、
docker-compose-CeleryExecutor.yml
をdocker-compose.yml
と見立てて、Docker Composeでコンテナ一式を展開します - 後は、Webブラウザ上でシステム起動を確認すればOKです。参考資料にも書かれている通り、標準では8080ポートをWebブラウザ(による管理画面)に使用しています
- dagsディレクトリに、DAGの設定を記したファイルを置いておくと、システム上で認識され、DAGとして使用できるようになります
- DAGs画面では、DAGの一覧を確認できます。また、各種操作やステータス確認を行えます
- トグル列では、トグルによってDAG実行をON/OFFできます
- 「DAG」列のリンクをクリックすると、DAG内のタスクの構造、実行統計、ガントチャートなどが確認できます
- 「DAG Runs」列からDAGの実行状況を確認できます
- 「Recent Tasks」列からタスクの実行状況(ログ含む)が確認できます
- 「Links」列から手動実行や各種ページにジャンプすることができます
- DAG」列のリンクをクリックした先のUIはこんな感じです。
参考資料を参照してください。
REST APIについて例えばAirflow REST API PluginやAirflow API Pluginを導入すれば使用できます。
以前業務で使用したのは前者ですが、Airflow REST API Pluginはかなり癖が強いプラグインでした。例えば、DAGの一覧を返すAPIを叩くと、
{
"airflow_cmd": "airflow list_dags",
"arguments": {
"api": "list_dags"
},
"call_time": "Wed, 08 Aug 2018 01:04:35 GMT",
"http_response_code": 200,
"output": {
"stderr": "b'/usr/local/lib/python3.6/site-packages/psycopg2/__init__.py:144: UserWarning: The
psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary
please use \"pip install psycopg2-binary\" instead. For details see: <http://initd.org/psycopg/docs/
install.html#binary-install-from-pypi>.\\n'b' \"\"\")\\n'",
"stdin": "",
"stdout": "b'[2018-08-08 02:05:28,737] {configuration.py:206} WARNING - section/key [celery/
celery_ssl_active] not found in config\\n'b'[2018-08-08 02:05:28,737] {default_celery.py:41} WARNING
- Celery Executor will run without SSL\\n'b'[2018-08-08 02:05:28,738] {__init__.py:45} INFO - Using
executor CeleryExecutor\\n'b'[2018-08-08 02:05:28,784] {models.py:189} INFO - Filling up the
DagBag from /usr/local/airflow/dags\\n'b'[2018-08-08 02:05:28,800] {models.py:2191} WARNING -
schedule_interval is used for <Task(BashOperator): t1>, though it has been deprecated as a task
parameter, you need to specify it as a DAG parameter instead\\n'b'[2018-08-08 02:05:28,801]
{models.py:2191} WARNING - schedule_interval is used for <Task(BashOperator): t2>, though it has
been deprecated as a task parameter, you need to specify it as a DAG parameter instead\\n'b'[
2018-08-08 02:05:28,802] {models.py:2191} WARNING - schedule_interval is used for <Task(
BashOperator): t3>, though it has been deprecated as a task parameter, you need to specify it as a
DAG parameter instead\\n'b'[2018-08-08 02:05:28,802] {models.py:2191} WARNING -
schedule_interval is used for <Task(BashOperator): t4>, though it has been deprecated as a task
parameter, you need to specify it as a DAG parameter instead\\n'b'\\n'b'\\n'b'
-------------------------------------------------------------------\\n'b'DAGS\\n'b'
-------------------------------------------------------------------\\n'b'first_dag\\n'b'second_dag\\n'
b'\\n'"
},
"post_arguments": {},
"response_time": "Wed, 08 Aug 2018 02:05:28 GMT",
"status": "OK"
}
といった風に返ってきます。本来欲しいデータは
['first_dag', 'second_dag']
だけなのですが、このプラグインの場合、標準入出力の文字列がそのまま返ってきます。
ゆえに、ちまちま文字列処理しないと所望の結果を取り出せません。stdoutに対して最低限の前処理を行った結果はこれで、
[2018-08-08 02:05:28,737] {configuration.py:206} WARNING - section/key [celery/celery_ssl_active] not found in config
[2018-08-08 02:05:28,737] {default_celery.py:41} WARNING - Celery Executor will run without SSL
[2018-08-08 02:05:28,738] {__init__.py:45} INFO - Using executor CeleryExecutor
[2018-08-08 02:05:28,784] {models.py:189} INFO - Filling up the DagBag from /usr/local/airflow/dags
[2018-08-08 02:05:28,800] {models.py:2191} WARNING - schedule_interval is used for <Task(BashOperator): t1>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead
[2018-08-08 02:05:28,801] {models.py:2191} WARNING - schedule_interval is used for <Task(BashOperator): t2>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead
[2018-08-08 02:05:28,802] {models.py:2191} WARNING - schedule_interval is used for <Task(BashOperator): t3>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead
[2018-08-08 02:05:28,802] {models.py:2191} WARNING - schedule_interval is used for <Task(BashOperator): t4>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
first_dag
second_dag
ここから更にパースしてくれといった雑な実装です。 あるだけマシですが
参考資料- 閲覧数 2452
画像


コメントを追加