[Airflow] Airflow 기타 기능 정리

2024. 6. 5. 17:35·Data Engineering/Airflow

Airflow 기타 기능

Airflkow Icon

Airflow의 가장 기본적인 개념인 DAG와 더불어 DAG를 더 효과적으로 관리할 수 있는 Sensor, Trigger Rules, Task Grouping, Dynamic DAG에 대해 알아보자.

DAG (Directed Acyclic Graph)

DAG 개념

  • Airflow에서 ETL을 부르는 별칭
  • DAG는 하나 이상의 Task로 구성

실행 방법

  • Web UI : airflow 웹에 접속해 원하는 DAG를 직접 Trigger
  • Scheduler Shell
    1. docker ps로 airflow sheduler의 container ID 혹은 image 이름 확인
    2. docker exec -it "container ID 혹은 image 이름" sh로 Scheduler Shell 접속
    3. airflow dags test "dag_id" "execution_date"로 DAG 실행
docker ps
docker exec -it "container ID 혹은 image 이름" sh
airflow dags test "dag_id" "execution_date"

Sensor

Sensor 개념

  • 특정 조건이 충족될 때까지 대기하는 Operator
  • 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용

Airflow 내장 Sensor

  • FileSensor : 지정된 위치에 파일이 생길 때까지 대기
  • HttpSensor : HTTP 요청 후 지정된 응답이 올 때까지 대기
  • SqlSensor : SQL DB에서 특정 조건을 충족할 때까지 대기
  • TimeSensor : 특정 시간에 도달할 때까지 워크플로우를 일시 정지
  • ExternalTaskSensor : 다른 Airflow DAG의 특정 Task 완료를 대기
  • ...

poke

  • poke 파라미터 : mode
    • mode = reschedule : worker를 릴리스하고 다시 잡아서 poke, 상황에 따라 worker를 다시 못 잡을 수 있음
    • mode = poke (default) : worker를 하나 붙잡고 poke 간에 sleep, 낭비가 될 수 있음

Trigger Rules

Trigger Rule 개념

  • Upstream Task의 성공 / 실패 상황에 따라 뒤의 Task 실행 여부를 결정하고 싶을 경우 사용
  • Operator의 trigger_rule 파라미터로 결정 가능
    • airflow.utils.trigger_rule의 TriggerRule 모듈 사용
    • all_success (default), all_failed, all_done : 모두 성공했을 경우, 모두 실패했을 경우, 모두 끝났을 경우
    • one_failed, one_success : 하나 실패했을 경우, 하나 성공했을 경우
    • none_failed, none_failed_min_one_success : 실패하지 않았을 경우, 실패 없이 최소 하나가 성공했을 경우

Task Grouping

Task Grouping 개념

  • Task 수가 많은 DAG라면 성격에 따라 Task를 관리하고 싶은 니즈 존재
  • 비슷한 작업을 하는 Task를 묶어 관리할 수 있도록 도와줌

Task Grouping 예제

  • 하나의 DAG 내에서 Task를 묶어 관리
  • Download라는 이름의 Task Group은 task_1, task_2, task_3이 존재
  • Process의 경우 Task Group 내에 inner_section_2라는 Task Group이 존재

Task Grouping

Dynamic DAG

Dynamic DAG 개념

  • Jinja 기반의 Template과 YAML을 기반으로 DAG를 동적으로 생성
  • Jinja를 기반으로 DAG 자체의 템플릿을 디자인하고 YAML을 통해 템플릿에 파라미터 제공
  • 비슷한 DAG를 개발하는 것을 방지

DAG 생성 시 주의할 점

  • DAG를 만드는 것과 DAG 안의 Task를 늘리는 것 사이의 밸런스가 중요
    • Task가 너무 많아지면 관리하기 힘듦
    • 반대로 DAG를 너무 많이 만들어도 안 됨
    • 오너가 다르거나 Task 수가 너무 커지는 경우 DAG를 복제하는 것이 더 좋음
  • generator를 직접 실행해서 DAG를 만들어내지만, 필요한 경우 이 과정도 자동화를 생각해 볼 수 있음
    • DAG의 변경사항이 생기는 경우 generator가 실행되게 설정
    • 일정 시간마다 generator가 실행되게 설정
    • ...

