본문 바로가기
MLOps

MLOps - 19. Airflow

by cocacola0 2022. 6. 3.

출처 : 변성윤님 블로그.
출처 : 부스트캠프 AI Tech.

1. Apache Airflow 소개

1.1 Batch Process란?

  • 예약된 시간에 실행되는 프로세스
    • 일회성(1회)도 가능하고, 주기적인 실행도 가능
      • ex. 이번 주 일요일 07:00에 1번 실행되는 프로세스
      • ex. 매주 일요일 07:00에 실행되는 프로세스
  • Batch Process를 AI 엔지니어가 알아야 하는 이유
    • 모델을 주기적으로 학습시키는 경우 사용(Continuous Training)
    • 주기적인 Batch Serving을 하는 경우 사용
    • 그 외 개발에서 필요한 배치성 작업

1.2 Batch Process - Airflow 등장 전

  • 대표적인 Batch Process 구축 방법 : Linux Crontab
    • (서버에서) crontab -e 입력
    • 실행된 에디터에서 0 * * * * predict.py 입력
    • (0 * * * * 은 크론탭 표현으로 매 시 0분에 실행하는 것을 의미)
    • OS에 의해 매 시 0분에 predict.py가 실행
    • Linux는 일반적인 서버 환경이고, Crontab은 기본적으로 설치되어 있기 때문에 매우 간편
    • 간단하게 Batch Process를 시작하기에 Crontab은 좋은 선택
  • 크론 표현식
    • Batch Process의 스케줄링을 정의한 표현식
    • 이 표현식은 다른 Batch Process 도구에서도 자주 사용됨
  • 크론 표현식 예시
  • 매번 표현식을 암기할 필요는 없지만, 읽을 정도만 인지하면 좋음
  • 크론 표현식 제너레이터 사이트가 많이 있으니, 활용
  • cron 표현식이 어렵다면 다음 사이트에서 확인 가능
  • Linux Crontab의 문제
    • 재실행 및 알람
      • 파일을 실행하다 오류가 발생한 경우, 크론탭이 별도의 처리를 하지 않음
      • 예) 매주 일요일 07:00에 predict.py를 실행하다가 에러가 발생한 경우, 알람을 별도로 받지 못함
    • 실패할 경우, 자동으로 몇 번 더 재실행(Retry)하고, 그래도 실패하면 실패했다는 알람을 받으면 좋음
    • 과거 실행 이력 및 실행 로그를 보기 어려움
    • 여러 파일을 실행하거나, 복잡한 파이프라인을 만들기 힘듬
  • Crontab은 간단히 사용할 수는 있지만, 실패 시 재실행, 실행 로그 확인, 알람 등의 기능은 제공하지 않음
  • 좀 더 정교한 스케줄링 및 워크플로우 도구가 필요함
  • 스케줄링 워크플로우 전용 도구의 등장

1.3 Airflow 소개

  • 현재 스케줄링, 워크플로우 도구의 표준
  • 에어비앤비(Airbnb)에서 개발
  • 현재 릴리즈된 버전은 2.2.0으로, 업데이트 주기가 빠름
  • 스케줄링 도구로 무거울 수 있지만, 거의 모든 기능을 제공하고, 확장성이 넓어 일반적으로 스케줄링과 파이프라인 작성 도구로 많이 사용
  • 특히 데이터 엔지니어링 팀에서 많이 사용

1.3.1 Airflow가 제공하는 기능

  • 파이썬을 사용해 스케줄링 및 파이프라인 작성

  • 스케줄링 및 파이프라인 목록을 볼 수 있는 웹 UI 제공

  • 실패 시 알람
  • 실패 시 재실행 시도
  • 동시 실행 워커 수
  • 설정 및 변수 값 분리

2. Apache Airflow 실습하며 배워보기

2.1 설치하고 실행하기

# 가상환경 설정
$ python -m venv .venv
$ source .venv/bin/activate

# Airfow 설치 
$ pip install pip --upgrade
$ pip install 'apache-airflow==2.2.0'

# Airflow Default Directory 지정
$ export AIRFLOW_HOME="."

# Airflow 에서 사용할 DB 초기화
$ airflow db init

# DB 초기화시 시본 파일 생성
$ ls -al

# Airflow Admin 계정 생성
$ airflow users create \
--username admin \
--password 1234 \
--firstname philhoon \
--lastname oh \
--role Admin \
--email vlfgns5@naver.com

