Skip to main content

FastAPI REST 엔드포인트

Unity Catalog 테이블에 대한 CRUD API를 FastAPI로 제공하는 예제입니다. 이 예제는 Databricks 데이터를 REST API로 노출 하여 외부 시스템이나 프론트엔드 앱에서 접근할 수 있게 하는 패턴을 보여줍니다.

왜 FastAPI인가?

Streamlit은 UI를 포함한 풀스택 앱에 적합하지만, 프론트엔드를 별도로 개발 하거나 다른 시스템에서 데이터를 호출 해야 하는 경우에는 REST API가 필요합니다. FastAPI는 다음과 같은 이유로 Databricks Apps에서 API 서버로 가장 적합합니다:
  • 자동 문서 생성: Swagger UI (/docs)가 자동으로 생성되어 API 테스트와 문서화가 동시에 해결됩니다
  • 타입 안전성: Pydantic 모델로 요청/응답 형식을 엄격하게 정의하여 런타임 오류를 방지합니다
  • 비동기 지원: async/await으로 고성능 비동기 처리가 가능합니다

requirements.txt

databricks-sdk
databricks-sql-connector
fastapi
uvicorn
pandas
uvicorn은 FastAPI의 ASGI 서버입니다. FastAPI는 프레임워크일 뿐이고, 실제로 HTTP 요청을 처리하는 것은 uvicorn입니다.

app.yaml

command:
  - uvicorn
  - app:app
  - --host
  - '0.0.0.0'
  - --port
  - '${DATABRICKS_APP_PORT}'
env:
  - name: DATABRICKS_WAREHOUSE_ID
    valueFrom: sql_warehouse
--host 0.0.0.0은 컨테이너 외부(리버스 프록시)에서 접근 가능하도록 모든 인터페이스에 바인딩합니다. --port ${DATABRICKS_APP_PORT}는 Databricks가 동적으로 할당한 포트를 사용합니다. 이 두 설정이 없으면 앱이 시작되어도 외부에서 접속할 수 없습니다.

app.py

import os
from typing import Optional

import pandas as pd
from databricks import sql
from databricks.sdk.core import Config
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel

app = FastAPI(
    title="Databricks Data API",
    description="Unity Catalog 테이블 조회 REST API",
    version="1.0.0",
)

cfg = Config()
FastAPI() 인스턴스 생성 시 title, description, version을 지정하면 자동 생성되는 Swagger UI에 반영됩니다. 앱 URL에 /docs를 추가하면 이 API의 인터랙티브 문서를 볼 수 있습니다.
def get_connection():
    """SQL Warehouse 연결을 반환합니다."""
    warehouse_id = os.getenv("DATABRICKS_WAREHOUSE_ID")
    http_path = f"/sql/1.0/warehouses/{warehouse_id}"

    server_hostname = cfg.host
    if server_hostname.startswith("https://"):
        server_hostname = server_hostname.replace("https://", "")

    return sql.connect(
        server_hostname=server_hostname,
        http_path=http_path,
        credentials_provider=lambda: cfg.authenticate,
    )
연결 함수는 Streamlit 예제와 동일한 패턴입니다. credentials_provider가 OAuth 토큰의 발급과 갱신을 자동으로 처리합니다.
class QueryRequest(BaseModel):
    """SQL 쿼리 요청 모델"""
    sql: str


class TableResponse(BaseModel):
    """테이블 조회 응답 모델"""
    columns: list[str]
    data: list[dict]
    row_count: int
Pydantic의 BaseModel로 요청/응답 형식을 정의합니다. 이렇게 하면 FastAPI가 자동으로 입력 유효성 검사, JSON 직렬화/역직렬화, API 문서 생성을 처리합니다. 잘못된 형식의 요청이 들어오면 422 에러와 함께 상세한 에러 메시지가 자동으로 반환됩니다.
@app.get("/")
def root():
    """API 헬스 체크"""
    return {"status": "healthy", "message": "Databricks Data API is running"}
