Skip to main content

왜 스케줄링이 중요한가?

데이터 파이프라인은 “적시에, 안정적으로” 실행되어야 합니다. 매일 새벽 2시에 전일 매출을 집계해야 하고, 새 파일이 도착하면 즉시 수집해야 하며, 소스 테이블이 갱신되면 후속 파이프라인이 자동으로 시작되어야 합니다.
💡 스케줄링(Scheduling) 이란 작업의 실행 시점과 빈도를 미리 정의하는 것이고, 트리거(Trigger) 란 특정 이벤트 발생 시 작업을 시작시키는 메커니즘입니다. Databricks는 시간 기반, 이벤트 기반, 연속 실행 등 다양한 방식을 지원합니다.

스케줄링 방식 비교

방식설명적합한 시나리오
Cron 스케줄고정된 시간에 주기적으로 실행합니다일별/주별 배치 ETL, 리포트 생성
파일 도착 트리거새 파일이 도착하면 실행합니다외부 시스템 연동, 벤더 데이터 수집
테이블 트리거Delta 테이블이 갱신되면 실행합니다파이프라인 체이닝, 이벤트 기반 처리
연속 실행완료 즉시 다시 시작합니다스트리밍 워크로드, 실시간 처리
수동 / API 트리거API 호출로 직접 실행합니다임시 실행, 외부 오케스트레이터 연동

Cron 스케줄링

Cron 표현식 문법

💡 Cron 표현식 은 작업의 실행 시점을 정의하는 표준 문법입니다. Databricks는 Quartz Cron 형식을 사용하며, 일반적인 Unix Cron과 약간 다릅니다.
Cron 필드 순서:  초  분  시  일  월  요일

| 필드 | 범위 |
|------|------|
| 초 | 0-59 |
| 분 | 0-59 |
| 시 | 0-23 |
| 일 | 1-31 |
| 월 | 1-12 |
| 요일 | 1-7 (1=일요일) |
* * * * * *

자주 사용하는 Cron 표현식

Cron 표현식의미사용 사례
0 0 2 * * ?매일 새벽 2시일별 배치 ETL
0 0 */6 * * ?6시간마다정기 데이터 동기화
0 0 9 ? * MON매주 월요일 오전 9시주간 리포트 생성
0 0 0 1 * ?매월 1일 자정월별 집계
0 */30 * * * ?30분마다빈번한 데이터 갱신
0 0 8-18 ? * MON-FRI평일 매시간 (8시~18시)업무 시간 내 모니터링
0 0 2 ? * MON-FRI평일 새벽 2시영업일 기준 배치 처리
0 0 0 L * ?매월 마지막 날 자정월말 정산

특수 문자 설명

문자의미예시
*모든 값* * * * * ? = 매초
?지정하지 않음 (일/요일에 사용)일과 요일은 동시에 * 사용 불가
/증분값0/15 = 0분부터 15분 간격
-범위MON-FRI = 월~금
,목록MON,WED,FRI = 월, 수, 금
L마지막L = 해당 월의 마지막 날
W가장 가까운 평일15W = 15일에 가장 가까운 평일

UI 및 YAML에서 설정

UI 방식: Workflows → Job → Schedule 탭에서 드롭다운으로 간편하게 설정하거나, 고급 옵션에서 Cron 표현식을 직접 입력할 수 있습니다.
# Asset Bundles 설정
schedule:
  quartz_cron_expression: "0 0 2 * * ?"
  timezone_id: "Asia/Seoul"
  pause_status: "UNPAUSED"      # PAUSED로 설정하면 일시 중지

파일 도착 트리거 (File Arrival Trigger)

클라우드 스토리지(S3, ADLS, GCS)에 새 파일이 도착하면 자동으로 Job을 실행합니다. 외부 벤더가 FTP/S3로 데이터를 전송하는 시나리오에 적합합니다.

동작 원리

단계발신수신내용
1외부 벤더S3 Bucket파일 업로드 (orders_20250331.csv)
2DatabricksS3 Bucket주기적 폴링 (새 파일 감지)
3S3 BucketDatabricks새 파일 발견 알림
4DatabricksJobJob을 트리거합니다
5JobS3 Bucket파일을 읽고 처리합니다
6JobDatabricks처리 완료

설정 옵션

옵션설명기본값
url감시할 클라우드 스토리지 경로입니다(필수)
min_time_between_triggers_seconds연속 트리거 사이 최소 대기 시간(초)입니다60
wait_after_last_change_seconds마지막 파일 변경 후 대기 시간(초)입니다. 여러 파일이 연속 도착하는 경우 유용합니다0
# Asset Bundles 설정
trigger:
  file_arrival:
    url: "s3://my-bucket/incoming/orders/"
    min_time_between_triggers_seconds: 300    # 최소 5분 간격
    wait_after_last_change_seconds: 60        # 마지막 변경 후 1분 대기
