data_interval_start를 활용한 File System Partitioning적용
데이터 파일을 S3에 저장을 하는 로직을 구현하기 위해 파일 명을 "0000_{data_interval_start}.parquet"으로 설정하는 방법을 탐색하였다. data_interval_start와 같은 Airflow 내부 시간에 대해 알고 싶다면 아래의 링크에서 확인 가능하다.
Jinja Template으로 data_interval_start 값 가져오기
data_interval_start 값 가져오기
- data_interval_start 값을 작성해 주면, 해당 값을 가져올 수 있음
- 주의할 점 : 공식 문서에 따르면 pendulum.DateTime Format으로 가져옴, 그러나 str Type인 것을 인지
- data_interval_start 예시 : 2023-08-10 00:00:01+02:20
{{ data_interval_start }} # 2023-08-10 00:00:01+02:20
data_interval_start 특정 값만 가져오기
- 여기도 위와 같이 str Type으로 가져옴
# data_interval_start에서 연월일을 '%Y%m%d' 형태로 가져오기
{{ macros.datetime.strftime(date_interval_start, '%Y%m%d') }} # 20230810
# data_interval_start에서 연월일, 시간을 '%Y%m%d:%H%M%s' 형태로 가져오기
{{ macros.datetime.strftime(date_interval_start + macros.timedelta(hours=8), '%Y%m%d:%H%M%s') }} # 20230810:000001
data_interval_start를 활용한 File System Partitioning 적용
Airflow의 PythonOperator에서 data_interval_start를 활용해 File System Partitioning을 적용하여 S3에 적재하는 코드를 간략히 작성하였다. 전체 코드는 아니기 때문에 data_interval_start를 사용하는 방법을 숙지하는 방향으로 이해하면 좋을 것 같다.
Task : load_current_seoul_data_to_s3
- PythonOperator를 활용해 op_kwargs (매개변수)에 data_interval_start 넘기기
load_current_seoul_data_to_s3 = PythonOperator(
task_id="load_current_seoul_data_to_s3",
python_callable=load_seoul_api_data,
op_kwargs = {
"transformed_data": '{{ task_instance.xcom_pull(task_ids="transform_population_data")[0] }}',
"file_name": '{{ params["current_data_file_name"] }}',
"data_interval_start": '{{ data_interval_start }}'
},
dag=dag
)
module : set_file_name_for_file_system_partitioning
- 위의 Task의 python_callable인 load_seoul_api_data 내에서 사용하는 모듈
- data_interval_start는 " 2023-08-10 00:00:01+02:20" 형태의 문자열
- "2023-08-10 00:00:01"만 파싱하여 pendulum 형태로 변경
- year, month, day를 추출하고 s3_key 작성 및 반환
def set_file_name_for_file_system_partitioning(file_name, data_interval_start):
try:
data_interval_start = pendulum.from_format(data_interval_start[:-6], 'YYYY-MM-DD HH:mm:ss')
year = data_interval_start.year
month = data_interval_start.month
day = data_interval_start.day
formatted_file_name = f"{file_name}_{data_interval_start}.parquet"
s3_key = f"seoul-city-data/year={year}/month={month}/day={day}/{formatted_file_name}"
logging.info(f"Generated S3 key: {s3_key}")
return s3_key
except Exception as e:
logging.error(f"file : 'seoul_func_and_var.py', function : 'set_file_name_for_file_system_partitioning'")
logging.error(f"Error in setting partitioning: {e}")
raise
S3 File System Partitioning 확인
- 다음과 같이 s3_key에 맞게 적절히 Partitioning이 된 것을 확인
Reference
https://pendulum.eustace.io/docs/#formatter
https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] S3ToRedshiftOperator 사용 예시 (0) | 2024.11.07 |
---|---|
[Airflow] DAG Trigger 시 DAG Run이 동시에 두 개가 생성되는 문제 (0) | 2024.11.05 |
[Airflow] Airflow Task log 작성 (feat. logging) (3) | 2024.10.28 |
[Airflow] DAG를 선언하는 세 가지 방법 (0) | 2024.10.27 |
[Airflow] DAG와 Task의 동시성을 관리하기 위한 변수 (0) | 2024.10.25 |