Airflow Backfill 이해하기
Open Weathermap DAG 구현하기
Open Weathermap API
- 위도 / 경도 기반으로 그 지역의 기후 정보를 알려주는 서비스
- 무료 계정으로 api key를 받아 호출 시 사용
생성할 DAG - 서울 7일간의 낮 / 최소 / 최대 온도 읽기
- API Key를 open_weather_key라는 variable로 저장
- 서울의 위도와 경도를 찾아 One-Call API를 사용
- 날짜, 낮 온도(day), 최소 온도(min), 최대 온도(max) 추출
- API 호출 : https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&exclude={par t}&appid={API key}&units=metric
DAG 구현 (1)
- Open Weathermap의 one call API를 사용해 서울의 다음 7일간의 낮 / 최소 / 최대 온도를 읽어 저장
CREATE TABLE ss721229.weather_forecast (
date date primary key,
temp float, -- 낮 온도
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
DAG 구현 (2)
- One-Call API는 결과를 JSON 형태로 반환
- JSON에서 daily라는 필드에 앞으로 7일 간 날씨 정보 포함
- daily 필드는 리스트이며 각 레코드가 하나의 날짜에 해당
- 날짜 정보는 dt 필드에 있으며, epoch는 1970년 1월 1일 이후 밀리세컨드로 시간을 표시
- datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') # 2021-10-09
DAG 구현 (3)
- Airflow Connections를 이용해 만들어진 Redshift connection 사용
- autocommit = False로 사용(default)
- Full Refresh 방법 : (Full Refresh + INSERT INTO) or (Full Refresh + COPY (S3))
Primary Key Uniqueness 보장하기
일반적으로 데이터 웨어하우스에서는 Primary Key 값이 중복되는 것을 막아주는 역할을 하지 않는다. 그러나 Incremental Update를 진행하다 보면 중복되는 경우가 생길 수 있다. 데이터 엔지니어로서 데이터 웨어하우스를 관리하면서 PK Uniqueness를 보장할 수 있는 방법이 무엇일지 알아보자.
Primary Key Uniqueness
- 테이블에서 하나의 레코드를 유일하게 지칭할 수 있는 필드
- 하나의 필드가 일반적이지만, 다수의 필드를 사용할 수도 있음
- CREATE TABLE 사용 시 지정
- 관계형 데이터베이스 시스템이 PK가 중복되는 것을 막아줌
CREATE TABLE products (
product_id INT PRIMARY_KEY,
name VARCHAR(%),
price decimal(7, 2)
);
CREATE TABLE orders (
order_id INT,
product_id INT,
PRIMARY KEY (order_id, product_id),
FOREIGN KEY (product_id) REFERENCES products (product_id)
);
빅데이터 기반 데이터 웨어하우스가 PK Uniqueness를 지키지 않는 이유
- PK Uniqueness를 보장하는데 메모리와 시간이 들기 때문에 대용량 데이터 적재 시 걸림돌
- PK Uniqueness를 보장하는 것은 데이터 인력의 책임
CREATE TABLE ss721229.test (
date date primary key,
value bigint
);
INSERT INTO keeyong.test VALUES ('2023-05-10', 100);
INSERT INTO keeyong.test VALUES ('2023-05-10', 150); -- 성공
Primary Key 유지 방법
- 앞서 살펴본 weather_forecast 테이블
- 날씨 정보이기에 최근 정보가 더 신뢰할 수 있음
- 어느 정보가 더 최근 정보인지 created_date 필드에 기록하고 활용
- date가 같은 레코드가 있다면 created_date가 더 최근 정보를 선택
- 이에 적합한 SQL 문법이 ROW_NUMBER
CREATE TABLE ss721229.weather_forecast (
date date primary key,
temp float, -- 낮 온도
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
ROW_NUMBER
date 별로 created_date의 역순으로 일련번호를 매길 때, ROW_NUMBER를 사용할 수 있다. 아래 사진처럼 일련번호를 매긴 후 seq가 1인 레코드만 선택하면 된다.
ROW_NUMBER() OVER (partition by date order by created_date DESC) seq
진행 과정
- 임시 테이블(스테이징 테이블)을 만들고 현재 모든 레코드를 복사
CREATE TEMP TABLE t AS SELECT * FROM ss721229.weather_forecast;
- 임시 테이블에 데이터 소스에서 읽어 들인 레코드 복사 (중복 존재 가능)
- 원본 테이블 레코드 삭제
DELETE FROM ss721229.weather_forecast;
- 중복 제거가 완료된 테이블을 최종 원본 테이블로 복사
INSERT INTO ss721229.weather_forecast
SELECT date, temp, min_temp, max_temp, created_date
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM t
)
WHERE seq = 1;
UPSERT
- Primary Key를 기준으로 존재하는 레코드라면 새 정보로 수정
- 존재하지 않는 레코드라면 새 레코드로 적재
- 보통 웨어하우스마다 UPSERT를 효율적으로 해주는 문법을 지원
Backfill과 Airflow
Full Refresh를 사용하면, 문제가 생길 경우 다시 실행하면 되므로 가능하다면 이 방법을 사용해야 한다. 그러나 Incremental Update를 사용할 경우 효율성은 더 좋지만, 운영 / 유지보수 난이도가 올라간다. 실수로 데이터가 빠질 경우도 있고, 중간에 실패했을 경우 과거 데이터를 다 읽어와야 하는 경우 다시 모두 재실행해야 한다.
Backfill
- 실패한 데이터 파이프라인을 재실행 혹은 읽어온 데이터의 문제로 다시 읽어와야 하는 경우 사용하는 프로세스
- Incremental Update에서의 Backfill은 어려움
- 실패한 데이터 파이프라인의 재실행이 용이한 구조가 잘 디자인된 것
Backfill과 관련된 Airflow 변수
- start_date : 데이터를 읽어와야 하는 데이터의 날짜
- execution_date : 시스템 변수로 읽어와야 하는 데이터의 날짜를 지정
- catchup : DAG가 처음 활성화된 시점이 start_date보다 미래일 경우 사이의 기간의 DAG 실행 여부
- end_date : 이 값은 보통 필요하지 않으며 Backfill을 날짜 범위에 대해 하는 경우에만 필요
- Daily DAG 예시
- start_date가 2020-11-07이라면, 11-07에 DAG가 실행되는 것이 아님
- start_date는 데이터의 날짜이므로 DAG는 11-08에 실행
'[프로그래머스] 데이터 엔지니어링 데브코스 3기 > TIL(Today I Learn)' 카테고리의 다른 글
[TIL - 46일 차] Docker & K8S 실습 (1) (0) | 2024.05.27 |
---|---|
[TIL - 45일 차] 데이터 파이프라인과 Airflow (5) (0) | 2024.05.24 |
[TIL - 43일 차] 데이터 파이프라인과 Airflow (3) (0) | 2024.05.22 |
[TIL - 42일 차] 데이터 파이프라인과 Airflow (2) (0) | 2024.05.21 |
[TIL - 41일 차] 데이터 파이프라인과 Airflow (1) (0) | 2024.05.20 |