airflow

[Airflow] - Macro 변수

  • -
728x90

이번에는 macro 변수에 대해서 알아보자 jinja 템플릿 내에서 날짜 연산을 가능하게끔 해주는 기능이다. macro가 있다면 만들지 못하는 날짜가 없다고 한다. 스케쥴러를 잘 다루지 못하면 airflow를 잘 다룬다고 할 수 없는 만큼 날짜 연산은 중요하므로 이번 기회에 잘 포스팅 해보고자 한다.

airflow에서 macro란?

airflow dags를 구성할때 jinja 템플릿 내에서 날짜 연산을 가능하게끔 해주는 기능이다.

macro 변수의 필요성

Dag 스케줄이 매월 말 일에 도는 스케줄인데 Between 값을 전월 마지막일부터 어제 날짜까지 주고 싶을 경우가 있다고 하자 이때 어제 날짜는 스케쥴러가 끝나는 오늘 날부터 하루 전 날짜이므로 아래처럼 식을 구성해야 하는데 그것을 macro가 해준다.

📌 {{data_interval_end}} - 1day

macro를 지원하는 파이썬의 라이브러리는 다음과 같은데 대부분 datetime, dateutil 라이브러리만 사용하므로 파이썬의 datetime, dateutil 라이브러리에 익숙해져야 한다.

 

아래는 macro를 사용하여 날짜연산을 해본 간단한 예시이다.

from datetime import datetime
from dateutil import relativedelta

now = datetime(year = 2024, month = 3, day=30)

print(f'현재시간: {str(now)}')
print('------------------월 연산------------------')
print(now + relativedelta.relativedelta(month=1)) # 1월로 변경
print(now.replace(month=1)) # 1월로 변경
print(now + relativedelta.relativedelta(months=-1)) # 1개월 빼기
print('------------------일 연산------------------')
print(now + relativedelta.relativedelta(day=1)) # 1일로 변경
print(now.replace(day=1)) # 1일로 변경
print(now + relativedelta.relativedelta(days=-1)) # 1일 빼기
print('----------------연산 여러 개----------------')
print(now + relativedelta.relativedelta(months=-1) + relativedelta.relativedelta(days=-1)) # 1개월 하고 1일 빼기

 

bash 오퍼레이터에서 macro 사용해보기

from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator

with DAG(
	dag_id="dags_bash_with_macro_eg2",
	schedule="10 0 * * 6#2",
	start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
	catchup=False
) as dag:
	# START_DATE: 2주전 월요일, END_DATE: 2주전 토요일
	bash_task_2 = BashOperator(
		task_id='bash_task_2',
		env={'START_DATE':'{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=19)) | ds}}',
			 'END_DATE':'{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=14)) | ds}}'
		},
		bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
	)

매주 둘째 주 토요일 00시 10분에 도는 월 단위 dag이다.

현재는 12월 31일으로 {{data_interval_end}}는 12월의 둘째 주 토요일인 12월 9일이다. 나는 2주전 월요일을 START_DATE로 삼고 싶었기에 -19일 END_DATE는 -14일을 해주었다. 이렇게 진자 템플릿 안에 macro를 사용하여 날짜 연산을 하고 dags 출력결과를 보자

12월 9일의 19일 전인 11월 20일이 START_DATE로 출력되었으며 END_DATE도 잘 나온 것을 볼 수 있다.

 

Python 오퍼레이터에서 macro 사용해보기

먼저 작성한 dags 파일을 보자

from airflow import DAG
import pendulum
from airflow.decorators import task



