Spark 기타 기능과 메모리 관리
Spark 기타 기능
Broadcast Variable
- 룩업 테이블 등을 브로드캐스팅하여 셔플링을 막는 방식으로 사용
- 브로드캐스트 조인에서 사용되는 것과 동일한 테크닉
- 대부분 룩업 테이블 (혹은 디멘션 테이블)을 Executor로 전송하는 데 사용
- spark.sparkContext.broadcast 사용
- Closure
- Serialization이 Task 단위로 일어남
- UDF 안에서 파이썬 데이터 구조를 사용하는 경우
- Broadcast
- Serialization이 Worker Node 단위로 일어남
- UDF 안에서 브로드캐스트 된 데이터 구조를 사용하는 경우
- Broadcast 데이터 셋의 특징
- Worker Node로 공유되는 변경 불가 데이터 (Immutable)
- Worker Node 별로 한 번 공유되고 캐싱됨
- 제약점은 Task Memory 안에 들어갈 수 있어야 함
Accumulators
- 특정 이벤트 수를 기록하는 데 사용 -> 일종의 전역 변수
- 예) 비정상적인 값을 갖는 레코드의 수를 세는 데 사용
- 특징
- 변경 가능한 전역변수로 드라이버에 위치
- 스칼라로 만들면, 이름을 줄 수 있지만 그 이외에는 불가
- 레코드 별로 세거나 합을 구하는 데 사용 가능
- 두 가지 방법으로 사용
- Transformation : 값이 부정확할 수 있음 (Task 재실행과 Speculative execution)
- DataFrame/RDD Foreach : 추천되는 방식으로 이 경우 정확함
- 예제 코드
Speculative Execution
- 느린 Task를 다른 Worker Node에 있는 Executor에서 중복 실행
- Worker Node의 하드웨어 이슈 등으로 느려지는 경우 빠른 실행 보장
- Data Skew로 인해 오래 걸린다면, 도움이 안 되고 리소스만 낭비
- 실제 프로덕션에서 쓰는 곳은 그렇게 많지 않음, Job의 특성에 맞게 적용하는 것이 필요
- 제어 방식
- spark.speculation으로 컨트롤 가능하며, default는 False (비활성화)
- 다양한 환경변수로 세밀하게 제어 가능
Dynamic Resource Allocation
- Spark Application 간의 리소스 할당
- Resource Manager가 결정 : YARN은 FIFO, FAIR, CAPACITY 방식 지원
- 한 번 리소스를 할당받으면 해당 리소스를 끝까지 들고 가는 것이 기본
- 하나의 Spark Applicaytion 안에서 Job 간의 리소스 할당
- FIFO 형태로 처음 Job이 필요한 대로 리소스를 받아서 쓰는 것이 기본
- Spark Application의 리소스 요구/릴리스 방식
- Static Allocation : 받은 리소스를 끝까지 유지하므로 리소스 사용률에 악영향을 줄 가능성이 높음
- Dynamic Allocation : 상황에 따라 executor를 요구/릴리스하며, 하나의 리소스 매니저를 공유한다면 활성화하는 것이 좋음
- 환경 변수
- spark.dynamicAllocation.enabled = true
- spark.dynamicAllocation.shuffleTracking.enabled = true
- spark.dynamicAllocation.executorIdleTimeout = 60s (릴리스 타이밍 결정)
- spark.dynamicAllocation.schedulerBacklogTimeout = 1s (요청 타이밍 결정)
- spark.dynamicAllocation.minExecutors
- spark.dynamicAllocation.maxExecutors
- spark.dynamicAllocation.initialExecutors
- spark.dynamicAllocation.executorAllocationRatio
Scheduler
- 하나의 Spark Application 내의 Job에게 리소스를 나눠주는 정책
- FIFO (default) : 리소스를 처음 요청한 Job에게 리소스 우선순위가 있음
- FAIR
- Round-robin 방식으로 모든 Job에게 고르게 리소스를 분배
- 풀(Pool)이라는 형태로 리소스를 나눠 우선순위를 고려한 형태로 사용 가능
- 풀 안에서 리소스 분배도 FAIR 혹은 FIFO로 지정 가능
- 병렬성 증대를 위해 Thread 활용이 필요
- 환경 변수
- spark.scheduler.mode : FIFO 혹은 FAIR
- spark.scheduler.allocation.file : 'FAIR'의 경우 필요, 풀을 정의해 놓는 형태로 사용
Spark 메모리 관리
Driver와 Executor
- Driver의 역할
- Spark Application = 1 Driver + (1+ Executor)
- main 함수를 실행하고 SparkSession/SparkContext 생성
- 코드를 Task로 변환하여 DAG 생성
- Execution/Logical/Physical plan으로 변환
- Resource Manager의 도움을 받아 Task 실행 및 관리
- 위의 정보를 Web UI로 노출시킴 (4040 포트)
- Driver 메모리 구성
- spark.driver.memory = 4GB
- spark.driver.cores = 4
- spark.driver.memoryOverhead = 0.1
- max(spark.driver.memory의 10%, 384MB)
- 384MB보다 크므로 memory의 10%를 Overhead로 사용
- Executor 메모리 구성
- spark.executor.memory = 8GB
- spark.executor.cores = 4
- spark.executor.memoryOverhead = 0.1
- max(spark.executor.memory의 10%, 384MB)
- 384MB보다 크므로 memory의 10%를 Overhead로 사용
- Off Heap Memory, PySpark Memory, Py4J Memory
- Executor JVM 메모리 구성 (Heap - 8GB 기준)
- Reserved Memory (300MB) : 300MB로 Spark Engine 전용, 건드릴 수 없는 부분
- Spark Memory (4620MB)
- 데이터 프레임 관련 작업과 캐싱 (spark.memory.fraction = 0.6)
- Storage Memory Pool (spark.memory.storageFraction = 0.5) : DataFrame Caching
- Executor Memory Pool : DataFrame, Operation
- User Memory (3080MB)
- User-defined data structure, Spark internal metadata, UDF, UDAF 등
- User Memory를 사용할 일이 별로 없다면, fraction의 값을 증가시키는 것이 좋음
Executor Memory Pool Management
- Static Memory Pool Management : Spark 1.6 전에는 슬롯끼리 공평하게 나눠 가짐
- Unified Memory Manager
- 동작중인 Task 대상으로 Fair Allocation이 기본 동작
- 메모리가 부족해지면 Storage/Executor Memory Pool의 메모리 사용
- 더 이상 사용할 메모리가 없다면, 데이터를 메모리에서 디스크로 옮김 -> 성능 저하
Spark Executor Memory 환경 변수
Off Heap Memory
- Spark는 On Heap 메모리에서 가장 잘 동작
- JVM Heap은 Garbage Collection의 대상
- JVM Heap의 크기가 클수록 Garbe Collection 비용 증가
- 이때 같이 사용할 수 있는 것이 JVM 밖의 메모리 : Overhead, Off Heap
- Spark 3.x의 Off Heap Memory
- Spark 3.x는 Off Heap memory 작업에 최적화
- Spark 3.x는 Off Heap 메모리를 DataFrame 용으로 사용
- Off Heap 메모리의 크기 : spark.executor.memoryOverhead + spark.offHeap.size
Driver와 Executor의 메모리 이슈 (Out Of Memory)
- Driver OOM
- 큰 데이터셋의 Collect 실행
- 큰 데이터셋을 Broadcast JOIN
- Python, R 등으로 작성된 코드
- 너무 많은 Task
- Executor OOM
- 너무 큰 executor.cores, 일반적으로 1 ~ 5로 지정
- Data Skew (Big Partition)
JVM과 Python 간의 통신
- 실제 SparkContext는 JVM 쪽에서 생성
- PySpark Memory
- Spark은 JVM Application이지만, Pyspark는 Python 프로세스
- Executor 내 프로세스 (JVM Process, Python Worker)의 (de)serialization을 하는 것이 Py4J의 역할
- spark.executor.pyspark.memory (Python Process)
- PySpark는 기본으로 overhead memory 사용
- 이 환경변수가 사용되면, PySpark가 사용할 수 있는 메모리는 이 값으로 고정
- spark.python.worker.memory (Py4J)
- default : 512MB
- JVM과 Python Process 간의 통신을 담당하는 Py4J가 사용할 수 있는 메모리의 양
- 이 크기를 넘어가면 디스크로 Splill 발생
- Spark와 Python 간의 통신
- Driver에서 Python Process에서 실행할 코드와 기타 데이터를 Serialize 해서 Executor로 전송
- JVM executor는 파이썬 프로세스 실행 (PySpark Script)
- Executor는 이것과 파티션을 serialize 하고 위에서 받은 코드와 함께 Python Process로 전송
- Python Process에서 계산이 끝나면 결과가 다시 Executor로 serialize 되어 전송됨
Caching과 Persist
- Caching
- 자주 사용되는 데이터프레임을 메모리에 유지하여 처리속도 증가
- 메모리 소비를 늘리므로 불필요한 캐싱을 할 필요는 없음
- cache(), persist() : 데이터프레임을 메모리/디스크/오프힙에 보존
- persist() : 인자를 통해 세부 제어 가능
- useDisk = True
- userMemory = True
- useOffHeap = False : Off Heap 설정이 필요
- deserialized = False : 메모리를 줄일지 아니면 CPU 계산을 줄일지?
- replication = 1 : 몇 개의 복사본을 서로 다른 executor에 저장할지 결정
- persist는 자주 사용되는 조합은 하나의 상수로 지정 가능
- DISK_ONLY, MEMORY_ONLY, MEMORY_AND_DISK 등
- cache()
- disk = False
- memory = True
- offHeap = False
- deserialized = True
- replication = 1
- Spark SQL을 사용한 Caching
spark.sql("cache table table_name")
spark.sql("cache lazy table table_name")
spark.sql("uncache table table_name")
- Caching 취소
DataFrame.unpersist # (LRU - Least Recently Used)
spark.sql("uncache table table_name")
spark.catalog.isCached("table_name")
spark.catalog.clearCache()
Caching Best Practices
- Caching된 DataFrame이 재사용되는 것을 분명히 하기
- cachedDF = df.cache()
- cachedDF.select(...)
- 컬럼이 많다면 필요한 컬럼만 캐싱하고, 불필요할 때 uncache
- cachedDF = df.select(c1, c2, c3).cache()
- 매번 새로 DataFrame을 계산하는 것이 Caching보다 더 빠른 경우가 있음
- 큰 데이터셋이 Parquet과 같은 포맷으로 존재하는 경우
- 캐싱 결과가 너무 커서 메모리에만 있을 수 없는 경우
- Tip
- 소수의 DataFrame만 Caching
- 큰 DataFrame은 Caching 하지 말 것
- Caching을 너무 믿지 말 것 -> 중간에 쓰이는지 확인하는 작업이 필요
Dynamic Partition Pruning
- Partition Pruning은 Logical Plan Optimization 단계에서 발생
- Static Partition Pruning 문제
- Partitioning은 보통 큰 테이블에 적용되어 있음 (Fact Table)
- Fact 테이블과 Dimension 테이블 조인 시 필터링이 Dimension 테이블에 적용되면 낭비
- Dynamic Partition Pruning
- 필터링이 잘못된 것이라는 걸 인지하고 필터링을 Fact 테이블로 변경
- 비 Partition 테이블 (small)에 적용된 필터링을 Partition 테이블 (big)에 적용해 보는 것
- Dimension 테이블이 많이 작다면, 브로드캐스트 조인까지 진행할 수 있음
- 기본적으로 활성화 : spark.sql.optimizer.dynamicPartitionPruning.enabled: ture
'[프로그래머스] 데이터 엔지니어링 데브코스 3기 > TIL(Today I Learn)' 카테고리의 다른 글
[TIL - 78일 차] Spark, SparkML 실습 (3) (0) | 2024.07.10 |
---|---|
[TIL - 77일 차] Spark, SparkML 실습 (2) (0) | 2024.07.09 |
[TIL - 73일 차] 음식 배달에 걸리는 시간 예측하기 (2) (0) | 2024.07.03 |
[TIL - 72일 차] 음식 배달에 걸리는 시간 예측하기 (1) (0) | 2024.07.02 |
[TIL - 71일 차] 머신러닝 기초 (0) | 2024.07.01 |