Catalyst Optimizer — Spark의 두뇌
Spark가 빠른 이유 중 하나는 Catalyst Optimizer 라는 쿼리 최적화 엔진입니다. SQL이든 DataFrame API든, 모든 연산은 Catalyst를 거쳐 최적화된 실행 계획으로 변환됩니다.실행 계획 변환 과정
주요 최적화 규칙
| 최적화 규칙 | 설명 | 효과 |
|---|---|---|
| Predicate Pushdown | WHERE 조건을 가능한 한 데이터 소스에 가깝게 내려보냅니다 | Parquet/Delta에서 필요한 행만 읽음 → I/O 대폭 감소 |
| Column Pruning | SELECT에 필요한 컬럼만 읽습니다 | 100개 컬럼 테이블에서 3개만 필요하면 3개만 읽음 |
| Constant Folding | 1 + 2와 같은 상수 표현식을 컴파일 시점에 3으로 치환합니다 | 런타임 계산 제거 |
| Join Reordering | 조인 순서를 최적화합니다 (작은 테이블을 먼저 조인) | Shuffle 데이터량 감소 |
| Partition Pruning | 파티션 컬럼 조건으로 불필요한 파티션을 건너뜁니다 | 전체 테이블 대신 필요 파티션만 스캔 |
Cost-Based Optimization (CBO)
Catalyst는 테이블 통계(행 수, 컬럼 분포, 데이터 크기)를 활용하여 비용 기반 으로 최적의 실행 계획을 선택합니다.⚠️ Gotcha: CBO는 통계가 최신 상태일 때만 효과적입니다. 대량 데이터 적재 후 ANALYZE를 실행하지 않으면, Catalyst가 잘못된 통계로 비효율적인 조인 전략을 선택할 수 있습니다. Delta Lake의 Predictive Optimization은 이를 자동화합니다.
Shuffle 메커니즘 심화
💡 Shuffle 이란 데이터를 키(key) 기준으로 재분배하는 과정입니다. GROUP BY, JOIN, ORDER BY 등의 연산에서 발생하며, 네트워크를 통해 데이터가 이동 하므로 Spark에서 가장 비용이 큰 연산입니다.
조인 전략 비교
| 조인 전략 | 조건 | 데이터 이동 | 성능 |
|---|---|---|---|
| Broadcast Hash Join | 한쪽 테이블이 작음 (기본 ≤ 10MB) | 작은 테이블을 모든 Executor에 복제 | ⭐⭐⭐ 가장 빠름 |
| Sort-Merge Join | 양쪽 테이블 모두 큼 | 양쪽 데이터를 키별로 셔플 + 정렬 | ⭐⭐ 안정적 |
| Shuffle Hash Join | 한쪽이 상대적으로 작음 | 양쪽 데이터를 키별로 셔플 | ⭐⭐ 정렬 불필요 |
| Cartesian Join | 조인 조건 없음 | 전체 데이터 교차 결합 | ⭐ 매우 느림 (주의!) |
⚠️ Gotcha — Broadcast Join 메모리: Broadcast되는 테이블은 각 Executor의 메모리에 적재 됩니다. 100MB 테이블을 20개 Executor에 브로드캐스트하면 총 2GB의 메모리를 사용합니다. 임계값을 너무 크게 설정하면 OOM(Out of Memory)이 발생할 수 있습니다.
Shuffle 파티션 수 튜닝
| 데이터 규모 | 권장 Shuffle 파티션 수 | 파티션당 크기 |
|---|---|---|
| < 1GB | 20~50 | 20~50MB |
| 1~10GB | 50~200 | 50~200MB |
| 10~100GB | 200~1,000 | 100~200MB |
| 100GB~1TB | 1,000~5,000 | 100~200MB |
| > 1TB | 5,000~20,000 | 100~200MB |
⚠️ Gotcha: 파티션 수가 너무 많으면 스케줄링 오버헤드 와 소형 파일 문제 가 발생합니다. 반대로 너무 적으면 각 Task가 처리하는 데이터가 많아져 OOM 이나 GC 지연 이 발생합니다. AQE를 활성화하면 이를 자동으로 조정합니다.
AQE (Adaptive Query Execution) 심화
💡 AQE 는 Spark 3.x부터 도입된 기능으로, 쿼리 실행 중에 런타임 통계를 수집하여 실행 계획을 동적으로 수정합니다. Databricks Runtime에서는 기본 활성화되어 있습니다.
AQE의 세 가지 핵심 기능
| 기능 | 설명 | 효과 |
|---|---|---|
| Coalescing Post-Shuffle Partitions | 셔플 후 작은 파티션들을 자동으로 합칩니다 | 소형 파일/태스크 감소, 오버헤드 절감 |
| Converting Sort-Merge Join to Broadcast Hash Join | 런타임에 한쪽 테이블이 작으면 Broadcast Join으로 전환합니다 | 불필요한 셔플 제거 |
| Optimizing Skew Join | 데이터 편향(Skew)이 있는 파티션을 자동 분할합니다 | Skew로 인한 지연 해소 |
⚠️ Gotcha: AQE는 Shuffle 또는 Broadcast Exchange 경계 에서만 재최적화합니다. 단일 Stage 내에서는 초기 계획이 유지됩니다. 따라서 AQE가 있어도 ANALYZE TABLE로 통계를 수집하는 것이 여전히 중요합니다.
데이터 Skew 처리
💡 데이터 Skew(편향) 란 특정 키에 데이터가 집중되어, 하나의 파티션이 다른 파티션보다 훨씬 큰 상태를 말합니다. 예를 들어, 전체 주문의 40%가 “서울” 지역인 경우, GROUP BY 지역 시 “서울” 파티션이 병목이 됩니다.
Skew 진단 방법
해결 방법 1: AQE Skew Join (자동)
위에서 설명한 AQE의 Skew Join 최적화가 자동으로 처리합니다. Databricks에서는 기본 활성화되어 있으므로, 대부분의 경우 별도 조치 없이 해결됩니다.해결 방법 2: Skew Hint (수동)
해결 방법 3: Salting 기법 (수동)
Skew가 심한 키에 랜덤 접미사(salt)를 추가하여 데이터를 강제로 분산시킵니다.⚠️ Gotcha: Salting은 작은 테이블을 N배(salt 수)로 복제하므로 메모리를 많이 사용합니다. AQE의 자동 Skew 처리가 가능하면 먼저 시도하고, 효과가 부족할 때만 Salting을 사용하세요.
메모리 관리
Spark Executor의 메모리는 세 영역으로 나뉩니다. 메모리 구조를 이해하면 OOM 오류를 진단하고 해결하는 데 큰 도움이 됩니다.Executor 메모리 구조
spark.executor.memory (예: 8GB) 구성:| 메모리 영역 | 비율/크기 | 용도 |
|---|---|---|
| Unified Memory | spark.memory.fraction = 0.6 → 4.8GB | 동적 분배 (아래 두 영역이 서로 차용 가능) |
| - Execution Memory | (동적) | Shuffle, Sort, Aggregation 등 연산 |
| - Storage Memory | (동적) | cache(), persist() 등 캐시 |
| User Memory | 0.4 → 3.2GB | UDF, 사용자 데이터 구조 |
| Reserved Memory | 300MB 고정 | Spark 내부 사용 |
| 메모리 영역 | 비율 | 용도 | OOM 원인 |
|---|---|---|---|
| Execution | 동적 (Unified 내) | Shuffle, Sort, Join, Aggregation | 셔플 데이터가 메모리 초과 → Spill to Disk |
| Storage | 동적 (Unified 내) | DataFrame 캐시 | 캐시가 너무 많으면 Execution 영역 부족 |
| User | 40% | UDF, 브로드캐스트 변수 | 대형 컬렉션을 Driver로 collect() |
Spill to Disk
Execution Memory가 부족하면 Spark는 데이터를 디스크로 Spill(유출) 합니다. Spill 자체는 OOM을 방지하는 안전 장치이지만, 디스크 I/O로 인해 성능이 크게 저하 됩니다.OOM 대응 체크리스트
| 순서 | 조치 | 설명 |
|---|---|---|
| 1 | 파티션 수 늘리기 | 파티션당 데이터량을 줄여 메모리 사용 감소 |
| 2 | Broadcast Join 확인 | 너무 큰 테이블이 브로드캐스트되고 있지 않은지 확인 |
| 3 | collect() 제거 | Driver로 대량 데이터를 가져오는 collect() 사용 자제 |
| 4 | 캐시 정리 | 불필요한 cache/persist 제거 (unpersist()) |
| 5 | Executor 메모리 증가 | 클러스터 설정에서 메모리가 큰 인스턴스 타입 선택 |
| 6 | Executor 수 증가 | Worker 노드를 추가하여 데이터를 더 분산 |
⚠️ Gotcha — collect()의 위험:df.collect()는 전체 DataFrame 데이터를 Driver 한 대의 메모리 로 가져옵니다. 1억 건의 데이터를 collect하면 Driver가 OOM으로 죽습니다. 반드시.limit(),.head(),.take()등으로 필요한 데이터만 가져오세요.
⚠️ Gotcha — Python UDF 메모리: Python UDF(User Defined Function)는 JVM과 Python 프로세스 사이에 데이터를 직렬화/역직렬화(SerDe)합니다. 이 과정에서 메모리를 2배 이상 사용하며, 성능도 크게 저하됩니다. 가능하면 Spark 내장 함수 나 Pandas UDF(Arrow 기반)를 사용하세요.
정리
| 핵심 개념 | 설명 |
|---|---|
| Catalyst Optimizer | SQL/DataFrame 연산을 자동으로 최적화하는 쿼리 엔진입니다 |
| AQE | 런타임 통계를 활용하여 실행 중 계획을 동적으로 최적화합니다 |
| Shuffle | 키 기반 데이터 재분배. 가장 비용이 큰 연산이므로 최적화가 중요합니다 |
| Broadcast Join | 작은 테이블을 모든 Executor에 복제하는 가장 빠른 조인 전략입니다 |
| Data Skew | 특정 키에 데이터가 집중되는 현상. AQE 또는 Salting으로 해결합니다 |
| 메모리 관리 | Execution/Storage/User 영역을 이해하고 OOM을 예방합니다 |