Skip to main content
이 페이지는 Databricks에서 데이터 파이프라인을 설계하고 운영하는 실전 패턴 3가지를 다룹니다.
사전 지식이 필요합니다

주요 용어 정리

용어설명
DAGDirected Acyclic Graph(방향성 비순환 그래프). 태스크 간 의존관계를 표현하는 구조 — “A가 끝나면 B를 실행”
CDCChange Data Capture. 외부 DB에서 변경된 데이터만 캡처하여 수집 (상세)
CDFChange Data Feed. Delta 테이블의 변경 내역을 다운스트림에 전파하는 Delta Lake 기능
SCDSlowly Changing Dimension. 마스터 데이터(고객, 상품)의 변경을 처리하는 전략 (Type 1: 덮어쓰기, Type 2: 이력 보존)
멱등성같은 작업을 여러 번 실행해도 결과가 동일한 속성 — 파이프라인 재시도 시 데이터 중복을 방지
Materialized View쿼리 결과를 물리적으로 저장하는 뷰. 소스 변경 시 자동으로 증분 갱신 (상세)
Online TableDelta 테이블의 데이터를 실시간 서빙용 저지연 스토리지에 동기화하는 기능

패턴 1: Lakeflow Jobs로 E2E 오케스트레이션

Lakeflow Jobs는 여러 종류의 태스크를 DAG로 조합하여 전체 파이프라인을 하나의 워크플로로 관리합니다. 수집 → 변환 → 품질 검증 → 서빙까지 하나의 Job으로 구성할 수 있습니다. Job: daily-data-pipeline
태스크유형설명
Task 1: 수집SDP Pipeline (Lakeflow Connect CDC)Bronze 테이블에 원본 데이터 적재
Task 2: 변환SDP Pipeline (Bronze → Silver → Gold)Silver: 정제, 중복 제거, SCD Type 1/2. Gold: 비즈니스 집계
Task 3: 품질 검증SQL TaskGold 테이블의 행 수, NULL 비율, 전일 대비 변화량 체크
Task 4-A: 성공 시Notebook TaskFeature Table 업데이트 → Online Table 동기화
Task 4-B: 성공 시SQL TaskREFRESH MATERIALIZED VIEW gold_daily_kpi
Task 5: 실패 시WebhookSlack 알림
# Lakeflow Jobs SDK로 위 파이프라인 생성
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import *

w = WorkspaceClient()

w.jobs.create(
    name="daily-data-pipeline",
    tasks=[
        Task(
            task_key="ingest",
            pipeline_task=PipelineTask(pipeline_id="<ingest-pipeline-id>"),
        ),
        Task(
            task_key="transform",
            depends_on=[TaskDependency(task_key="ingest")],
            pipeline_task=PipelineTask(pipeline_id="<sdp-pipeline-id>"),
        ),
        Task(
            task_key="quality_check",
            depends_on=[TaskDependency(task_key="transform")],
            sql_task=SqlTask(
                query=SqlTaskQuery(query_id="<quality-check-query-id>"),
                warehouse_id="<warehouse-id>"
            ),
        ),
        Task(
            task_key="update_features",
            depends_on=[TaskDependency(task_key="quality_check")],
            notebook_task=NotebookTask(
                notebook_path="/Workspace/pipelines/update_features"
            ),
        ),
        Task(
            task_key="refresh_dashboard",
            depends_on=[TaskDependency(task_key="quality_check")],
            sql_task=SqlTask(
                query=SqlTaskQuery(query_id="<refresh-mv-query-id>"),
                warehouse_id="<warehouse-id>"
            ),
        ),
    ],
    schedule=CronSchedule(
        quartz_cron_expression="0 0 6 * * ?",  # 매일 06:00
        timezone_id="Asia/Seoul"
    ),
    email_notifications=JobEmailNotifications(
        on_failure=["data-team@company.com"]
    )
)

