콘텐츠로 이동
Data Prep
상세

Data-Centric AI

Model-centric에서 Data-centric으로의 패러다임 전환. 모델 아키텍처 개선보다 데이터 품질 향상에 집중하는 AI 개발 방법론.


개요

핵심 개념

Data-Centric AI란?

기존 ML 개발은 데이터를 고정하고 모델을 개선하는 "Model-Centric" 접근이었다. Data-Centric AI는 이와 반대로, 모델을 고정하고 데이터를 체계적으로 개선하여 성능을 향상시킨다.

"In real-world AI projects, improving data quality often yields better results than improving models." — Andrew Ng

Meta

Item Value
Category ML Methodology / Data Engineering
Proposed By Andrew Ng et al. (2021)
Key Conferences NeurIPS 2024, ICML 2024
Key Surveys Zha et al. (2023), Jarrahi et al. (2024)

Model-Centric vs Data-Centric

summary diagram

왜 Data-Centric인가?

인사이트 설명
데이터 품질 병목 실제 프로젝트에서 80% 이상의 시간이 데이터 준비에 소요
모델 성숙도 SOTA 모델들의 성능 차이가 점점 줄어듦
Long-tail 문제 소수 클래스, 엣지 케이스에서 데이터 개선이 더 효과적
재현성 데이터 품질 이슈가 모델 성능 변동의 주요 원인

Data Lifecycle Framework

summary diagram

1. Training Data Development

Task Description Methods
Data Collection 데이터 수집 전략 Active Learning, Web Scraping, Crowdsourcing
Data Labeling 레이블 생성/검증 Weak Supervision, Programmatic Labeling
Data Preparation 전처리, 정제 Cleaning, Normalization, Deduplication
Data Augmentation 데이터 증강 Mixup, CutMix, Synthetic Generation
Data Reduction 데이터셋 축소 Core-set Selection, Dataset Distillation

2. Inference Data Development

Task Description Methods
Distribution Shift 분포 변화 대응 Domain Adaptation, TTA
In-Context Learning 프롬프트 예시 선택 Example Selection, Prompt Engineering
Feature Engineering 피처 변환/생성 AutoFE, Feature Selection

3. Data Maintenance

Task Description Methods
Data Understanding 데이터 탐색/분석 EDA, Data Profiling
Quality Assurance 품질 모니터링 Data Validation, Drift Detection
Storage & Retrieval 저장/검색 최적화 Data Versioning, Vector DB

Key Techniques

1. Data Quality Auditing

SELFCLEAN (NeurIPS 2024): Self-supervised representation을 활용한 데이터 품질 진단.

summary diagram

구현 예시:

from sklearn.neighbors import NearestNeighbors
import numpy as np
from typing import Dict, List, Tuple

def detect_quality_issues(
    embeddings: np.ndarray, 
    labels: np.ndarray, 
    k: int = 10
) -> Dict[str, List]:
    """
    임베딩 거리 기반 데이터 품질 이슈 탐지

    탐지 가능한 이슈:
    - Off-topic samples (outliers)
    - Near duplicates
    - Label errors (neighbors with different labels)

    Args:
        embeddings: (n_samples, embedding_dim) 형태의 임베딩
        labels: (n_samples,) 형태의 레이블
        k: KNN의 이웃 수

    Returns:
        각 이슈 유형별 인덱스 딕셔너리
    """
    nn = NearestNeighbors(n_neighbors=k+1, metric='cosine')
    nn.fit(embeddings)
    distances, indices = nn.kneighbors(embeddings)

    issues = {
        'outliers': [],
        'duplicates': [],
        'label_errors': []
    }

    # 평균 거리 계산 (자기 자신 제외)
    avg_distances = [np.mean(d[1:]) for d in distances]
    outlier_threshold = np.percentile(avg_distances, 95)

    for i in range(len(embeddings)):
        # 1. Outlier detection: 평균 거리가 95 퍼센타일 이상
        if avg_distances[i] > outlier_threshold:
            issues['outliers'].append(i)

        # 2. Near-duplicate detection: 가장 가까운 이웃과 거리 < 0.01
        if distances[i, 1] < 0.01:
            issues['duplicates'].append((i, indices[i, 1]))

        # 3. Label error detection: 이웃의 50% 미만이 같은 레이블
        neighbor_labels = labels[indices[i, 1:]]
        consistency = np.mean(neighbor_labels == labels[i])
        if consistency < 0.5:
            issues['label_errors'].append((i, consistency))

    return issues


