Skip to main content

Lakehouse Monitoring 연동

Inference Table의 데이터를 Lakehouse Monitoring 과 연동하면, 데이터 드리프트와 모델 품질을 체계적으로 모니터링할 수 있습니다.

데이터 드리프트 감지

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorInferenceLog, MonitorInferenceLogProblemType

w = WorkspaceClient()

# Inference Table에 Lakehouse Monitor 설정
w.quality_monitors.create(
    table_name="ml_prod.monitoring.fraud_model_payload",
    inference_log=MonitorInferenceLog(
        problem_type=MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATION,
        prediction_col="prediction",
        label_col="actual_label",       # Ground Truth (사후 레이블링 필요)
        timestamp_col="timestamp_ms",
        model_id_col="model_version",
        granularities=["1 day", "1 hour"],
    ),
    output_schema_name="ml_prod.monitoring",
)
모니터가 설정되면 자동으로 두 가지 분석 테이블이 생성됩니다.
생성 테이블내용
*_profile_metrics각 컬럼의 통계 요약 (평균, 분산, 분포 등)
*_drift_metrics기준 기간 대비 드리프트 지표 (PSI, KL Divergence 등)
💡 PSI(Population Stability Index): 두 분포의 차이를 측정하는 지표입니다. PSI가 0.2를 초과하면 유의미한 드리프트로 판단합니다.
드리프트 지표 해석 기준:
지표범위해석
PSI< 0.1안정 (No Change)
PSI0.1 ~ 0.2소폭 변화 (Minor Shift), 모니터링 강화
PSI> 0.2유의미한 드리프트 (Major Shift), 재학습 권장
KS 통계량< 0.05두 분포 동일 (p-value 기준)
Jensen-Shannon Divergence> 0.1분포 차이 감지

Lakehouse Monitoring 알림 설정

드리프트 감지 시 자동으로 알림을 발송하도록 설정할 수 있습니다.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import (
    MonitorInferenceLog,
    MonitorInferenceLogProblemType,
    MonitorNotificationsConfig,
    MonitorNotificationsSpec,
)

w = WorkspaceClient()

# 알림이 포함된 모니터 생성
w.quality_monitors.create(
    table_name="ml_prod.monitoring.fraud_model_payload",
    inference_log=MonitorInferenceLog(
        problem_type=MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATION,
        prediction_col="prediction",
        label_col="actual_label",
        timestamp_col="timestamp_ms",
        model_id_col="model_version",
        granularities=["1 day"],
    ),
    output_schema_name="ml_prod.monitoring",
    # 드리프트 감지 시 이메일 알림
    notifications=MonitorNotificationsConfig(
        on_new_classification_tag_detected=MonitorNotificationsSpec(
            email_addresses=["ml-team@company.com"]
        )
    ),
    # 모니터 실행 실패 시 알림
    # on_failure=MonitorNotificationsSpec(email_addresses=["ml-team@company.com"])
)
-- 생성된 드리프트 메트릭 조회 (자동 생성 테이블)
SELECT
    window,
    column_name,
    drift_type,
    drift_value,
    threshold_value,
    -- 0.2 초과 시 알림 트리거
    CASE WHEN drift_value > 0.2 THEN 'ALERT' ELSE 'OK' END AS status
FROM ml_prod.monitoring.fraud_model_payload_drift_metrics
WHERE window_start >= CURRENT_DATE() - INTERVAL 7 DAYS
ORDER BY window_start DESC, drift_value DESC;

모델 품질 모니터링

Ground Truth 레이블이 확보되면 모델의 실제 성능을 추적할 수 있습니다.
-- Ground Truth 레이블을 Inference Table에 조인
-- (실제 결과가 나중에 확인되는 경우)
CREATE OR REPLACE TABLE ml_prod.monitoring.fraud_model_labeled AS
SELECT
    p.*,
    g.actual_label
FROM ml_prod.monitoring.fraud_model_payload p
LEFT JOIN ml_prod.gold.fraud_labels g
    ON p.request:customer_id = g.customer_id
    AND p.date = g.event_date;

-- 일별 모델 정확도 추적
SELECT
    date,
    COUNT(*) AS total_predictions,
    SUM(CASE WHEN prediction = actual_label THEN 1 ELSE 0 END) AS correct,
    ROUND(AVG(CASE WHEN prediction = actual_label THEN 1.0 ELSE 0.0 END) * 100, 2) AS accuracy_pct
FROM ml_prod.monitoring.fraud_model_labeled
WHERE actual_label IS NOT NULL
GROUP BY date
ORDER BY date DESC;

MLflow Tracing (에이전트/LLM 서빙)

