Skip to main content

1. 왜 Lakebase에서 최적화가 중요한가

OLTP vs OLAP 특성 차이

Databricks는 본래 대규모 분석(OLAP, Online Analytical Processing) 워크로드에 최적화된 플랫폼입니다. Lakebase는 그 위에 OLTP (Online Transaction Processing) 특성을 더한 관리형 PostgreSQL로, 두 세계를 연결하는 역할을 합니다.
특성OLAP (Delta / Spark)OLTP (Lakebase / PostgreSQL)
주요 작업대규모 집계, 스캔단건 삽입/조회/수정/삭제
트랜잭션제한적 (ACID on Delta)완전한 ACID 트랜잭션
동시 접속수십~수백 Spark job수십~수천 클라이언트 연결
지연 시간초~분 단위 허용밀리초 단위 요구
인덱스파일 통계 기반B-tree, Hash, GIN, GiST 등

Databricks 관리형이라는 특징

Lakebase는 단순한 PostgreSQL이 아니라 Databricks Unity Catalog 아래에서 동작하는 관리형 서비스입니다. 이 때문에 다음과 같은 제약과 이점이 동시에 존재합니다.
  • 이점: 자동 백업, Unity Catalog 거버넌스, Delta 테이블과의 양방향 동기화 (Synced Tables)
  • 제약: 직접 OS/스토리지 레벨 접근 불가, pg_hba.conf 등 일부 서버 파라미터 변경 제한
  • 네트워크: Databricks Apps와 동일 VPC 내에서 Private Link로 저지연 통신
따라서 최적화 전략은 “PostgreSQL 표준 방법”을 기반으로 하되, 관리형 환경의 제약을 인식하고 적용 해야 합니다.
참고: Databricks Lakebase 공식 문서

2. CRUD 작업 패턴

INSERT — 삽입 최적화

단건 INSERT는 네트워크 왕복(round-trip)이 발생할 때마다 오버헤드가 누적됩니다. 배치 처리 (batch insert) 를 활용하면 성능을 크게 향상시킬 수 있습니다.
import psycopg2
import psycopg2.extras

conn = psycopg2.connect(...)

# ❌ 나쁜 예: 루프 내 단건 INSERT (네트워크 왕복 N번)
for row in data:
    cur.execute("INSERT INTO events (user_id, action) VALUES (%s, %s)", row)

# ✅ 좋은 예: executemany로 배치 INSERT
records = [(row["user_id"], row["action"]) for row in data]
psycopg2.extras.execute_values(
    cur,
    "INSERT INTO events (user_id, action) VALUES %s",
    records,
    page_size=500   # 500건씩 묶어서 전송
)
conn.commit()
-- 대량 데이터 로드 시: COPY 명령이 가장 빠름
-- (Databricks Apps에서 파일 기반으로 사용하는 경우)
COPY events (user_id, action, created_at)
FROM '/tmp/events.csv'
WITH (FORMAT csv, HEADER true);

SELECT — 조회 최적화

# ✅ 파라미터 바인딩으로 SQL Injection 방지 + 실행 계획 재사용
cur.execute(
    "SELECT id, name, tier FROM customers WHERE tier = %s AND created_at > %s",
    ("premium", "2025-01-01")
)

# ✅ 커서 기반 페이지네이션 (OFFSET 대신 keyset pagination 권장)
# OFFSET은 뒤로 갈수록 느려짐 — 대규모 테이블에서 치명적
cur.execute(
    """
    SELECT id, name, created_at FROM orders
    WHERE created_at < %s          -- 마지막 조회 커서
    ORDER BY created_at DESC
    LIMIT 50
    """,
    (last_seen_cursor,)
)

# ❌ 나쁜 예: SELECT * + 대용량 OFFSET
cur.execute("SELECT * FROM orders ORDER BY id OFFSET 10000 LIMIT 50")

UPDATE — 수정 최적화

-- ✅ 배치 UPDATE: 조건을 명확히 지정하여 인덱스 활용
UPDATE customers
SET tier = 'premium', updated_at = NOW()
WHERE id = ANY(ARRAY[1001, 1002, 1003]);  -- IN 보다 ANY(ARRAY[...])가 플래너 친화적

