ELT 작성과 슬랙 연동
Docker 기반 Airflow 실행
이전에 사용했던 airflow의 docker-compose.yaml의 x-airflow-common과 airflow-init을 수정하여 사용한다.
environment
- AIRFLOW_VAR_DATA_DIR
- AIRFLOW_VAR_ : 해당 문자 뒤에 나오는 문자를 이름으로 하는 환경 변수 생성
- 여기서의 환경 변수 이름은 DATA_DIR
- DAG에서 필요할 수 있는 임시 파일을 저장하기 위한 별도 디렉터리 설정
- _PIP_ADDITIONAL_REQUIREMENTS : 필요한 파이썬 모듈 설치
- ':-' : 만약 해당 파일이 존재하면 사용, 존재하지 않으면 오른쪽 값을 사용
environment:
...
AIRFLOW_VAR_DATA_DIR: /opt/airflow/data # 추가
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- pymysql yfinance pandas numpy oauth2client gspread} # 수정
volumes
지정한 별도 디렉터리를 Host Volume 형태로 추가해 주었다.
volumes:
..
- ${AIRFLOW_PROJ_DIR:-.}/data:/opt/airflow/data # 추가
airflow-init (Container)
Docker Container 상에 새롭게 추가한 디렉터리이므로 airflow-init Container의 command에 data 디렉터리를 생성하고, 디렉터리의 소유자를 변경해 주는 작업을 추가하였다.
airflow-init:
<<: *airflow-common
entrypoint: /bin/bash
# yamllint disable rule:line-length
command:
- -c
- |
function ver() {
...
mkdir -p /sources/logs /sources/dags /sources/plugins /sources/data # 수정
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins,data} # 수정
exec /entrypoint airflow version
적용한 환경 변수 확인
- yaml 파일 이름이 docker-compose.yaml일 경우 생략 가능
- Container를 실행한 뒤 exec 명령어로 scheduler에서 DATA_DIR 환경 변수를 읽어 옴
- airflow-init에 적용했으므로 어떤 Container에서 실행해도 같은 결과
docker-compose -f docker-compose.yaml up -d
docker exec -it fc9315590c78 airflow variables get DATA_DIR
>> /opt/airflow/data
주의할 점
yaml로 환경 변수를 지정하면, 웹 UI에서는 확인할 방법이 없다.
ELT 구현
SQL 구문을 직접 작성하게 되면, 사용할 때마다 작성해야 하며 비개발자의 경우 접근성의 어려움이 있다. 따라서 환경 설정을 통해 이러한 문제를 방지할 수 있다. NPS를 요약 정리하는 nps_summary 테이블을 생성하는 과정을 통해 알아보자.
NPS 계산 SQL 코드
SELECT LEFT(created_at, 10) AS date,
ROUND(
SUM(
CASE
WHEN score >= 9 THEN 1
WHEN score <= 6 THEN -1
END
)::float*100/COUNT(1), 2
) nps
FROM ss721229.nps
GROUP BY 1
ORDER BY 1;
config/nps_summary.py
딕셔너리 형태로 CTAS에 필요한 부분을 별도의 파일로 구성한 것이다.
{
'table': 'nps_summary',
'schema': 'ss721229',
'main_sql': """SELECT …;""", # 위의 SQL 코드
'input_check': [ {
'sql': 'SELECT COUNT(1) FROM ss721229.nps',
'count': 150000
} ],
'output_check': [ {
'sql': 'SELECT COUNT(1) FROM {schema}.temp_{table}',
'count': 12
} ],
}
새로운 Operator와 helper 함수 구현
- redshift_summary.py
- RedshiftSummaryOperator : NPS Summary를 위한 새로운 Operator
- build_summary_table : 생성한 Operator를 활용하여 테이블 생성을 위한 helper
- 아래 코드는 Operator와 helper를 적용하여 만든 DAG
...
from plugins import redshift_summary
tables_load = [
'nps_summary'
]
dag_root_path = os.path.dirname(os.path.abspath(__file__))
redshift_summary.build_summary_table(dag_root_path, dag, tables_load, "redshift_dev_db")
Slack 연동하기
목적
- DAG 실행 중 에러가 발생하면 지정된 슬랙 workspace의 채널로 보내기
- 해당 슬랙 workspace의 App 설정 필요
- 연동을 위한 함수를 만들어야 함 (plugins/slack.py)
- Task에 적용되는 default_args의 on_failure_callback에 지정
from plugins import slack
…
default_args= {
'on_failure_callback': slack.on_failure_callback,
}
DAG 문제를 슬랙에 표시
- https://api.slack.com/messaging/webhooks에서 Incoming Webhooks App 생성
- Create an App -> From scratch -> Name app & choose workspace -> Incoming Webhooks
- Incoming Webhook 활성화 -> Add New Webhook to Workspace
- sample 코드
curl -X POST -H 'Content-type: application/json' --data '{"text":"Hello, World!"}' "https://hooks.slack.com/~"
데이터 파이프라인 실패 / 경고를 슬랙으로 보내기 (1) - 사용 방법
- webhooks 링크의 뒷부분을 airflow의 variable로 저장
- slack에 에러 메시지를 보내는 별도 모듈 작성 (slack.py) 후 DAG의 에러 콜백으로 지정
데이터 파이프라인 실패 / 경고를 슬랙으로 보내기 (2) - 예제
- name_gender_v4를 활용한 예제
- Task : extract >> transform >> load
- DAG 인스턴스는 아래 코드 참고
- 올바르게 작동하는 DAG에 Error를 발생시켜 Slack을 확인
from plugins import slack
dag = DAG(
dag_id = 'name_gender_v4',
start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
'on_failure_callback': slack.on_failure_callback,
}
)
- load Task의 함수에 에러 코드 추가
- I를 하나 더 입력
- Host 파일 시스템 (Local)에서 DAG의 코드 변경 및 Docker에 적용 -> Host Volume의 장점
sql = f"IINSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
'[프로그래머스] 데이터 엔지니어링 데브코스 3기 > TIL(Today I Learn)' 카테고리의 다른 글
[TIL - 53일 차] Airflow 고급기능과 DBT, 데이터 디스커버리 (3) (0) | 2024.06.05 |
---|---|
[TIL - 52일 차] Airflow 고급기능과 DBT, 데이터 디스커버리 (2) (0) | 2024.06.04 |
[TIL - 50일 차] Docker & K8S 실습 (4) (0) | 2024.05.31 |
[TIL - 49일 차] Docker & K8S 실습 (5) (0) | 2024.05.30 |
[TIL - 48일 차] Docker & K8S 실습 (3) (0) | 2024.05.29 |