[airflow] - 오퍼레이터 with Xcom
- -
아래 글은 옵시디언으로 작성되었습니다.
Xcom이란?
Cross Communication의 약자로 Airflow DAG 안 ==Task 간 데이터 공유를 위해 사용되는 기술==이다.(Task1의 수행 중 내용이나 결과를 Task2에서 사용 또는 입력으로 주고 싶은 경우) ==주로 작은 규모의 데이터 공유를 위해 사용==되며 Xcom의 내용은 메타 DB의 Xcom 테이블에 값이 저장된다. 만약, 1GB 이상의 대용량 데이터 공유를 위해서는 외부 솔루션을 사용해야 한다.(AWS S3, HDEF 등)
Python 오퍼레이터에서 Xcom사용하기
크게 두 가지 방법으로 Xcom 사용이 가능하다.
1) **kwargs에 존재하는 ti(task_instance) 객체 활용
Xcom에 데이터 push
@task(task_id='python_xcom_push_task')
def xcom_push(**kwargs):
ti = kwargs['ti']
ti.xcom_push(key="result1", value="value_1")
ti.xcom_push(key="result2", value=[1,2,3])
template 변수인 kwargs 변수에서 ti객체를 얻을 수 있으며 task_istance객체가 가진 xcom_push 메서드를 활용할 수 있다. 이때 ti에 딕셔너리 형태로 데이터가 저장되며 다른 task에서 꺼내쓸 수 있도록 메타 DB의 Xcom에 저장된다.
Xcom에서 데이터 pull
@task(task_id='python_xcom_pull_task')
def xcom_pull(**kwargs):
ti = kwargs['ti']
value_key1 = ti.xcom_pull(key="result1")
value_key2 = ti.xcom_pull(key="result2",
task_ids='python_xcom_push_task')
print(value_key1)
print(value_key2)
이 역시 마찬가지로 ti객체에서 xcom_pull 메서드를 이용해 저장했던 객체를 다른 task에서 꺼낼 수 있다. 다만 task_ids를 명시해 주는 게 좋다. 만약 task_ids를 명시하지 않았는데 key값이 같은 데이터가 xcom 내에 존재하는 경우 마지막으로 넣은 value가 불러와진다. key를 unique 하게 유지하는 경우가 아니라면 task_ids는 생략하면 안 된다.
2) 파이썬 함수의 return값 활용(1안)
@task (task_id='xcom_push by return')
def xcom_push_by_return(**kwargs):
transaction_value = 'status Good'
return transaction_value
@task(task_id='xcom_pull_by_return')
def xcom_pull_by_return(status, **kwargs):
print(status)
xcom_pull_by_return(xcom_push_by_return())
위 사례는 task1의 반환값이 다른 task의 입력값이 되는 경우이다. 이렇게 하면 ti 객체를 이용하지 않고 값을 DAG내 다른 task로 전달할 수 있다. 또한, task 데코레이터를 사용할 경우 airflow는
함수 입출력 관계 만으로도 자동으로 순서가 정해진다.
따라서 TASK 간 수행관계도 명시할 필요가 없다.
파이썬 함수의 return값 활용(2안)
@task(task_id='xcom_push_by_return')
def xcom_push_return(**kwargs):
transaction_value = 'statusGood'
return transaction_value
위 코드는 1안과 차이가 없다. return 한 값은 다동으로 xcom에 key=‘return value’, task_ids = task_id로 저장이 된다.
@task(task_id='xcom_pull_by_return')
def xcom_pull_return_by_method(**kwargs):
ti=kwargs['ti']
pull_value = ti.xcom_pull(key='return_value', task_ids='xcom_push_by_return' )
xcom_push_by_return() >> xcom_pull_by_return()
1안에서는 task1의 출력값이 두 번째 task의 인자로 들어가게끔 하였으나 이번에는 return값이 자동으로 xcom에 들어간다는 사실을 이용하여 두 번째 task에서 xcom_pull로 값을 당겨왔다. 이런 경우에는 task관 관계를 명시해주어야 한다.
# Python 오퍼레이터를 이용한 xcom DAG 직접 짜보기
1안 DAG
from airflow import DAG
import pendulum
import datetime
from airflow.decorators import task
with DAG(
dag_id="dags_python_with_xcom_eg1",
schedule="30 6 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
@task(task_id='python_xcom_push_task1')
def xcom_push1(**kwargs):
ti = kwargs['ti']
ti.xcom_push(key="result1", value="value_1")
ti.xcom_push(key="result2", value=[1,2,3])
@task(task_id='python_xcom_push_task2')
def xcom_push2(**kwargs):
ti = kwargs['ti']
ti.xcom_push(key="result1", value="value_2")
ti.xcom_push(key="result2", value=[1,2,3,4])
@task(task_id='python_xcom_pull_task')
def xcom_pull(**kwargs):
ti = kwargs['ti']
value1 = ti.xcom_pull(key="result1")
value2 = ti.xcom_pull(key="result2", task_ids='python_xcom_push_task1')
print(value1)
print(value2)
xcom_push1() >> xcom_push2() >> xcom_pull()
python_xcom_push_task1
에서 먼저 두 데이터를 xcom에 집어넣는다. 그 후 python_xcom_push_task2
에서 task1과 같은 key값을 가진 두 데이터를 집어넣는다.
마지막으로 python_xcom_pull_task
에서 xcom_pull을 하는데 이때 value1에는 task_ids를 명시하지 않았으므로 마지막 입력 값인 value_2가 들어가고 value2에는 task1에서 가져오라는 task_ids를 명시하였으므로 [1,2,3]가 출력될 것이다.
위 출력결과를 보면 잘 출력이 된 것을 알 수 있다.
2안 DAG
from airflow import DAG
import pendulum
import datetime
from airflow.decorators import task
with DAG(
dag_id="dags_python_with_xcom_eg2",
schedule="30 6 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
@task(task_id='python_xcom_push_by_return')
def xcom_push_result(**kwargs):
return 'Success'
@task(task_id='python_xcom_pull_1')
def xcom_pull_1(**kwargs):
ti = kwargs['ti']
value1 = ti.xcom_pull(task_ids='python_xcom_push_by_return')
print('xcom_pull 메서드로 직접 찾은 리턴 값:' + value1)
@task(task_id='python_xcom_pull_2')
def xcom_pull_2(status, **kwargs):
print('함수 입력값으로 받은 값:' + status)
python_xcom_push_by_return = xcom_push_result()
xcom_pull_2(python_xcom_push_by_return)
python_xcom_push_by_return >> xcom_pull_1()
일단 위 DAG을 보면 자동으로 해당 로직이 머리에 떠올라야 한다.
가장 먼저 python_xcom_push_by_return
은 단순히 'Success’라는 string을 return 하는 task이다.python_xcom_pull_1
task는 ti 객체를 통해 xcom_pull 메서드로 직접 찾은 값을 출력하는 형태고,python_xcom_pull_2
task는 return값 'Success’를 직접 인자로 받은 후 task 내 status에 넣어주는 형태다.
그 후, task 포함관계를 보면 우선 python_xcom_push_by_return
가 실행된 후 xcom_pull_2
와 xcom_pull_1
이 동시에 실행되는 로직을 갖고 있는 것을 알 수 있다.
두 출력값 모두 정상적으로 잘 나온 것을 알 수 있다.
정리하면 다음과 같다.
Xcom push를 하는 두 가지 방법과 Xcom pull 하는 두 가지 방법이 있고 개발자가 원하는 방식으로 만들어주면 된다. 나라면 굳이 task 간 관계를 명시하지 않고 불필요한 객체 생성을 하지 않는 함수 return 방식을 사용할 것 같다.
BASH 오퍼레이터에서 Xcom 사용하기
from airflow import DAG
import pendulum
import datetime
from airflow.operators.bash import BashOperator
with DAG(
dag_id="dags_bash_with_xcom",
schedule="10 0 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
bash_push = BashOperator(
task_id='bash_push',
bash_command="echo START && "
"echo XCOM_PUSHED "
"{{ ti.xcom_push(key='bash_pushed',value='first_bash_message') }} && "
"echo COMPLETE"
)
bash_pull = BashOperator(
task_id='bash_pull',
env={'PUSHED_VALUE':"{{ ti.xcom_pull(key='bash_pushed') }}",
'RETURN_VALUE':"{{ ti.xcom_pull(task_ids='bash_push') }}"},
bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE ",
do_xcom_push=False
)
bash_push >> bash_pull
bash_push에서 Xcom으로 들어가는 값은 총 2개다. 먼저 xcom_push를 통해 넣은 first_bash_message라는 값과 마지막으로 echo 된 COMPLETE라는 string이다. 파이썬에서 return값이 자동으로 Xcom에 들어간다고 했었는데 bash에서는 마지막에 echo 된 값도 return값으로 취급된다. 아래 사진을 보면 값이 제대로 xcom에 input 된 것을 확인할 수 있다.
bash_pull을 보면 key를 명시하여 xcom_pull을 하는 경우와 task_ids만 명사하여 xcom_pull을 하는 경우가 있다. key를 명시한 경우는 그 key 해당하는 value값인 first_bash_message 스트링 값이 잘 출력이 될 것이다. task_ids만 명시한 경우는 해당 task에서 마지막으로 xcom에 들어간 value가 출력된다. 따라서 COMPLETA 스트링 값이 출력될 것이다.
추가적으로 bash_pull task에서 마지막으로 echo 된 값 또한 결국 return 값이므로 Xcom에 들어가야 마땅하나 do_xcom_push 파라미터를 False값으로 주면 Xcom으로 값을 집어넣지 않는다.
Python & Bash 오퍼레이터 간 Xcom 사용
Python to Bash 오퍼레이터 Xcom 전달
편의상 DAG 정의 부분은 생략하였다.
@task(task_id='python_push')
def python_push_xcom():
result_dict = {'status':'Good','data':[1,2,3],'options_cnt':100}
return result_dict
bash_pull = BashOperator(
task_id='bash_pull',
env={
'STATUS':'{{ti.xcom_pull(task_ids="python_push")["status"]}}',
'DATA':'{{ti.xcom_pull(task_ids="python_push")["data"]}}',
'OPTIONS_CNT':'{{ti.xcom_pull(task_ids="python_push")["options_cnt"]}}'
},
bash_command='echo $STATUS && echo $DATA && echo $OPTIONS_CNT'
)
python_push_xcom() >> bash_pull
이제 위와 같은 코드도 작성할 수 있다. python_push 테스트에서 dict 형태로 반환값을 만들면 Dict가 Xcom에 담긴다. 이것을 bash 오퍼레이터에서 뺄 수 있는데 bash_pull task를 보면 STATUS라는 환경변수에 Good, DATA라는 환경변수에 리스트[1, 2, 3]를 담으라는 뜻인 것을 알 수 있다. 출력값을 보면 다음과 같다.
Bash to Python 오퍼레이터 Xcom 전달
bash_push = BashOperator(
task_id='bash_push',
bash_command='echo PUSH_START '
'{{ti.xcom_push(key="bash_pushed",value=200)}} && '
'echo PUSH_COMPLETE'
)
@task(task_id='python_pull')
def python_pull_xcom(**kwargs):
ti = kwargs['ti']
status_value = ti.xcom_pull(key='bash_pushed')
return_value = ti.xcom_pull(task_ids='bash_push')
print('status_value:' + str(status_value))
print('return_value:' + return_value)
bash_push >> python_pull_xcom()
반대로 이렇게 코드를 짜는 것도 가능하다. 마찬가지로 bash_push task에서 두 값을 Xcom에 넣고 python_pull 태스크에서 데이터를 가져온 모습을 확인할 수 있다.
Python & email 오퍼레이터 간 Xcom 사용
from airflow import DAG
import pendulum
import datetime
from airflow.decorators import task
from airflow.operators.email import EmailOperator
with DAG(
dag_id="dags_python_email_operator",
schedule="0 8 1 * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
@task(task_id='something_task')
def some_logic(**kwargs):
from random import choice
return choice(['Success','Fail'])
send_email = EmailOperator(
task_id='send_email',
to='akfktl328@naver.com',
subject='{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} some_logic 처리결과',
html_content='{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} 처리 결과는 <br> \
{{ti.xcom_pull(task_ids="something_task")}} 했습니다 <br>'
)
some_logic() >> send_email
위 코드도 크게 어려울 건 없다. something_task에서 단순히 Success인지 Fail인지 랜덤으로 값을 받는다. 그 후 EmailOperator 안에서 xcom_pull을 통해 Success or Fail String값을 반환하고 해당 내용을 이메일로 보내주는 코드이다. 제목은 템플릿을 활용하여 data_interval_end 날짜를 넣어주었다.
'airflow' 카테고리의 다른 글
[Airflow] - Macro 변수 (0) | 2023.12.31 |
---|---|
[Airflow] - Airflow의 날짜개념 (0) | 2023.12.20 |
[Airflow] - JINJA templates (0) | 2023.12.20 |
[Airflow] - Email Operator로 메일 전송하기 (1) | 2023.11.23 |
.gitignore로 commit이 필요하지 않은 파일 관리하기 (0) | 2023.11.09 |
소중한 공감 감사합니다