콘텐츠로 이동
Data Prep
상세

Anomaly Detection (이상탐지 분석)

개요

이상탐지는 정상 패턴에서 벗어난 데이터 포인트를 식별하는 분석 기법이다. 사기 탐지, 시스템 모니터링, 품질 관리 등 다양한 비즈니스 영역에서 활용된다.

이상치 유형

1. 포인트 이상치 (Point Anomaly)

개별 데이터 포인트가 전체 분포에서 벗어남

예시: 평균 $50 거래에서 $50,000 거래

2. 컨텍스트 이상치 (Contextual Anomaly)

특정 맥락에서만 비정상

예시: 크리스마스에 에어컨 판매 급증
      (여름에는 정상, 겨울에는 이상)

3. 집단 이상치 (Collective Anomaly)

개별로는 정상이나 집단으로 비정상

예시: 동시에 여러 계정에서 소액 인출
      (개별 $10는 정상, 100건 동시 발생은 이상)

비지도 학습 기법

Isolation Forest

원리:
- 랜덤 피처, 랜덤 스플릿으로 트리 생성
- 이상치는 더 적은 분할로 고립됨
- 평균 경로 길이가 짧을수록 이상치

┌─────────────────────┐
│     Root            │
│    /    \           │
│   ▼      ▼          │
│  이상치   정상       │  ← 이상치: 경로 짧음
│          /  \       │
│         ▼    ▼      │
│        정상  정상    │  ← 정상: 경로 김
└─────────────────────┘
from sklearn.ensemble import IsolationForest

model = IsolationForest(
    n_estimators=100,
    contamination=0.01,  # 예상 이상치 비율
    random_state=42
)

# 학습 및 예측 (-1: 이상치, 1: 정상)
predictions = model.fit_predict(data)
anomaly_scores = model.score_samples(data)  # 낮을수록 이상

장점: 고차원 데이터에 효과적, 빠른 학습 적합: 대용량 데이터, 다양한 피처

LOF (Local Outlier Factor)

원리:
- k-최근접 이웃의 밀도 대비 해당 포인트 밀도 비교
- LOF > 1: 주변보다 밀도 낮음 → 이상치

    ●●●●    ← 고밀도 영역
   ●●●●●
     ○      ← LOF 높음 (이상치)

        ●   ← LOF 낮음 (정상)
        ●●
from sklearn.neighbors import LocalOutlierFactor

model = LocalOutlierFactor(
    n_neighbors=20,
    contamination=0.01
)

predictions = model.fit_predict(data)
lof_scores = model.negative_outlier_factor_  # 낮을수록 이상

장점: 지역 밀도 고려, 클러스터 구조 반영 적합: 밀도 불균등 데이터

One-Class SVM

원리:
- 정상 데이터를 고차원 공간에 매핑
- 원점에서 최대한 멀리 분리하는 초평면 학습
from sklearn.svm import OneClassSVM

model = OneClassSVM(
    kernel='rbf',
    nu=0.01,  # 이상치 비율 상한
    gamma='auto'
)

model.fit(normal_data)
predictions = model.predict(test_data)

장점: 비선형 경계 학습 가능 제한: 대용량 데이터에 느림

DBSCAN 기반

from sklearn.cluster import DBSCAN

model = DBSCAN(eps=0.5, min_samples=5)
clusters = model.fit_predict(data)

# 클러스터 -1 = 노이즈 (이상치)
anomalies = data[clusters == -1]

딥러닝 기법

Autoencoder

원리:
- 정상 데이터로 Encoder-Decoder 학습
- 이상 데이터는 재구성 오차 큼

Input → [Encoder] → Latent → [Decoder] → Output
         압축         복원

Reconstruction Error = ||Input - Output||
이상치: 높은 재구성 오차
import torch
import torch.nn as nn

class Autoencoder(nn.Module):
    def __init__(self, input_dim, latent_dim=32):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, latent_dim)
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 64),
            nn.ReLU(),
            nn.Linear(64, input_dim)
        )

    def forward(self, x):
        latent = self.encoder(x)
        reconstructed = self.decoder(latent)
        return reconstructed

# 이상 점수 = 재구성 오차
def anomaly_score(model, data):
    with torch.no_grad():
        reconstructed = model(data)
        mse = ((data - reconstructed) ** 2).mean(dim=1)
    return mse

VAE (Variational Autoencoder)

class VAE(nn.Module):
    def __init__(self, input_dim, latent_dim=32):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU()
        )
        self.fc_mu = nn.Linear(64, latent_dim)
        self.fc_var = nn.Linear(64, latent_dim)
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 64),
            nn.ReLU(),
            nn.Linear(64, input_dim)
        )

    def encode(self, x):
        h = self.encoder(x)
        return self.fc_mu(h), self.fc_var(h)

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decoder(z), mu, logvar

# 이상 점수 = 재구성 오차 + KL divergence

시계열 이상탐지

통계적 방법

import pandas as pd
import numpy as np

def statistical_anomaly(series, window=20, n_std=3):
    """이동 평균/표준편차 기반 이상탐지"""
    rolling_mean = series.rolling(window=window).mean()
    rolling_std = series.rolling(window=window).std()

    upper_bound = rolling_mean + n_std * rolling_std
    lower_bound = rolling_mean - n_std * rolling_std

    anomalies = (series > upper_bound) | (series < lower_bound)
    return anomalies

