S3ToRedshiftOperator 사용 예시
Airflow에서 S3에 존재하는 파일을 Redshift로 COPY 하는 방법은 크게 두 가지 방식이 있다. S3ToRedshiftOperator는 파라미터 정보만 적어주면 되므로 실제 쿼리 작업이 필요한 Hook 사용 방식보다 간편하게 사용할 수 있다.
- S3Hook과 PostgresHook을 활용해 S3와 Redshift를 각각 연결한 뒤 COPY Query 실행
- S3ToRedshiftOperator를 사용한 간단하게 COPY
S3ToRedshiftOperator 사용 예시
라이브러리 선언
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
S3ToRedshiftOperator 사용
- 코드는 공식 문서 참고 : 내부적으로 S3Hook과 RedshiftDataHook/RedshiftSQLHook을 사용해 작성
load_data_to_redshift_from_s3 = S3ToRedshiftOperator(
task_id = "load_data_to_redshift_from_s3",
s3_bucket = 's3_bucket',
s3_key = '2020-04-11/data.parquet',
schema = "raw_data",
table = 'table',
copy_options = ["parquet"],
redshift_conn_id = "redshift_conn",
aws_conn_id = "s3_conn",
method = "UPSERT",
upsert_keys = ["col1", "col2", "col3"],
dag = dag
)
각 파라미터 설명
- task_id : task id
- s3_bucket : 가져올 파일의 S3 버킷 이름
- s3_key : 가져올 파일의 S3 파일 경로
- schema : 저장할 Redshift Schema
- table : 저장할 Redshift Table
- copy_options : COPY option에 들어갈 내용 (파일 형태, 시간 형태 등)
- redshift_conn_id : Airflow에 등록한 Redshift Connection id
- aws_conn_id : Airflow에 등록한 S3 Connection id (S3 권한이 있는 IAM 계정)
- method : COPY 방식 (APPEND, UPSERT, REPLACE)
- APPEND : 중복 제거 없이 현재 데이터를 테이블에 추가 (단순 COPY 쿼리)
- UPSERT : INCREMENTAL UPDATE 방식으로 중복 데이터가 없을 경우에 추가
- REPLACE : FULL REFRESH 방식으로 현재 데이터로 대체하는 것
- upsert_keys : UPSERT 방식일 경우 중복을 판단할 컬럼들
- verify : S3 연결에 SSL 인증이 필요할 경우 사용
Reference
https://seoyeonhwng.medium.com/aws-s3%EB%9E%80-%EB%AC%B4%EC%97%87%EC%9D%B8%EA%B0%80-b0da502b0504
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] schedule_interval 개념과 사용법 (0) | 2024.11.10 |
---|---|
[Airflow] EC2 (Start/Stop) InstanceOperator 사용 예시 (0) | 2024.11.09 |
[Airflow] DAG Trigger 시 DAG Run이 동시에 두 개가 생성되는 문제 (0) | 2024.11.05 |
[Airflow] data_interval_start를 활용한 File System Partitioning 적용 (0) | 2024.10.31 |
[Airflow] Airflow Task log 작성 (feat. logging) (3) | 2024.10.28 |