데이터 파이프라인 문제점 해결하기
41일 차 강의를 들으면서 간단한 파이썬 데이터 파이프라인 실습을 진행하였다. 그런데 몇 가지 문제점이 존재하는데, Extract / Transform / Load 함수 중에 Load 함수를 수정하면 해결된다.
문제점
- 불필요한 헤더 : ['name', 'gender']까지 데이터 웨어하우스에 저장
- 멱등성 X : 여러 번 실행하면 그만큼 레코드가 증가
- 데이터 정합성 X : INSERT문을 실행하는 중에 에러가 발생할 경우
코드
def load(records):
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
cur = get_Redshift_connection()
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = "INSERT INTO ss721229.name_gender VALUES ('{n}', '{g}')".format(n=name, g=gender)
cur.execute(sql)
문제 해결
- 불필요한 헤더 : 가장 처음에 ['name', 'gender']가 등장하므로 두 번째부터 저장
- 멱등성 X : DELETE FROM "Table"을 통해 Full Refresh
- 데이터 정합성 X : Transaction으로 에러 발생 시 Rollback
코드
def load(records):
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
cur = get_Redshift_connection()
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
try:
# 트랜잭션 시작
cur.execute("BEGIN TRANSACTION")
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
cur.execute("DELETE FROM ss721229.name_gender")
for r in records[1:]:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = "INSERT INTO ss721229.name_gender VALUES ('{n}', '{g}')".format(n=name, g=gender)
cur.execute(sql)
# 트랜잭션 성공적으로 종료
cur.execute("COMMIT")
except Exception as e:
# 오류 발생 시 롤백
cur.execute("ROLLBACK")
print(f"An error occurred: {e}")
finally:
cur.close()
'[프로그래머스] 데이터 엔지니어링 데브코스 3기 > 숙제' 카테고리의 다른 글
[숙제 - 51일 차] mau_summary, channel_summary를 config로 옮기기 (0) | 2024.06.03 |
---|---|
[숙제 - 43일 차] airflow.cfg 관련 문제 해결 (0) | 2024.05.22 |
[숙제 - 33일 차] S3 -> Redshift, COPY 명령어로 데이터 적재하기 (2) (0) | 2024.05.09 |
[숙제 - 32일 차] S3 -> Redshift, COPY 명령어로 데이터 적재하기 (0) | 2024.05.09 |
[숙제 - 31일 차] 데브코스 ETL/ELT (0) | 2024.05.06 |