콘텐츠로 이동
Data Prep
상세

스토리지 시스템 (Storage Systems)

데이터를 효율적으로 저장하고 접근하기 위한 시스템. 데이터의 규모, 접근 패턴, 분석 요구에 맞는 선택이 중요함.

스토리지 아키텍처 개요

+-------------------+  +-------------------+  +-------------------+
|    Data Lake      |  |  Data Warehouse   |  |    Lakehouse      |
|   (Raw Storage)   |  | (Curated Storage) |  |  (Unified Layer)  |
+-------------------+  +-------------------+  +-------------------+
| Schema-on-Read    |  | Schema-on-Write   |  | Both Supported    |
| 저비용, 유연       |  | 고성능 쿼리        |  | Lake + Warehouse  |
| 모든 포맷          |  | 정형 데이터        |  | ACID + 스키마      |
| S3, GCS, ADLS     |  | BQ, Snowflake     |  | Delta, Iceberg    |
+-------------------+  +-------------------+  +-------------------+

Data Lake vs Data Warehouse vs Lakehouse

선택 기준

기준 Data Lake Data Warehouse Lakehouse
데이터 유형 원본, 비정형 정제된 정형 둘 다
스키마 읽을 때 정의 쓸 때 정의 유연
사용자 데이터 엔지니어, DS 분석가, BI 모든 사용자
쿼리 성능 느림 빠름 중간~빠름
비용 저장 비용 저렴 컴퓨팅 비용 높음 균형
ACID 제한적 지원 지원
실시간 어려움 어려움 지원 (스트리밍)

Data Lake를 선택해야 할 때

[x] 원본 데이터를 그대로 보존해야 함
[x] 비정형 데이터 (이미지, 로그, JSON)가 많음
[x] 데이터 과학/ML 워크로드가 주 목적
[x] 저비용 대용량 저장이 중요
[x] 분석 요구사항이 아직 명확하지 않음

storage diagram 1

Data Warehouse를 선택해야 할 때

[x] BI/리포팅이 주 목적
[x] SQL 기반 분석가가 주 사용자
[x] 빠른 쿼리 응답 시간 필요
[x] 데이터 품질과 일관성이 중요
[x] 정형 데이터 위주

주요 Data Warehouse:

제품 특징 비용 모델 적합한 상황
Snowflake 분리된 스토리지/컴퓨팅 사용량 기반 멀티클라우드
BigQuery 서버리스 쿼리 기반 GCP, 빠른 시작
Redshift AWS 통합 프로비저닝 AWS 환경
Databricks SQL Delta Lake 기반 클러스터 기반 ML + 분석

Lakehouse를 선택해야 할 때

[x] Lake의 유연성 + Warehouse의 성능 모두 필요
[x] ML과 BI 워크로드 통합
[x] 실시간 + 배치 통합 필요
[x] 데이터 버전 관리와 타임 트래블 필요
[x] 현대적인 스택 구축

Lakehouse 구현:

솔루션 특징 기반
Delta Lake Databricks, 가장 성숙 Spark
Apache Iceberg 벤더 중립, Netflix 기원 다양한 엔진
Apache Hudi 실시간 특화, Uber 기원 Spark/Flink

파일 포맷

포맷 비교

포맷 유형 압축 스키마 쿼리 성능 쓰기 성능 사용 사례
CSV X X 느림 빠름 데이터 교환, 소규모
JSON/JSONL X X 느림 빠름 API, 로그
Parquet O O 빠름 중간 분석, 대용량
ORC O O 빠름 중간 Hive/Spark
Avro O O 중간 빠름 스트리밍, Kafka
Arrow - O 최상 - 인메모리 교환

파일 포맷 선택 가이드

storage diagram 2

Parquet

분석 워크로드의 표준 포맷.

import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd

# Parquet 쓰기
df = pd.DataFrame({
    'id': range(1000),
    'category': ['A', 'B', 'C'] * 333 + ['A'],
    'value': [1.0] * 1000
})

# 기본 저장
df.to_parquet('data.parquet', engine='pyarrow')

# 고급 설정
table = pa.Table.from_pandas(df)
pq.write_table(
    table,
    'data_optimized.parquet',
    compression='zstd',           # 압축: snappy(빠름), gzip(작음), zstd(균형)
    row_group_size=100000,        # 행 그룹 크기 (메모리 vs I/O 트레이드오프)
    use_dictionary=True,          # 반복 값이 많으면 효과적
    write_statistics=True,        # 쿼리 최적화용 통계
)

# 파티션 저장
df['year'] = 2024
df['month'] = 1
pq.write_to_dataset(
    pa.Table.from_pandas(df),
    root_path='data/',
    partition_cols=['year', 'month']
)

# Parquet 읽기 (열 선택 - Column Pruning)
df = pd.read_parquet('data.parquet', columns=['id', 'value'])

