인프라 구축기인프라 구축기 (5)까지 작성된 아키텍처를 구성하기 위한 작업이 이루어졌다면, (6)부터는 구성된 서비스를 운영하기 위한 작업을 다룬다. 그중에서 가장 중요한 것이 로컬에서 Private Subnet에 위치하는 EC2의 Airflow Web Server에 접근할 수 있도록 하는 것이다. 이전에 구상 단계에서 ALB, 포트포워딩, Nginx를 활용하는 방법이 있었는데, 여기서는 Nginx를 활용해 구성하였다. 인프라 구축기 (5) - Private Subnet EC2에서 다른 Subnet의 인스턴스 접근 확인인프라 구축기인프라 구축기 (4)에서 구성된 인프라에서 Bastion Host 터널링을 통한 RDS, Redshift Serverless, Airflow (EC2) 접근을 확인할 수 있었다..
DAG params를 PythonOperator 매개변수로 사용하기Airflow에서 DAG와 개별 Task를 유연하게 구성하기 위해 params와 op_kwargs를 활용하는 것은 매우 유용하다. 이를 활용하면, DAG 전역에서 사용하는 변수를 정의하여 개별 Task에서 재사용할 수 있다. 이 글에서는 DAG에서 params를 정의하고, 이를 PythonOperator의 매개변수로 사용하는 방법에 대해 소개하려고 한다.DAG : paramsDAG params란?DAG 실행 시 특정한 값을 전역적으로 정의하고, DAG 내의 Task에서 Jinja 템플릿을 사용해 값을 참조할 수 있다. 이를 통해 DAG 내 여러 Task에서 동일한 값을 재사용할 수 있다.DAG 예시 코드current_data_file_na..
Local Executor - ParallelismParallelism 이란? Airflow에서 Parallelism은 실행되는 태스크의 병렬 처리 개수를 제어하는 개념이다. 이는 LocalExecutor 클래스의 self.parallelism 값에 따라 달라지며, 그 값에 따라 프로세스 생성 방식을 결정한다.변수 정보Type : StringDefault : 32Env : AIRFLOW__CORE__PARALLELISM self.parallelism == 0 (Unlimited Parallelism)시스템의 리소스 한계 내에서 제한 없이 프로세스 생성리소스가 부족할 경우 성능이 저하될 수 있음self.parallelism > 0 (Limited Parallelism)설정된 parallelism 값까지만 ..
SchedulerJobRunner의 _executor 메서드 작동 과정실제로 Executor가 실행되는 부분은 airflow/jobs/scheduler_job_runner.py에 작성된 SchedulerJobRunner의 _execute 메서드이다. 해당 메서드는 DAG가 생성되거나 트리고 될 때 실행되며, 실행할 DAG가 있다면 airflow.cfg에 설정된 Executor로 실행을 준비하도록 하는 역할이다. _execute의 코드를 순차적으로 살펴보며, Executor가 어떻게 동작하는 것인지 확인해 볼 것이다.참고 : _execute 함수의 코드는 생략되는 부분 없이 모두 적음_execute 메서드 전체 코드 (공식 github)설정된 Executor 확인 및 직렬화, 비동기 모드 여부 결정시작 l..