DAG와 Task의 동시성을 관리하기 위한 변수
DAG와 Task의 병렬 처리를 관리하기 위해 사용하는 Airflow 변수는 다음과 같다.
- parallelism : Scheduler당 동시에 실행할 수 있는 최대 task Instance 수를 정의하며, Worker 수와 무관
- max_active_tasks_per_dag : 각 DAG에서 동시에 실행하도록 허용된 최대 Task Instance 수
- max_active_runs_per_dag : DAG당 활성 DAG 실행의 최대 수
이전에 parallelism에 대해서는 글을 작성한 적이 있기에 parallelism에 대해 자세히 알고 싶다면 아래의 링크에서 확인하면 좋을 것 같다. 여기서는 max_active_tasks_per_dag, max_active_runs_per_dag에 대해 알아보도록 한다.
max_active_runs_per_dag
개념 및 특징
- DAG당 활성 DAG 실행의 최대 수
- 각 DAG는 max_activa_runs_per_dag 값만큼 동시 실행이 가능
- 각 DAG의 Run Instance가 과도하게 쌓이지 않도록 제한
- 리소스를 효율적으로 관리하거나 DAG 상태에 따라 데이터 무결성을 유지할 때 유용
- schedule interval보다 실행 시간이 길다면, 실행 간격이 지연될 수도 있음 (예시 참고)
변수 정보
- Type : String
- Default : 16
- Env : AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG
예시
- max_activate_runs_per_dag == 1
- 각 DAG Run Instance는 동시에 하나만 실행
- 즉, 현재 DAG 실행이 완료되기 전까지 같은 DAG의 실행은 대기 상태에 머무름
- schedule interval이 1h이고 실행 시간이 1h 이상 걸리는 상황
- max_active_runs_per_dag == 1 : 다음 Run Instance는 이전 Run Instance가 끝날 때까지 대기
- max_active_runs_per_dag > 1 : 다음 Run Instance는 이전 Run Instance와 동시 실행
max_active_tasks_per_dag
Airflow 2.2.0 이후부터 dag_concurrency가 max_active_tasks_per_dag로 변경되었으니 참고 바란다.
개념 및 특징
- 각 DAG에서 동시에 실행하도록 허용된 최대 Task Instance 수
- 큰 값으로 설정 시 DAG가 Pool에서 사용 가능한 Slot을 독점하여 다른 DAG의 Task가 대기 상태에 빠질 수 있음
- 다시 말하면, 새로운 DAG가 Slot을 차지하는 것을 막고자 하는 경우 사용 가능
변수 정보
- Type : String
- Default : 16
- Env : AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG
예시
- max_active_tasks_per_dag == 1
- 각 DAG의 Task는 동시에 하나만 실행
- 병렬 처리가 가능한 Task임에도 하나의 Task가 완료되기 전까지 다음 Task는 대기
변수 값 적용 방법
airflow.cfg 수정 (Global)
- airflow.cfg에서 환경 변수의 값을 직접 변경
- 모든 DAG에 Default로 적용됨
[core]
...
max_active_runs_per_dag = 16
max_active_tasks_per_dag = 16
DAG의 매개 변수 설정 (Local)
- max_active_tasks는 max_active_tasks_per_dag 값을 가져와 사용
- max_active_runs는 max_active_runs_per_dag 값을 가져와 사용
- 특정 DAG의 Task Instance 혹은 DAG Run Instance 수를 지역적으로 설정 가능
# 예시 DAG
from airflow import DAG
from datetime import datetime
default_args = {
'owner': 'airflow',
'retries': 1,
}
with DAG(
'example_dag',
default_args=default_args,
description='Example DAG with max_active_runs_per_dag',
schedule_interval='@hourly',
start_date=datetime(2023, 1, 1),
max_active_runs=1 # max_active_runs_per_dag
max_active_tasks=3 # max_active_tasks_per_dag
) as dag:
# DAG에 정의된 태스크들
pass
Reference
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] Airflow Task log 작성 (feat. logging) (3) | 2024.10.28 |
---|---|
[Airflow] DAG를 선언하는 세 가지 방법 (0) | 2024.10.27 |
[Airflow] Redshift COPY 시 잘못된 timestamp 값이 적재되는 문제 (1) | 2024.10.23 |
[Airflow] DAG params를 PythonOperator 매개변수로 사용하기 (0) | 2024.10.15 |
[Airflow] LocalExecutor Parallelism 개념 및 설정 방법 (0) | 2024.10.14 |