DAG params를 PythonOperator 매개변수로 사용하기
Airflow에서 DAG와 개별 Task를 유연하게 구성하기 위해 params와 op_kwargs를 활용하는 것은 매우 유용하다. 이를 활용하면, DAG 전역에서 사용하는 변수를 정의하여 개별 Task에서 재사용할 수 있다. 이 글에서는 DAG에서 params를 정의하고, 이를 PythonOperator의 매개변수로 사용하는 방법에 대해 소개하려고 한다.
DAG : params
DAG params란?
DAG 실행 시 특정한 값을 전역적으로 정의하고, DAG 내의 Task에서 Jinja 템플릿을 사용해 값을 참조할 수 있다. 이를 통해 DAG 내 여러 Task에서 동일한 값을 재사용할 수 있다.
DAG 예시 코드
current_data_file_name이라는 값을 DAG의 params로 정의하였다. 이 값은 DAG 내의 Task에서 사용이 가능하다.
dag = DAG(
dag_id = 'etl_seoul_population_data',
default_args=default_args,
start_date=datetime(2024, 10, 10),
catchup = False,
schedule_interval = '@once',
params= {
'current_data_file_name': 'population_test_current.parquet'
}
)
PythonOperator : op_kwargs
op_kwargs란?
op_kwargs는 Airflow의 PythonOperator에서 사용되는 인자로, Python 함수에 매개변수를 전달할 수 있다. 특히 DAG의 params에 저장된 값을 Jinja 템플릿을 사용하면 쉽게 매개변수로 전달할 수 있다.
Task 예시 코드
DAG의 params에 정의된 current_data_file_name을 PythonOperator의 op_kwargs로 전달하여, Python 함수의 매개변수로 사용하고 있다.
load_current_seoul_data_to_s3 = PythonOperator(
task_id='load_current_seoul_data_to_s3',
python_callable=load_seoul_api_data,
op_kwargs = {
'file_name': '{{ params["current_data_file_name"] }}'
},
dag=dag
)
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] DAG와 Task의 동시성을 관리하기 위한 변수 (0) | 2024.10.25 |
---|---|
[Airflow] Redshift COPY 시 잘못된 timestamp 값이 적재되는 문제 (1) | 2024.10.23 |
[Airflow] LocalExecutor Parallelism 개념 및 설정 방법 (0) | 2024.10.14 |
[Airflow] Executor 실행 준비 과정 (SchedulerJobRunner._execute()) (0) | 2024.10.10 |
[Airflow] AWS Ubuntu EC2 Airflow 환경 구축 (feat. Dockerfile) (0) | 2024.08.16 |