# app.py — FastAPI + Lakebase REST API
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from contextlib import asynccontextmanager
import psycopg2
from psycopg2 import pool
from typing import Optional
import os
# 커넥션 풀 초기화
db_pool = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""앱 시작 시 커넥션 풀 생성, 종료 시 정리"""
global db_pool
db_pool = pool.ThreadedConnectionPool(
minconn=2,
maxconn=10,
host=os.environ["LAKEBASE_HOST"],
port=5432,
dbname=os.environ["LAKEBASE_DBNAME"],
user="token",
password=os.environ["LAKEBASE_TOKEN"],
sslmode="require"
)
yield
db_pool.closeall()
app = FastAPI(title="주문 API", lifespan=lifespan)
# --- 모델 정의 ---
class OrderCreate(BaseModel):
customer_id: int
product: str
amount: float
status: Optional[str] = "pending"
class OrderResponse(BaseModel):
id: int
customer_id: int
product: str
amount: float
status: str
# --- API 엔드포인트 ---
@app.get("/orders", response_model=list[OrderResponse])
def list_orders(limit: int = 50, status: Optional[str] = None):
"""주문 목록을 조회합니다."""
conn = db_pool.getconn()
try:
cursor = conn.cursor()
if status:
cursor.execute(
"SELECT id, customer_id, product, amount, status "
"FROM orders WHERE status = %s "
"ORDER BY created_at DESC LIMIT %s",
(status, limit)
)
else:
cursor.execute(
"SELECT id, customer_id, product, amount, status "
"FROM orders ORDER BY created_at DESC LIMIT %s",
(limit,)
)
rows = cursor.fetchall()
return [
OrderResponse(id=r[0], customer_id=r[1], product=r[2],
amount=r[3], status=r[4])
for r in rows
]
finally:
db_pool.putconn(conn)
@app.post("/orders", response_model=OrderResponse, status_code=201)
def create_order(order: OrderCreate):
"""새 주문을 생성합니다."""
conn = db_pool.getconn()
try:
cursor = conn.cursor()
cursor.execute(
"INSERT INTO orders (customer_id, product, amount, status) "
"VALUES (%s, %s, %s, %s) RETURNING id",
(order.customer_id, order.product, order.amount, order.status)
)
order_id = cursor.fetchone()[0]
conn.commit()
return OrderResponse(
id=order_id, customer_id=order.customer_id,
product=order.product, amount=order.amount,
status=order.status
)
except Exception as e:
conn.rollback()
raise HTTPException(status_code=400, detail=str(e))
finally:
db_pool.putconn(conn)
@app.patch("/orders/{order_id}/status")
def update_order_status(order_id: int, status: str):
"""주문 상태를 업데이트합니다."""
conn = db_pool.getconn()
try:
cursor = conn.cursor()
cursor.execute(
"UPDATE orders SET status = %s, updated_at = CURRENT_TIMESTAMP "
"WHERE id = %s RETURNING id",
(status, order_id)
)
if cursor.fetchone() is None:
raise HTTPException(status_code=404, detail="주문을 찾을 수 없습니다")
conn.commit()
return {"message": f"주문 {order_id} 상태가 '{status}'로 변경되었습니다"}
finally:
db_pool.putconn(conn)