Data Engineering

Data Engineering/Airflow

[Airflow] S3ToRedshiftOperator 사용 예시

S3ToRedshiftOperator 사용 예시Airflow에서 S3에 존재하는 파일을 Redshift로 COPY 하는 방법은 크게 두 가지 방식이 있다. S3ToRedshiftOperator는 파라미터 정보만 적어주면 되므로 실제 쿼리 작업이 필요한 Hook 사용 방식보다 간편하게 사용할 수 있다. S3Hook과 PostgresHook을 활용해 S3와 Redshift를 각각 연결한 뒤 COPY Query 실행S3ToRedshiftOperator를 사용한 간단하게 COPYS3ToRedshiftOperator 사용 예시라이브러리 선언from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperatorS3ToRedshift..

Data Engineering/Airflow

[Airflow] DAG Trigger 시 DAG Run이 동시에 두 개가 생성되는 문제

DAG Trigger 시 DAG Run이 동시에 두 개가 생성되는 문제새로운 DAG를 생성한 뒤 처음으로 Trigger 하거나, DAG의 실행을 중단한 뒤 나중에 다시 Trigger 하는 경우 DAG Run이 두 개가 동시에 생성되는 상황이 발생한다. catchup을 False로 설정해 Backfill이 없는데도 말이다. 여기서 Backfill은 DAG의 start_date부터 현재까지 중간에 없는 DAG Run을 생성하는 기능이다.현재 작성 중인 DAG는 API로부터 데이터를 추출하고 S3에 parquet으로 저장한 뒤 Redshfit에 COPY 하는 ETL 과정을 담고 있는데, 동시에 실행될 경우 Redshift 동시 접근으로 인한 에러가 발생하는 경우가 존재한다. 또한 S3의 File Partiti..

Data Engineering/Airflow

[Airflow] data_interval_start를 활용한 File System Partitioning 적용

data_interval_start를 활용한 File System Partitioning적용데이터 파일을 S3에 저장을 하는 로직을 구현하기 위해 파일 명을 "0000_{data_interval_start}.parquet"으로 설정하는 방법을 탐색하였다. data_interval_start와 같은 Airflow 내부 시간에 대해 알고 싶다면 아래의 링크에서 확인 가능하다. [Airflow] DAG Scheduling과 ExecutionAirflow Scheduling과 Execution개요Airflow 관리Airflow를 사용하면서 반드시 알아야 할 것이 "작성한 DAG가 언제, 얼마나 실행되는가?"일 것이다. Airflow에서는 관련 개념을 숙지하지 않으면, 해당 DAG의 실행sanseo.tistory..

Data Engineering/Airflow

[Airflow] Airflow Task log 작성 (feat. logging)

Airflow Task log 작성 (feat. logging)Airflow는 로그를 작성하기 위해 Python의 logging 모듈을 사용한다. Airflow에서 logging 모듈을 어떻게 사용하고 있으며, 이를 활용해 Airflow Task의 log를 작성하는 방법을 알아볼 것이다.Python logging 공식 문서logging 모듈과 Airflowlogging 내부에 존재하는 Class는 총 4개 (Logger, Handler, Filter, Formatter)가 있다. 이에 대한 설명과 함께 Airflow에서는 해당 Class를 활용할 수 있는지 알아본다.import logginglogging의 내부 Classlogging.Logger (로거)애플리케이션 코드가 직접 상호 작용하는 인터페이스,..

기억에 남는 블로그 닉네임
'Data Engineering' 카테고리의 글 목록 (2 Page)