Data Engineering

Data Engineering/Airflow

[Airflow] DAG를 선언하는 세 가지 방법

DAG를 작성하는 세 가지 방법Airflow에서 실행시키기 위한 DAG를 작성하는 방법은 세 가지가 존재한다. 세 가지 방식의 차이점과 DAG 선언 방식에 따른 장점과 단점을 알아보려 한다. 공식 문서를 참고하여 작성하였다.Airflow DAGs 공식 문서표준 생성자 (constructor) 사용설명DAG 객체를 명시적으로 생성하고, 각 Operator (Task)를 DAG에 추가하는 방식DAG 생성자는 파라미터를 통해 dag_id, start_date, schedule, default_arg 등을 설정장단점장점명시적이기 때문에 코드 이해가 쉽고, DAG 설정을 한눈에 파악할 수 있음DAG에 여러 Operator를 추가하거나 Task 흐름을 정의하기 수월 DAG Factory를 구현할 경우, 반복적인 D..

Data Engineering/Airflow

[Airflow] DAG와 Task의 동시성을 관리하기 위한 변수

DAG와 Task의 동시성을 관리하기 위한 변수DAG와 Task의 병렬 처리를 관리하기 위해 사용하는 Airflow 변수는 다음과 같다.parallelism : Scheduler당 동시에 실행할 수 있는 최대 task Instance 수를 정의하며, Worker 수와 무관max_active_tasks_per_dag : 각 DAG에서 동시에 실행하도록 허용된 최대 Task Instance 수max_active_runs_per_dag : DAG당 활성 DAG 실행의 최대 수 이전에 parallelism에 대해서는 글을 작성한 적이 있기에 parallelism에 대해 자세히 알고 싶다면 아래의 링크에서 확인하면 좋을 것 같다. 여기서는 max_active_tasks_per_dag, max_active_runs..

Data Engineering/Airflow

[Airflow] Redshift COPY 시 잘못된 timestamp 값이 적재되는 문제

Redshift COPY 시 잘못된 timestamp 값이 적재되는 문제Airflow의 S3ToOperator를 사용해 S3 버킷에 저장된 .parquet 파일을 Redshift에 COPY를 진행하였는데, 다른 모든 값은 모두 올바르게 적재되었음에도 timestamp 형식을 가진 컬럼만 잘못된 값이 적재되었다. 이 문제를 해결하기 위한 과정을 적어보려 한다.데이터 처리 방식 (ETL)우선 이슈가 발생했을 때의 S3에 적재되는 .parquet의 ETL 과정을 나타내 본 뒤 본격적으로 해결 과정을 알아보자.데이터 추출 (Extract)서울 도시데이터 API 호출을 통해 Json 형태의 데이터 추출데이터 변형 (Transform)추출한 데이터 (Json)를 가져와 필요한 데이터를 리스트 형태로 변경여기서 리스..

Data Engineering/Airflow

[Airflow] DAG params를 PythonOperator 매개변수로 사용하기

DAG params를 PythonOperator 매개변수로 사용하기Airflow에서 DAG와 개별 Task를 유연하게 구성하기 위해 params와 op_kwargs를 활용하는 것은 매우 유용하다. 이를 활용하면, DAG 전역에서 사용하는 변수를 정의하여 개별 Task에서 재사용할 수 있다. 이 글에서는 DAG에서 params를 정의하고, 이를 PythonOperator의 매개변수로 사용하는 방법에 대해 소개하려고 한다.DAG : paramsDAG params란?DAG 실행 시 특정한 값을 전역적으로 정의하고, DAG 내의 Task에서 Jinja 템플릿을 사용해 값을 참조할 수 있다. 이를 통해 DAG 내 여러 Task에서 동일한 값을 재사용할 수 있다.DAG 예시 코드current_data_file_na..