Spark Shuffling 최적화
Repartition and Coalesce
Repartition을 하는 이유
- 전체적으로 파티션의 수를 늘려 병렬성 증대
- 굉장히 큰 Partition이나 Skew Partition의 크기를 조절
- 파티션을 분석 패턴에 맞게 재분배 (Write once, read many)
- 어떤 DataFrame을 특정 컬럼을 기준으로 그루핑 하거나 필터링을 자주 하는 경우
- 미리 그 컬럼을 기준으로 저장해 두었다면, 그것이 Bucketing
Repartition 방식
- 두 가지 방식 존재
- repartition (Hash)
- repartitionByRange (value 기준)
- 주의할 점
- Shuffling 발생 : Repartition이 별 이유 없이 사용되면 오히려 시간과 비용 증가
- Column이 사용되면 균등한 파티션 크기를 보장할 수 없음
- 파티션의 수를 줄이는 용도로는 사용 불가
repartition(numPartitions, *cols)
- Hash 기반 Partitioning
- repartition(5) : rount-robin 형태로 5개의 partition 생성
- repartition(5, "city") : "city"를 기준으로 5개의 partition 생성
- repartition(5, "city", "zipcode") : "city", "zipcode"를 기준으로 5개의 partition 생성, skew 발생 가능
- repartition("city")
- repartition("city", "zipcode")
repartitionByRange(numPartitions, *cols)
- 지정된 컬럼 값의 범위를 기준으로 파티션을 나누는 방식
- 데이터 샘플링 기반으로 Partition을 나누기에 결과가 매번 다를 수 있음
- 사용법은 앞서 repartition과 동일
Coalesce
- Partition의 수를 줄이는 용도 (늘리지 않음)
- Shuffling을 발생시키지 않고 Local Partition을 병합 -> Skew 발생 가능
- Column이 사용되며 균등한 Partition 크기를 보장할 수 없음
DataFrame 관련 힌트
- Spark SQL Optimizer에게 Execution plan을 만듬에 있어 특정한 방식을 사용하도록 제안
- Partitioning 관련 힌트
- COALESCE
- REPARTITION
- REPARTITION_BY_RANGE
- REBALANCE : DataFrame을 Table로 저장할 때 유용, 파일 크기를 비슷하게 만들어 저장 (AQE 필요)
- 여러 개가 사용될 경우 먼저 사용된 것이 우선순위가 높음
df1.join(df2, "id", "inner").hint("COALESCE", 3)
- JOIN 관련 힌트
- BROADCAST, BROADCASTJOIN, MAPJOIN : Broadcast Join 사용 제안
- MERGE, SHUFFLE_MARGE, MERGEJOIN : Shuffle Merge Join 사용 제안
- SHUFFLE_HASH : Shuffle Hash Join 사용 제안
- SHUFFLE_REPLICATE_NL : Shuffle and replicate (Cross Join) join 사용 제안
- 여러 개가 사용될 경우 아래로 갈수록 우선순위가 낮아짐
SELECT /*+ MERGE(df2) */ *
FROM df1 JOIN df2 ON df1.order_month = df2.year_month
DataFrame 힌트 사용법
- Spark SQL : /*+ hint [...] */
SELECT /*+ REPARTITION(3) */ * FROM TABLE
SELECT /*+ BROADCAST(table1) */ * FROM table 1 JOIN table2 ON table1.key = table2.key
- DataFrame API : .hint 메서드 사용
join_df = df1.join(df2.hint("broadcast"), "id", "inner").hint("COALESCE", 3)
AQE (Adaptive Query Execution)
Spark Optimization 역사
- Spark 1.x : Catalyst Optimizer와 Tungsten Project
- Catalyst Optimizer : 규칙기반 최적화 수행 (Predicate pushdown, projection pushdown)
- Tungsten Project : 기본적으로 JVM 문제없이 코드 최적화를 하려는 것 (Off Heap 메모리 관리)
- Spark 2.x : CBO (Cost-Based Optimizer)
- DataFrame 통계 정보를 이용해 효율적인 Execution plan 생성
- 전체 크기, 레코드 수, 컬럼 별 특성
AQE 이전
- 아래의 GROUP BY 쿼리는 2개의 Stage 생성
- spark.sql.shuffle.partitions 값에 의해 Shuffling 후 Partition 수 결정
- spark.sql.shuffle.partitions
- 이 변수 하나로 다양한 상황의 shuffling을 해결하기 쉽지 않음
- 적은 수의 Partition은 병렬성을 낮추고 OOM과 Disk Spill 가능성을 높임
- 많은 수의 Partition은 Task Scheduler, Task 생성과 관련된 오버헤드가 생김
SELECT sku, SUM(prive) sales
FROM order
GROUP BY sku;
- 위의 이슈 해결을 위해 Parsing Time 최적화와 Runtime 최적화의 병행
AQE
- Runtime 정보를 통해 쿼리 실행의 중간에서 최적화 진행
- Query -> Job -> Stage -> Task
- Stage 단계에서 최적화 방식을 변경
- Stage가 가장 좋은 최적화 방식 변경 포인트
- Shuffling, Broadcasting이 Job을 Stage로 나눔
- 중간 결과가 materialize 되어 DataFrame의 통계 정보를 정확히 알 수 있는 시점
- AQE가 필요한 경우
- Dynamically coalescing (Post) shuffle partitions (Spark 3)
- Dynamically switch join strategies (Spark 3.2)
- Dynamically optimizing skew joins (Spark 3)
Dynamic coalescing (Post) shuffle partitions
- 필요한 이유
- 적당한 Partition의 크기와 수는 성능에 영향을 미침
- 많은 수의 작은 Partition : Scheduler 오버헤드, Task 준비 오버헤드, 비효율적인 I/O
- 적은 수의 큰 Partition : OOM, Disk Spill
- spark.sql.shuffle.partitions 하나의 변수로는 불충분
- 동작 방식
SELECT k, SUM(v) FROM t GROUP BY 1 ORDER BY 2 DESC;
- 환경 변수 (3.3.1)
Dynamic coalescing (Post) shuffle partitions 동작 방식
- 내부적으로 많은 수의 Partition을 일부러 생성
- spark.sql.adaptive.coalescePartitions.initialPartitionNum(200)
- 매 Stage가 종료될 때, 필요하다면 자동으로 Coalesce 수행
- spark.sql.adaptive.coalescePartitions.enabled
- 설정에 따라 Partition의 크기는 최소 크기 혹은 목표 크기를 맞추려 동작
- spark.sql.adaptive.advisoryPartitionSizelnBytes
- spark.sql.adaptive.coalescePartitions.minPartitionSize
- 둘 중 무엇을 쓸지는 spark.sql.adaptive.coalescePartitions.parallelismFirst에 의해 결정
- Coalescing X (spark.sql.shuffle.partitions = 5) : Skew 발생
- Coalescing O
Dynamically switching join strategies
- 필요한 이유
- Static Query Plan이 여러 이유로 BHJ (Broadcast Hash Join) 기회를 놓친 경우
- DataFrame의 통계 정보 부족 (필터링 등), UDF가 사용된 경우
- AQE의 해법
- Runtime 통계 정보를 바탕으로 Join 전략을 변경
- Stage가 끝나고 Join 되기 전에 다시 쿼리 플래닝을 수행
- Broadcast Join, Shuffle Hash join 옵션이 존재
- 동작 방식
- 환경 변수 (3.3.1)
Dynamically optimizing skew joins
- Dynamically switching join strategies
- Partition의 크기가 작다면, Broadcast Join을 통해 Skew로 인한 문제를 방지할 수 있음
- 그러나 Broadcast Join을 하기에 Partition의 크기가 클 수도 있음
- 필요한 이유
- Sker Partition으로 인한 성능 문제를 해결하기 위함
- 한 두 개의 오래 걸리는 Task로 인한 전체 Job/Stage 종료 지연
- Disk Spill이 발생한다면 더 느려짐
- Task의 수는 증가하지만, 실행 시간이 대폭 감소
- AQE의 해법
- Skew Partition 존재 여부 파악
- Skew Partition을 작게 나눔
- 상대 Join Partition을 중복하여 생성하고 Join 수행
- 동작 방식
- 환경 변수 (3.3.1)
Dynamically optimizing skew joins 동작 방식
- Table A와 B의 Join
- Join Key로 Shuffling을 진행하여 각각 4개의 Partition이 생성
- A0 Partition이 다른 Partition보다 크기가 큰 것을 Skew Reader가 확인
- skewedPartitionFactor, skewedPartitionThresholdInBytes 조건 확인
- A0 Partition을 작은 Partition으로 Split 하고 B0를 복제하여 Join 진행
'[프로그래머스] 데이터 엔지니어링 데브코스 3기 > TIL(Today I Learn)' 카테고리의 다른 글
[TIL - 80일 차] Spark, SparkML 실습 (5) (0) | 2024.07.12 |
---|---|
[TIL - 78일 차] Spark, SparkML 실습 (3) (0) | 2024.07.10 |
[TIL - 76일 차] Spark, SparkML 실습 (1) (0) | 2024.07.08 |
[TIL - 73일 차] 음식 배달에 걸리는 시간 예측하기 (2) (0) | 2024.07.03 |
[TIL - 72일 차] 음식 배달에 걸리는 시간 예측하기 (1) (0) | 2024.07.02 |