-- ✅ UPSERT (INSERT ON CONFLICT): 중복 처리를 DB 레벨에서 해결
INSERT INTO customer_stats (customer_id, total_orders, last_order_at)
VALUES (%(cid)s, %(total)s, %(last)s)
ON CONFLICT (customer_id) DO UPDATE
  SET total_orders = EXCLUDED.total_orders,
      last_order_at = EXCLUDED.last_order_at;

-- ❌ 나쁜 예: 인덱스 없는 컬럼으로 UPDATE → Sequential Scan
UPDATE orders SET status = 'shipped' WHERE memo LIKE '%urgent%';

DELETE — 삭제 최적화

-- ✅ 소프트 삭제 (soft delete) 패턴 권장
-- 실제 DELETE 대신 is_deleted 플래그 설정 → 인덱스 활용, 복구 가능
ALTER TABLE customers ADD COLUMN is_deleted BOOLEAN DEFAULT FALSE;
ALTER TABLE customers ADD COLUMN deleted_at TIMESTAMPTZ;

UPDATE customers SET is_deleted = TRUE, deleted_at = NOW() WHERE id = %s;

-- ✅ 정기 하드 삭제가 필요하다면 배치로 처리
DELETE FROM audit_logs
WHERE created_at < NOW() - INTERVAL '90 days'
  AND id IN (SELECT id FROM audit_logs WHERE created_at < NOW() - INTERVAL '90 days' LIMIT 1000);
-- LIMIT으로 한 번에 삭제할 건수를 제한 → 락(lock) 경합 방지

3. 인덱스 전략

PostgreSQL 인덱스 유형

인덱스 유형적합한 사용처예시
B-tree등호/범위 조건, 정렬WHERE id = ?, ORDER BY created_at
Hash등호 조건만 (범위 불가)WHERE email = ? (등호 전용)
GIN배열, JSONB, 전문 검색WHERE tags @> '{python}', LIKE with trigram
GiST지리 정보, 범위 타입PostGIS 좌표, tsrange 겹침 검사
-- B-tree: 복합 인덱스 — 자주 함께 쓰이는 컬럼은 함께 묶기
-- (컬럼 순서가 중요: 카디널리티 높은 것을 앞에)
CREATE INDEX idx_orders_customer_date
    ON orders (customer_id, created_at DESC);

-- Hash: 등호 조건만 있는 고빈도 조회 컬럼
CREATE INDEX idx_sessions_token
    ON user_sessions USING hash (session_token);

-- GIN + trigram: ILIKE 기반 텍스트 검색 가속
CREATE EXTENSION IF NOT EXISTS pg_trgm;
CREATE INDEX idx_products_name_trgm
    ON products USING gin (name gin_trgm_ops);

-- GIN: JSONB 컬럼 내 키 검색
CREATE INDEX idx_events_payload
    ON events USING gin (payload jsonb_path_ops);

-- 부분 인덱스 (partial index): 조건에 해당하는 행만 인덱싱 → 크기 절감
CREATE INDEX idx_orders_pending
    ON orders (created_at)
    WHERE status = 'pending';

인덱스 설계 가이드

-- 인덱스 사용 현황 조회: 사용 안 되는 인덱스는 DROP 검토
SELECT
    schemaname,
    tablename,
    indexname,
    idx_scan        AS times_used,
    idx_tup_read    AS tuples_read,
    pg_size_pretty(pg_relation_size(indexrelid)) AS index_size
FROM pg_stat_user_indexes
ORDER BY idx_scan ASC;  -- 사용 횟수 낮은 인덱스 식별

-- 테이블 크기 vs 인덱스 크기 비율 확인
SELECT
    relname AS table_name,
    pg_size_pretty(pg_total_relation_size(oid)) AS total_size,
    pg_size_pretty(pg_indexes_size(oid))        AS indexes_size
FROM pg_class
WHERE relkind = 'r'
ORDER BY pg_total_relation_size(oid) DESC;
인덱스가 많을수록 INSERT/UPDATE/DELETE 속도가 저하됩니다. 쿼리 패턴을 먼저 분석하고 필요한 인덱스만 생성하십시오.

