DAG Trigger 시 DAG Run이 동시에 두 개가 생성되는 문제
새로운 DAG를 생성한 뒤 처음으로 Trigger 하거나, DAG의 실행을 중단한 뒤 나중에 다시 Trigger 하는 경우 DAG Run이 두 개가 동시에 생성되는 상황이 발생한다. catchup을 False로 설정해 Backfill이 없는데도 말이다. 여기서 Backfill은 DAG의 start_date부터 현재까지 중간에 없는 DAG Run을 생성하는 기능이다.
현재 작성 중인 DAG는 API로부터 데이터를 추출하고 S3에 parquet으로 저장한 뒤 Redshfit에 COPY 하는 ETL 과정을 담고 있는데, 동시에 실행될 경우 Redshift 동시 접근으로 인한 에러가 발생하는 경우가 존재한다. 또한 S3의 File Partitioning을 Logical Date을 기준으로 진행하여 파일 명만 다른 파일이 하나가 추가로 생긴다. 이러한 상황이 발생하는 이유와 발생한 문제를 해결하기 위한 방법을 알아보려고 한다.
- 문제 상황
- Start Date가 2024-11-03, 07:58:36에 같은 DAG가 다른 Logical Date로 실행된 것을 확인 가능
- 운 좋게 모두 성공적으로 실행됐지만, 항상 성공하지는 않기 때문에 해결이 필요한 부분
DAG Run이 동시에 두 개가 생성되는 이유
External Trigger를 사용한 경우
다음은 start_date를 2024-10-10으로 설정한 뒤 생성된 DAG를 Web UI에서 External Trigger 해 생성된 두 개의 DAG Run이다. 여기서 DAG Run이 두 개가 생성된 이유는 다음과 같다.
- Airflow Web에서의 External Trigger
- Logical Date가 서로 다른 두 개의 DAG Run 생성
- external_trigger = False : DAG의 start_date에 기반해 기본 스케줄링을 통한 DAG Run 생성
- external_trigger = True : External Trigger에 의한 DAG Run 생성
결론적으로 Airflow의 기본 Scheduling 동작과 External Trigger가 중첩되었기 때문에 발생한 문제이며, External Trigger을 처음 사용할 때 이러한 중복이 발생할 수 있다.
DAG가 중지된 후 다시 Trigger 되는 경우
이전에 정상적으로 작동하던 DAG가 여러 이유 (서버 중단, DAG Paused 등)로 다시 실행하는 경우이다. 여기서 DAG Run이 두 개가 생성된 이유는 다음과 같다. 이해를 돕기 위해 아래의 이미지를 예시로 이야기해보려고 한다.
- DAG 정보와 상황 설명
- 중지되기 전 DAG 정보
- schedule_interval : */20 * * * * (매 20분)
- DAG가 중지된 시각 : 2024-11-03, 13:00:00 보다 조금 지난 시각
- 즉, Logical Date = 2024-11-03, 12:40:00까지 실행이 진행된 상황
- 다음에 실행될 Logical Date는 2024-11-03, 13:00:00
- 재실행된 DAG 정보 (동시에 두 개의 DAG 실행)
- DAG가 실행된 시각 (Start Date) : 2024-11-03, 07:58:36 / 2024-11-03, 07:58:35 (동시 실행)
- DAG의 기준 시각 (Logical Date) : 2024-11-03, 13:00:00 / 2024-11-03, 07:20:00
- 즉, 이전에 중지된 시점에 대한 DAG와 현재 시각에 대한 DAG를 모두 실행
- 중지되기 전 DAG 정보
- Schedule이 존재하는 중지된 DAG를 다시 시작
- 중지된 시점의 DAG의 Last Run과 Next Run이 존재
- Logical Date가 서로 다른 두 개의 DAG Run 생성
- Logical Date = 2024-11-03, 13:00:00 : 중지된 시점의 Next Run을 기준으로 하나의 DAG Run 생성
- Logical Date = 2024-11-03, 07:20:00 : 현재 시점을 기준으로 가장 최근의 DAG Run을 생성
- 두 개의 DAG가 실행된 후 Next Run이 2024-11-03, 13:20:00으로 설정됨
동시 실행으로 인한 문제 해결
현재 AWS EC2를 통한 Airflow를 사용하고 있기에 비용을 절약하기 위해 10:00 ~ 22:00 (UTC+9)에만 서버를 켜두고 있다. 따라서 매일 DAG의 중단이 발생하기 때문에 이 문제를 해결해야 안정적인 DAG를 구성할 수 있을 것이다.
max_active_runs 설정 (X) : 무의미한 DAG 하나가 추가로 실행
- DAG가 동시에 활성화되는 DAG Run의 수를 제한해 생성된 두 개의 DAG Run을 하나씩 실행시키는 것
- 근본적인 해결책은 아니며 무의미한 DAG Run이 하나 실행되는 것
DAG Run 동시 실행 방지 로직 추가 (X) : 무의미한 DAG 하나가 추가로 실행
- 실행 중인 DAG Run이 있는 경우 새로운 DAG Run이 생성되지 않도록 로직 추가
- check_running_dag Task를 추가해 실행 중인 DAG가 존재한다면, 에러를 발생시킴
- 그러나 중지된 시각의 DAG Run이 남아 실행되므로 수집 데이터와 Logical Date 시각이 일치하지 않음
- (데이터는 실행 시각을 기준으로 수집하며, S3에는 Logical Date를 기준으로 Partitioning을 진행)
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.db import provide_session
from airflow.models import DagRun
from datetime import datetime
# DAG 정의
dag = DAG(
...
)
@provide_session
def check_running_dag(dag_id, session=None):
# 주어진 DAG ID에 대해 실행 중인 DAG Run 확인
running_dags = session.query(DagRun).filter(
DagRun.dag_id == dag_id,
DagRun.state == 'running'
).all()
# 실행 중인 DAG Run이 있으면 에러 발생
if running_dags:
raise ValueError(f"Error: Running DAGs found for DAG ID: {dag_id}: {running_dags}")
else:
print(f"No running DAGs found for DAG ID: {dag_id}. Proceeding to next task.")
# 태스크 추가
check_running_dag_task = PythonOperator(
task_id='check_running_dag_task',
python_callable=check_running_dag,
op_kwargs={'dag_id': 'your_target_dag_id'}, # 확인할 DAG ID 입력
dag=dag
)
schedule_interval 설정 (O) : 가장 간단하면서 근본적인 해결 방법
- schedule_interval을 '*/20 1-12 * * *'로 설정해 매일 10시부터 21시 40분까지 20분마다 실행되도록 설정
- 21시 40분 이후의 첫 실행은 다음 날 10시
- 변경 후 동시에 두 개의 DAG Run이 생성되는 문제는 해결
dag = DAG(
dag_id = "etl_seoul_population_data",
default_args=default_args,
start_date=datetime(2024, 10, 20),
catchup = False,
schedule_interval = "*/20 1-12 * * *",
params= {
"current_data_file_name": "current_population",
"prediction_data_file_name": "prediction_population"
}
)
- 또 다른 문제 발생
- 21:40 (UTC+9)에 마지막 DAG 실행을 진행한 뒤 다음 날 첫 DAG의 Logical Date은 21:40 (UTC+9)
- schedule_interval에 따라 실행되지 않는 (서버가 꺼진) 시간이 interval이 됨
- 해결 방법
- 쉽게 해결할 수 있는 방법은 S3에 저장하는 파일의 이름을 data_interval_end로 변경
- 혹은 10시에 실행이 되는 데이터만 제외하여 수집
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] EC2 (Start/Stop) InstanceOperator 사용 예시 (0) | 2024.11.09 |
---|---|
[Airflow] S3ToRedshiftOperator 사용 예시 (0) | 2024.11.07 |
[Airflow] data_interval_start를 활용한 File System Partitioning 적용 (0) | 2024.10.31 |
[Airflow] Airflow Task log 작성 (feat. logging) (3) | 2024.10.28 |
[Airflow] DAG를 선언하는 세 가지 방법 (0) | 2024.10.27 |