# Azure Blob Storage 예제
trigger:
  file_arrival:
    url: "abfss://container@storage.dfs.core.windows.net/incoming/"
    min_time_between_triggers_seconds: 120
⚠️ 주의: 파일 도착 트리거는 디렉토리 리스팅 방식으로 동작하므로, 파일이 매우 많은 디렉토리에서는 감지 지연이 발생할 수 있습니다. 날짜별 파티션 디렉토리를 사용하여 감시 대상을 좁히는 것을 권장합니다.

테이블 기반 트리거 (Table Trigger)

🆕 [2024 릴리즈] Delta 테이블의 데이터가 변경되면 자동으로 Job을 트리거하는 기능입니다. 파이프라인 체이닝에 매우 유용합니다.
소스 테이블이 갱신되면 후속 파이프라인이 자동으로 실행되어, 수동으로 스케줄을 맞출 필요가 없어집니다.

동작 원리

단계발신수신내용
1Job A (수집)Delta Table데이터 INSERT/UPDATE
2DatabricksDelta Table테이블 변경 감지
3DatabricksJob B (변환)Job B를 트리거합니다
4Job BDelta Table변경된 데이터를 읽습니다
5Job BDatabricks처리 완료

설정 방법

trigger:
  table:
    condition: "ANY_UPDATED"
    table_names:
      - "catalog.schema.bronze_orders"
      - "catalog.schema.bronze_customers"
    min_time_between_triggers_seconds: 300
    wait_after_last_change_seconds: 120
옵션설명
condition트리거 조건입니다. ANY_UPDATED(하나라도 갱신), ALL_UPDATED(모두 갱신)
table_names감시할 Delta 테이블 목록입니다
min_time_between_triggers_seconds연속 트리거 사이 최소 대기 시간입니다
wait_after_last_change_seconds마지막 변경 후 대기 시간입니다
💡 ALL_UPDATED 조건을 사용하면, 지정된 모든 테이블이 갱신되었을 때만 트리거됩니다. 여러 소스 테이블을 조인하는 변환 Job에 적합합니다.

연속 실행 (Continuous) 모드

Job이 완료되면 즉시 다시 시작하는 모드입니다. 스트리밍 워크로드나 매우 빈번한 배치 처리에 적합합니다.
trigger:
  periodic:
    interval: 1
    unit: "MINUTES"
continuous:
  pause_status: "UNPAUSED"
특성설명
동작 방식Job 완료 즉시 새 실행을 시작합니다
재시작 간격periodic.interval로 최소 간격을 설정할 수 있습니다
장애 복구실패 시 자동 재시작됩니다
비용24시간 클러스터가 실행되므로 비용이 높을 수 있습니다
⚠️ 연속 모드 vs Structured Streaming: 단순히 “자주 실행”이 목적이라면 연속 모드가 아닌 짧은 Cron 간격(5~15분)이 더 비용 효율적일 수 있습니다. 진정한 스트리밍이 필요한 경우에만 연속 모드를 사용하세요.

API 기반 수동 트리거

REST API를 통해 외부 시스템에서 Job을 실행할 수 있습니다. CI/CD 파이프라인, Airflow 같은 외부 오케스트레이터와의 연동에 사용됩니다.

REST API 호출

# Job 즉시 실행 (Run Now)
curl -X POST "https://<workspace-url>/api/2.1/jobs/run-now" \
  -H "Authorization: Bearer <token>" \
  -H "Content-Type: application/json" \
  -d '{
    "job_id": 12345,
    "job_parameters": {
      "target_date": "2025-03-31",
      "mode": "incremental"
    }
  }'
# 일회성 실행 (Submit Run) - Job 정의 없이 바로 실행
curl -X POST "https://<workspace-url>/api/2.1/jobs/runs/submit" \
  -H "Authorization: Bearer <token>" \
  -H "Content-Type: application/json" \
  -d '{
    "run_name": "ad-hoc-backfill",
    "tasks": [
      {
        "task_key": "backfill",
        "notebook_task": {
          "notebook_path": "/Workspace/etl/backfill",
          "base_parameters": {
            "start_date": "2025-01-01",
            "end_date": "2025-03-31"
          }
        },
        "new_cluster": {
          "spark_version": "15.4.x-scala2.12",
          "num_workers": 4,
          "node_type_id": "r5.xlarge"
        }
      }
    ]
  }'

Databricks SDK (Python)

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Job 즉시 실행
run = w.jobs.run_now(
    job_id=12345,
    job_parameters={
        "target_date": "2025-03-31",
        "mode": "incremental"
    }
)

# 실행 결과 대기
result = w.jobs.get_run(run_id=run.run_id)
print(f"상태: {result.state.result_state}")