airflow

Infra/[인프라 구축기] Terraform 활용 AWS 인프라 구축

인프라 구축기 (6) - 로컬에서 Private EC2 Airflow Web Server 접속

인프라 구축기인프라 구축기 (5)까지 작성된 아키텍처를 구성하기 위한 작업이 이루어졌다면, (6)부터는 구성된 서비스를 운영하기 위한 작업을 다룬다. 그중에서 가장 중요한 것이 로컬에서 Private Subnet에 위치하는 EC2의 Airflow Web Server에 접근할 수 있도록 하는 것이다. 이전에 구상 단계에서 ALB, 포트포워딩, Nginx를 활용하는 방법이 있었는데, 여기서는 Nginx를 활용해 구성하였다. 인프라 구축기 (5) - Private Subnet EC2에서 다른 Subnet의 인스턴스 접근 확인인프라 구축기인프라 구축기 (4)에서 구성된 인프라에서 Bastion Host 터널링을 통한 RDS, Redshift Serverless, Airflow (EC2) 접근을 확인할 수 있었다..

Data Engineering/Airflow

[Airflow] DAG params를 PythonOperator 매개변수로 사용하기

DAG params를 PythonOperator 매개변수로 사용하기Airflow에서 DAG와 개별 Task를 유연하게 구성하기 위해 params와 op_kwargs를 활용하는 것은 매우 유용하다. 이를 활용하면, DAG 전역에서 사용하는 변수를 정의하여 개별 Task에서 재사용할 수 있다. 이 글에서는 DAG에서 params를 정의하고, 이를 PythonOperator의 매개변수로 사용하는 방법에 대해 소개하려고 한다.DAG : paramsDAG params란?DAG 실행 시 특정한 값을 전역적으로 정의하고, DAG 내의 Task에서 Jinja 템플릿을 사용해 값을 참조할 수 있다. 이를 통해 DAG 내 여러 Task에서 동일한 값을 재사용할 수 있다.DAG 예시 코드current_data_file_na..

Data Engineering/Airflow

[Airflow] LocalExecutor Parallelism 개념 및 설정 방법

Local Executor - ParallelismParallelism 이란? Airflow에서 Parallelism은 실행되는 태스크의 병렬 처리 개수를 제어하는 개념이다. 이는 LocalExecutor 클래스의 self.parallelism 값에 따라 달라지며, 그 값에 따라 프로세스 생성 방식을 결정한다.변수 정보Type : StringDefault : 32Env : AIRFLOW__CORE__PARALLELISM self.parallelism == 0 (Unlimited Parallelism)시스템의 리소스 한계 내에서 제한 없이 프로세스 생성리소스가 부족할 경우 성능이 저하될 수 있음self.parallelism > 0 (Limited Parallelism)설정된 parallelism 값까지만 ..

Data Engineering/Airflow

[Airflow] Executor 실행 준비 과정 (SchedulerJobRunner._execute())

SchedulerJobRunner의 _executor 메서드 작동 과정실제로 Executor가 실행되는 부분은 airflow/jobs/scheduler_job_runner.py에 작성된 SchedulerJobRunner의 _execute 메서드이다. 해당 메서드는 DAG가 생성되거나 트리고 될 때 실행되며, 실행할 DAG가 있다면 airflow.cfg에 설정된 Executor로 실행을 준비하도록 하는 역할이다. _execute의 코드를 순차적으로 살펴보며, Executor가 어떻게 동작하는 것인지 확인해 볼 것이다.참고 : _execute 함수의 코드는 생략되는 부분 없이 모두 적음_execute 메서드 전체 코드 (공식 github)설정된 Executor 확인 및 직렬화, 비동기 모드 여부 결정시작 l..

기억에 남는 블로그 닉네임
'airflow' 태그의 글 목록 (4 Page)