4. 연결 관리 (Connection Management)

Connection Pooling의 필요성

PostgreSQL은 연결(connection)마다 별도 프로세스를 생성합니다. Databricks Apps처럼 다수의 요청이 동시에 들어오는 환경에서는 커넥션 풀링 (connection pooling) 이 필수입니다.
# SQLAlchemy Connection Pool 설정 예시
from sqlalchemy import create_engine

engine = create_engine(
    "postgresql+psycopg2://user:password@host:5432/dbname",
    pool_size=5,          # 유지할 기본 연결 수
    max_overflow=10,      # pool_size 초과 시 최대 추가 연결 수
    pool_timeout=30,      # 연결 대기 타임아웃 (초)
    pool_recycle=1800,    # 30분마다 연결 재생성 (stale connection 방지)
    pool_pre_ping=True,   # 사용 전 연결 유효성 확인
    connect_args={"sslmode": "require"}
)
# psycopg2 ThreadedConnectionPool 사용 예시 (멀티스레드 환경)
from psycopg2 import pool as pg_pool
import streamlit as st

@st.cache_resource
def get_pool():
    return pg_pool.ThreadedConnectionPool(
        minconn=2,
        maxconn=10,
        host=st.secrets["lakebase"]["host"],
        port=5432,
        dbname=st.secrets["lakebase"]["dbname"],
        user=st.secrets["lakebase"]["user"],
        password=st.secrets["lakebase"]["password"],
        sslmode="require"
    )

def execute_query(sql, params=None):
    pool = get_pool()
    conn = pool.getconn()
    try:
        with conn.cursor() as cur:
            cur.execute(sql, params)
            conn.commit()
            return cur.fetchall()
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        pool.putconn(conn)  # 반드시 반환

Databricks Apps에서의 연결 패턴

Databricks Apps는 기본적으로 단일 프로세스 내에서 실행됩니다. 아래 원칙을 따르십시오.
  • @st.cache_resource 로 연결 풀을 앱 수명 동안 한 번만 생성
  • 요청마다 연결을 생성/소멸하지 않음 (오버헤드 발생)
  • Lakebase 연결 수 상한 은 인스턴스 크기에 따라 다르며, 기본적으로 수백 개 수준
  • 연결이 오래 유휴 상태로 남으면 서버 측에서 끊길 수 있으므로 pool_pre_ping=True 권장
참고: PostgreSQL Connection Pooling Best Practices

5. 데이터 동기화 — Delta ↔ Lakebase (Synced Tables)

Synced Tables 개요

Synced Tables 는 Delta 테이블의 데이터를 Lakebase PostgreSQL 테이블로 자동 동기화하는 기능입니다. 분석용 데이터(Delta)를 OLTP 앱(Lakebase)에서 빠르게 조회할 수 있도록 연결해 줍니다.
Delta Table (Unity Catalog)

        │  Synced Table (자동 동기화)

Lakebase PostgreSQL Table

        │  SQL 조회

Databricks App (Streamlit 등)

Synced Tables 생성

-- Lakebase 내에서 Synced Table 생성
-- Unity Catalog의 Delta 테이블을 PostgreSQL 테이블로 동기화
CREATE SYNCED TABLE product_catalog
FROM catalog.schema.products_delta
WITH (sync_mode = 'INCREMENTAL');  -- FULL 또는 INCREMENTAL
# Python SDK로 Synced Table 관리
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Synced Table 생성
w.lakebase.create_sync(
    database_name="mydb",
    source_table="catalog.schema.products",
    target_table="product_catalog",
    sync_mode="INCREMENTAL"
)

# 동기화 상태 확인
status = w.lakebase.get_sync(database_name="mydb", sync_name="product_catalog")
print(status.state)  # RUNNING, COMPLETED, FAILED

실시간 vs 배치 동기화 비교

방식동기화 주기사용 사례비고
Incremental Sync변경분만 자동 감지제품 카탈로그, 사용자 프로필권장 기본값
Full Sync전체 재적재소규모 참조 테이블데이터 양 적을 때만
수동 트리거필요 시 호출배치 작업 후 즉시 반영Job에서 SDK 호출
참고: Databricks Synced Tables 문서