Airflow DAG 개발 - OLTP 복사와 ELT
OLTP(MySQL) 테이블 복사 방법
프로덕션 MySQL 테이블 (OLTP)의 prod.nps 테이블을 AWS Redshift (OLAP)의 raw_data.nps로 복사하는 실습을 진행한다.
복사 방법
둘 중 어느 방법을 사용하더라도 Airflow는 MySQL과의 연결이 필요하며, COPY를 사용할 경우 S3와도 연결돼야 한다.
- INSERT INTO
- MySQL의 소스 데이터를 읽어 하나씩 AWS Redshift로 "INSERT INTO"를 사용해 복사
- 소스 데이터의 레코드 수가 많다면 시간이 오래 걸림
- MySQL -> Airflow Server -> AWS Redshift
- Redshift COPY
- MySQL의 소스 데이터를 읽어 파일로 클라우드 스토리지 S3에 저장
- S3에 저장된 데이터를 COPY 명령어로 Redshift에 벌크 업데이트
- MySQL -> Airflow Server -> Amazon S3 -> AWS Redshift
AWS 관련 권한 설정
- Airflow DAG에서 S3 접근 (쓰기 권한)
- 루트 사용자의 키를 사용하면 해킹 시 AWS 자원들을 마음대로 사용 가능
- IAM User를 만들고 S3 버킷에 대한 읽기/쓰기 권한 설정 후 Access Key, Secret Key 사용
- Redshift S3 접근 (읽기 권한)
- Rdeshift에 S3 접근 역할 (Role)을 만들고 Redshift에 지정
MySQL 테이블 리뷰 (OLTP, Production Database)
- 이 테이블은 MySQL에 만들어져 있고, 이를 Redshift로 복사해야 함
CREATE TABLE prod.nps (
id INT NOT NULL AUTO_INCREMENT primary key,
created_at timestamp,
score smallint
);
Redshift (OLAP, Data Warehouse) 테이블 생성
- DAG를 사용하여 테이블을 생성하고 MySQL에서 Redshift로 복사해야 함
CREATE TABLE ss721229.nps (
id INT NOT NULL primary key,
created_at timestamp,
score smallint
);
OLTP(MySQL) 테이블 복사 실습
MySQL_to_Redshift DAG Task 구성 (Full Refresh)
Airflow에서 제공해 주는 두 개의 Operator를 사용한다.
schema = "ss721229"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table
- SqlToS3Operator : SQL문(SELECT)을 결과로 받아서 S3 특정 위치에 저장
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = "SELECT * FROM prod.nps",
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True, # 해당 위치에 파일이 있으면 덮어 씀
pd_kwargs={"index": False, "header": False},
dag = dag
)
- S3ToRedshiftOperator : S3의 파일을 Redshift에 복사(COPY)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
method = 'REPLACE', # REPLACE : Full Refresh, UPSERT : Incremental Update
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
dag = dag
)
MySQL 테이블의 Incremental Update 방식
- MySQL 테이블은 다음을 만족해야 함
- created (timestamp) : Optional
- modified (timestamp)
- deleted (boolean) : 레코드를 삭제하지 않고 deleted를 True로 설정
- Daily Update 방법
- ROW_NUMBER
- UPSERT : S3ToRedshiftOperator의 method = "UPSERT", upsert_keys = ["id"]
- 과거 데이터를 수집할 경우 시스템 변수인 execution_date를 사용해서 query 작성 가능
sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"
Backfill 실행
Incremental DAG에서 Backfill 사용
- Airflow에서 추천하는 방식으로 Incremental Update를 구현했다면 Backfill이 쉬워짐
- 구현 방법에 따라 한 번에 하나씩 실행하는 것이 안전 -> max_active_runs
- 예를 들어 이번 예제에서는 같은 S3 경로를 사용하므로 동시에 실행되면 충돌
커맨드에서 Backfill 실행
- catchup이 True로 설정돼 있어야 함
- execution_date를 사용해서 Incremental Update가 구현돼 있어야 함
- start_date부터 시작하지만 end_date는 포함하지 않음, 즉 아래의 코드는 2018-07-31까지 데이터 수집
- 실행 순서는 날짜 / 시간 순이 아닌 무작위
- 날짜 순으로 변경할 경우 DAG default_args의 depends_on_past = True로 설정
airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01
'[프로그래머스] 데이터 엔지니어링 데브코스 3기 > TIL(Today I Learn)' 카테고리의 다른 글
[TIL - 47일 차] Docker & K8S 실습 (2) (0) | 2024.05.28 |
---|---|
[TIL - 46일 차] Docker & K8S 실습 (1) (0) | 2024.05.27 |
[TIL - 44일 차] 데이터 파이프라인과 Airflow (4) (0) | 2024.05.23 |
[TIL - 43일 차] 데이터 파이프라인과 Airflow (3) (0) | 2024.05.22 |
[TIL - 42일 차] 데이터 파이프라인과 Airflow (2) (0) | 2024.05.21 |