Airflow의 기타 기능
Dag Dependencies (1)
DAG를 실행하는 방법
- 주기적 실행 : Schedule로 지정
- DAG Dependencies : 다른 DAG에 의해 Trigger
- Explicit Trigger (TriggerDagOperator) : DAG A가 DAG B를 Trigger, A가 끝나면 B를 Trigger
- Reactive Trigger (ExternalTaskSensor) : DAG B가 DAG A의 Task 종료를 기다림, A는 이 사실을 모름
- ExternalTaskSensor는 사용 조건이 까다로워 실수하기 쉽기 때문에 안 쓰는 방향으로 가는 게 좋음
- 알아두면 좋은 상황에 따라 다른 Task 실행 방식
- BranchPythonOperater : 조건에 따라 다른 Task로 분기
- LatestOnlyOperator : 과거 데이터 Backfill 시에 불필요한 Task 처리
- 이전 Task의 실행 상황 : 앞의 Task가 실패해도 동작해야 하는 경우가 있을 수 있음
trigger_B = TriggerDagRunOperator(
task_id="trigger_B",
trigger_dag_id="트리거하려는DAG이름",
conf={ 'path': '/opt/ml/conf' },
execution_date="{{ ds }}", # Jinja 템플릿을 통해 DAG A의 execution_date을 패스
reset_dag_run=True, # True일 경우 해당 날짜가 이미 실행되었더라는 다시 재실행
wait_for_completion=True # DAG B가 끝날 때까지 기다릴지 여부를 결정. 디폴트값은 False
)
Jinja Template
- Python에서 널리 사용되는 템플릿 엔진
- 프레젠테이션 로직과 애플리케이션 로직을 분리하여 동적으로 HTML 생성
- Flask에서 사용
- 변수는 이중 중괄호 {{ }}로 사용
<h1>Hello, {{ name }}</h1>
- 제어문은 퍼센트 기호 {% %}로 표시
<ul>
{% for item in items %}
<li>{{ item }}</li>
{% endfor %}
</ul>
Jinja Template + Airflow
- 작업 이름, 파라미터, SQL 쿼리와 같은 작업 매개변수를 템플릿화된 문자열로 정의 가능
- 재사용 가능하고 사용자 정의 가능한 워크플로우 생성
- 예시 1 : execution_date를 코드 내에서 쉽게 사용 {{ ds }}
- 예시 2 : 파라미터 등으로 넘어온 변수를 쉽게 사용 가능
- 사용 가능한 시스템 변수 레퍼런스
# BashOperator를 사용하여 템플릿 작업 정의
task1 = BashOperator(
task_id='task1',
bash_command='echo "{{ ds }}"',
dag=dag
)
# 동적 매개변수가 있는 다른 템플릿 작업 정의
task2 = BashOperator(
task_id='task2',
bash_command='echo "안녕하세요, {{ params.name }}!"',
params={'name': 'John'}, # 사용자 정의 가능한 매개변수
dag=dag
)
- (Templated)가 적혀있으면, Jinja Template 형식으로 사용할 수 있음
TriggerDagOperator
- DAG A의 Task를 TriggerDagRunOperator로 구현
- conf : DAG B에 넘기고 싶은 정보
- airflow.cfg의 dag_run_conf_overrides_params가 True로 설정돼 있어야 함
- DAG B가 Jinja Template를 지원하면 {{ dag_run.conf["path"] }}로 접근
- DAG B가 PythonOperator(**context)라면, kwargs['dag_run'].conf.get('path')로 접근
trigger_B = TriggerDagRunOperator(
task_id="trigger_B",
trigger_dag_id="트리거하려는DAG이름",
conf={ 'path': '/opt/ml/conf' }, # DAG B에 넘기고 싶은 정보
execution_date="{{ ds }}", # Jinja 템플릿을 통해 DAG A의 execution_date을 패스
reset_dag_run=True, # True일 경우 해당 날짜가 이미 실행되었더라도 다시 재실행
wait_for_completion=True # DAG B가 끝날 때까지 기다릴지 여부를 결정. Default = False
)
Dag Dependencies (2)
Sensor
- 특정 조건이 충족될 때까지 대기하는 Operator
- 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용
- Airflow는 내장 Sensor 제공
- FileSensor : 지정된 위치에 파일이 생길 때까지 대기
- HttpSensor : HTTP 요청 후 지정된 응답이 올 때까지 대기
- SqlSensor : SQL DB에서 특정 조건을 충족할 때까지 대기
- TimeSensor : 특정 시간에 도달할 때까지 워크플로우를 일시 정지 (like sleep)
- ExternalTaskSensor : 다른 Airflow DAG의 특정 Task 완료를 대기
- 주기적으로 poke 하는 것
- 관련 파라미터 : mode
- mode = reschedule : worker를 릴리스하고 다시 잡아서 poke, worker를 못 잡을 수도 있음
- mode = poke (default) : worker를 하나 붙잡고 poke 간에 sleep, 낭비가 될 수도 있음
ExternalTaskSensor
- DAG B의 ExternalTaskSensor Task가 DAG A의 Task가 끝났는지 체크
- 먼저 동일한 shedule_interval 사용
- 두 Task의 Execution Date가 동일해야 작동
- 만약 DAG A와 DAG B가 다른 schedule interval을 갖는다면?
- execution_delta를 사용
- execution_date_fn을 사용하면 더 복잡하게 컨트롤 가능
- 두 개의 DAG가 다른 frequency를 갖는다면 ExternalTaskSensor 사용 불가
from airflow.sensors.external_task import ExternalTaskSensor
waiting_for_end_of_dag_a = ExternalTaskSensor(
task_id='waiting_for_end_of_dag_a',
external_dag_id='DAG이름',
external_task_id='end',
timeout=5*60,
mode='reschedule',
execution_delta=timedelta(minutes=5)
)
BranchPythonOperator
- 상황에 따라 뒤에 실행돼야 할 Task를 동적으로 결정해 주는 Operator
- TriggerDagOperator 앞에 이 Operator를 사용하는 경우도 있음
- 개발, 프로덕션 등 다양한 단계에서 필요한 실행을 지정하는 경우 사용
- 예제 코드
from airflow.operators.python import BranchPythonOperator
# 상황에 따라 뒤에 실행되어야 하는 태스크를 리턴
def skip_or_cont_trigger():
if Variable.get("mode", "dev") == "dev":
return []
else:
return ["trigger_b"]
# "mode"라는 Variable의 값이 "dev"이면 trigger_b 태스크를 스킵
branching = BranchPythonOperator(
task_id='branching',
python_callable=skip_or_cont_trigger,
)
LatestOnlyOperator
- Time-sensitive 한 Task가 과거 데이터의 backfill 시 실행되는 것을 막기 위함
- execution_date < 현재 시간 < 다음 execution_date 인 경우에만 뒤의 Task를 실행
- 아래의 코드에서 backfill이 진행되는 경우 (현재 날짜 : 2023.06.18)
- 2023.06.16의 데이터는 저장 O -> t3, t4가 정상 작동
- 2023.06.14의 데이터는 저장 X -> t3, t4가 skipped
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id='latest_only_example',
schedule=timedelta(hours=48), # 매 48시간마다 실행되는 DAG로 설정
start_date=datetime(2023, 6, 14),
catchup=True) as dag:
t1 = EmptyOperator(task_id='task1')
t2 = LatestOnlyOperator(task_id = 'latest_only')
t3 = EmptyOperator(task_id='task3')
t4 = EmptyOperator(task_id='task4')
t1 >> t2 >> [t3, t4]
Trigger Rules
- Upstream Task의 성공 / 실패 상황에 따라 뒤의 Task 실행 여부를 결정하고 싶을 경우 사용
- Operator의 trigger_rule 파라미터로 결정 가능
- airflow.utils.trigger_rule의 TriggerRule 모듈 사용
- all_success (default), all_failed, all_done
- one_filed, one_success
- none_failed, none_failed_min_one_success
from airflow.utils.trigger_rule import TriggerRule
with DAG("trigger_rules", default_args=default_args, schedule=timedelta(1)) as dag:
t1 = BashOperator(task_id="print_date", bash_command="date")
t2 = BashOperator(task_id="sleep", bash_command="sleep 5")
t3 = BashOperator(task_id="exit", bash_command="exit 1")
t4 = BashOperator(
task_id='final_task',
bash_command='echo DONE!',
trigger_rule=TriggerRule.ALL_DONE
)
[t1, t2, t3] >> t4
Task Grouping
Task Grouping의 필요성
- Task 수가 많은 DAG라면 성격에 따라 Task를 관리하고 싶은 니즈 존재
- SubDAG가 사용되다가 Airflow 2.0에서 나온 Task Grouping으로 넘어가는 추세
- 비슷한 일을 하는 Task를 SubDAG라는 Child Dag로 만들어 관리
Task Grouping 예제
- 다수의 파일 처리를 하는 DAG라면, 파일 다운로드 / 체크 / 데이터 처리 Task로 구성
- 예제 코드
Dynamic DAGs
Dynamic DAG
- 비슷한 DAG를 개발하는 것을 방지
- 템플릿과 YAML을 기반으로 DAG를 동적으로 생성
- Jinja를 기반으로 DAG 자체의 템플릿을 디자인하고 YAML을 통해 템플릿에 파라미터를 제공
- DAG를 만드는 것과 DAG 안의 Task를 늘리는 것 사이의 밸런스가 필요
- 오너가 다르거나 Task 수가 너무 커지는 경우 DAG를 복제하는 것이 더 좋음
기본적인 아이디어
- .yaml 파일에 DAG의 파라미터를 정의
- Jinja Template를 활용해 DAG 생성
- generator.py를 통해 최종 DAG 생성
- 예제 코드
'[프로그래머스] 데이터 엔지니어링 데브코스 3기 > TIL(Today I Learn)' 카테고리의 다른 글
[TIL - 55일 차] Airflow 고급기능과 DBT, 데이터 디스커버리 (5-1) (2) | 2024.06.07 |
---|---|
[TIL - 54일 차] Airflow 고급기능과 DBT, 데이터 디스커버리 (4) (2) | 2024.06.06 |
[TIL - 52일 차] Airflow 고급기능과 DBT, 데이터 디스커버리 (2) (0) | 2024.06.04 |
[TIL - 51일 차] Airflow 고급기능과 DBT, 데이터 디스커버리 (1) (0) | 2024.06.03 |
[TIL - 50일 차] Docker & K8S 실습 (4) (0) | 2024.05.31 |