DAG Trigger 시 DAG Run이 동시에 두 개가 생성되는 문제새로운 DAG를 생성한 뒤 처음으로 Trigger 하거나, DAG의 실행을 중단한 뒤 나중에 다시 Trigger 하는 경우 DAG Run이 두 개가 동시에 생성되는 상황이 발생한다. catchup을 False로 설정해 Backfill이 없는데도 말이다. 여기서 Backfill은 DAG의 start_date부터 현재까지 중간에 없는 DAG Run을 생성하는 기능이다.현재 작성 중인 DAG는 API로부터 데이터를 추출하고 S3에 parquet으로 저장한 뒤 Redshfit에 COPY 하는 ETL 과정을 담고 있는데, 동시에 실행될 경우 Redshift 동시 접근으로 인한 에러가 발생하는 경우가 존재한다. 또한 S3의 File Partiti..
data_interval_start를 활용한 File System Partitioning적용데이터 파일을 S3에 저장을 하는 로직을 구현하기 위해 파일 명을 "0000_{data_interval_start}.parquet"으로 설정하는 방법을 탐색하였다. data_interval_start와 같은 Airflow 내부 시간에 대해 알고 싶다면 아래의 링크에서 확인 가능하다. [Airflow] DAG Scheduling과 ExecutionAirflow Scheduling과 Execution개요Airflow 관리Airflow를 사용하면서 반드시 알아야 할 것이 "작성한 DAG가 언제, 얼마나 실행되는가?"일 것이다. Airflow에서는 관련 개념을 숙지하지 않으면, 해당 DAG의 실행sanseo.tistory..
Airflow Task log 작성 (feat. logging)Airflow는 로그를 작성하기 위해 Python의 logging 모듈을 사용한다. Airflow에서 logging 모듈을 어떻게 사용하고 있으며, 이를 활용해 Airflow Task의 log를 작성하는 방법을 알아볼 것이다.Python logging 공식 문서logging 모듈과 Airflowlogging 내부에 존재하는 Class는 총 4개 (Logger, Handler, Filter, Formatter)가 있다. 이에 대한 설명과 함께 Airflow에서는 해당 Class를 활용할 수 있는지 알아본다.import logginglogging의 내부 Classlogging.Logger (로거)애플리케이션 코드가 직접 상호 작용하는 인터페이스,..
DAG를 작성하는 세 가지 방법Airflow에서 실행시키기 위한 DAG를 작성하는 방법은 세 가지가 존재한다. 세 가지 방식의 차이점과 DAG 선언 방식에 따른 장점과 단점을 알아보려 한다. 공식 문서를 참고하여 작성하였다.Airflow DAGs 공식 문서표준 생성자 (constructor) 사용설명DAG 객체를 명시적으로 생성하고, 각 Operator (Task)를 DAG에 추가하는 방식DAG 생성자는 파라미터를 통해 dag_id, start_date, schedule, default_arg 등을 설정장단점장점명시적이기 때문에 코드 이해가 쉽고, DAG 설정을 한눈에 파악할 수 있음DAG에 여러 Operator를 추가하거나 Task 흐름을 정의하기 수월 DAG Factory를 구현할 경우, 반복적인 D..