Skip to main content

Catalyst Optimizer — Spark의 두뇌

Spark가 빠른 이유 중 하나는 Catalyst Optimizer 라는 쿼리 최적화 엔진입니다. SQL이든 DataFrame API든, 모든 연산은 Catalyst를 거쳐 최적화된 실행 계획으로 변환됩니다.

실행 계획 변환 과정

사용자 코드 (SQL/DataFrame API)

[1] Unresolved Logical Plan — 파싱된 논리 계획 (테이블/컬럼 미확인)
    ↓ (Catalog에서 테이블/컬럼 확인)
[2] Resolved Logical Plan — 확인된 논리 계획
    ↓ (최적화 규칙 적용)
[3] Optimized Logical Plan — 최적화된 논리 계획
    ↓ (물리 전략 선택)
[4] Physical Plan — 물리 실행 계획
    ↓ (코드 생성)
[5] RDD 연산 — 실제 분산 실행

주요 최적화 규칙

최적화 규칙설명효과
Predicate PushdownWHERE 조건을 가능한 한 데이터 소스에 가깝게 내려보냅니다Parquet/Delta에서 필요한 행만 읽음 → I/O 대폭 감소
Column PruningSELECT에 필요한 컬럼만 읽습니다100개 컬럼 테이블에서 3개만 필요하면 3개만 읽음
Constant Folding1 + 2와 같은 상수 표현식을 컴파일 시점에 3으로 치환합니다런타임 계산 제거
Join Reordering조인 순서를 최적화합니다 (작은 테이블을 먼저 조인)Shuffle 데이터량 감소
Partition Pruning파티션 컬럼 조건으로 불필요한 파티션을 건너뜁니다전체 테이블 대신 필요 파티션만 스캔
# 실행 계획 확인 방법
df = spark.sql("""
    SELECT customer_id, SUM(amount)
    FROM catalog.schema.orders
    WHERE order_date >= '2025-01-01'
    GROUP BY customer_id
""")

# 논리 + 물리 실행 계획 출력
df.explain(mode="extended")

# Databricks에서는 Spark UI의 SQL 탭에서 시각적으로 확인 가능

Cost-Based Optimization (CBO)

Catalyst는 테이블 통계(행 수, 컬럼 분포, 데이터 크기)를 활용하여 비용 기반 으로 최적의 실행 계획을 선택합니다.
-- 테이블 통계 수집 (CBO 활성화의 핵심)
ANALYZE TABLE catalog.schema.orders COMPUTE STATISTICS;

-- 컬럼 단위 통계 수집 (더 정밀한 최적화)
ANALYZE TABLE catalog.schema.orders
COMPUTE STATISTICS FOR COLUMNS customer_id, order_date, amount;
⚠️ Gotcha: CBO는 통계가 최신 상태일 때만 효과적입니다. 대량 데이터 적재 후 ANALYZE를 실행하지 않으면, Catalyst가 잘못된 통계로 비효율적인 조인 전략을 선택할 수 있습니다. Delta Lake의 Predictive Optimization은 이를 자동화합니다.

Shuffle 메커니즘 심화

💡 Shuffle 이란 데이터를 키(key) 기준으로 재분배하는 과정입니다. GROUP BY, JOIN, ORDER BY 등의 연산에서 발생하며, 네트워크를 통해 데이터가 이동 하므로 Spark에서 가장 비용이 큰 연산입니다.

조인 전략 비교

조인 전략조건데이터 이동성능
Broadcast Hash Join한쪽 테이블이 작음 (기본 ≤ 10MB)작은 테이블을 모든 Executor에 복제⭐⭐⭐ 가장 빠름
Sort-Merge Join양쪽 테이블 모두 큼양쪽 데이터를 키별로 셔플 + 정렬⭐⭐ 안정적
Shuffle Hash Join한쪽이 상대적으로 작음양쪽 데이터를 키별로 셔플⭐⭐ 정렬 불필요
Cartesian Join조인 조건 없음전체 데이터 교차 결합⭐ 매우 느림 (주의!)
from pyspark.sql.functions import broadcast

# Broadcast Join 강제 (작은 테이블을 명시적으로 브로드캐스트)
result = large_orders.join(
    broadcast(small_stores),  # small_stores를 모든 Executor로 복제
    "store_id"
)

# Broadcast 임계값 조정 (기본 10MB → 100MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "104857600")  # 100MB
⚠️ Gotcha — Broadcast Join 메모리: Broadcast되는 테이블은 각 Executor의 메모리에 적재 됩니다. 100MB 테이블을 20개 Executor에 브로드캐스트하면 총 2GB의 메모리를 사용합니다. 임계값을 너무 크게 설정하면 OOM(Out of Memory)이 발생할 수 있습니다.

