airflow

[airflow] - 오퍼레이터 with Xcom

  • -
728x90

아래 글은 옵시디언으로 작성되었습니다.

Xcom이란?

Cross Communication의 약자로 Airflow DAG 안 ==Task 간 데이터 공유를 위해 사용되는 기술==이다.(Task1의 수행 중 내용이나 결과를 Task2에서 사용 또는 입력으로 주고 싶은 경우) ==주로 작은 규모의 데이터 공유를 위해 사용==되며 Xcom의 내용은 메타 DB의 Xcom 테이블에 값이 저장된다. 만약, 1GB 이상의 대용량 데이터 공유를 위해서는 외부 솔루션을 사용해야 한다.(AWS S3, HDEF 등)

Python 오퍼레이터에서 Xcom사용하기

크게 두 가지 방법으로 Xcom 사용이 가능하다.

1) **kwargs에 존재하는 ti(task_instance) 객체 활용

Xcom에 데이터 push

@task(task_id='python_xcom_push_task') 
def xcom_push(**kwargs): 	
	ti = kwargs['ti'] 	
	ti.xcom_push(key="result1", value="value_1") 	
	ti.xcom_push(key="result2", value=[1,2,3])

template 변수인 kwargs 변수에서 ti객체를 얻을 수 있으며 task_istance객체가 가진 xcom_push 메서드를 활용할 수 있다. 이때 ti에 딕셔너리 형태로 데이터가 저장되며 다른 task에서 꺼내쓸 수 있도록 메타 DB의 Xcom에 저장된다.

Xcom에서 데이터 pull

@task(task_id='python_xcom_pull_task') 
def xcom_pull(**kwargs): 	
	ti = kwargs['ti'] 	
    value_key1 = ti.xcom_pull(key="result1") 	
    value_key2 = ti.xcom_pull(key="result2", 
    task_ids='python_xcom_push_task') 	
    print(value_key1) 	
    print(value_key2)

이 역시 마찬가지로 ti객체에서 xcom_pull 메서드를 이용해 저장했던 객체를 다른 task에서 꺼낼 수 있다. 다만 task_ids를 명시해 주는 게 좋다. 만약 task_ids를 명시하지 않았는데 key값이 같은 데이터가 xcom 내에 존재하는 경우 마지막으로 넣은 value가 불러와진다. key를 unique 하게 유지하는 경우가 아니라면 task_ids는 생략하면 안 된다.

2) 파이썬 함수의 return값 활용(1안)

@task (task_id='xcom_push by return') 
def xcom_push_by_return(**kwargs): 	
	transaction_value = 'status Good' 	
	return transaction_value
@task(task_id='xcom_pull_by_return') 
def xcom_pull_by_return(status, **kwargs): 	
	print(status)
xcom_pull_by_return(xcom_push_by_return())

위 사례는 task1의 반환값이 다른 task의 입력값이 되는 경우이다. 이렇게 하면 ti 객체를 이용하지 않고 값을 DAG내 다른 task로 전달할 수 있다. 또한, task 데코레이터를 사용할 경우 airflow는

함수 입출력 관계 만으로도 자동으로 순서가 정해진다.

따라서 TASK 간 수행관계도 명시할 필요가 없다.

파이썬 함수의 return값 활용(2안)

@task(task_id='xcom_push_by_return') 
def xcom_push_return(**kwargs): 	
	transaction_value = 'statusGood' 	
    return transaction_value

위 코드는 1안과 차이가 없다. return 한 값은 다동으로 xcom에 key=‘return value’, task_ids = task_id로 저장이 된다.

@task(task_id='xcom_pull_by_return') 
def xcom_pull_return_by_method(**kwargs): 	
	ti=kwargs['ti'] 		
    pull_value = ti.xcom_pull(key='return_value', task_ids='xcom_push_by_return' )  
    
xcom_push_by_return() >> xcom_pull_by_return()

1안에서는 task1의 출력값이 두 번째 task의 인자로 들어가게끔 하였으나 이번에는 return값이 자동으로 xcom에 들어간다는 사실을 이용하여 두 번째 task에서 xcom_pull로 값을 당겨왔다. 이런 경우에는 task관 관계를 명시해주어야 한다.

# Python 오퍼레이터를 이용한 xcom DAG 직접 짜보기

1안 DAG

from airflow import DAG
import pendulum
import datetime
from airflow.decorators import task

