Airflow DAG 작성 - Hello World
Hello world 예제 프로그램 살펴보기
PythonOperator
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
dag = DAG(
dag_id = 'HelloWorld',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *')
def print_hello():
print("hello!")
return "hello!"
def print_goodbye():
print("goodbye!")
return "goodbye!"
print_hello = PythonOperator(
task_id = 'print_hello',
#python_callable param points to the function you want to run
python_callable = print_hello,
#dag param points to the DAG that this task is a part of
dag = dag)
print_goodbye = PythonOperator(
task_id = 'print_goodbye',
python_callable = print_goodbye,
dag = dag)
#Assign the order of the tasks in our DAG
print_hello >> print_goodbye
Airflow Decorators
데코레이터를 사용하면 코드가 훨씬 단순해진다. 위의 코드에서는 PythonOperator를 따로 정의하고, python_callable을 해당 task로 지정해주어야 했다. 그러나 @task 데코레이터를 사용하면 함수 정의와 함께 task가 된다. task_id를 따로 지정하지 않으면, 함수 이름이 task_id가 된다.
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
@task
def print_hello():
print("hello!")
return "hello!"
@task
def print_goodbye():
print("goodbye!")
return "goodbye!"
with DAG(
dag_id = 'HelloWorld_v2',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *'
) as dag:
# Assign the tasks to the DAG in order
print_hello() >> print_goodbye()
중요한 DAG 파라미터 (not task parameters)
with DAG(
dag_id = 'HelloWorld_v2',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *'
) as dag:
- max_active_runs : 동시에 실행될 수 있는 DAG 수 (backfill 할 때 중요)
- max_active_tasks : 동시에 실행될 수 있는 Task 수
- catchup : 과거 데이터 backfill 여부 (incremental update를 하는 DAG에서 중요)
- DAG parameter vs Task parameters의 차이점 이해가 중요
Name - Gender 예제 프로그램 포팅
예제 코드 (1) - 코드 (깃허브)
- S3에 존재하는 name_gender.csv 파일을 읽어 Redshift에 저장하는 ETL 코드
- Redshift 연결 정보, S3 주소 등 민감한 정보가 코드에 포함됨
예제 코드 (2) - 코드 (깃허브)
- 예제 코드 (1)에서 params 파라미터를 추가하여 S3 주소를 task에 매개변수 형태로 넘김
예제 코드 (3) - 코드 (깃허브)
- Variable을 이용해 CSV 파일 주소를 넘김
- Xcom을 사용해서 3개의 Task로 나누고, 이전 Task의 반환값을 전달받음
예제 코드 (4) - 코드 (깃허브)
- Redshift Connection 사용
예제 코드 (5) - 코드 (깃허브)
- Task Decorater 사용하여 Xcom을 사용할 필요가 없음
- 기본적으로 PythonOperator 대신 airflow.decorators.task 사용
Connections and Variables
- Connections
- Redshift와 연결할 때, 코드에 중요한 정보(ID, PW 등)가 포함돼 있고 직접 코드를 수정해야 함
- 환결 설정 형태로 코드 밖으로 빼내는 역할
- Variables
- S3의 csv 파일을 코드 내에서 링크로 접근하며, 직접 코드를 수정해야 함
- 환경 설정 형태로 코드 밖으로 빼내는 역할
Xcom
- Task(Operator) 간에 데이터를 주고받기 위한 방식
- 보통 하나의 Operator의 반환값을 다른 Operator에게 읽어가는 형태
- 반환값과 전달받은 파라미터는 Airflow 메타 데이터 DB에 저장
- 큰 데이터를 주고받기는 힘듦
- 보통 큰 데이터는 S3 등에 로드하고 그 위치를 넘기는 것이 일반적
Yahoo Finance API DAG 작성 (1)
구현 DAG의 세부 사항 - Full Refresh로 구현
- Yahoo Finance API를 호출하여 애플 주식 정보 수집 (지난 30일)
- Redshift 상의 테이블로 레코드 적재
Extract / Transform - Yahoo Finance API 호출
- Yahoo Finance API를 호출하여 애플 주식 정보 수집 및 파싱
Load - Redshift 테이블 업데이트
- Full Refresh로 구현
- 트랜젝션 형태로 구성
Yahoo Finance API DAG 작성 (2)
구현 DAG의 세부사항 - Incremental Update로 구현
Extract/Transform은 (1)과 동일하고, Load만 달라진다. Extract와 Transform은 동일하지만, 레코드 적재 및 중복 제거 후 테이블에 Load하는 형태로 바뀐다.
Load - Redshift 테이블 업데이트
- Incremental Update로 구현
- 임시 테이블을 생성하여 현재 테이블의 레코드 복사(CREATE TEMP TABLE ... AS SELECT)
- 임시 테이블로 Yahoo Finance API로 읽어온 레코드 적재
- 원본 테이블을 삭제하고 새로 생성
- 원본 테이블에 임시 테이블의 내용을 복사 (SELECT DISTINCT *)
- 트랜잭션 형태로 구성
'[프로그래머스] 데이터 엔지니어링 데브코스 3기 > TIL(Today I Learn)' 카테고리의 다른 글
[TIL - 45일 차] 데이터 파이프라인과 Airflow (5) (0) | 2024.05.24 |
---|---|
[TIL - 44일 차] 데이터 파이프라인과 Airflow (4) (0) | 2024.05.23 |
[TIL - 42일 차] 데이터 파이프라인과 Airflow (2) (0) | 2024.05.21 |
[TIL - 41일 차] 데이터 파이프라인과 Airflow (1) (0) | 2024.05.20 |
[TIL - 35일 차] 데이터 웨어하우스 관리와 고급 SQL과 BI 대시보드 (5) (0) | 2024.05.10 |