with DAG(
	dag_id="dags_python_with_macro",
	schedule="10 0 * * *",
	start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
	catchup=False
) as dag:
	# 파이썬 오퍼레이터에서 템플릿과 함께 macro를 사용한 경우
	@task(task_id='task_using_macros',
	  templates_dict={'start_date':'{{ (data_interval_end.in_timezone("Asia/Seoul") + macros.dateutil.relativedelta.relativedelta(months=-1, day=1)) | ds }}',
					  'end_date': '{{ (data_interval_end.in_timezone("Asia/Seoul").replace(day=1) + macros.dateutil.relativedelta.relativedelta(days=-1)) | ds }}'
	 }
	)
	def get_datetime_macro(**kwargs):
		# 위 templates_dict라는 string이 key값 나머지 전체가 value값으로 들어감 
		templates_dict = kwargs.get('templates_dict') or {}
		if templates_dict:
			start_date = templates_dict.get('start_date') or 'start_date없음' # key 값이 없으면 스트링으로 없다고 할당
			end_date = templates_dict.get('end_date') or 'end_date없음'
			print(start_date)
			print(end_date)

	# dag 안에서 직접 날짜 연산을 한 경우 
	@task(task_id='task_direct_calc')
	def get_datetime_calc(**kwargs):
		from dateutil.relativedelta import relativedelta
		# 스케줄러 부하 경감을 위해 패키지 import 하는 부분을 task 안에 명명 주기적으로 문법 검사를 하는 airflow의 부하를 줄일 수 있음
		# 오퍼레이터 안에만 필요한 함수는 오퍼레이터 안에 선언을 하는게 나중에 신상에 좋음
		data_interval_end = kwargs['data_interval_end']
		prev_month_day_first = data_interval_end.in_timezone('Asia/Seoul') + relativedelta(months=-1, day=1)
		prev_month_day_last = data_interval_end.in_timezone('Asia/Seoul').replace(day=1) + relativedelta(days=-1)
		print(prev_month_day_first.strftime('%Y-%m-%d')) # 날짜 형식 지정
		print(prev_month_day_last.strftime('%Y-%m-%d'))

	get_datetime_macro() >> get_datetime_calc()

이번에는 파이썬의 task데코레이터를 사용해서 dags를 구성해봤다. 데일리 스케쥴로 task를 두 개 만들었는데 차이점을 살펴보자

task_using_macros task - macro를 사용한 연산

말그대로 macro를 사용해서 만든 task다. templates_dict 안에 json 형태로 start_date와 end_date를 넣어 주었는데 이게 get_datetime_macro함수의 인자로 들어간다

특이한 것은 **kwargs로 인자가 들어갈때 start_date가 key로 들어가는 것이 아니라 templates_dict라는 string 값이 key값으로 들어가고 나머지 start_date와 end_date전체가 value 값으로 들어간다는 사실이다.

그렇게 함수에 값을 넣어주고 각 date를 출력해주는 함수를 만들었다. 만약 값이 할당되지 않았을 경우에는 그냥 없다고 string 값 할당하도록 안전하게 코딩했다.

start_date를 보면 매월 1일 00시 10분에 시작되는 배치에 한 개의 월을 빼고 1일로 고정하라는 의미다. 즉 현재는 12월 data_interval_end는 12월 31일 이므로 11월 1일을 start_date로 하라는 뜻이다.

end_date는 전월 말일로 설정하라는 의미로, 현재 달의 1일에서 1일을 빼 전월 말일로 설정하게 된다. 즉, 현재는 12월 이므로 11월 30일이 end_date가 된다.

출력결과를 보면 동일한 것을 알 수 있다.

task_direct_calc task - 직접 연산

macro를 사용하지 않고도 날짜 연산은 가능하다. 선행 task가 오퍼레이터 안에서 macro를 사용해 변수를 할당해 준 것 이라면 아예 task의 함수 안에서 replace와 relativedelta를 활용해 직접적으로 날짜 연산을 할 수 있다.

macro를 사용할때는 jinja 템플릿을 사용한 것이기 때문에 ds를 사용해 날짜형식을 지정해 주었다면 이번에는 strftime를 사용해 수동으로 형식을 지정해 주어야 한다.

마찬가지로 잘 출력된 것을 알 수 있다.

 

여기서 from dateutil.relativedelta import relativedelta함수 선언문을 데코레이터 안에 넣어주었는데 이게 다 이유가 있다. airflow는 정기적으로 스케줄러를 통해 문법 검사를 진행하는데 task 안에 들어가 있는 코드는 검사를 하지않는다. 때문에 스케줄러 부하 경감을 위해 오퍼레이터 안에만 필요한 함수는 오퍼레이터 안에 선언 해주는 것이 나중에 대규모 데이터 처리 할때 신상에 좋다.

 

 

728x90
Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.