Skip to main content

Streamlit 앱으로 테이블 조회

Unity Catalog 테이블을 읽어 Streamlit 대시보드에 표시하고, 데이터를 편집하여 다시 저장하는 앱입니다. 이 예제를 통해 Databricks Apps의 핵심 패턴인 SQL Warehouse 연결, 인증, 데이터 CRUD 를 학습합니다.

사전 준비

서비스 프린시펄에 다음 권한 부여:
  • Unity Catalog 테이블에 대한 SELECT 권한
  • Unity Catalog 테이블에 대한 MODIFY 권한 (편집 기능 사용 시)
  • SQL Warehouse에 대한 CAN USE 권한
주의 권한 부여를 잊지 마세요: 가장 흔한 실수가 이 단계를 건너뛰는 것입니다. 앱은 정상 배포되지만 데이터 조회 시 Permission denied 오류가 발생합니다. 앱의 Overview 페이지에서 서비스 프린시펄 이름을 확인한 후, SQL Editor에서 다음을 실행하세요:
GRANT SELECT, MODIFY ON TABLE catalog.schema.table TO `<sp-application-id>`;
GRANT USE CATALOG ON CATALOG catalog TO `<sp-application-id>`;
GRANT USE SCHEMA ON SCHEMA catalog.schema TO `<sp-application-id>`;

requirements.txt

databricks-sdk
databricks-sql-connector
streamlit
pandas
각 패키지의 역할은 다음과 같습니다: databricks-sdk는 인증 처리(Config), databricks-sql-connector는 SQL Warehouse 연결, streamlit은 웹 UI 프레임워크, pandas는 데이터 처리입니다. 프로덕션에서는 반드시 버전을 명시하세요 (예: streamlit==1.32.0).

app.yaml

command: ['streamlit', 'run', 'app.py']
env:
  - name: DATABRICKS_WAREHOUSE_ID
    valueFrom: sql_warehouse
  - name: STREAMLIT_GATHER_USAGE_STATS
    value: 'false'
command에서 Streamlit을 직접 실행합니다. Streamlit은 자체적으로 포트를 관리하므로 DATABRICKS_APP_PORT를 별도로 지정하지 않아도 됩니다. STREAMLIT_GATHER_USAGE_STATS=false는 Streamlit의 사용 통계 수집을 비활성화하여 외부로의 네트워크 요청을 줄입니다.

app.py

아래 코드의 각 부분이 왜 이렇게 작성되었는지 설명합니다.
import math
import os
import pandas as pd
import streamlit as st
from databricks import sql
from databricks.sdk.core import Config

cfg = Config()
Config()는 환경변수에서 Databricks 인증 정보를 자동으로 읽습니다. 앱 실행 시 DATABRICKS_HOST, DATABRICKS_CLIENT_ID, DATABRICKS_CLIENT_SECRET가 자동 주입되므로 별도 설정이 필요 없습니다.
def get_connection():
    """SQL Warehouse에 연결합니다."""
    warehouse_id = os.getenv("DATABRICKS_WAREHOUSE_ID")
    http_path = f"/sql/1.0/warehouses/{warehouse_id}"

    server_hostname = cfg.host
    if server_hostname.startswith("https://"):
        server_hostname = server_hostname.replace("https://", "")
    elif server_hostname.startswith("http://"):
        server_hostname = server_hostname.replace("http://", "")

    return sql.connect(
        server_hostname=server_hostname,
        http_path=http_path,
        credentials_provider=lambda: cfg.authenticate,
        _use_arrow_native_complex_types=False,
    )
server_hostname에서 https://를 제거하는 이유는 sql.connect()가 호스트명만 받기 때문입니다. credentials_provider=lambda: cfg.authenticate는 SDK가 OAuth 토큰을 자동으로 발급/갱신하도록 위임합니다. _use_arrow_native_complex_types=False는 Arrow의 복합 타입을 Pandas 네이티브 타입으로 변환하여 호환성을 보장합니다.
def read_table(table_name: str, conn) -> pd.DataFrame:
    """Unity Catalog 테이블을 읽어 DataFrame으로 반환합니다."""
    with conn.cursor() as cursor:
        cursor.execute(f"SELECT * FROM {table_name}")
        return cursor.fetchall_arrow().to_pandas()
