Airflow Task log 작성 (feat. logging)
Airflow는 로그를 작성하기 위해 Python의 logging 모듈을 사용한다. Airflow에서 logging 모듈을 어떻게 사용하고 있으며, 이를 활용해 Airflow Task의 log를 작성하는 방법을 알아볼 것이다.
logging 모듈과 Airflow
logging 내부에 존재하는 Class는 총 4개 (Logger, Handler, Filter, Formatter)가 있다. 이에 대한 설명과 함께 Airflow에서는 해당 Class를 활용할 수 있는지 알아본다.
import logging
logging의 내부 Class
- logging.Logger (로거)
- 애플리케이션 코드가 직접 상호 작용하는 인터페이스, 로그 메시지 생성
- info, warning, error, critical, debug 총 5개의 level을 사용해서 작성 가능
- Airflow Logger : root, flask_appbuilder, airflow.processor, airflow.task
- logging.Handler (핸들러)
- 로그 레코드를 목적지로 전송
- Airflow Handler : = RedirectStdHandler, FileProcessorHandler, FileTaskHandler
- logging.Filter (필터)
- 어떤 로그 메시지가 전송되는지 결정
- Airflow Filter : SecretsMasker (민감한 정보 출력 방지)
- logging.Formatter (포맷터)
- 로그 메시지의 레이아웃 (형식) 결정
- Airflow Formatter : airflow_colored, airflow
예시 코드
- decorator를 사용해 task와 dag 선언
- airflow.task Logger를 가져와 로그 실행
from pendulum import datetime, duration
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
# import the logging module
import logging
# get the airflow.task logger
task_logger = logging.getLogger("airflow.task")
@task
def extract():
# with default airflow logging settings, DEBUG logs are ignored
task_logger.debug("This log is at the level of DEBUG")
# each of these lines produces a log statement
print("This log is created with a print statement")
task_logger.info("This log is informational")
task_logger.warning("This log is a warning")
task_logger.error("This log shows an error!")
task_logger.critical("This log shows a critical error!")
data = {"a": 19, "b": 23, "c": 42}
# Using the Task flow API to push to XCom by returning a value
return data
# logs outside of tasks will not be processed
task_logger.warning("This log will not show up!")
# command to create a file and write the data from the extract task into it
# these commands use Jinja templating within {{}}
commands = """
touch /usr/local/airflow/{{ds}}.txt
echo {{ti.xcom_pull(task_ids='extract')}} > /usr/local/airflow/{{ds}}.txt
"""
@dag(
start_date=datetime(2022, 6, 5),
schedule="@daily",
dagrun_timeout=duration(minutes=10),
catchup=False,
)
def more_logs_dag():
write_to_file = BashOperator(task_id="write_to_file", bash_command=commands)
# logs outside of tasks will not be processed
task_logger.warning("This log will not show up!")
extract() >> write_to_file
more_logs_dag()
출력된 로그
- task (extract)의 로그는 작성되지만, dag (more_logs_dag)의 로그는 작성되지 않음
[2022-06-06, 07:25:09 UTC] {logging_mixin.py:115} INFO - This log is created with a print statement
[2022-06-06, 07:25:09 UTC] {more_logs_dag.py:15} INFO - This log is informational
[2022-06-06, 07:25:09 UTC] {more_logs_dag.py:16} WARNING - This log is a warning
[2022-06-06, 07:25:09 UTC] {more_logs_dag.py:17} ERROR - This log shows an error!
[2022-06-06, 07:25:09 UTC] {more_logs_dag.py:18} CRITICAL - This log shows a critical error!
Airflow logging 관련 추가 지식
다양한 Format
- AIRFLOW__LOGGING__DAG_PROCESSOR_LOG_FORMAT : DAG Processor의 로그
[%%(asctime)s] [SOURCE:DAG_PROCESSOR] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
- AIRFLOW__LOGGING__LOG_FILENAME_TEMPLATE : Task Run의 로그 파일 경로
dag_id={ ti.dag_id }/run_id={ ti.run_id }/task_id={ ti.task_id }/{%% if ti.map_index >= 0 %%}map_index={ ti.map_index }/{%% endif %%}attempt={ try_number }.log
- AIRFLOW__LOGGING__LOG_FORMAT : 로그 포맷
[%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
[2022-06-06, 07:25:09 UTC] {more_logs_dag.py:15} INFO - This log is informational
- 다른 포맷도 알고 싶다면, Logging Config 공식 문서 참고
사용자 지정 로그 구성
- 추가 기능 없이 Airflow 로그를 사용하기에 충분하지만, Airflow 관리에 도움이 됨
- 기존 로그 형식을 변경 : ex) logging 호출이 이루어진 전체 경로 출력
- Handler (로그 저장소) 추가 : 심각한 오류를 별도의 파일에 기록
- 원격 저장소에 로그 저장
debug 로그 출력
- 일반적인 상황에서는 debug의 로그는 출력되지 않음
- debug 로그를 확인하는 방법 (둘 중 한 가지만 설정)
- airflow.cfg [logging]의 logging_level을 DEBUG로 설정
- AIRFLOW__LOGGING__LOGGING_LEVEL을 DEBUG로 설정
원격 저장소에 로그 저장
- Default 로그 저장소 : AIRFLOW__LOGGING__BASE_LOG_FOLDER, {AIRFLOW_HOME}/logs
- 기본적으로 로컬 저장소의 logs 디렉터리에 저장
- 로그 네이밍 : {dag_id}/{task_id}/{execution_date}/{try_number}.log
- 원격 저장소에 로그 저장
- remote_base_log_folder를 정의하면, FileTaskHandler에 오버라이딩되어 로그 작성 가능
- 하나의 Logger에 여러 개의 Handler를 추가하려면, default_logging_config 변경
- Amazon S3에 로그 작성 방법 : S3TaskHandler가 생성되며, FileTaskHandler에 오버라이딩
- apache-airflow-providers-amazon 패키지 설치 (requirements.txt에 추가)
- Airflow Web UI에서 원격 저장소로 사용할 S3의 Connections 추가
- 환경 변수 설정 (둘 중 한 가지만 설정)
- airflow.cfg 파일을 아래 1번과 같이 구성
- Dockerfile에서 각 환경 변수를 아래 2번과 같이 구성
# 1. airflow.cfg
[core]
# Airflow can store logs remotely in AWS S3. Users must supply a remote
# location URL (starting with either 's3://...') and an Airflow connection
# id that provides access to the storage location.
remote_logging = True
remote_base_log_folder = s3://my-bucket/path/to/logs
remote_log_conn_id = MyS3Conn
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
# 2. Dockerfile
# allow remote logging and provide a connection ID (see step 2)
ENV AIRFLOW__LOGGING__REMOTE_LOGGING = True
ENV AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID = MyS3Conn
# specify the location of your remote logs using your bucket name
ENV AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER = s3://my-bucket/path/to/logs
# optional: serverside encryption for S3 logs
ENV AIRFLOW__LOGGING__ENCRYPT_S3_LOGS = False
- 다른 저장소에 저장하는 방법도 알고 싶다면, Writing Logs 공식 문서 참고
Airflow Task 로그 그룹 (Airflow 2.9)
- Airflow 웹 UI에 표시되는 Task 로그를 그룹화
- 로그 그룹 사용 형태 : ::group:: 구문을 사용하여 그룹을 명시
t_log.info("::group::<log group name>")
t_log.info("<log in log group>")
t_log.info("::endgroup::")
- 로그 그룹 예시 및 결과
from airflow.decorators import dag, task
import logging
t_log = logging.getLogger("airflow.task")
@task
def log_groups():
t_log.info("I'm a log that is always shown.")
t_log.info("::group::My log group!")
t_log.info("hi! I'm a hidden log! :)")
t_log.info("::endgroup::")
t_log.info("I'm not hidden either.")
log_groups()
Reference
https://www.astronomer.io/docs/learn/logging
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#config-logging
https://airflow.apache.org/docs/apache-airflow/1.10.11/howto/write-logs.html
https://airflow.apache.org/docs/apache-airflow/1.10.11/howto/write-logs.html
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] DAG Trigger 시 DAG Run이 동시에 두 개가 생성되는 문제 (0) | 2024.11.05 |
---|---|
[Airflow] data_interval_start를 활용한 File System Partitioning 적용 (0) | 2024.10.31 |
[Airflow] DAG를 선언하는 세 가지 방법 (0) | 2024.10.27 |
[Airflow] DAG와 Task의 동시성을 관리하기 위한 변수 (0) | 2024.10.25 |
[Airflow] Redshift COPY 시 잘못된 timestamp 값이 적재되는 문제 (1) | 2024.10.23 |