패턴 2: CDC 파이프라인 — SCD Type 1 vs Type 2

CDC(Change Data Capture) 데이터를 처리할 때 가장 중요한 결정은 SCD(Slowly Changing Dimension) 전략 입니다.

SCD Type 1: 최신 값으로 덮어쓰기

변경 이력을 보존하지 않고, 항상 최신 값만 유지 합니다.
-- SDP에서 SCD Type 1 구현
CREATE OR REFRESH STREAMING TABLE silver_customers;

APPLY CHANGES INTO silver_customers
FROM STREAM(bronze_customers)
KEYS (customer_id)
SEQUENCE BY _commit_timestamp
STORED AS SCD TYPE 1;
항목설명
결과 테이블customer_id당 항상 1개 행 (최신)
이전 값삭제됨 (복구 불가, Delta Time Travel로만 가능)
적합한 경우최신 상태만 필요 (주소, 이메일, 전화번호 등)
스토리지효율적 (행 수 = 고유 키 수)

SCD Type 2: 변경 이력 보존

모든 변경 이력을 별도 행으로 보존 합니다. __START_AT, __END_AT 컬럼으로 유효 기간을 관리합니다.
-- SDP에서 SCD Type 2 구현
CREATE OR REFRESH STREAMING TABLE silver_customers_history;

APPLY CHANGES INTO silver_customers_history
FROM STREAM(bronze_customers)
KEYS (customer_id)
SEQUENCE BY _commit_timestamp
STORED AS SCD TYPE 2;

-- 결과 테이블 구조:
-- | customer_id | name   | city  | __START_AT     | __END_AT       |
-- |-------------|--------|-------|----------------|----------------|
-- | 1001        | 김철수 | 서울  | 2025-01-01     | 2025-03-15     |
-- | 1001        | 김철수 | 부산  | 2025-03-15     | NULL           | ← 현재 값
항목설명
결과 테이블customer_id당 여러 행 (변경 이력)
현재 값 조회WHERE __END_AT IS NULL
특정 시점 조회WHERE __START_AT <= '2025-02-01' AND (__END_AT > '2025-02-01' OR __END_AT IS NULL)
적합한 경우이력 추적 필수 (고객 등급 변화, 가격 변동, 규제 감사)
스토리지변경 빈도에 비례하여 증가

SCD Type 1 vs Type 2 선택 기준

기준Type 1Type 2
”이전 값이 필요한가?”아니오 → Type 1예 → Type 2
규제 감사 요건없음 → Type 1있음 (변경 이력 보존) → Type 2
분석 요구현재 상태 분석 → Type 1시간에 따른 변화 분석 → Type 2
스토리지 비용낮음높음 (이력 누적)
쿼리 복잡도단순 (항상 최신)복잡 (__START_AT/__END_AT 필터)
대표 사용고객 연락처, 제품 정보고객 등급, 가격 이력, 재고 변동

실전: 하이브리드 전략 (Type 1 + Type 2 동시 운영)

대부분의 프로덕션 환경에서는 같은 소스에서 Type 1과 Type 2를 동시에 생성 합니다.
-- 같은 Bronze 소스에서 두 가지 Silver 테이블 생성

-- Silver 1: 현재 상태 (조인용, 대시보드용) — SCD Type 1
CREATE OR REFRESH STREAMING TABLE silver_customers_current;
APPLY CHANGES INTO silver_customers_current
FROM STREAM(bronze_customers)
KEYS (customer_id) SEQUENCE BY _commit_timestamp
STORED AS SCD TYPE 1;

-- Silver 2: 전체 이력 (감사, 시계열 분석용) — SCD Type 2
CREATE OR REFRESH STREAMING TABLE silver_customers_history;
APPLY CHANGES INTO silver_customers_history
FROM STREAM(bronze_customers)
KEYS (customer_id) SEQUENCE BY _commit_timestamp
STORED AS SCD TYPE 2;

