airflow

[Airflow] - Airflow의 날짜개념

  • -
728x90

airflow는 데이터 처리를 목적으로 만들어진 툴이기 때문에 우리도 ETL 처리를 할 때 데이터 관점에서 생각해야 한다.

 

1. 데이터 추출 예시

위와 같은 예시 테이블이 있다고 생각해보자 이러한 테이블을 조회하기 위한 Daily ETL 조회쿼리는 다음과 같다.(2023/02/25 0시 실행)

SELECT NAME, ADDRESS
FROM TBL_REG
WHERE REG_DATE BETWEEN TIMESTAMP(‘2023-02-24 00:00:00’)
AND TIMESTAMP(‘2023-02-24 23:59:59’)

 

위 쿼리를 보면 실제 쿼리가 시작되는 시점은 2023/02/25 0시이다.

데이터 관점에서 보면 다음과 같다.

데이터 관점의 시작일: 2023-02-24
데이터 관점의 종료일: 2023-02-25

 

이걸 자연스럽게 DAG를 구성할 때 변수로 넣어주어야 하는데 이때 Airflow 날짜 Template 변수를 이용해 값들을 가져올 수 있다.

 

 

2. Airflow 날짜 Template 변수

 

이때 이전 배치일 즉 25일에 배치 시 처리하는 날짜 범위의 첫부분(2023-02-24)은 다음과 같은 명령어로 가져올 수 있다.

  • data_intervel_start
  • dag_run.logical_date
  • ds(yyyy-mm-dd 형식)
  • ls(타임스탬프)

배치당일 즉 배치시 처리하는 날짜의 마지막 부분(2023-02-25)은 다음과 같은 명령어로 가져올 수 있다.

  • data_intervel_end

 

이걸 실제 dags 코드를 통해 구현해 보면 다음과 같다.

from airflow import DAG
import pendulum
import datetime
from airflow.operators.python import PythonOperator
from airflow.decorators import task

with DAG(
    dag_id="dags_python_template",
    schedule="30 9 * * *",
    start_date=pendulum.datetime(2023, 3, 10, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    def python_function1(start_date, end_date, **kwargs):
        print(start_date)
        print(end_date)

    python_t1 = PythonOperator(
        task_id='python_t1',
        python_callable=python_function1,
        op_kwargs={'start_date':'{{data_interval_start | ds}}', 'end_date':'{{data_interval_end | ds}}'}
    )
    
    @task(task_id='python_t2')
    def python_function2(**kwargs):
        print('ds:' + kwargs['ds'])
        print('ts:' + kwargs['ts'])
        print('data_interval_start:' + str(kwargs['data_interval_start']))
        print('data_interval_end:' + str(kwargs['data_interval_end']))

    python_t1 >> python_function2()

 

위 코드는 두 개의 task가 정의되어 있다.

python1 task -> op_kwargs 파라미터에 jinja templates를 적용하여 data_interval_start과 data_interval_end을 가져온 것
python2 task -> kwargs 안에 있는 data_interval_end과 data_interval_start를 비롯한 다양한 날짜 호출 값들을 인덱싱으로 꺼내온 

 

task1의 출력값은 다음과 같다. 데이터 관점에서 starttime과 endtime이 잘 출력된 것을 알 수 있다.

task2의 출력값은 다음과 같다. 마찬가지로 잘 출력된 것을 알 수 있으며 ds와 ts 역시 형식에 맞추어 잘 출력된 것을 알 수 있다.

 

task가 나뉜 것을 그저 원하는 배치 시점을 가져오는 코드를 두 개로 나눈 것에 불과하다. 둘 중 편한 것 을 쓰면 된다. 나는 task 데코레이터를 사용하는 2번 task 방법이 더 편해 보인다.

 

728x90
Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.