콘텐츠로 이동
Data Prep
상세

ETL과 ELT

데이터를 소스에서 목적지로 이동하고 변환하는 프로세스. 데이터 파이프라인의 핵심 패턴.

ETL vs ELT

ETL (Extract, Transform, Load)

Source --> Extract --> Transform --> Load --> Data Warehouse
                          |
                  중간 서버에서 변환
단계 설명
Extract 소스에서 데이터 추출
Transform 중간 시스템에서 변환/정제
Load 목적지에 적재

ELT (Extract, Load, Transform)

Source --> Extract --> Load --> Data Warehouse --> Transform
                                     |
                               웨어하우스에서 변환
단계 설명
Extract 소스에서 데이터 추출
Load 웨어하우스에 raw 데이터 적재
Transform 웨어하우스 내에서 변환

비교

특성 ETL ELT
변환 위치 중간 서버 목적지
확장성 제한적 높음
데이터 양 중소규모 대규모
원본 보존 어려움 용이
비용 서버 비용 컴퓨팅 비용
도구 Informatica, Talend dbt, Spark
유연성 낮음 (사전 정의) 높음 (사후 변환)
디버깅 어려움 (원본 손실) 용이 (원본 보존)

판단 기준: ETL vs ELT 선택

올바른 선택은 "어떤 게 더 좋은가"가 아니라 상황에 맞는 것.

ETL을 선택해야 할 때

상황 이유
데이터 규모가 작음 (GB 단위) 중간 서버로 충분히 처리 가능
변환 로직이 고정됨 리포트 요구사항이 명확하고 변하지 않음
보안/컴플라이언스 요구 PII를 목적지에 보내기 전 마스킹 필수
네트워크 비용 최적화 압축/필터링 후 전송으로 비용 절감
레거시 시스템 기존 인프라가 ETL 기반
# ETL이 적합한 예: PII 마스킹 후 적재
def etl_with_pii_masking(source_df):
    # Transform: 반드시 적재 전에 마스킹
    df = source_df.copy()
    df['email'] = df['email'].apply(lambda x: hash_pii(x))
    df['phone'] = df['phone'].apply(lambda x: '***-****-' + x[-4:])

    # Load: 마스킹된 데이터만 웨어하우스에 적재
    load_to_warehouse(df)

ELT를 선택해야 할 때

상황 이유
데이터 규모가 큼 (TB+ 단위) 클라우드 웨어하우스의 컴퓨팅 파워 활용
요구사항이 자주 변함 원본 데이터 보존으로 새로운 분석 가능
다양한 분석 니즈 같은 데이터를 여러 방식으로 변환
클라우드 네이티브 환경 BigQuery, Snowflake 등 사용
데이터 레이크 구축 스키마 없이 먼저 저장
-- ELT가 적합한 예: dbt로 웨어하우스 내 변환
-- models/marts/user_metrics.sql
{{ config(materialized='table') }}

WITH raw_events AS (
    SELECT * FROM {{ source('raw', 'user_events') }}
),
aggregated AS (
    SELECT
        user_id,
        DATE_TRUNC('day', event_time) as event_date,
        COUNT(*) as event_count,
        COUNT(DISTINCT session_id) as session_count
    FROM raw_events
    GROUP BY 1, 2
)
SELECT * FROM aggregated

하이브리드 접근법

실무에서는 ETL과 ELT를 혼합해서 사용하는 경우가 많다.

Source --> Extract --> [Light Transform] --> Load --> [Heavy Transform]
                             |                              |
                    PII 마스킹, 포맷 정규화            집계, 조인, 파생
# 하이브리드 예시
def hybrid_pipeline(source):
    # Extract
    raw_data = extract_from_source(source)

    # Light Transform (ETL 부분): 보안상 필수 변환
    data = mask_pii(raw_data)
    data = normalize_timestamps(data)  # 타임존 통일

    # Load
    load_to_warehouse(data, table='raw_masked')

    # Heavy Transform (ELT 부분): dbt 또는 SQL로 처리
    # 이 부분은 웨어하우스에서 실행

Extract (추출)

추출 패턴

패턴 설명 사용 시점 주의사항
Full Extraction 매번 전체 데이터 추출 작은 테이블, 초기 로드 소스 부하
Incremental 변경분만 추출 대용량, timestamp 컬럼 존재 삭제 감지 어려움
CDC DB 변경 로그 캡처 실시간 동기화 필요 설정 복잡
# 전체 추출 (Full Extraction)
def full_extraction(source):
    return source.read_all()