# 필터링 읽기 (Predicate Pushdown)
df = pd.read_parquet(
    'data/',
    filters=[('year', '=', 2024), ('month', '>=', 6)]
)

Parquet 내부 구조:

storage diagram 3

Avro

스트리밍과 스키마 진화에 적합.

import fastavro
from fastavro import writer, reader

# Avro 스키마 정의
schema = {
    'type': 'record',
    'name': 'User',
    'fields': [
        {'name': 'id', 'type': 'int'},
        {'name': 'name', 'type': 'string'},
        {'name': 'email', 'type': ['null', 'string'], 'default': None},  # Optional
    ]
}

# Avro 쓰기
records = [
    {'id': 1, 'name': 'Alice', 'email': 'alice@example.com'},
    {'id': 2, 'name': 'Bob', 'email': None},
]

with open('users.avro', 'wb') as f:
    writer(f, schema, records)

# Avro 읽기
with open('users.avro', 'rb') as f:
    for record in reader(f):
        print(record)

# 스키마 진화: 새 필드 추가 (기본값 필수)
new_schema = {
    'type': 'record',
    'name': 'User',
    'fields': [
        {'name': 'id', 'type': 'int'},
        {'name': 'name', 'type': 'string'},
        {'name': 'email', 'type': ['null', 'string'], 'default': None},
        {'name': 'age', 'type': ['null', 'int'], 'default': None},  # 새 필드
    ]
}

JSON Lines (JSONL)

ML 학습 데이터의 일반적인 형식.

import json

# JSONL 쓰기
def write_jsonl(data, filepath):
    with open(filepath, 'w', encoding='utf-8') as f:
        for item in data:
            f.write(json.dumps(item, ensure_ascii=False) + '\n')

# JSONL 읽기 (스트리밍 - 메모리 효율적)
def read_jsonl(filepath):
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip():
                yield json.loads(line)

# 대화 데이터 형식 (OpenAI 호환)
conversations = [
    {
        "messages": [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Hello"},
            {"role": "assistant", "content": "Hi there!"}
        ]
    }
]
write_jsonl(conversations, 'train.jsonl')

스토리지 계층 (Tiering)

+------------------+  속도 빠름, 비용 높음
|   Hot Storage    |  <- 자주 접근 (일별)
|   (SSD, NVMe)    |
+------------------+
        |
        v
+------------------+
|   Warm Storage   |  <- 가끔 접근 (주/월별)
|   (HDD, S3 IA)   |
+------------------+
        |
        v
+------------------+  속도 느림, 비용 낮음
|   Cold Storage   |  <- 드물게 접근 (연 1-2회)
|   (Glacier)      |
+------------------+
계층 AWS 예시 비용 (GB/월) 접근 시간 사용 사례
Hot S3 Standard $0.023 밀리초 활성 데이터
Warm S3 IA $0.0125 밀리초 백업, 최근 로그
Cold S3 Glacier IR $0.004 밀리초~분 컴플라이언스
Archive S3 Glacier DA $0.00099 시간~일 장기 보관

자동 계층화 정책

# boto3로 S3 Lifecycle 정책 설정
import boto3

s3 = boto3.client('s3')

lifecycle_policy = {
    'Rules': [
        {
            'ID': 'MoveToIAAfter30Days',
            'Status': 'Enabled',
            'Filter': {'Prefix': 'logs/'},
            'Transitions': [
                {
                    'Days': 30,
                    'StorageClass': 'STANDARD_IA'
                },
                {
                    'Days': 90,
                    'StorageClass': 'GLACIER'
                }
            ],
            'Expiration': {
                'Days': 365  # 1년 후 삭제
            }
        }
    ]
}

s3.put_bucket_lifecycle_configuration(
    Bucket='my-bucket',
    LifecycleConfiguration=lifecycle_policy
)

객체 스토리지 (Object Storage)

AWS S3

import boto3
from botocore.config import Config

# 클라이언트 설정
s3 = boto3.client(
    's3',
    config=Config(
        max_pool_connections=50,
        retries={'max_attempts': 3}
    )
)

# 업로드
def upload_file(local_path, bucket, s3_key):
    s3.upload_file(local_path, bucket, s3_key)

# 대용량 업로드 (멀티파트)
from boto3.s3.transfer import TransferConfig

def upload_large_file(local_path, bucket, s3_key):
    config = TransferConfig(
        multipart_threshold=100 * 1024 * 1024,  # 100MB 이상이면 멀티파트
        max_concurrency=10,
        multipart_chunksize=100 * 1024 * 1024,
        use_threads=True
    )
    s3.upload_file(local_path, bucket, s3_key, Config=config)

# 스트리밍 읽기 (대용량 파일)
def stream_s3_file(bucket, key):
    response = s3.get_object(Bucket=bucket, Key=key)
    for chunk in response['Body'].iter_chunks(chunk_size=1024*1024):
        yield chunk

