BigData/Airflow

[Airflow] Task 의존성 정의 방법

twoDeveloper 2022. 9. 14. 15:52

개요

작업 의존성을 Airflow에서 정의하는 방법과 이러한 기능을 사용하여 조건부 Task, 분기 및 조인을 비롯한 보다 복잡한 패턴을 구현하는 방법에 대해 알아보고, XCom을 이용한 Task 사이의 상태 공유 방법, Airflow 2의 새로운 API인 Taskflow API를 통해, 파이썬 작업과 XCom을 많이 사용하는 DAG를 단순화하는 방법에 대해서 알아보겠습니다.


기본 의존성 유형

1. 선형 의존성 유형

책에서 제공하는 로켓 사진 가져오기 DAG의 Task를 기반으로 선형 의존성 유형을 설명드리겠습니다. 

 

1.1 로켓 사진 가져오기 Task Chain

download_launches=BashOperator(...)
get_pictures=PythonOperator(...)
notify=BashOperator(...)

로켓 사진 가져오기 DAG의 워크플로우를 보면 다음과 같습니다. 이미지를 다운로드 하고, 다운로드한 이미지를 가져옵니다. 전체 프로세스가 완료되면 알려주는 총 3가지의 Task로 이루어져 있습니다.

 

위 유형의 DAG에서는 앞선 Task의 결과가 다음 Task의 입력 값으로 사용되기 때문에, 다음 Task로 이동하려면 앞선 Task를 Issue 없이 완료하여야 합니다.

 

1.2 로켓 가져오기 (시프트 연산자)

download_launches >> get_pictures
get_pictures >> notify

download_launches >> get_pictures >> notify

이러한 의존 관계를 시프트 연산자(>>)를 사용해 나타낼 수 있습니다. 위 방법을 보시면 두가지의 방식으로 의존성 여부를 나타내었는데 첫번째 경우에는 작업 의존성을 각각 설정한 경우이고, 두번째 경우에는 여러 개의 의존성을 설정할 수 있습니다. (딱히 크게 상관은 없음)

 

다음과 같이 Task 의존성을 명시적으로 지정하면 Task의 순서가 명확하게 정의된다는 이점이 있습니다.

 

모든 Issue는 Airflow에 의해 다운스트림 태스크로 전달되어 실행을 지연시키는 특징을 가지고 있습니다. 예를 들어 download_launches Task가 실패한 경우, Airflow는 download_launches Task가 정상적으로 실행될 때 까지 get_pictures Task를 실행시키지 않습니다.


2. 팬인/팬아웃 (fan-in/fan-out) 의존성

위의 설명드린 선형 체인 외에더 Airflow의 Task 의존성을 사용하여 Task 간 복잡한 의존성 구조를 만들 수 있습니다. 책에서 제공하는 우산 판매 예측 모델을 기반으로 팬인/팬아웃 의존성을 설명드리겠습니다.

 

2.1 우산 판매 예측 모델 Task Chain

Umbella DAG의 주요 목적은 서로 다른 두 소스에서 매일 날씨 및 판매 데이터를 가져와서 두 데이터 세트를 데이터 세트로 결합하여 모델을 학습시키는 것입니다.

 

위에서 보시는 바와 같이 fetch_weather (날씨 데이터 가져오기), clean_weather (날씨 데이터 정제하기) Task는 서로 선형 의존성을 가지고 있지만 fetch_sales (판매 데이터 가져오기), clean_sales (판매 데이터 정제하기) Task와는 서로 의존성을 가지고 있지 않습니다.

 

2.2 우산 판매 예측 모델 (시프트 연산자)

fetch_weather >> clean_weather
fetch_sales >> clean_sales

따라서, 시프트 연산자를 통해 위 DAG를 표현하면 다음과 같습니다. (병렬로 실행되는 선형 의존성)

 

2.3 팬아웃 (Fan-out)

여러 개의 입력 Task 연결 수를 제한하는 것을 팬아웃(fan-out) 이라고 합니다.

 

fetch_weather, fetch_sales Task의 업스트림에 DAG의 시작을 나타내는 start 라는 Dummy Task를 추가할 수도 있습니다. Dummy Task는 반드시 필요하지는 않지만 fetch_weather, fetch_sales Task가 DAG 시작 시 발생하는 암묵적인 팬아웃을 설명하는데 도움이 됩니다.

 

