Skip to main content
Databricks Apps의 기본 구조와 배포 방법은 Apps 가이드 — 앱 만들기를 먼저 참고하세요. Lakebase 리소스 연결과 Service Principal 권한은 Apps 리소스 관리에서 다룹니다.

Lakebase란?

Lakebase는 Databricks가 완전 관리형으로 제공하는 PostgreSQL 호환 트랜잭션 데이터베이스입니다. AWS Aurora나 Cloud SQL과 달리, Unity Catalog에 통합되어 있어 별도의 네트워크 피어링이나 IAM 설정 없이 바로 사용할 수 있습니다. OLTP(온라인 트랜잭션 처리) 워크로드를 Lakebase에서 처리하고, Data Sync 기능으로 Delta Lake에 자동 동기화하면 분석까지 하나의 플랫폼에서 해결됩니다.

사전 준비 체크리스트

시작하기 전에 아래 항목이 모두 준비되어 있는지 확인하세요.
항목필요 조건확인 방법
Databricks WorkspacePremium 이상 플랜워크스페이스 설정 페이지에서 플랜 확인
Unity Catalog카탈로그가 활성화되어 있어야 함SHOW CATALOGS SQL로 확인
Lakebase 인스턴스아래 SQL로 생성 완료SHOW DATABASES IN my_catalog
Databricks Apps 배포 환경Apps가 활성화된 워크스페이스사이드바에 “Apps” 메뉴 존재
Python 패키지fastapi, psycopg2-binary, databricks-sdkrequirements.txt에 명시
권한Service Principal에 Lakebase READ_WRITEapp.yamlresources 섹션에서 선언

1. Lakebase 데이터베이스 및 테이블 생성

Lakebase 데이터베이스 생성

아래 SQL은 Unity Catalog 내에 Lakebase로 관리되는 새 데이터베이스를 생성합니다. MANAGED LAKEBASE 키워드가 이 데이터베이스를 PostgreSQL 엔진으로 구동하겠다는 선언입니다.
-- Unity Catalog에서 Lakebase DB 생성
CREATE DATABASE my_catalog.my_app_db
MANAGED LAKEBASE;

테이블 생성

이 SQL은 상품 정보를 저장할 테이블을 만듭니다. PostgreSQL 표준 문법(SERIAL, VARCHAR, NUMERIC 등)을 그대로 사용할 수 있습니다.
CREATE TABLE my_catalog.my_app_db.products (
  id         SERIAL PRIMARY KEY,       -- 자동 증가 기본키
  name       VARCHAR(200) NOT NULL,    -- 상품명 (필수)
  price      NUMERIC(10,2) NOT NULL DEFAULT 0,  -- 가격 (소수점 2자리)
  category   VARCHAR(50),              -- 카테고리 (선택)
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,  -- 생성 시각 (자동)
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP   -- 수정 시각 (자동)
);

-- category 컬럼으로 자주 조회하므로 인덱스 추가
CREATE INDEX idx_products_category ON my_catalog.my_app_db.products (category);

2. FastAPI + psycopg2 연결 설정

app.yaml (리소스 선언)

이 파일은 Databricks Apps에 배포할 앱의 실행 명령, 환경변수, 그리고 Lakebase 리소스 접근 권한을 선언합니다. resources 섹션에서 lakebase_database를 선언하면, 앱의 Service Principal이 해당 DB에 접근할 수 있는 권한을 자동으로 받습니다.
command:
  - uvicorn
  - main:app
  - --host=0.0.0.0
  - --port=8000
env:
  - name: LAKEBASE_CATALOG
    value: my_catalog
  - name: LAKEBASE_SCHEMA
    value: my_app_db
resources:
  - name: lakebase-db
    lakebase_database:
      catalog: my_catalog
      schema: my_app_db
      permission: READ_WRITE

requirements.txt

fastapi>=0.115
uvicorn>=0.30
psycopg2-binary>=2.9
databricks-sdk>=0.40

연결 헬퍼 (db.py)

이 코드는 Lakebase에 연결하기 위한 커넥션 풀을 관리합니다. 핵심은 generate_credential() 함수로 임시 인증 토큰을 발급받는 부분입니다.

generate_credential()이 왜 필요한가?

Lakebase는 고정된 사용자/비밀번호 대신 토큰 기반 인증을 사용합니다. generate_credential()은 현재 Service Principal의 권한으로 임시 PostgreSQL 접속 정보(호스트, 포트, 사용자명, 비밀번호)를 발급합니다. 이 토큰은 1시간 동안 유효하므로, 프로덕션에서는 만료 전에 풀을 재생성하는 로직이 반드시 필요합니다 (4장에서 다룹니다).

ThreadedConnectionPool 설정값 의미

