콘텐츠로 이동
Data Prep
상세

데이터 파이프라인 (Data Pipelines)

데이터 처리 작업을 자동화하고 스케줄링하는 워크플로우 시스템. 복잡한 의존성과 재시도 로직을 관리함.

파이프라인 오케스트레이션

핵심 개념

개념 설명
DAG (Directed Acyclic Graph) 작업 간 의존성을 표현하는 비순환 그래프
Task 파이프라인의 개별 작업 단위
Operator 작업 유형 (Python, SQL, Bash 등)
Scheduler 작업 실행 시점 결정
Executor 실제 작업 실행 (Local, Celery, K8s)
Trigger 파이프라인 시작 조건

오케스트레이션 도구 비교

도구 장점 단점 적합한 상황
Apache Airflow 성숙함, 커뮤니티, 플러그인 복잡한 설정, 무거움 대규모 프로덕션
Prefect Pythonic API, 유연함 상대적으로 새로움 클라우드 네이티브
Dagster 자산 중심, 테스트 용이 학습 곡선 데이터 플랫폼
Luigi 단순함, 가벼움 제한된 기능 작은 프로젝트
Argo Workflows K8s 네이티브, 컨테이너 K8s 필수 K8s 환경

도구 선택 가이드

어떤 도구가 "최고"인지가 아니라, 상황에 맞는 도구를 선택해야 함.

Airflow를 선택해야 할 때

상황 이유
팀 규모가 크고 다양한 기술 스택 풍부한 Operator, 커뮤니티 지원
복잡한 의존성 관리 필요 강력한 DAG 관리, 재시도 로직
기존 Hadoop/Spark 인프라 Hive, Spark, HDFS Operator
스케줄링이 핵심 요구사항 강력한 스케줄러, Backfill 지원
온프레미스 또는 하이브리드 자체 호스팅 가능
Airflow 적합 체크리스트:
[x] 팀에 10명 이상의 데이터 엔지니어
[x] DAG이 50개 이상
[x] Spark, Hive 등 빅데이터 도구 사용
[x] 복잡한 스케줄링 요구 (backfill, catchup)

Prefect를 선택해야 할 때

상황 이유
Python 중심 팀 Pythonic API, 데코레이터 기반
클라우드 네이티브 환경 Prefect Cloud, 간편한 설정
빠른 개발 사이클 로컬 테스트 용이, 적은 보일러플레이트
동적 워크플로우 런타임에 Task 생성 용이
관리 오버헤드 최소화 Prefect Cloud 사용 시
Prefect 적합 체크리스트:
[x] Python 위주 데이터 파이프라인
[x] 소규모 팀 (1-10명)
[x] 빠른 프로토타이핑 필요
[x] 인프라 관리 최소화 원함

Dagster를 선택해야 할 때

상황 이유
데이터 품질이 핵심 자산 중심 설계, 테스트 내장
ML 파이프라인 모델 재훈련, Feature Store 통합
데이터 계보(Lineage) 중요 자동 Lineage 추적
개발/프로덕션 환경 분리 리소스 추상화, 환경별 설정
Dagster 적합 체크리스트:
[x] 데이터 품질과 테스트가 중요
[x] ML Ops 워크플로우
[x] 데이터 계보 추적 필요
[x] 환경별 설정 분리 필요

마이그레이션 고려사항

기존 도구에서 새 도구로 이전할 때:

                    난이도
Luigi -> Airflow    낮음 (유사한 개념)
Airflow -> Prefect  중간 (재작성 필요하나 개념 유사)
Airflow -> Dagster  높음 (패러다임 변경)

Apache Airflow

아키텍처

+-------------+     +-----------+     +------------+
|  Scheduler  | --> | Metadata  | <-- |  Webserver |
+-------------+     |    DB     |     +------------+
      |             +-----------+           |
      |                                     |
      v                                     v
+-------------+                      +------------+
|  Executor   |                      |   UI/API   |
+-------------+                      +------------+
      |
      v
+------+------+------+
|Worker|Worker|Worker|
+------+------+------+

기본 구조

pipelines diagram 1

DAG 작성

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