with DAG(
    dag_id="dags_python_with_xcom_eg1",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    @task(task_id='python_xcom_push_task1')
    def xcom_push1(**kwargs):
        ti = kwargs['ti']
        ti.xcom_push(key="result1", value="value_1")
        ti.xcom_push(key="result2", value=[1,2,3])

    @task(task_id='python_xcom_push_task2')
    def xcom_push2(**kwargs):
        ti = kwargs['ti']
        ti.xcom_push(key="result1", value="value_2")
        ti.xcom_push(key="result2", value=[1,2,3,4])

    @task(task_id='python_xcom_pull_task')
    def xcom_pull(**kwargs):
        ti = kwargs['ti']
        value1 = ti.xcom_pull(key="result1")
        value2 = ti.xcom_pull(key="result2", task_ids='python_xcom_push_task1')
        print(value1)
        print(value2)


    xcom_push1() >> xcom_push2() >> xcom_pull()

python_xcom_push_task1에서 먼저 두 데이터를 xcom에 집어넣는다. 그 후 python_xcom_push_task2에서 task1과 같은 key값을 가진 두 데이터를 집어넣는다.
마지막으로 python_xcom_pull_task에서 xcom_pull을 하는데 이때 value1에는 task_ids를 명시하지 않았으므로 마지막 입력 값인 value_2가 들어가고 value2에는 task1에서 가져오라는 task_ids를 명시하였으므로 [1,2,3]가 출력될 것이다.


위 출력결과를 보면 잘 출력이 된 것을 알 수 있다.

2안 DAG

from airflow import DAG
import pendulum
import datetime
from airflow.decorators import task

with DAG(
    dag_id="dags_python_with_xcom_eg2",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    @task(task_id='python_xcom_push_by_return')
    def xcom_push_result(**kwargs):
        return 'Success'


    @task(task_id='python_xcom_pull_1')
    def xcom_pull_1(**kwargs):
        ti = kwargs['ti']
        value1 = ti.xcom_pull(task_ids='python_xcom_push_by_return')
        print('xcom_pull 메서드로 직접 찾은 리턴 값:' + value1)

    @task(task_id='python_xcom_pull_2')
    def xcom_pull_2(status, **kwargs):
        print('함수 입력값으로 받은 값:' + status)


    python_xcom_push_by_return = xcom_push_result()
    xcom_pull_2(python_xcom_push_by_return)
    python_xcom_push_by_return >> xcom_pull_1()

일단 위 DAG을 보면 자동으로 해당 로직이 머리에 떠올라야 한다.


가장 먼저 python_xcom_push_by_return은 단순히 'Success’라는 string을 return 하는 task이다.
python_xcom_pull_1 task는 ti 객체를 통해 xcom_pull 메서드로 직접 찾은 값을 출력하는 형태고,
python_xcom_pull_2 task는 return값 'Success’를 직접 인자로 받은 후 task 내 status에 넣어주는 형태다.
그 후, task 포함관계를 보면 우선 python_xcom_push_by_return가 실행된 후 xcom_pull_2xcom_pull_1이 동시에 실행되는 로직을 갖고 있는 것을 알 수 있다.

 


두 출력값 모두 정상적으로 잘 나온 것을 알 수 있다.

정리하면 다음과 같다.


Xcom push를 하는 두 가지 방법과 Xcom pull 하는 두 가지 방법이 있고 개발자가 원하는 방식으로 만들어주면 된다. 나라면 굳이 task 간 관계를 명시하지 않고 불필요한 객체 생성을 하지 않는 함수 return 방식을 사용할 것 같다.

BASH 오퍼레이터에서 Xcom 사용하기

from airflow import DAG
import pendulum
import datetime
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_with_xcom",
    schedule="10 0 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    bash_push = BashOperator(
    task_id='bash_push',
    bash_command="echo START && "
                 "echo XCOM_PUSHED "
                 "{{ ti.xcom_push(key='bash_pushed',value='first_bash_message') }} && "
                 "echo COMPLETE"
    )

    bash_pull = BashOperator(
        task_id='bash_pull',
        env={'PUSHED_VALUE':"{{ ti.xcom_pull(key='bash_pushed') }}",
            'RETURN_VALUE':"{{ ti.xcom_pull(task_ids='bash_push') }}"},
        bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE ",
        do_xcom_push=False
    )

    bash_push >> bash_pull

bash_push에서 Xcom으로 들어가는 값은 총 2개다. 먼저 xcom_push를 통해 넣은 first_bash_message라는 값과 마지막으로 echo 된 COMPLETE라는 string이다. 파이썬에서 return값이 자동으로 Xcom에 들어간다고 했었는데 bash에서는 마지막에 echo 된 값도 return값으로 취급된다. 아래 사진을 보면 값이 제대로 xcom에 input 된 것을 확인할 수 있다.

bash_pull을 보면 key를 명시하여 xcom_pull을 하는 경우와 task_ids만 명사하여 xcom_pull을 하는 경우가 있다. key를 명시한 경우는 그 key 해당하는 value값인 first_bash_message 스트링 값이 잘 출력이 될 것이다. task_ids만 명시한 경우는 해당 task에서 마지막으로 xcom에 들어간 value가 출력된다. 따라서 COMPLETA 스트링 값이 출력될 것이다.

추가적으로 bash_pull task에서 마지막으로 echo 된 값 또한 결국 return 값이므로 Xcom에 들어가야 마땅하나 do_xcom_push 파라미터를 False값으로 주면 Xcom으로 값을 집어넣지 않는다.

Python & Bash 오퍼레이터 간 Xcom 사용

Python to Bash 오퍼레이터 Xcom 전달

편의상 DAG 정의 부분은 생략하였다.

    @task(task_id='python_push')
    def python_push_xcom():
        result_dict = {'status':'Good','data':[1,2,3],'options_cnt':100}
        return result_dict

    bash_pull = BashOperator(
        task_id='bash_pull',
        env={
            'STATUS':'{{ti.xcom_pull(task_ids="python_push")["status"]}}',
            'DATA':'{{ti.xcom_pull(task_ids="python_push")["data"]}}',
            'OPTIONS_CNT':'{{ti.xcom_pull(task_ids="python_push")["options_cnt"]}}'

        },
        bash_command='echo $STATUS && echo $DATA && echo $OPTIONS_CNT'
    )
    python_push_xcom() >> bash_pull

이제 위와 같은 코드도 작성할 수 있다. python_push 테스트에서 dict 형태로 반환값을 만들면 Dict가 Xcom에 담긴다. 이것을 bash 오퍼레이터에서 뺄 수 있는데 bash_pull task를 보면 STATUS라는 환경변수에 Good, DATA라는 환경변수에 리스트[1, 2, 3]를 담으라는 뜻인 것을 알 수 있다. 출력값을 보면 다음과 같다.

Bash to Python 오퍼레이터 Xcom 전달

bash_push = BashOperator(
    task_id='bash_push',
    bash_command='echo PUSH_START '
                 '{{ti.xcom_push(key="bash_pushed",value=200)}} && '
                 'echo PUSH_COMPLETE'
    )

    @task(task_id='python_pull')
    def python_pull_xcom(**kwargs):
        ti = kwargs['ti']
        status_value = ti.xcom_pull(key='bash_pushed')
        return_value = ti.xcom_pull(task_ids='bash_push')
        print('status_value:' + str(status_value))
        print('return_value:' + return_value)

    bash_push >> python_pull_xcom()

반대로 이렇게 코드를 짜는 것도 가능하다. 마찬가지로 bash_push task에서 두 값을 Xcom에 넣고 python_pull 태스크에서 데이터를 가져온 모습을 확인할 수 있다.

Python & email 오퍼레이터 간 Xcom 사용

from airflow import DAG
import pendulum
import datetime
from airflow.decorators import task
from airflow.operators.email import EmailOperator

with DAG(
    dag_id="dags_python_email_operator",
    schedule="0 8 1 * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    @task(task_id='something_task')
    def some_logic(**kwargs):
        from random import choice 
        return choice(['Success','Fail'])


    send_email = EmailOperator(
        task_id='send_email',
        to='akfktl328@naver.com',
        subject='{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} some_logic 처리결과',
        html_content='{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} 처리 결과는 <br> \
                    {{ti.xcom_pull(task_ids="something_task")}} 했습니다 <br>'
    )

    some_logic() >> send_email

위 코드도 크게 어려울 건 없다. something_task에서 단순히 Success인지 Fail인지 랜덤으로 값을 받는다. 그 후 EmailOperator 안에서 xcom_pull을 통해 Success or Fail String값을 반환하고 해당 내용을 이메일로 보내주는 코드이다. 제목은 템플릿을 활용하여 data_interval_end 날짜를 넣어주었다. 

728x90
Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.