Skip to main content

For Each 태스크

For Each 태스크 는 입력 배열의 각 요소에 대해 내부 태스크를 반복 실행합니다. 동일한 로직을 여러 테넌트, 여러 리전, 여러 파티션에 적용하는 패턴에 매우 유용합니다.

구조

  • For Each 태스크 (외부): 반복할 파라미터 배열을 정의합니다
  • 내부 태스크 (nested task): 각 반복에서 실행할 태스크를 정의합니다
  • concurrency: 동시 실행할 최대 태스크 수를 제어합니다

Asset Bundle YAML 예시

- task_key: "process_all_regions"
  for_each_task:
    inputs: '["us-east-1", "eu-west-1", "ap-northeast-2"]'
    concurrency: 3   # 3개 리전을 동시에 처리합니다
    task:
      task_key: "process_region"
      notebook_task:
        notebook_path: "/Workspace/etl/process_region"
        base_parameters:
          region: "{{input}}"   # 배열의 현재 요소가 주입됩니다

동적 배열 사용 (upstream 태스크 값 활용)

# discover_tenants 태스크: 처리할 테넌트 목록을 동적으로 생성합니다
import json

tenants = spark.table("catalog.meta.active_tenants").select("tenant_id").collect()
tenant_list = [row.tenant_id for row in tenants]

dbutils.jobs.taskValues.set(key="tenant_list", value=json.dumps(tenant_list))
- task_key: "process_tenants"
  depends_on:
    - task_key: "discover_tenants"
  for_each_task:
    inputs: "{{tasks.discover_tenants.values.tenant_list}}"
    concurrency: 5
    task:
      task_key: "process_single_tenant"
      python_wheel_task:
        package_name: "etl_pipeline"
        entry_point: "run_tenant"
        parameters: ["--tenant-id", "{{input}}"]
참고: For Each 태스크 공식 문서

Job 생성 방법

UI에서 생성

  1. 좌측 메뉴 WorkflowsCreate Job 클릭
  2. Job 이름 입력
  3. Task 추가 (노트북 경로, SQL 쿼리 등 지정)
  4. Task 간 의존성 설정 (드래그&드롭 또는 depends_on 선택)
  5. 컴퓨트 설정 (Serverless, Job Cluster, 또는 기존 All-Purpose Cluster)
  6. 스케줄, 알림, 재시도 정책 설정

Asset Bundles (IaC)

Infrastructure as Code 방식으로 Job을 정의하면 버전 관리와 환경별 배포가 쉬워집니다.
resources:
  jobs:
    daily_pipeline:
      name: "daily-sales-pipeline"
      tags:
        team: "data-engineering"
        project: "sales"
        environment: "${bundle.target}"

      parameters:
        - name: "target_date"
          default: ""
        - name: "environment"
          default: "production"

      tasks:
        - task_key: "ingest_orders"
          pipeline_task:
            pipeline_id: "${var.pipeline_id}"
          max_retries: 2
          min_retry_interval_millis: 60000
          timeout_seconds: 1800

        - task_key: "ingest_products"
          pipeline_task:
            pipeline_id: "${var.products_pipeline_id}"
          max_retries: 2

        - task_key: "transform"
          depends_on:
            - task_key: "ingest_orders"
            - task_key: "ingest_products"
          notebook_task:
            notebook_path: "/Workspace/etl/transform"
            base_parameters:
              date: "{{job.parameters.target_date}}"
          new_cluster:
            spark_version: "15.4.x-scala2.12"
            num_workers: 4
            node_type_id: "i3.xlarge"
            aws_attributes:
              availability: "SPOT_WITH_FALLBACK"

        - task_key: "validate"
          depends_on:
            - task_key: "transform"
          sql_task:
            query:
              query_text: >
                SELECT COUNT(*) AS row_count
                FROM gold.daily_revenue
                WHERE sale_date = CURRENT_DATE()
            warehouse_id: "${var.warehouse_id}"

        - task_key: "notify"
          run_if: "ALL_DONE"
          depends_on:
            - task_key: "validate"
          notebook_task:
            notebook_path: "/Workspace/etl/send_notification"

      email_notifications:
        on_failure:
          - "data-team@company.com"
        on_success:
          - "data-team@company.com"

      webhook_notifications:
        on_failure:
          - id: "${var.slack_webhook_id}"

      schedule:
        quartz_cron_expression: "0 0 2 * * ?"
        timezone_id: "Asia/Seoul"

