데이터 파이프라인 (Data Pipelines)¶
데이터 처리 작업을 자동화하고 스케줄링하는 워크플로우 시스템. 복잡한 의존성과 재시도 로직을 관리함.
파이프라인 오케스트레이션¶
핵심 개념¶
| 개념 | 설명 |
|---|---|
| DAG (Directed Acyclic Graph) | 작업 간 의존성을 표현하는 비순환 그래프 |
| Task | 파이프라인의 개별 작업 단위 |
| Operator | 작업 유형 (Python, SQL, Bash 등) |
| Scheduler | 작업 실행 시점 결정 |
| Executor | 실제 작업 실행 (Local, Celery, K8s) |
| Trigger | 파이프라인 시작 조건 |
오케스트레이션 도구 비교¶
| 도구 | 장점 | 단점 | 적합한 상황 |
|---|---|---|---|
| Apache Airflow | 성숙함, 커뮤니티, 플러그인 | 복잡한 설정, 무거움 | 대규모 프로덕션 |
| Prefect | Pythonic API, 유연함 | 상대적으로 새로움 | 클라우드 네이티브 |
| Dagster | 자산 중심, 테스트 용이 | 학습 곡선 | 데이터 플랫폼 |
| Luigi | 단순함, 가벼움 | 제한된 기능 | 작은 프로젝트 |
| Argo Workflows | K8s 네이티브, 컨테이너 | K8s 필수 | K8s 환경 |
도구 선택 가이드¶
어떤 도구가 "최고"인지가 아니라, 상황에 맞는 도구를 선택해야 함.
Airflow를 선택해야 할 때¶
| 상황 | 이유 |
|---|---|
| 팀 규모가 크고 다양한 기술 스택 | 풍부한 Operator, 커뮤니티 지원 |
| 복잡한 의존성 관리 필요 | 강력한 DAG 관리, 재시도 로직 |
| 기존 Hadoop/Spark 인프라 | Hive, Spark, HDFS Operator |
| 스케줄링이 핵심 요구사항 | 강력한 스케줄러, Backfill 지원 |
| 온프레미스 또는 하이브리드 | 자체 호스팅 가능 |
Airflow 적합 체크리스트:
[x] 팀에 10명 이상의 데이터 엔지니어
[x] DAG이 50개 이상
[x] Spark, Hive 등 빅데이터 도구 사용
[x] 복잡한 스케줄링 요구 (backfill, catchup)
Prefect를 선택해야 할 때¶
| 상황 | 이유 |
|---|---|
| Python 중심 팀 | Pythonic API, 데코레이터 기반 |
| 클라우드 네이티브 환경 | Prefect Cloud, 간편한 설정 |
| 빠른 개발 사이클 | 로컬 테스트 용이, 적은 보일러플레이트 |
| 동적 워크플로우 | 런타임에 Task 생성 용이 |
| 관리 오버헤드 최소화 | Prefect Cloud 사용 시 |
Dagster를 선택해야 할 때¶
| 상황 | 이유 |
|---|---|
| 데이터 품질이 핵심 | 자산 중심 설계, 테스트 내장 |
| ML 파이프라인 | 모델 재훈련, Feature Store 통합 |
| 데이터 계보(Lineage) 중요 | 자동 Lineage 추적 |
| 개발/프로덕션 환경 분리 | 리소스 추상화, 환경별 설정 |
마이그레이션 고려사항¶
기존 도구에서 새 도구로 이전할 때:
난이도
Luigi -> Airflow 낮음 (유사한 개념)
Airflow -> Prefect 중간 (재작성 필요하나 개념 유사)
Airflow -> Dagster 높음 (패러다임 변경)
Apache Airflow¶
아키텍처¶
+-------------+ +-----------+ +------------+
| Scheduler | --> | Metadata | <-- | Webserver |
+-------------+ | DB | +------------+
| +-----------+ |
| |
v v
+-------------+ +------------+
| Executor | | UI/API |
+-------------+ +------------+
|
v
+------+------+------+
|Worker|Worker|Worker|
+------+------+------+
기본 구조¶
DAG 작성¶
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
# 기본 설정
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email_on_failure': True,
'email': ['team@example.com'],
'retries': 3,
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(hours=2),
}
# DAG 정의
with DAG(
dag_id='data_processing_pipeline',
default_args=default_args,
description='일일 데이터 처리 파이프라인',
schedule_interval='@daily', # 또는 '0 2 * * *'
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['production', 'etl'],
max_active_runs=1, # 동시 실행 제한
) as dag:
# Task 1: 데이터 추출
def extract_data(**context):
execution_date = context['execution_date']
print(f"Extracting data for {execution_date}")
return {'records': 10000}
extract = PythonOperator(
task_id='extract',
python_callable=extract_data,
)
# Task 2: 데이터 변환
def transform_data(**context):
ti = context['ti']
extract_result = ti.xcom_pull(task_ids='extract')
print(f"Transforming {extract_result['records']} records")
return {'processed': 8000}
transform = PythonOperator(
task_id='transform',
python_callable=transform_data,
)
# Task 3: 품질 검증
validate = BashOperator(
task_id='validate',
bash_command='python /scripts/validate_data.py {{ ds }}',
)
# Task 4: 적재
def load_data(**context):
ti = context['ti']
transform_result = ti.xcom_pull(task_ids='transform')
print(f"Loading {transform_result['processed']} records")
load = PythonOperator(
task_id='load',
python_callable=load_data,
)
# 의존성 정의
extract >> transform >> validate >> load
TaskFlow API (권장)¶
from airflow.decorators import dag, task
from datetime import datetime
@dag(
schedule_interval='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
default_args={'retries': 3}
)
def modern_pipeline():
@task
def extract() -> dict:
"""데이터 추출"""
return {'data': [1, 2, 3], 'count': 3}
@task
def transform(raw_data: dict) -> dict:
"""데이터 변환"""
processed = [x * 2 for x in raw_data['data']]
return {'data': processed, 'count': len(processed)}
@task
def load(processed_data: dict):
"""데이터 적재"""
print(f"Loading {processed_data['count']} records")
# 실행 순서는 데이터 의존성으로 자동 결정
raw = extract()
processed = transform(raw)
load(processed)
# DAG 인스턴스화
pipeline = modern_pipeline()
Task Group (병렬 처리)¶
with DAG(...) as dag:
extract = PythonOperator(task_id='extract', ...)
# 병렬 처리 그룹
with TaskGroup(group_id='parallel_processing') as process_group:
process_text = PythonOperator(
task_id='process_text',
python_callable=process_text_data,
)
process_images = PythonOperator(
task_id='process_images',
python_callable=process_image_data,
)
process_metadata = PythonOperator(
task_id='process_metadata',
python_callable=process_metadata,
)
merge = PythonOperator(task_id='merge', ...)
extract >> process_group >> merge
동적 Task 생성¶
from airflow.decorators import task, dag
@dag(schedule_interval='@daily', start_date=datetime(2024, 1, 1))
def dynamic_pipeline():
@task
def get_data_sources():
return ['source_a', 'source_b', 'source_c']
@task
def process_source(source: str):
print(f"Processing {source}")
return f"{source}_processed"
@task
def combine(results: list):
print(f"Combining {len(results)} results")
sources = get_data_sources()
processed = process_source.expand(source=sources) # 동적 확장
combine(processed)
pipeline = dynamic_pipeline()
Sensor (대기 Task)¶
외부 조건을 기다리는 Task.
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.sql import SqlSensor
# 파일 존재 확인
wait_for_file = FileSensor(
task_id='wait_for_file',
filepath='/data/raw/{{ ds }}/data.parquet',
poke_interval=300, # 5분마다 확인
timeout=3600, # 1시간 타임아웃
mode='reschedule', # 대기 중 슬롯 반환
)
# 다른 DAG 완료 대기
wait_for_upstream = ExternalTaskSensor(
task_id='wait_for_upstream',
external_dag_id='upstream_pipeline',
external_task_id='final_task',
execution_delta=timedelta(hours=1),
mode='reschedule',
)
# S3 파일 대기
wait_for_s3 = S3KeySensor(
task_id='wait_for_s3',
bucket_name='my-bucket',
bucket_key='data/{{ ds }}/complete.txt',
aws_conn_id='aws_default',
)
# SQL 조건 대기
wait_for_data = SqlSensor(
task_id='wait_for_data',
conn_id='postgres_default',
sql="SELECT COUNT(*) FROM staging WHERE date = '{{ ds }}'",
success=lambda x: x[0][0] > 0,
)
Executor 선택¶
| Executor | 특징 | 사용 사례 |
|---|---|---|
| LocalExecutor | 단일 머신, 멀티프로세스 | 개발, 소규모 |
| CeleryExecutor | 분산, Celery 워커 | 중대규모 프로덕션 |
| KubernetesExecutor | K8s Pod per Task | 클라우드 네이티브 |
| CeleryKubernetesExecutor | Celery + K8s 하이브리드 | 대규모, 유연성 |
# airflow.cfg
[core]
executor = CeleryExecutor
[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow
worker_concurrency = 16
Prefect¶
현대적이고 Pythonic한 워크플로우 오케스트레이션.
기본 Flow¶
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
# Task 정의 (캐싱 포함)
@task(
retries=3,
retry_delay_seconds=60,
cache_key_fn=task_input_hash,
cache_expiration=timedelta(hours=1)
)
def extract(source: str) -> dict:
print(f"Extracting from {source}")
return {'data': [...], 'count': 1000}
@task(log_prints=True)
def transform(data: dict) -> dict:
print(f"Transforming {data['count']} records")
return {'processed': data['data'], 'count': 800}
@task
def load(data: dict, destination: str):
print(f"Loading {data['count']} records to {destination}")
# Flow 정의
@flow(name="Data Pipeline", log_prints=True)
def data_pipeline(source: str, destination: str):
# 실행 순서는 데이터 의존성으로 자동 결정
raw_data = extract(source)
processed = transform(raw_data)
load(processed, destination)
# 실행
if __name__ == "__main__":
data_pipeline(source="s3://bucket/raw", destination="s3://bucket/processed")
병렬 실행¶
from prefect import flow, task
from prefect.futures import wait
@task
def process_chunk(chunk_id: int, data: list) -> dict:
return {'chunk_id': chunk_id, 'processed': len(data)}
@flow
def parallel_processing():
chunks = [list(range(i*100, (i+1)*100)) for i in range(10)]
# 병렬 실행 (.submit)
futures = []
for i, chunk in enumerate(chunks):
future = process_chunk.submit(chunk_id=i, data=chunk)
futures.append(future)
# 모든 작업 완료 대기
wait(futures)
results = [f.result() for f in futures]
return results
# 또는 .map 사용
@flow
def parallel_with_map():
chunks = [list(range(i*100, (i+1)*100)) for i in range(10)]
chunk_ids = list(range(10))
results = process_chunk.map(chunk_id=chunk_ids, data=chunks)
return results
스케줄링 (Deployment)¶
from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
@flow
def scheduled_pipeline():
print("Running scheduled pipeline")
# 배포 생성
deployment = Deployment.build_from_flow(
flow=scheduled_pipeline,
name="daily-pipeline",
schedule=CronSchedule(cron="0 2 * * *", timezone="Asia/Seoul"),
work_queue_name="default",
tags=["production"],
)
deployment.apply()
Prefect Cloud vs Self-hosted¶
| 항목 | Prefect Cloud | Self-hosted |
|---|---|---|
| 설정 | 즉시 사용 | 직접 구축 |
| 비용 | 사용량 기반 | 인프라 비용 |
| 관리 | 관리형 | 직접 관리 |
| 보안 | 클라우드 | 온프레미스 가능 |
Dagster¶
데이터 자산(Asset) 중심의 오케스트레이션.
Software-Defined Asset¶
from dagster import asset, Definitions, AssetExecutionContext
import pandas as pd
# 데이터 자산 정의
@asset(group_name="raw_data")
def raw_orders() -> pd.DataFrame:
"""원본 주문 데이터"""
return pd.read_parquet("s3://bucket/raw/orders.parquet")
@asset(group_name="processed")
def cleaned_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
"""정제된 주문 데이터"""
df = raw_orders.copy()
df = df.dropna(subset=['order_id', 'amount'])
df['amount'] = df['amount'].clip(lower=0)
return df
@asset(group_name="analytics")
def daily_revenue(cleaned_orders: pd.DataFrame) -> pd.DataFrame:
"""일별 매출"""
return cleaned_orders.groupby('date').agg({
'amount': 'sum',
'order_id': 'count'
}).rename(columns={'order_id': 'order_count'})
# 정의 모음
defs = Definitions(
assets=[raw_orders, cleaned_orders, daily_revenue]
)
리소스 (환경별 설정)¶
from dagster import asset, Definitions, ConfigurableResource
import boto3
class S3Resource(ConfigurableResource):
bucket: str
region: str = "ap-northeast-2"
def get_client(self):
return boto3.client('s3', region_name=self.region)
def read_parquet(self, key: str):
# S3에서 파일 읽기 로직
pass
@asset
def data_from_s3(context, s3: S3Resource) -> pd.DataFrame:
return s3.read_parquet("raw/data.parquet")
# 환경별 리소스 설정
dev_defs = Definitions(
assets=[data_from_s3],
resources={"s3": S3Resource(bucket="dev-bucket")}
)
prod_defs = Definitions(
assets=[data_from_s3],
resources={"s3": S3Resource(bucket="prod-bucket")}
)
파티션 (시간 기반)¶
from dagster import asset, DailyPartitionsDefinition, AssetExecutionContext
daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")
@asset(partitions_def=daily_partitions)
def daily_logs(context: AssetExecutionContext) -> pd.DataFrame:
partition_date = context.partition_key
return pd.read_parquet(f"s3://bucket/logs/{partition_date}/")
@asset(partitions_def=daily_partitions)
def daily_metrics(context: AssetExecutionContext, daily_logs: pd.DataFrame) -> dict:
return {
'date': context.partition_key,
'total_requests': len(daily_logs),
'avg_latency': daily_logs['latency'].mean()
}
장애 대응 패턴¶
재시도 전략¶
# Airflow
default_args = {
'retries': 3,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True,
'max_retry_delay': timedelta(minutes=60),
}
# Prefect
@task(
retries=3,
retry_delay_seconds=[60, 300, 900], # 1분, 5분, 15분
)
def task_with_backoff():
pass
# 커스텀 재시도 로직
def should_retry(exception):
"""재시도 가능한 예외인지 판단"""
retryable_errors = (
ConnectionError,
TimeoutError,
TransientDBError,
)
return isinstance(exception, retryable_errors)
멱등성 (Idempotency)¶
같은 입력으로 여러 번 실행해도 같은 결과.
def idempotent_load(df, table_name, partition_date, conn):
"""멱등성 적재: 재실행해도 안전"""
with conn.begin():
# 1. 기존 파티션 데이터 삭제
conn.execute(f"""
DELETE FROM {table_name}
WHERE partition_date = '{partition_date}'
""")
# 2. 새 데이터 삽입
df['partition_date'] = partition_date
df.to_sql(table_name, conn, if_exists='append', index=False)
# 파일 기반 멱등성
def idempotent_file_write(df, output_path):
"""임시 파일로 쓰고 atomic rename"""
import tempfile
import shutil
# 임시 파일에 쓰기
with tempfile.NamedTemporaryFile(delete=False) as tmp:
df.to_parquet(tmp.name)
tmp_path = tmp.name
# atomic rename (같은 파일시스템 내에서)
shutil.move(tmp_path, output_path)
데드레터 큐 (DLQ)¶
처리 실패한 레코드를 별도 저장.
def process_with_dlq(records, dlq_path):
"""실패 레코드를 DLQ로 분리"""
successful = []
failed = []
for record in records:
try:
result = process_record(record)
successful.append(result)
except Exception as e:
failed.append({
'record': record,
'error': str(e),
'timestamp': datetime.now().isoformat()
})
# 실패 레코드 저장
if failed:
pd.DataFrame(failed).to_json(
f"{dlq_path}/failed_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
orient='records'
)
return successful
Circuit Breaker¶
연속 실패 시 빠른 실패.
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.last_failure_time = None
self.state = 'closed' # closed, open, half-open
def call(self, func, *args, **kwargs):
if self.state == 'open':
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = 'half-open'
else:
raise CircuitOpenError("Circuit is open")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
self.failure_count = 0
self.state = 'closed'
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = 'open'
# 사용
circuit = CircuitBreaker(failure_threshold=5, recovery_timeout=300)
@task
def call_external_api():
return circuit.call(requests.get, 'https://api.example.com/data')
장애 감지 및 알림¶
from airflow.operators.python import PythonOperator
from airflow.hooks.base import BaseHook
import requests
def send_slack_alert(context):
"""실패 시 Slack 알림"""
slack_webhook = BaseHook.get_connection('slack').password
message = {
'text': f":x: Task Failed",
'attachments': [{
'color': 'danger',
'fields': [
{'title': 'DAG', 'value': context['dag'].dag_id, 'short': True},
{'title': 'Task', 'value': context['task'].task_id, 'short': True},
{'title': 'Execution Date', 'value': str(context['execution_date'])},
{'title': 'Log URL', 'value': context['task_instance'].log_url}
]
}]
}
requests.post(slack_webhook, json=message)
def send_success_notification(context):
"""성공 시 알림 (선택적)"""
# 중요한 파이프라인만
pass
# Task에 적용
task = PythonOperator(
task_id='critical_task',
python_callable=my_function,
on_failure_callback=send_slack_alert,
on_success_callback=send_success_notification,
)
운영 팁¶
DAG 설계 원칙¶
- 작은 단위로 분리: 하나의 Task는 하나의 일만
- 멱등성 보장: 재실행해도 안전하게
- 명시적 의존성: 암묵적 순서 금지
- 적절한 타임아웃: 무한 대기 방지
- 로깅과 메트릭: 디버깅 가능하게
성능 최적화¶
# 1. Sensor mode 설정
sensor = FileSensor(
task_id='wait_file',
mode='reschedule', # 대기 중 슬롯 반환 (권장)
# mode='poke', # 슬롯 점유 (기본값)
)
# 2. Pool로 동시성 제한
task = PythonOperator(
task_id='db_heavy_task',
pool='db_pool', # 미리 정의된 Pool
pool_slots=2, # 사용할 슬롯 수
)
# 3. Priority Weight
high_priority_task = PythonOperator(
task_id='important',
priority_weight=10, # 높은 우선순위
)
# 4. XCom 크기 제한 (대용량 데이터는 외부 저장소 사용)
@task
def extract():
df = load_large_data()
# XCom에 데이터 직접 전달하지 않고 경로만 전달
path = save_to_s3(df)
return {'path': path, 'count': len(df)}
테스트¶
# tests/test_dags.py
import pytest
from airflow.models import DagBag
def test_dag_loading():
"""모든 DAG이 오류 없이 로드되는지 확인"""
dag_bag = DagBag(dag_folder='dags/', include_examples=False)
assert len(dag_bag.import_errors) == 0, \
f"DAG import errors: {dag_bag.import_errors}"
def test_dag_structure():
"""DAG 구조 검증"""
dag_bag = DagBag(dag_folder='dags/', include_examples=False)
for dag_id, dag in dag_bag.dags.items():
# 모든 Task에 owner 설정
for task in dag.tasks:
assert task.owner, f"{dag_id}.{task.task_id} has no owner"
# 시작 날짜 확인
assert dag.start_date, f"{dag_id} has no start_date"
# Cycle 없음 확인 (DAG이므로)
assert dag.is_paused is not None
def test_task_logic():
"""Task 로직 단위 테스트"""
from dags.my_pipeline import transform_data
input_data = {'records': [1, 2, 3]}
result = transform_data(input_data)
assert result['count'] == 3
assert all(x > 0 for x in result['processed'])
CI/CD¶
# .github/workflows/deploy_dags.yml
name: Deploy DAGs
on:
push:
branches: [main]
paths:
- 'dags/**'
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: pip install apache-airflow pytest
- name: Test DAGs
run: |
pytest tests/test_dags.py -v
python -c "from dags import *" # Import test
deploy:
needs: test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Sync to S3
run: |
aws s3 sync dags/ s3://${{ secrets.DAG_BUCKET }}/dags/
참고 자료¶
- Apache Airflow Documentation
- Prefect Documentation
- Dagster Documentation
- Data Pipelines Pocket Reference - James Densmore
- Fundamentals of Data Engineering - Joe Reis, Matt Housley