2.3.1 팬아웃 (fan-out) 종속성 정의

from airflow.operators.dummy import DummyOperator

start=DummyOperator(task_id="start") # 더미 시작 태스크 생성
start >> [fetch_weather, fetch_sales] # 팬아웃(일 대 다) 의존성 태스크 생성

하나의 Task를 여러 다운스트림 Task에 연결하는 것을 팬아웃 (fan-out) 종속성이라고 합니다.

 

2.4 팬인 (fan-in)

단일 다운스트림 Task가 여러 업스트림 Task에 의존성을 갖는데 이런 구조를 팬인 (fan-in) 이라고 합니다.

 

2.4.1 팬인 (fan-in) 의존성 정의

[clean_weather, clean_sales] >> join_datasets

팬인 (fan-in) (다 대 일) 의존성을 시프트 연산자로 나타낸 것입니다.

 

위 팬아웃 구조와 달리 결합된 데이터 세트를 만들기 위해서는 clean_weather, clean_sales Task가 정상적으로 이루어 져야 join_datasets Task가 실행됩니다. 즉 join_datasets Task는 clean_weather, clean_sales Task 업스트림에 종속성을 가지고 있습니다. 이러한 구조를 팬인 구조라고 합니다.

 

2.5 나머지 Task에 대한 시프트 연산자

join_datasets >> train_model >> deploy_model

차례대로 데이터 셋 결합하기, 머신러닝 모델 학습하기, 머신러닝 모델 배포하기의 순서로 Task가 선형 의존성을 가지고 있습니다.

 

2.6 Umbella DAG 실행 실습 예제 코드

import airflow.utils.dates
from airflow import DAG
from airflow.operators.dummy import DummyOperator

dag = DAG(
    dag_id="01_umbrella",
    description="Umbrella example with DummyOperators.",
    start_date=airflow.utils.dates.days_ago(5),
    schedule_interval="@daily",
)

start = DummyOperator(task_id="start")
fetch_weather = DummyOperator(task_id="fetch_weather", dag=dag)
fetch_sales = DummyOperator(task_id="fetch_sales", dag=dag)
clean_weather = DummyOperator(task_id="clean_weather", dag=dag)
clean_sales = DummyOperator(task_id="clean_sales", dag=dag)
join_datasets = DummyOperator(task_id="join_datasets", dag=dag)
train_model = DummyOperator(task_id="train_model", dag=dag)
deploy_model = DummyOperator(task_id="deploy_model", dag=dag)

start >> [fetch_weather, fetch_sales]
fetch_weather >> clean_weather
fetch_sales >> clean_sales
[clean_weather, clean_sales] >> join_datasets
join_datasets >> train_model >> deploy_model

 

2.6.1 Umbella DAG 구성도

팬아웃/팬인 구조를 설명드리면서 실습한 Umbella DAG의 구성도 입니다.

다음과 같이 Task가 정상적으로 실행된 것을 볼 수 있습니다.

 

보시는 바와 같이 DAG를 실행하면 Airflow가 먼저 start Task를 실행시키고, 병렬로 sales, weather Task가 실행 됩니다. 이 후 업스트림이 Issue 없이 정상적으로 실행된다면 join_datasets Task가 실행되고, 이 후 train_model, deploy_model이 실행됩니다.


브랜치하기

3.1 브랜치란?

새로운 데이터가 유입 되었을 때, 모델 학습을 중단시키지 않고 향후 분석에서 과거 판매 데이터를 계속 사용할 수 있도록 이전 시스템과 새로운 시스템 모두 정상 동작하기 위한 방법입니다.

 

3.2 Task 내부에서 브랜치 하기

1) clean Task (정제) 안에서 브랜치 하기

def _clean_sales(**context):
    if context["execution_date"] < ERP_CHANGE_DATE:
        _clean_sales_old(**context)
    else:
        _clean_sales_new(**context)
 
...
clean_sales_data=PythonOperator(
    task_id="clean_sales",
    python_callable=_clean_sales,
)

다음과 같은 구조로 _clean_sales_old 함수와 _clean_sales_new 함수가 호환되는 이상, 나머지 두 ERP 시스템 간 차이로 DAG는 변경하지 않아도 됩니다.

 

