DAG를 작성하는 세 가지 방법
Airflow에서 실행시키기 위한 DAG를 작성하는 방법은 세 가지가 존재한다. 세 가지 방식의 차이점과 DAG 선언 방식에 따른 장점과 단점을 알아보려 한다. 공식 문서를 참고하여 작성하였다.
표준 생성자 (constructor) 사용
설명
- DAG 객체를 명시적으로 생성하고, 각 Operator (Task)를 DAG에 추가하는 방식
- DAG 생성자는 파라미터를 통해 dag_id, start_date, schedule, default_arg 등을 설정
장단점
- 장점
- 단점
- 각 Operator (Task)에 마다 dag 파라미터를 추가해야 함
- 여러 DAG를 설정하는 경우 매번 생성자 호출이 필요함
예시 코드
import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
my_dag = DAG(
dag_id="my_dag_name",
start_date=datetime.datetime(2021, 1, 1),
schedule="@daily",
)
EmptyOperator(task_id="task", dag=my_dag)
with 구문 사용
설명
- with dag 형태로 DAG의 context 내에서 작업을 정의할 수 있음
- DAG를 추가할 때 dag 파라미터를 반복하지 않아도 됨 (<-> 표준 생성자)
장단점
- 장점
- dag 파라미터를 반복하지 않아서 코드가 더 간결하고 가독성이 좋아짐
- DAG 컨텍스트 내에서 작업을 정의하여 DAG와 연관된 작업을 한눈에 파악할 수 있음
- 단점
- 컨텍스트 내부에서만 작업을 정의해야 해서 DAG 구성의 확장이나 재사용이 다소 어려움
- with 블록 밖에서 DAG 작업을 추가할 경우 해당 Task가 DAG에 추가되지 않아 실행이 되지 않음
예시 코드
import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id="my_dag_name",
start_date=datetime.datetime(2021, 1, 1),
schedule="@daily",
):
EmptyOperator(task_id="task")
@dag 데코레이터 사용
설명
- Airflow 2.0 이상에서는 @dag 데코레이터를 통해 함수 기반으로 DAG 정의 가능
- DAG의 Task 흐름을 작성하고, 해당 함수를 DAG로 등록
- DAG를 추가할 때 dag 파라미터를 반복하지 않아도 됨 (<-> 표준 생성자)
- dag decorator 공식 문서
장단점
- 장점
- 코드가 간결하며, DAG 구성을 함수 내에 모두 정의할 수 있어 가독성이 좋음
- 단점
- 복잡한 DAG를 다룰 때, 구성이 다소 모호해질 수 있음
주의할 점
- 공식 문서에 따르면 Airflow는 최상위에 존재하는 DAG만 로드, 즉 함수의 DAG는 로드 X
- @dag의 경우 함수 형태로 DAG를 작성되므로 최소 한 번은 호출해줘야 함
- @dag VS 아래의 my_function
- @dag : 함수 자체가 DAG이기에 호출을 해주면 DAG가 생성
- my_function : 최상위 (전역)에서 선언된 DAG 객체가 아니므로 DAG 생성 X
dag_1 = DAG('this_dag_will_be_discovered') # Load
def my_function(): # not Load
dag_2 = DAG('but_this_dag_will_not')
my_function() # not Load
예시 코드
import datetime
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
EmptyOperator(task_id="task")
generate_dag()
Reference
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] data_interval_start를 활용한 File System Partitioning 적용 (0) | 2024.10.31 |
---|---|
[Airflow] Airflow Task log 작성 (feat. logging) (3) | 2024.10.28 |
[Airflow] DAG와 Task의 동시성을 관리하기 위한 변수 (0) | 2024.10.25 |
[Airflow] Redshift COPY 시 잘못된 timestamp 값이 적재되는 문제 (1) | 2024.10.23 |
[Airflow] DAG params를 PythonOperator 매개변수로 사용하기 (0) | 2024.10.15 |