# S3 Select: 서버사이드 필터링
def query_with_s3_select(bucket, key, sql):
    response = s3.select_object_content(
        Bucket=bucket,
        Key=key,
        Expression=sql,
        ExpressionType='SQL',
        InputSerialization={'Parquet': {}},
        OutputSerialization={'JSON': {}}
    )

    for event in response['Payload']:
        if 'Records' in event:
            yield event['Records']['Payload'].decode()

# 사용 예: 특정 컬럼만 가져오기
sql = "SELECT id, name FROM s3object WHERE status = 'active'"

MinIO (Self-hosted S3)

from minio import Minio

client = Minio(
    "localhost:9000",
    access_key="minioadmin",
    secret_key="minioadmin",
    secure=False
)

# 버킷 생성
if not client.bucket_exists("data"):
    client.make_bucket("data")

# 업로드
client.fput_object("data", "train.parquet", "/local/train.parquet")

# 스트리밍 업로드
from io import BytesIO
data = BytesIO(b"some binary data")
client.put_object("data", "stream.bin", data, length=len(data.getvalue()))

Delta Lake

ACID 트랜잭션을 지원하는 Lakehouse 솔루션.

from delta import *
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", 
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Delta 테이블 쓰기
df.write.format("delta").save("/delta/events")

# 테이블로 등록
spark.sql("CREATE TABLE events USING DELTA LOCATION '/delta/events'")

# ACID 업데이트
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/delta/events")

# UPDATE
delta_table.update(
    condition="status = 'pending'",
    set={"status": "'processed'", "updated_at": "current_timestamp()"}
)

# DELETE
delta_table.delete(condition="created_at < '2024-01-01'")

# MERGE (Upsert)
delta_table.alias("target").merge(
    source_df.alias("source"),
    "target.id = source.id"
).whenMatchedUpdate(
    set={"value": "source.value", "updated_at": "current_timestamp()"}
).whenNotMatchedInsert(
    values={"id": "source.id", "value": "source.value", "created_at": "current_timestamp()"}
).execute()

# 타임 트래블 (버전 관리)
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load("/delta/events")
df_yesterday = spark.read.format("delta").option("timestampAsOf", "2024-01-15").load("/delta/events")

# 버전 히스토리
history = delta_table.history()
history.show()

# 스키마 진화
df_new.write.format("delta") \
    .option("mergeSchema", "true") \
    .mode("append") \
    .save("/delta/events")

# 최적화 (Z-Order)
delta_table.optimize().executeZOrderBy("user_id", "event_date")

# Vacuum (오래된 파일 삭제)
delta_table.vacuum(168)  # 168시간(7일) 이전 파일 삭제

분산 파일 시스템

HDFS

from hdfs import InsecureClient

client = InsecureClient('http://namenode:50070', user='hadoop')

# 파일 업로드
client.upload('/data/train.parquet', 'local_train.parquet')

# 파일 읽기
with client.read('/data/train.parquet') as reader:
    content = reader.read()

# 디렉토리 생성
client.makedirs('/data/processed')

# 파일 리스트
files = client.list('/data/')

Spark + 객체 스토리지

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Data Processing") \
    .config("spark.hadoop.fs.s3a.access.key", "...") \
    .config("spark.hadoop.fs.s3a.secret.key", "...") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.fast.upload", "true") \
    .getOrCreate()

# S3에서 읽기
df = spark.read.parquet("s3a://bucket/data/")

# 처리
df_processed = df \
    .filter(df.length > 100) \
    .withColumn("processed_at", current_timestamp())

# S3에 쓰기 (파티션)
df_processed.write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("s3a://bucket/processed/")

장애 시나리오와 대응

데이터 손상

장애 원인 대응
파일 손상 불완전 쓰기, 네트워크 오류 Checksum 검증, 재시도
파티션 누락 ETL 실패, 타임아웃 모니터링, Backfill
스키마 불일치 업스트림 변경 스키마 레지스트리
import hashlib

def write_with_checksum(df, path):
    """체크섬과 함께 저장"""
    # 데이터 저장
    df.to_parquet(f"{path}/data.parquet")

    # 체크섬 계산 및 저장
    with open(f"{path}/data.parquet", "rb") as f:
        checksum = hashlib.md5(f.read()).hexdigest()

    with open(f"{path}/_checksum", "w") as f:
        f.write(checksum)

def read_with_checksum(path):
    """체크섬 검증 후 읽기"""
    # 체크섬 검증
    with open(f"{path}/data.parquet", "rb") as f:
        actual_checksum = hashlib.md5(f.read()).hexdigest()

    with open(f"{path}/_checksum", "r") as f:
        expected_checksum = f.read().strip()

    if actual_checksum != expected_checksum:
        raise DataCorruptionError(f"Checksum mismatch: {actual_checksum} != {expected_checksum}")

    return pd.read_parquet(f"{path}/data.parquet")

