Spark 프로그래밍 : DataFrame
Spark 데이터 처리
Spark 데이터 시스템 아키텍처
데이터 병렬처리
- 데이터 분산
- 하둡 맵의 데이터 처리 단위는 디스크에 있는 데이터 블록 (128MB)
- Spark에서는 이를 파티션 (Partition)이라 부름, 파티션의 기본 크기도 128MB
- 나눠진 데이터를 동시 처리
- MapReduce에서 N개의 데이터 블록으로 구성된 파일 처리 시 N개의 Map 태스크 실행
- Spark에서는 파티션 단위로 메모리에 로드되어 Executor 배정
- 데이터 분산 -> 파티셔닝 -> 병렬 처리
- Executor는 할당된 CPU 수만큼의 태스크 처리 가능
- 적절한 파티션 개수 = Execution 개수 * Execution 당 CPU 수
Spark 데이터 처리 흐름
- 데이터 프레임은 작은 파티션들로 구성되며, 수정 불가 (Immutable)
- 입력 데이터 프레임을 원하는 결과 도출까지 다른 데이터 프레임으로 계속 반환
- sort, group by, filter, map, join 등의 오퍼레이션 작업을 거침
셔플링
- 파티션 간의 데이터 이동이 필요한 경우 발생
- 셔플링이 발생하는 경우
- 명시적 파티션을 새롭게 하는 경우, 예) 파티션 수 줄이기
- 시스템에 의해 이뤄지는 셔플링
- 셔플링이 발생할 때 네트워크를 통해 데이터 이동
- 오퍼레이션에 따라 파티션 수 결정 : random, hashing partition, range partition 등
- sorting : range partition, aggregation : hashing partition
- Data Skew 발생 가능 : 셔플링 최소화 및 최적화가 중요
Spark 데이터 구조
Spark 데이터 구조
- RDD, DataFrame, Dataset (Immutable Distributed Data)
- 2016년에 DataFrame과 Dataset은 하나의 API로 통합
- 모두 파티션으로 나뉘어 Spark에서 처리
RDD (Resilient Distributed Dataset)
- 로우 레벨 데이터로 클러스터 내의 변경이 불가능한 서버에 분산된 데이터를 지칭
- 다수의 파티션으로 구성되며 로우 레벨의 함수형 변환 (map, filter, flatMap 등) 지원
- 일반 파이썬 데이터는 parallelize 함수로 RDD로 변환, 반대는 collect로 변환
- 레코드 별로 존재하지만 스키마가 존재하지 않음 -> 구조화 / 비구조화 데이터 모두 지원
DataFrame & Dataset
- RDD 위에 만들어지는 RDD와는 달리 필드(컬럼) 정보를 갖고 있음 (Table)
- Dataset은 타입 정보가 존재하며 컴파일 언어(Scala, Java 등)에서 사용 가능
- PySpark에서는 DataFrame 사용
프로그램 구조
Spark Session 생성과 설정에 대해 알아보고 Spark 프로그램의 일반적인 구조에 대해 알아본다.
Spark Session 생성
- Spark 프로그램의 시작은 Spark Session을 만드는 것
- 프로그램마다 하나를 만들어 Spark Cluster와 통싱 : Singleton 객체
- Spark 2.0에서 처음 소개
- Spark Session을 통해 Spark이 제공해 주는 다양한 기능 사용
- DataFrame, SQL, Streaming, ML API 모두 이 객체로 통신
- config 메서드를 통해 다양한 환경 설정 가능
- RDD 관련 작업을 할 때는 SparkSession 밑의 sparkContext 객체를 사용
- SparkSession API 문서
Spark Session 환경 변수
- Spark Session을 만들 때, 다양한 환경 설정 가능
- Executor 별 메모리 : spark.executor.memory (default : 1G)
- Executor 별 CPU 수 : spark.executor.cores (YARN's default : 1)
- Driver 메모리 : spark.driver.memory (default : 1G)
- Shuffle 후 Partition 수 : spark.sql.shuffle.partitions (default : 최대 200)
- 모든 환경 변수 옵션
- Spark Session 환경 설정 방법 4가지 : 충돌 시 우선순위는 아래일수록 높음
- 환경 변수 (Spark Cluster Admin이 관리)
- $SPARK_HOME/conf/spark_defaults.conf (Spark Cluster Admin이 관리)
- spark-submit 명령의 커맨드라인 파라미터
- SparkSession 만들 때 지정
Spark Session 생성 코드 : PySpark
- spark-submit 명령의 커맨드라인 파라미터
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local[*]")\
.appName('PySpark Tutorial')\
# spark-submit 명령의 커맨드라인 파라미터
.config("spark.some.config.option1", "some-value")\
.config("spark.some.config.option2", "some-value")\
.getOrCreate()
from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.app.name", "PySpark Tutorial")
conf.set("spark.master", "local[*]")
spark = SparkSession.builder\
# SparkSession 만들 때 지정
.config(config=conf)\
.getOrCreate()
전체적인 플로우
- Spark Session 생성
- 입력 데이터 로딩
- 데이터 조작 작업 (pandas와 흡사)
- DataFrame API나 Spark SQL 사용
- 원하는 결과가 나올 때까지 새로운 DataFrame 생성
- 최종 결과 저장
Spark Session이 지원하는 데이터 소스
- HDFS 파일 : csv, json, parquet, orc, text, avro, hive table
- JDBC 관계형 데이터 베이스
- 클라우드 기반 데이터 시스템 : Redshift, Snowflake, BigQuery
- 스트리밍 시스템 : Kinesis, Kafka
개발 / 실습 환경 소개
Spark 개발 환경 옵션
- Local Standalone Spark + Spark Shell
- Python IDE : PyCharm, Visual Studio
- Databricks Cloud
- 주피터 노트북, Colab, 아나콘다 등
Local Standalone Spark
- Spark Cluster Manager로 local[n] 지정
- master를 local[n]으로 지정
- master는 클러스터 매니저를 지정하는 데 사용
- 주로 개발이나 간단한 테스트 용도
- 하나의 JVM에서 모든 프로세스 실행
- 하나의 Driver와 하나의 Executor가 실행됨
- 1+ 스레드가 Executor 안에서 실행
- Executor 내에 생성되는 스레드 수
- local : 하나의 스레드만 생성
- local[*] : 컴퓨터 CPU 수만큼 스레드 생성
- Local Standalone Spark 환경 설정 (윈도우)
Colab Spark 사용
- PySpark와 Py4J 설치
- Colab 가상 서버 위에 로컬 모드 Spark 실행
- 개발 목적으로는 충분하지만, 큰 데이터의 처리는 불가
- Spark Web UI는 기본적으로는 접근 불가
- Py4J : Python에서 JVM 내에 있는 자바 객체를 사용할 수 있게 해 줌
- Colab 사용 예제
Spark DataFrame 실습
pyspark.sql.types
- IntegerType, LongType, FloatType
- StringType
- BooleanType
- Timestamp Type, DateType
- 모든 Type
실습 1 : 헤더가 없는 CSV 파일 처리하기
- 데이터에 스키마 지정
- SparkConf 사용해 보기
- measure_type 값이 TMIN인 레코드 대상으로 stationId 별 최소 온도 찾기
- 예제 코드
실습 2 : 헤더 없는 CSV 파일 처리하기
- 데이터에 스키마 지정
- cust_id를 기준으로 amount_spent의 합 계산
- aggregate 컬럼 alias 설정
- 예제 코드
실습 3 : 텍스트를 파싱해서 구조화된 데이터로 변환하기
- Regex를 이용해서 아래와 같이 변환하는 것이 목표
- regex 패턴 : “On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)”
- \S : non-whitespace character
- \s : whitespace character
- \d : numeric character
- 예제 코드
- 입력
On 2021-01-04 the cost per ton from 85001 to 85002 is $28.32 at ABC Hauling
- 출력
week departure_zipcode arrival_zipcode cost vendor
2021-01-04 85001 85002 $28.32 ABC Hauling
실습 4 : Stackoverflow 서베이 기반 인기 언어 찾기
- stackoverflow csv 파일에서 두 필드는 ;를 구분자로 프로그래밍 언어를 구분
- LanguageHaveWorkedWith : 사용해 본 언어
- LanguageWantToWorkWith : 사용해보고 싶은 언어
- 예제 코드
실습 5 : Redshift 연결해 보기
- MAU (Monthly Active User) 계산해 보기
- 두 개의 테이블을 Redshift에서 Spark로 로드 : JDBC 연결 실습
- DataFrame과 SparkSQL을 사용해 JOIN
- DataFrame JOIN
- left_DF.join(right_DF, join condition, join type)
- join type : "inner", "left", "right", "outer", "semi", "anti"
- 예제 코드
'[프로그래머스] 데이터 엔지니어링 데브코스 3기 > TIL(Today I Learn)' 카테고리의 다른 글
[TIL - 64일 차] 하둡과 Spark (4) (0) | 2024.06.20 |
---|---|
[TIL - 63일 차] 하둡과 Spark (3) (0) | 2024.06.19 |
[TIL - 61일 차] 하둡과 Spark (1) (0) | 2024.06.17 |
[TIL - 55일 차] Airflow 고급기능과 DBT, 데이터 디스커버리 (5-2) (0) | 2024.06.07 |
[TIL - 55일 차] Airflow 고급기능과 DBT, 데이터 디스커버리 (5-1) (2) | 2024.06.07 |