Airflow 설치와 프로그래밍
SQL 트랜잭션 이해하기
- 중간에 실패하면 불완전 상황에 놓이는 작업이 있다면?
- 은행 이체 과정 : 인출은 성공했지만, 송금에서 문제가 생긴다면?
트랜잭션이란?
- Atomic하게 실행돼야 하는 SQL을 묶어 하나의 작업처럼 처리하는 방법
- BEGIN과 END 혹은 BEGIN과 COMMIT 사이에 해당 SQL들을 사용
- ROLLBACK은 BEGIN의 이전 상태로 돌아가라는 SQL 명령
- Transaction 구간의 SQL 결과는 임시 상태가 되며, 커밋 전에 다른 세션에서 볼 수 없음
- 트랜잭션의 SQL을 최소화하는 것이 좋으며, 위의 경우 auto commit을 사용하는 경우
트랜잭션 구현 방법 (1) - autocommit
- autocommit = True
- 기본적으로 모든 SQL statement가 바로 물리 테이블에 커밋
- 이를 바꾸고 싶다면 BEGIN;END; 혹은 BEGIN;COMMIT을 사용 (혹은 ROLLBACK)
- autocommit = False
- 기본적으로 모든 SQL statement가 커밋되지 않음, 즉 모두 staging 상태로 존재
- connection 객체의 .commit()과 .rollback() 함수로 커밋을 결정
트랜잭션 구현 방법 (2)
- autocommit의 True / False 결정 여부는 개인이나 팀의 선택
- Python에서 트랜잭션을 사용할 경우 try/catch를 사용하는 것이 일반적
- 에러 발생 시 rollback을 명시적으로 실행, 에러가 없다면 커밋
- raise를 호출하여 에러를 명확히 확인
try:
cur.execute(create_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
Airflow 설치
설치 방법
- 직접 설치 & 운영 : Docker 위에 설치, AWS EC2 등의 리눅스 서버에 설치
- 클라우드 사용 (프로덕션 환경에서 선호) : 기본으로 서버 3 대를 운영하기에 비용이 만만치 않음
- AWS : MWAA
- Google Cloud : Cloud Composer
- Microsoft Azure : Azure Data Factory의 Airflow DAGs
설치 과정
- Airflow의 메타 데이터베이스로 로컬 서버에 Postgres를 설치
- Airflow는 /var/lib/airflow/ 밑에 설치
- Airflow 서버에 총 3개의 어카운트 사용
- ubuntu : 메인 어카운트
- postgres : postgres 설치 시 만들어지는 계정, postgres 액세스를 위한 airflow 계정을 별도로 생성
- airflow : Airlofw 용 어카운트, Airflow 서비스는 이 계정으로 실행됨
설치 과정 (1) - EC2
- EC2 Ubuntu(22.04), Airflow(2.5.1), Python(3.12)
설치 과정 (2) - Docker
- Docker Engine 설치
- airflow-setup repo 클론 : git clone https://github.com/keeyong/airflow-setup.git
- airflow-setup 폴더로 이동 : cd airflow-setup
- 2.5.1 이미지 관련 yml 파일 다운로드 : curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'
- 이미지 다운로드 : docker-compose -f docker-compose.yaml pull
- 컨테이너 실행 : docker-compose -f docker-compose.yaml up
- 웹 UI(http://localhost:8080) 로그인 (airflow:airflow)
Airflow 기본 프로그램 실행
Airflow 코드의 기본 구조
- DAG를 대표하는 객체를 먼저 만듦 : DAG 이름, 실행 주기, 실행 날짜, 오너 등
- DAG를 구성하는 Task를 만듦 : Task 별 오퍼레이터 선택, Task ID 부여 및 작업 세부 사항 지정
- Task 간의 실행 순서 결정
DAG 설정 예제 (1)
- 여기에 지정되는 인자는 모든 Task에 공통으로 적용되는 설정
- 뒤에서 DAG 객체를 만들 때 지정
from datetime import datetime, timedelta
default_args = {
'owner': 'keeyong',
'email': ['keeyonghan@hotmail.com'],
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
DAG 설정 예제 (2)
- start_date가 과거일 경우 과거 ~ 현재까지 실행되지 않은 것을 실행할 수 있음 -> "catchup"
from airflow import DAG
dag = DAG(
"dag_v1", # DAG name
start_date=datetime(2020,8,7,hour=0,minute=00),
schedule="0 * * * *", # Min Hour Day Month Week
tags=["example"],
catchup=False,
# common settings
default_args=default_args
)
Bash Operator를 사용한 예제 (1)
- 3개의 Task로 구성
- t1은 현재 시간 출력
- t2는 5초 간 대기 후 종료
- t3은 서버의 /tmp 디렉터리의 내용 출력
- t1이 끝나고 t2와 t3을 병렬로 실행
터미널에서 DAG 실행
- Docker
- 현재 실행 중인 컨테이너 및 scheduler의 ID 확인 : docker ps
- airflow shell 접속 : docker exec -it scheduler_ID sh
- Airflow 서버에 로그인 후 다음 명령 실행 (단, 날짜 형식은 YYYY-MM-DD)
airflow dags list
airflot tasks list DAG_name
airflow tasks test DAG_name Task_name Date
'[프로그래머스] 데이터 엔지니어링 데브코스 3기 > TIL(Today I Learn)' 카테고리의 다른 글
[TIL - 44일 차] 데이터 파이프라인과 Airflow (4) (0) | 2024.05.23 |
---|---|
[TIL - 43일 차] 데이터 파이프라인과 Airflow (3) (0) | 2024.05.22 |
[TIL - 41일 차] 데이터 파이프라인과 Airflow (1) (0) | 2024.05.20 |
[TIL - 35일 차] 데이터 웨어하우스 관리와 고급 SQL과 BI 대시보드 (5) (0) | 2024.05.10 |
[TIL - 34일 차] 데이터 웨어하우스 관리와 고급 SQL과 BI 대시보드 (4) (0) | 2024.05.09 |