Skip to main content

FastAPI + Lakebase (피드백 수집 앱)

Lakebase(PostgreSQL)를 사용하여 사용자 피드백을 수집하고 조회하는 REST API입니다.

requirements.txt

databricks-sdk
fastapi
uvicorn
psycopg2-binary

app.yaml

# Start command: run uvicorn against the `app` object defined in app.py.
command:
  - uvicorn
  - app:app
  - --host
  - '0.0.0.0'
  - --port
  # The listening port is taken from the DATABRICKS_APP_PORT variable.
  - '${DATABRICKS_APP_PORT}'
# DB_CONNECTION is the variable app.py reads with os.getenv; its value is
# sourced from the `lakebase_db` resource declared under `resources`.
# NOTE(review): presumably this resolves to a PostgreSQL connection string
# accepted by psycopg2.connect -- confirm against the platform docs.
env:
  - name: DB_CONNECTION
    valueFrom: lakebase_db
# Resource referenced by `valueFrom` in the env section.
resources:
  - name: lakebase_db
    type: postgres

app.py

import os
from datetime import datetime
from typing import Optional

import psycopg2
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field

# ASGI application object that the route and startup decorators attach to.
app = FastAPI(title="피드백 수집 API")

# Connection setting for the database, read once at import time.
# The value is None when the variable is not present in the environment.
DB_CONNECTION = os.environ.get("DB_CONNECTION")


def get_db():
    """Open a new PostgreSQL connection to Lakebase.

    Returns:
        A new psycopg2 connection created from ``DB_CONNECTION``. Every
        caller in this module closes the connection itself.

    Raises:
        RuntimeError: If ``DB_CONNECTION`` was not set in the environment.
    """
    # Only None is rejected: an empty DSN is still handed to the driver,
    # exactly as before, so its own defaulting rules keep applying.
    if DB_CONNECTION is None:
        # Name the missing setting instead of surfacing an opaque driver error.
        raise RuntimeError("DB_CONNECTION environment variable is not set")
    return psycopg2.connect(DB_CONNECTION)


