Airflow Backfill 이해하기
Open Weathermap DAG 구현하기
Open Weathermap API
- 위도 / 경도 기반으로 그 지역의 기후 정보를 알려주는 서비스
- 무료 계정으로 api key를 받아 호출 시 사용
Pricing - OpenWeatherMap
Get weather data for any location on the globe immediately with our superb API! Just subscribe with your email and start using minute forecasts, hourly forecasts, history and other weather data in your applications. For more functionality, please consider
openweathermap.org
생성할 DAG - 서울 7일간의 낮 / 최소 / 최대 온도 읽기
- API Key를 open_weather_key라는 variable로 저장
- 서울의 위도와 경도를 찾아 One-Call API를 사용
- 날짜, 낮 온도(day), 최소 온도(min), 최대 온도(max) 추출
- API 호출 : https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&exclude={par t}&appid={API key}&units=metric
DAG 구현 (1)
- Open Weathermap의 one call API를 사용해 서울의 다음 7일간의 낮 / 최소 / 최대 온도를 읽어 저장
CREATE TABLE ss721229.weather_forecast (
date date primary key,
temp float, -- 낮 온도
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
DAG 구현 (2)
- One-Call API는 결과를 JSON 형태로 반환
- JSON에서 daily라는 필드에 앞으로 7일 간 날씨 정보 포함
- daily 필드는 리스트이며 각 레코드가 하나의 날짜에 해당
- 날짜 정보는 dt 필드에 있으며, epoch는 1970년 1월 1일 이후 밀리세컨드로 시간을 표시
- datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') # 2021-10-09
DAG 구현 (3)
- Airflow Connections를 이용해 만들어진 Redshift connection 사용
- autocommit = False로 사용(default)
- Full Refresh 방법 : (Full Refresh + INSERT INTO) or (Full Refresh + COPY (S3))
Primary Key Uniqueness 보장하기
일반적으로 데이터 웨어하우스에서는 Primary Key 값이 중복되는 것을 막아주는 역할을 하지 않는다. 그러나 Incremental Update를 진행하다 보면 중복되는 경우가 생길 수 있다. 데이터 엔지니어로서 데이터 웨어하우스를 관리하면서 PK Uniqueness를 보장할 수 있는 방법이 무엇일지 알아보자.
Primary Key Uniqueness
- 테이블에서 하나의 레코드를 유일하게 지칭할 수 있는 필드
- 하나의 필드가 일반적이지만, 다수의 필드를 사용할 수도 있음
- CREATE TABLE 사용 시 지정
- 관계형 데이터베이스 시스템이 PK가 중복되는 것을 막아줌
CREATE TABLE products (
product_id INT PRIMARY_KEY,
name VARCHAR(%),
price decimal(7, 2)
);
CREATE TABLE orders (
order_id INT,
product_id INT,
PRIMARY KEY (order_id, product_id),
FOREIGN KEY (product_id) REFERENCES products (product_id)
);
빅데이터 기반 데이터 웨어하우스가 PK Uniqueness를 지키지 않는 이유
- PK Uniqueness를 보장하는데 메모리와 시간이 들기 때문에 대용량 데이터 적재 시 걸림돌
- PK Uniqueness를 보장하는 것은 데이터 인력의 책임
CREATE TABLE ss721229.test (
date date primary key,
value bigint
);
INSERT INTO keeyong.test VALUES ('2023-05-10', 100);
INSERT INTO keeyong.test VALUES ('2023-05-10', 150); -- 성공
Primary Key 유지 방법
- 앞서 살펴본 weather_forecast 테이블
- 날씨 정보이기에 최근 정보가 더 신뢰할 수 있음
- 어느 정보가 더 최근 정보인지 created_date 필드에 기록하고 활용
- date가 같은 레코드가 있다면 created_date가 더 최근 정보를 선택
- 이에 적합한 SQL 문법이 ROW_NUMBER
CREATE TABLE ss721229.weather_forecast (
date date primary key,
temp float, -- 낮 온도
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
ROW_NUMBER
date 별로 created_date의 역순으로 일련번호를 매길 때, ROW_NUMBER를 사용할 수 있다. 아래 사진처럼 일련번호를 매긴 후 seq가 1인 레코드만 선택하면 된다.
ROW_NUMBER() OVER (partition by date order by created_date DESC) seq
진행 과정
- 임시 테이블(스테이징 테이블)을 만들고 현재 모든 레코드를 복사
CREATE TEMP TABLE t AS SELECT * FROM ss721229.weather_forecast;
- 임시 테이블에 데이터 소스에서 읽어 들인 레코드 복사 (중복 존재 가능)
- 원본 테이블 레코드 삭제
DELETE FROM ss721229.weather_forecast;
- 중복 제거가 완료된 테이블을 최종 원본 테이블로 복사
INSERT INTO ss721229.weather_forecast
SELECT date, temp, min_temp, max_temp, created_date
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM t
)
WHERE seq = 1;
UPSERT
- Primary Key를 기준으로 존재하는 레코드라면 새 정보로 수정
- 존재하지 않는 레코드라면 새 레코드로 적재
- 보통 웨어하우스마다 UPSERT를 효율적으로 해주는 문법을 지원
Backfill과 Airflow
Full Refresh를 사용하면, 문제가 생길 경우 다시 실행하면 되므로 가능하다면 이 방법을 사용해야 한다. 그러나 Incremental Update를 사용할 경우 효율성은 더 좋지만, 운영 / 유지보수 난이도가 올라간다. 실수로 데이터가 빠질 경우도 있고, 중간에 실패했을 경우 과거 데이터를 다 읽어와야 하는 경우 다시 모두 재실행해야 한다.
Backfill
- 실패한 데이터 파이프라인을 재실행 혹은 읽어온 데이터의 문제로 다시 읽어와야 하는 경우 사용하는 프로세스
- Incremental Update에서의 Backfill은 어려움
- 실패한 데이터 파이프라인의 재실행이 용이한 구조가 잘 디자인된 것
Backfill과 관련된 Airflow 변수
- start_date : 데이터를 읽어와야 하는 데이터의 날짜
- execution_date : 시스템 변수로 읽어와야 하는 데이터의 날짜를 지정
- catchup : DAG가 처음 활성화된 시점이 start_date보다 미래일 경우 사이의 기간의 DAG 실행 여부
- end_date : 이 값은 보통 필요하지 않으며 Backfill을 날짜 범위에 대해 하는 경우에만 필요
- Daily DAG 예시
- start_date가 2020-11-07이라면, 11-07에 DAG가 실행되는 것이 아님
- start_date는 데이터의 날짜이므로 DAG는 11-08에 실행
'[프로그래머스] 데이터 엔지니어링 데브코스 3기 > TIL(Today I Learn)' 카테고리의 다른 글
[TIL - 46일 차] Docker & K8S 실습 (1) (0) | 2024.05.27 |
---|---|
[TIL - 45일 차] 데이터 파이프라인과 Airflow (5) (0) | 2024.05.24 |
[TIL - 43일 차] 데이터 파이프라인과 Airflow (3) (0) | 2024.05.22 |
[TIL - 42일 차] 데이터 파이프라인과 Airflow (2) (0) | 2024.05.21 |
[TIL - 41일 차] 데이터 파이프라인과 Airflow (1) (0) | 2024.05.20 |