# 기본 설정
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['team@example.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),
}

# DAG 정의
with DAG(
    dag_id='data_processing_pipeline',
    default_args=default_args,
    description='일일 데이터 처리 파이프라인',
    schedule_interval='@daily',  # 또는 '0 2 * * *'
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['production', 'etl'],
    max_active_runs=1,  # 동시 실행 제한
) as dag:

    # Task 1: 데이터 추출
    def extract_data(**context):
        execution_date = context['execution_date']
        print(f"Extracting data for {execution_date}")
        return {'records': 10000}

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    # Task 2: 데이터 변환
    def transform_data(**context):
        ti = context['ti']
        extract_result = ti.xcom_pull(task_ids='extract')
        print(f"Transforming {extract_result['records']} records")
        return {'processed': 8000}

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )

    # Task 3: 품질 검증
    validate = BashOperator(
        task_id='validate',
        bash_command='python /scripts/validate_data.py {{ ds }}',
    )

    # Task 4: 적재
    def load_data(**context):
        ti = context['ti']
        transform_result = ti.xcom_pull(task_ids='transform')
        print(f"Loading {transform_result['processed']} records")

    load = PythonOperator(
        task_id='load',
        python_callable=load_data,
    )

    # 의존성 정의
    extract >> transform >> validate >> load

TaskFlow API (권장)

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={'retries': 3}
)
def modern_pipeline():

    @task
    def extract() -> dict:
        """데이터 추출"""
        return {'data': [1, 2, 3], 'count': 3}

    @task
    def transform(raw_data: dict) -> dict:
        """데이터 변환"""
        processed = [x * 2 for x in raw_data['data']]
        return {'data': processed, 'count': len(processed)}

    @task
    def load(processed_data: dict):
        """데이터 적재"""
        print(f"Loading {processed_data['count']} records")

    # 실행 순서는 데이터 의존성으로 자동 결정
    raw = extract()
    processed = transform(raw)
    load(processed)

# DAG 인스턴스화
pipeline = modern_pipeline()

Task Group (병렬 처리)

with DAG(...) as dag:

    extract = PythonOperator(task_id='extract', ...)

    # 병렬 처리 그룹
    with TaskGroup(group_id='parallel_processing') as process_group:

        process_text = PythonOperator(
            task_id='process_text',
            python_callable=process_text_data,
        )

        process_images = PythonOperator(
            task_id='process_images',
            python_callable=process_image_data,
        )

        process_metadata = PythonOperator(
            task_id='process_metadata',
            python_callable=process_metadata,
        )

    merge = PythonOperator(task_id='merge', ...)

    extract >> process_group >> merge

동적 Task 생성

from airflow.decorators import task, dag

@dag(schedule_interval='@daily', start_date=datetime(2024, 1, 1))
def dynamic_pipeline():

    @task
    def get_data_sources():
        return ['source_a', 'source_b', 'source_c']

    @task
    def process_source(source: str):
        print(f"Processing {source}")
        return f"{source}_processed"

    @task
    def combine(results: list):
        print(f"Combining {len(results)} results")

    sources = get_data_sources()
    processed = process_source.expand(source=sources)  # 동적 확장
    combine(processed)

pipeline = dynamic_pipeline()

Sensor (대기 Task)

외부 조건을 기다리는 Task.

from airflow.sensors.filesystem import FileSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.sql import SqlSensor

# 파일 존재 확인
wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/data/raw/{{ ds }}/data.parquet',
    poke_interval=300,  # 5분마다 확인
    timeout=3600,       # 1시간 타임아웃
    mode='reschedule',  # 대기 중 슬롯 반환
)

# 다른 DAG 완료 대기
wait_for_upstream = ExternalTaskSensor(
    task_id='wait_for_upstream',
    external_dag_id='upstream_pipeline',
    external_task_id='final_task',
    execution_delta=timedelta(hours=1),
    mode='reschedule',
)

# S3 파일 대기
wait_for_s3 = S3KeySensor(
    task_id='wait_for_s3',
    bucket_name='my-bucket',
    bucket_key='data/{{ ds }}/complete.txt',
    aws_conn_id='aws_default',
)

