[Airflow] Airflow Task log 작성 (feat. logging)

2024. 10. 28. 14:33·Data Engineering/Airflow

Airflow Task log 작성 (feat. logging)

Airflow는 로그를 작성하기 위해 Python의 logging 모듈을 사용한다. Airflow에서 logging 모듈을 어떻게 사용하고 있으며, 이를 활용해 Airflow Task의 log를 작성하는 방법을 알아볼 것이다.

  • Python logging 공식 문서

logging 모듈과 Airflow

logging 내부에 존재하는 Class는 총 4개 (Logger, Handler, Filter, Formatter)가 있다. 이에 대한 설명과 함께 Airflow에서는 해당 Class를 활용할 수 있는지 알아본다.

import logging

logging의 내부 Class

  • logging.Logger (로거)
    • 애플리케이션 코드가 직접 상호 작용하는 인터페이스, 로그 메시지 생성
    • info, warning, error, critical, debug 총 5개의 level을 사용해서 작성 가능
    • Airflow Logger : root, flask_appbuilder, airflow.processor, airflow.task
  • logging.Handler (핸들러)
    • 로그 레코드를 목적지로 전송
    • Airflow Handler :  = RedirectStdHandler, FileProcessorHandler, FileTaskHandler
  • logging.Filter (필터)
    • 어떤 로그 메시지가 전송되는지 결정
    • Airflow Filter : SecretsMasker (민감한 정보 출력 방지)
  • logging.Formatter (포맷터)
    • 로그 메시지의 레이아웃 (형식) 결정
    • Airflow Formatter : airflow_colored, airflow

예시 코드

  • decorator를 사용해 task와 dag 선언
  • airflow.task Logger를 가져와 로그 실행
from pendulum import datetime, duration
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator

# import the logging module
import logging

# get the airflow.task logger
task_logger = logging.getLogger("airflow.task")

@task
def extract():
    # with default airflow logging settings, DEBUG logs are ignored
    task_logger.debug("This log is at the level of DEBUG")

    # each of these lines produces a log statement
    print("This log is created with a print statement")
    task_logger.info("This log is informational")
    task_logger.warning("This log is a warning")
    task_logger.error("This log shows an error!")
    task_logger.critical("This log shows a critical error!")

    data = {"a": 19, "b": 23, "c": 42}

    # Using the Task flow API to push to XCom by returning a value
    return data

# logs outside of tasks will not be processed
task_logger.warning("This log will not show up!")

# command to create a file and write the data from the extract task into it
# these commands use Jinja templating within {{}}
commands = """
    touch /usr/local/airflow/{{ds}}.txt
    echo {{ti.xcom_pull(task_ids='extract')}} > /usr/local/airflow/{{ds}}.txt
    """

@dag(
    start_date=datetime(2022, 6, 5),
    schedule="@daily",
    dagrun_timeout=duration(minutes=10),
    catchup=False,
)
def more_logs_dag():
    write_to_file = BashOperator(task_id="write_to_file", bash_command=commands)

    # logs outside of tasks will not be processed
    task_logger.warning("This log will not show up!")

    extract() >> write_to_file
more_logs_dag()

출력된 로그

  • task (extract)의 로그는 작성되지만, dag (more_logs_dag)의 로그는 작성되지 않음
[2022-06-06, 07:25:09 UTC] {logging_mixin.py:115} INFO - This log is created with a print statement
[2022-06-06, 07:25:09 UTC] {more_logs_dag.py:15} INFO - This log is informational
[2022-06-06, 07:25:09 UTC] {more_logs_dag.py:16} WARNING - This log is a warning
[2022-06-06, 07:25:09 UTC] {more_logs_dag.py:17} ERROR - This log shows an error!
[2022-06-06, 07:25:09 UTC] {more_logs_dag.py:18} CRITICAL - This log shows a critical error!

Airflow logging 관련 추가 지식

다양한 Format

  • AIRFLOW__LOGGING__DAG_PROCESSOR_LOG_FORMAT : DAG Processor의 로그
[%%(asctime)s] [SOURCE:DAG_PROCESSOR] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
  • AIRFLOW__LOGGING__LOG_FILENAME_TEMPLATE : Task Run의 로그 파일 경로
dag_id={ ti.dag_id }/run_id={ ti.run_id }/task_id={ ti.task_id }/{%% if ti.map_index >= 0 %%}map_index={ ti.map_index }/{%% endif %%}attempt={ try_number }.log
  • AIRFLOW__LOGGING__LOG_FORMAT : 로그 포맷
[%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s

[2022-06-06, 07:25:09 UTC] {more_logs_dag.py:15} INFO - This log is informational
  • 다른 포맷도 알고 싶다면, Logging Config 공식 문서 참고

사용자 지정 로그 구성

  • 추가 기능 없이 Airflow 로그를 사용하기에 충분하지만, Airflow 관리에 도움이 됨
    • 기존 로그 형식을 변경 : ex) logging 호출이 이루어진 전체 경로 출력
    • Handler (로그 저장소) 추가 : 심각한 오류를 별도의 파일에 기록
    • 원격 저장소에 로그 저장

debug 로그 출력

  • 일반적인 상황에서는 debug의 로그는 출력되지 않음
  • debug 로그를 확인하는 방법 (둘 중 한 가지만 설정)
    • airflow.cfg [logging]의 logging_level을 DEBUG로 설정
    • AIRFLOW__LOGGING__LOGGING_LEVEL을 DEBUG로 설정