fetchall_arrow().to_pandas()는 Arrow 포맷으로 데이터를 가져온 뒤 Pandas DataFrame으로 변환합니다. Arrow를 거치는 이유는 대용량 데이터 전송에서 JSON 직렬화보다 훨씬 빠르기 때문 입니다.
주의 SQL 인젝션 주의: 이 예제에서 f"SELECT * FROM {table_name}"은 사용자 입력을 직접 SQL에 넣으므로 SQL 인젝션에 취약합니다. 프로덕션에서는 테이블 이름을 허용 목록으로 검증하거나, 파라미터화된 쿼리를 사용하세요.
def format_value(val):
    """SQL INSERT용 값 포맷팅"""
    if val is None or (isinstance(val, float) and math.isnan(val)):
        return "NULL"
    else:
        return repr(val)


def insert_overwrite_table(table_name: str, df: pd.DataFrame, conn):
    """편집된 데이터를 테이블에 저장합니다."""
    progress = st.empty()
    with conn.cursor() as cursor:
        rows = list(df.itertuples(index=False))
        values = ",".join(
            [f"({','.join(map(format_value, row))})" for row in rows]
        )
        with progress:
            st.info("Databricks SQL 실행 중...")
        cursor.execute(f"INSERT OVERWRITE {table_name} VALUES {values}")
    progress.empty()
    st.success("변경 사항이 저장되었습니다!")
INSERT OVERWRITE는 기존 데이터를 완전히 대체합니다. 이 방식은 소규모 테이블에서는 괜찮지만, 대용량 테이블에서는 MERGE INTO를 사용하는 것이 더 효율적입니다.
# ===== UI 구성 =====
st.title("Unity Catalog 테이블 뷰어")

table_name = st.text_input(
    "Unity Catalog 테이블 이름:",
    placeholder="catalog.schema.table_name",
)

if table_name:
    conn = get_connection()
    if conn:
        st.success("SQL Warehouse 연결 성공!")

        # 테이블 읽기
        original_df = read_table(table_name, conn)

        # 편집 가능한 데이터 에디터
        edited_df = st.data_editor(
            original_df, num_rows="dynamic", hide_index=True
        )

        # 변경 사항 감지
        df_diff = pd.concat([original_df, edited_df]).drop_duplicates(
            keep=False
        )
        if not df_diff.empty:
            st.warning(f"저장되지 않은 변경 사항이 {len(df_diff) // 2}건 있습니다.")
            if st.button("변경 사항 저장"):
                insert_overwrite_table(table_name, edited_df, conn)
                st.rerun()
else:
    st.info("테이블 이름을 입력하면 데이터가 로드됩니다.")
st.data_editor()는 Streamlit의 내장 편집 가능 테이블 위젯입니다. num_rows="dynamic"은 행 추가/삭제를 허용합니다. st.rerun()은 저장 후 페이지를 새로고침하여 최신 데이터를 다시 로드합니다.

배포

# 1. 앱 생성 (UI에서 또는 CLI로)
databricks apps create my-streamlit-app

# 2. 리소스 연결 (UI의 Configure에서 SQL Warehouse 추가)

# 3. 배포
databricks apps deploy my-streamlit-app --source-code-path ./my-streamlit-app

프로덕션 전환 시 고려사항

이 예제를 프로덕션에서 사용하려면 다음 사항을 개선해야 합니다.
항목현재 예제프로덕션 권장
SQL 인젝션사용자 입력을 직접 SQL에 삽입테이블 이름 허용 목록 검증, 파라미터화 쿼리
에러 핸들링기본 에러만 처리try/except로 연결 실패, 권한 오류, 타임아웃 처리
성능매 요청마다 새 연결 생성연결 풀링 또는 st.cache_resource로 연결 재사용
대용량 데이터SELECT * 전체 조회페이지네이션, 필터 조건 추가, LIMIT 강제 적용
의존성 버전버전 미명시streamlit==1.32.0처럼 버전 고정