데이터 파이프라인(ETL) 소개
데이터 파이프라인(ETL) 이란?
- Data Pipeline = ETL = Data Workflow = DAG(Directed Acyclic Graph)
- ELT : Extract(데이터 다운로드), Transform(데이터 포맷 변경), Load(데이터 적재)
- DAG : Airflow에서 사용되는 용어 / 다수의 tack로 구성이 되고, 루프가 존재하지 않음
ETL vs ELT
- ETL
- 데이터를 데이터 웨어하우스 외부에서 내부로 가져오는 프로세스로 보통 데이터 엔지니어가 수행
- ELT
- 데이터 웨어하우스 내부 데이터로 새로운 데이터를 만드는 프로세스로 보통 데이터 분석가가 수행
- 데이터 레이크 위에서 작업을 수행하기도 함
- ELT 프로세스 전용 기술이 있으며 dbt(Data Build Tool)가 가장 유명 -> Analytics Engineering
Data Lake vs Data Warehouse
- 데이터 레이크 (Data Lake)
- 구조화 데이터 + 비구조화 데이터
- 보존 기한이 없는 모든 데이터를 원래 형태로 보존하는 스토리지에 가까움
- 보통은 데이터 웨어하우스보다 몇 배는 더 큰 스토리지
- 데이터 웨어하우스 (Data Warehouse)
- 보존 기한이 있는 구조화된 데이터를 저장하고 처리하는 스토리지
- 일반적으로 관계형 데이터베이스이므로 비구조화 데이터를 저장하기는 어려움
- 보통 BI 툴(룩커, 태블로, 수퍼셋 등)은 데이터 웨어하우스를 백엔드로 사용
Data Lake & ELT
데이터 시스템이 성숙해진 큰 회사들이 갖는 다이어그램이다. 여기서 Data Lake에 저장된 데이터를 사용해 Transform 후 저장하는 과정을 ELT라고 볼 수 있다.
- Data Sources를 Data Lake(S3)에 저장
- Data Lake에 저장된 데이터를 Transform(Spark, Athena 등)하여 Data Warehouse에 저장
- Data Mart : 특정 부서/용도를 갖는 Data Warehouse
Data Pipeline
데이터 파이프라인은 데이터를 소스로부터 목적지로 복사하는 작업으로 보통 코딩 (Python or Scala) 혹은 SQL을 통해 이뤄진다. 대부분의 경우 목적지는 데이터 웨어하우스이며, MySQL과 같은 프로덕션 데이터베이스 혹은 S3와 같은 데이터 레이크가 목적지가 될 수도 있다.
- 데이터 소스 예 : Click Stream, call data, ads performance data, sensor data, metadata, log files 등
- 데이터 목적지 예 : 데이터 웨어하우스, 캐시 시스템(Redis, Memcache), 프로덕션 DB, NoSQL, S3 등
데이터 파이프라인 종류 (1) - Raw Data ETL Jobs
해당 작업은 주로 데이터 엔지니어가 수행한다.
- 내부(회사 안)/외부(회사 밖) 데이터 소스에서 데이터를 읽음 (많은 경우 API를 통하게 됨)
- 적당한 데이터 포맷 변환 (데이터 크기가 커지면 Spark 등이 필요해짐)
- 데이터 웨어하우스 로드
데이터 파이프라인 종류 (2) - Summary / Report Jobs
요약 테이블의 경우 SQL(CTAS)만으로 만들고, 이는 데이터 분석가가 하는 작업이다. 데이터 엔지니어 관점에서는 어떻게 데이터 분석가들이 편하게 할 수 있는 환경을 고려해야 한다.
- 데이터 웨어하우스 혹은 데이터 레이스로부터 데이터를 읽어 다시 데이터 웨어하우스에 로드
- Raw Data를 읽어 일종의 Report 형태나 Summary 형태의 테이블을 만드는 용도
- 특수한 형태로는 AB 테스트 결과를 분석하는 데이터 파이프라인도 존재
데이터 파이프라인 종류 (3) - Production Data Jobs
- 데이터 웨어하우스로부터 데이터를 읽어 다른 스토리지로 쓰는 ETL
- Summary 정보가 프로덕션 환경에서 성능 이유로 필요한 경우
- 머신러닝 모델에서 필요한 피쳐들을 미리 계산해 두는 경우
- 이 경우 타겟 스토리지
- Cassandra/HBase/DynamoDB와 같은 NoSQL
- MySQL과 같은 관계형 데이터베이스 (OLTP)
- Redis/Memcache와 같은 캐시
- ElasticSearch와 같은 검색엔진
데이터 파이프라인을 만들 때 고려할 점
이상과 현실 간의 괴리
- 이상
- 내가 만든 데이터 파이프라인은 문제없이 동작할 것
- 내가 만든 데이터 파이프라인을 관리하는 것은 어렵지 않을 것
- 현실
- 실패 : 버그, 코드 상 이슈, 데이터 파이프라인 간의 의존도 이해 부족
- 데이터 파이프라인 수에 따른 유지보수 비용 증가
Best Practices (1) - Full Refresh
- 가능하면 데이터가 작을 경우 매번 통째로 복사해서 테이블 만들기 (Full Refresh)
- Incremental Update(부분적으로) 만이 가능하다면, 대상 데이터 소스가 갖춰야 할 조건이 있음
- 데이터 소스가 프로덕션 데이터베이스 테이블이라면, 다음 필드가 필요 : created, modified, deleted
- 데이터 소스가 API라면, 특정 날짜를 기준으로 새로 생성되거나 업데이트된 레코드를 읽어올 수 있어야 함
Best Practices (2) - 멱등성
- 멱등성(Idempotency)을 보장하는 것이 중요
- 동일한 입력 데이터로 데이터 파이프라인을 여러 번 실행해도 최종 테이블의 결과가 동일해야 함
- 중요한 포인트는 Critical Point가 모두 one atomic action으로 실행돼야 한다는 점
Best Practices (3) - Backfill
- 실패한 데이터 파이프라인의 재실행이 쉬워야 함
- 과거 데이터를 다시 채우는 과정(Backfill)이 쉬워야 함
- Airflow는 이 부분(특히 Backfill)에 강점을 갖고 있음
Best Practices (4) - 입/출력 문서화
데이터 시스템이 고도화되고 데이터 파이프라인의 수가 생기면, 다양한 문제가 생길 수 있다. 이를 방지하기 위해 입/출력을 문서화해야 한다.
- 데이터 파이프라인의 입력과 출력을 명확히 하고 문서화
- 비즈니스 오너 명시 : 누가 이 데이터를 요청했는지 기록으로 남길 것
- 이것이 나중에 데이터 카탈로그로 들어가서 데이터 디스커버리에 사용 가능
- 데이터 리니지가 중요해지며, 이걸 이해하지 못하면 사고 발생
Best Practices (5) - 데이터 삭제
- 일정 기간마다 사용하지 않는 데이터들을 삭제
Best Practices (6) - 사고 리포트 작성
- 데이터 파이프라인 사고 시 사고 리포트(post-mortem) 작성
- 유사한 사고 발생 방지
- 사고 원인(root-cause)을 이해하고 방지하기 위한 액션 아이템들의 실행이 중요해짐
- 기술 부채의 정도를 이야기해 주는 척도
Best Practices (7) - 데이터 대상 유닛 테스트
- 중요 데이터 파이프라인의 입/출력을 체크
- 입력 레코드 수와 출력 레코드 수가 몇 개인지 체크하는 것부터 시작
- Primary Key가 존재한다면, PK uniqueness를 보장하는지 체크
- 중복 레코드 체크
- ...
간단한 ETL 작성해 보기 with Colab
- AWS Redshift 연결
%sql postgresql://ID:PW@learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev
import psycopg2
# Redshift connection 함수
# 본인 ID/PW 사용!
def get_Redshift_connection():
host = "learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com"
redshift_user = "ID"
redshift_pass = "PW"
port = 5439
dbname = "dev"
conn = psycopg2.connect("dbname={dbname} user={user} host={host} password={password} port={port}".format(
dbname=dbname,
user=redshift_user,
password=redshift_pass,
host=host,
port=port
))
conn.set_session(autocommit=True)
return conn.cursor()
- Schema 생성
%%sql
DROP TABLE IF EXISTS ss721229.name_gender;
CREATE TABLE ss721229.name_gender (
name varchar(32) primary key,
gender varchar(8)
);
- Extract
import requests
def extract(url):
f = requests.get(url)
return (f.text)
- Transform
def transform(text):
lines = text.strip().split("\n")
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
records.append([name, gender])
return records
- Load
def load(records):
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
cur = get_Redshift_connection()
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = "INSERT INTO ss721229.name_gender VALUES ('{n}', '{g}')".format(n=name, g=gender)
cur.execute(sql)
- 실행
link = "https://s3-~"
data = extract(link)
lines = transform(data)
load(lines)
%%sql
SELECT gender, COUNT(1) count
FROM ss721229.name_gender
GROUP BY gender;
-- result
>> gender count
>> gender 1
>> F 65
>> Unisex 7
>> M 28
문제점
- 불필요한 헤더 : ['name', 'gender']이 포함
- 멱등성 X : 여러 번 실행하면 그만큼 레코드가 증가 -> DELETE FROM(Full Refresh)
- INSERT문을 실행하는 중에 에러가 발생할 경우 데이터 정합성 X -> Transaction
Airflow 소개
Airflow 소개
Airflow 소개 (1)
- Airflow : 파이썬으로 작성된 데이터 파이프라인(ETL) 프레임워크
- Airbnb에서 시작한 아파치 오픈소스 프로젝트
- 가장 많이 사용되는 데이터 파이프라인 관리/작성 프레임워크
- 데이터 파이프라인 스케줄링 지원
- 정해진 시간에 ETL 실행 혹은 한 ETL 실행이 끝나면 다음 ETL 실행
- 웹 UI 제공
Airflow 소개 (2)
- 데이터 파이프라인(ETL)을 쉽게 만들 수 있도록 해줌 (모듈 제공, Backfill 등)
- Airflow에서는 데이터 파이프라인을 DAG(Directed Acyclic Graph)라고 부름
- Airflow 버전 선택 방법 : 큰 회사에서 사용하는 버전을 확인, 버그가 발생하는 버전이 있을 수 있음
Airflow 구성
Airflow는 총 5개의 컴포넌트로 구성되어 있다.
- 웹 서버 (Web server) : 스케줄러와 DAG의 실행 상황을 시각화
- 스케줄러 (Scheduler) : DAG를 워커에게 배정하는 역할
- 워커 (Worker) : 실제로 DAG를 실행하는 역할
- 메타 데이터 데이터베이스 : DAG의 실행 결과는 별도 DB에 저장, 기본으로 SQLite 설치
- 큐 : 다수 서버 구성인 경우에 사용
Airflow 구조 - 서버 1 대
Worker의 수는 서버가 갖고 있는 CPU 수이다. 웹 서버는 Flask로 구성돼 있으며, Worker/Scheduler/Web server의 상황이나 결과가 저장되는 DB가 존재한다. CPU 수에 따라 동시에 진행할 수 있는 Task에 제약을 받는다. 이에 따라 스케일링을 위해 Worker를 별도의 서버에 세팅하고 Worker가 있는 서버를 증대시킨다.
Airflow 스케일링 방법
- Scale UP : 더 좋은 사양의 서버 사용
- Scale Out : 서버 추가, 서버 하나 (Worker, Scheduler, Webserver) / 다른 서버 (Worker)
Airflow 구조 - 서버 N 대
Airflow의 장단점
- 장점
- 데이터 파이프라인을 세밀하게 제어 가능
- 다양한 데이터 소스와 데이터 웨어하우스를 지원
- Backfill의 쉬움
- 단점
- 배우기 쉽지 않음
- 상대적으로 개발 환경을 구성하기 쉽지 않음
- 직접 운영이 쉽지 않으며, 클라우드 버전 사용이 선호
- (GCP : Cloud Composer, AWS : MWAA , Azure : Data Factory Managed Airflow)
DAG (Directed Acyclic Graph)
- Airflow에서 ETL을 부르는 명칭
- DAG는 하나 혹은 이상의 Task로 구성
- Task : 오퍼레이터(Operator)로 만들어짐, 이미 다양한 종류의 오퍼레이터 제공
'[프로그래머스] 데이터 엔지니어링 데브코스 3기 > TIL(Today I Learn)' 카테고리의 다른 글
[TIL - 43일 차] 데이터 파이프라인과 Airflow (3) (0) | 2024.05.22 |
---|---|
[TIL - 42일 차] 데이터 파이프라인과 Airflow (2) (0) | 2024.05.21 |
[TIL - 35일 차] 데이터 웨어하우스 관리와 고급 SQL과 BI 대시보드 (5) (0) | 2024.05.10 |
[TIL - 34일 차] 데이터 웨어하우스 관리와 고급 SQL과 BI 대시보드 (4) (0) | 2024.05.09 |
[TIL - 33일 차] 데이터 웨어하우스 관리와 고급 SQL과 BI 대시보드 (3) (0) | 2024.05.08 |