구글 시트 연동과 API & Airflow 모니터링
구글 시트의 데이터를 Redshift 테이블로 복사하거나 Redshift의 SELECT 결과를 구글 시트로 복사하는 작업은 현업 부서와 일을 할 때 자주 사용하며, 중요한 작업 중 하나이다.
구글 시트 연동하기 (1) : 구글 시트 -> Redshift
구글 시트의 데이터를 Redshift의 테이블로 복사하는 작업을 진행한다.
구현 절차
- 시트 API를 활성화 -> 구글 서비스 어카운트 생성 -> 내용을 JSON 파일로 다운로드
- 어카운트에서 생성해 준 이메일을 조작하고 싶은 시트에 공유
- Airflow DAG에서 해당 JSON 파일로 인증하고 시트를 조작
어카운트 생성 과정
- 구글 스프레드시트 API 활성화 및 프로젝트 생성 (링크)
- 구글 서비스 어카운트 생성 (링크) : 사용자 인증 정보 만들기 > 서비스 계정
- Private Key 생성 : 생성된 어카운트 이메일 > 키 > 키 추가 > 새 키 만들기 (JSON)
- JSON 파일의 내용을 google_sheet_access_token이란 이름의 Variable로 등록
- JSON 파일에는 이메일 주소 하나가 존재, 이를 구글 스프레드시트 파일에 공유
실습
- 코드 (3개의 Task) : 구글 시트의 데이터 가져오기 > S3에 적재 > Redshift에 적재
- Python 3.7과 gspread 라이브러리의 호환이 되지 않아 gspread==3.6.0으로 다운 그레이드
- S3ToRedshiftOperator의 method가 REPLACE이므로 미리 테이블을 생성해둬야 함
CREATE TABLE IF NOT EXISTS ss721229.spreadsheet_copy_testing (
col1 INT,
col2 INT,
col3 INT,
col4 INT
);
구글 시트 연동하기 (2) : Redshift SELECT -> 구글 시트
Redshift SELECT문의 결과를 특정 구글 시트의 탭에 복사하는 작업을 진행한다.
실습
- 코드 (하나의 Task) : 아래의 SELECT 문의 결과를 구글 시트에 복사
- sheetfilename을 "spreadsheet-copy-testing"으로 지정하였을 때, gsheet.py의 client.open()에서 오류 발생
- open -> open_by_url로 수정하고, sheetfilename에 해당 시트의 url을 입력하여 해결
SELECT * FROM analytics.nps_summary
API & Airflow 모니터링
Airflow의 건강 여부 체크 (health check)을 어떻게 하는지와 Airflow API를 활용해 외부에서 Airflow를 조작하는 방법에 대해 학습한다.
Airflow API 활성화
- airflow.cfg의 api 섹션에서 auth_backend의 값 변경
- docker-compose.yaml에는 이미 설정돼 있음 (environments)
- AIRFLOW__API__AUTH_BACKENDS : 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
- __ 뒤에 처음 나오는 것이 섹션, 두 번째 나오는 것이 키
- API : 섹션, AUTH_BACKENDS : 키
[api]
auth_backend = airflow.api.auth.backend.basic_auth
- Airflow Web UI에서 새로운 사용자 추가 (API 사용자) : Security > List Users > +
Health API 호출
- Health API 호출
curl -X GET --user "monitor:MonitorUser1" http://localhost:8080/health
- 정상 응답 : latest_scheduler_heartbeat는 현재 시간과 크게 차이가 나면 안 됨
{
"metadatabase": {
"status": "healthy"
},
"scheduler": {
"status": "healthy",
"latest_scheduler_heartbeat": "2022-03-12T06:02:38.067178+00:00"
}
}
API 사용 예 (1) : 특정 DAG를 API로 Trigger 하기
- 명령어
curl -X POST --user "airflow:airflow" -H 'Content-Type: application/json' -d '{"execution_date":"2023-05-24T00:00:00Z"}' "http://localhost:8080/api/v1/dags/HelloWorld/dagRuns"
- 결과 : DAG Trigger의 결과가 출력
"conf": {},
"dag_id": "HelloWorld",
"dag_run_id": "manual__2023-05-24T00:00:00+00:00",
"data_interval_end": "2023-05-23T02:00:00+00:00",
"data_interval_start": "2023-05-22T02:00:00+00:00",
"end_date": null,
"execution_date": "2023-05-24T00:00:00+00:00",
"external_trigger": true,
"last_scheduling_decision": null,
"logical_date": "2023-05-24T00:00:00+00:00",
"note": null,
"run_type": "manual",
"start_date": null,
"state": "queued"
}
API 사용 예 (2) : 모든 DAG 리스트 하기
- 명령어
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/dags
- 결과 : 각 DAG의 정보가 출력
{
"dag_id": "Backup_Airflow_Data_to_S3",
"default_view": "grid",
"description": null,
"file_token": "...",
"fileloc": "/opt/airflow/dags/Backup_Airflow_Data_to_S3.py",
"has_import_errors": false,
"has_task_concurrency_limits": false,
"is_active": true,
"is_paused": true,
"is_subdag": false,
"last_expired": null,
"last_parsed_time": "2024-06-04T05:45:23.955130+00:00",
"last_pickled": null,
"max_active_runs": 16,
"max_active_tasks": 16,
"next_dagrun": "2024-06-02T08:00:00+00:00",
"next_dagrun_create_after": "2024-06-03T08:00:00+00:00",
"next_dagrun_data_interval_end": "2024-06-03T08:00:00+00:00",
"next_dagrun_data_interval_start": "2024-06-02T08:00:00+00:00",
"owners": [
"airflow"
],
"pickle_id": null,
"root_dag_id": null,
"schedule_interval": {
"__type": "CronExpression",
"value": "0 8 * * *"
},
"scheduler_lock": null,
"tags": [
{
"name": "backup"
},
{
"name": "medium"
}
],
"timetable_description": "At 08:00"
},
API 사용 예 (3) - 모든 Variable 리스트 하기
- 명령어
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/variables
- 결과 : 각 varible의 정보가 출력
"total_entries": 3,
"variables": [
{
"description": "",
"key": "slack_url",
"value": "~"
},
{
"description": "",
"key": "csv_url",
"value": "~"
},
{
"description": "",
"key": "google_sheet_access_token",
"value": "{~}"
}
]
}
API 사용 예 (4) : 모든 Config 리스트 하기
- 명령어
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/config
- 결과 : 기본적으로 Disable 돼있으므로 특정 섹션의 키를 변경해야 함 (숙제)
{
"detail": "Your Airflow administrator chose not to expose the configuration, most likely for security reasons.",
"status": 403,
"title": "Forbidden",
"type": "https://airflow.apache.org/docs/apache-airflow/2.5.1/stable-rest-api-ref.html#section/Errors/PermissionDenied"
}
'[프로그래머스] 데이터 엔지니어링 데브코스 3기 > TIL(Today I Learn)' 카테고리의 다른 글
[TIL - 54일 차] Airflow 고급기능과 DBT, 데이터 디스커버리 (4) (2) | 2024.06.06 |
---|---|
[TIL - 53일 차] Airflow 고급기능과 DBT, 데이터 디스커버리 (3) (0) | 2024.06.05 |
[TIL - 51일 차] Airflow 고급기능과 DBT, 데이터 디스커버리 (1) (0) | 2024.06.03 |
[TIL - 50일 차] Docker & K8S 실습 (4) (0) | 2024.05.31 |
[TIL - 49일 차] Docker & K8S 실습 (5) (0) | 2024.05.30 |