원격 저장소에 로그 저장

  • Default 로그 저장소 : AIRFLOW__LOGGING__BASE_LOG_FOLDER, {AIRFLOW_HOME}/logs
    • 기본적으로 로컬 저장소의 logs 디렉터리에 저장
    • 로그 네이밍 : {dag_id}/{task_id}/{execution_date}/{try_number}.log
  • 원격 저장소에 로그 저장
    • remote_base_log_folder를 정의하면, FileTaskHandler에 오버라이딩되어 로그 작성 가능
    • 하나의 Logger에 여러 개의 Handler를 추가하려면, default_logging_config 변경
  • Amazon S3에 로그 작성 방법 : S3TaskHandler가 생성되며, FileTaskHandler에 오버라이딩
    1. apache-airflow-providers-amazon 패키지 설치 (requirements.txt에 추가)
    2. Airflow Web UI에서 원격 저장소로 사용할 S3의 Connections 추가
    3. 환경 변수 설정 (둘 중 한 가지만 설정)
      • airflow.cfg 파일을 아래 1번과 같이 구성
      • Dockerfile에서 각 환경 변수를 아래 2번과 같이 구성
# 1. airflow.cfg
[core]
# Airflow can store logs remotely in AWS S3. Users must supply a remote
# location URL (starting with either 's3://...') and an Airflow connection
# id that provides access to the storage location.
remote_logging = True
remote_base_log_folder = s3://my-bucket/path/to/logs
remote_log_conn_id = MyS3Conn
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False


# 2. Dockerfile
# allow remote logging and provide a connection ID (see step 2)
ENV AIRFLOW__LOGGING__REMOTE_LOGGING = True
ENV AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID = MyS3Conn

# specify the location of your remote logs using your bucket name
ENV AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER = s3://my-bucket/path/to/logs

# optional: serverside encryption for S3 logs
ENV AIRFLOW__LOGGING__ENCRYPT_S3_LOGS = False
  • 다른 저장소에 저장하는 방법도 알고 싶다면, Writing Logs 공식 문서 참고

Airflow Task 로그 그룹 (Airflow 2.9)

  • Airflow 웹 UI에 표시되는 Task 로그를 그룹화
  • 로그 그룹 사용 형태 : ::group:: 구문을 사용하여 그룹을 명시
t_log.info("::group::<log group name>")
t_log.info("<log in log group>")
t_log.info("::endgroup::")
  • 로그 그룹 예시 및 결과
from airflow.decorators import dag, task
import logging

t_log = logging.getLogger("airflow.task")

@task 
def log_groups():
    t_log.info("I'm a log that is always shown.")
    t_log.info("::group::My log group!")
    t_log.info("hi! I'm a hidden log! :)")
    t_log.info("::endgroup::")
    t_log.info("I'm not hidden either.")
log_groups()

Airflow Log Group

Reference

https://www.astronomer.io/docs/learn/logging

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#config-logging

https://airflow.apache.org/docs/apache-airflow/1.10.11/howto/write-logs.html

https://airflow.apache.org/docs/apache-airflow/1.10.11/howto/write-logs.html

'Data Engineering > Airflow' 카테고리의 다른 글

[Airflow] DAG Trigger 시 DAG Run이 동시에 두 개가 생성되는 문제  (0) 2024.11.05
[Airflow] data_interval_start를 활용한 File System Partitioning 적용  (0) 2024.10.31
[Airflow] DAG를 선언하는 세 가지 방법  (0) 2024.10.27
[Airflow] DAG와 Task의 동시성을 관리하기 위한 변수  (0) 2024.10.25
[Airflow] Redshift COPY 시 잘못된 timestamp 값이 적재되는 문제  (1) 2024.10.23
'Data Engineering/Airflow' 카테고리의 다른 글
  • [Airflow] DAG Trigger 시 DAG Run이 동시에 두 개가 생성되는 문제
  • [Airflow] data_interval_start를 활용한 File System Partitioning 적용
  • [Airflow] DAG를 선언하는 세 가지 방법
  • [Airflow] DAG와 Task의 동시성을 관리하기 위한 변수
기억에 남는 블로그 닉네임
기억에 남는 블로그 닉네임
  • 기억에 남는 블로그 닉네임
    얕게, 깊게
    기억에 남는 블로그 닉네임
  • 전체
    오늘
    어제
  • 블로그 메뉴

    • 홈
    • 방명록
    • 글쓰기
    • 분류 전체보기
      • Data Engineering
        • Airflow
        • 빅데이터
        • 자동화
        • 기타
      • Infra
        • AWS
        • Terraform
        • [인프라 구축기] Terraform 활용 AWS ..
      • CS
        • 자료구조
        • 알고리즘
        • 네트워크
        • 데이터베이스
        • 이것이 취업을 위한 코딩 테스트다 with 파이썬
      • Python
      • Web
      • Git
      • 기타
        • 취업 & 진로
        • 회고록
        • 기타
      • 프로젝트 단위 공부
        • [부스트코스] DataLit : 데이터 다루기
        • [개인 프로젝트] 공모전 크롤링
        • [개인 프로젝트] FC Online 공식 경기 분..
        • 프로젝트 개선 방안
      • [프로그래머스] 데이터 엔지니어링 데브코스 3기
        • TIL(Today I Learn)
        • 숙제
        • 기타
      • 알고리즘 연습
        • 프로그래머스
        • 백준
  • 링크

    • 깃허브
    • 링크드인
  • 인기 글

  • 최근 글

  • 최근 댓글

  • hELLO· Designed By정상우.v4.10.3
기억에 남는 블로그 닉네임
[Airflow] Airflow Task log 작성 (feat. logging)
상단으로

티스토리툴바