Spark 내부동작
Spark 파일 포맷
작업에 맞는 파일 최적화 필요
- Unstructured (비구조화) : Text
- Semi-structured (반구조화) : json, xml, csv
- Structured (구조화) : parquet, avro, orc, sequencefile
Spark의 주요 파일 포맷
- Splittable : HDFS 데이터 블록의 Partition으로 바로 올라갈 수 있는지 여부
- Human readable : 사람이 읽을 수 있는지 여부
- Nested structure support : subfield를 지원하는지 여부
- Schema evolution : 스키마가 다른 데이터끼리 사용 가능한지 여부
Parquet
- Spark의 기본 파일 포맷
- Hybrid Storage 방식
- 하나의 데이터 블록은 하나의 Row Group으로 구성
- 하나의 데이터 블록 내에서 Column-Wise 방식으로 저장
Execution Plan : Spark 내부 동작
Spark는 개발자가 만든 코드를 어떻게 변환하여 실행하는지 알아보자.
데이터 프레임 연산
- 연산 순서 : where -> select -> groupby -> count -> show
spark.read.option("header", True).csv(“test.csv”). \
where("gender <> 'F'"). \
select("name", "gender"). \
groupby("gender"). \
count(). \
show()
Transformation & Action
- Transformation
- Narrow Dependencies : 독립적인 파티션 레벨 작업 (select, filter, map 등)
- Wide Dependencies : Shuffling이 필요한 작업 (groupby, reduceby, partitionby, repartition 등)
- Action
- 아직 실행되지 않고 대기 중인 Transformation을 실제로 수행시키는 연산
- Read, Write, Show, Collect : Job을 실행시킴
- Lazy Execution : 더 많은 오퍼레이션을 볼 수 있기에 최적화가 편함
Jobs, Stages, Tasks
- Action -> Job -> 1 + Stages -> 1 + Tasks
- Action : Job을 하나 만들고 코드가 실제로 실행
- Job : Stage는 Shuffling이 발생하는 경우 새로 생김
- Stage : DAG의 형태로 구성된 Task 존재, Task는 병렬 실행 가능
- Task : 가장 작은 실행 유닛으로 Executor에 의해 실행
Execution Plan : Spark 내부 동작 실습
WordCount
- Action 연산이 show 하나만 존재하므로 Job은 1개, Stage는 2개
- 만약 show가 없다면, Job이 없으므로 의미 없는 코드가 됨
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSchemaDemo") \
.config("spark.sql.adaptive.enabled", False) \
.config("spark.sql.shuffle.partitions", 3) \
.getOrCreate()
df = spark.read.text("shakespeare.txt")
df_count = df.select(explode(split(df.value, " ")).alias("word")).groupBy("word").count()
df_count.show()
JOIN
- df_large read, df_small read, query 작업 Job은 3개, Stage는 5개
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSchemaDemo") \
.config("spark.sql.adaptive.enabled", False) \
.config("spark.sql.shuffle.partitions", 3) \
.getOrCreate()
df_large = spark.read.json("large_data/")
df_small = spark.read.json("small_data/")
join_expr = df_large.id == df_small.id
join_df = df_large.join(df_small, join_expr, "inner")
join_df.show()
Broadcast JOIN
- broadcast를 통해 데이터가 충분히 작으면 shuffle join이 아닌 broadcasting 진행
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSchemaDemo") \
.config("spark.sql.adaptive.enabled", False) \
.config("spark.sql.shuffle.partitions", 3) \
.getOrCreate()
df_large = spark.read.json("large_data/")
df_small = spark.read.json("small_data/")
join_expr = df_large.id == df_small.id
join_df = df_large.join(broadcast(df_small), join_expr, "inner")
join_df.show()
Bucketing & Partitioning
HDFS 데이터나 입력 파티션을 처리 형태에 맞춰 최적화할 수 있다면, 처리 시간을 단축하고 리소스를 덜 사용할 수 있다.
Bucketing과 File System Partitioning
- Hive 메타스토어의 사용 필요 : saveAsTable
- 데이터 저장을 이후에 처리할 반복 작업에 최적화된 방법으로 하는 것
- Bucketing
- Aggregation이나 Window 함수나 JOIN에서 많이 사용되는 컬럼이 있는지 확인
- 있다면, 데이터를 특정 컬럼을 기준으로 나눠 테이블로 저장
- 예제 코드
- File System Partitioning
- 지금까지 이야기한 Partition과는 다름
- File System에 저장되는 데이터를 특정 컬럼의 집합으로 나눠 저장하는 것
- 원래 Hive에서 많이 사용
- 데이터의 특정 컬럼(Partition Key)을 기준으로 폴더 구조를 만들어 데이터 저장 최적화
- 예제 코드
Bucketing
- DataFrame을 특정 ID를 기준으로 나눠 테이블로 저장
- 다음부터 이를 로딩하여 사용함으로써 반복 처리 시 시간 단축
- DataFrameWriter의 bucketBy 함수 사용 -> Bucket의 수와 기준 ID 지정
- 데이터의 특성을 잘 알고 있는 경우 사용 가능
File System Partitioning
- 데이터를 Partition Key 기반 폴더 ("Partition") 구조로 물리적으로 나눠 저장
- DataFrame의 Partition이 아닌 Hive의 Partition을 말함
- 예와 이점
- 용량이 큰 로그 파일을 데이터 생성 시간 기반으로 데이터 읽기를 많이 한다면?
- 데이터 자체를 연도-월-일의 폴더 구조로 저장
- 보통 위의 구조로 이미 저장되는 경우가 많음
- 이를 통해 데이터 읽기 과정을 최적화 (스캐닝 과정이 줄어들거나 없어짐)
- 데이터 관리도 쉬워짐 (Retention Policy 적용 시)
- 용량이 큰 로그 파일을 데이터 생성 시간 기반으로 데이터 읽기를 많이 한다면?
- DataFrameWriter의 partitionBy 함수 사용
- Partition Key를 잘못 선택하면 매우 많은 파일이 생성됨
- 4개의 Partition Key 존재 : 연도, 월, 일, 시간
'[프로그래머스] 데이터 엔지니어링 데브코스 3기 > TIL(Today I Learn)' 카테고리의 다른 글
[TIL - 66일 차] Kafka와 Spark Streaming 기반 스트리밍 처리 (1) (0) | 2024.06.24 |
---|---|
[TIL - 65일 차] 하둡과 Spark (5) (0) | 2024.06.21 |
[TIL - 63일 차] 하둡과 Spark (3) (0) | 2024.06.19 |
[TIL - 62일 차] 하둡과 Spark (2) (0) | 2024.06.18 |
[TIL - 61일 차] 하둡과 Spark (1) (0) | 2024.06.17 |