본문 바로가기
DW&BI/Airflow

2. Airflow 개념

by 조훙 2020. 9. 14.

Airflow를 이해하기 위한 여러가지 용어와 개념들을 정리 한다.

용어 정리

Building blocks of Airflow

DAG(Directed Acyclic Graph) : 방향성 비순환 그래프. Airflow의 Workflow를 구성하는 방식을 이야기 한다. 각 Task간의 관계와 종속성을 DAG로 표현하게 된다.

 

TASK : DAG내에 존재하는 작업의 단위

Operator : DAG 안의 Task가 수행 할 작업을 정의. Task를 선언할 때, Operator와 Parameter를 통해 작업의 정의를 진행할 수 있다.

 

Hook : 외부 플랫폼, 데이터베이스 (ex:Hive, S3, MySQL, Google Cloud Platform 등) 에 접근할 수 있도록 만든 인터페이스.  대부분 Operator가 실행 되기 전에 Hook을 통해 통신하게 된다.

 

Connection : 외부 시스템과의 연결에 필요한 정보를 보관하는 단위. conn_id 형태로 Airflow MetaStore에 보관되며, 공용으로 사용된다.

 

XComs : XCOM은 내부 task간의 변수와 같은 상태 정보를 통신할 수 있게 해준다. Push와 Pull 연산을 통해서 통신하며, Task가 리턴하는 값은 항상 XCOM에 Push 된다.

 

Variables : 임의의 내용이나 설정 정보를 간단한 Key-Value 방식으로 저장하는 일반적인 방법

 

Jinja Template : Jinja Template는 Flask 와 Airflow 등의 Python 에서 많이 사용하는 Template Engine. Jinja 문법을 통해서 각종 파라미터 값을 처리 하거나, 간단한 반복문 처리 등을 진행 할 수 있다.

 

멱등성 : 프로그래밍에서 자주 사용되는 단어로, 해당 연산을 여러번 적용하더라도 결과가 달라지지 않는 성질.

Airflow의 Task는 멱등성을 유지하는 방식으로 개발을 해야 한다. 동일한 태스크를 여러번 실행해도 동일한 효과를 이끌어 내야 한다.

ex) 

  • 테이블을 삭제한 뒤 다시 생성 하여 진행.
  • 되도록 Insert 하는 작업을 하나의 Task로 이루어 지도록 한다.

DAG 생성 흐름

1) default_args 정의

  • default_args 정의를 통해 해당 DAG의 기본 정보 및 실행 시간, 재수행 횟수 등의 정보를 설정 한다.
  default_args = {
      'owner': 'airflow',
      'depends_on_past': False,
      'start_date': datetime(2020, 9, 1),
      'email': ['kjhmlo@tistory.com'],
      'email_on_failure': False,
      'email_on_retry': False,
      'retries': 1,
      'retry_delay': timedelta(minutes=5)}

2) DAG 인스턴스 생성

  • DAG 인스턴스를 생성 및 필요한 정보를 세팅
dag = DAG(
    'test',
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=timedelta(days=1),
)

3) DAG 내부의 Operator를 활용해 Task 정의

  • Task의 작업을 정의
  • 해당 DAG에서 수행 할 Task를 정의 함으로써, 다양한 작업을 정의할 수 있다.
start_slack = SlackAPIPostOperator(
  task_id='START_SLACK',
  token=SLACK_TOKEN,
  channel=SLACK_CHANNEL,
  username='Airflow',
  text=""":circleci-pass: *"""+SERVICE_ID+"""* Workflow Start.```Workflow: """+WORKFLOW_ID+"""
start date: """+'{{ now_dt }}'+"""
Dag URL: """+DAG_URL+"""```""",           
  dag=dag,
)

end_slack = SlackAPIPostOperator(
  task_id='END_SLACK',
  token=SLACK_TOKEN,
  channel=SLACK_CHANNEL,
  username='Airflow',
  text=""":circleci-pass: *"""+SERVICE_ID+"""* Workflow End.```Workflow: """+WORKFLOW_ID+"""
end date: """+'{{ now_dt }}'+"""
Dag URL: """+DAG_URL+"""```""",            
  dag=dag,
)

t1 = BashOperator(
          task_id='print_date',
          bash_command='date',
          dag=dag)

4) Task의 흐름을 연결 

  • 위에서 선언한 작업들의 순서를 정의하여 DAG의 작업 순서를 확인할 수 있다.
# set_upstream은 t1 작업이 끝나야 t2가 진행된다는 뜻
t2.set_upstream(t1)
# t1.set_downstream(t2)와 동일한 표현입니다
# t1 >> t2 와 동일 표현
t3.set_upstream(t1)

Operator의 종류

  • BashOperator : bash 명령 실행
  • PythonOperator : Python Function 실행 가능
  • EmailOperator : Email 전송 실행
  • SimpleHttpOperator : HTTP Request 전송 명령 실행
  • JdbcOperator : JDBC를 통한 Request 실행
  • MySqlOpereator, SqliteOperator... : 각종 DBMS 접속 및 SQL 실행
  • Sensor : 특정 시간, 파일 db row를 Poll (wait) 하는 오퍼레이터

출처

Building blocks of Airflow : https://www.slideshare.net/varyakarpenko5/airflow-for-beginners/4

airflow를 이용한 배치 데이터 처리 : getchan.github.io/data/airflow_2/#pools

'DW&BI > Airflow' 카테고리의 다른 글

3. Airflow 설치  (0) 2020.10.05
1. Airflow 소개  (0) 2020.09.10