Airflow Installation and Pitfall Guide

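What is it for?

Put simply, Airflow manages and schedules offline, periodic jobs. It can replace crontab; think of it as an advanced crontab.

Once a crontab grows to hundreds or thousands of entries, managing it becomes very complicated. At that point, consider migrating the jobs to Airflow: you will be able to see clearly which DAGs are stable and which are less robust and need optimizing. If DAGs alone do not win you over, the strong interactivity, friendly management UI, task reruns, and configurable alerting levels should make you wish you had found it sooner.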

Installation and configuration notes

The package was renamed from airflow to apache-airflow as of version 1.8.1:

NOTE: The transition from 1.8.0 (or before) to 1.8.1 (or after) requires uninstalling Airflow before installing the new version. The package name was changed from airflow to apache-airflow as of version 1.8.1.

$ pip3 install apache-airflow

If you want Airflow 1.8.0 or earlier, install it under the old package name:

$ pip3 install airflow

Initialization

$ airflow initdb

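After initialization, an airflow folder is created under ~/ by default. To use MySQL or another database as the metadata store, edit the config file and run the initialization again.

Installing a MySQL driver for Python 3 is not covered here.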

$ vi ~/airflow/airflow.cfg
sql_alchemy_conn = mysql://username:password@host:port/airflow
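
Passwords that contain characters such as @, : or / will break a connection URL of this shape unless they are percent-encoded. The sketch below is one way to assemble the value; the helper name build_sql_alchemy_conn and the credentials are hypothetical, not part of Airflow.

```python
from urllib.parse import quote_plus


def build_sql_alchemy_conn(user, password, host, port, database, driver="mysql"):
    """Return a SQLAlchemy-style URL with the credentials percent-encoded."""
    return "{driver}://{user}:{password}@{host}:{port}/{database}".format(
        driver=driver,
        user=quote_plus(user),
        password=quote_plus(password),
        host=host,
        port=port,
        database=database,
    )


if __name__ == "__main__":
    # Hypothetical credentials, for illustration only.
    url = build_sql_alchemy_conn("airflow", "p@ss:word/1", "127.0.0.1", 3306, "airflow")
    print("sql_alchemy_conn = " + url)
```

Whether a literal % in the encoded value needs further escaping inside airflow.cfg depends on how your Airflow version parses the file, so check that before relying on it.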

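Installing extras

Airflow ships with a Celery-based executor; you only need to install Celery yourself and apply a little configuration to use it.

Celery can use RabbitMQ or Redis as its broker. Pick whichever suits you; setting up the broker is not covered here either.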

$ pip3 install apache-airflow[celery]

# A few config changes are also needed to enable it
$ vi ~/airflow/airflow.cfg
executor = CeleryExecutor
broker_url = redis://127.0.0.1:6379/0
result_backend = db+mysql://root:xxx@127.0.0.1:3306/airflow
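
A quick way to confirm the three settings are present is to parse the file with the standard library. This is a minimal sketch, assuming executor lives under [core] and broker_url and result_backend under [celery], as in the 1.10 default config; the function name celery_settings and the sample text are illustrative only.

```python
import configparser

# Sample text standing in for ~/airflow/airflow.cfg (section names are assumed).
SAMPLE_CFG = """
[core]
executor = CeleryExecutor

[celery]
broker_url = redis://127.0.0.1:6379/0
result_backend = db+mysql://root:xxx@127.0.0.1:3306/airflow
"""


def celery_settings(cfg_text):
    """Return the three Celery-related settings, raising if one is missing."""
    # interpolation=None keeps a literal "%" in a URL from being interpreted.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return {
        "executor": parser.get("core", "executor"),
        "broker_url": parser.get("celery", "broker_url"),
        "result_backend": parser.get("celery", "result_backend"),
    }


if __name__ == "__main__":
    for key, value in sorted(celery_settings(SAMPLE_CFG).items()):
        print(key, "=", value)
```

A missing section or option surfaces as a configparser error, which is easier to read than a failure at worker startup.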

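A small pitfall to watch for here

Since the project was renamed, make sure you install extras such as Celery from the right package.

Otherwise both packages end up installed side by side and conflict later on.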

$ pip3 install apache-airflow
$ pip3 install airflow[celery]

$ pip3 list
...
apache-airflow (1.10.0)
airflow (1.8.0)
...
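
The same check can be scripted instead of reading pip3 list by eye. The sketch below is an assumption-laden helper, not an Airflow tool: conflicting_airflow_packages is a hypothetical name, and the live lookup uses importlib.metadata, which needs Python 3.8 or later (newer than the 3.5 environment in this post).

```python
def conflicting_airflow_packages(installed_names):
    """Return both package names if the old and new distributions coexist."""
    wanted = {"airflow", "apache-airflow"}
    # Normalize case and underscores so "Apache_Airflow" still matches.
    names = {name.lower().replace("_", "-") for name in installed_names}
    return sorted(wanted) if wanted <= names else []


if __name__ == "__main__":
    from importlib import metadata  # Python 3.8+

    installed = [
        dist.metadata["Name"]
        for dist in metadata.distributions()
        if dist.metadata["Name"]
    ]
    conflict = conflicting_airflow_packages(installed)
    if conflict:
        print("Both installed, uninstall the old one:", ", ".join(conflict))
    else:
        print("No airflow / apache-airflow conflict found.")
```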

Starting up

# Start the webserver
$ airflow webserver -p 8080
# Start the scheduler
$ airflow scheduler
# Start the Celery worker
$ airflow worker

Once everything is up, you can try running the built-in example DAGs!

Major pitfalls

A few errors I ran into

airflow.exceptions.AirflowException: Could not create Fernet object: Incorrect padding

$ airflow initdb
[2018-10-30 15:30:26,857] {settings.py:174} INFO - setting.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800
[2018-10-30 15:30:27,164] {__init__.py:51} INFO - Using executor CeleryExecutor
DB: mysql://root:***@localhost:3306/airflow
[2018-10-30 15:30:27,320] {db.py:338} INFO - Creating tables
INFO [alembic.runtime.migration] Context impl MySQLImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.5/site-packages/airflow/models.py", line 159, in get_fernet
_fernet = Fernet(configuration.conf.get('core', 'FERNET_KEY').encode('utf-8'))
File "/usr/lib/python3/dist-packages/cryptography/fernet.py", line 34, in __init__
key = base64.urlsafe_b64decode(key)
File "/usr/lib/python3.5/base64.py", line 134, in urlsafe_b64decode
return b64decode(s)
File "/usr/lib/python3.5/base64.py", line 88, in b64decode
return binascii.a2b_base64(s)
binascii.Error: Incorrect padding

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/home/ubuntu/.local/bin/airflow", line 32, in <module>
args.func(args)
File "/home/ubuntu/.local/lib/python3.5/site-packages/airflow/bin/cli.py", line 1002, in initdb
db_utils.initdb(settings.RBAC)
File "/home/ubuntu/.local/lib/python3.5/site-packages/airflow/utils/db.py", line 103, in initdb
schema='airflow_ci'))
File "<string>", line 4, in __init__
File "/home/ubuntu/.local/lib/python3.5/site-packages/sqlalchemy/orm/state.py", line 414, in _initialize_instance
manager.dispatch.init_failure(self, args, kwargs)
File "/home/ubuntu/.local/lib/python3.5/site-packages/sqlalchemy/util/langhelpers.py", line 66, in __exit__
compat.reraise(exc_type, exc_value, exc_tb)
File "/home/ubuntu/.local/lib/python3.5/site-packages/sqlalchemy/util/compat.py", line 187, in reraise
raise value
File "/home/ubuntu/.local/lib/python3.5/site-packages/sqlalchemy/orm/state.py", line 411, in _initialize_instance
return manager.original_init(*mixed[1:], **kwargs)
File "/home/ubuntu/.local/lib/python3.5/site-packages/airflow/models.py", line 677, in __init__
self.extra = extra
File "<string>", line 1, in __set__
File "/home/ubuntu/.local/lib/python3.5/site-packages/airflow/models.py", line 731, in set_extra
fernet = get_fernet()
File "/home/ubuntu/.local/lib/python3.5/site-packages/airflow/models.py", line 163, in get_fernet
raise AirflowException("Could not create Fernet object: {}".format(ve))
airflow.exceptions.AirflowException: Could not create Fernet object: Incorrect padding

fernet_key

The config file explains what fernet_key is:

Secret key to save connection passwords in the db

Fixing it

# Generate a key and use it to replace fernet_key in the config file
$ python3 -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"

$ vi ~/airflow/airflow.cfg
fernet_key = g3589dfasdfhnht289tghdsfij---dfadfgeu812=

$ airflow initdb
[2018-10-30 15:31:21,159] {settings.py:174} INFO - setting.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800
[2018-10-30 15:31:21,464] {__init__.py:51} INFO - Using executor CeleryExecutor
DB: mysql://root:***@localhost:3306/airflow
[2018-10-30 15:31:21,620] {db.py:338} INFO - Creating tables
INFO [alembic.runtime.migration] Context impl MySQLImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
Done.
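
The traceback above fails at base64.urlsafe_b64decode, which suggests the configured key was not valid URL-safe base64. A Fernet key is 32 random bytes in URL-safe base64, so it can be sanity-checked with the standard library alone before running initdb. The sketch below is illustrative: is_valid_fernet_key and generate_key are hypothetical helpers, and for real use the cryptography one-liner shown earlier is the better way to generate a key.

```python
import base64
import binascii
import os


def generate_key():
    """32 random bytes, URL-safe base64 encoded (44 characters)."""
    return base64.urlsafe_b64encode(os.urandom(32)).decode()


def is_valid_fernet_key(key):
    """True if key decodes from URL-safe base64 to exactly 32 bytes."""
    try:
        return len(base64.urlsafe_b64decode(key.encode("utf-8"))) == 32
    except (binascii.Error, ValueError):
        # "Incorrect padding" and similar decode failures land here.
        return False


if __name__ == "__main__":
    print(is_valid_fernet_key(generate_key()))
    print(is_valid_fernet_key("abc"))
```

A value that fails this check will trigger the same "Could not create Fernet object" error when Airflow loads it.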

TypeError: b'5e36be93294a6fea65a4c81571388241b1667fca' is not JSON serializable

Ooops.

(ASCII-art banner from Airflow's error page; layout lost in extraction)
-------------------------------------------------------------------------------
Node: ubuntu
-------------------------------------------------------------------------------
Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.5/site-packages/flask/app.py", line 1982, in wsgi_app
response = self.full_dispatch_request()
File "/home/ubuntu/.local/lib/python3.5/site-packages/flask/app.py", line 1614, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/home/ubuntu/.local/lib/python3.5/site-packages/flask/app.py", line 1517, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/home/ubuntu/.local/lib/python3.5/site-packages/flask/_compat.py", line 33, in reraise
raise value
File "/home/ubuntu/.local/lib/python3.5/site-packages/flask/app.py", line 1612, in full_dispatch_request
rv = self.dispatch_request()
File "/home/ubuntu/.local/lib/python3.5/site-packages/flask/app.py", line 1598, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/home/ubuntu/.local/lib/python3.5/site-packages/flask_admin/base.py", line 69, in inner
return self._run_view(f, *args, **kwargs)
File "/home/ubuntu/.local/lib/python3.5/site-packages/flask_admin/base.py", line 368, in _run_view
return fn(self, *args, **kwargs)
File "/home/ubuntu/.local/lib/python3.5/site-packages/flask_login.py", line 755, in decorated_view
return func(*args, **kwargs)
File "/home/ubuntu/.local/lib/python3.5/site-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/home/ubuntu/.local/lib/python3.5/site-packages/airflow/www/views.py", line 2061, in index
auto_complete_data=auto_complete_data)
File "/home/ubuntu/.local/lib/python3.5/site-packages/flask_admin/base.py", line 308, in render
return render_template(template, **kwargs)
File "/home/ubuntu/.local/lib/python3.5/site-packages/flask/templating.py", line 134, in render_template
context, ctx.app)
File "/home/ubuntu/.local/lib/python3.5/site-packages/flask/templating.py", line 116, in _render
rv = template.render(context)
File "/home/ubuntu/.local/lib/python3.5/site-packages/jinja2/environment.py", line 989, in render
return self.environment.handle_exception(exc_info, True)
File "/home/ubuntu/.local/lib/python3.5/site-packages/jinja2/environment.py", line 754, in handle_exception
reraise(exc_type, exc_value, tb)
File "/home/ubuntu/.local/lib/python3.5/site-packages/jinja2/_compat.py", line 37, in reraise
raise value.with_traceback(tb)
File "/home/ubuntu/.local/lib/python3.5/site-packages/airflow/www/templates/airflow/dags.html", line 18, in top-level template code
{% extends "airflow/master.html" %}
File "/home/ubuntu/.local/lib/python3.5/site-packages/airflow/www/templates/airflow/master.html", line 18, in top-level template code
{% extends "admin/master.html" %}
File "/home/ubuntu/.local/lib/python3.5/site-packages/airflow/www/templates/admin/master.html", line 18, in top-level template code
{% extends 'admin/base.html' %}
File "/home/ubuntu/.local/lib/python3.5/site-packages/flask_admin/templates/bootstrap3/admin/base.html", line 74, in top-level template code
{% block tail_js %}
File "/home/ubuntu/.local/lib/python3.5/site-packages/airflow/www/templates/admin/master.html", line 44, in block "tail_js"
xhr.setRequestHeader("X-CSRFToken", "{{ csrf_token() }}");
File "/home/ubuntu/.local/lib/python3.5/site-packages/flask_wtf/csrf.py", line 47, in generate_csrf
setattr(g, field_name, s.dumps(session[field_name]))
File "/home/ubuntu/.local/lib/python3.5/site-packages/itsdangerous/serializer.py", line 166, in dumps
payload = want_bytes(self.dump_payload(obj))
File "/home/ubuntu/.local/lib/python3.5/site-packages/itsdangerous/url_safe.py", line 42, in dump_payload
json = super(URLSafeSerializerMixin, self).dump_payload(obj)
File "/home/ubuntu/.local/lib/python3.5/site-packages/itsdangerous/serializer.py", line 133, in dump_payload
return want_bytes(self.serializer.dumps(obj, **self.serializer_kwargs))
File "/home/ubuntu/.local/lib/python3.5/site-packages/itsdangerous/_json.py", line 18, in dumps
return json.dumps(obj, **kwargs)
File "/usr/lib/python3.5/json/__init__.py", line 237, in dumps
**kw).encode(obj)
File "/usr/lib/python3.5/json/encoder.py", line 198, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/lib/python3.5/json/encoder.py", line 256, in iterencode
return _iterencode(o, 0)
File "/usr/lib/python3.5/json/encoder.py", line 179, in default
raise TypeError(repr(o) + " is not JSON serializable")
TypeError: b'5e36be93294a6fea65a4c81571388241b1667fca' is not JSON serializable

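This error is baffling, and I still have not figured it out. The environment is Python 3.5.2.

I could not even load the home page at 127.0.0.1:8080. After switching the address to localhost:8080, the error went away. Orz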
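
The last frame of the traceback can at least be reproduced in isolation: on Python 3, json.dumps refuses bytes, and the CSRF value in the session was a bytes object. The sketch below shows only that proximate cause. dumps_token is a hypothetical helper, not a patch for flask_wtf, and it does not explain why switching hosts made the error disappear.

```python
import json

# The value from the error message above.
token = b"5e36be93294a6fea65a4c81571388241b1667fca"


def dumps_token(value):
    """Serialize a token to JSON, decoding bytes to str first."""
    if isinstance(value, bytes):
        value = value.decode("ascii")
    return json.dumps(value)


if __name__ == "__main__":
    try:
        json.dumps(token)
    except TypeError as exc:
        print("bytes are rejected:", exc)
    print(dumps_token(token))
```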


Further reading

Airflow: a workflow management platform

What we learned migrating off Cron to Airflow