2) fetch Task (수집) 안에서 브랜치 하기

def _fetch_sales(**context):
    if context["execution_date"] < ERP_CHANGE_DATE:
        _fetch_sales_old(**context)
    else:
    	_fetch_sales_new(**context)
    ...

두 시스템에서 수집하기 위한 코드의 경로를 추가하여 초기 수집 단계를 두 ERP 시스템과 호환되도록 만들 수 있습니다.

 

위 2가지 방식을 통해 초기 데이터 수집/정제 작업을 수행하면 새로운 데이터가 중간에 유입되어도 일관된 형식으로 판매 데이터를 처리할 수 있습니다.

 

3.2.1 Task 내부에서 브랜치 하기 (문제점)

위 방식의 문제점은 DAG 실행 중에 Airflow에서 어떤 코드 분기를 사용하고 있는지 확인하기 어렵다는 점입니다.

문제점을 해결하는 방법은 Task에 세세한 Log를 포함하는 것과 PythonOperator와 같은 일반적인 Airflow Operator로 대체하여 Task에 유연하게 대처하는 것입니다.

 

3.3 DAG 내부에서 브랜치 하기

다음과 같이 두 개의 개별 Task 세트 즉 판매 데이터 old, new를 개발하고 DAG가 이전 또는 새로운 ERP 시스템에서 데이터 수집 작업 실행을 선택할 수 있도록 하는 것입니다.

 

3.3.1 DAG 내부에서 브랜치 한 워크플로우

 

3.3.2 추가적인 수집/정제 Task 추가

fetch_sales_old=PythonOperator (...)
clean_sales_old=PythonOperator (...)

fetch_sales_new=PythonOperator (...)
clean_sales_new=PythonOperator (...)

fetch_sales_old >> clean_sales_old
fetch_sales_new >> clean_sales_new

다음과 같이 적절한 Operator를 사용하여 각 ERP 시스템에 대한 Task를 별도 생성하고, 각 Task를 연결하면 됩니다.

 

3.3.3 BranchPythonOperator 기능 사용

Airflow는 다운스트림 Task 세트 중 선택할 수 있는 기능을 'BranchPythonOperator'를 통해 제공합니다.

def _pick_erp_system(**context):
    ...

pick_erp_system=BranchPythonOperator(
    task_id="pick_erp_system",
    python_callable=_pick_erp_system,
)

DAG 실행 날짜에 따라 적절한 task_id를 반환 하여 콜러블 인수 기능을 사용하여 ERP 시스템 선택 기능을 구현할 수 있습니다.

 

3.4 Branch 함수를 추가해 보겠습니다.

def_pick_erp_system(**context):
    if context [ "execution_date" ] < ERP_CHANGE_DATE:
        return "fetch_sales_old"
    else:
        return "fetch_sales_new"
 
pick_erp_system=BranchPythonOperator(
    task_id='pick_erp_system',
    python_callable=_pick_erp_system,
)

pick_erp_system >> [fetch_sales_old, fetch_sales_new]

다음 Branch 함수를 통해 Airflow는 변경 날짜 전에 발생하는 기존 ERP 시스템 Task (fetch_sales_old) 실행하고, 새로운 Task (fetch_sales_new)는 변경 날짜 후에 실행합니다.

 

3.5 Branch를 join_datasets Task에 연결

[clean_sales_old, clean_sales_new] >> join_datasets

현재까지 작업에서는 큰 문제점이 존재합니다. 현재까지 진행한 DAG를 실행하면 join_datasets Task 실행 시  Airflow는 모든 다운스트림 작업을 건너뛰게 됩니다.

 

이유는 가장 근본적인 Airflow는 반드시 업스트림 Task가 모두 성공적으로 완료되어야 실행되기 때문입니다. clean_sales_old, clean_sales_new 두 개의 Task가 join_datasets Task에 연결됨으로써 clean_sales_old, clean_sales_new 중 하나만 성공적으로 완료되는 경우, 나머지 하나의 Task는 실행되지 않았기 때문에 join_datasets Task는 실행할 수 없게 됩니다.

 

3.6 Airflow의 트리거 규칙