Dynamic DAG 사용

  • .yaml 파일에 DAG의 파라미터 정의
# config_appl.yml
dag_id: 'APPL'
schedule: '@daily'
catchup: False
symbol: 'APPL'
  • Jinja Template를 활용해 DAG 생성
# templated_dag.jinja2
from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG(dag_id="get_price_{{ dag_id }}",
    start_date=datetime(2023, 6, 15),
    schedule='{{ schedule }}',
    catchup={{ catchup or True }}) as dag:

    @task
    def extract(symbol):
        return symbol

    @task
    def process(symbol):
        return symbol

    @task
    def store(symbol):
        return symbol

    store(process(extract("{{ symbol }}")))
  • generator를 통해 최종 DAG 생성
# generator.py
from jinja2 import Environment, FileSystemLoader
import yaml
import os

file_dir = os.path.dirname(os.path.abspath(__file__))
env = Environment(loader=FileSystemLoader(file_dir))
template = env.get_template('templated_dag.jinja2')

for f in os.listdir(file_dir):
    if f.endswith(".yml"):
        with open(f"{file_dir}/{f}", "r") as cf:
            config = yaml.safe_load(cf)
            with open(f"dags/get_price_{config['dag_id']}.py", "w") as f:
                f.write(template.render(config))

'Data Engineering > Airflow' 카테고리의 다른 글

[Airflow] Executor 실행 준비 과정 (SchedulerJobRunner._execute())  (0) 2024.10.10
[Airflow] AWS Ubuntu EC2 Airflow 환경 구축 (feat. Dockerfile)  (0) 2024.08.16
[Airflow] DAG Scheduling과 Execution  (2) 2024.06.09
[Airflow] Airflow 기초 지식  (0) 2024.06.02
[Airflow] Airflow 개념과 ETL 작성시 주의할 점  (0) 2024.05.24
'Data Engineering/Airflow' 카테고리의 다른 글
  • [Airflow] AWS Ubuntu EC2 Airflow 환경 구축 (feat. Dockerfile)
  • [Airflow] DAG Scheduling과 Execution
  • [Airflow] Airflow 기초 지식
  • [Airflow] Airflow 개념과 ETL 작성시 주의할 점
기억에 남는 블로그 닉네임
기억에 남는 블로그 닉네임
  • 기억에 남는 블로그 닉네임
    얕게, 깊게
    기억에 남는 블로그 닉네임
  • 전체
    오늘
    어제
  • 블로그 메뉴

    • 홈
    • 방명록
    • 글쓰기
    • 분류 전체보기
      • Data Engineering
        • Airflow
        • 빅데이터
        • 자동화
        • 기타
      • Infra
        • AWS
        • Terraform
        • [인프라 구축기] Terraform 활용 AWS ..
      • CS
        • 자료구조
        • 알고리즘
        • 네트워크
        • 데이터베이스
        • 이것이 취업을 위한 코딩 테스트다 with 파이썬
      • Python
      • Web
      • Git
      • 기타
        • 취업 & 진로
        • 회고록
        • 기타
      • 프로젝트 단위 공부
        • [부스트코스] DataLit : 데이터 다루기
        • [개인 프로젝트] 공모전 크롤링
        • [개인 프로젝트] FC Online 공식 경기 분..
        • 프로젝트 개선 방안
      • [프로그래머스] 데이터 엔지니어링 데브코스 3기
        • TIL(Today I Learn)
        • 숙제
        • 기타
      • 알고리즘 연습
        • 프로그래머스
        • 백준
  • 링크

    • 깃허브
    • 링크드인
  • 인기 글

  • 최근 글

  • 최근 댓글

  • hELLO· Designed By정상우.v4.10.3
기억에 남는 블로그 닉네임
[Airflow] Airflow 기타 기능 정리
상단으로

티스토리툴바