# 증분 추출 (Incremental Extraction)
def incremental_extraction(source, last_timestamp):
    return source.read_where(f"updated_at > {last_timestamp}")

# 증분 추출 주의: 삭제된 레코드 감지
def incremental_with_soft_delete(source, last_timestamp):
    # deleted_at 컬럼 활용
    return source.read_where(
        f"updated_at > {last_timestamp} OR deleted_at > {last_timestamp}"
    )

CDC (Change Data Capture)

데이터베이스의 변경 로그를 캡처하여 실시간 동기화.

+----------------+     +--------+     +-------+     +-----------+
|   Source DB    | --> | CDC    | --> | Kafka | --> | Target    |
| (MySQL/PG/...) |     | Tool   |     |       |     | (Warehouse|
+----------------+     +--------+     +-------+     +-----------+
                       Debezium

CDC가 필요한 상황:

  • 실시간 데이터 동기화 필요 (지연 < 수 초)
  • 삭제된 레코드도 추적해야 함
  • 소스 DB에 부하를 최소화해야 함
  • 정확한 변경 순서가 중요함

CDC 도구 선택:

도구 지원 DB 특징
Debezium MySQL, PostgreSQL, MongoDB, Oracle 오픈소스, Kafka 통합
AWS DMS RDS, Aurora, 대부분 DB AWS 네이티브
Fivetran 150+ 소스 매니지드 서비스
Airbyte 300+ 커넥터 오픈소스
# Debezium + Kafka로 CDC 구현
# 1. Debezium 커넥터 설정 (JSON)
debezium_config = {
    "name": "mysql-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.include.list": "inventory",
        "table.include.list": "inventory.customers,inventory.orders",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
    }
}

