Local Executor - Parallelism
Parallelism 이란?
Airflow에서 Parallelism은 실행되는 태스크의 병렬 처리 개수를 제어하는 개념이다. 이는 LocalExecutor 클래스의 self.parallelism 값에 따라 달라지며, 그 값에 따라 프로세스 생성 방식을 결정한다.
변수 정보
- Type : String
- Default : 32
- Env : AIRFLOW__CORE__PARALLELISM
self.parallelism == 0 (Unlimited Parallelism)
- 시스템의 리소스 한계 내에서 제한 없이 프로세스 생성
- 리소스가 부족할 경우 성능이 저하될 수 있음
self.parallelism > 0 (Limited Parallelism)
- 설정된 parallelism 값까지만 Process 생성
self.parallelism == 1
- Sequential Executor와 동일하게 하나의 프로세스만을 생성해 순차적으로 작업 실행
- 성능을 최적화하려면 Sequential Executor를 사용하는 것이 적절
Parallelism - BaseExecutor & LocalExecutor
BaseExecutor의 parallelism
- parallelism 값이 가장 처음 등장하는 곳은 BaseExecutor의 PARALLELISM
- parallelism 값은 Airflow 설정 파일인 airflow.cfg의 [core] 섹션에서 정의
- 환경 변수 [core]의 "PARALLELISM"을 가져오며, BaseExecutor의 self.parallelism 변수에 할당
# airflow/executors/base_executor.py
PARALLELISM: int = conf.getint("core", "PARALLELISM")
class BaseExecutor(LoggingMixin):
def __init__(self, parallelism: int = PARALLELISM):
super().__init__()
self.parallelism: int = parallelism
LocalExecutor의 parallelism
- LocalExecutor도 BaseExecutor와 동일하게 환경 변수의 PARALLELISM 값을 가져옴
- super().__init__(..)을 통해 BaseExecutor 생성자를 호출하고, self.parallelism에 PARALLELISM 대입
# airflow/executors/local_executor.py
from airflow.executors.base_executor import PARALLELISM, BaseExecutor
class LocalExecutor(BaseExecutor):
def __init__(self, parallelism: int = PARALLELISM):
super().__init__(parallelism=parallelism)
if self.parallelism < 0:
raise AirflowException("parallelism must be bigger than or equal to 0")
Parallelism 설정 방법
parallelism 설정 방법 (1) : airflow.cfg 수정
- BaseExecutor 코드에서 PARALLELISM 값은 [core]의 PARALLELISM 값을 사용
- 즉, airflow.cfg의 PARALLELISM 값을 원하는 값으로 수정하면 됨
# airflow.cfg
executor = LocalExecutor
parallelism = 32 # 동시에 실행할 수 있는 최대 태스크 수
parallelism 설정 방법 (2) : LocalExecutor 클래스 오버라이딩
- LocalExecutor에서 super().__init__(parallelism = parallelism)으로 부모 클래스의 생성자 호출
- 이때, parallelism 값을 직접 설정하여 오버라이딩 할 수 있음
# airflow/executors/local_executor.py
class LocalExecutor(BaseExecutor):
def __init__(self, parallelism: int = PARALLELISM):
super().__init__(parallelism=3)
if self.parallelism < 0:
raise AirflowException("parallelism must be bigger than or equal to 0")
주의할 점
- parallelism 값을 수정하면, 전역 설정으로 모든 DAG에 적용됨
- 만약 개별 DAG에 대해 병렬 실행 설정을 제어하려면, max_active_tasks 옵션 사용
Reference
https://github.com/apache/airflow/blob/main/airflow/executors/local_executor.py
https://github.com/apache/airflow/blob/main/airflow/executors/base_executor.py
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] Redshift COPY 시 잘못된 timestamp 값이 적재되는 문제 (1) | 2024.10.23 |
---|---|
[Airflow] DAG params를 PythonOperator 매개변수로 사용하기 (0) | 2024.10.15 |
[Airflow] Executor 실행 준비 과정 (SchedulerJobRunner._execute()) (0) | 2024.10.10 |
[Airflow] AWS Ubuntu EC2 Airflow 환경 구축 (feat. Dockerfile) (0) | 2024.08.16 |
[Airflow] DAG Scheduling과 Execution (2) | 2024.06.09 |