Skip to main content

이벤트 로그란?

SDP(Spark Declarative Pipelines) 파이프라인은 실행 과정에서 발생하는 모든 활동을 이벤트 로그(Event Log) 에 기록합니다. 데이터 흐름 진행 상황, 데이터 품질 검증 결과, 오류 정보, 성능 지표 등이 모두 포함됩니다.
💡 이벤트 로그는 파이프라인의 “블랙박스”입니다. 파이프라인에서 무슨 일이 일어났는지, 데이터 품질은 어떤지, 어디서 병목이 발생했는지를 모두 확인할 수 있습니다.

이벤트 로그 조회 방법

event_log() 테이블 함수

Unity Catalog에 등록된 파이프라인의 이벤트 로그는 event_log() 함수로 조회할 수 있습니다.
-- 파이프라인 ID로 이벤트 로그 조회
SELECT * FROM event_log("pipeline-id-here")
ORDER BY timestamp DESC
LIMIT 100;

-- 파이프라인 이름으로 조회
SELECT * FROM event_log(TABLE(catalog.schema.my_table))
ORDER BY timestamp DESC;

이벤트 로그 스키마

컬럼타입설명
idSTRING이벤트 고유 ID입니다
sequenceSTRUCT이벤트 순서 정보입니다
originSTRUCT이벤트 발생 원점 (파이프라인, 클러스터 등) 정보입니다
timestampTIMESTAMP이벤트 발생 시각입니다
messageSTRING사람이 읽을 수 있는 이벤트 메시지입니다
levelSTRING이벤트 레벨입니다 (INFO, WARN, ERROR, METRICS)
maturity_levelSTRINGAPI 안정성 수준입니다 (STABLE, EVOLVING)
errorSTRUCT오류 발생 시 상세 정보입니다
detailsSTRING이벤트 상세 정보 (JSON 형식)입니다
event_typeSTRING이벤트 유형입니다

주요 이벤트 유형

이벤트 유형 분류

이벤트 유형설명활용
user_action사용자가 파이프라인을 시작/중지한 이벤트입니다실행 이력 추적
flow_definition데이터 흐름(flow) 정의 정보입니다파이프라인 구조 파악
flow_progress데이터 흐름의 진행 상황입니다처리량 모니터링, 병목 분석
planning_information실행 계획 정보입니다리소스 사용 분석
dataset_definition데이터셋(테이블/뷰) 정의 정보입니다스키마 변경 추적
cluster_resources클러스터 리소스 사용 정보입니다비용 최적화

flow_progress 이벤트 분석

flow_progress는 각 데이터 흐름의 처리 결과를 담고 있어, 파이프라인 성능 분석에 가장 유용합니다.
-- 각 테이블별 처리 행 수와 소요 시간 조회
SELECT
    timestamp,
    details:flow_progress:data_quality:expectations AS expectations,
    details:flow_progress:metrics:num_output_rows AS output_rows,
    details:flow_progress:status AS status,
    details:flow_progress:flow_name AS flow_name
FROM event_log("pipeline-id-here")
WHERE event_type = 'flow_progress'
    AND details:flow_progress:status = 'COMPLETED'
ORDER BY timestamp DESC;

Expectations (데이터 품질) 결과 조회

SDP의 Expectations(데이터 품질 규칙) 검증 결과는 이벤트 로그에 자동으로 기록됩니다. 이를 활용하면 데이터 품질 추이를 모니터링할 수 있습니다.

Expectations 결과 조회 쿼리

-- Expectations 결과만 추출
WITH expectations_data AS (
    SELECT
        timestamp,
        details:flow_progress:flow_name AS flow_name,
        EXPLODE(
            FROM_JSON(
                details:flow_progress:data_quality:expectations,
                'ARRAY<STRUCT<name: STRING, dataset: STRING, passed_records: BIGINT, failed_records: BIGINT>>'
            )
        ) AS expectation
    FROM event_log("pipeline-id-here")
    WHERE event_type = 'flow_progress'
        AND details:flow_progress:data_quality IS NOT NULL
)
SELECT
    DATE(timestamp) AS date,
    flow_name,
    expectation.name AS rule_name,
    expectation.passed_records,
    expectation.failed_records,
    ROUND(
        expectation.passed_records * 100.0 /
        (expectation.passed_records + expectation.failed_records), 2
    ) AS pass_rate_pct