위 문제점을 해결하기 위해 Airflow의 트리거 규칙에 의해 Task 실행 시기를 제어합니다. 모든 Operator에게 전달할 수 있는 "trigger_rule" 인수를 이용해 각 Task에 대해 트리거 규칙을 정의할 수 있습니다.

 

3.6.1 "trigger rule"이 적용되지 않았을 경우

join_datasets = DummyOperator(task_id="join_datasets")

 BranchPythonOperator를 사용할 시, 선택하지 않은 브랜치 작업을 모두 건너뛰기 때문에 join_datasets Task와 다운스트림 Task 또한 실행되지 않습니다.

 

3.6.2 "trigger rule"이 적용되었을 경우

join_datasets=DummyOperator(task_id="join_datasets", trigger_rule="none_failed")

다음과 같이 join_datasets Task에 trigger 규칙을 변경해서 업스트림 Task 중 하나를 건너뛰더라도 계속 트리거가 진행되도록 할 수 있습니다.

 

*none_failed 규칙

실패한 상위 Task가 없지만, Task가 성공 또는 건너뛴 경우 트리거 됩니다.

 

Trigger 규칙을 이용하는 방법은 단점을 가지고 있습니다. join_datasets Task에 연결되어있는 Task가 3개라는 것입니다. (clean_sales_new, clean_sales_old, clean_weather), 이렇게 되면 flow 특성이 잘 반영되지 않게 됩니다.

 

3.7 Dummy Task 이용

from airflow.operators.dummy import DummyOperator

join_branch=DummyOperator(
    task_id="join_erp_branch",
    trigger_rule="none_failed"
)

[clean_sales_old, clean_sales_new] >> join_branch
join_branch >> join_datasets

이를 해결하기 위해 DAG에 서로 다른 브랜치를 결합하는 Dummy Task (join_erp_branch) 를 추가하여 브랜치 조건을 명확하게 합니다. 이를 통해서 join_datasets Task에 대한 트리거 규칙을 더 이상 변경할 필요가 없기 때문에 브랜치를 독립적으로 사용 가능합니다.


4. 조건부 Task

Airflow는 특정 조건에 따라 DAG에서 특정 Task를 건너뛸 수 있는 다른 방법도 제공합니다.

 

4.1 Task 내에서 조건

PythonOperator를 사용하여 배포를 구현하고 배포 함수 내에서 DAG의 실행 날짜를 명시적으로 확인하여 모델의 특정 버전에 대해서 배포가 가능합니다.

 

4.1.1 Task 내에서 조건 구현

def _deploy(**context):
    if context["execution_date"] == ...:
        deploy_model()

deploy=PythonOperator(
    task_id="deploy_model",
    python_callable=_deploy,
)

 

4.2 조건부 Task 만들기

배포 Task 자체를 조건부화하는 방법이 있습니다. 이 방법은 미리 정의된 조건에 따라서만 실행됩니다. 예를 들어 Airflow에서 해당 조건을 테스트하고 조건이 실패할 경우 모든 다운스트림 작업을 건너뛰는 Task를 DAG에 추가하여 Task를 조건부화 할 수 있습니다.

 

4.2.1 DAG에서 조건부 빌드하기

def _latest_only(**context):
    ...
    
latest_only=PythonOperator(
    task_id="latest_only",
    python_callable=_latest_only
)
    
latest_only >> deploy_model

 

4.2.2 조건부 배포가 포함된 DAG 구현

조건부 배포가 포함된 umbrella DAG의 구현입니다. 조건이 DAG Task에 포함되어 이전 구현보다 훨씬 더 명확해진 것을 볼 수 있습니다.

 

4.3 latest_only 함수 작성

execution_date가 가장 최근 실행에 속하지 않는 경우, 다운스트림 작업을 건너뛰도록 _latest_only 함수를 작성합니다. 이를 위해 실행 날짜를 확인하고, 필요한 경우 AirflowSkipException 함수를 실행합니다.

 

* AirflowSkipException 란?

Airflow가 조건과 모든 다운스트림 Task를 건너뛰라는 것을 나타내는 함수입니다.

 

4.3.1 latest_only 조건 함수 구현

from airflow.exceptions import AirflowSkipException