# SQL 조건 대기
wait_for_data = SqlSensor(
    task_id='wait_for_data',
    conn_id='postgres_default',
    sql="SELECT COUNT(*) FROM staging WHERE date = '{{ ds }}'",
    success=lambda x: x[0][0] > 0,
)

Executor 선택

Executor 특징 사용 사례
LocalExecutor 단일 머신, 멀티프로세스 개발, 소규모
CeleryExecutor 분산, Celery 워커 중대규모 프로덕션
KubernetesExecutor K8s Pod per Task 클라우드 네이티브
CeleryKubernetesExecutor Celery + K8s 하이브리드 대규모, 유연성
# airflow.cfg
[core]
executor = CeleryExecutor

[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow
worker_concurrency = 16

Prefect

현대적이고 Pythonic한 워크플로우 오케스트레이션.

기본 Flow

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

# Task 정의 (캐싱 포함)
@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1)
)
def extract(source: str) -> dict:
    print(f"Extracting from {source}")
    return {'data': [...], 'count': 1000}

@task(log_prints=True)
def transform(data: dict) -> dict:
    print(f"Transforming {data['count']} records")
    return {'processed': data['data'], 'count': 800}

@task
def load(data: dict, destination: str):
    print(f"Loading {data['count']} records to {destination}")

# Flow 정의
@flow(name="Data Pipeline", log_prints=True)
def data_pipeline(source: str, destination: str):
    # 실행 순서는 데이터 의존성으로 자동 결정
    raw_data = extract(source)
    processed = transform(raw_data)
    load(processed, destination)

# 실행
if __name__ == "__main__":
    data_pipeline(source="s3://bucket/raw", destination="s3://bucket/processed")

병렬 실행

from prefect import flow, task
from prefect.futures import wait

@task
def process_chunk(chunk_id: int, data: list) -> dict:
    return {'chunk_id': chunk_id, 'processed': len(data)}

@flow
def parallel_processing():
    chunks = [list(range(i*100, (i+1)*100)) for i in range(10)]

    # 병렬 실행 (.submit)
    futures = []
    for i, chunk in enumerate(chunks):
        future = process_chunk.submit(chunk_id=i, data=chunk)
        futures.append(future)

    # 모든 작업 완료 대기
    wait(futures)
    results = [f.result() for f in futures]

    return results

# 또는 .map 사용
@flow
def parallel_with_map():
    chunks = [list(range(i*100, (i+1)*100)) for i in range(10)]
    chunk_ids = list(range(10))

    results = process_chunk.map(chunk_id=chunk_ids, data=chunks)
    return results

스케줄링 (Deployment)

from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

@flow
def scheduled_pipeline():
    print("Running scheduled pipeline")

# 배포 생성
deployment = Deployment.build_from_flow(
    flow=scheduled_pipeline,
    name="daily-pipeline",
    schedule=CronSchedule(cron="0 2 * * *", timezone="Asia/Seoul"),
    work_queue_name="default",
    tags=["production"],
)

deployment.apply()

Prefect Cloud vs Self-hosted

항목 Prefect Cloud Self-hosted
설정 즉시 사용 직접 구축
비용 사용량 기반 인프라 비용
관리 관리형 직접 관리
보안 클라우드 온프레미스 가능

Dagster

데이터 자산(Asset) 중심의 오케스트레이션.

Software-Defined Asset

from dagster import asset, Definitions, AssetExecutionContext
import pandas as pd

# 데이터 자산 정의
@asset(group_name="raw_data")
def raw_orders() -> pd.DataFrame:
    """원본 주문 데이터"""
    return pd.read_parquet("s3://bucket/raw/orders.parquet")

@asset(group_name="processed")
def cleaned_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    """정제된 주문 데이터"""
    df = raw_orders.copy()
    df = df.dropna(subset=['order_id', 'amount'])
    df['amount'] = df['amount'].clip(lower=0)
    return df

@asset(group_name="analytics")
def daily_revenue(cleaned_orders: pd.DataFrame) -> pd.DataFrame:
    """일별 매출"""
    return cleaned_orders.groupby('date').agg({
        'amount': 'sum',
        'order_id': 'count'
    }).rename(columns={'order_id': 'order_count'})

