Airflow Scheduling과 Execution
개요
Airflow 관리
Airflow를 사용하면서 반드시 알아야 할 것이 "작성한 DAG가 언제, 얼마나 실행되는가?"일 것이다. Airflow에서는 관련 개념을 숙지하지 않으면, 해당 DAG의 실행 유무에 대한 답을 내릴 수 없다. 그래서 Airflow를 활용해 DAG를 제대로 관리하기 위해서는 start_date, execution_date와 더불어 관련 개념을 이해해야 한다.
DAG 선언 예시
일반적인 DAG를 선언한 예제이다. 해당 Job이 "2024-06-09 04:00:00"에 활성화될 때, 총 몇 번의 DAG가 실행되는지 바로 떠올리기는 쉽지 않다. 그러나 관련 용어의 개념과 타임테이블을 반복적으로 이해하다 보면, 어렵지 않게 떠올릴 수 있을 것이라 생각한다.
dag = DAG(
...
start_date=datetime(2024, 6, 5),
schedule_interval="@daily" # 0 0 * * *
catchup=True,
...
)
관련 용어 이해
DAG에서 사용된 파라미터인 start_date, schedule_interval, catchup와 함께 execution_date, data_interval_start / end에 대해 알아보자. 위의 코드가 처음 실행될 때의 첫 부분을 이미지로 나타낸 것이다. 이를 바탕으로 용어 설명을 진행해 볼 것이다. 단, catchup은 여기서 자세히 다루지는 않겠다.
- start_date = datetime(2024, 6, 5),
- schedule_interval = "@daily"
- execution_date = 2024-06-05 00:00:00
- catchup = True
schedule_interval (data interval)
- DAG가 해당 Schedule에서 처리해야 할 시간 범위
- 이미지 이해
- schedule_interval이 "@daily"로 설정되어 매일 자정에 실행
- 하루마다 실행되므로 schedule_interval은 하루
- 실행 시각 (schedule_date = "@daily") : "2024-06-06 00:00:00" -> 2020-06-05의 데이터 처리
start_date
- 실제로 시작하는 날짜가 아닌 DAG의 첫 번째 schedule_interval 시작 시간
- 이미지 이해
- 이미지는 첫 DAG 실행을 가져온 것
- 첫 번째 schedule_interval의 시작 시간인 "2024-06-05 00:00:00"이 start_date
execution_date (Logical Date)
- DAG의 Time Window의 시작 지점
- execution_date는 Airflow 2.2부터 사용되지 않고, 대신에 logical_date라는 용어가 사용
- Time Window : DAG 처리를 진행하는 시간 범위
- 이미지 이해
- Time Window : "2024-06-05 00:00:00" ~ "2024-06-06 00:00:00"
- 해당 시간의 시작 지점은 "2024-06-05 00:00:00"
data_interval_start, data_interval_end
- execution_date에서 언급했던 Time Window와 관계
- data_interval_start : Time Window의 시작 -> 2024-06-05 00:00:00
- data_interval_end : Time Window의 끝 -> 2024-06-06 00:00:00
catchup
- DAG가 처음 활성화된 시점이 start_date보다 미래일 경우 사이 기간의 DAG 실행 여부
- True : start_date ~ 현재까지 DAG를 실행
- False : 활성화된 시점부터 DAG를 실행
- 즉, 이전의 데이터를 가져오기 위해 catchup은 True로 설정돼야 함
실습
위에 설명했던 것을 바탕으로 DAG 하나를 생성해 Airflow에서 직접 실행시켜 보자.
코드
- start_date, catchup, schedule_interval을 동일하게 작성
- 실행을 위한 간단한 PythonOperator (Task) 하나를 생성
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
dag = DAG(
dag_id = 'practice_1',
start_date = datetime(2024,6,5),
catchup=True,
schedule = '@daily')
def print_hello():
print("hello!")
return "hello!"
print_hello = PythonOperator(
task_id = 'print_hello',
python_callable = print_hello,
dag = dag)
print_hello
실행 결과
- 실행 횟수는 총 4번
- 실행 시각: "2024-06-06 00:00:00", "2024-06-07 00:00:00", "2024-06-08 00:00:00", "2024-06-09 00:00:00"
시간 테이블
- 각 실행의 data_interval_start는 execution_date와 같음
- 각 실행의 data_interval_end는 DAG의 실행 시각과 같음
- 여기서 2024-06-09 이후에는 실행이 되지 않은 것을 확인할 수 있음
Reference
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] Executor 실행 준비 과정 (SchedulerJobRunner._execute()) (0) | 2024.10.10 |
---|---|
[Airflow] AWS Ubuntu EC2 Airflow 환경 구축 (feat. Dockerfile) (0) | 2024.08.16 |
[Airflow] Airflow 기타 기능 정리 (0) | 2024.06.05 |
[Airflow] Airflow 기초 지식 (0) | 2024.06.02 |
[Airflow] Airflow 개념과 ETL 작성시 주의할 점 (0) | 2024.05.24 |