📊

Spark는 어떤 컨셉인가?

1. DataFrame

Spark는 데이터를 테이블 형태로 다룰 수 있도록 API를 제공한다. 물리적으로는 여러 머신의 메모리에 분산되어 있더라도, 사용자가 데이터를 마치 하나의 테이블처럼 다룰 수 있다.
df = spark.read.load("[파일경로]", format="csv", sep="\t", inferSchema="true", // * 타입 추론 header="true") dfSelected = df.select( col("ID").alias("id"), col("Year_Birth").alias("year_birth") ) dfSelected.printSchema() root |-- id: integer (nullable = true) |-- year_birth: integer (nullable = true)
Kotlin
복사
위의 예시에서 df, dfSelected 는 각각 다른 DataFrame으로 불변이다. 이 때, 물리적으로 실제 데이터가 복사되는 것은 아니다. 물리적인 데이터는 여전히 Disk에 CSV파일로 존재하고 count() 등의 RDD 액션을 수행할 때, 데이터를 메모리로 읽어 처리한다.
데이터를 탐색하거나, 빠르게 가공하고 싶다면 SQL API를 사용한다.
DataFrame은 untyped API로, SQL과 1:1로 매핑된다. DataFrame은 Dataset[Row] 즉, Row라는 타입의 Dataset이다.
DataSet은 type-safe하며, lambda 등이 가능하다. map, flatmap, filter 등 함수형 Collection API 일부를 사용할 수 있다. Encoder를 이용해 DataFrame을 Dataset[T]로 변경한다. DataSet은 각 언어 구조에 따른 타입을 알 수 있다.
case class MarketingUser(userId: Int, yearBirth: Int) # Encoder는 Serialization / Deserialization을 한다. val dsMarketingUser = dfSelected.as[MarketingUser] # dfSelected org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, yearBirth: int] # dsMaretingUser org.apache.spark.sql.Dataset[MarketingUser] = [userId: int, yearBirth: int]
Kotlin
복사
고도의 최적화 등 특별한 경우가 아니라면 낮은 수준의 API인 RDD를 사용하지 않는다. 일정 규모에서는 머신에 비용을 투자하는 것이 고도의 최적화를 유지보수하는 것보다 저렴하다. Public Cloud 환경에선 머신이 사람보다 구하기 쉽기 때문이다.

3. 실행계획과 최적화

DataFrame, SQL, DataSet API를 이용할 경우 Catalyst Optimizer를 이용하여 최적화한 후, RDD로 코드가 생성되어 실행된다.

explain()

# xx.select("education").limit(5).explain("extended") ... == Analyzed Logical Plan == education: string GlobalLimit 5 +- LocalLimit 5 +- Project [education#238] +- Project [id#236, year_birth#237, education#238, count_kid#239, count_teen#240, date_customer#241, days_last_login#242, add_months(to_date('date_customer, Some(d-M-yyyy)), 72) AS date_joined#257] +- Project [ID#16 AS id#236, Year_Birth#17 AS year_birth#237, Education#18 AS education#238, Kidhome#21 AS count_kid#239, Teenhome#22 AS count_teen#240, Dt_Customer#23 AS date_customer#241, Recency#24 AS days_last_login#242] +- Relation[ID#16,Year_Birth#17,Education#18,Marital_Status#19,Income#20,Kidhome#21,Teenhome#22,Dt_Customer#23,Recency#24,MntWines#25,MntFruits#26,MntMeatProducts#27,MntFishProducts#28,MntSweetProducts#29,MntGoldProds#30,NumDealsPurchases#31,NumWebPurchases#32,NumCatalogPurchases#33,NumStorePurchases#34,NumWebVisitsMonth#35,AcceptedCmp3#36,AcceptedCmp4#37,AcceptedCmp5#38,AcceptedCmp1#39,... 5 more fields] csv == Optimized Logical Plan == GlobalLimit 5 +- LocalLimit 5 +- Project [Education#18 AS education#238] +- Relation[ID#16,Year_Birth#17,Education#18,Marital_Status#19,Income#20,Kidhome#21,Teenhome#22,Dt_Customer#23,Recency#24,MntWines#25,MntFruits#26,MntMeatProducts#27,MntFishProducts#28,MntSweetProducts#29,MntGoldProds#30,NumDealsPurchases#31,NumWebPurchases#32,NumCatalogPurchases#33,NumStorePurchases#34,NumWebVisitsMonth#35,AcceptedCmp3#36,AcceptedCmp4#37,AcceptedCmp5#38,AcceptedCmp1#39,... 5 more fields] csv == Physical Plan ==
Bash
복사
explain() 함수를 통해 Spark DataFrame의 실행 계획을 볼 수 있다. 실행 계획으로 확인해보면, Analyzed Logical Plan 과 Optimized Logical Plan 이 조금 다른것을 볼 수 있다. 가령 위의 예시에선 select() 를 여러 컬럼에 대해 수행했으나, 최적화 시점에서 education 컬럼만 Projection 하였다.
Spark 3 에서는 explain() 함수가 "simple", "extended", "codegen", "cost", "formatted" 등 다양한 기능을 지원한다.
다만, 병목 등의 상황은 Spark UI를 활용하여 파악하는 것이 용이하다.
데이터 전체를 가져온 '후' 필터링하는 것이 아니라, 저장소에서 데이터를 필터링 '' 결과물만 가져와 네트워크 및 가공 비용을 줄일 수 있다. 이를 Predicate Push Down 라고 한다.
Logical Plan이 최적화 단계를 거쳐 Physical Plan이 생성되는데, 구체적으로 어떤 알고리즘을 수행해야 하는지 등의 정보를 담고 있다. 가령 위의 예시에서 Logical Plan에서는 Join이라고 표시되지만 Physical Plan 에서는 Sort-merge Join을 사용한다고 표시된다.

4. Transformation과 Action

RDD의 실제 동작은 Transformation과 Action으로 이루어져 있다. filter, map 등 다양한 Tranformation 을 적용하는 것을 반복하다가 최종적으로는 count()와 같은 Action 을 호출해 결과를 만들어낸다. 이 때, Action 시점까지 Transformation 요청을 미루는 것을 Lazy Evaluation이라 한다.

Lazy Evaluation 장점

많은 데이터를 다룰 때, map(withColumn), filter(where)과 같은 함수를 호출할 때마다 즉시 연산된다면 데이터 로딩이 잦아져 네트워크 비용이 증가한다.
map, filter 등을 통해 최적화가 가능한 로직이 존재할 경우 지연로딩함으로써 불필요한 연산을 피할 수 있다.

5. Partition

데이터 시스템에서 Partition이란, 전체에서 나누어진 부분을 말한다. 데이터가 수천억 건일 경우, 단일 머신에서 처리하기 어렵다. 따라서 사용자는 DataFrame이라는 추상화된 API를 통해 데이터를 '하나'처럼 다루지만, Spark는 데이터를 분할해 Partition 단위로 데이터를 처리한다.