# 정의 모음
defs = Definitions(
    assets=[raw_orders, cleaned_orders, daily_revenue]
)

리소스 (환경별 설정)

from dagster import asset, Definitions, ConfigurableResource
import boto3

class S3Resource(ConfigurableResource):
    bucket: str
    region: str = "ap-northeast-2"

    def get_client(self):
        return boto3.client('s3', region_name=self.region)

    def read_parquet(self, key: str):
        # S3에서 파일 읽기 로직
        pass

@asset
def data_from_s3(context, s3: S3Resource) -> pd.DataFrame:
    return s3.read_parquet("raw/data.parquet")

# 환경별 리소스 설정
dev_defs = Definitions(
    assets=[data_from_s3],
    resources={"s3": S3Resource(bucket="dev-bucket")}
)

prod_defs = Definitions(
    assets=[data_from_s3],
    resources={"s3": S3Resource(bucket="prod-bucket")}
)

파티션 (시간 기반)

from dagster import asset, DailyPartitionsDefinition, AssetExecutionContext

daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(partitions_def=daily_partitions)
def daily_logs(context: AssetExecutionContext) -> pd.DataFrame:
    partition_date = context.partition_key
    return pd.read_parquet(f"s3://bucket/logs/{partition_date}/")

@asset(partitions_def=daily_partitions)
def daily_metrics(context: AssetExecutionContext, daily_logs: pd.DataFrame) -> dict:
    return {
        'date': context.partition_key,
        'total_requests': len(daily_logs),
        'avg_latency': daily_logs['latency'].mean()
    }

장애 대응 패턴

재시도 전략

# Airflow
default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=60),
}

# Prefect
@task(
    retries=3,
    retry_delay_seconds=[60, 300, 900],  # 1분, 5분, 15분
)
def task_with_backoff():
    pass

# 커스텀 재시도 로직
def should_retry(exception):
    """재시도 가능한 예외인지 판단"""
    retryable_errors = (
        ConnectionError,
        TimeoutError,
        TransientDBError,
    )
    return isinstance(exception, retryable_errors)

멱등성 (Idempotency)

같은 입력으로 여러 번 실행해도 같은 결과.

def idempotent_load(df, table_name, partition_date, conn):
    """멱등성 적재: 재실행해도 안전"""
    with conn.begin():
        # 1. 기존 파티션 데이터 삭제
        conn.execute(f"""
            DELETE FROM {table_name} 
            WHERE partition_date = '{partition_date}'
        """)

        # 2. 새 데이터 삽입
        df['partition_date'] = partition_date
        df.to_sql(table_name, conn, if_exists='append', index=False)

# 파일 기반 멱등성
def idempotent_file_write(df, output_path):
    """임시 파일로 쓰고 atomic rename"""
    import tempfile
    import shutil

    # 임시 파일에 쓰기
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        df.to_parquet(tmp.name)
        tmp_path = tmp.name

    # atomic rename (같은 파일시스템 내에서)
    shutil.move(tmp_path, output_path)

데드레터 큐 (DLQ)

처리 실패한 레코드를 별도 저장.

def process_with_dlq(records, dlq_path):
    """실패 레코드를 DLQ로 분리"""
    successful = []
    failed = []

    for record in records:
        try:
            result = process_record(record)
            successful.append(result)
        except Exception as e:
            failed.append({
                'record': record,
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            })

    # 실패 레코드 저장
    if failed:
        pd.DataFrame(failed).to_json(
            f"{dlq_path}/failed_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
            orient='records'
        )

    return successful

Circuit Breaker

연속 실패 시 빠른 실패.

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure_time = None
        self.state = 'closed'  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        if self.state == 'open':
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = 'half-open'
            else:
                raise CircuitOpenError("Circuit is open")

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise

    def _on_success(self):
        self.failure_count = 0
        self.state = 'closed'

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = 'open'

# 사용
circuit = CircuitBreaker(failure_threshold=5, recovery_timeout=300)

