Airflow 기초 지식
Airflow 소개
Airflow 개념
- 파이썬으로 작성된 데이터 파이프라인(ETL) 프레임워크
- 가장 많이 사용되는 데이터 파이프라인 관리 프레임워크
Airflow 장단점
- 장점
- 데이터 파이프라인을 세밀하게 제어 가능
- 다양한 데이터 소스와 데이터 웨어하우스를 지원
- Backfill (여러 이유로 과거의 ETL 실행이 안될 경우 과거의 날짜의 ETL을 실행)이 쉬움
- 단점
- 배우기 쉽지 않음
- 상대적으로 개발 환경을 구성하기 힘듦
- 직접 운영이 쉽지 않으며, 클라우드 버전 사용 선호
Airflow 관련 용어
- DAG(Directed Acyclic Graph)
- Airflow에서 ETL을 부르는 별칭
- DAG는 하나 이상의 Task로 구성
- Task
- 오퍼레이터(Operator)로 만들어짐
- Airflow 내에서 다양한 종류의 오퍼레이터 제공
- Operator
- 실제 작업 단위를 정의하는 기본 구성 요소
- 각 Opeartor는 특정 작업을 수행하며, DAG 내에서 Task로 인스턴스화
- BashOperator : Bash 명령어를 실행하는 Operator
- PythonOperator : Python 함수를 실행하는 Operator
- HttpOperator, MysqlOperator, DockerOperator ...
- Executor
- DAG의 각 작업(Task)을 어떤 방식으로 실행할지 결정
- airflow의 설정 파일인 airflow.cfg에서 설정 가능
- SequentialExecutor : 한 번에 하나의 작업만 실행, 주로 테스트나 소규모 워크플로우에 적합
- LocalExecutor : 로컬 시스템에서 다중 프로세스 사용을 통해 병렬로 여러 작업 실행
- CeleryExecutor, DaskExecutor, KubernetesExecutor ...
DAG & Task
- DAG : a -> (b, c) -> d 전체는 하나의 DAG
- Task : a, b, c, d는 각각의 Task
- 실행 순서 : a -> b, c 동시에 실행 -> d
Airflow 구성
- 웹 서버(Web server) : 스케줄러와 DAG의 실행 상황을 시각화
- 스케줄러 (Scheduler) : DAG를 워커에게 배정하는 역할
- 워커 (Worker) : 실제로 DAG를 실행하는 역할
- 메타 데이터 DB : DAG의 실행 결과가 저장되는 데이터 베이스, 기본적으로 SQLite
- 큐 : 다수 서버 구성인 경우 일정 규칙에 따라 워커를 선정
Airflow 기본 구조
Airflow 기본 구조
- DAG를 대표하는 객체 생성 : DAG 이름, 실행 주기, 실행 날짜, 오너 등
- DAG를 구성하는 Task 생성 : Task 별 Operator 선택, Task ID 부여 및 작업 세부 사항 지정
- Task가 여러 개일 경우 실행 순서 결정
예시 코드
- Import
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
- 실행될 함수 선언
# 첫 번째 Python 작업에서 실행될 함수
def task_one():
return 'Task one completed!'
# 두 번째 Python 작업에서 실행될 함수
def task_two():
return 'Task two completed!'
- DAG 정의
# 기본 인자 설정
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 6, 1),
'retries': 1,
}
# DAG 정의
dag = DAG(
'two_python_operators_example',
default_args=default_args,
tags=['example'],
catchup=False,
schedule_interval='@daily', # DAG가 매일 실행되도록 설정
)
- Task 정의
# 첫 번째 작업 정의
task1 = PythonOperator(
task_id='task_one',
python_callable=task_one, # 실행할 Python 함수 지정
dag=dag,
)
# 두 번째 작업 정의
task2 = PythonOperator(
task_id='task_two',
python_callable=task_two, # 실행할 Python 함수 지정
dag=dag,
)
- Task 간 순서 지정
# 작업 간의 순서 지정
task1 >> task2 # task1이 완료된 후 task2가 실행되도록 설정
Reference
https://airflow.apache.org/docs/apache-airflow/2.3.4/concepts/dags
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] Executor 실행 준비 과정 (SchedulerJobRunner._execute()) (0) | 2024.10.10 |
---|---|
[Airflow] AWS Ubuntu EC2 Airflow 환경 구축 (feat. Dockerfile) (0) | 2024.08.16 |
[Airflow] DAG Scheduling과 Execution (2) | 2024.06.09 |
[Airflow] Airflow 기타 기능 정리 (0) | 2024.06.05 |
[Airflow] Airflow 개념과 ETL 작성시 주의할 점 (0) | 2024.05.24 |