# 사용 예시
from sentence_transformers import SentenceTransformer

# 임베딩 생성
encoder = SentenceTransformer('all-MiniLM-L6-v2')
texts = ["sample text 1", "sample text 2", ...]
embeddings = encoder.encode(texts)

# 이슈 탐지
issues = detect_quality_issues(embeddings, labels)

print(f"Outliers: {len(issues['outliers'])}")
print(f"Duplicates: {len(issues['duplicates'])}")
print(f"Label errors: {len(issues['label_errors'])}")

2. Data Valuation (Shapley-based)

데이터 포인트의 기여도를 정량화한다.

import numpy as np
from typing import Callable

def data_shapley(
    X_train: np.ndarray, 
    y_train: np.ndarray, 
    X_val: np.ndarray, 
    y_val: np.ndarray, 
    model_fn: Callable,
    n_samples: int = 100
) -> np.ndarray:
    """
    Monte Carlo 샘플링을 통한 Data Shapley 값 근사

    각 학습 데이터 포인트의 모델 성능 기여도를 측정

    Args:
        X_train, y_train: 학습 데이터
        X_val, y_val: 검증 데이터
        model_fn: 모델 생성 함수 (파라미터 없이 호출 가능)
        n_samples: Monte Carlo 샘플 수

    Returns:
        각 데이터 포인트의 Shapley 값
    """
    n = len(X_train)
    shapley_values = np.zeros(n)

    for _ in range(n_samples):
        # 학습 데이터의 랜덤 순열
        perm = np.random.permutation(n)

        prev_score = 0
        for j, idx in enumerate(perm):
            # 현재 포인트까지의 subset으로 학습
            subset = perm[:j+1]
            model = model_fn()
            model.fit(X_train[subset], y_train[subset])
            curr_score = model.score(X_val, y_val)

            # Marginal contribution
            shapley_values[idx] += (curr_score - prev_score)
            prev_score = curr_score

    return shapley_values / n_samples


def knn_shapley(
    X_train: np.ndarray, 
    y_train: np.ndarray, 
    X_val: np.ndarray, 
    y_val: np.ndarray, 
    k: int = 10
) -> np.ndarray:
    """
    KNN-Shapley: 효율적인 Shapley 값 근사 O(n log n)

    Reference: Jia et al., "Efficient Task-Specific Data Valuation" (2019)
    """
    from sklearn.neighbors import NearestNeighbors

    nn = NearestNeighbors(n_neighbors=min(k, len(X_train)))
    nn.fit(X_train)

    shapley_values = np.zeros(len(X_train))

    for x, y in zip(X_val, y_val):
        distances, indices = nn.kneighbors([x])
        indices = indices[0]

        for j, idx in enumerate(indices):
            # Shapley 가중치 계산
            weight = 1.0 / (j + 1) - 1.0 / (j + 2) if j < k-1 else 1.0 / k

            # 정답 여부에 따라 기여도 조정
            if y_train[idx] == y:
                shapley_values[idx] += weight
            else:
                shapley_values[idx] -= weight

    return shapley_values / len(X_val)


# 사용 예시: 저품질 데이터 식별
shapley_values = knn_shapley(X_train, y_train, X_val, y_val)

# 기여도가 낮거나 음수인 데이터 = 품질 문제 가능
low_value_idx = np.where(shapley_values < np.percentile(shapley_values, 10))[0]
print(f"Low-value samples: {len(low_value_idx)}")

3. Programmatic Labeling (Weak Supervision)

Labeling Function을 활용한 약한 감독 학습이다.

import numpy as np
from typing import Callable, List

# 상수 정의
ABSTAIN = -1
SPAM = 1
NOT_SPAM = 0