ARIMA 잔차 기반

from statsmodels.tsa.arima.model import ARIMA

# 모델 학습
model = ARIMA(train_series, order=(1, 1, 1))
fitted = model.fit()

# 잔차 기반 이상탐지
residuals = fitted.resid
threshold = 3 * residuals.std()
anomalies = np.abs(residuals) > threshold

Prophet 활용

from prophet import Prophet

# 학습
model = Prophet(interval_width=0.99)
model.fit(df[['ds', 'y']])

# 예측 및 이상탐지
forecast = model.predict(df)
df['anomaly'] = (df['y'] < forecast['yhat_lower']) | \
                (df['y'] > forecast['yhat_upper'])

LSTM Autoencoder

class LSTMAutoencoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, seq_len):
        super().__init__()
        self.encoder = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.decoder = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)
        self.output = nn.Linear(hidden_dim, input_dim)
        self.seq_len = seq_len

    def forward(self, x):
        # Encode
        _, (hidden, cell) = self.encoder(x)

        # Decode
        decoder_input = hidden.repeat(self.seq_len, 1, 1).permute(1, 0, 2)
        output, _ = self.decoder(decoder_input, (hidden, cell))

        return self.output(output)

비즈니스 적용

1. 금융 사기 탐지

# 거래 데이터 피처
features = [
    'amount',           # 거래 금액
    'hour_of_day',      # 시간대
    'merchant_category',# 가맹점 유형
    'distance_from_home',# 집과의 거리
    'time_since_last',  # 이전 거래 후 경과 시간
    'amount_ratio_to_avg' # 평균 대비 금액 비율
]

# Isolation Forest + Rule 결합
model = IsolationForest(contamination=0.001)
model.fit(normal_transactions[features])

def detect_fraud(transaction):
    score = model.score_samples([transaction[features]])[0]
    rules = check_business_rules(transaction)

    if score < threshold or rules['suspicious']:
        return 'REVIEW'
    return 'PASS'

2. 시스템 모니터링

# 서버 메트릭
metrics = {
    'cpu_usage': [],
    'memory_usage': [],
    'disk_io': [],
    'network_latency': [],
    'error_rate': []
}

# 다변량 이상탐지
from pyod.models.copod import COPOD

model = COPOD()
model.fit(historical_metrics)

# 실시간 탐지
def monitor(current_metrics):
    score = model.decision_function([current_metrics])[0]
    if score > alert_threshold:
        send_alert(score, current_metrics)

3. 제조 품질 관리

# 센서 데이터 기반 불량 탐지
sensor_features = ['temperature', 'pressure', 'vibration', 'speed']

# VAE로 정상 패턴 학습
vae = VAE(input_dim=len(sensor_features))
train_vae(vae, normal_samples)

# 생산 중 모니터링
def quality_check(sensor_reading):
    recon_error = anomaly_score(vae, sensor_reading)
    if recon_error > threshold:
        return {'status': 'DEFECT_RISK', 'score': recon_error}
    return {'status': 'NORMAL', 'score': recon_error}

기법 선택 가이드

상황 추천 기법
고차원 tabular Isolation Forest
지역 밀도 중요 LOF
시계열 (단변량) 통계적 방법, Prophet
시계열 (다변량) LSTM Autoencoder
이미지/복잡 패턴 VAE, Autoencoder
대용량 실시간 Isolation Forest, COPOD
레이블 일부 있음 Semi-supervised (PU Learning)

임계값 설정

분위수 기반

# 상위 1%를 이상치로
threshold = np.percentile(anomaly_scores, 99)

F1 최적화 (레이블 있을 때)

from sklearn.metrics import f1_score

best_threshold = None
best_f1 = 0

for threshold in np.linspace(min_score, max_score, 100):
    preds = (scores > threshold).astype(int)
    f1 = f1_score(labels, preds)
    if f1 > best_f1:
        best_f1 = f1
        best_threshold = threshold

비즈니스 비용 기반

# FP 비용: $10 (정상인데 조사)
# FN 비용: $1000 (사기 놓침)
def total_cost(threshold, scores, labels):
    preds = scores > threshold
    fp = ((preds == 1) & (labels == 0)).sum()
    fn = ((preds == 0) & (labels == 1)).sum()
    return fp * 10 + fn * 1000

# 비용 최소화 임계값 찾기
optimal_threshold = minimize(total_cost, ...)

참고 라이브러리

라이브러리 특징
scikit-learn Isolation Forest, LOF, One-Class SVM
PyOD 40+ 이상탐지 알고리즘
Alibi Detect 드리프트 탐지 포함
PyCaret AutoML 이상탐지
ADTK 시계열 이상탐지 특화

참고 자료

  • "Isolation Forest" (Liu et al., 2008)
  • "LOF: Identifying Density-Based Local Outliers" (Breunig et al., 2000)
  • "Deep Learning for Anomaly Detection: A Survey" (2021)
  • PyOD Documentation

최종 업데이트: 2026-02-18