SchedulerJobRunner의 _executor 메서드 작동 과정
실제로 Executor가 실행되는 부분은 airflow/jobs/scheduler_job_runner.py에 작성된 SchedulerJobRunner의 _execute 메서드이다. 해당 메서드는 DAG가 생성되거나 트리고 될 때 실행되며, 실행할 DAG가 있다면 airflow.cfg에 설정된 Executor로 실행을 준비하도록 하는 역할이다. _execute의 코드를 순차적으로 살펴보며, Executor가 어떻게 동작하는 것인지 확인해 볼 것이다.
- 참고 : _execute 함수의 코드는 생략되는 부분 없이 모두 적음
- _execute 메서드 전체 코드 (공식 github)
설정된 Executor 확인 및 직렬화, 비동기 모드 여부 결정
시작 log 작성
from airflow.dag_processing.manager import DagFileProcessorAgent
self.log.info("Starting the scheduler")
ExecutorLoader를 통한 현재 Executor 확인
- ExecutorLoader.import_default_executor_cls()
- airflow.cfg [core]의 executor를 가져옴 (ex: executor = LocalExecutor)
- 단, 여러 개의 executor가 사용되더라도 처음으로 인식된 executor가 default로 사용됨
- executor_class : 환경 변수에서 가져온 문자열과 일치하는 Executor 클래스를 가져옴
executor_class, _ = ExecutorLoader.import_default_executor_cls()
DAG pickling, async mode (비동기 모드) 결정
- 원격 실행을 위해 DAG를 pickling (직렬화) 해야 하는지 확인
- supports_pickling (default : True), do_pickle (defalut : False)
- using_sqlite
- airflow.cfg [database]의 sql_alchemy_conn이 sqlite로 시작하는지 확인 (True, False)
- SQLite일 경우 동시 쓰기 처리가 힘들기 때문에 비동기 모드 비활성화
# DAGs can be pickled for easier remote execution by some executors
pickle_dags = self.do_pickle and executor_class.supports_pickling
self.log.info("Processing each file at most %s times", self.num_times_parse_dags)
# When using sqlite, we do not use async_mode
# so the scheduler job and DAG parser don't access the DB at the same time.
async_mode = not self.using_sqlite
DagFileProcessorAgent 선언과 callback sink 방식 결정
timeout 시간 확인, DAG File Processor Agent 선언
- processor_timeout : airflow.cfg [core]의 dag_file_processor_timeout 값을 가져와 timedelta 형식으로 변환
- _standalone_dag_processor : airflow.cfg [scheduler]의 standalone_dag_processor를 가져옴
- processor_agent (default : None)
- DagFileProcessorAgent
- DAG 파일을 파싱 하는 역할
- standalone이 아니라면 지정된 DAG 디렉터리에서 처리하도록 초기화
processor_timeout_seconds: int = conf.getint("core", "dag_file_processor_timeout")
processor_timeout = timedelta(seconds=processor_timeout_seconds)
if not self._standalone_dag_processor and not self.processor_agent:
self.processor_agent = DagFileProcessorAgent(
dag_directory=Path(self.subdir), # airflow.cfg [core]의 DAGS_FOLDER 값을 가져옴 (DAG 경로)
max_runs=self.num_times_parse_dags, # default : -1
processor_timeout=processor_timeout, # timeout 시간
dag_ids=[],
pickle_dags=pickle_dags, # 직렬화 여부
async_mode=async_mode, # 비동기 모드 여부
)
callback sink 설정
- callback sink : scheduler가 task의 상태 변경 또는 이벤트가 발생했을 때 기록, 처리하는 방법을 결정
- 프로세서 에이전트가 사용될 경우 callback_sink를 PipeCallbackSink로 사용
- PipeCallbackSink : 파이프(pipe)를 사용해 데이터를 프로세스 간에 전달하는 방식
- 프로세서 에이전트가 사용되지 않을 경우 callback_sink를 DatabaseCallbackSink로 사용
- DatabaseCallbackSink : callback 정보를 데이터베이스에 저장
try:
callback_sink: PipeCallbackSink | DatabaseCallbackSink
if self.processor_agent:
self.log.debug("Using PipeCallbackSink as callback sink.")
callback_sink = PipeCallbackSink(get_sink_pipe=self.processor_agent.get_callbacks_pipe)
else:
from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
self.log.debug("Using DatabaseCallbackSink as callback sink.")
callback_sink = DatabaseCallbackSink()
Executor 실행 준비 및 특정 상황에서의 정상 종료
Executor 실행 준비
- job.executors -> ExecutorLoader.init_executors()를 반환받아 사용
- airflow.cfg [core]의 executor를 가져옴 (ex : executor = LocalExecutor)
- cache 되지 않은 모든 executor에 대한 새 인스턴스 생성
- 각 executor에 대해 순차적으로 처리
- job_id : Column(Integer, primary_key=True), 현재 scheduler 작업의 고유 id 할당
- callback_sink : Processor Agent 사용 여부에 따른 callback_sink (Pipe or Database)
for executor in self.job.executors:
executor.job_id = self.job.id
executor.callback_sink = callback_sink
executor.start()
SchedulerJobRunner Class의 자식 프로세스 종료
- register_signals() : 특정 상황에서 각 executor가 신호를 받았을 때 적절하게 반응
- SIGINT : 사용자가 Ctrl + C를 누를 경우 안전하게 종료
- SIGTERM : 프로세스를 종료하기 위해 운영체제가 보내는 신호
- SIGUSR2 : 사용자 정의 신호
- 해당 신호가 발생했을 때, 안전하게 종료할 수 있도록 처리
self.register_signals()
Processor Agent 시작 및 DAG 실행 준비
Processor Agent 시작 및 실행 시간 기록
- processor_agent가 존재할 경우 start() 메서드 실행
- DagFileProcessorManager 프로세서를 실행하고 manager에서 DAG 구문 분석 루프를 시작
- Multiprocessing Context 생성, 새로운 프로세스 생성 및 해당 프로세스 시작
- scheduler의 loop 실행 (아래 Diagram 참고) : DAG 실행을 위한 구성을 만드는 역할
- Manager : 지속적으로 새로운 파일이 있는지 스캔하고, Processor에게 DAG 구분 분석 요청
- Processor : Manager가 보낸 파일 구문 분석 및 모듈 처리, DagBag 반환
- Process가 완료되면 구문 분석된 결과 수집
- 내부 코드가 길고 복잡하기 때문에 그림과 간단한 순서로만 설명
if self.processor_agent:
self.processor_agent.start()
execute_start_time = timezone.utcnow()
self._run_scheduler_loop()
Processor Agent 종료 및 처리되지 않은 DAG 비활성화
Processor Agent 종료 및 처리되지 않은 DAG 비활성화
- Processor Agent가 존재할 경우 Processor Agent 종료 (리소스 해제)
- all_files_processed (Default : True) : 처리하지 않은 DAG 체크 및 처리되지 않은 DAG 비활성화
- Session.remove() : 현재 세션을 제거하여 데이터베이스와 연결 종료, 리소스 해제
if self.processor_agent:
# Stop any processors
self.processor_agent.terminate()
# Verify that all files were processed, and if so, deactivate DAGs that
# haven't been touched by the scheduler as they likely have been
# deleted.
if self.processor_agent.all_files_processed:
self.log.info(
"Deactivating DAGs that haven't been touched since %s", execute_start_time.isoformat()
)
DAG.deactivate_stale_dags(execute_start_time)
settings.Session.remove() # type: ignore
Except 및 Finally 구문
예외 처리
- run_scheduler loop 중 에러가 발생할 경우의 예외 처리
except Exception:
self.log.exception("Exception when executing SchedulerJob._run_scheduler_loop")
raise
Executor 및 Processor Agent 종료
- Finally 구문을 통해 비정상 종료가 발생할 때도 안전하게 종료 가능
- 모든 executor를 순회하며 종료
- Processor Agent가 존재할 경우 종료
finally:
for executor in self.job.executors:
try:
executor.end()
except Exception:
self.log.exception("Exception when executing Executor.end on %s", executor)
if self.processor_agent:
try:
self.processor_agent.end()
except Exception:
self.log.exception("Exception when executing DagFileProcessorAgent.end")
self.log.info("Exited execute loop")
return None
작동 과정 요약
- Executor 확인
- ExecutorLoader를 통해 airflow.cfg에 지정된 Executor 클래스 가져오기
- Async Mode, Callback Sink 설정
- Processor Agent가 있을 경우 callback_sink를 PipeCallbackSink로 사용
- Processor Agent가 없을 경우 call_back_sink를 DatabaseCallbackSink로 사용
- SQLite를 DB로 사용하는 경우 비동기 모드 비활성화
- Executor 실행 준비
- 각 Executor에 대해 현재 job ID와 Callback Sink 설정
- 각 Executor는 비동기적으로 작업 처리
- Signal Handling
- SIGINT, SIGTERM, SUGUSR2와 같이 프로세스가 비정상 종료되는 경우를 위한 처리
- Scheduler Loop 실행
- DAG의 상태를 확인하고 업데이트하는 run_scheduler_loop을 시작
- Processor Agent 종료
- 스케줄링이 완료되면 Processor Agent를 종료하고, 처리되지 않은 DAG 비활성화
- 예외 처리 및 자원 해제
- except : 예외 발생 시 안전하게 종료
- finally : 모든 Executor, Processor Agent를 종료하여 리소스 해제
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] DAG params를 PythonOperator 매개변수로 사용하기 (0) | 2024.10.15 |
---|---|
[Airflow] LocalExecutor Parallelism 개념 및 설정 방법 (0) | 2024.10.14 |
[Airflow] AWS Ubuntu EC2 Airflow 환경 구축 (feat. Dockerfile) (0) | 2024.08.16 |
[Airflow] DAG Scheduling과 Execution (2) | 2024.06.09 |
[Airflow] Airflow 기타 기능 정리 (0) | 2024.06.05 |