class LabelingFunction:
    """
    Labeling Function wrapper

    규칙 기반으로 레이블을 생성하는 함수
    ABSTAIN(-1)을 반환하면 해당 샘플에 대해 판단 보류
    """

    def __init__(self, name: str, fn: Callable[[str], int]):
        self.name = name
        self.fn = fn

    def __call__(self, text: str) -> int:
        return self.fn(text)


# Labeling Functions 정의 예시 (스팸 분류)
def lf_contains_free(text: str) -> int:
    """'free'가 포함되면 스팸"""
    return SPAM if 'free' in text.lower() else ABSTAIN

def lf_short_text(text: str) -> int:
    """짧은 텍스트는 스팸 아님"""
    return NOT_SPAM if len(text) < 20 else ABSTAIN

def lf_contains_click(text: str) -> int:
    """'click here'가 포함되면 스팸"""
    return SPAM if 'click here' in text.lower() else ABSTAIN

def lf_has_url(text: str) -> int:
    """URL이 많으면 스팸"""
    import re
    urls = re.findall(r'https?://\S+', text)
    return SPAM if len(urls) > 2 else ABSTAIN


def apply_lfs(
    texts: List[str], 
    lfs: List[LabelingFunction]
) -> np.ndarray:
    """
    Labeling Functions를 적용하여 Label Matrix 생성

    Returns:
        L: (n_samples, n_lfs) 형태의 레이블 행렬
    """
    L = np.zeros((len(texts), len(lfs)), dtype=int)
    for i, text in enumerate(texts):
        for j, lf in enumerate(lfs):
            L[i, j] = lf(text)
    return L


def majority_vote(L: np.ndarray, abstain: int = -1) -> np.ndarray:
    """
    단순 다수결 Label Model

    Labeling Functions의 투표로 최종 레이블 결정
    """
    labels = []
    for row in L:
        votes = row[row != abstain]
        if len(votes) == 0:
            labels.append(abstain)
        else:
            labels.append(int(np.round(np.mean(votes))))
    return np.array(labels)


# 사용 예시
lfs = [
    LabelingFunction("free", lf_contains_free),
    LabelingFunction("short", lf_short_text),
    LabelingFunction("click", lf_contains_click),
    LabelingFunction("urls", lf_has_url)
]

texts = [
    "FREE! Click here to win!",
    "Hi, how are you?",
    "Check out these deals",
]

# Label Matrix 생성
L = apply_lfs(texts, lfs)
print("Label Matrix:\n", L)

# 최종 레이블
final_labels = majority_vote(L)
print("Final labels:", final_labels)

# 커버리지 분석
coverage = np.mean(np.any(L != ABSTAIN, axis=1))
print(f"Coverage: {coverage:.1%}")

4. Intelligent Data Augmentation

ConBias (NeurIPS 2024): Concept graph 기반 편향 탐지 및 타겟 증강.

from collections import defaultdict
import numpy as np
from typing import Dict, List, Set, Tuple

def analyze_concept_cooccurrence(
    labels: np.ndarray, 
    concepts: List[Set[str]]
) -> Dict[Tuple[int, str], float]:
    """
    클래스별 개념 공존 패턴 분석

    Args:
        labels: 클래스 레이블
        concepts: 각 샘플의 개념 집합 리스트

    Returns:
        (class, concept) -> bias_score 딕셔너리
        양수 = 과대표현, 음수 = 과소표현
    """
    cooccurrence = defaultdict(lambda: defaultdict(int))
    class_counts = defaultdict(int)

    for label, concept_set in zip(labels, concepts):
        class_counts[label] += 1
        for concept in concept_set:
            cooccurrence[label][concept] += 1

    # 전체 개념 집합
    all_concepts = set()
    for cs in concepts:
        all_concepts.update(cs)

    # Expected vs Actual 빈도 계산
    bias_scores = {}
    for label in class_counts:
        for concept in all_concepts:
            actual = cooccurrence[label][concept] / class_counts[label]
            expected = sum(
                cooccurrence[l][concept] for l in class_counts
            ) / len(labels)
            bias_scores[(label, concept)] = actual - expected

    return bias_scores


