@task 개념과 사용 예시, 주의할 점
Airflow는 워크플로우의 각 단계를 Task로 정의하며, Task를 추가하기 위해 Operator를 사용한다. Airflow 2.0부터 TaskFlow API가 도입되어 Task Decorator인 @task를 활용해 Task를 정의할 수 있게 되었다. 이번 글에서는 @task의 개념과 사용 방법, 예시, 유스케이스를 소개하려고 한다.
Python Decorator와 @task의 개념
Python Decorator 개념
Python의 Dacorator는 함수의 기능을 확장하거나 수정하기 위해 사용하는 디자인 패턴이다. 보통 함수 위에 "@이름" 형태로 선언되며, 함수 호출 전후에 특정 작업을 수행하거나 함수의 동작을 변경할 수 있다. Airflow의 @task도 이와 유사하게 함수를 Airflow의 Task로 등록하고 실행할 수 있도록 변환하는 역할을 한다.
# 간단한 데코레이터 예제
def sample_decorator(func):
def wrapper(*args, **kwargs):
print("데코레이터가 함수 호출 전에 실행됩니다.")
result = func(*args, **kwargs)
print("데코레이터가 함수 호출 후에 실행됩니다.")
return result
return wrapper
@sample_decorator
def my_function():
print("실제 함수 실행")
my_function()
""" result
데코레이터가 함수 호출 전에 실행됩니다.
실제 함수 실행
데코레이터가 함수 호출 후에 실행됩니다.
"""
@task 개념
- @task는 함수 위에 선언하며, 해당 함수는 DAG의 Task로 등록
- @task를 사용하면 Operator를 직접 호출하지 않고 Python 함수를 Task로 사용 가능
@task 특징
- 함수를 정의하듯 Task를 작성할 수 있기에 Pythonic 한 Workflow 구성 가능
- PythonOperator 같은 클래스를 사용하지 않고 Task를 등록할 수 있음
- 반환값은 자동으로 Airflow의 Xcom에 전달되기 때문에 Task 간 데이터 전달이 간단해짐
@task 사용 예시
사용 예시 (1) : 두 숫자 더하기
from airflow.decorators import task, dag
from datetime import datetime
@dag(schedule_interval=None, start_date=datetime(2024, 1, 1), catchup=False)
def simple_addition_dag():
@task
def add_numbers(a: int, b: int) -> int:
return a + b
@task
def print_result(result: int):
print(f"결과: {result}")
result = add_numbers(5, 3)
print_result(result)
simple_addition_dag_instance = simple_addition_dag()
사용 예시 (2) : 동적 Task 매핑
@task
def process_data(data: int) -> int:
return data ** 2
@task
def aggregate_results(results: list[int]) -> int:
return sum(results)
@dag(schedule_interval=None, start_date=datetime(2024, 1, 1), catchup=False)
def dynamic_task_mapping_dag():
data = [1, 2, 3, 4, 5]
results = process_data.expand(data=data)
total = aggregate_results(results)
print(f"총합: {total}")
dynamic_task_mapping_instance = dynamic_task_mapping_dag()
사용 예시 (3) : 다른 파일에 존재하는 @task를 현재 폴더 DAG에 등록
- 디렉터리 구조 : dag 파일과 task 파일이 같은 디렉터리에
airflow_home/
│
├── dags/
│ ├── example_dag.py # DAG 파일
│ ├── tasks.py # 태스크 정의 파일
│
└── plugins/
- tasks.py : @task를 사용한 함수가 정의된 파일
# tasks.py
from airflow.decorators import task
@task
def hello_world():
print("Hello, World!")
@task
def add_numbers(a: int, b: int) -> int:
return a + b
- example_dag.py : task 파일의 hello_world, add_numbers를 import 한 뒤 DAG에서 사용
# example_dag.py
from airflow.decorators import dag
from datetime import datetime
from tasks import hello_world, add_numbers # 다른 파일의 태스크 임포트
@dag(schedule_interval=None, start_date=datetime(2024, 1, 1), catchup=False)
def example_dag():
greet = hello_world()
result = add_numbers(5, 3)
greet >> result # 태스크 간 종속성 설정
dag_instance = example_dag()
유스케이스
데이터 파이프라인
- @task를 사용해 ETL 단계를 Python으로 간단히 정의 가능
@task
def fetch_data():
return {"key": "value"}
@task
def transform_data(data: dict):
return {k: v.upper() for k, v in data.items()}
@task
def save_data(data: dict):
print(f"데이터 저장: {data}")
머신러닝 워크플로우
- 데이터 준비, 모델 학습 결과 평가를 각 Task로 나눠 작성 가능
@task
def prepare_data():
return [1, 2, 3, 4, 5]
@task
def train_model(data):
model = sum(data) # 간단한 합산 모델
return model
@task
def evaluate_model(model):
print(f"모델 평가: {model}")
동적 작업 작성
- 대규모 데이터 셋이나 API 호출을 병렬로 처리할 때 유용
@task
def call_api(endpoint: str) -> dict:
print(f"API 호출: {endpoint}")
return {"response": f"응답 from {endpoint}"}
@task 사용 시 주의할 점
Xcom을 통한 데이터 관리
- @task로 정의된 함수의 반환값은 Xcom에 저장
- Task 간 데이터를 주고받기 위해 반환값과 매개변수를 잘 설계해야 함
Decorator와 Task 간소화의 한계
- @task는 Python 함수를 Task로 변환하는데 적합하지만, 복잡한 Task는 Operator를 사용하는 것이 적합
- @task와 Operator는 혼용해서 사용할 수 있기 때문에 적절히 사용
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] schedule_interval 개념과 사용법 (0) | 2024.11.10 |
---|---|
[Airflow] EC2 (Start/Stop) InstanceOperator 사용 예시 (0) | 2024.11.09 |
[Airflow] S3ToRedshiftOperator 사용 예시 (0) | 2024.11.07 |
[Airflow] DAG Trigger 시 DAG Run이 동시에 두 개가 생성되는 문제 (0) | 2024.11.05 |
[Airflow] data_interval_start를 활용한 File System Partitioning 적용 (0) | 2024.10.31 |