def _latest_only(**context):
    left_window=context["dag"].following_schedule(context["execution_date"]) # 실행 윈도우에서 경계를 확인
    right_window=context["dag"].following_schedule(left_window)
    
    now=pendulum.now("UTC") # 현재 시간이 윈도우 안에 있는지 확인
    if not left_window < now <= right_window:
        raise AirflowSkipException("Not the most recent run!")

 

 

4.3.2 latest_only 조건을 적용한 결과

 

4.4 내장 Operator 사용

Airflow의 내장 클래스인 LastOnlyOperator 클래스를 사용하여 가장 최근 실행한 DAG만 실행하는 예를 구현할 수 있습니다.

 

4.4.1 내장 LatestOnlyOperator 사용하기

from airflow.operators.latest_only import LatestOnlyOperator

latest_only=LatestOnlyOperator(
    task_id="latest_only",
    dag=dag
)

join_datasets >> train_model >> deploy_model
latest_only >> deploy_model

가장 최근 실행한 DAG만 실행하는 부분을 구현한 예입니다. Airflow의 내장 클래스인 LastOnlyOperator 클래스를 사용하였고 이 Operator는 PythonOperator를 기반으로 동일한 작업을 가능하게 합니다.

 

따라서, LatestOnlyOperator를 사용하면 조건부 배포를 구현하기 위해 복잡한 로직을 작성할 필요가 없습니다.


트리거 규칙에 대한 추가 정보

5. 트리거 규칙

Airflow의 트리거 규칙에 의해 Task가 실행되는 시기를 정확히 결정할 수있습니다.

 

5.1 트리거 규칙이란?

Task의 의존성 기능과 같이 Airflow가 Task가 실행 준비가 되어 있는지 여부를 결정하기 위한 필수적인 조건입니다.

* Airflow의 기본 트리거 규칙은 "all_success"이며, Task를 실행하려면 모든 의존적인 Task가 모두 성공적으로 완료되어야 합니다.

 

5.2 Airflow가 지원하는 트리거 규칙

트리거 규칙 동작 사용 사례
all_success (default) 모든 상위 Task가 성공적으로 완료되면 트리거됩니다. 일반적인 워크플로에 대한 기본트리거 규칙입니다.
all_failed 모든 상위 Task가 실패했거나 상위 Task의 오류로 인해 실패했을 경우 트리거됩니다. Task Group에서 하나 이상 실패가 예상되는 상황에서 오류 처리 코드를 트리거합니다.
all_done 결과 상태에 관계없이 모든 부모가 실행을 완료하면 트리거됩니다. 모든 Task가 완료되었을 때 실행할 청소 코드를 실행합니다.
one_failed 하나 이상의 상의 Task가 실패하자마자 트리거되며 다른 상위 Task의 실행 완료를 기다리지 않습니다. 알림 또는 로랙과 같은 일부 오류 처리 코드를 빠르게 트리거합니다.
one_success 한 부모가 성공하자마자 트리거되면 다른 상위 Task의 실행 완료를 기다리지 않습니다. 하나의 결과를 사용할 수 있게 되는 즉시 다운스트림 연산/알림을 빠르게 트리거합니다.
none_failed 실패한 상위 Task가 없지만, Task가 성공 또는 건너뛴 경우 트리거됩니다. 5.2절에서 설명한 바와 같이 Airflow DAG상 조건부 브랜치의 결합
none_skipped 건너뛴 상위 Task가 없지만 Task가 성공 또는 실패한 경우 트리거됩니다. 모든 업스트림 Task가 실행된 경우, 해당 결과를 무시하고 트리거합니다.
dummy 업스트림 Task의 상태와 관계없이 트리거됩니다. 테스트 시

Task 간 Data 공유

Airflow의 XCom을 사용하여 Task 간에 작은 Data를 공유할 수 있습니다.

 

*XCom이란?

기본적으로 Task 간에 Message를 교환하여 특정 상태를 공유할 수 있게 해줍니다.

 

6.1 XCom을 사용하여 Data 공유하기 (push)

def _train_model(**context):
    model_id=str(uuid.uuid4())
    context["task_instance"].xcom_push(key="model_id", value=model_id)
    
train_model=PythonOperator(
    task_id="train_model",
    python_callable=_train_model
)