def targeted_augmentation(
    bias_scores: Dict[Tuple[int, str], float], 
    threshold: float = -0.1
) -> List[Dict]:
    """
    과소표현된 클래스-개념 조합 식별

    Returns:
        증강이 필요한 조합 리스트 (deficit 순 정렬)
    """
    to_augment = []
    for (label, concept), score in bias_scores.items():
        if score < threshold:
            to_augment.append({
                'class': label,
                'concept': concept,
                'deficit': abs(score)
            })
    return sorted(to_augment, key=lambda x: x['deficit'], reverse=True)


# 사용 예시
labels = np.array([0, 0, 0, 1, 1, 1])  # 이진 분류
concepts = [
    {'sky', 'grass'},      # class 0 샘플
    {'sky', 'tree'},       # class 0 샘플
    {'sky', 'grass'},      # class 0 샘플
    {'indoor', 'table'},   # class 1 샘플
    {'indoor', 'chair'},   # class 1 샘플
    {'indoor', 'window'},  # class 1 샘플
]

bias_scores = analyze_concept_cooccurrence(labels, concepts)

# 편향 확인: class 0은 sky와 강하게 연관
# 실제 환경에서는 class 0 + indoor 조합도 필요할 수 있음
augmentation_targets = targeted_augmentation(bias_scores)
for target in augmentation_targets[:5]:
    print(f"Class {target['class']} + {target['concept']}: "
          f"deficit = {target['deficit']:.2f}")

5. Dataset Distillation

대규모 데이터셋을 작은 합성 데이터셋으로 압축한다.

import torch
import torch.nn as nn
import torch.optim as optim
from typing import Callable, Tuple

def dataset_distillation(
    real_loader: torch.utils.data.DataLoader,
    model_fn: Callable[[], nn.Module],
    n_synthetic_per_class: int = 10,
    n_classes: int = 10,
    img_shape: Tuple[int, int, int] = (3, 32, 32),
    lr: float = 0.1,
    epochs: int = 1000,
    device: str = 'cuda'
) -> Tuple[torch.Tensor, torch.Tensor]:
    """
    Gradient Matching 기반 Dataset Distillation

    대규모 데이터셋의 "본질"을 소규모 합성 데이터셋으로 압축

    Reference: Zhao et al., "Dataset Condensation" ICLR 2021

    Args:
        real_loader: 실제 데이터 로더
        model_fn: 모델 생성 함수
        n_synthetic_per_class: 클래스당 합성 샘플 수
        n_classes: 클래스 수
        img_shape: 이미지 형태 (C, H, W)
        lr: 학습률
        epochs: 학습 에폭 수
        device: 디바이스

    Returns:
        (synthetic_images, synthetic_labels)
    """
    # 합성 데이터셋 초기화 (학습 가능)
    n_synthetic = n_synthetic_per_class * n_classes
    syn_images = torch.randn(
        n_synthetic, *img_shape, 
        device=device, 
        requires_grad=True
    )
    syn_labels = torch.arange(n_classes, device=device).repeat(
        n_synthetic_per_class
    )

    optimizer = optim.SGD([syn_images], lr=lr, momentum=0.5)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(epochs):
        # 새 모델로 그래디언트 계산 (모델은 매번 초기화)
        model = model_fn().to(device)
        model.train()

        # Real data gradient
        real_images, real_labels = next(iter(real_loader))
        real_images = real_images.to(device)
        real_labels = real_labels.to(device)

        real_loss = criterion(model(real_images), real_labels)
        real_grads = torch.autograd.grad(
            real_loss, 
            model.parameters(),
            create_graph=False
        )

        # Synthetic data gradient
        syn_loss = criterion(model(syn_images), syn_labels)
        syn_grads = torch.autograd.grad(
            syn_loss, 
            model.parameters(),
            create_graph=True
        )

        # Gradient matching loss
        match_loss = sum(
            (rg - sg).pow(2).sum() 
            for rg, sg in zip(real_grads, syn_grads)
        )

        optimizer.zero_grad()
        match_loss.backward()
        optimizer.step()

        # 이미지 값 클리핑
        with torch.no_grad():
            syn_images.clamp_(-1, 1)

        if epoch % 100 == 0:
            print(f"Epoch {epoch}, Match Loss: {match_loss.item():.4f}")

    return syn_images.detach(), syn_labels