# Airflow Webserver 실행
$ airflow webserver --port 8080
  • http://localhost:8080 에 접속하면 다음처럼 웹 UI가 등장! 어드민 계정으로 로그인!

  • 웹 UI 대시보드 화면이 등장. 하지만 스케줄러 실행중이지 않다는 에러가 보임

  • 별도의 터미널 창을 띄워 다음처럼 Airflow Scheduler를 실행합니다.
$ export AIRFLOW_HOME="."
$ airflow scheduler
  • 다시 웹 UI를 새로고침하면, Scheduler 관련 에러가 없어짐
  • 기본적으로 제공되는 여러 DAG

설치하고 실행하기 - 정리

  • Airflow 설치
    • pip install apache-airflow
  • Airflow 기본 디렉토리 설정
    • 환경변수 AIRFLOW_HOME에 사용할 기본 디렉토리 경로 설정
    • export AIRFLOW_HOME=.
  • Airflow DB 초기화
    • Airflow에서 사용할 DB를 초기화
    • airflow db init
  • Airflow 어드민 계정 생성
    • airflow user create
  • Airflow 웹서버 실행
    • airflow webserver
  • Airflow 스케줄러 실행
    • airflow scheduler

2.2 DAG

2.2.1 DAG 와 Task

  • Batch Scheduling을 위한 DAG 생성
    • Airflow에서는 스케줄링할 작업을 DAG이라고 부름
    • DAG은 Directed Acyclic Graph의 약자로, Airflow에 한정된 개념이 아닌 소프트웨어 자료구조에서 일반적으로 다루는 개념
    • DAG은 이름 그대로, 순환하지 않는 방향이 존재하는 그래프를 의미
  • Airflow는 Crontab처럼 단순히 하나의 파일을 실행하는 것이 아닌, 여러 작업의 조합도 가능함
    • DAG 1개 : 1개의 파이프라인
    • Task : DAG 내에서 실행할 작업
  • 하나의 DAG에 여러 Task의 조합으로 구성

  • tutorial_etl_dag라는 DAG은 3가지 Task로 구성
    • extract
    • transform
    • load
  • tutorial_etl_dag라는 DAG을 실행하면 이 3가지 Task을 순차적으로 실행
  • Task가 꼭 순차적으로 진행하지 않게 할 수도 있음
  • print_date Task 이후
  • sleep, templated Task 동시 실행

DAG 와 Task 정리

  • Airflow는 DAG이라는 단위로 스케줄링 관리
  • 각 DAG은 Task로 구성
  • DAG 내 Task는 순차적으로 실행되거나, 동시에(병렬로) 실행할 수 있음

2.2.2 DAG 작성하기

# 먼저 DAG을 담을 디렉토리 생성(이름은 무조건 dags)
$ mkdir dags

# dags 폴더 내에 hello_world.py 생성
# hello_world.py

# 필요 라이브러리 import 
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def print_world() -> None:
    print("world")