다음은 XCom을 사용하여 train_model 및 deploy_model 작업 간에 모델 식별자를 고유하는 것입니다.

  - train_model Task는 다른 Task에서 XCom 값을 사용할 수 있도록 XCom에 모델 식별자 값을 보냅니다.

  - Airflow 컨택스트의 태스크 인스턴스의 xcom_push 메서드를 사용하여 값을 게시할 수 있습니다.

 

6.1.1 XCom 값 확인

Admin -> XCom에서 확인할 수 있습니다.

 

6.2 XCom을 사용하여 Data 공유 (pull)

def _deploy_model(**context):
    model_id=context["task_instance"].xcom_pull(
        task_ids="train_model", key="model_id"
    )
    print(f"Deploying model {model_id}")

deploy_model=PythonOperator(
    task_id="deploy_model",
    python_callable=_deploy_model
)

xcom_pull 메서드를 사용하여 다른 Task에서 XCom 값을 확인할 수 있습니다.

 

6.3 XCom 사용 시 고려사항

XCom은 작업 간에 상태를 공유하는 데 매우 유용해 보일 수 있지만, 몇 가지 단점이 존재합니다.

 

1) 의존성 문제

pull Task는 묵시적인 의존성이 필요하고, DAG에 표시되지 않으며 Task Schedule 시에 고려되지 않습니다. 따라서 숨겨진 의존성은 서로 다른 DAG에서 실행 날짜 사이에 XCom 값을 공유할 매우 복잡해지기 때문에 권장하지 않습니다.

 

2) 원자성을 무너뜨리는 패턴 문제

예를 들어 Token 값을 가지고 API에 접근할 때 Token 값이 만료 되어 다음 Task를 재실행 하지 못할 수 있습니다.

 

3) XCom이 저장하는 모든 값은 직렬화를 지원해야 하는 문제

람다 또는 다중 멀티프로세스 관련 클래스 같은 일부 파이썬 유형은 XCom에 저장할 수 없습니다.

 

4) 백엔드에 의해 XCom 값의 저장 크기가 제한되는 문제

기본적으로 XCom은 Airflow의 메타스토어에 저장되며 다음과 같이 크기가 제한됩니다.

SQLite BLOB 유형으로 저장, 2GB 제한
PostgreSQL BYTEA 유형으로 저장, 1GB 제한
MySQL BLOB 유형으로 저장, 64KB 제한

 

6.4 커스텀 XCom 백엔드 사용하기

Airflow 메타스토어를 사용하여 XCom을 저장 시에 제한 사항은 큰 데이터 볼륨을 저장할 때 확장할 수 없다는 점입니다. XCom은 일반적으로 작은 값이나 결과값을 저장하는데 사용되고, 큰 데이터 세트를 저장시에는 사용하지 않습니다.

 

6.4.1 커스텀 XCom 백엔드를 위한 구조

from typing import Any
from airflow.models.xcom import BaseXCom

class CustomXComBackend(BaseXCom):
    @staticmethod
    def serialize_value(value: Any):
        ...
    @staticmethod
    def deserizlize_value(result) -> Any:
        ...

Airflow 2 에서는 XCom을 좀 더 유연하게 활용하기 위해 XCom 백엔드를 지정할 수 있는 옵션이 있습니다. 이 옵션을 사용하면 커스텀 클래스를 정의하여 XCom을 저장, 검색할 수 있습니다.

  - 다음 클래스를 사용하기 위해서는 'BaseXCom 기본 클래스가 상속되어야 합니다.

  - 값을 직렬화 및 역직렬화 하기 위해 두 가지 정적 매서드를 각각 구현해야 합니다.

  - 커스텀 백엔드 클래스에서 직렬화 메서드는 XCom 값이 Operator 내에서 게시될 때마다 호출됩니다.

  - 역직렬화 메서드는 XCom 값이 백엔드에서 가져올 때 호출됩니다.

  - 원하는 백엔드 클래스가 있으면 Airflow 구성에서 xcom_backend 매개변수를 사용해 클래스를 사용하도록 Airflow를 구성할 수 있습니다.

 

커스텀 XCom 백엔드는 XCom 값 저장 선택을 다양하게 하며 상대적으로 저렴하고 확장 가능한 클라우드 스토리지에 더 큰 XCom 값의 저장이 가능합니다. 따라서 클라우드 서비스를 위한 커스텀 백엔드를 구현할 수 있습니다.


