왜 흐름 제어가 필요한가?
실제 데이터 파이프라인은 단순한 일직선이 아닙니다. “데이터 검증에 성공하면 Gold 테이블을 갱신하고, 실패하면 알림을 보내라”, “10개 테넌트에 대해 같은 처리를 반복하라” 같은 동적 흐름 제어 가 필요합니다. Lakeflow Jobs의 If/Else, For Each 태스크, 그리고 Task Values 를 활용하면 이러한 복잡한 워크플로를 구현할 수 있습니다.If/Else 태스크 (조건부 분기)
개념
If/Else 태스크는 조건 표현식의 결과에 따라 다른 태스크 분기를 실행 합니다. 별도의 컴퓨트 리소스를 사용하지 않으며, DAG의 흐름을 제어하는 역할만 합니다.| 단계 | 작업 | 설명 |
|---|---|---|
| 1 | 데이터 수집 | 데이터를 수집합니다 |
| 2 | 데이터 검증 | 수집된 데이터를 검증합니다 |
| 3 | If/Else 분기 | row_count > 0 조건을 평가합니다 |
| 4a | True → Gold 테이블 갱신 | 데이터가 있으면 Gold 테이블을 갱신하고 성공 알림을 발송합니다 |
| 4b | False → 알림 발송 | 데이터가 없으면 경고 알림을 발송합니다 |
조건 표현식
If/Else 태스크에서 사용할 수 있는 조건 표현식은 다음과 같습니다.| 표현식 유형 | 예시 | 설명 |
|---|---|---|
| Task Value 비교 | {{tasks.validate.values.row_count}} > 0 | 선행 태스크의 값과 비교합니다 |
| 문자열 비교 | {{tasks.validate.values.status}} == "PASS" | 문자열 일치 여부를 확인합니다 |
| Job 파라미터 | {{job.parameters.mode}} == "full" | Job 레벨 파라미터를 참조합니다 |
| 복합 조건 | {{tasks.t1.values.count}} > 100 AND {{job.parameters.env}} == "prod" | AND/OR로 조합합니다 |
YAML 설정 예제
지원되는 연산자
| 연산자 | 설명 | 예시 |
|---|---|---|
EQUAL_TO | 같음 | "status" == "PASS" |
NOT_EQUAL | 같지 않음 | "status" != "FAIL" |
GREATER_THAN | 초과 | count > 100 |
GREATER_THAN_OR_EQUAL | 이상 | count >= 100 |
LESS_THAN | 미만 | error_rate < 5 |
LESS_THAN_OR_EQUAL | 이하 | error_rate <= 5 |
For Each 태스크 (반복 실행)
개념
For Each 태스크는 입력 리스트의 각 항목에 대해 중첩된 태스크를 반복 실행 합니다. 멀티 테넌트 처리, 파티션별 처리, 여러 테이블에 대한 동일 작업 등에 활용됩니다.| 단계 | 작업 | 설명 |
|---|---|---|
| 1 | 테넌트 목록 생성 | [tenant_A, tenant_B, tenant_C] 목록을 생성합니다 |
| 2 | For Each 반복 | 각 테넌트에 대해 병렬로 처리를 실행합니다 |
| 3 | 개별 처리 | process(tenant_A), process(tenant_B), process(tenant_C) 가 병렬 실행됩니다 |
| 4 | 완료 | 모든 테넌트 처리가 완료됩니다 |
YAML 설정 예제
입력 리스트 생성 노트북
For Each 태스크 설정 옵션
| 옵션 | 설명 | 기본값 |
|---|---|---|
inputs | 반복할 입력 리스트 (JSON 배열 문자열)입니다 | (필수) |
concurrency | 동시에 실행할 최대 태스크 수입니다 | 1 |
task | 각 반복에서 실행할 태스크 정의입니다 | (필수) |
💡 concurrency 설정: 리소스에 여유가 있다면 concurrency를 높여 병렬 처리 속도를 높일 수 있습니다. 단, 하류 시스템의 부하를 고려하여 적절한 값을 설정하세요.
Task Values (태스크 간 값 전달)
개념
Task Values는 선행 태스크에서 계산한 결과를 후행 태스크로 전달하는 메커니즘입니다.dbutils.jobs.taskValues를 사용합니다.
값 설정 (선행 태스크)
값 읽기 (후행 태스크)
YAML에서 Task Values 참조
DAG 의존성 패턴
1. 선형 패턴 (Sequential)
2. 팬아웃/팬인 패턴 (Fan-out/Fan-in)
3. 조건부 패턴 (Conditional)
실전 예제: 멀티 테넌트 ETL 워크플로
정리
| 핵심 개념 | 설명 |
|---|---|
| If/Else 태스크 | 조건 표현식에 따라 DAG 내에서 분기를 수행합니다 |
| For Each 태스크 | 입력 리스트의 각 항목에 대해 태스크를 반복 실행합니다 |
| Task Values | dbutils.jobs.taskValues로 태스크 간 데이터를 전달합니다 |
| 의존성 패턴 | 선형, 팬아웃/팬인, 조건부 패턴을 조합하여 복잡한 DAG를 구성합니다 |
| concurrency | For Each의 동시 실행 수를 제어하여 리소스를 관리합니다 |