# 2. Kafka Consumer로 CDC 이벤트 처리
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'dbserver1.inventory.customers',
    bootstrap_servers=['kafka:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    payload = message.value['payload']
    operation = payload['op']  # c=create, u=update, d=delete

    if operation == 'c':
        handle_insert(payload['after'])
    elif operation == 'u':
        handle_update(payload['before'], payload['after'])
    elif operation == 'd':
        handle_delete(payload['before'])

다양한 소스 추출

import pandas as pd
import requests
from sqlalchemy import create_engine

# 1. 관계형 데이터베이스
engine = create_engine('postgresql://user:pass@host/db')
df = pd.read_sql('SELECT * FROM users', engine)

# 2. REST API (페이지네이션 처리)
def extract_from_api(base_url, headers):
    all_data = []
    page = 1
    while True:
        response = requests.get(
            f'{base_url}?page={page}&limit=100',
            headers=headers
        )
        data = response.json()
        if not data['results']:
            break
        all_data.extend(data['results'])
        page += 1
    return all_data

# 3. 파일 (CSV, JSON, Parquet)
df_csv = pd.read_csv('data.csv')
df_json = pd.read_json('data.json')
df_parquet = pd.read_parquet('data.parquet')

# 4. 웹 스크래핑
from bs4 import BeautifulSoup

html = requests.get('https://example.com').text
soup = BeautifulSoup(html, 'html.parser')
text = soup.get_text()

Transform (변환)

일반적인 변환 작업

변환 유형 설명 예시
정제 (Cleaning) 결측치, 이상치 처리 NULL 제거, 범위 필터링
정규화 (Normalization) 형식 표준화 날짜 형식, 대소문자
필터링 (Filtering) 불필요한 데이터 제거 조건 기반 필터
집계 (Aggregation) 데이터 요약 합계, 평균, 카운트
조인 (Join) 데이터 결합 테이블 조인
파생 (Derivation) 새 필드 생성 계산된 컬럼
인코딩 (Encoding) 범주형 변환 One-hot, Label encoding

Pandas를 이용한 변환

import pandas as pd
import numpy as np

def transform_pipeline(df):
    """데이터 변환 파이프라인"""

    # 1. 결측치 처리
    df = df.dropna(subset=['required_column'])
    df['optional_column'] = df['optional_column'].fillna('unknown')

    # 2. 데이터 타입 변환
    df['date'] = pd.to_datetime(df['date'])
    df['amount'] = df['amount'].astype(float)

    # 3. 문자열 정규화
    df['text'] = df['text'].str.lower().str.strip()

    # 4. 필터링
    df = df[df['quality_score'] >= 0.7]

    # 5. 파생 필드 생성
    df['year'] = df['date'].dt.year
    df['word_count'] = df['text'].str.split().str.len()

    # 6. 중복 제거
    df = df.drop_duplicates(subset=['id'], keep='last')

    return df

dbt를 이용한 SQL 변환 (ELT)

-- models/staging/stg_documents.sql
{{ config(materialized='view') }}

SELECT
    id,
    TRIM(LOWER(content)) as content,
    LENGTH(content) as content_length,
    created_at,
    CURRENT_TIMESTAMP as loaded_at
FROM {{ source('raw', 'documents') }}
WHERE content IS NOT NULL
  AND LENGTH(content) >= 100

-- models/marts/documents_processed.sql
{{ config(materialized='table') }}

SELECT
    id,
    content,
    content_length,
    -- 토큰 수 추정 (단어 수 * 1.3)
    CAST(ARRAY_LENGTH(SPLIT(content, ' ')) * 1.3 AS INT) as estimated_tokens,
    created_at
FROM {{ ref('stg_documents') }}
WHERE content_length <= 100000

Load (적재)

적재 전략

전략 설명 사용 사례 주의사항
Full Load 전체 교체 작은 테이블, 초기 로드 다운타임 발생 가능
Incremental 새 데이터만 추가 대용량, 시계열 중복 체크 필요
Upsert 삽입 또는 업데이트 마스터 데이터 PK 필수
Merge 조건부 갱신 SCD Type 2 복잡한 로직
from sqlalchemy import create_engine

def load_to_database(df, table_name, engine, strategy='append'):
    """데이터베이스에 적재"""

    if strategy == 'replace':
        # 전체 교체 (주의: 테이블 DROP 후 CREATE)
        df.to_sql(table_name, engine, if_exists='replace', index=False)

    elif strategy == 'append':
        # 추가
        df.to_sql(table_name, engine, if_exists='append', index=False)

    elif strategy == 'upsert':
        # PostgreSQL upsert
        from sqlalchemy.dialects.postgresql import insert

        with engine.connect() as conn:
            stmt = insert(table_name).values(df.to_dict('records'))
            stmt = stmt.on_conflict_do_update(
                index_elements=['id'],
                set_={c.name: c for c in stmt.excluded if c.name != 'id'}
            )
            conn.execute(stmt)

SCD (Slowly Changing Dimension) 처리

-- SCD Type 2: 이력 관리
MERGE INTO dim_customer AS target
USING staging_customer AS source
ON target.customer_id = source.customer_id 
   AND target.is_current = true

-- 변경된 레코드: 기존 행 만료
WHEN MATCHED AND (
    target.email != source.email OR
    target.address != source.address
) THEN UPDATE SET
    is_current = false,
    valid_to = CURRENT_TIMESTAMP

-- 새 레코드 또는 변경된 레코드: 새 행 삽입
WHEN NOT MATCHED THEN INSERT (
    customer_id, name, email, address,
    valid_from, valid_to, is_current
) VALUES (
    source.customer_id, source.name, source.email, source.address,
    CURRENT_TIMESTAMP, '9999-12-31', true
);

장애 시나리오와 대응

추출 단계 장애

장애 원인 대응
연결 실패 네트워크, 인증 재시도 로직, 지수 백오프
타임아웃 대용량 쿼리 청크 단위 추출
스키마 변경 소스 DDL 변경 스키마 검증, 알림
데이터 누락 증분 추출 실패 워터마크 관리, Full Refresh
import time
from functools import wraps

def retry_with_backoff(max_retries=3, base_delay=1):
    """지수 백오프 재시도 데코레이터"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s")
                    time.sleep(delay)
        return wrapper
    return decorator

@retry_with_backoff(max_retries=3, base_delay=5)
def extract_with_retry(source, query):
    return source.execute(query)

변환 단계 장애

장애 원인 대응
메모리 부족 대용량 처리 청크 처리, Spark 사용
데이터 타입 오류 예상치 못한 값 타입 검증, 예외 처리
비즈니스 로직 오류 잘못된 변환 단위 테스트, 데이터 검증
def safe_transform(df, transform_func, error_log_path):
    """안전한 변환: 에러 레코드 분리"""
    success_records = []
    error_records = []

    for idx, row in df.iterrows():
        try:
            transformed = transform_func(row)
            success_records.append(transformed)
        except Exception as e:
            error_records.append({
                'original': row.to_dict(),
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            })

    # 에러 레코드 로깅
    if error_records:
        pd.DataFrame(error_records).to_json(error_log_path, orient='records')
        print(f"Warning: {len(error_records)} records failed transformation")

    return pd.DataFrame(success_records)

적재 단계 장애

장애 원인 대응
제약조건 위반 PK 중복, FK 오류 Upsert 사용, 순서 보장
디스크 부족 저장공간 모니터링, 자동 확장
트랜잭션 실패 락 충돌 배치 크기 조정, 재시도
데이터 정합성 부분 적재 트랜잭션, Idempotency
def idempotent_load(df, table_name, engine, batch_id):
    """멱등성 적재: 같은 batch_id로 재실행해도 안전"""

    with engine.begin() as conn:
        # 기존 배치 데이터 삭제
        conn.execute(f"DELETE FROM {table_name} WHERE batch_id = '{batch_id}'")

        # 새 데이터 삽입
        df['batch_id'] = batch_id
        df.to_sql(table_name, conn, if_exists='append', index=False)

데이터 품질 검증

import great_expectations as gx

def validate_data(df, suite_name="etl_validation"):
    """데이터 품질 검증"""
    context = gx.get_context()

    validator = context.sources.pandas_default.read_dataframe(df)

    # 검증 규칙
    validator.expect_column_values_to_not_be_null("id")
    validator.expect_column_values_to_be_unique("id")
    validator.expect_column_values_to_be_between("amount", 0, 1000000)
    validator.expect_column_values_to_match_regex("email", r"^[\w\.-]+@[\w\.-]+\.\w+$")

    result = validator.validate()

    if not result.success:
        raise DataQualityError(f"Validation failed: {result}")

    return True

# 간단한 검증
def simple_validate(df, rules):
    """간단한 검증"""
    results = {}

    for column, checks in rules.items():
        for check_name, check_func in checks.items():
            passed = check_func(df[column])
            results[f"{column}.{check_name}"] = passed
            if not passed:
                print(f"FAIL: {column}.{check_name}")

    return all(results.values())

# 사용
rules = {
    'id': {
        'not_null': lambda col: col.notna().all(),
        'unique': lambda col: col.is_unique,
    },
    'amount': {
        'positive': lambda col: (col >= 0).all(),
    }
}
simple_validate(df, rules)

모니터링과 관측성

import logging
from datetime import datetime
from dataclasses import dataclass, asdict

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class PipelineMetrics:
    pipeline_name: str
    run_id: str
    stage: str
    records_in: int
    records_out: int
    duration_seconds: float
    timestamp: str
    status: str
    error_message: str = None

class PipelineMonitor:
    """파이프라인 모니터링"""

    def __init__(self, pipeline_name: str, run_id: str):
        self.pipeline_name = pipeline_name
        self.run_id = run_id
        self.metrics = []

    def log_stage(self, stage: str, records_in: int, records_out: int, 
                  duration: float, status: str = "success", error: str = None):
        metric = PipelineMetrics(
            pipeline_name=self.pipeline_name,
            run_id=self.run_id,
            stage=stage,
            records_in=records_in,
            records_out=records_out,
            duration_seconds=duration,
            timestamp=datetime.now().isoformat(),
            status=status,
            error_message=error
        )
        self.metrics.append(metric)

        drop_rate = 1 - (records_out / records_in) if records_in > 0 else 0
        logger.info(
            f"[{stage}] In: {records_in}, Out: {records_out}, "
            f"Drop: {drop_rate:.2%}, Time: {duration:.2f}s, Status: {status}"
        )

        # 알림 조건
        if drop_rate > 0.5:
            self.alert(f"High drop rate in {stage}: {drop_rate:.2%}")

    def alert(self, message: str):
        logger.warning(f"ALERT: {message}")
        # Slack/PagerDuty 연동 가능

    def summary(self):
        total_in = self.metrics[0].records_in if self.metrics else 0
        total_out = self.metrics[-1].records_out if self.metrics else 0
        total_time = sum(m.duration_seconds for m in self.metrics)

        print(f"\n{'='*50}")
        print(f"Pipeline: {self.pipeline_name}")
        print(f"Run ID: {self.run_id}")
        print(f"{'='*50}")
        print(f"Total Records: {total_in} -> {total_out}")
        print(f"Overall Drop Rate: {1 - total_out/total_in:.2%}" if total_in > 0 else "N/A")
        print(f"Total Time: {total_time:.2f}s")
        print(f"{'='*50}")

참고 자료