什么是ETL
ETL 是常用的数据处理,在以前的公司里,ETL 差不多是数据处理的基础,要求非常稳定,容错率高,而且能够很好的监控。ETL的全称是 Extract,Transform,Load, 一般情况下是将乱七八糟的数据进行预处理,然后放到储存空间上。可以是sql的也可以是Nosql的,还可以直接存成file的模式。
一开始我的设计思路是,用几个cron job和celery来handle所有的处理,然后将我们的log文件存在hdfs,还有一些数据存在MysqL,大概每天跑一次。核心是能够scale,稳定,容错,roll back。我们的data warehouse就放在云上,就简单处理了。
有了自己的ETL系统我觉得就很安心了,以后能够做数据处理和机器学习方面就相对方便一些。
问题来了
一开始我设计的思路和Uber一开始的ETL很像,因为我觉得很方便。但是我发觉一个很严重的问题,我一个人忙不过来。首先,要至少写个前端UI来监控cron job,但是市面上的都很差。其次,容错的autorestart写起来很费劲,可能是我自己没有找到一个好的处理方法。最后部署的时候相当麻烦,如果要写好这些东西,我一个人的话要至少一个月的时间,可能还不是特别robust。在尝试写了2两天的一些碎片处理的脚本之后我发觉时间拖了实在太久了。
隆重推荐的工具
airbnb是我很喜欢的公司,他们有很多开源的工具,airflow我觉得是最实用的代表。airflow 是能进行数据pipeline的管理,甚至是可以当做更高级的cron job 来使用。现在一般的大厂都不说自己的数据处理是ETL,美其名曰 data pipeline,可能跟google倡导的有关。airbnb的airflow是用python写的,它能进行工作流的调度,提供更可靠的流程,而且它还有自带的UI(可能是跟airbnb设计主导有关)。话不多说,先放两张截图:
什么是DAG
airflow里最重要的一个概念是DAG。
DAG是directed asyclic graph,在很多机器学习里有应用,也就是所谓的有向非循环。但是在airflow里你可以看做是一个小的工程,小的流程,因为每个小的工程里可以有很多“有向”的task,最终达到某种目的。在官网中的介绍里说dag的特点:
- Scheduled: each job should run at a certain scheduled interval
- Mission critical: if some of the jobs aren’t running,we are in trouble
- Evolving: as the company and the data team matures,so does the data processing
- Heterogenous: the stack for modern analytics is changing quickly,and most companies run multiple systems that need to be glued together
YEAH! It's awesome,right? After reading all of these,I found it was perfectly fit Prettyyes.
如何安装
安装airflow超级简单,使用pip就可以,现在airflow的版本是1.6.1,但是有个小的bug,这个之后会告诉大家如何修改。
pip install airflow
这里有个坑,因为airflow涉及到很多数据处理的包,所以会安装pandas和numpy(这个Data Scientist应该都很熟悉)但是国内pip install 安装非常慢,用douban的源也有一些小的问题。我的解决方案是,直接先用豆瓣的源安装numpy 和 pandas,然后再安装airflow,自动化部署的时候可以在requirements.txt 里调整顺序就行了
如何运行
摘自官方网站
# airflow needs a home,~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow
# install from pypi using pip
pip install airflow
# initialize the database
airflow initdb
# start the web server,default port is 8080
airflow webserver -p 8080
然后你就可以上web ui查看所有的dags,来监控你的进程。
如何导入dag
一般第一次运行之后,airflow会在默认文件夹下生成airflow文件夹,然后你只要在里面新建一个文件dag就可以了。我这边部署在阿里云上的文件tree大概是这个样子的。
以下是我自己写的我们公司prettyyes里需要每天处理log的其中一个小的dag:
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime,timedelta
import ConfigParser
config = ConfigParser.ConfigParser()
config.read('/etc/conf.ini')
WORK_DIR = config.get('dir_conf','work_dir')
OUTPUT_DIR = config.get('dir_conf','log_output')
PYTHON_ENV = config.get('dir_conf','python_env')
default_args = {
'owner': 'airflow','depends_on_past': False,'start_date': datetime.today() - timedelta(days=1),'retries': 2,'retry_delay': timedelta(minutes=15),}
dag = DAG('daily_process',default_args=default_args,schedule_interval=timedelta(days=1))
templated_command = "echo 'single' | {python_env}/python {work_dir}/mr/LogMR.py"\
.format(python_env=PYTHON_ENV,work_dir=WORK_DIR) + " --start_date {{ ds }}"
task = BashOperator(
task_id='process_log',bash_command=templated_command,dag=dag
)
写好之后,只要将这个dag放入之前建立好的dag文件夹,然后运行:
python <dag_file>
来确保没有语法错误。在测试里你可以看到我的
schedule_interval=timedelta(days=1)
这样我们的数据处理的任务就相当于每天跑一次。更重要的是,airflow还提供处理bash处理的接口外还有hadoop的很多接口。可以为以后连接hadoop系统提供便利。很多具体的功能可以看官方文档。
其中的一个小的bug
airflow 1.6.1有一个网站的小的bug,安装成功后,点击dag里的log会出现以下页面:
这个只要将
airflow/www/utils.py
文件替换成最新的airflow github上的utils.py文件就行,具体的问题在这个:
fixes datetime issue when persisting logs
使用supervisord进行deamon
airflow本身没有deamon模式,所以直接用supervisord就ok了,我们只要写4行代码。
[program:airflow_web]
command=/home/kimi/env/athena/bin/airflow webserver -p 8080
[program:airflow_scheduler]
command=/home/kimi/env/athena/bin/airflow scheduler
我觉得airflow特别适合小的团队,他的功能强大,而且真的部署方便。和hadoop,mrjob又可以无缝连接,对我们的业务有很大的提升。