베스트 프랙티스

태스크 분리 기준

  • 단일 책임 원칙: 하나의 태스크는 하나의 명확한 역할만 수행합니다 (수집, 변환, 검증 등)
  • 재실행 단위: 실패 후 재실행하고 싶은 최소 단위로 태스크를 나눕니다
  • 컴퓨트 요구사항: 리소스 요구가 크게 다른 단계는 별도 태스크로 분리하여 클러스터를 개별 최적화합니다

의존성 최소화

  • 실제로 필요한 의존성만 선언합니다. 불필요한 직렬화는 전체 Job 실행 시간을 늘립니다
  • 논리적으로 독립적인 데이터 소스 수집은 항상 병렬 태스크로 설계합니다
  • 팬인 지점(합류 태스크)은 반드시 필요한 경우에만 추가합니다

멱등성 설계

모든 태스크는 멱등성(idempotency) 을 갖도록 설계합니다. 같은 태스크를 여러 번 실행해도 결과가 동일해야 합니다.
# 멱등성 보장: 덮어쓰기 모드 + 파티션 교체
spark.table("silver.orders") \
    .filter(f"sale_date = '{target_date}'") \
    .write \
    .mode("overwrite") \
    .option("replaceWhere", f"sale_date = '{target_date}'") \
    .saveAsTable("gold.daily_revenue")

흔한 실수

과도한 의존성

모든 태스크를 직렬로 연결하면 병렬화의 이점을 전혀 얻지 못합니다. 태스크 A의 출력이 태스크 B에 실제로 필요한지 먼저 검토합니다.
# 잘못된 패턴: 모든 태스크 직렬화
ingest_orders → ingest_products → transform → validate → build_gold

# 올바른 패턴: 독립 태스크 병렬화
ingest_orders ──┐
                ├── transform → validate → build_gold
ingest_products─┘

태스크 값 크기 제한 초과

dbutils.jobs.taskValues의 최대 크기는 48KB 입니다. DataFrame, 대형 리스트, 바이너리 데이터를 태스크 값으로 전달하지 마십시오.
# 잘못된 패턴: 대용량 데이터를 태스크 값으로 전달
rows = spark.table("silver.orders").collect()
dbutils.jobs.taskValues.set(key="data", value=str(rows))  # 크기 초과 위험

# 올바른 패턴: Delta 테이블 경로 또는 메타데이터만 전달
dbutils.jobs.taskValues.set(key="output_table", value="silver.orders")
dbutils.jobs.taskValues.set(key="row_count", value=spark.table("silver.orders").count())

타임아웃 미설정

타임아웃을 설정하지 않으면 무한 대기 상태의 태스크가 클러스터를 점유합니다. 예상 실행 시간의 2-3배를 타임아웃으로 설정합니다.
- task_key: "long_running_transform"
  timeout_seconds: 7200   # 예상 1시간 → 2시간 타임아웃 설정
  max_retries: 1

디버그 시 debugValue 미활용

노트북을 독립 실행할 때 dbutils.jobs.taskValues.get은 Job 컨텍스트가 없어 오류를 발생시킵니다. debugValue 파라미터를 반드시 설정합니다.
# debugValue 없이 독립 실행 시 오류 발생
row_count = dbutils.jobs.taskValues.get(taskKey="validate", key="row_count")

# debugValue를 설정하면 독립 실행 시 해당 값을 사용합니다
row_count = dbutils.jobs.taskValues.get(
    taskKey="validate",
    key="row_count",
    default=0,
    debugValue=100   # 노트북 단독 실행 시 사용할 값
)

참고 링크