@task
def call_external_api():
    return circuit.call(requests.get, 'https://api.example.com/data')

장애 감지 및 알림

from airflow.operators.python import PythonOperator
from airflow.hooks.base import BaseHook
import requests

def send_slack_alert(context):
    """실패 시 Slack 알림"""
    slack_webhook = BaseHook.get_connection('slack').password

    message = {
        'text': f":x: Task Failed",
        'attachments': [{
            'color': 'danger',
            'fields': [
                {'title': 'DAG', 'value': context['dag'].dag_id, 'short': True},
                {'title': 'Task', 'value': context['task'].task_id, 'short': True},
                {'title': 'Execution Date', 'value': str(context['execution_date'])},
                {'title': 'Log URL', 'value': context['task_instance'].log_url}
            ]
        }]
    }

    requests.post(slack_webhook, json=message)

def send_success_notification(context):
    """성공 시 알림 (선택적)"""
    # 중요한 파이프라인만
    pass

# Task에 적용
task = PythonOperator(
    task_id='critical_task',
    python_callable=my_function,
    on_failure_callback=send_slack_alert,
    on_success_callback=send_success_notification,
)

운영 팁

DAG 설계 원칙

  1. 작은 단위로 분리: 하나의 Task는 하나의 일만
  2. 멱등성 보장: 재실행해도 안전하게
  3. 명시적 의존성: 암묵적 순서 금지
  4. 적절한 타임아웃: 무한 대기 방지
  5. 로깅과 메트릭: 디버깅 가능하게

성능 최적화

# 1. Sensor mode 설정
sensor = FileSensor(
    task_id='wait_file',
    mode='reschedule',  # 대기 중 슬롯 반환 (권장)
    # mode='poke',      # 슬롯 점유 (기본값)
)

# 2. Pool로 동시성 제한
task = PythonOperator(
    task_id='db_heavy_task',
    pool='db_pool',  # 미리 정의된 Pool
    pool_slots=2,    # 사용할 슬롯 수
)

# 3. Priority Weight
high_priority_task = PythonOperator(
    task_id='important',
    priority_weight=10,  # 높은 우선순위
)

# 4. XCom 크기 제한 (대용량 데이터는 외부 저장소 사용)
@task
def extract():
    df = load_large_data()
    # XCom에 데이터 직접 전달하지 않고 경로만 전달
    path = save_to_s3(df)
    return {'path': path, 'count': len(df)}

테스트

# tests/test_dags.py
import pytest
from airflow.models import DagBag

def test_dag_loading():
    """모든 DAG이 오류 없이 로드되는지 확인"""
    dag_bag = DagBag(dag_folder='dags/', include_examples=False)

    assert len(dag_bag.import_errors) == 0, \
        f"DAG import errors: {dag_bag.import_errors}"

def test_dag_structure():
    """DAG 구조 검증"""
    dag_bag = DagBag(dag_folder='dags/', include_examples=False)

    for dag_id, dag in dag_bag.dags.items():
        # 모든 Task에 owner 설정
        for task in dag.tasks:
            assert task.owner, f"{dag_id}.{task.task_id} has no owner"

        # 시작 날짜 확인
        assert dag.start_date, f"{dag_id} has no start_date"

        # Cycle 없음 확인 (DAG이므로)
        assert dag.is_paused is not None

def test_task_logic():
    """Task 로직 단위 테스트"""
    from dags.my_pipeline import transform_data

    input_data = {'records': [1, 2, 3]}
    result = transform_data(input_data)

    assert result['count'] == 3
    assert all(x > 0 for x in result['processed'])

CI/CD

# .github/workflows/deploy_dags.yml
name: Deploy DAGs

on:
  push:
    branches: [main]
    paths:
      - 'dags/**'

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'

      - name: Install dependencies
        run: pip install apache-airflow pytest

      - name: Test DAGs
        run: |
          pytest tests/test_dags.py -v
          python -c "from dags import *"  # Import test

  deploy:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Sync to S3
        run: |
          aws s3 sync dags/ s3://${{ secrets.DAG_BUCKET }}/dags/

참고 자료