Shuffle 파티션 수 튜닝

# 기본값: 200 (대부분의 워크로드에 부적합!)
spark.conf.get("spark.sql.shuffle.partitions")  # "200"

# 데이터 규모에 맞게 조정
# 일반 가이드: 셔플 후 각 파티션이 100~200MB가 되도록 설정
# 예: 셔플 데이터 100GB → 100GB / 128MB ≈ 800 파티션
spark.conf.set("spark.sql.shuffle.partitions", "800")
데이터 규모권장 Shuffle 파티션 수파티션당 크기
< 1GB20~5020~50MB
1~10GB50~20050~200MB
10~100GB200~1,000100~200MB
100GB~1TB1,000~5,000100~200MB
> 1TB5,000~20,000100~200MB
⚠️ Gotcha: 파티션 수가 너무 많으면 스케줄링 오버헤드소형 파일 문제 가 발생합니다. 반대로 너무 적으면 각 Task가 처리하는 데이터가 많아져 OOM 이나 GC 지연 이 발생합니다. AQE를 활성화하면 이를 자동으로 조정합니다.

AQE (Adaptive Query Execution) 심화

💡 AQE 는 Spark 3.x부터 도입된 기능으로, 쿼리 실행 중에 런타임 통계를 수집하여 실행 계획을 동적으로 수정합니다. Databricks Runtime에서는 기본 활성화되어 있습니다.

AQE의 세 가지 핵심 기능

기능설명효과
Coalescing Post-Shuffle Partitions셔플 후 작은 파티션들을 자동으로 합칩니다소형 파일/태스크 감소, 오버헤드 절감
Converting Sort-Merge Join to Broadcast Hash Join런타임에 한쪽 테이블이 작으면 Broadcast Join으로 전환합니다불필요한 셔플 제거
Optimizing Skew Join데이터 편향(Skew)이 있는 파티션을 자동 분할합니다Skew로 인한 지연 해소
# AQE 관련 설정 (Databricks에서는 기본 활성화)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Skew Join 감지 임계값 (기본: 256MB, 파티션 중앙값의 5배)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "268435456")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")

# 셔플 파티션 합치기 목표 크기 (기본: 64MB)
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "134217728")  # 128MB
⚠️ Gotcha: AQE는 Shuffle 또는 Broadcast Exchange 경계 에서만 재최적화합니다. 단일 Stage 내에서는 초기 계획이 유지됩니다. 따라서 AQE가 있어도 ANALYZE TABLE로 통계를 수집하는 것이 여전히 중요합니다.

데이터 Skew 처리

💡 데이터 Skew(편향) 란 특정 키에 데이터가 집중되어, 하나의 파티션이 다른 파티션보다 훨씬 큰 상태를 말합니다. 예를 들어, 전체 주문의 40%가 “서울” 지역인 경우, GROUP BY 지역 시 “서울” 파티션이 병목이 됩니다.

Skew 진단 방법

Spark UI → Stages 탭 → Task Duration 확인
- 대부분의 Task: 10초
- 하나의 Task: 10분  ← Skew!

해결 방법 1: AQE Skew Join (자동)

위에서 설명한 AQE의 Skew Join 최적화가 자동으로 처리합니다. Databricks에서는 기본 활성화되어 있으므로, 대부분의 경우 별도 조치 없이 해결됩니다.

해결 방법 2: Skew Hint (수동)

-- Databricks에서 Skew Join Hint 사용
SELECT /*+ SKEW('orders', 'region') */
    o.*, s.store_name
FROM orders o
JOIN stores s ON o.region = s.region;

-- 특정 Skew 값을 명시
SELECT /*+ SKEW('orders', 'region', ('서울', '경기')) */
    o.*, s.store_name
FROM orders o
JOIN stores s ON o.region = s.region;

해결 방법 3: Salting 기법 (수동)

Skew가 심한 키에 랜덤 접미사(salt)를 추가하여 데이터를 강제로 분산시킵니다.
from pyspark.sql.functions import col, concat, lit, floor, rand, explode, array

SALT_BUCKETS = 10

# 큰 테이블: 키에 salt 추가
orders_salted = orders.withColumn(
    "salted_key",
    concat(col("region"), lit("_"), floor(rand() * SALT_BUCKETS).cast("string"))
)

# 작은 테이블: salt 값 전체를 explode로 복제
salt_values = [str(i) for i in range(SALT_BUCKETS)]
stores_exploded = stores.withColumn(
    "salt", explode(array([lit(s) for s in salt_values]))
).withColumn(
    "salted_key", concat(col("region"), lit("_"), col("salt"))
)