# 사용 예시
# distilled_images, distilled_labels = dataset_distillation(
#     real_loader=train_loader,
#     model_fn=lambda: SimpleCNN(),
#     n_synthetic_per_class=10,  # 클래스당 10개 = 100개 (10 classes)
#     epochs=1000
# )
# 
# # 원본 50,000개 → 100개로 압축
# # 압축된 데이터로 학습해도 유사한 성능!

Synthetic Data: Promise vs Reality

NeurIPS 2024 연구 결과 (Voxel51 분석)에 따른 합성 데이터의 한계:

Finding Implication
Real > Synthetic 생성 모델의 학습 데이터에서 검색한 실제 이미지가 합성 이미지보다 우수
Generator Artifacts 합성 이미지의 아티팩트가 모델 성능 저하 유발
Semantic Inaccuracy 세부 시각적 디테일에서 합성 데이터의 부정확성

권장사항:

합성 데이터 평가 시 항상 생성 모델의 학습 셋에서 검색한 실제 이미지와 비교할 것.


Tools & Frameworks

Tool Purpose Link
Snorkel Programmatic labeling snorkel.org
Cleanlab Label error detection cleanlab.ai
Great Expectations Data validation greatexpectations.io
DVC Data version control dvc.org
Label Studio Annotation platform labelstud.io
FiftyOne Data curation & analysis voxel51.com

Practical Workflow

Data Quality Checklist

1. [ ] Data Profiling: 분포, 결측치, 이상치 파악
2. [ ] Duplicate Detection: 중복/유사 샘플 제거
3. [ ] Label Audit: 레이블 오류 탐지 및 수정
4. [ ] Class Balance: 클래스 불균형 분석
5. [ ] Concept Bias: 개념 공존 편향 분석
6. [ ] Distribution Shift: Train/Val/Test 분포 차이 확인
7. [ ] Data Versioning: 변경 이력 관리

End-to-End Pipeline 구현

import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import NearestNeighbors
from typing import Optional, List, Tuple, Dict

