5. Airflow
5.1. Airflow Structure
What DAG means
Graph - nodes and edges
Directed Graph - each edge has a direction
Acyclic - no loops (cycles)
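The three properties above can be checked on a small example. This is a minimal sketch using Python's standard `graphlib` module (Python 3.9+); the helper name `is_acyclic` and the sample edge lists are made up for illustration and are not part of Airflow.

```python
from graphlib import TopologicalSorter, CycleError


def is_acyclic(edges):
    """Return True if the directed graph, given as (upstream, downstream) pairs, has no cycle."""
    sorter = TopologicalSorter()
    for upstream, downstream in edges:
        # add(node, *predecessors): downstream depends on upstream
        sorter.add(downstream, upstream)
    try:
        sorter.prepare()  # raises CycleError when the graph contains a cycle
    except CycleError:
        return False
    return True


# Hypothetical task names, used only to illustrate the definition
pipeline = [("unzip", "extract"), ("extract", "transform")]
looped = pipeline + [("transform", "unzip")]  # adds an edge back to the start

print(is_acyclic(pipeline))  # True
print(is_acyclic(looped))    # False
```

A scheduler needs the acyclic property because it must be able to find at least one task with no unfinished upstream task; with a loop, no such starting point exists.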
DAG script structure
1) Imports
# import the libraries
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to write tasks!
# (Airflow 2 import path; airflow.operators.bash_operator is the deprecated 1.x location)
from airflow.operators.bash import BashOperator
# This makes scheduling easy
from airflow.utils.dates import days_ago
2) DAG Arguments
# defining DAG arguments
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'Seeeun Cho',
    'start_date': days_ago(0),
    'email': ['nueees@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
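The comment about per-task overrides can be pictured as a dictionary merge. The sketch below is only a conceptual model in plain Python, not Airflow's actual code; the helper `effective_args` and the sample values are assumptions for illustration. In a real DAG you would pass the overriding keyword (for example `retries=3`) directly to the operator.

```python
from datetime import timedelta

# DAG-level defaults (sample values for this sketch)
default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def effective_args(defaults, **task_overrides):
    """Conceptual model only: values given for one task win over the DAG-level defaults."""
    return {**defaults, **task_overrides}


# A hypothetical flaky task that should retry more often than the default
flaky_task_args = effective_args(default_args, retries=3)

print(flaky_task_args["retries"])      # 3
print(flaky_task_args["retry_delay"])  # 0:05:00
print(default_args["retries"])         # 1
```

The defaults themselves stay unchanged, so every other task still retries once.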
3) DAG Definition
# define the DAG
dag = DAG(
    dag_id='ETL_test_data',
    default_args=default_args,
    description='Apache Airflow etl test',
    schedule_interval=timedelta(days=1),
)
4) Task Definitions
# A .tgz file is a gzipped tar archive, so extract it with tar rather than unzip
unzip_data = BashOperator(
    task_id='unzip_data',
    bash_command='tar -xzf testdata.tgz -C /home/pjt/stage/',
    dag=dag,
)
extract_data_from_csv = BashOperator(
    task_id='extract_data_from_csv',
    bash_command='cut -f1-4 -d"," test-data.csv > csv_data.csv',
    dag=dag,
)
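# Assumes test-data.txt is comma-delimited; a true fixed-width file would need
# cut -c with character ranges instead of -f/-d
extract_data_from_fixed_width = BashOperator(
    task_id='extract_data_from_fixed_width',
    bash_command='cut -f1-4 -d"," test-data.txt > fixed_width_data.csv',
    dag=dag,
)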
# paste joins columns with a tab by default; -d"," keeps the output comma-separated
consolidate_data = BashOperator(
    task_id='consolidate_data',
    bash_command='paste -d"," csv_data.csv fixed_width_data.csv > extracted_data.csv',
    dag=dag,
)
transform_data = BashOperator(
    task_id='transform_data',
    bash_command='tr "[a-z]" "[A-Z]" < extracted_data.csv > transformed_data.csv',
    dag=dag,
)
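To see what the `cut` and `tr` commands in these tasks do to a single line, here is a rough Python equivalent. The helper `cut_fields` and the sample row are invented for illustration; the real tasks run the shell commands, not this code.

```python
def cut_fields(line, first, last, delimiter=","):
    """Mimic `cut -f<first>-<last> -d<delimiter>` for one line (1-based, inclusive)."""
    return delimiter.join(line.split(delimiter)[first - 1:last])


# Made-up sample row; the actual contents of test-data.csv are not shown in these notes
row = "1,alice,seoul,kr,extra,cols"

kept = cut_fields(row, 1, 4)
print(kept)          # 1,alice,seoul,kr

# tr "[a-z]" "[A-Z]" upper-cases ASCII letters, which str.upper() also does here
print(kept.upper())  # 1,ALICE,SEOUL,KR
```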
5) Task Pipeline
# task pipeline
unzip_data >> extract_data_from_csv >> extract_data_from_fixed_width >> consolidate_data >> transform_data
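The `>>` operator is ordinary Python operator overloading. The toy class below is a sketch of the idea only, not Airflow's implementation; `Task` and the short task names are hypothetical. It also shows the list form, which Airflow accepts as well (`a >> [b, c] >> d`) and which would let two extract tasks that do not depend on each other run side by side.

```python
class Task:
    """Toy stand-in for an operator; NOT Airflow's implementation."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # Handles `task >> task` and `task >> [task, task]`
        targets = other if isinstance(other, list) else [other]
        self.downstream.extend(targets)
        return other  # returning the right-hand side makes chaining possible

    def __rrshift__(self, others):
        # Handles `[task, task] >> task`: list has no >>, so Python asks the right operand
        for upstream in others:
            upstream >> self
        return self


unzip, csv, fixed, merge = (Task(name) for name in ("unzip", "csv", "fixed", "merge"))

# Fan out to two tasks, then fan back in to one
unzip >> [csv, fixed] >> merge

print([t.task_id for t in unzip.downstream])  # ['csv', 'fixed']
print([t.task_id for t in csv.downstream])    # ['merge']
print([t.task_id for t in fixed.downstream])  # ['merge']
```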
5.2. Airflow commands
Start Airflow
Starting your airflow services....
This process can take up to 15 minutes...
Airflow started, waiting for all services to be ready....
Your airflow server is now ready to use and available with username: airflow password: password
You can access your Airflow Webserver at: https://blahblah
Copy the username (airflow) and password. To use the Web UI, open the URL shown above (blahblah).
List DAGs and tasks
airflow dags list
airflow tasks list ETL_test_data
unpause DAG
airflow dags unpause ETL_test_data
pause DAG
airflow dags pause ETL_test_data
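If you want to issue the pause and unpause commands from a script, one option is to build the argument list in Python first. This is a small sketch: the helper `airflow_dag_command` is hypothetical, and it only constructs the command shown above without running it.

```python
import shlex


def airflow_dag_command(action, dag_id):
    """Build the argv for `airflow dags pause|unpause <dag_id>`."""
    if action not in ("pause", "unpause"):
        raise ValueError(f"unsupported action: {action}")
    return ["airflow", "dags", action, dag_id]


argv = airflow_dag_command("unpause", "ETL_test_data")
print(shlex.join(argv))  # airflow dags unpause ETL_test_data

# To actually run it on a machine where the airflow CLI is installed:
#   import subprocess
#   subprocess.run(argv, check=True)
```

Passing a list instead of a single string avoids shell quoting problems if a DAG id ever contains unusual characters.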