파라미터설명
minconn2풀 초기화 시 미리 만들어두는 최소 연결 수. 앱 시작 직후에도 대기 없이 응답 가능
maxconn10동시 요청이 많을 때 최대로 열 수 있는 연결 수. 너무 크면 DB에 부하, 너무 작으면 대기 발생
sslmode"require"Lakebase는 TLS 암호화 연결을 필수로 요구함
import os
import psycopg2
from psycopg2 import pool
from databricks.sdk import WorkspaceClient

_pool = None

def get_pool() -> pool.ThreadedConnectionPool:
    """Lakebase 커넥션 풀 (싱글턴)"""
    global _pool
    if _pool is None or _pool.closed:
        w = WorkspaceClient()
        # Lakebase 임시 인증 토큰 발급 (1시간 유효)
        cred = w.lakebase.generate_credential(
            catalog=os.environ["LAKEBASE_CATALOG"],
            schema=os.environ["LAKEBASE_SCHEMA"],
        )
        # ThreadedConnectionPool: 멀티스레드 환경(FastAPI)에서 안전한 풀
        _pool = pool.ThreadedConnectionPool(
            minconn=2,       # 최소 연결 수
            maxconn=10,      # 최대 연결 수
            host=cred.host,
            port=cred.port,
            dbname=cred.database,
            user=cred.username,
            password=cred.password,
            sslmode="require",  # TLS 필수
        )
    return _pool

def get_conn():
    """풀에서 커넥션 1개 대여"""
    return get_pool().getconn()

def put_conn(conn):
    """커넥션 반납"""
    get_pool().putconn(conn)
generate_credential()이 반환하는 토큰은 1시간 유효합니다. 프로덕션에서는 만료 전에 풀을 재생성하는 로직을 추가하세요.

3. CRUD 엔드포인트

main.py

아래 코드는 FastAPI로 상품(Product)의 생성·조회·수정·삭제 엔드포인트를 구현합니다. db() 컨텍스트 매니저가 커넥션 대여, 커밋/롤백, 반납을 자동으로 처리합니다.
from contextlib import contextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from db import get_conn, put_conn

app = FastAPI(title="Lakebase CRUD Demo")

# ── Pydantic 모델 ──────────────────────────
class ProductCreate(BaseModel):
    name: str
    price: float
    category: str | None = None

class ProductUpdate(BaseModel):
    name: str | None = None
    price: float | None = None
    category: str | None = None

# ── 커넥션 컨텍스트 매니저 ──────────────────
@contextmanager
def db():
    conn = get_conn()
    try:
        yield conn
        conn.commit()      # 정상 종료 시 커밋
    except Exception:
        conn.rollback()    # 예외 발생 시 롤백
        raise
    finally:
        put_conn(conn)     # 커넥션을 풀에 반납

# ── CREATE ──────────────────────────────────
@app.post("/products", status_code=201)
def create_product(body: ProductCreate):
    with db() as conn:
        cur = conn.cursor()
        cur.execute(
            """INSERT INTO products (name, price, category)
               VALUES (%s, %s, %s) RETURNING id, created_at""",
            (body.name, body.price, body.category),
        )
        row = cur.fetchone()
    return {"id": row[0], "created_at": str(row[1])}

# ── READ (목록) ─────────────────────────────
@app.get("/products")
def list_products(category: str | None = None, limit: int = 50):
    with db() as conn:
        cur = conn.cursor()
        if category:
            cur.execute(
                "SELECT id, name, price, category FROM products WHERE category = %s LIMIT %s",
                (category, limit),
            )
        else:
            cur.execute(
                "SELECT id, name, price, category FROM products LIMIT %s",
                (limit,),
            )
        rows = cur.fetchall()
    return [
        {"id": r[0], "name": r[1], "price": float(r[2]), "category": r[3]}
        for r in rows
    ]

# ── READ (단건) ─────────────────────────────
@app.get("/products/{product_id}")
def get_product(product_id: int):
    with db() as conn:
        cur = conn.cursor()
        cur.execute("SELECT id, name, price, category FROM products WHERE id = %s", (product_id,))
        row = cur.fetchone()
    if not row:
        raise HTTPException(404, "Product not found")
    return {"id": row[0], "name": row[1], "price": float(row[2]), "category": row[3]}

# ── UPDATE ──────────────────────────────────
@app.put("/products/{product_id}")
def update_product(product_id: int, body: ProductUpdate):
    fields, values = [], []
    for col in ("name", "price", "category"):
        v = getattr(body, col)
        if v is not None:
            fields.append(f"{col} = %s")
            values.append(v)
    if not fields:
        raise HTTPException(400, "No fields to update")
    fields.append("updated_at = CURRENT_TIMESTAMP")
    values.append(product_id)

    with db() as conn:
        cur = conn.cursor()
        cur.execute(
            f"UPDATE products SET {', '.join(fields)} WHERE id = %s RETURNING id",
            values,
        )
        row = cur.fetchone()
    if not row:
        raise HTTPException(404, "Product not found")
    return {"updated": row[0]}