# DAG 정의(이름,태그)
# 언제부터 스케줄링을 시작할지,
# 스케줄링 간격은 어떻게 할지
# with 구문으로 DAG 정의를 시작합니다.
with DAG(
    dag_id="hello_world",  # DAG의 식별자용 아이디입니다.
    description="My First DAG",  # DAG에 대해 설명합니다.
    start_date=days_ago(2),  # DAG 정의 기준 2일 전부터 시작합니다.
    schedule_interval="0 6 * * *",  # 매일 06:00에 실행합니다.
    tags=["my_dags"],  # 태그 목록을 정의합니다. 추후에 DAG을 검색하는데 용이합니다.
) as dag:

    # DAG 내 Task 정의
    # Task 정의는 Airflow의 Operator 클래스를 사용 
    # Airflow에는 다양한 Operator 클래스가 존재


    # 첫번째 Task는 bash 커맨드를 실행
    # Airflow에서 제공하는 BashOperator를 사용 
    # bash_commnad 파라미터에 bash로 실행할 커맨드 전달 

    # 테스크를 정의합니다.
    # bash 커맨드로 echo hello 를 실행합니다.
    t1 = BashOperator(
        task_id="print_hello",
        bash_command="echo Hello",
        owner="heumsi",  # 이 작업의 오너입니다. 보통 작업을 담당하는 사람 이름을 넣습니다.
        retries=3,  # 이 테스크가 실패한 경우, 3번 재시도 합니다.
        retry_delay=timedelta(minutes=5),  # 재시도하는 시간 간격은 5분입니다.
    )

    # 첫번째 Task는 Python 함수 실행
    # Airflow에서 제공하는 PythonOperator 사용 
    # paython_callable 파라미터에 실행할 파이썬 함수 전달 

    # 테스크를 정의합니다.
    # python 함수인 print_world를 실행합니다.
    t2 = PythonOperator(
        task_id="print_world",
        python_callable=print_world,
        depends_on_past=True,
        owner="heumsi",
        retries=3,
        retry_delay=timedelta(minutes=5),
    )

    # DAG 내 TASK 간 순서를 정함
    # 순서는 >> 와 같은 형태로 표현
    # t1(BashOperator) 실행 후, t2(PythonOperator)를 실행 
    # 테스크 순서를 정합니다.
    # t1 실행 후 t2를 실행합니다.
    t1 >> t2
  • 이제 파일을 저장하고, 웹 UI를 확인해보면 새로 생성한 DAG이 보임
  • DAG 상세 페이지에 들어가 DAG을 ON 상태로 변경
  • 조금 기다리면 다음처럼 실행된 결과를 볼 수 있음
  • 모두 성공했기 때문에 초록색으로 표현됨. 마우스를 올리면 간단한 정보를 볼 수 있음
  • 두 번째 DAG Run의 첫 번째 Task 사각형을 클릭한 뒤, Log 버튼을 클릭
  • 우리가 의도한대로 echo Hello가 실행됨
  • 두 번째 DAG Run의 두 번째 Task Log를 확인
  • 의도한대로 파이썬 함수가 실행되어 print(“world”)가 실행된 것을 볼 수 있음
  • 약 특정 DAG Run의 기록을 지우고, 다시 실행시키고 싶으면 (Graph에서 해당 DAG RUN 클릭) Clear를 실행

DAG 작성하기 - 정리

  • AIRFLOW_HOME 으로 지정된 디렉토리에 dags 디렉토리를 생성하고 이 안에 DAG 파일을 작성
  • DAG은 파이썬 파일로 작성. 보통 하나의 .py 파일에 하나의 DAG을 저장
  • DAG 파일은 크게 다음으로 구성
    • DAG 정의 부분
    • Task 정의 부분
    • Task 간 순서 정의 부분
  • DAG 파일을 저장하면, Airflow 웹 UI에서 확인할 수 있음
  • Airflow 웹 UI에서 해당 DAG을 ON으로 변경하면 DAG이 스케줄링되어 실행
  • DAG 세부 페이지에서 실행된 DAG Run의 결과를 볼 수 있음

2.3 유용한 Operator 간단 소개

2.3.1 PythonOperator

  • 파이썬 함수를 실행
  • 함수 뿐 아니라, Callable한 객체를 파라미터로 넘겨 실행할 수 있음
  • 실행할 파이썬 로직을 함수로 생성한 후, PythonOperator로 실행

2.3.2 BashOperator

  • Bash 커맨드를 실행
  • 실행해야 할 프로세스가 파이썬이 아닌 경우에도 BashOperator로 실행 가능
    • ex. shell 스크립트, scala 파일 등

2.3.3 DummyOperator

  • 아무것도 실행하지 않음
  • DAG 내에서 Task를 구성할 때, 여러 개의 Task의 SUCCESS를 기다려야 하는 복잡한 Task 구성에서 사용

2.3.4 SimpleHttpOperator

  • 특정 호스트로 HTTP 요청을 보내고 Response를 반환
  • 파이썬 함수에서 requests 모듈을 사용한 뒤 PythonOperator로 실행시켜도 무방
  • 다만 이런 기능이 Airflow Operator에 이미 존재하는 것을 알면 좋음

2.3.5 이외 Operator

  • BranchOperator
  • DockerOperator
  • KuberntesOperator
  • CustomOperator (직접 Operator 구현)
  • 등등

2.3.6 이외 CSP(GCP, AWS) Operator

2.4 Airflow DAG 관련 다양한 기능들

  • Airflow DAG을 더 풍부하게 작성할 수 있는 방법으로 다음 내용
    • Variable : Airflow Console에서 변수(Variable)를 저장해 Airflow DAG에서 활용 - Connection & Hooks : 연결하기 위한 설정(MySQL, GCP 등)
    • Sensor : 외부 이벤트를 기다리며 특정 조건이 만족하면 실행
    • Marker
    • XComs : Task 끼리 결과를 주고받은 싶은 경우 사용
      • 자세한 내용은 Airflow DAG을 하나씩 만들면서 사용해보기!