class DataCentricPipeline:
    """
    End-to-end Data-Centric AI 파이프라인

    데이터 품질 이슈 탐지 및 정제를 자동화
    """

    def __init__(
        self, 
        embeddings: np.ndarray, 
        labels: np.ndarray,
        metadata: Optional[pd.DataFrame] = None
    ):
        """
        Args:
            embeddings: (n_samples, embedding_dim) 데이터 임베딩
            labels: (n_samples,) 레이블
            metadata: 추가 메타데이터 (선택)
        """
        self.embeddings = embeddings
        self.labels = labels
        self.metadata = metadata
        self.issues: Dict[str, List] = {}
        self.data_values: Optional[np.ndarray] = None

    def detect_outliers(
        self, 
        contamination: float = 0.05
    ) -> np.ndarray:
        """
        Isolation Forest로 이상치 탐지

        Args:
            contamination: 예상 이상치 비율

        Returns:
            이상치 인덱스 배열
        """
        iso = IsolationForest(
            contamination=contamination, 
            random_state=42
        )
        outlier_pred = iso.fit_predict(self.embeddings)
        self.issues['outliers'] = np.where(outlier_pred == -1)[0].tolist()
        return np.array(self.issues['outliers'])

    def detect_duplicates(
        self, 
        threshold: float = 0.99
    ) -> List[Tuple[int, int, float]]:
        """
        코사인 유사도 기반 중복 탐지

        Args:
            threshold: 유사도 임계값 (이상이면 중복)

        Returns:
            (idx1, idx2, similarity) 튜플 리스트
        """
        nn = NearestNeighbors(n_neighbors=2, metric='cosine')
        nn.fit(self.embeddings)
        distances, indices = nn.kneighbors(self.embeddings)

        duplicates = []
        seen = set()
        for i, (dist, idx) in enumerate(zip(distances[:, 1], indices[:, 1])):
            similarity = 1 - dist
            if similarity > threshold and i < idx and (i, idx) not in seen:
                duplicates.append((i, idx, similarity))
                seen.add((i, idx))

        self.issues['duplicates'] = duplicates
        return duplicates

    def detect_label_errors(
        self, 
        k: int = 10, 
        threshold: float = 0.3
    ) -> List[Tuple[int, float]]:
        """
        KNN 일관성 기반 레이블 오류 탐지

        Args:
            k: 이웃 수
            threshold: 일관성 임계값 (미만이면 오류 의심)

        Returns:
            (index, consistency) 튜플 리스트
        """
        nn = NearestNeighbors(n_neighbors=k+1)
        nn.fit(self.embeddings)
        _, indices = nn.kneighbors(self.embeddings)

        errors = []
        for i in range(len(self.labels)):
            neighbor_labels = self.labels[indices[i, 1:]]
            consistency = np.mean(neighbor_labels == self.labels[i])
            if consistency < threshold:
                errors.append((i, consistency))

        self.issues['label_errors'] = errors
        return errors

    def compute_data_value(
        self, 
        X_val: np.ndarray, 
        y_val: np.ndarray,
        k: int = 10
    ) -> np.ndarray:
        """
        KNN-Shapley로 데이터 가치 계산
        """
        self.data_values = knn_shapley(
            self.embeddings, self.labels, X_val, y_val, k
        )
        return self.data_values

    def get_clean_indices(self) -> np.ndarray:
        """
        이슈가 없는 샘플 인덱스 반환
        """
        problem_indices = set(self.issues.get('outliers', []))
        problem_indices.update([d[0] for d in self.issues.get('duplicates', [])])
        problem_indices.update([e[0] for e in self.issues.get('label_errors', [])])

        clean = [
            i for i in range(len(self.labels)) 
            if i not in problem_indices
        ]
        return np.array(clean)

    def report(self) -> Dict:
        """품질 리포트 생성"""
        n_total = len(self.labels)
        n_clean = len(self.get_clean_indices())

        report = {
            'total_samples': n_total,
            'outliers': len(self.issues.get('outliers', [])),
            'duplicates': len(self.issues.get('duplicates', [])),
            'label_errors': len(self.issues.get('label_errors', [])),
            'clean_samples': n_clean,
            'clean_ratio': n_clean / n_total
        }

        print("=== Data Quality Report ===")
        for key, value in report.items():
            if key == 'clean_ratio':
                print(f"{key}: {value:.2%}")
            else:
                print(f"{key}: {value}")

        return report


# 사용 예시
# pipeline = DataCentricPipeline(embeddings, labels)
# pipeline.detect_outliers()
# pipeline.detect_duplicates()
# pipeline.detect_label_errors()
# report = pipeline.report()
# clean_idx = pipeline.get_clean_indices()
# 
# # 정제된 데이터로 학습
# X_clean = X_train[clean_idx]
# y_clean = y_train[clean_idx]

Key Takeaways

인사이트 설명
Paradigm Shift 모델 개선보다 데이터 개선이 실제 성능 향상에 더 효과적
Systematic Approach 데이터 수집 → 레이블링 → 정제 → 증강의 체계적 파이프라인
Quality over Quantity 더 많은 데이터보다 더 좋은 데이터가 중요
Synthetic Caution 합성 데이터는 만능이 아님; 실제 데이터 검색이 더 효과적일 수 있음
Automation Data-centric 작업의 자동화가 핵심 연구 방향

참고 자료

자료 링크 설명
Zha et al. (2023) arXiv:2303.10158 Data-centric AI Survey
Jarrahi et al. (2024) BISE Journal Data-Centric AI 개념
Landing AI Blog landing.ai Andrew Ng의 Data-Centric 관점
Sambasivan et al. (2021) CHI 2021 "Everyone wants to do the model work, not the data work"

Last updated: 2025-01