# ── DELETE ──────────────────────────────────
@app.delete("/products/{product_id}")
def delete_product(product_id: int):
    with db() as conn:
        cur = conn.cursor()
        cur.execute("DELETE FROM products WHERE id = %s RETURNING id", (product_id,))
        row = cur.fetchone()
    if not row:
        raise HTTPException(404, "Product not found")
    return {"deleted": row[0]}

4. 커넥션 풀링 고급 패턴

프로덕션에서는 토큰 만료를 자동 처리해야 합니다. 아래 PoolRefresher는 백그라운드 스레드에서 50분(3000초)마다 커넥션 풀을 폐기하고 새로 생성합니다. 토큰 유효기간이 1시간이므로, 50분 간격이면 만료 전에 항상 새 토큰으로 교체됩니다.
import time, threading
from db import get_pool

class PoolRefresher:
    """50분마다 커넥션 풀을 재생성해 토큰 만료를 방지"""

    def __init__(self, interval_sec: int = 3000):
        self.interval = interval_sec
        self._stop = threading.Event()

    def start(self):
        t = threading.Thread(target=self._run, daemon=True)
        t.start()

    def _run(self):
        while not self._stop.is_set():
            time.sleep(self.interval)
            old = get_pool()
            old.closeall()          # 기존 풀 닫기
            _ = get_pool()          # 새 풀 자동 생성

# FastAPI lifespan에서 시작
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app):
    refresher = PoolRefresher()
    refresher.start()
    yield

app = FastAPI(lifespan=lifespan)

5. Data Sync로 Delta Lake 동기화

Lakebase 테이블 변경분을 Delta Lake로 자동 동기화하려면 Data Sync를 설정합니다. 동기화된 Delta 테이블에서 BI 대시보드나 ML 파이프라인을 연결할 수 있습니다. 아래 SQL은 products 테이블의 변경 사항을 5분마다 Delta Lake 테이블로 복제하는 동기화 작업을 생성합니다.
-- Delta Lake 타깃 테이블 확인 (자동 생성됨)
-- Lakebase 테이블과 동일 카탈로그·스키마 아래 _delta 접미사
CREATE SYNC my_catalog.my_app_db.products_sync
FROM my_catalog.my_app_db.products
SCHEDULE EVERY 5 MINUTES;

Python SDK로 Data Sync 생성

동일한 작업을 Python SDK로도 수행할 수 있습니다. CI/CD 파이프라인이나 자동화 스크립트에서 유용합니다.
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
w.lakebase.create_sync(
    source_catalog="my_catalog",
    source_schema="my_app_db",
    source_table="products",
    target_catalog="my_catalog",
    target_schema="analytics",
    target_table="products_delta",
    schedule="EVERY 5 MINUTES",
)
동기화된 Delta 테이블에서 분석 쿼리를 실행할 수 있습니다. 아래는 카테고리별 상품 수와 평균 가격을 집계하는 예시입니다.
-- Delta Lake 쪽에서 분석
SELECT category, COUNT(*) AS cnt, AVG(price) AS avg_price
FROM my_catalog.analytics.products_delta
GROUP BY category
ORDER BY cnt DESC;

트러블슈팅

연결 실패: connection refused 또는 timeout

증상원인해결 방법
Connection refusedLakebase 인스턴스가 아직 프로비저닝 중SHOW DATABASES IN my_catalog로 상태 확인. 생성 후 2~3분 대기
SSL SYSCALL errorsslmode 미설정sslmode="require" 반드시 지정
Connection timed out네트워크 제한 또는 잘못된 호스트generate_credential() 반환값의 host/port 로그 출력 후 확인

권한 오류: permission denied for table products

psycopg2.errors.InsufficientPrivilege: permission denied for table products
원인: app.yamlresources 섹션에서 permissionREAD_WRITE가 아닌 경우, 또는 리소스 선언이 누락된 경우 발생합니다. 해결:
  1. app.yamllakebase_database 리소스가 정확히 선언되어 있는지 확인
  2. permission: READ_WRITE로 설정되어 있는지 확인
  3. 앱을 재배포(databricks apps deploy)하여 권한 변경을 적용

토큰 만료: password authentication failed

psycopg2.OperationalError: password authentication failed for user "..."
원인: generate_credential()로 발급받은 토큰이 1시간을 초과하여 만료됨. 해결: 4장의 PoolRefresher 패턴을 적용하거나, 요청마다 _pool.closed를 체크하여 풀을 재생성하세요.

체크리스트

항목확인
Lakebase DB가 Unity Catalog에 생성되었는가[ ]
app.yaml에 lakebase_database 리소스를 선언했는가[ ]
SP에 READ_WRITE 권한이 부여되었는가[ ]
커넥션 풀 재생성 로직이 있는가 (토큰 만료 대비)[ ]
Data Sync 스케줄이 분석 요구사항에 맞는가[ ]