3. Apache Airflow 아키텍처와 활용방안

3.1 기본 아키텍처

  • Airflow의 기본 아키텍처 형태

3.1.1 DAG Directory

  • DAG 파일들을 저장
    • 기본 경로는 $AIRFLOW_HOME/dags
    • DAG_FOLDER 라고도 부르며, 이 폴더 내부에서 폴더 구조를 어떻게 두어도 상관없음
    • Scheduler에 의해 .py 파일은 모두 탐색되고 DAG이 파싱

3.1.2 Scheduler

  • Scheduler는 각종 메타 정보의 기록을 담당
    • DAG Directory 내 .py 파일에서 DAG을 파싱하여 DB에 저장
    • DAG들의 스케줄링 관리 및 담당
    • 실행 진행 상황과 결과를 DB에 저장
    • Executor를 통해 실제로 스케줄링된 DAG을 실행
    • Airflow에서 가장 중요한 컴포넌트

3.1.3 Scheduler - Executor

  • Executor는 스케줄링된 DAG을 실행하는 객체로, Local Executor, Remote Executor 크게 2종류로 나뉨

3.1.3.1 Scheduler - Local Executor

  • Local Executor는 DAG Run을 프로세스 단위로 실행하며, 다음처럼 나뉨
  • Local Executor
    • 하나의 DAG Run을 하나의 프로세스로 띄워서 실행
    • 최대로 생성할 프로세스 수를 정해야 함
    • Airflow를 간단하게 운영할 때 적합
  • Sequential Executor
    • 하나의 프로세스에서 모든 DAG Run들을 처리
    • Airflow 기본 Executor로, 별도 설정이 없으면 이 Executor를 사용
    • Airflow를 테스트로 잠시 운영할 때 적합

3.1.3.2 Scheduler - Remote Executor

  • DAG Run을 외부 프로세스로 실행
  • Celery Executor
    • DAG Run을 Celery Worker Process로 실행
    • 보통 Redis를 중간에 두고 같이 사용
    • Local Executor를 사용하다, Airflow 운영 규모가 좀 더 커지면 Celery Executor로 전환
  • Kubernetes Executor
    • 쿠버네티스 상에서 Airflow를 운영할 때 사용
    • DAG Run 하나가 하나의 Pod(쿠버네티스의 컨테이너 같은 개념)
    • Airflow 운영 규모가 큰 팀에서 사용
    • 이 외에도 CeleryKubernetes Executor, Dask Executor 등이 있지만, 여기서는 생략
    • Executor의 동작 과정에 대해 좀 궁금하시면 라인 블로그 글 추천!

3.1.4 Scheduler - Workers

  • DAG을 실제로 실행
    • Scheduler에 의해 생기고 실행
    • Executor에 따라 워커의 형태가 다름
      • Celery 혹은 Local Executor인 경우, Worker는 프로세스
      • Kubernetes Executor인 경우, Worker는 pod.
    • DAG Run을 실행하는 과정에서 생긴 로그를 저장

3.1.5 Scheduler - Metadata Database

  • 메타 정보를 저장
    • Scheduler에 의해 Metadata가 쌓임
    • 보통 MySQL이나 Postgres를 사용
    • 파싱한 DAG 정보, DAG Run 상태와 실행 내용, Task 정보 등을 저장
    • User와 Role (RBAC)에 대한 정보 저장
    • Scheduler와 더불어 핵심 컴포넌트
      • 트러블 슈팅 시, 디버깅을 위해 직접 DB에 연결해 데이터를 확인하기도 함
      • 실제 운영 환경에서는 GCP Cloud SQL이나, AWS Aurora DB 등 외부 DB 인스턴스를 사용

3.1.6 Scheduler - Webserver

  • WEB UI를 담당
    • Metadata DB와 통신하며 유저에게 필요한 메타 데이터를 웹 브라우저에 보여주고 시각화
    • 보통 Airflow 사용자들은 이 웹서버를 이용하여 DAG을 ON/OFF 하며, 현 상황을 파악
    • REST API도 제공하므로, 꼭 WEB UI를 통해서 통신하지 않아도 괜찮음
    • 웹서버가 당장 작동하지 않아도, Airflow에 큰 장애가 발생하지 않음(반면 Scheduler의 작동 여부는 매우 중요)

