Airflow 기타 기능
Airflow의 가장 기본적인 개념인 DAG와 더불어 DAG를 더 효과적으로 관리할 수 있는 Sensor, Trigger Rules, Task Grouping, Dynamic DAG에 대해 알아보자.
DAG (Directed Acyclic Graph)
DAG 개념
- Airflow에서 ETL을 부르는 별칭
- DAG는 하나 이상의 Task로 구성
실행 방법
- Web UI : airflow 웹에 접속해 원하는 DAG를 직접 Trigger
- Scheduler Shell
- docker ps로 airflow sheduler의 container ID 혹은 image 이름 확인
- docker exec -it "container ID 혹은 image 이름" sh로 Scheduler Shell 접속
- airflow dags test "dag_id" "execution_date"로 DAG 실행
docker ps
docker exec -it "container ID 혹은 image 이름" sh
airflow dags test "dag_id" "execution_date"
Sensor
Sensor 개념
- 특정 조건이 충족될 때까지 대기하는 Operator
- 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용
Airflow 내장 Sensor
- FileSensor : 지정된 위치에 파일이 생길 때까지 대기
- HttpSensor : HTTP 요청 후 지정된 응답이 올 때까지 대기
- SqlSensor : SQL DB에서 특정 조건을 충족할 때까지 대기
- TimeSensor : 특정 시간에 도달할 때까지 워크플로우를 일시 정지
- ExternalTaskSensor : 다른 Airflow DAG의 특정 Task 완료를 대기
- ...
poke
- poke 파라미터 : mode
- mode = reschedule : worker를 릴리스하고 다시 잡아서 poke, 상황에 따라 worker를 다시 못 잡을 수 있음
- mode = poke (default) : worker를 하나 붙잡고 poke 간에 sleep, 낭비가 될 수 있음
Trigger Rules
Trigger Rule 개념
- Upstream Task의 성공 / 실패 상황에 따라 뒤의 Task 실행 여부를 결정하고 싶을 경우 사용
- Operator의 trigger_rule 파라미터로 결정 가능
- airflow.utils.trigger_rule의 TriggerRule 모듈 사용
- all_success (default), all_failed, all_done : 모두 성공했을 경우, 모두 실패했을 경우, 모두 끝났을 경우
- one_failed, one_success : 하나 실패했을 경우, 하나 성공했을 경우
- none_failed, none_failed_min_one_success : 실패하지 않았을 경우, 실패 없이 최소 하나가 성공했을 경우
Task Grouping
Task Grouping 개념
- Task 수가 많은 DAG라면 성격에 따라 Task를 관리하고 싶은 니즈 존재
- 비슷한 작업을 하는 Task를 묶어 관리할 수 있도록 도와줌
Task Grouping 예제
- 하나의 DAG 내에서 Task를 묶어 관리
- Download라는 이름의 Task Group은 task_1, task_2, task_3이 존재
- Process의 경우 Task Group 내에 inner_section_2라는 Task Group이 존재
Dynamic DAG
Dynamic DAG 개념
- Jinja 기반의 Template과 YAML을 기반으로 DAG를 동적으로 생성
- Jinja를 기반으로 DAG 자체의 템플릿을 디자인하고 YAML을 통해 템플릿에 파라미터 제공
- 비슷한 DAG를 개발하는 것을 방지
DAG 생성 시 주의할 점
- DAG를 만드는 것과 DAG 안의 Task를 늘리는 것 사이의 밸런스가 중요
- Task가 너무 많아지면 관리하기 힘듦
- 반대로 DAG를 너무 많이 만들어도 안 됨
- 오너가 다르거나 Task 수가 너무 커지는 경우 DAG를 복제하는 것이 더 좋음
- generator를 직접 실행해서 DAG를 만들어내지만, 필요한 경우 이 과정도 자동화를 생각해 볼 수 있음
- DAG의 변경사항이 생기는 경우 generator가 실행되게 설정
- 일정 시간마다 generator가 실행되게 설정
- ...
Dynamic DAG 사용
- .yaml 파일에 DAG의 파라미터 정의
# config_appl.yml
dag_id: 'APPL'
schedule: '@daily'
catchup: False
symbol: 'APPL'
- Jinja Template를 활용해 DAG 생성
# templated_dag.jinja2
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
with DAG(dag_id="get_price_{{ dag_id }}",
start_date=datetime(2023, 6, 15),
schedule='{{ schedule }}',
catchup={{ catchup or True }}) as dag:
@task
def extract(symbol):
return symbol
@task
def process(symbol):
return symbol
@task
def store(symbol):
return symbol
store(process(extract("{{ symbol }}")))
- generator를 통해 최종 DAG 생성
# generator.py
from jinja2 import Environment, FileSystemLoader
import yaml
import os
file_dir = os.path.dirname(os.path.abspath(__file__))
env = Environment(loader=FileSystemLoader(file_dir))
template = env.get_template('templated_dag.jinja2')
for f in os.listdir(file_dir):
if f.endswith(".yml"):
with open(f"{file_dir}/{f}", "r") as cf:
config = yaml.safe_load(cf)
with open(f"dags/get_price_{config['dag_id']}.py", "w") as f:
f.write(template.render(config))
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] Executor 실행 준비 과정 (SchedulerJobRunner._execute()) (0) | 2024.10.10 |
---|---|
[Airflow] AWS Ubuntu EC2 Airflow 환경 구축 (feat. Dockerfile) (0) | 2024.08.16 |
[Airflow] DAG Scheduling과 Execution (2) | 2024.06.09 |
[Airflow] Airflow 기초 지식 (0) | 2024.06.02 |
[Airflow] Airflow 개념과 ETL 작성시 주의할 점 (0) | 2024.05.24 |