Anomaly Detection (이상탐지 분석)¶
개요¶
이상탐지는 정상 패턴에서 벗어난 데이터 포인트를 식별하는 분석 기법이다. 사기 탐지, 시스템 모니터링, 품질 관리 등 다양한 비즈니스 영역에서 활용된다.
이상치 유형¶
1. 포인트 이상치 (Point Anomaly)¶
2. 컨텍스트 이상치 (Contextual Anomaly)¶
3. 집단 이상치 (Collective Anomaly)¶
비지도 학습 기법¶
Isolation Forest¶
원리:
- 랜덤 피처, 랜덤 스플릿으로 트리 생성
- 이상치는 더 적은 분할로 고립됨
- 평균 경로 길이가 짧을수록 이상치
┌─────────────────────┐
│ Root │
│ / \ │
│ ▼ ▼ │
│ 이상치 정상 │ ← 이상치: 경로 짧음
│ / \ │
│ ▼ ▼ │
│ 정상 정상 │ ← 정상: 경로 김
└─────────────────────┘
from sklearn.ensemble import IsolationForest
model = IsolationForest(
n_estimators=100,
contamination=0.01, # 예상 이상치 비율
random_state=42
)
# 학습 및 예측 (-1: 이상치, 1: 정상)
predictions = model.fit_predict(data)
anomaly_scores = model.score_samples(data) # 낮을수록 이상
장점: 고차원 데이터에 효과적, 빠른 학습 적합: 대용량 데이터, 다양한 피처
LOF (Local Outlier Factor)¶
원리:
- k-최근접 이웃의 밀도 대비 해당 포인트 밀도 비교
- LOF > 1: 주변보다 밀도 낮음 → 이상치
●●●● ← 고밀도 영역
●●●●●
○ ← LOF 높음 (이상치)
● ← LOF 낮음 (정상)
●●
from sklearn.neighbors import LocalOutlierFactor
model = LocalOutlierFactor(
n_neighbors=20,
contamination=0.01
)
predictions = model.fit_predict(data)
lof_scores = model.negative_outlier_factor_ # 낮을수록 이상
장점: 지역 밀도 고려, 클러스터 구조 반영 적합: 밀도 불균등 데이터
One-Class SVM¶
from sklearn.svm import OneClassSVM
model = OneClassSVM(
kernel='rbf',
nu=0.01, # 이상치 비율 상한
gamma='auto'
)
model.fit(normal_data)
predictions = model.predict(test_data)
장점: 비선형 경계 학습 가능 제한: 대용량 데이터에 느림
DBSCAN 기반¶
from sklearn.cluster import DBSCAN
model = DBSCAN(eps=0.5, min_samples=5)
clusters = model.fit_predict(data)
# 클러스터 -1 = 노이즈 (이상치)
anomalies = data[clusters == -1]
딥러닝 기법¶
Autoencoder¶
원리:
- 정상 데이터로 Encoder-Decoder 학습
- 이상 데이터는 재구성 오차 큼
Input → [Encoder] → Latent → [Decoder] → Output
압축 복원
Reconstruction Error = ||Input - Output||
이상치: 높은 재구성 오차
import torch
import torch.nn as nn
class Autoencoder(nn.Module):
def __init__(self, input_dim, latent_dim=32):
super().__init__()
self.encoder = nn.Sequential(
nn.Linear(input_dim, 64),
nn.ReLU(),
nn.Linear(64, latent_dim)
)
self.decoder = nn.Sequential(
nn.Linear(latent_dim, 64),
nn.ReLU(),
nn.Linear(64, input_dim)
)
def forward(self, x):
latent = self.encoder(x)
reconstructed = self.decoder(latent)
return reconstructed
# 이상 점수 = 재구성 오차
def anomaly_score(model, data):
with torch.no_grad():
reconstructed = model(data)
mse = ((data - reconstructed) ** 2).mean(dim=1)
return mse
VAE (Variational Autoencoder)¶
class VAE(nn.Module):
def __init__(self, input_dim, latent_dim=32):
super().__init__()
self.encoder = nn.Sequential(
nn.Linear(input_dim, 64),
nn.ReLU()
)
self.fc_mu = nn.Linear(64, latent_dim)
self.fc_var = nn.Linear(64, latent_dim)
self.decoder = nn.Sequential(
nn.Linear(latent_dim, 64),
nn.ReLU(),
nn.Linear(64, input_dim)
)
def encode(self, x):
h = self.encoder(x)
return self.fc_mu(h), self.fc_var(h)
def reparameterize(self, mu, logvar):
std = torch.exp(0.5 * logvar)
eps = torch.randn_like(std)
return mu + eps * std
def forward(self, x):
mu, logvar = self.encode(x)
z = self.reparameterize(mu, logvar)
return self.decoder(z), mu, logvar
# 이상 점수 = 재구성 오차 + KL divergence
시계열 이상탐지¶
통계적 방법¶
import pandas as pd
import numpy as np
def statistical_anomaly(series, window=20, n_std=3):
"""이동 평균/표준편차 기반 이상탐지"""
rolling_mean = series.rolling(window=window).mean()
rolling_std = series.rolling(window=window).std()
upper_bound = rolling_mean + n_std * rolling_std
lower_bound = rolling_mean - n_std * rolling_std
anomalies = (series > upper_bound) | (series < lower_bound)
return anomalies
ARIMA 잔차 기반¶
from statsmodels.tsa.arima.model import ARIMA
# 모델 학습
model = ARIMA(train_series, order=(1, 1, 1))
fitted = model.fit()
# 잔차 기반 이상탐지
residuals = fitted.resid
threshold = 3 * residuals.std()
anomalies = np.abs(residuals) > threshold
Prophet 활용¶
from prophet import Prophet
# 학습
model = Prophet(interval_width=0.99)
model.fit(df[['ds', 'y']])
# 예측 및 이상탐지
forecast = model.predict(df)
df['anomaly'] = (df['y'] < forecast['yhat_lower']) | \
(df['y'] > forecast['yhat_upper'])
LSTM Autoencoder¶
class LSTMAutoencoder(nn.Module):
def __init__(self, input_dim, hidden_dim, seq_len):
super().__init__()
self.encoder = nn.LSTM(input_dim, hidden_dim, batch_first=True)
self.decoder = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)
self.output = nn.Linear(hidden_dim, input_dim)
self.seq_len = seq_len
def forward(self, x):
# Encode
_, (hidden, cell) = self.encoder(x)
# Decode
decoder_input = hidden.repeat(self.seq_len, 1, 1).permute(1, 0, 2)
output, _ = self.decoder(decoder_input, (hidden, cell))
return self.output(output)
비즈니스 적용¶
1. 금융 사기 탐지¶
# 거래 데이터 피처
features = [
'amount', # 거래 금액
'hour_of_day', # 시간대
'merchant_category',# 가맹점 유형
'distance_from_home',# 집과의 거리
'time_since_last', # 이전 거래 후 경과 시간
'amount_ratio_to_avg' # 평균 대비 금액 비율
]
# Isolation Forest + Rule 결합
model = IsolationForest(contamination=0.001)
model.fit(normal_transactions[features])
def detect_fraud(transaction):
score = model.score_samples([transaction[features]])[0]
rules = check_business_rules(transaction)
if score < threshold or rules['suspicious']:
return 'REVIEW'
return 'PASS'
2. 시스템 모니터링¶
# 서버 메트릭
metrics = {
'cpu_usage': [],
'memory_usage': [],
'disk_io': [],
'network_latency': [],
'error_rate': []
}
# 다변량 이상탐지
from pyod.models.copod import COPOD
model = COPOD()
model.fit(historical_metrics)
# 실시간 탐지
def monitor(current_metrics):
score = model.decision_function([current_metrics])[0]
if score > alert_threshold:
send_alert(score, current_metrics)
3. 제조 품질 관리¶
# 센서 데이터 기반 불량 탐지
sensor_features = ['temperature', 'pressure', 'vibration', 'speed']
# VAE로 정상 패턴 학습
vae = VAE(input_dim=len(sensor_features))
train_vae(vae, normal_samples)
# 생산 중 모니터링
def quality_check(sensor_reading):
recon_error = anomaly_score(vae, sensor_reading)
if recon_error > threshold:
return {'status': 'DEFECT_RISK', 'score': recon_error}
return {'status': 'NORMAL', 'score': recon_error}
기법 선택 가이드¶
| 상황 | 추천 기법 |
|---|---|
| 고차원 tabular | Isolation Forest |
| 지역 밀도 중요 | LOF |
| 시계열 (단변량) | 통계적 방법, Prophet |
| 시계열 (다변량) | LSTM Autoencoder |
| 이미지/복잡 패턴 | VAE, Autoencoder |
| 대용량 실시간 | Isolation Forest, COPOD |
| 레이블 일부 있음 | Semi-supervised (PU Learning) |
임계값 설정¶
분위수 기반¶
F1 최적화 (레이블 있을 때)¶
from sklearn.metrics import f1_score
best_threshold = None
best_f1 = 0
for threshold in np.linspace(min_score, max_score, 100):
preds = (scores > threshold).astype(int)
f1 = f1_score(labels, preds)
if f1 > best_f1:
best_f1 = f1
best_threshold = threshold
비즈니스 비용 기반¶
# FP 비용: $10 (정상인데 조사)
# FN 비용: $1000 (사기 놓침)
def total_cost(threshold, scores, labels):
preds = scores > threshold
fp = ((preds == 1) & (labels == 0)).sum()
fn = ((preds == 0) & (labels == 1)).sum()
return fp * 10 + fn * 1000
# 비용 최소화 임계값 찾기
optimal_threshold = minimize(total_cost, ...)
참고 라이브러리¶
| 라이브러리 | 특징 |
|---|---|
| scikit-learn | Isolation Forest, LOF, One-Class SVM |
| PyOD | 40+ 이상탐지 알고리즘 |
| Alibi Detect | 드리프트 탐지 포함 |
| PyCaret | AutoML 이상탐지 |
| ADTK | 시계열 이상탐지 특화 |
참고 자료¶
- "Isolation Forest" (Liu et al., 2008)
- "LOF: Identifying Density-Based Local Outliers" (Breunig et al., 2000)
- "Deep Learning for Anomaly Detection: A Survey" (2021)
- PyOD Documentation
최종 업데이트: 2026-02-18