FROM expectations_data
ORDER BY timestamp DESC;

결과 예시

dateflow_namerule_namepassed_recordsfailed_recordspass_rate_pct
2025-03-31silver_ordersvalid_amount98,54215899.84
2025-03-31silver_ordersnot_null_id98,7000100.00
2025-03-30silver_ordersvalid_amount95,21128999.70

파이프라인 디버깅에 활용

오류 이벤트 조회

-- 오류 이벤트만 필터링
SELECT
    timestamp,
    level,
    message,
    error.exceptions AS error_details
FROM event_log("pipeline-id-here")
WHERE level = 'ERROR'
ORDER BY timestamp DESC
LIMIT 20;

실행 시간 분석

-- 파이프라인 실행별 소요 시간 분석
WITH update_events AS (
    SELECT
        origin.update_id,
        MIN(CASE WHEN event_type = 'user_action' AND details:user_action:action = 'START' THEN timestamp END) AS start_time,
        MAX(CASE WHEN event_type = 'user_action' AND details:user_action:action IN ('STOP', 'COMPLETE') THEN timestamp END) AS end_time
    FROM event_log("pipeline-id-here")
    GROUP BY origin.update_id
)
SELECT
    update_id,
    start_time,
    end_time,
    TIMESTAMPDIFF(MINUTE, start_time, end_time) AS duration_minutes
FROM update_events
WHERE start_time IS NOT NULL AND end_time IS NOT NULL
ORDER BY start_time DESC
LIMIT 10;

테이블별 처리량 추이

-- 일별 테이블별 처리 행 수 추이
SELECT
    DATE(timestamp) AS date,
    details:flow_progress:flow_name AS table_name,
    SUM(CAST(details:flow_progress:metrics:num_output_rows AS BIGINT)) AS total_rows
FROM event_log("pipeline-id-here")
WHERE event_type = 'flow_progress'
    AND details:flow_progress:status = 'COMPLETED'
GROUP BY 1, 2
ORDER BY date DESC, table_name;

이벤트 로그 기반 모니터링 대시보드

이벤트 로그 데이터를 활용하여 모니터링 대시보드를 구축할 수 있습니다.

권장 모니터링 지표

지표쿼리 대상임계값 예시
파이프라인 실행 시간user_action 이벤트의 START/COMPLETE 간격평소 대비 2배 초과 시 알림
Expectations 통과율flow_progress의 data_quality99% 미만 시 알림
처리된 행 수flow_progress의 num_output_rows0행 처리 시 알림
오류 발생 횟수level = 'ERROR' 이벤트 수1건 이상 시 알림

알림 설정용 뷰 생성

-- 데이터 품질 모니터링 뷰
CREATE OR REPLACE VIEW catalog.schema.v_pipeline_quality AS
WITH latest_expectations AS (
    SELECT
        timestamp,
        details:flow_progress:flow_name AS flow_name,
        EXPLODE(
            FROM_JSON(
                details:flow_progress:data_quality:expectations,
                'ARRAY<STRUCT<name: STRING, passed_records: BIGINT, failed_records: BIGINT>>'
            )
        ) AS exp
    FROM event_log("pipeline-id-here")
    WHERE event_type = 'flow_progress'
        AND details:flow_progress:data_quality IS NOT NULL
        AND DATE(timestamp) = CURRENT_DATE()
)
SELECT
    flow_name,
    exp.name AS rule_name,
    SUM(exp.passed_records) AS total_passed,
    SUM(exp.failed_records) AS total_failed,
    ROUND(
        SUM(exp.passed_records) * 100.0 /
        NULLIF(SUM(exp.passed_records) + SUM(exp.failed_records), 0), 2
    ) AS pass_rate_pct
FROM latest_expectations
GROUP BY flow_name, exp.name;


참고 링크