-- Gold: 현재 상태 기반 집계 (Type 1에서 읽음 → 빠른 조인)
CREATE OR REFRESH MATERIALIZED VIEW gold_customer_revenue AS
SELECT c.customer_id, c.name, c.city,
       COUNT(o.order_id) AS total_orders,
       SUM(o.amount) AS total_revenue
FROM silver_customers_current c
JOIN silver_orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.name, c.city;

-- 감사 리포트: 이력 기반 조회 (Type 2에서 읽음)
-- "2025년 1분기에 서울에 거주했던 고객 목록"
SELECT * FROM silver_customers_history
WHERE city = '서울'
  AND __START_AT <= '2025-03-31'
  AND (__END_AT > '2025-01-01' OR __END_AT IS NULL);
💡 왜 하이브리드인가? Type 2만 사용하면 Gold 계층의 JOIN이 복잡해집니다 (__END_AT IS NULL 조건이 매번 필요). Type 1의 “현재 상태” 테이블을 별도로 유지하면 Gold 집계가 단순하고 빠릅니다.

패턴 3: CDC → CDF → 다운스트림 전파

외부 DB의 CDC를 수집하고, Delta CDF(Change Data Feed)로 다운스트림에 전파하는 전체 흐름입니다.
단계구성 요소처리 방식설명
1외부 MySQLLakeflow Connect (CDC - binlog)소스 시스템
2Bronze: bronze_customersCDC 수집원본 CDC 이벤트 보존
3Silver: silver_customersSDP: APPLY CHANGES INTOSCD Type 1 (CDF 활성화)
4aOnline TableDelta CDF (readChangeFeed)실시간 Feature Serving
4bGold: gold_customer_360Delta CDF (readChangeFeed)MV 증분 갱신
4c외부 시스템Delta CDF (readChangeFeed)Reverse ETL (foreachBatch)
-- Silver 테이블에 CDF 활성화 (다운스트림 전파용)
ALTER TABLE silver_customers
SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');
# Silver의 변경사항을 외부 시스템에 Reverse ETL
def push_to_crm(batch_df, batch_id):
    updates = batch_df.filter("_change_type IN ('insert', 'update_postimage')")
    # CRM API 호출로 고객 정보 동기화
    for row in updates.collect():
        crm_api.update_customer(row.customer_id, row.name, row.email)

(spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .table("silver_customers")
    .writeStream
    .foreachBatch(push_to_crm)
    .option("checkpointLocation", "/checkpoints/reverse-etl")
    .trigger(availableNow=True)
    .start()
)

파이프라인 오케스트레이션 모범 사례

#원칙설명
1수집과 변환을 분리Lakeflow Connect(수집)과 SDP(변환)를 별도 파이프라인 으로 구성합니다. 수집이 실패해도 변환은 마지막 성공 데이터로 동작합니다
2Job으로 전체 오케스트레이션수집 파이프라인 → SDP 파이프라인 → 품질 검증 → 후처리를 하나의 Lakeflow Job DAG로 묶습니다
3품질 게이트Silver → Gold 사이에 품질 검증 태스크를 삽입합니다. 실패 시 Gold 갱신을 차단합니다
4멱등성 보장모든 태스크는 재실행해도 같은 결과를 보장해야 합니다. MERGE, APPLY CHANGES는 기본적으로 멱등적입니다
5환경별 분리Dev/Staging/Prod에 동일한 파이프라인을 Declarative Automation Bundles로 배포합니다
6SCD 전략은 비즈니스 요구로 결정이력이 필요하면 Type 2, 최신값만 필요하면 Type 1. 대부분 하이브리드
7CDF로 다운스트림 연결Silver 테이블에 CDF를 활성화하면 Online Table, MV, Reverse ETL이 증분으로 동작합니다

관련 가이드

이 가이드의 기초 문서: 심화 문서: 공식 문서: