Spark 프로그래밍 : SQL
Spark SQL 소개
Spark SQL과 Spark DataFrame의 차이점과 Spark SQL의 사용법을 알아보자.
SQL의 중요성
- 데이터 분야의 필수적인 기본 기술
- 구조화된 데이터를 다룬다면, SQL은 데이터 규모와 관계없이 쓰임
- 모든 대용량 DW는 SQL 기반 : Redshift, Snowflake, BigQuery, Hive/Presto
- Spark도 Spark SQL 지원
Spark SQL
- 구조화된 데이터 처리를 위한 Spark 모듈
- 데이터 프레임 작업을 SQL로 처리 가능
- 데이터 프레임이 테이블 이름 지정 후 sql 함수 사용 가능
- HQL(Hive Query Language)과 호환 가능
Spark SQL vs DataFrame
- SQL로 가능한 작업이라면 DataFrame을 사용할 이유가 없음
- Spark SQL의 장점
- Familiarity, Readability : SQL의 가독성이 더 좋고, 더 많은 사람이 사용 가능
- Optimization : Spark SQL 엔진이 최적화하기 더 좋음
- Interoperability, Data Management : SQL이 포팅이 쉽고 접근 권한 체크도 쉬움
Spark SQL 사용법
- 데이터 프레임 기반으로 테이블 뷰 생성
- createOrReplaceTempView : Session이 살아있는 동안 존재
- createOrReplaceGlobalTempView : Spark Driver가 살아있는 동안 존재
- 보통 Session을 하나 만들기 때문에 두 개가 큰 차이가 없음
- Spark Session의 sql 함수로 SQL 결과를 데이터 프레임으로 반환
namegender_df.createOrReplaceTempView("namegender")
namegender_group_df = spark.sql("""
SELECT gender, count(1) FROM namegender GROUP BY 1
""")
print(namegender_group_df.collect())
- SparkSession 외부 데이터베이스 연결
df_user_session_channel = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", "jdbc:redshift://HOST:PORT/DB?user=ID&password=PASSWORD") \
.option("dbtable", "raw_data.user_session_channel") \
.load()
Aggregation, JOIN, UDF
Aggregation
- DataFrame이 아닌 SQL로 작성하는 것을 추천
- Group By : SUM, MIN, MAX, AVG, COUNT 등
- Window : ROW_NUMBER, FIRST_VALUE, LAST_VALUE
- Rank
Join
- SQL JOIN은 두 개 혹은 그 이상의 테이블을 공통 필드를 가지고 병합
- 스타 스키마로 구성된 테이블로 분산된 정보를 통합하는 데 사용
- Shuffle JOIN
- 일반 JOIN 방식
- Bucket JOIN : JOIN Key를 바탕으로 새로 파티션을 새로 만들고 조인하는 방식
- Broadcast JOIN
- 큰 데이터와 작은 데이터 간의 조인
- 작은 데이터 프레임을 큰 데이터 프레임이 있는 모든 파티션으로 Broadcasting
- spark.sql.autoBroadcastJoinThreshold 파라미터로 충분히 작은지 여부 결정
UDF (User Defined Function)
- DataFrame이나 SQL에서 적용할 수 있는 사용자 정의 함수
- 데이터 프레임의 경우 .withColumn 함수와 같이 사용하는 것이 일반적
- Spark SQL에서도 사용 가능
- Aggregation 용 UDAF(User Defined Aggregation Function)도 존재
- Group By에서 사용되는 SUM, AVG와 같은 함수를 만드는 것
- PySpark에서 지원하지 않고, Scalar / JAVA를 사용해야 함
UDF 사용해 보기 (1)
- 'Name' 컬럼을 대문자로 변환한 'Curated Name' 컬럼을 생성
- DataFrame 활용
import pyspark.sql.functions as F
from pyspark.sql.types import *
upperUDF = F.udf(lambda z:z.upper())
df.withColumn("Curated Name", upperUDF("Name"))
- SQL 활용
def upper(s):
return s.upper()
# 먼저 테스트
upperUDF = spark.udf.register("upper", upper)
spark.sql("SELECT upper('aBcD')").show()
# ABCD
# DataFrame 기반 SQL에 적용
df.createOrReplaceTempView("test")
spark.sql("""SELECT name, upper(name) "Curated Name" FROM test""").show()
UDF 사용해 보기 (2)
- 'a' 컬럼과 'b' 컬럼을 더한 'c' 컬럼을 생성
- DataFrame 활용
data = [
{"a": 1, "b": 2},
{"a": 5, "b": 5}
]
df = spark.createDataFrame(data)
df.withColumn("c", F.udf(lambda x, y: x + y)("a", "b"))
- SQL 활용
def plus(x, y):
return x + y
plusUDF = spark.udf.register("plus", plus)
spark.sql("SELECT plus(1, 2)").show()
# 3
df.createOrReplaceTempView("test")
spark.sql("SELECT a, b, plus(a, b) c FROM test").show()
UDF : Pandas UDF Scalar 함수 사용해 보기
- Pandas UDF 함수는 컬럼 단위로 연산하기에 일반 UDF보다 성능이 더 좋음
- Spark SQL 사용 : spark.udf.register에 등록한 이름(upper_udf)으로 사용
- DataFrame 사용 : spark.udf.register의 반환값이 저장된 변수(upperUDF)로 사용
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(StringType())
def upper_udf2(s: pd.Series) -> pd.Series:
return s.str.upper()
upperUDF = spark.udf.register("upper_udf", upper_udf2)
df.select("Name", upperUDF("Name")).show()
spark.sql("""SELECT name, upper_udf(name) `Curated Name` FROM test""").show()
Spark SQL 실습
실습 테이블
- 사용자 ID
- 보통 웹서비스에서는 등록된 사용자마다 유일한 ID 부여
- 세션 ID
- 사용자가 외부 링크(광고 등)를 통해 방문하거나 직접 방문할 경우 세션 생성
- 하나의 사용자 ID는 여러 개의 세션 ID를 가질 수 있음
- 세션의 경우 세션을 만들어낸 소스를 채널이란 이름으로 기록
- 세션이 생긴 시간도 기록
- 사용자 ID, 세션 ID를 기반으로 마케팅 / 트래픽과 같은 데이터 분석과 지표 설정이 가능
Ranking : 총매출이 가장 많은 사용자 10명 찾기
- user_session_channel, session_transaction 테이블 사용
- 작업 과정
- 3개의 테이블을 각 데이터 프레임으로 로딩
- 데이터 프레임 별로 테이블 이름 지정
- Spark SQL 처리
- 조인 키, 조인 방식 결정 (Join Key, Inner / Left / Right / Full)
- 예제 코드
top_rev_user_df2 = spark.sql("""
SELECT
userid,
SUM(amount) total_amount,
RANK() OVER (ORDER BY SUM(amount) DESC) rank
FROM session_transaction st
JOIN user_session_channel usc ON st.sessionid = usc.sessionid
GROUP BY userid
ORDER BY rank
LIMIT 10""")
Grouping : 월별 채널 별 매출과 방문자 정보 계산하기
- user_session_channel, session_timestamp, session_transaction 테이블 사용
- 작업 과정 (이전 실습과 같음)
- 3개의 테이블을 각 데이터 프레임으로 로딩
- 데이터 프레임 별로 테이블 이름 지정
- Spark SQL 처리
- 조인 키, 조인 방식 결정 (Join Key, Inner / Left / Right / Full)
- 예제 코드
Windowing : 사용자 별로 처음 채널과 마지막 채널 알아내기
- user_session_channel, session_timestamp, session_transaction 테이블 사용
- ROW_NUMBER, FIRST_VALUE / LAST_VALUE, ROWS BETWEEN AND 활용
- Window 함수 사용 시 레코드의 수는 그대로이며, 컬럼의 수만 증가
ROW_NUMBER OVER (partition by userid order by ts) seq
SUM(value) OVER (
order by value
rows between 2 preceding and 2 following
)
Hive 메타스토어 사용하기
Spark 데이터베이스와 테이블
- 카탈로그 : 테이블과 뷰에 관한 메타 데이터 관리
- 기본으로 메모리 기반 카탈로그 -> 세션이 끝나면 사라짐
- Hive와 호환되는 카탈로그 제공 -> Persistent
- 테이블 관리 방식 : 데이터베이스라 부르는 폴더와 같은 구조로 관리
- 메모리 기반 테이블 (View) : 임시테이블로 앞서 사용한 형태
- 스토리지 기반 테이블
- 기본적으로 HDFS와 parquet 포맷 사용
- Hive와 호환되는 메타스토어 사용
- 두 종류의 테이블 존재 : Managed Table, Unmanaged(External) Table
- Hive 메타스토어 사용법 : enableHiveSupport()
spark = SparkSession \
.builder \
.appName("Python Spark Hive") \
.enableHiveSupport() \
.getOrCreate()
Spark SQL : Managed Table 사용법
- 두 가지 테이블 생성 방법
- dataframe.saveAsTable("테이블 이름")
- SQL 문법 사용 : CREATE TABLE, CTAS
- spark.sql.warehouse.dir가 가리키는 위치에 데이터가 저장
- JDBC / ODBC 등으로 Spark와 연결해서 접근 가능 (Tableau, Power BI)
Spark SQL : Unmanaged (External) Table 사용법
- 이미 HDFS에 존재하는 데이터에 스키마를 정의해서 사용
- 메타 데이터만 카탈로그에 기록
- 데이터는 이미 존재
- External Table은 삭제되어도 데이터는 그대로
CREATE TABLE table_name (
column1 type1,
column2 type2,
column3 type3,
…
)
USING PARQUET
LOCATION 'hdfs_path';
유닛 테스트
유닛 테스트
- 코드 상의 특정 기능을 테스트하기 위해 작성된 코드
- 보통 정해진 입력을 주고 예상된 출력이 나오는지 테스트
- CI/CD를 사용하려면 전체 코드의 테스트 커버리지가 굉장히 중요해짐
- 각 언어 별로 정해진 테스트 프레임워크를 사용하는 것이 일반적
- JUnit For Java
- NUnit for .NET
- unittest for Python
- 예제 코드
cls 환경에서의 유닛 테스트
- 일반적으로 테스트 파일과 모듈 파일은 분리되어 있음
- test_df.py에는 테스트할 모듈 및 unittest 모듈 import
- 아래의 명령으로 유닛 테스트 진행
python -m unittest test_df.py
'[프로그래머스] 데이터 엔지니어링 데브코스 3기 > TIL(Today I Learn)' 카테고리의 다른 글
[TIL - 65일 차] 하둡과 Spark (5) (0) | 2024.06.21 |
---|---|
[TIL - 64일 차] 하둡과 Spark (4) (0) | 2024.06.20 |
[TIL - 62일 차] 하둡과 Spark (2) (0) | 2024.06.18 |
[TIL - 61일 차] 하둡과 Spark (1) (0) | 2024.06.17 |
[TIL - 55일 차] Airflow 고급기능과 DBT, 데이터 디스커버리 (5-2) (0) | 2024.06.07 |