airflow는 데이터 처리를 목적으로 만들어진 툴이기 때문에 우리도 ETL 처리를 할 때 데이터 관점에서 생각해야 한다.
1. 데이터 추출 예시
위와 같은 예시 테이블이 있다고 생각해보자 이러한 테이블을 조회하기 위한 Daily ETL 조회쿼리는 다음과 같다.(2023/02/25 0시 실행)
SELECT NAME, ADDRESS
FROM TBL_REG
WHERE REG_DATE BETWEEN TIMESTAMP(‘2023-02-24 00:00:00’)
AND TIMESTAMP(‘2023-02-24 23:59:59’)
위 쿼리를 보면 실제 쿼리가 시작되는 시점은 2023/02/25 0시이다.
데이터 관점에서 보면 다음과 같다.
데이터 관점의 시작일: 2023-02-24 데이터 관점의 종료일: 2023-02-25
이걸 자연스럽게 DAG를 구성할 때 변수로 넣어주어야 하는데 이때 Airflow 날짜 Template 변수를 이용해 값들을 가져올 수 있다.
2. Airflow 날짜 Template 변수
이때 이전 배치일 즉 25일에 배치 시 처리하는 날짜 범위의 첫부분(2023-02-24)은 다음과 같은 명령어로 가져올 수 있다.
data_intervel_start
dag_run.logical_date
ds(yyyy-mm-dd 형식)
ls(타임스탬프)
배치당일 즉 배치시 처리하는 날짜의 마지막 부분(2023-02-25)은 다음과 같은 명령어로 가져올 수 있다.
이걸 실제 dags 코드를 통해 구현해 보면 다음과 같다.
from airflow import DAG
import pendulum
import datetime
from airflow.operators.python import PythonOperator
from airflow.decorators import task
with DAG(
dag_id="dags_python_template",
schedule="30 9 * * *",
start_date=pendulum.datetime(2023, 3, 10, tz="Asia/Seoul"),
catchup=False
) as dag:
def python_function1(start_date, end_date, **kwargs):
print(start_date)
print(end_date)
python_t1 = PythonOperator(
task_id='python_t1',
python_callable=python_function1,
op_kwargs={'start_date':'{{data_interval_start | ds}}', 'end_date':'{{data_interval_end | ds}}'}
)
@task(task_id='python_t2')
def python_function2(**kwargs):
print('ds:' + kwargs['ds'])
print('ts:' + kwargs['ts'])
print('data_interval_start:' + str(kwargs['data_interval_start']))
print('data_interval_end:' + str(kwargs['data_interval_end']))
python_t1 >> python_function2()
위 코드는 두 개의 task가 정의되어 있다.
python1 task -> op_kwargs 파라미터에 jinja templates 를 적용하여 data_interval_start과 data_interval_end을 가져온 것 python2 task -> kwargs 안에 있는 data_interval_end과 data_interval_start를 비롯한 다양한 날짜 호출 값들을 인덱싱으로 꺼내온
것
task1의 출력값은 다음과 같다. 데이터 관점에서 starttime과 endtime이 잘 출력된 것을 알 수 있다.
task2의 출력값은 다음과 같다. 마찬가지로 잘 출력된 것을 알 수 있으며 ds와 ts 역시 형식에 맞추어 잘 출력된 것을 알 수 있다.
task가 나뉜 것을 그저 원하는 배치 시점을 가져오는 코드를 두 개로 나눈 것에 불과하다. 둘 중 편한 것 을 쓰면 된다. 나는 task 데코레이터를 사용하는 2번 task 방법이 더 편해 보인다.