📎 Airflow Docs


5. Airflow

5.1. Airflow Structure

What a DAG is

Graph - nodes and edges
Directed Graph - each edge has a direction
Acyclic - no loops (cycles); see the sketch below
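
A minimal sketch of these three properties, using the same Airflow 1.x-style imports as the example below (the dag id and task ids here are made up):

from datetime import timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

dag = DAG(dag_id='dag_shape_demo', start_date=days_ago(0),
          schedule_interval=timedelta(days=1))

a = DummyOperator(task_id='a', dag=dag)  # node
b = DummyOperator(task_id='b', dag=dag)  # node
c = DummyOperator(task_id='c', dag=dag)  # node
a >> b >> c  # directed edges: a -> b -> c
# c >> a would close a cycle (a -> b -> c -> a); Airflow rejects such a DAG when parsing it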

DAG Structure

1) Imports

# import the libraries

from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to write tasks!
from airflow.operators.bash_operator import BashOperator
# This makes scheduling easy
from airflow.utils.dates import days_ago
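
These paths follow the Airflow 1.x module layout. On Airflow 2.x the same imports look slightly different; a hedged equivalent, assuming Airflow 2.x:

# Airflow 2.x equivalents (assumption: running on Airflow 2.x)
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator  # the bash_operator module is deprecated in 2.x
import pendulum  # days_ago() is deprecated in 2.x

# start_date would then be written as, for example:
# 'start_date': pendulum.datetime(2024, 1, 1, tz='UTC')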

2) DAG Arguments

# defining DAG arguments

# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'Seeeun Cho',
    'start_date': days_ago(0),
    'email': ['nueees@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
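
As the comment above says, each of these defaults can be overridden when a task is created. A hypothetical example (it reuses the imports from step 1 and the dag object from step 3; the task and URL are made up):

flaky_download = BashOperator(
    task_id='flaky_download',
    bash_command='curl -fsSO https://example.com/testdata.tgz',
    retries=3,                         # overrides default_args['retries'] = 1
    retry_delay=timedelta(minutes=1),  # overrides the 5-minute default
    dag=dag,
)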

3) DAG Definition

# define the DAG
dag = DAG(
    dag_id='ETL_test_data',
    default_args=default_args,
    description='Apache Airflow etl test',
    schedule_interval=timedelta(days=1),
)
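
An equivalent definition uses the context-manager form, a common Airflow idiom: tasks created inside the with block attach to the DAG automatically, so the per-task dag=dag argument in step 4 becomes unnecessary.

with DAG(
    dag_id='ETL_test_data',
    default_args=default_args,
    description='Apache Airflow etl test',
    schedule_interval=timedelta(days=1),
) as dag:
    unzip_data = BashOperator(
        task_id='unzip_data',
        bash_command='tar -xzf testdata.tgz -C /home/pjt/stage/',
    )
    # ...remaining tasks as in step 4, minus the dag=dag argument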

4) Task Definitions

unzip_data = BashOperator(
    task_id='unzip_data',
    bash_command='tar -xzf testdata.tgz -C /home/pjt/stage/',  # .tgz is a gzipped tar, so tar (not unzip) extracts it
    dag=dag,
)
extract_data_from_csv = BashOperator(
    task_id='extract_data_from_csv',
    bash_command='cut -f1-4 -d"," test-data.csv > csv_data.csv',
    dag=dag,
)
extract_data_from_fixed_width = BashOperator(
    task_id='extract_data_from_fixed_width',
    # fixed-width fields are selected by character position; the 1-4 range is a placeholder for the actual column layout
    bash_command='cut -c1-4 test-data.txt > fixed_width_data.csv',
    dag=dag,
)
consolidate_data = BashOperator(
    task_id='consolidate_data',
    bash_command='paste csv_data.csv fixed_width_data.csv > extracted_data.csv',
    dag=dag,
)
transform_data = BashOperator(
    task_id='transform_data',
    bash_command='tr "[a-z]" "[A-Z]" < extracted_data.csv > transformed_data.csv',
    dag=dag,
)

5) Task Pipeline

# task pipeline
unzip_data >> extract_data_from_csv >> extract_data_from_fixed_width >> consolidate_data >> transform_data
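
The >> operator is shorthand for set_downstream(); the same pipeline can be written explicitly:

# equivalent to the bit-shift form above
unzip_data.set_downstream(extract_data_from_csv)
extract_data_from_csv.set_downstream(extract_data_from_fixed_width)
extract_data_from_fixed_width.set_downstream(consolidate_data)
consolidate_data.set_downstream(transform_data)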



5.2. Airflow Commands

start Airflow

Starting your airflow services....
This process can take up to 15 minutes...
      
Airflow started, waiting for all services to be ready....
      
Your airflow server is now ready to use and available with username: airflow password: password

You can access your Airflow Webserver at: https://blahblah

Copy the username (airflow) and the password; use that URL (blahblah) when opening the Web UI.

list DAGs, tasks

airflow dags list
airflow tasks list ETL_test_data

unpause DAG

airflow dags unpause ETL_test_data

pause DAG

airflow dags pause ETL_test_data