3.2 Airflow 실제 활용 사례

  • Airflow를 구축하는 방법으로 보통 3가지 방법을 사용
    • 1) Managed Airflow (GCP Composer, AWS MWAA)
    • 2) VM + Docker compose
    • 3) Kubernetes + Helm

3.2.1 Managed Airflow

  • Managed Airflow은 클라우드 서비스 형태로 Airflow를 사용하는 방법
    • AWS의 MWAA, GCP의 Cloud Composer
  • 보통 별도의 데이터 엔지니어가 없고, 분석가로 이루어진 데이터 팀의 초기에 활용하기 좋음
  • Managed Airflow의 장단점
    • 장점
      • 설치와 구축을 클릭 몇번으로 클라우드 서비스가 다 진행
      • 유저는 DAG 파일을 스토리지(파일 업로드) 형태로 관리
    • 단점
      • 비용
      • 자유도 가 적음. 클라우드에서 기능을 제공하지 않으면 불가능한 제약이 많음

3.2.2 VM + Docker compose

  • VM + Docker compose는 직접 VM 위에서 Docker compose로 Airflow를 배포하는 방법
  • Airflow 구축에 필요한 컴포넌트(Scheduler, Webserver, Database 등)를 Docker container 형태로 배포
  • 보통 데이터 팀에 데이터 엔지니어가 적게 존재하는 데이터 팀 성장 초반에 적합
  • VM + Docker compose 방법의 장단점
    • 장점
      • Managed Service 보다는 살짝 복잡하지만, 어려운 난이도는 아님
      • (Docker와 Docker compose에 익숙한 사람이라면 금방 익힐 수 있음)
      • 하나의 VM만을 사용하기 때문에 단순
        • 단점
      • 각 도커 컨테이너 별로 환경이 다르므로, 관리 포인트가 늘어남
      • 예를 들어, 특정 컨테이너가 갑자기 죽을 수도 있고, 특정 컨테이너에 라이브러리를 설치했다면, 나머지 컨테이너에도 하나씩 설치해야 함

3.2.3 Kubernetes + Helm

  • Kubernetes + Helm은 Kubernetes 환경에서 Helm 차트로 Airflow를 배포하는 방법
  • Kubernetes는 여러 개의 VM을 동적으로 운영하는 일종의 분산환경으로, 리소스 사용이 매우 유연한게 대표적인 특징(필요에 따라 VM 수를 알아서 늘려주고 줄여줌)
  • 이런 특징 덕분에, 특정 시간에 배치 프로세스를 실행시키는 Airflow와 궁합이 매우 잘 맞음
  • Airflow DAG 수가 몇 백개로 늘어나도 노드 오토 스케일링으로 모든 프로세스를 잘 처리할 수 있음
  • 하지만 쿠버네티스 자체가 난이도가 있는만큼 구축과 운영이 어려움
  • 보통 데이터 팀에 엔지니어링 팀이 존재하고, 쿠버네티스 환경인 경우에 적극 사용

3.3 MLOps 관점의 Airflow

  • Airflow는 데이터 엔지니어링에서 많이 사용하지만, MLOps에서도 활용할 수 있음
  • “주기적인 실행”이 필요한 경우
    • Batch Training : 1주일 단위로 모델 학습
    • Batch Serving(Batch Inference) : 30분 단위로 인퍼런스
    • 인퍼런스 결과를 기반으로 일자별, 주차별 모델 퍼포먼스 Report 생성
    • MySQL에 저장된 메타데이터를 데이터 웨어하우스로 1시간 단위로 옮기기
      • S3, GCS 등 Objest Storage
    • Feature Store를 만들기 위해 Batch ETL 실행

4.Airflow 관련 추천 글

1. 버킷플레이스 - Airflow 도입기

2. 라인 엔지니어링 - Airflow on Kubernetes

3. 쏘카 데이터 그룹 - Airflow와 함께한 데이터 환경 구축기

4. Airflow Executors Explained

'MLOps' 카테고리의 다른 글

MLOps - 21. Further (앞으로 더 공부하면 좋을 내용)  (0) 2022.06.05
MLOps - 20. 머신러닝 디자인 패턴  (0) 2022.06.04
MLOps - 18. BentoML  (0) 2022.06.02
MLOps - 17. MLFlow  (0) 2022.06.01
MLOps - 16.Logging  (0) 2022.05.31

댓글