# Salted 키로 조인
result = orders_salted.join(stores_exploded, "salted_key")
⚠️ Gotcha: Salting은 작은 테이블을 N배(salt 수)로 복제하므로 메모리를 많이 사용합니다. AQE의 자동 Skew 처리가 가능하면 먼저 시도하고, 효과가 부족할 때만 Salting을 사용하세요.

메모리 관리

Spark Executor의 메모리는 세 영역으로 나뉩니다. 메모리 구조를 이해하면 OOM 오류를 진단하고 해결하는 데 큰 도움이 됩니다.

Executor 메모리 구조

spark.executor.memory (예: 8GB) 구성:
메모리 영역비율/크기용도
Unified Memoryspark.memory.fraction = 0.6 → 4.8GB동적 분배 (아래 두 영역이 서로 차용 가능)
- Execution Memory(동적)Shuffle, Sort, Aggregation 등 연산
- Storage Memory(동적)cache(), persist() 등 캐시
User Memory0.4 → 3.2GBUDF, 사용자 데이터 구조
Reserved Memory300MB 고정Spark 내부 사용
메모리 영역비율용도OOM 원인
Execution동적 (Unified 내)Shuffle, Sort, Join, Aggregation셔플 데이터가 메모리 초과 → Spill to Disk
Storage동적 (Unified 내)DataFrame 캐시캐시가 너무 많으면 Execution 영역 부족
User40%UDF, 브로드캐스트 변수대형 컬렉션을 Driver로 collect()

Spill to Disk

Execution Memory가 부족하면 Spark는 데이터를 디스크로 Spill(유출) 합니다. Spill 자체는 OOM을 방지하는 안전 장치이지만, 디스크 I/O로 인해 성능이 크게 저하 됩니다.
Spark UI → Stages 탭에서 확인:
- Shuffle Spill (Memory): 메모리에서 처리된 데이터
- Shuffle Spill (Disk): 디스크로 유출된 데이터
- Disk Spill 비율이 높으면 메모리 부족 신호!

OOM 대응 체크리스트

순서조치설명
1파티션 수 늘리기파티션당 데이터량을 줄여 메모리 사용 감소
2Broadcast Join 확인너무 큰 테이블이 브로드캐스트되고 있지 않은지 확인
3collect() 제거Driver로 대량 데이터를 가져오는 collect() 사용 자제
4캐시 정리불필요한 cache/persist 제거 (unpersist())
5Executor 메모리 증가클러스터 설정에서 메모리가 큰 인스턴스 타입 선택
6Executor 수 증가Worker 노드를 추가하여 데이터를 더 분산
# OOM 디버깅에 유용한 설정
spark.conf.set("spark.sql.adaptive.enabled", "true")  # AQE로 자동 최적화
spark.conf.set("spark.sql.shuffle.partitions", "auto")  # Databricks에서 자동 조정

# 메모리 사용량 모니터링 (Spark UI 외)
print(f"Storage Memory Used: {spark.sparkContext._jsc.sc().getExecutorMemoryStatus()}")
⚠️ Gotcha — collect()의 위험: df.collect()는 전체 DataFrame 데이터를 Driver 한 대의 메모리 로 가져옵니다. 1억 건의 데이터를 collect하면 Driver가 OOM으로 죽습니다. 반드시 .limit(), .head(), .take() 등으로 필요한 데이터만 가져오세요.
⚠️ Gotcha — Python UDF 메모리: Python UDF(User Defined Function)는 JVM과 Python 프로세스 사이에 데이터를 직렬화/역직렬화(SerDe)합니다. 이 과정에서 메모리를 2배 이상 사용하며, 성능도 크게 저하됩니다. 가능하면 Spark 내장 함수Pandas UDF(Arrow 기반)를 사용하세요.

정리

핵심 개념설명
Catalyst OptimizerSQL/DataFrame 연산을 자동으로 최적화하는 쿼리 엔진입니다
AQE런타임 통계를 활용하여 실행 중 계획을 동적으로 최적화합니다
Shuffle키 기반 데이터 재분배. 가장 비용이 큰 연산이므로 최적화가 중요합니다
Broadcast Join작은 테이블을 모든 Executor에 복제하는 가장 빠른 조인 전략입니다
Data Skew특정 키에 데이터가 집중되는 현상. AQE 또는 Salting으로 해결합니다
메모리 관리Execution/Storage/User 영역을 이해하고 OOM을 예방합니다

참고 링크