DBT (Data Build Tool)
DBT Seeds
DBT Seeds
- Dimension 테이블을 csv 파일 형태로 DW로 로드하는 방법
- Dimension 테이블은 크기가 작고 많이 변하지 않음
- dbt seed를 실행해서 빌드
DBT Seeds 실습
- learn_dbt/seeds 디렉터리에 reference_date.csv 파일 생성
- dbt seed 실행
dbt seed
DBT Sources
- Staging 테이블을 만들 때 입력 테이블이 자주 바뀌면, models의 .sql 파일을 일일이 바꿔야 함
- 이 번거로움을 Sources를 활용해 입력 테이블에 별칭을 주고 별칭을 staging 테이블에서 사용
Sources
- 처음 입력이 되는 ETL 테이블을 대상으로 함
- 별칭 제공
- 최신 레코드 체크 기능 제공
- 테이블 이름에 별명(alias)을 주는 것
- ETL 단의 소스 테이블이 바뀌어도 뒤에 영향을 주지 않음
- 추상화를 통한 변경처리를 용이하게 하는 것
- 이 별명은 source 이름과 새 테이블 이름의 두 가지로 구성
- 예시 : raw_data.user_metadata -> keeyong, metadata
- Source 테이블에 새 레코드가 있는지 체크해 주는 기능도 제공
Sources 실습
- models/sources.yml 파일 생성 : raw_data.user_metadata는 Jinja에서 source("keeyong", "metadata")로 지정
version: 2
sources:
- name: ss721229
schema: raw_data
tables:
- name: metadata
identifier: user_metadata
- name: event
identifier: user_event
- name: variant
identifier: user_variant
- models 밑의 다른 파일 (총 3개)도 적절하게 변경
WITH src_user_event AS (
SELECT * FROM {{ source("keeyong", "event") }}
)
SELECT
user_id,
datestamp,
item_id,
clicked,
purchased,
paidamount
FROM
src_user_event
- dbt run 실행 : '3 sources'를 확인할 수 있음
Sources 최신성 (Freshness)
- 특정 데이터가 소스와 비교해 얼마나 최신성이 떨어지는지 체크하는 기능
- dbt source freshness 명령으로 수행
- models/sources.yml의 해당 테이블 밑에 코드 추가
- loaded_at_field : freshness를 결정해 주는 필드
- freshness : 언제 warn이고, 언제 error인지 규정
version: 2
sources:
- name: ss721229
schema: raw_data
tables:
- name: event
identifier: user_event
loaded_at_field: datestamp
freshness:
warn_after: { count: 1, period: hour } # datestamp의 최댓값이 현재보다 1시간 이상 뒤쳐져 있으면 wraning
error_after: { count: 24, period: hour } # 24시간 이상이라면 error
...
- dbt source freshness 실행
DBT Snapshots
- Dimension 테이블은 성격에 따라 변경이 잦음
- DBT에는 테이블의 변화를 기록함으로써 어느 시점이건 돌아가서 테이블의 내용을 볼 수 있게 함
- 테이블에 문제가 있을 경우 롤백 가능
- 다양한 데이터 문제 디버깅이 쉬워짐
SCD Type 2와 DBT
- Dimension 테이블에서 특정 Entity에 대한 데이터가 변경되는 경우
- 새로운 Dimension 테이블 생성 : history/snapshot 테이블
Snapshot 적용
- learn_dbt/snapshots/scd_user_metadata.sql이라는 환경 설정 파일 생성
- snapshots을 하려면 PK가 존재해야 하며, 레코드 변경 시간(updated_at, modified_at 등)이 필요
- 변경 감지 기준 : Primary Key 기준으로 변경 시간이 현재 DW에 있는 시간보다 미래인 경우
- Snapshots 테이블의 타임스탬프 : dbt_scd_id, dbt_updated_at, valid_from, valid_to
{% snapshot scd_user_metadata %}
{{
config(
target_schema='ss721229',
unique_key='user_id',
strategy='timestamp',
updated_at='updated_at',
invalidate_hard_deletes=True
)
}}
SELECT * FROM {{ source('ss721229', 'metadata') }}
{% endsnapshot %}
- dbt snapshot 실행
Snapshot 업데이트
- snapshot 테이블 확인
SELECT * FROM ss721229.scd_user_metadata ORDER BY user_id LIMIT 10;
- raw_data.user_metadata 업데이트 : user_id가 2인 사람의 age를 0-19에서 20-29로 변경
UPDATE raw_data.user_metadata
SET age = '20-29', updated_at = GETDATE()
WHERE user_id = 2;
- dbt snapshot 실행 후 snapshot 테이블 확인 : user_id가 2인 레코드가 두 개가 되면서 snapshot 데이터가 변경
DBT tests
Tests
- 데이터 품질을 테스트하는 방법
- 내장 일반 테스트 ("Generic")
- unique, not_null, accepted_values, relationships 등의 테스트 지원
- models 폴더에 생성
- 커스텀 테스트 ("Singular")
- 기본적으로 SELECT로 간단하며 결과가 리턴되면 실패로 간주
- tests 폴더에 생성
- 특정 테이블에 관련된 test만 실행하고 싶다면 아래의 코드 실행
dbt test --select dim_user_metadata
Generic Tests
- models/schema.yml 파일 생성
version: 2
models:
- name: dim_user_metadata
columns:
- name: user_id
tests:
- unique
- not_null
Singular Tests
- tests/dim_user_metadata.sql 파일 생성
SELECT
*
FROM (
SELECT
user_id, COUNT(1) cnt
FROM
{{ ref("dim_user_metadata") }}
GROUP BY 1
ORDER BY 2 DESC
LIMIT 1
)
WHERE cnt > 1
DBT Documentation
DBT 문서화
- 기존 철학은 문서와 소스 코드를 최대한 가깝게 배치하는 것
- 문서화의 두 가지 방식
- 기존 .yml 파일에 문서화 추가 (선호되는 방식)
- 독립적인 markdown 파일 생성
- 문서화한 것을 웹 서버로 서빙
- overview.md가 기본 홈페이지
- 이미지 등의 asset 추가도 가능
models 문서화
- models/schema.yml or models/sources.yml에 description 키 추가
version: 2
models:
- name: dim_user_metadata
description: A dimension table with user metadata
columns:
- name: user_id
description: The primary key of the table
tests:
- unique
- not_null
- dbt docs generate 실행 : models 문서 만들기
- dbt docs serve 실행 : 경량 웹 서버 실행
DBT Expectation
DBT Expectation
- Spark와 pandas에서 데이터 품질 관리할 때 쓰이는 Great Expectations에서 영감을 받아 만든 dbt 확장판
- dbt core에서 제공하는 내장 테스트 함수보다 Expectation에서 제공하는 것이 더 좋음
- 설치 후 packages.yml에 등록
packages:
- package: calogica/dbt_expectations
version: [">=0.7.0", "<0.8.0"]
DBT Expectation 함수 중 일부
- expect_column_to_exist
- expect_row_values_to_have_recent_data
- expect_column_values_to_be_null
- expect_column_values_to_not_be_null
- expect_column_values_to_be_unique
- expect_column_values_to_be_of_type
- expect_column_values_to_be_in_set
- expect_column_values_to_not_be_in_set
- expect_column_values_to_be_between
'[프로그래머스] 데이터 엔지니어링 데브코스 3기 > TIL(Today I Learn)' 카테고리의 다른 글
[TIL - 61일 차] 하둡과 Spark (1) (0) | 2024.06.17 |
---|---|
[TIL - 55일 차] Airflow 고급기능과 DBT, 데이터 디스커버리 (5-2) (0) | 2024.06.07 |
[TIL - 54일 차] Airflow 고급기능과 DBT, 데이터 디스커버리 (4) (2) | 2024.06.06 |
[TIL - 53일 차] Airflow 고급기능과 DBT, 데이터 디스커버리 (3) (0) | 2024.06.05 |
[TIL - 52일 차] Airflow 고급기능과 DBT, 데이터 디스커버리 (2) (0) | 2024.06.04 |