LLM 기반 에이전트를 서빙하는 경우, MLflow Tracing 을 통해 각 요청의 내부 실행 경로를 추적할 수 있습니다.
구성 요소역할설명
사용자 요청입력에이전트에 요청을 전송합니다
에이전트오케스트레이션요청을 분석하고 필요한 작업을 수행합니다
문서 검색Vector Search관련 문서를 검색합니다
LLM 호출Foundation Model답변을 생성합니다
도구 실행SQL, API 등외부 도구를 호출합니다
최종 응답출력사용자에게 결과를 반환합니다
Tracing이 캡처하는 정보는 다음과 같습니다.
트레이스 항목설명
Span 계층구조에이전트의 각 단계(검색, LLM 호출, 도구 실행)를 트리 형태로 표시합니다
입출력각 단계의 입력과 출력을 기록합니다
토큰 사용량LLM 호출별 입력/출력 토큰 수를 추적합니다
지연 시간각 단계별 소요 시간을 밀리초 단위로 측정합니다
에러 정보실패 시 에러 메시지와 스택 트레이스를 기록합니다
# MLflow Tracing은 에이전트 서빙 시 자동 활성화됩니다
# Inference Table에서 트레이스 데이터를 조회할 수 있습니다

import json

# 트레이스 데이터 분석 쿼리 (SQL)
# SELECT
#     request_id,
#     trace:spans[0].name AS root_span,
#     trace:spans[0].attributes.total_tokens AS total_tokens,
#     execution_time_ms
# FROM ml_prod.monitoring.agent_payload
# WHERE date = CURRENT_DATE()
# ORDER BY execution_time_ms DESC
# LIMIT 10;

시스템 테이블 활용

Databricks 시스템 테이블(system.serving)에는 모든 서빙 엔드포인트의 운영 메트릭이 자동으로 기록됩니다.
-- 엔드포인트별 일일 요청량 및 에러율
SELECT
    endpoint_name,
    DATE(request_time) AS day,
    COUNT(*) AS total_requests,
    SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 END) AS errors,
    ROUND(
        SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2
    ) AS error_rate_pct,
    ROUND(AVG(execution_time_ms), 1) AS avg_latency_ms,
    ROUND(PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY execution_time_ms), 1) AS p99_latency_ms
FROM system.serving.served_model_requests
WHERE DATE(request_time) >= CURRENT_DATE() - INTERVAL 7 DAYS
GROUP BY endpoint_name, DATE(request_time)
ORDER BY day DESC, endpoint_name;

-- 엔드포인트별 프로비저닝된 동시성 변화 추적
SELECT
    endpoint_name,
    change_time,
    scaled_entity_name,
    previous_scale,
    new_scale
FROM system.serving.endpoint_scaling_events
WHERE DATE(change_time) >= CURRENT_DATE() - INTERVAL 7 DAYS
ORDER BY change_time DESC;
💡 시스템 테이블 은 Unity Catalog의 system 카탈로그에 위치하며, 워크스페이스 관리자가 활성화해야 합니다. 추가 비용 없이 90일간의 데이터를 보존합니다.

비용 모니터링

토큰 사용량 추적 (LLM 엔드포인트)

-- LLM 엔드포인트 일별 토큰 사용량 및 추정 비용
SELECT
    DATE(timestamp_ms / 1000) AS day,
    COUNT(*) AS total_requests,
    SUM(request:usage.prompt_tokens) AS total_input_tokens,
    SUM(request:usage.completion_tokens) AS total_output_tokens,
    SUM(request:usage.total_tokens) AS total_tokens,
    -- 대략적인 비용 추정 (모델별로 단가가 다릅니다)
    ROUND(SUM(request:usage.total_tokens) / 1000000.0 * 2.0, 2) AS estimated_cost_usd
FROM ml_prod.monitoring.agent_payload
WHERE date >= CURRENT_DATE() - INTERVAL 30 DAYS
GROUP BY DATE(timestamp_ms / 1000)
ORDER BY day DESC;

DBU 기반 비용 추적

-- 서빙 엔드포인트의 DBU 사용량 (system.billing 테이블)
SELECT
    usage_date,
    workspace_id,
    sku_name,
    usage_metadata.endpoint_name,
    SUM(usage_quantity) AS total_dbus,
    SUM(usage_quantity * list_price) AS estimated_cost
FROM system.billing.usage
WHERE sku_name LIKE '%SERVING%'
    AND usage_date >= CURRENT_DATE() - INTERVAL 30 DAYS
GROUP BY usage_date, workspace_id, sku_name, usage_metadata.endpoint_name
ORDER BY usage_date DESC;