ETL과 ELT
데이터를 소스에서 목적지로 이동하고 변환하는 프로세스. 데이터 파이프라인의 핵심 패턴.
ETL vs ELT
Source --> Extract --> Transform --> Load --> Data Warehouse
|
중간 서버에서 변환
| 단계 |
설명 |
| Extract |
소스에서 데이터 추출 |
| Transform |
중간 시스템에서 변환/정제 |
| Load |
목적지에 적재 |
Source --> Extract --> Load --> Data Warehouse --> Transform
|
웨어하우스에서 변환
| 단계 |
설명 |
| Extract |
소스에서 데이터 추출 |
| Load |
웨어하우스에 raw 데이터 적재 |
| Transform |
웨어하우스 내에서 변환 |
비교
| 특성 |
ETL |
ELT |
| 변환 위치 |
중간 서버 |
목적지 |
| 확장성 |
제한적 |
높음 |
| 데이터 양 |
중소규모 |
대규모 |
| 원본 보존 |
어려움 |
용이 |
| 비용 |
서버 비용 |
컴퓨팅 비용 |
| 도구 |
Informatica, Talend |
dbt, Spark |
| 유연성 |
낮음 (사전 정의) |
높음 (사후 변환) |
| 디버깅 |
어려움 (원본 손실) |
용이 (원본 보존) |
판단 기준: ETL vs ELT 선택
올바른 선택은 "어떤 게 더 좋은가"가 아니라 상황에 맞는 것.
ETL을 선택해야 할 때
| 상황 |
이유 |
| 데이터 규모가 작음 (GB 단위) |
중간 서버로 충분히 처리 가능 |
| 변환 로직이 고정됨 |
리포트 요구사항이 명확하고 변하지 않음 |
| 보안/컴플라이언스 요구 |
PII를 목적지에 보내기 전 마스킹 필수 |
| 네트워크 비용 최적화 |
압축/필터링 후 전송으로 비용 절감 |
| 레거시 시스템 |
기존 인프라가 ETL 기반 |
# ETL이 적합한 예: PII 마스킹 후 적재
def etl_with_pii_masking(source_df):
# Transform: 반드시 적재 전에 마스킹
df = source_df.copy()
df['email'] = df['email'].apply(lambda x: hash_pii(x))
df['phone'] = df['phone'].apply(lambda x: '***-****-' + x[-4:])
# Load: 마스킹된 데이터만 웨어하우스에 적재
load_to_warehouse(df)
ELT를 선택해야 할 때
| 상황 |
이유 |
| 데이터 규모가 큼 (TB+ 단위) |
클라우드 웨어하우스의 컴퓨팅 파워 활용 |
| 요구사항이 자주 변함 |
원본 데이터 보존으로 새로운 분석 가능 |
| 다양한 분석 니즈 |
같은 데이터를 여러 방식으로 변환 |
| 클라우드 네이티브 환경 |
BigQuery, Snowflake 등 사용 |
| 데이터 레이크 구축 |
스키마 없이 먼저 저장 |
-- ELT가 적합한 예: dbt로 웨어하우스 내 변환
-- models/marts/user_metrics.sql
{{ config(materialized='table') }}
WITH raw_events AS (
SELECT * FROM {{ source('raw', 'user_events') }}
),
aggregated AS (
SELECT
user_id,
DATE_TRUNC('day', event_time) as event_date,
COUNT(*) as event_count,
COUNT(DISTINCT session_id) as session_count
FROM raw_events
GROUP BY 1, 2
)
SELECT * FROM aggregated
하이브리드 접근법
실무에서는 ETL과 ELT를 혼합해서 사용하는 경우가 많다.
Source --> Extract --> [Light Transform] --> Load --> [Heavy Transform]
| |
PII 마스킹, 포맷 정규화 집계, 조인, 파생
# 하이브리드 예시
def hybrid_pipeline(source):
# Extract
raw_data = extract_from_source(source)
# Light Transform (ETL 부분): 보안상 필수 변환
data = mask_pii(raw_data)
data = normalize_timestamps(data) # 타임존 통일
# Load
load_to_warehouse(data, table='raw_masked')
# Heavy Transform (ELT 부분): dbt 또는 SQL로 처리
# 이 부분은 웨어하우스에서 실행
추출 패턴
| 패턴 |
설명 |
사용 시점 |
주의사항 |
| Full Extraction |
매번 전체 데이터 추출 |
작은 테이블, 초기 로드 |
소스 부하 |
| Incremental |
변경분만 추출 |
대용량, timestamp 컬럼 존재 |
삭제 감지 어려움 |
| CDC |
DB 변경 로그 캡처 |
실시간 동기화 필요 |
설정 복잡 |
# 전체 추출 (Full Extraction)
def full_extraction(source):
return source.read_all()
# 증분 추출 (Incremental Extraction)
def incremental_extraction(source, last_timestamp):
return source.read_where(f"updated_at > {last_timestamp}")
# 증분 추출 주의: 삭제된 레코드 감지
def incremental_with_soft_delete(source, last_timestamp):
# deleted_at 컬럼 활용
return source.read_where(
f"updated_at > {last_timestamp} OR deleted_at > {last_timestamp}"
)
CDC (Change Data Capture)
데이터베이스의 변경 로그를 캡처하여 실시간 동기화.
+----------------+ +--------+ +-------+ +-----------+
| Source DB | --> | CDC | --> | Kafka | --> | Target |
| (MySQL/PG/...) | | Tool | | | | (Warehouse|
+----------------+ +--------+ +-------+ +-----------+
Debezium
CDC가 필요한 상황:
- 실시간 데이터 동기화 필요 (지연 < 수 초)
- 삭제된 레코드도 추적해야 함
- 소스 DB에 부하를 최소화해야 함
- 정확한 변경 순서가 중요함
CDC 도구 선택:
| 도구 |
지원 DB |
특징 |
| Debezium |
MySQL, PostgreSQL, MongoDB, Oracle |
오픈소스, Kafka 통합 |
| AWS DMS |
RDS, Aurora, 대부분 DB |
AWS 네이티브 |
| Fivetran |
150+ 소스 |
매니지드 서비스 |
| Airbyte |
300+ 커넥터 |
오픈소스 |
# Debezium + Kafka로 CDC 구현
# 1. Debezium 커넥터 설정 (JSON)
debezium_config = {
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "inventory",
"table.include.list": "inventory.customers,inventory.orders",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}
# 2. Kafka Consumer로 CDC 이벤트 처리
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'dbserver1.inventory.customers',
bootstrap_servers=['kafka:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
for message in consumer:
payload = message.value['payload']
operation = payload['op'] # c=create, u=update, d=delete
if operation == 'c':
handle_insert(payload['after'])
elif operation == 'u':
handle_update(payload['before'], payload['after'])
elif operation == 'd':
handle_delete(payload['before'])
다양한 소스 추출
import pandas as pd
import requests
from sqlalchemy import create_engine
# 1. 관계형 데이터베이스
engine = create_engine('postgresql://user:pass@host/db')
df = pd.read_sql('SELECT * FROM users', engine)
# 2. REST API (페이지네이션 처리)
def extract_from_api(base_url, headers):
all_data = []
page = 1
while True:
response = requests.get(
f'{base_url}?page={page}&limit=100',
headers=headers
)
data = response.json()
if not data['results']:
break
all_data.extend(data['results'])
page += 1
return all_data
# 3. 파일 (CSV, JSON, Parquet)
df_csv = pd.read_csv('data.csv')
df_json = pd.read_json('data.json')
df_parquet = pd.read_parquet('data.parquet')
# 4. 웹 스크래핑
from bs4 import BeautifulSoup
html = requests.get('https://example.com').text
soup = BeautifulSoup(html, 'html.parser')
text = soup.get_text()
일반적인 변환 작업
| 변환 유형 |
설명 |
예시 |
| 정제 (Cleaning) |
결측치, 이상치 처리 |
NULL 제거, 범위 필터링 |
| 정규화 (Normalization) |
형식 표준화 |
날짜 형식, 대소문자 |
| 필터링 (Filtering) |
불필요한 데이터 제거 |
조건 기반 필터 |
| 집계 (Aggregation) |
데이터 요약 |
합계, 평균, 카운트 |
| 조인 (Join) |
데이터 결합 |
테이블 조인 |
| 파생 (Derivation) |
새 필드 생성 |
계산된 컬럼 |
| 인코딩 (Encoding) |
범주형 변환 |
One-hot, Label encoding |
Pandas를 이용한 변환
import pandas as pd
import numpy as np
def transform_pipeline(df):
"""데이터 변환 파이프라인"""
# 1. 결측치 처리
df = df.dropna(subset=['required_column'])
df['optional_column'] = df['optional_column'].fillna('unknown')
# 2. 데이터 타입 변환
df['date'] = pd.to_datetime(df['date'])
df['amount'] = df['amount'].astype(float)
# 3. 문자열 정규화
df['text'] = df['text'].str.lower().str.strip()
# 4. 필터링
df = df[df['quality_score'] >= 0.7]
# 5. 파생 필드 생성
df['year'] = df['date'].dt.year
df['word_count'] = df['text'].str.split().str.len()
# 6. 중복 제거
df = df.drop_duplicates(subset=['id'], keep='last')
return df
dbt를 이용한 SQL 변환 (ELT)
-- models/staging/stg_documents.sql
{{ config(materialized='view') }}
SELECT
id,
TRIM(LOWER(content)) as content,
LENGTH(content) as content_length,
created_at,
CURRENT_TIMESTAMP as loaded_at
FROM {{ source('raw', 'documents') }}
WHERE content IS NOT NULL
AND LENGTH(content) >= 100
-- models/marts/documents_processed.sql
{{ config(materialized='table') }}
SELECT
id,
content,
content_length,
-- 토큰 수 추정 (단어 수 * 1.3)
CAST(ARRAY_LENGTH(SPLIT(content, ' ')) * 1.3 AS INT) as estimated_tokens,
created_at
FROM {{ ref('stg_documents') }}
WHERE content_length <= 100000
Load (적재)
적재 전략
| 전략 |
설명 |
사용 사례 |
주의사항 |
| Full Load |
전체 교체 |
작은 테이블, 초기 로드 |
다운타임 발생 가능 |
| Incremental |
새 데이터만 추가 |
대용량, 시계열 |
중복 체크 필요 |
| Upsert |
삽입 또는 업데이트 |
마스터 데이터 |
PK 필수 |
| Merge |
조건부 갱신 |
SCD Type 2 |
복잡한 로직 |
from sqlalchemy import create_engine
def load_to_database(df, table_name, engine, strategy='append'):
"""데이터베이스에 적재"""
if strategy == 'replace':
# 전체 교체 (주의: 테이블 DROP 후 CREATE)
df.to_sql(table_name, engine, if_exists='replace', index=False)
elif strategy == 'append':
# 추가
df.to_sql(table_name, engine, if_exists='append', index=False)
elif strategy == 'upsert':
# PostgreSQL upsert
from sqlalchemy.dialects.postgresql import insert
with engine.connect() as conn:
stmt = insert(table_name).values(df.to_dict('records'))
stmt = stmt.on_conflict_do_update(
index_elements=['id'],
set_={c.name: c for c in stmt.excluded if c.name != 'id'}
)
conn.execute(stmt)
SCD (Slowly Changing Dimension) 처리
-- SCD Type 2: 이력 관리
MERGE INTO dim_customer AS target
USING staging_customer AS source
ON target.customer_id = source.customer_id
AND target.is_current = true
-- 변경된 레코드: 기존 행 만료
WHEN MATCHED AND (
target.email != source.email OR
target.address != source.address
) THEN UPDATE SET
is_current = false,
valid_to = CURRENT_TIMESTAMP
-- 새 레코드 또는 변경된 레코드: 새 행 삽입
WHEN NOT MATCHED THEN INSERT (
customer_id, name, email, address,
valid_from, valid_to, is_current
) VALUES (
source.customer_id, source.name, source.email, source.address,
CURRENT_TIMESTAMP, '9999-12-31', true
);
장애 시나리오와 대응
추출 단계 장애
| 장애 |
원인 |
대응 |
| 연결 실패 |
네트워크, 인증 |
재시도 로직, 지수 백오프 |
| 타임아웃 |
대용량 쿼리 |
청크 단위 추출 |
| 스키마 변경 |
소스 DDL 변경 |
스키마 검증, 알림 |
| 데이터 누락 |
증분 추출 실패 |
워터마크 관리, Full Refresh |
import time
from functools import wraps
def retry_with_backoff(max_retries=3, base_delay=1):
"""지수 백오프 재시도 데코레이터"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
raise
delay = base_delay * (2 ** attempt)
print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s")
time.sleep(delay)
return wrapper
return decorator
@retry_with_backoff(max_retries=3, base_delay=5)
def extract_with_retry(source, query):
return source.execute(query)
변환 단계 장애
| 장애 |
원인 |
대응 |
| 메모리 부족 |
대용량 처리 |
청크 처리, Spark 사용 |
| 데이터 타입 오류 |
예상치 못한 값 |
타입 검증, 예외 처리 |
| 비즈니스 로직 오류 |
잘못된 변환 |
단위 테스트, 데이터 검증 |
def safe_transform(df, transform_func, error_log_path):
"""안전한 변환: 에러 레코드 분리"""
success_records = []
error_records = []
for idx, row in df.iterrows():
try:
transformed = transform_func(row)
success_records.append(transformed)
except Exception as e:
error_records.append({
'original': row.to_dict(),
'error': str(e),
'timestamp': datetime.now().isoformat()
})
# 에러 레코드 로깅
if error_records:
pd.DataFrame(error_records).to_json(error_log_path, orient='records')
print(f"Warning: {len(error_records)} records failed transformation")
return pd.DataFrame(success_records)
적재 단계 장애
| 장애 |
원인 |
대응 |
| 제약조건 위반 |
PK 중복, FK 오류 |
Upsert 사용, 순서 보장 |
| 디스크 부족 |
저장공간 |
모니터링, 자동 확장 |
| 트랜잭션 실패 |
락 충돌 |
배치 크기 조정, 재시도 |
| 데이터 정합성 |
부분 적재 |
트랜잭션, Idempotency |
def idempotent_load(df, table_name, engine, batch_id):
"""멱등성 적재: 같은 batch_id로 재실행해도 안전"""
with engine.begin() as conn:
# 기존 배치 데이터 삭제
conn.execute(f"DELETE FROM {table_name} WHERE batch_id = '{batch_id}'")
# 새 데이터 삽입
df['batch_id'] = batch_id
df.to_sql(table_name, conn, if_exists='append', index=False)
데이터 품질 검증
import great_expectations as gx
def validate_data(df, suite_name="etl_validation"):
"""데이터 품질 검증"""
context = gx.get_context()
validator = context.sources.pandas_default.read_dataframe(df)
# 검증 규칙
validator.expect_column_values_to_not_be_null("id")
validator.expect_column_values_to_be_unique("id")
validator.expect_column_values_to_be_between("amount", 0, 1000000)
validator.expect_column_values_to_match_regex("email", r"^[\w\.-]+@[\w\.-]+\.\w+$")
result = validator.validate()
if not result.success:
raise DataQualityError(f"Validation failed: {result}")
return True
# 간단한 검증
def simple_validate(df, rules):
"""간단한 검증"""
results = {}
for column, checks in rules.items():
for check_name, check_func in checks.items():
passed = check_func(df[column])
results[f"{column}.{check_name}"] = passed
if not passed:
print(f"FAIL: {column}.{check_name}")
return all(results.values())
# 사용
rules = {
'id': {
'not_null': lambda col: col.notna().all(),
'unique': lambda col: col.is_unique,
},
'amount': {
'positive': lambda col: (col >= 0).all(),
}
}
simple_validate(df, rules)
모니터링과 관측성
import logging
from datetime import datetime
from dataclasses import dataclass, asdict
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class PipelineMetrics:
pipeline_name: str
run_id: str
stage: str
records_in: int
records_out: int
duration_seconds: float
timestamp: str
status: str
error_message: str = None
class PipelineMonitor:
"""파이프라인 모니터링"""
def __init__(self, pipeline_name: str, run_id: str):
self.pipeline_name = pipeline_name
self.run_id = run_id
self.metrics = []
def log_stage(self, stage: str, records_in: int, records_out: int,
duration: float, status: str = "success", error: str = None):
metric = PipelineMetrics(
pipeline_name=self.pipeline_name,
run_id=self.run_id,
stage=stage,
records_in=records_in,
records_out=records_out,
duration_seconds=duration,
timestamp=datetime.now().isoformat(),
status=status,
error_message=error
)
self.metrics.append(metric)
drop_rate = 1 - (records_out / records_in) if records_in > 0 else 0
logger.info(
f"[{stage}] In: {records_in}, Out: {records_out}, "
f"Drop: {drop_rate:.2%}, Time: {duration:.2f}s, Status: {status}"
)
# 알림 조건
if drop_rate > 0.5:
self.alert(f"High drop rate in {stage}: {drop_rate:.2%}")
def alert(self, message: str):
logger.warning(f"ALERT: {message}")
# Slack/PagerDuty 연동 가능
def summary(self):
total_in = self.metrics[0].records_in if self.metrics else 0
total_out = self.metrics[-1].records_out if self.metrics else 0
total_time = sum(m.duration_seconds for m in self.metrics)
print(f"\n{'='*50}")
print(f"Pipeline: {self.pipeline_name}")
print(f"Run ID: {self.run_id}")
print(f"{'='*50}")
print(f"Total Records: {total_in} -> {total_out}")
print(f"Overall Drop Rate: {1 - total_out/total_in:.2%}" if total_in > 0 else "N/A")
print(f"Total Time: {total_time:.2f}s")
print(f"{'='*50}")
참고 자료