# Create the feedback table when the app starts.
@app.on_event("startup")
def init_db():
    """Create the ``feedback`` table if it does not already exist.

    Registered as a startup handler on ``app``. The connection is closed
    even when creating the table fails, matching the cleanup the request
    handlers perform.
    """
    conn = get_db()
    try:
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS feedback (
                    id SERIAL PRIMARY KEY,
                    user_email VARCHAR(255),
                    category VARCHAR(100),
                    message TEXT NOT NULL,
                    rating INT CHECK (rating >= 1 AND rating <= 5),
                    created_at TIMESTAMP DEFAULT NOW()
                )
            """)
        conn.commit()
    finally:
        # Previously skipped when execute()/commit() raised, leaking the
        # connection.
        conn.close()


class FeedbackCreate(BaseModel):
    """Request body for submitting a feedback entry.

    The field constraints mirror the ``feedback`` table definition so that
    out-of-range input is rejected during request validation instead of
    failing later inside the INSERT.
    """

    # Optional submitter e-mail; bounded by the VARCHAR(255) column.
    user_email: Optional[str] = Field(default=None, max_length=255)
    # Feedback category; bounded by the VARCHAR(100) column.
    category: str = Field(..., max_length=100)
    # Free-text feedback body (stored in the TEXT NOT NULL column).
    message: str
    # Score limited to 1-5, matching the column's CHECK constraint.
    rating: int = Field(..., ge=1, le=5)


class FeedbackResponse(BaseModel):
    """Response shape for a stored feedback entry.

    Used as the ``response_model`` of the create and list endpoints; the
    fields correspond one-to-one to the columns of the ``feedback`` table.
    """

    # Value of the table's SERIAL primary key.
    id: int
    # Submitter e-mail; None when it was not provided.
    user_email: Optional[str]
    # Feedback category.
    category: str
    # Free-text feedback body.
    message: str
    # Score given by the submitter.
    rating: int
    # Row creation timestamp (the column defaults to NOW()).
    created_at: datetime


@app.post("/api/feedback", response_model=FeedbackResponse)
def create_feedback(fb: FeedbackCreate):
    """Store a feedback entry in Lakebase.

    Args:
        fb: The submitted feedback.

    Returns:
        The stored entry, including the generated ``id`` and ``created_at``.

    Raises:
        HTTPException: 422 when the database rejects the submitted values,
            e.g. a rating outside the table's CHECK constraint or a string
            longer than its column allows.
    """
    conn = get_db()
    try:
        with conn.cursor() as cur:
            cur.execute(
                """INSERT INTO feedback (user_email, category, message, rating)
                   VALUES (%s, %s, %s, %s) RETURNING id, created_at""",
                (fb.user_email, fb.category, fb.message, fb.rating)
            )
            row = cur.fetchone()
        conn.commit()
    except (psycopg2.IntegrityError, psycopg2.DataError) as exc:
        # These errors are caused by the client's input, so report them as a
        # client error rather than letting them surface as a 500. The
        # uncommitted INSERT is discarded when the connection is closed.
        raise HTTPException(
            status_code=422, detail="Invalid feedback data"
        ) from exc
    finally:
        conn.close()
    return FeedbackResponse(id=row[0], created_at=row[1], **fb.model_dump())


@app.get("/api/feedback", response_model=list[FeedbackResponse])
def list_feedback(category: Optional[str] = None, limit: int = 50):
    """Return stored feedback entries, newest first.

    Args:
        category: When given and non-empty, only entries of this category
            are returned.
        limit: Maximum number of entries to return; must not be negative.

    Returns:
        A list of ``FeedbackResponse`` objects ordered by ``created_at``
        descending.

    Raises:
        HTTPException: 422 when ``limit`` is negative.
    """
    if limit < 0:
        # PostgreSQL rejects a negative LIMIT; answer with a client error
        # instead of letting the driver exception become a 500.
        raise HTTPException(
            status_code=422, detail="limit must be non-negative"
        )

    # Only fixed SQL fragments are concatenated; every value is still passed
    # to the driver as a bound parameter.
    query = (
        "SELECT id, user_email, category, message, rating, created_at "
        "FROM feedback"
    )
    params = []
    if category:
        query += " WHERE category = %s"
        params.append(category)
    query += " ORDER BY created_at DESC LIMIT %s"
    params.append(limit)

    conn = get_db()
    try:
        with conn.cursor() as cur:
            cur.execute(query, params)
            rows = cur.fetchall()
    finally:
        conn.close()

    return [
        FeedbackResponse(
            id=r[0], user_email=r[1], category=r[2],
            message=r[3], rating=r[4], created_at=r[5]
        ) for r in rows
    ]


@app.get("/api/feedback/stats")
def feedback_stats():
    """Return per-category feedback statistics.

    Returns:
        A list of dicts with the keys ``category``, ``count`` and
        ``avg_rating``, ordered by ``count`` descending. ``avg_rating`` is
        ``None`` for a category whose rows all have a NULL rating.
    """
    conn = get_db()
    try:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT
                    category,
                    COUNT(*) as count,
                    AVG(rating) as avg_rating
                FROM feedback
                GROUP BY category
                ORDER BY count DESC
            """)
            rows = cur.fetchall()
    finally:
        conn.close()

    return [
        {
            "category": r[0],
            "count": r[1],
            # The rating column is nullable, so AVG() can be NULL; float(None)
            # would raise TypeError.
            "avg_rating": float(r[2]) if r[2] is not None else None,
        }
        for r in rows
    ]
참고: Lakebase의 장점 — SQL Warehouse를 통한 Unity Catalog(UC) 테이블 접근은 분석 쿼리에 최적화되어 있어 쓰기 작업에는 지연이 있습니다. Lakebase는 PostgreSQL 호환이므로 INSERT, UPDATE, DELETE가 수 ms 내에 처리됩니다. 따라서 피드백 수집, 채팅 이력 저장, 세션 관리 등 OLTP 워크로드에 적합합니다.