DAG를 작성하는 세 가지 방법Airflow에서 실행시키기 위한 DAG를 작성하는 방법은 세 가지가 존재한다. 세 가지 방식의 차이점과 DAG 선언 방식에 따른 장점과 단점을 알아보려 한다. 공식 문서를 참고하여 작성하였다.Airflow DAGs 공식 문서표준 생성자 (constructor) 사용설명DAG 객체를 명시적으로 생성하고, 각 Operator (Task)를 DAG에 추가하는 방식DAG 생성자는 파라미터를 통해 dag_id, start_date, schedule, default_arg 등을 설정장단점장점명시적이기 때문에 코드 이해가 쉽고, DAG 설정을 한눈에 파악할 수 있음DAG에 여러 Operator를 추가하거나 Task 흐름을 정의하기 수월 DAG Factory를 구현할 경우, 반복적인 D..
DAG와 Task의 동시성을 관리하기 위한 변수DAG와 Task의 병렬 처리를 관리하기 위해 사용하는 Airflow 변수는 다음과 같다.parallelism : Scheduler당 동시에 실행할 수 있는 최대 task Instance 수를 정의하며, Worker 수와 무관max_active_tasks_per_dag : 각 DAG에서 동시에 실행하도록 허용된 최대 Task Instance 수max_active_runs_per_dag : DAG당 활성 DAG 실행의 최대 수 이전에 parallelism에 대해서는 글을 작성한 적이 있기에 parallelism에 대해 자세히 알고 싶다면 아래의 링크에서 확인하면 좋을 것 같다. 여기서는 max_active_tasks_per_dag, max_active_runs..
Redshift COPY 시 잘못된 timestamp 값이 적재되는 문제Airflow의 S3ToOperator를 사용해 S3 버킷에 저장된 .parquet 파일을 Redshift에 COPY를 진행하였는데, 다른 모든 값은 모두 올바르게 적재되었음에도 timestamp 형식을 가진 컬럼만 잘못된 값이 적재되었다. 이 문제를 해결하기 위한 과정을 적어보려 한다.데이터 처리 방식 (ETL)우선 이슈가 발생했을 때의 S3에 적재되는 .parquet의 ETL 과정을 나타내 본 뒤 본격적으로 해결 과정을 알아보자.데이터 추출 (Extract)서울 도시데이터 API 호출을 통해 Json 형태의 데이터 추출데이터 변형 (Transform)추출한 데이터 (Json)를 가져와 필요한 데이터를 리스트 형태로 변경여기서 리스..
DAG params를 PythonOperator 매개변수로 사용하기Airflow에서 DAG와 개별 Task를 유연하게 구성하기 위해 params와 op_kwargs를 활용하는 것은 매우 유용하다. 이를 활용하면, DAG 전역에서 사용하는 변수를 정의하여 개별 Task에서 재사용할 수 있다. 이 글에서는 DAG에서 params를 정의하고, 이를 PythonOperator의 매개변수로 사용하는 방법에 대해 소개하려고 한다.DAG : paramsDAG params란?DAG 실행 시 특정한 값을 전역적으로 정의하고, DAG 내의 Task에서 Jinja 템플릿을 사용해 값을 참조할 수 있다. 이를 통해 DAG 내 여러 Task에서 동일한 값을 재사용할 수 있다.DAG 예시 코드current_data_file_na..