스토리지 시스템 (Storage Systems)¶
데이터를 효율적으로 저장하고 접근하기 위한 시스템. 데이터의 규모, 접근 패턴, 분석 요구에 맞는 선택이 중요함.
스토리지 아키텍처 개요¶
+-------------------+ +-------------------+ +-------------------+
| Data Lake | | Data Warehouse | | Lakehouse |
| (Raw Storage) | | (Curated Storage) | | (Unified Layer) |
+-------------------+ +-------------------+ +-------------------+
| Schema-on-Read | | Schema-on-Write | | Both Supported |
| 저비용, 유연 | | 고성능 쿼리 | | Lake + Warehouse |
| 모든 포맷 | | 정형 데이터 | | ACID + 스키마 |
| S3, GCS, ADLS | | BQ, Snowflake | | Delta, Iceberg |
+-------------------+ +-------------------+ +-------------------+
Data Lake vs Data Warehouse vs Lakehouse¶
선택 기준¶
| 기준 | Data Lake | Data Warehouse | Lakehouse |
|---|---|---|---|
| 데이터 유형 | 원본, 비정형 | 정제된 정형 | 둘 다 |
| 스키마 | 읽을 때 정의 | 쓸 때 정의 | 유연 |
| 사용자 | 데이터 엔지니어, DS | 분석가, BI | 모든 사용자 |
| 쿼리 성능 | 느림 | 빠름 | 중간~빠름 |
| 비용 | 저장 비용 저렴 | 컴퓨팅 비용 높음 | 균형 |
| ACID | 제한적 | 지원 | 지원 |
| 실시간 | 어려움 | 어려움 | 지원 (스트리밍) |
Data Lake를 선택해야 할 때¶
[x] 원본 데이터를 그대로 보존해야 함
[x] 비정형 데이터 (이미지, 로그, JSON)가 많음
[x] 데이터 과학/ML 워크로드가 주 목적
[x] 저비용 대용량 저장이 중요
[x] 분석 요구사항이 아직 명확하지 않음
Data Warehouse를 선택해야 할 때¶
주요 Data Warehouse:
| 제품 | 특징 | 비용 모델 | 적합한 상황 |
|---|---|---|---|
| Snowflake | 분리된 스토리지/컴퓨팅 | 사용량 기반 | 멀티클라우드 |
| BigQuery | 서버리스 | 쿼리 기반 | GCP, 빠른 시작 |
| Redshift | AWS 통합 | 프로비저닝 | AWS 환경 |
| Databricks SQL | Delta Lake 기반 | 클러스터 기반 | ML + 분석 |
Lakehouse를 선택해야 할 때¶
[x] Lake의 유연성 + Warehouse의 성능 모두 필요
[x] ML과 BI 워크로드 통합
[x] 실시간 + 배치 통합 필요
[x] 데이터 버전 관리와 타임 트래블 필요
[x] 현대적인 스택 구축
Lakehouse 구현:
| 솔루션 | 특징 | 기반 |
|---|---|---|
| Delta Lake | Databricks, 가장 성숙 | Spark |
| Apache Iceberg | 벤더 중립, Netflix 기원 | 다양한 엔진 |
| Apache Hudi | 실시간 특화, Uber 기원 | Spark/Flink |
파일 포맷¶
포맷 비교¶
| 포맷 | 유형 | 압축 | 스키마 | 쿼리 성능 | 쓰기 성능 | 사용 사례 |
|---|---|---|---|---|---|---|
| CSV | 행 | X | X | 느림 | 빠름 | 데이터 교환, 소규모 |
| JSON/JSONL | 행 | X | X | 느림 | 빠름 | API, 로그 |
| Parquet | 열 | O | O | 빠름 | 중간 | 분석, 대용량 |
| ORC | 열 | O | O | 빠름 | 중간 | Hive/Spark |
| Avro | 행 | O | O | 중간 | 빠름 | 스트리밍, Kafka |
| Arrow | 열 | - | O | 최상 | - | 인메모리 교환 |
파일 포맷 선택 가이드¶
Parquet¶
분석 워크로드의 표준 포맷.
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
# Parquet 쓰기
df = pd.DataFrame({
'id': range(1000),
'category': ['A', 'B', 'C'] * 333 + ['A'],
'value': [1.0] * 1000
})
# 기본 저장
df.to_parquet('data.parquet', engine='pyarrow')
# 고급 설정
table = pa.Table.from_pandas(df)
pq.write_table(
table,
'data_optimized.parquet',
compression='zstd', # 압축: snappy(빠름), gzip(작음), zstd(균형)
row_group_size=100000, # 행 그룹 크기 (메모리 vs I/O 트레이드오프)
use_dictionary=True, # 반복 값이 많으면 효과적
write_statistics=True, # 쿼리 최적화용 통계
)
# 파티션 저장
df['year'] = 2024
df['month'] = 1
pq.write_to_dataset(
pa.Table.from_pandas(df),
root_path='data/',
partition_cols=['year', 'month']
)
# Parquet 읽기 (열 선택 - Column Pruning)
df = pd.read_parquet('data.parquet', columns=['id', 'value'])
# 필터링 읽기 (Predicate Pushdown)
df = pd.read_parquet(
'data/',
filters=[('year', '=', 2024), ('month', '>=', 6)]
)
Parquet 내부 구조:
Avro¶
스트리밍과 스키마 진화에 적합.
import fastavro
from fastavro import writer, reader
# Avro 스키마 정의
schema = {
'type': 'record',
'name': 'User',
'fields': [
{'name': 'id', 'type': 'int'},
{'name': 'name', 'type': 'string'},
{'name': 'email', 'type': ['null', 'string'], 'default': None}, # Optional
]
}
# Avro 쓰기
records = [
{'id': 1, 'name': 'Alice', 'email': 'alice@example.com'},
{'id': 2, 'name': 'Bob', 'email': None},
]
with open('users.avro', 'wb') as f:
writer(f, schema, records)
# Avro 읽기
with open('users.avro', 'rb') as f:
for record in reader(f):
print(record)
# 스키마 진화: 새 필드 추가 (기본값 필수)
new_schema = {
'type': 'record',
'name': 'User',
'fields': [
{'name': 'id', 'type': 'int'},
{'name': 'name', 'type': 'string'},
{'name': 'email', 'type': ['null', 'string'], 'default': None},
{'name': 'age', 'type': ['null', 'int'], 'default': None}, # 새 필드
]
}
JSON Lines (JSONL)¶
ML 학습 데이터의 일반적인 형식.
import json
# JSONL 쓰기
def write_jsonl(data, filepath):
with open(filepath, 'w', encoding='utf-8') as f:
for item in data:
f.write(json.dumps(item, ensure_ascii=False) + '\n')
# JSONL 읽기 (스트리밍 - 메모리 효율적)
def read_jsonl(filepath):
with open(filepath, 'r', encoding='utf-8') as f:
for line in f:
if line.strip():
yield json.loads(line)
# 대화 데이터 형식 (OpenAI 호환)
conversations = [
{
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there!"}
]
}
]
write_jsonl(conversations, 'train.jsonl')
스토리지 계층 (Tiering)¶
+------------------+ 속도 빠름, 비용 높음
| Hot Storage | <- 자주 접근 (일별)
| (SSD, NVMe) |
+------------------+
|
v
+------------------+
| Warm Storage | <- 가끔 접근 (주/월별)
| (HDD, S3 IA) |
+------------------+
|
v
+------------------+ 속도 느림, 비용 낮음
| Cold Storage | <- 드물게 접근 (연 1-2회)
| (Glacier) |
+------------------+
| 계층 | AWS 예시 | 비용 (GB/월) | 접근 시간 | 사용 사례 |
|---|---|---|---|---|
| Hot | S3 Standard | $0.023 | 밀리초 | 활성 데이터 |
| Warm | S3 IA | $0.0125 | 밀리초 | 백업, 최근 로그 |
| Cold | S3 Glacier IR | $0.004 | 밀리초~분 | 컴플라이언스 |
| Archive | S3 Glacier DA | $0.00099 | 시간~일 | 장기 보관 |
자동 계층화 정책¶
# boto3로 S3 Lifecycle 정책 설정
import boto3
s3 = boto3.client('s3')
lifecycle_policy = {
'Rules': [
{
'ID': 'MoveToIAAfter30Days',
'Status': 'Enabled',
'Filter': {'Prefix': 'logs/'},
'Transitions': [
{
'Days': 30,
'StorageClass': 'STANDARD_IA'
},
{
'Days': 90,
'StorageClass': 'GLACIER'
}
],
'Expiration': {
'Days': 365 # 1년 후 삭제
}
}
]
}
s3.put_bucket_lifecycle_configuration(
Bucket='my-bucket',
LifecycleConfiguration=lifecycle_policy
)
객체 스토리지 (Object Storage)¶
AWS S3¶
import boto3
from botocore.config import Config
# 클라이언트 설정
s3 = boto3.client(
's3',
config=Config(
max_pool_connections=50,
retries={'max_attempts': 3}
)
)
# 업로드
def upload_file(local_path, bucket, s3_key):
s3.upload_file(local_path, bucket, s3_key)
# 대용량 업로드 (멀티파트)
from boto3.s3.transfer import TransferConfig
def upload_large_file(local_path, bucket, s3_key):
config = TransferConfig(
multipart_threshold=100 * 1024 * 1024, # 100MB 이상이면 멀티파트
max_concurrency=10,
multipart_chunksize=100 * 1024 * 1024,
use_threads=True
)
s3.upload_file(local_path, bucket, s3_key, Config=config)
# 스트리밍 읽기 (대용량 파일)
def stream_s3_file(bucket, key):
response = s3.get_object(Bucket=bucket, Key=key)
for chunk in response['Body'].iter_chunks(chunk_size=1024*1024):
yield chunk
# S3 Select: 서버사이드 필터링
def query_with_s3_select(bucket, key, sql):
response = s3.select_object_content(
Bucket=bucket,
Key=key,
Expression=sql,
ExpressionType='SQL',
InputSerialization={'Parquet': {}},
OutputSerialization={'JSON': {}}
)
for event in response['Payload']:
if 'Records' in event:
yield event['Records']['Payload'].decode()
# 사용 예: 특정 컬럼만 가져오기
sql = "SELECT id, name FROM s3object WHERE status = 'active'"
MinIO (Self-hosted S3)¶
from minio import Minio
client = Minio(
"localhost:9000",
access_key="minioadmin",
secret_key="minioadmin",
secure=False
)
# 버킷 생성
if not client.bucket_exists("data"):
client.make_bucket("data")
# 업로드
client.fput_object("data", "train.parquet", "/local/train.parquet")
# 스트리밍 업로드
from io import BytesIO
data = BytesIO(b"some binary data")
client.put_object("data", "stream.bin", data, length=len(data.getvalue()))
Delta Lake¶
ACID 트랜잭션을 지원하는 Lakehouse 솔루션.
from delta import *
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Delta 테이블 쓰기
df.write.format("delta").save("/delta/events")
# 테이블로 등록
spark.sql("CREATE TABLE events USING DELTA LOCATION '/delta/events'")
# ACID 업데이트
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "/delta/events")
# UPDATE
delta_table.update(
condition="status = 'pending'",
set={"status": "'processed'", "updated_at": "current_timestamp()"}
)
# DELETE
delta_table.delete(condition="created_at < '2024-01-01'")
# MERGE (Upsert)
delta_table.alias("target").merge(
source_df.alias("source"),
"target.id = source.id"
).whenMatchedUpdate(
set={"value": "source.value", "updated_at": "current_timestamp()"}
).whenNotMatchedInsert(
values={"id": "source.id", "value": "source.value", "created_at": "current_timestamp()"}
).execute()
# 타임 트래블 (버전 관리)
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load("/delta/events")
df_yesterday = spark.read.format("delta").option("timestampAsOf", "2024-01-15").load("/delta/events")
# 버전 히스토리
history = delta_table.history()
history.show()
# 스키마 진화
df_new.write.format("delta") \
.option("mergeSchema", "true") \
.mode("append") \
.save("/delta/events")
# 최적화 (Z-Order)
delta_table.optimize().executeZOrderBy("user_id", "event_date")
# Vacuum (오래된 파일 삭제)
delta_table.vacuum(168) # 168시간(7일) 이전 파일 삭제
분산 파일 시스템¶
HDFS¶
from hdfs import InsecureClient
client = InsecureClient('http://namenode:50070', user='hadoop')
# 파일 업로드
client.upload('/data/train.parquet', 'local_train.parquet')
# 파일 읽기
with client.read('/data/train.parquet') as reader:
content = reader.read()
# 디렉토리 생성
client.makedirs('/data/processed')
# 파일 리스트
files = client.list('/data/')
Spark + 객체 스토리지¶
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Data Processing") \
.config("spark.hadoop.fs.s3a.access.key", "...") \
.config("spark.hadoop.fs.s3a.secret.key", "...") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.fs.s3a.fast.upload", "true") \
.getOrCreate()
# S3에서 읽기
df = spark.read.parquet("s3a://bucket/data/")
# 처리
df_processed = df \
.filter(df.length > 100) \
.withColumn("processed_at", current_timestamp())
# S3에 쓰기 (파티션)
df_processed.write \
.mode("overwrite") \
.partitionBy("year", "month") \
.parquet("s3a://bucket/processed/")
장애 시나리오와 대응¶
데이터 손상¶
| 장애 | 원인 | 대응 |
|---|---|---|
| 파일 손상 | 불완전 쓰기, 네트워크 오류 | Checksum 검증, 재시도 |
| 파티션 누락 | ETL 실패, 타임아웃 | 모니터링, Backfill |
| 스키마 불일치 | 업스트림 변경 | 스키마 레지스트리 |
import hashlib
def write_with_checksum(df, path):
"""체크섬과 함께 저장"""
# 데이터 저장
df.to_parquet(f"{path}/data.parquet")
# 체크섬 계산 및 저장
with open(f"{path}/data.parquet", "rb") as f:
checksum = hashlib.md5(f.read()).hexdigest()
with open(f"{path}/_checksum", "w") as f:
f.write(checksum)
def read_with_checksum(path):
"""체크섬 검증 후 읽기"""
# 체크섬 검증
with open(f"{path}/data.parquet", "rb") as f:
actual_checksum = hashlib.md5(f.read()).hexdigest()
with open(f"{path}/_checksum", "r") as f:
expected_checksum = f.read().strip()
if actual_checksum != expected_checksum:
raise DataCorruptionError(f"Checksum mismatch: {actual_checksum} != {expected_checksum}")
return pd.read_parquet(f"{path}/data.parquet")
스토리지 용량¶
# S3 버킷 크기 모니터링
def get_bucket_size(bucket_name):
s3 = boto3.client('s3')
paginator = s3.get_paginator('list_objects_v2')
total_size = 0
for page in paginator.paginate(Bucket=bucket_name):
for obj in page.get('Contents', []):
total_size += obj['Size']
return total_size / (1024**3) # GB
# 알림 설정
def check_storage_and_alert(bucket, threshold_gb):
size_gb = get_bucket_size(bucket)
if size_gb > threshold_gb:
send_alert(f"Bucket {bucket} exceeds {threshold_gb}GB: {size_gb:.2f}GB")
일관성 문제¶
# 최종 일관성 대응: 읽기 전 대기
import time
def read_after_write(s3, bucket, key, max_retries=5):
"""쓰기 후 일관성 보장 읽기"""
for attempt in range(max_retries):
try:
response = s3.head_object(Bucket=bucket, Key=key)
return s3.get_object(Bucket=bucket, Key=key)
except s3.exceptions.NoSuchKey:
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # 지수 백오프
else:
raise
데이터 카탈로그¶
메타데이터 관리¶
from dataclasses import dataclass, asdict, field
from datetime import datetime
from typing import List, Dict
import json
@dataclass
class DatasetMetadata:
name: str
version: str
path: str
format: str
size_bytes: int
num_records: int
created_at: datetime
schema: Dict
statistics: Dict
lineage: List[str] = field(default_factory=list)
tags: List[str] = field(default_factory=list)
owner: str = "unknown"
description: str = ""
def save(self, catalog_path):
filepath = f"{catalog_path}/{self.name}_{self.version}.json"
with open(filepath, 'w') as f:
data = asdict(self)
data['created_at'] = data['created_at'].isoformat()
json.dump(data, f, indent=2)
@classmethod
def load(cls, filepath):
with open(filepath) as f:
data = json.load(f)
data['created_at'] = datetime.fromisoformat(data['created_at'])
return cls(**data)
# 메타데이터 등록
metadata = DatasetMetadata(
name="user_events",
version="2.0.0",
path="s3://bucket/processed/user_events",
format="parquet",
size_bytes=50_000_000_000,
num_records=10_000_000,
created_at=datetime.now(),
schema={
"user_id": "int64",
"event_type": "string",
"timestamp": "timestamp",
"properties": "map<string, string>"
},
statistics={
"avg_events_per_user": 50,
"date_range": ["2024-01-01", "2024-12-31"]
},
lineage=["raw_events_v1", "user_dim_v2"],
tags=["production", "pii-masked"],
owner="data-team",
description="사용자 이벤트 데이터 (PII 마스킹됨)"
)
metadata.save("catalog/")
AWS Glue Data Catalog 연동¶
import boto3
glue = boto3.client('glue')
# 테이블 등록
glue.create_table(
DatabaseName='analytics',
TableInput={
'Name': 'user_events',
'StorageDescriptor': {
'Columns': [
{'Name': 'user_id', 'Type': 'bigint'},
{'Name': 'event_type', 'Type': 'string'},
{'Name': 'timestamp', 'Type': 'timestamp'},
],
'Location': 's3://bucket/processed/user_events/',
'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
'SerdeInfo': {
'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
}
},
'PartitionKeys': [
{'Name': 'year', 'Type': 'int'},
{'Name': 'month', 'Type': 'int'}
],
'TableType': 'EXTERNAL_TABLE'
}
)
성능 최적화¶
파티셔닝 전략¶
# 날짜 기반 파티션 (가장 일반적)
df.write.partitionBy("year", "month", "day").parquet("s3://bucket/data/")
# 카디널리티가 낮은 컬럼 선택
# 좋음: region (10개), status (5개)
# 나쁨: user_id (100만개) → 너무 많은 파티션
# 파티션 프루닝 확인
spark.sql("""
EXPLAIN EXTENDED
SELECT * FROM events WHERE year = 2024 AND month = 1
""")
압축 선택¶
| 코덱 | 압축률 | 속도 | 사용 사례 |
|---|---|---|---|
| Snappy | 낮음 | 빠름 | 실시간 쿼리 |
| Gzip | 높음 | 느림 | 아카이브 |
| Zstd | 중간 | 중간 | 균형 (권장) |
| LZ4 | 낮음 | 매우 빠름 | 스트리밍 |
# 압축 코덱별 저장
df.write.option("compression", "zstd").parquet("data_zstd/")
df.write.option("compression", "snappy").parquet("data_snappy/")