스토리지 용량

# S3 버킷 크기 모니터링
def get_bucket_size(bucket_name):
    s3 = boto3.client('s3')
    paginator = s3.get_paginator('list_objects_v2')

    total_size = 0
    for page in paginator.paginate(Bucket=bucket_name):
        for obj in page.get('Contents', []):
            total_size += obj['Size']

    return total_size / (1024**3)  # GB

# 알림 설정
def check_storage_and_alert(bucket, threshold_gb):
    size_gb = get_bucket_size(bucket)
    if size_gb > threshold_gb:
        send_alert(f"Bucket {bucket} exceeds {threshold_gb}GB: {size_gb:.2f}GB")

일관성 문제

# 최종 일관성 대응: 읽기 전 대기
import time

def read_after_write(s3, bucket, key, max_retries=5):
    """쓰기 후 일관성 보장 읽기"""
    for attempt in range(max_retries):
        try:
            response = s3.head_object(Bucket=bucket, Key=key)
            return s3.get_object(Bucket=bucket, Key=key)
        except s3.exceptions.NoSuchKey:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # 지수 백오프
            else:
                raise

데이터 카탈로그

메타데이터 관리

from dataclasses import dataclass, asdict, field
from datetime import datetime
from typing import List, Dict
import json

@dataclass
class DatasetMetadata:
    name: str
    version: str
    path: str
    format: str
    size_bytes: int
    num_records: int
    created_at: datetime
    schema: Dict
    statistics: Dict
    lineage: List[str] = field(default_factory=list)
    tags: List[str] = field(default_factory=list)
    owner: str = "unknown"
    description: str = ""

    def save(self, catalog_path):
        filepath = f"{catalog_path}/{self.name}_{self.version}.json"
        with open(filepath, 'w') as f:
            data = asdict(self)
            data['created_at'] = data['created_at'].isoformat()
            json.dump(data, f, indent=2)

    @classmethod
    def load(cls, filepath):
        with open(filepath) as f:
            data = json.load(f)
            data['created_at'] = datetime.fromisoformat(data['created_at'])
            return cls(**data)

# 메타데이터 등록
metadata = DatasetMetadata(
    name="user_events",
    version="2.0.0",
    path="s3://bucket/processed/user_events",
    format="parquet",
    size_bytes=50_000_000_000,
    num_records=10_000_000,
    created_at=datetime.now(),
    schema={
        "user_id": "int64",
        "event_type": "string",
        "timestamp": "timestamp",
        "properties": "map<string, string>"
    },
    statistics={
        "avg_events_per_user": 50,
        "date_range": ["2024-01-01", "2024-12-31"]
    },
    lineage=["raw_events_v1", "user_dim_v2"],
    tags=["production", "pii-masked"],
    owner="data-team",
    description="사용자 이벤트 데이터 (PII 마스킹됨)"
)
metadata.save("catalog/")

AWS Glue Data Catalog 연동

import boto3

glue = boto3.client('glue')

# 테이블 등록
glue.create_table(
    DatabaseName='analytics',
    TableInput={
        'Name': 'user_events',
        'StorageDescriptor': {
            'Columns': [
                {'Name': 'user_id', 'Type': 'bigint'},
                {'Name': 'event_type', 'Type': 'string'},
                {'Name': 'timestamp', 'Type': 'timestamp'},
            ],
            'Location': 's3://bucket/processed/user_events/',
            'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
            'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
            'SerdeInfo': {
                'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
            }
        },
        'PartitionKeys': [
            {'Name': 'year', 'Type': 'int'},
            {'Name': 'month', 'Type': 'int'}
        ],
        'TableType': 'EXTERNAL_TABLE'
    }
)

성능 최적화

파티셔닝 전략

# 날짜 기반 파티션 (가장 일반적)
df.write.partitionBy("year", "month", "day").parquet("s3://bucket/data/")

# 카디널리티가 낮은 컬럼 선택
# 좋음: region (10개), status (5개)
# 나쁨: user_id (100만개) → 너무 많은 파티션

# 파티션 프루닝 확인
spark.sql("""
    EXPLAIN EXTENDED
    SELECT * FROM events WHERE year = 2024 AND month = 1
""")

압축 선택

코덱 압축률 속도 사용 사례
Snappy 낮음 빠름 실시간 쿼리
Gzip 높음 느림 아카이브
Zstd 중간 중간 균형 (권장)
LZ4 낮음 매우 빠름 스트리밍
# 압축 코덱별 저장
df.write.option("compression", "zstd").parquet("data_zstd/")
df.write.option("compression", "snappy").parquet("data_snappy/")

참고 자료