7. Taskflow API로 파이썬 Task 연결하기

Taskflow API를 통해 PythonOperators를 사용하고 XCom으로 데이터를 전달하는 경우 코드를 상당히 단순화할 수 있습니다.

 

7.1 Taskflow API 로 파이썬 태스크 단순화하기

머신러닝 모델 학습 및 배포 Task를 통해 Taskflow API가 어떻게 사용되는지 확인하겠습니다.

 

7.2 일반 API를 사용하여 훈련/배포 Task 정의

def _train_model(**context):
    model_id=str(uuid.uuid4())
    context["task_instance"].xcom_push(key="model_id", value=model_id)

def _deploy_model(**context):
    model_id=context["task_instance"].xcom_pull(
        task_ids="train_model", key="model_id"
    )
    print(f"Deploying model {model_id}")

with DAG(...) as dag:
    ...
    train_model=PythonOperator(
        tesk_id="train_model",
        python_callable=_train_model
    )
    
    deploy_model=PythonOperator(
        task_id="deploy_model",
        python_callable=_deploy_model
    )
    
    ...
    join_datasets >> train_model >> deploy_model

1) XCom을 사용해 model_id 공유합니다.

 

2) PythonOperator를 사용해 훈련/배포 Task 생성합니다.

  - PythonOperator는 python_callable을 통해 함수를 콜/백 할 수 있다.

 

7.2.1 일반 API를 사용하여 훈련/배포 Task 정의 단점

다음 방식은 함수 _train_model 과 _deploy_model 을 정의한 후 PythonOperator를 이용해 Airflow Task를 생성해야 한다는 부분에서 단점이 존재합니다.

 

train_model, deploy_model 간 ID를 공유하기 위해 함수 내에서 xcom_push 및 xcom_pull을 명시적으로 사용 하여 모델 ID 값을 전송 및 반환해야 합니다.

  - 의존성 정의가 번거롭고, 위 두 Task의 참조되는 공유 키 값이 변경되면 중단될 수 있습니다.

 

7.3 Taskflow API를 사용하여 훈련 Task 정의

...
from airflow.decorators import task
...

with DAG(...) as dag:
    ...
    @task
    def train_model():
        model_id=str(uuid.uuid4())
        return model_id

Airflow가 train_model 함수를 래핑하도록 하는 파이썬 Task의 정의가 가능합니다. 따라서 model_id를 XCom으로 명시적으로 게시하지 않고 Taskflow API가 간단하게 model_id를 함수로부터 반환하여 다음 Task로 전달할 수 있도록 합니다.

 

7.4 데커레이터를 이용해 훈련/모델 Task 정의

@task
def deploy_model(model_id: str):
    print(f"Deploying model {model_id}")

다음 정의를 통해 xcom_pull을 사용해서 검색되는 것이 아닌 파이썬 함수에 인수로 전달됩니다.

 

7.5 Taskflow Task 간의 의존성 정의

model_id=train_model()
deploy_model(model_id)

 

7.6 Taskflow API를 활용한 의존성 코드

import uuid
import airflow

from airflow import DAG
from airflow.decorators import task

with DAG(
    dag_id="12_taskflow",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@daily",
) as dag:

    @task
    def train_model():
        model_id = str(uuid.uuid4())
        return model_id

    @task
    def deploy_model(model_id: str):
        print(f"Deploying model {model_id}")

    model_id = train_model()
    deploy_model(model_id)

 

7.7 Taskflow API를 사용해 Task 간 의존성이 정의된 그래프

 

7.8 Taskflow API를 사용하지 않는 경우

Taskflow API는 XCom을 사용해 Task 간 작업 결과 데이터를 전달하기 위해 PythonOperator를 많이 사용하는 DAG를 크게 간소화 한다는 장점을 가지고 있습니다. 하지만 Taskflow API는 PythonOperator를 사용하여 구현되는 파이썬 Task로 제한된다는 것입니다.

 

따라서, 다른 Airflow Operator를 사용하는 경우에는 일반 API를 사용해야 하기 때문에, Taskflow API, 일반 API를 혼용해서 사용해야합니다. 사용하는데 문제는 없지만 완성 코드가 복잡해 보일 수 있다는 단점을 가지고 있습니다.