루트 엔드포인트는 헬스체크 용도입니다. CI/CD 파이프라인에서 배포 후 앱이 정상적으로 실행 중인지 확인할 수 있습니다.
@app.get("/tables/{catalog}/{schema}/{table}", response_model=TableResponse)
def read_table(
    catalog: str,
    schema: str,
    table: str,
    limit: int = Query(default=100, le=10000),
    offset: int = Query(default=0, ge=0),
):
    """Unity Catalog 테이블을 조회합니다.

    Args:
        catalog: 카탈로그 이름
        schema: 스키마 이름
        table: 테이블 이름
        limit: 반환 행 수 (최대 10,000)
        offset: 시작 위치
    """
    table_name = f"`{catalog}`.`{schema}`.`{table}`"
    query = f"SELECT * FROM {table_name} LIMIT {limit} OFFSET {offset}"

    try:
        conn = get_connection()
        with conn.cursor() as cursor:
            cursor.execute(query)
            df = cursor.fetchall_arrow().to_pandas()
        conn.close()

        return TableResponse(
            columns=df.columns.tolist(),
            data=df.to_dict(orient="records"),
            row_count=len(df),
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
URL 경로에 {catalog}/{schema}/{table}을 사용하면 REST 스타일의 직관적인 API가 됩니다. Query(default=100, le=10000)limit 파라미터의 기본값을 100으로, 최대값을 10,000으로 제한합니다. 백틱(`)으로 테이블 이름을 감싸는 이유는 하이픈이나 특수문자가 포함된 이름을 안전하게 처리하기 위해서입니다.
@app.get("/tables/{catalog}/{schema}/{table}/schema")
def get_table_schema(catalog: str, schema: str, table: str):
    """테이블 스키마(컬럼 정보)를 조회합니다."""
    table_name = f"`{catalog}`.`{schema}`.`{table}`"
    query = f"DESCRIBE TABLE {table_name}"

    try:
        conn = get_connection()
        with conn.cursor() as cursor:
            cursor.execute(query)
            df = cursor.fetchall_arrow().to_pandas()
        conn.close()

        return {
            "table": f"{catalog}.{schema}.{table}",
            "columns": df.to_dict(orient="records"),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
DESCRIBE TABLE은 테이블의 컬럼 이름, 데이터 타입, 설명 등 메타데이터를 반환합니다. 프론트엔드에서 동적으로 폼이나 테이블을 생성할 때 유용합니다.
@app.post("/query")
def execute_query(request: QueryRequest):
    """커스텀 SQL 쿼리를 실행합니다.

    보안 주의: 프로덕션에서는 SQL 인젝션 방어 및 허용 쿼리 제한을 구현하세요.
    """
    try:
        conn = get_connection()
        with conn.cursor() as cursor:
            cursor.execute(request.sql)
            df = cursor.fetchall_arrow().to_pandas()
        conn.close()

        return {
            "columns": df.columns.tolist(),
            "data": df.to_dict(orient="records"),
            "row_count": len(df),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
주의 보안 경고: /query 엔드포인트는 임의의 SQL을 실행하므로, 프로덕션에서는 반드시 다음을 구현하세요:
  1. 허용된 SQL 유형만 실행(예: SELECT만 허용, DROP/DELETE 차단)
  2. 쿼리 크기 제한(LIMIT 강제 추가)
  3. 접근 가능한 카탈로그/스키마 제한
  4. 감사 로그(누가 어떤 쿼리를 실행했는지 기록)

API 사용 예시

# 헬스 체크
curl https://<app-url>/

# 테이블 조회
curl "https://<app-url>/tables/my_catalog/my_schema/my_table?limit=50"

# 테이블 스키마 조회
curl "https://<app-url>/tables/my_catalog/my_schema/my_table/schema"

# 커스텀 쿼리 실행
curl -X POST "https://<app-url>/query" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT count(*) as cnt FROM my_catalog.my_schema.my_table"}'
참고 API 문서 자동 생성: 앱 URL에 /docs를 추가하면 Swagger UI가 열립니다. 여기서 모든 엔드포인트를 인터랙티브하게 테스트할 수 있습니다. /redoc에서는 ReDoc 스타일의 읽기 전용 문서를 볼 수 있습니다.

프로덕션 전환 시 고려사항

항목현재 예제프로덕션 권장
SQL 인젝션입력 검증 없음허용 쿼리 유형 제한, 파라미터화 쿼리
인증Databricks SSO만API Key 추가 또는 사용자 인증 토큰 검증
Rate Limiting없음slowapi 패키지로 요청 속도 제한
연결 관리매 요청마다 새 연결연결 풀링 구현
로깅기본 로그만구조화된 감사 로그 (사용자, 쿼리, 응답 시간)
에러 응답내부 에러를 그대로 반환사용자 친화적 에러 메시지, 내부 상세는 로그에만