구매 성향 예측 (Purchase Propensity Modeling)¶
개요¶
구매 성향 예측은 고객이 특정 행동(구매, 전환, 업그레이드)을 할 확률을 예측하는 분석 방법론이다. 마케팅 타겟팅, 개인화, 리소스 최적화에 핵심적으로 활용된다.
분석 프레임워크¶
모델링 파이프라인¶
┌─────────────────────────────────────────────────────────────────┐
│ 데이터 수집 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 행동 로그 │ │ 거래 이력 │ │ 고객 속성 │ │ 외부 데이터│ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 피처 엔지니어링 │
│ RFM, 세션 패턴, 이탈 신호, 제품 관심도, 시간 기반 피처 │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 모델 학습 │
│ Logistic Regression, XGBoost, LightGBM, Neural Network │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 스코어링 및 활용 │
│ 세그먼트 분류, 캠페인 타겟팅, 개인화 추천 │
└─────────────────────────────────────────────────────────────────┘
피처 엔지니어링¶
1. RFM 기반 피처¶
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
def create_rfm_features(
transactions: pd.DataFrame,
customer_id: str = 'customer_id',
date_col: str = 'order_date',
amount_col: str = 'amount',
reference_date: datetime = None
) -> pd.DataFrame:
"""RFM 피처 생성"""
if reference_date is None:
reference_date = transactions[date_col].max() + timedelta(days=1)
rfm = transactions.groupby(customer_id).agg({
date_col: lambda x: (reference_date - x.max()).days, # Recency
amount_col: ['count', 'sum', 'mean', 'std'] # Frequency, Monetary
})
rfm.columns = [
'recency_days',
'frequency_count',
'monetary_total',
'monetary_mean',
'monetary_std'
]
# 추가 파생 피처
rfm['monetary_std'] = rfm['monetary_std'].fillna(0)
rfm['avg_days_between_orders'] = rfm['recency_days'] / (rfm['frequency_count'] + 1)
rfm['monetary_cv'] = rfm['monetary_std'] / (rfm['monetary_mean'] + 1) # 변동계수
return rfm
2. 행동 기반 피처¶
def create_behavioral_features(
events: pd.DataFrame,
customer_id: str = 'customer_id',
event_type: str = 'event_type',
timestamp: str = 'timestamp',
lookback_days: int = 30
) -> pd.DataFrame:
"""행동 로그 기반 피처"""
cutoff = events[timestamp].max() - timedelta(days=lookback_days)
recent = events[events[timestamp] >= cutoff]
features = recent.groupby(customer_id).agg({
timestamp: ['count', 'nunique'], # 총 이벤트, 활성 일수
event_type: lambda x: x.value_counts().to_dict()
})
features.columns = ['total_events', 'active_days', 'event_breakdown']
# 이벤트 타입별 카운트 언팩
event_types = ['page_view', 'add_to_cart', 'search', 'product_view', 'wishlist']
for et in event_types:
features[f'count_{et}'] = features['event_breakdown'].apply(
lambda x: x.get(et, 0) if isinstance(x, dict) else 0
)
# 전환 퍼널 피처
features['cart_to_view_ratio'] = (
features['count_add_to_cart'] / (features['count_product_view'] + 1)
)
features['search_intensity'] = features['count_search'] / (features['active_days'] + 1)
return features.drop(columns=['event_breakdown'])
3. 시간 기반 피처¶
def create_temporal_features(
events: pd.DataFrame,
customer_id: str = 'customer_id',
timestamp: str = 'timestamp'
) -> pd.DataFrame:
"""시간 패턴 피처"""
events['hour'] = pd.to_datetime(events[timestamp]).dt.hour
events['dayofweek'] = pd.to_datetime(events[timestamp]).dt.dayofweek
events['is_weekend'] = events['dayofweek'].isin([5, 6]).astype(int)
features = events.groupby(customer_id).agg({
'hour': ['mean', 'std'],
'is_weekend': 'mean',
'dayofweek': lambda x: x.mode().iloc[0] if len(x) > 0 else -1
})
features.columns = [
'avg_hour',
'hour_std',
'weekend_ratio',
'preferred_day'
]
# 시간대 분류
features['is_evening_user'] = (features['avg_hour'] >= 18).astype(int)
features['is_morning_user'] = (features['avg_hour'] <= 10).astype(int)
return features
4. 제품/카테고리 피처¶
def create_product_features(
views: pd.DataFrame,
purchases: pd.DataFrame,
customer_id: str = 'customer_id',
category: str = 'category',
price: str = 'price'
) -> pd.DataFrame:
"""제품 관심도 피처"""
# 조회 기반
view_features = views.groupby(customer_id).agg({
category: 'nunique', # 관심 카테고리 다양성
price: ['mean', 'max', 'min'] # 가격대 관심
})
view_features.columns = [
'category_diversity',
'avg_viewed_price',
'max_viewed_price',
'min_viewed_price'
]
# 구매 기반
if len(purchases) > 0:
purchase_features = purchases.groupby(customer_id).agg({
category: lambda x: x.mode().iloc[0] if len(x) > 0 else 'unknown',
price: ['mean', 'sum']
})
purchase_features.columns = [
'preferred_category',
'avg_purchase_price',
'total_purchase_amount'
]
# 조인
features = view_features.join(purchase_features, how='left')
else:
features = view_features
# 가격 민감도 지표
features['price_range_interest'] = (
features['max_viewed_price'] - features['min_viewed_price']
)
return features
모델 학습¶
1. 데이터 준비¶
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE
def prepare_data(
features: pd.DataFrame,
target: pd.Series,
test_size: float = 0.2,
handle_imbalance: bool = True
) -> tuple:
"""학습 데이터 준비"""
# 결측치 처리
features = features.fillna(0)
# 분할
X_train, X_test, y_train, y_test = train_test_split(
features, target,
test_size=test_size,
stratify=target,
random_state=42
)
# 스케일링
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 클래스 불균형 처리
if handle_imbalance:
smote = SMOTE(random_state=42)
X_train_scaled, y_train = smote.fit_resample(X_train_scaled, y_train)
return X_train_scaled, X_test_scaled, y_train, y_test, scaler
2. 모델 비교¶
| 모델 | 장점 | 단점 | 적합 상황 |
|---|---|---|---|
| Logistic Regression | 해석 가능, 빠름 | 비선형 패턴 약함 | 베이스라인, 해석 필요 |
| XGBoost | 고성능, 피처 중요도 | 튜닝 필요 | 정확도 중시 |
| LightGBM | 빠름, 대용량 처리 | 과적합 위험 | 대규모 데이터 |
| CatBoost | 범주형 처리 우수 | 느림 | 많은 범주형 피처 |
| Neural Network | 복잡한 패턴 | 해석 어려움 | 대규모, 복잡 관계 |
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import GradientBoostingClassifier
import lightgbm as lgb
from sklearn.metrics import roc_auc_score, precision_recall_curve, auc
def train_propensity_models(
X_train, X_test, y_train, y_test
) -> dict:
"""여러 모델 학습 및 비교"""
models = {
'logistic': LogisticRegression(max_iter=1000, random_state=42),
'gbm': GradientBoostingClassifier(n_estimators=100, random_state=42),
'lightgbm': lgb.LGBMClassifier(n_estimators=100, random_state=42)
}
results = {}
for name, model in models.items():
model.fit(X_train, y_train)
# 예측
y_prob = model.predict_proba(X_test)[:, 1]
# 평가
roc_auc = roc_auc_score(y_test, y_prob)
precision, recall, _ = precision_recall_curve(y_test, y_prob)
pr_auc = auc(recall, precision)
results[name] = {
'model': model,
'roc_auc': round(roc_auc, 4),
'pr_auc': round(pr_auc, 4)
}
return results
3. 모델 캘리브레이션¶
구매 확률 예측의 정확한 확률 추정을 위해:
from sklearn.calibration import CalibratedClassifierCV, calibration_curve
import matplotlib.pyplot as plt
def calibrate_model(model, X_train, y_train, X_test, y_test):
"""모델 캘리브레이션"""
# Platt Scaling (sigmoid)
calibrated = CalibratedClassifierCV(model, method='sigmoid', cv=5)
calibrated.fit(X_train, y_train)
# 캘리브레이션 전후 비교
prob_uncal = model.predict_proba(X_test)[:, 1]
prob_cal = calibrated.predict_proba(X_test)[:, 1]
# Reliability diagram
fraction_pos_uncal, mean_pred_uncal = calibration_curve(y_test, prob_uncal, n_bins=10)
fraction_pos_cal, mean_pred_cal = calibration_curve(y_test, prob_cal, n_bins=10)
return {
'calibrated_model': calibrated,
'brier_before': np.mean((prob_uncal - y_test) ** 2),
'brier_after': np.mean((prob_cal - y_test) ** 2)
}
모델 평가¶
평가 지표¶
| 지표 | 설명 | 목표 |
|---|---|---|
| ROC-AUC | 순위 정확도 | > 0.75 |
| PR-AUC | 불균형 데이터 성능 | > 0.3 (불균형 시) |
| Brier Score | 확률 정확도 | < 0.1 |
| Lift@10% | 상위 10% 리프트 | > 3x |
| Precision@K | 상위 K개 정밀도 | 캠페인별 |
Lift Chart 분석¶
def calculate_lift(y_true, y_prob, n_bins: int = 10) -> pd.DataFrame:
"""Lift Chart 계산"""
df = pd.DataFrame({'y_true': y_true, 'y_prob': y_prob})
df['decile'] = pd.qcut(df['y_prob'], n_bins, labels=False, duplicates='drop')
df['decile'] = n_bins - df['decile'] # 높은 확률 = 1
lift_table = df.groupby('decile').agg({
'y_true': ['sum', 'count', 'mean']
})
lift_table.columns = ['conversions', 'customers', 'conversion_rate']
overall_rate = y_true.mean()
lift_table['lift'] = lift_table['conversion_rate'] / overall_rate
lift_table['cumulative_conversions'] = lift_table['conversions'].cumsum()
lift_table['cumulative_pct'] = (
lift_table['cumulative_conversions'] / lift_table['conversions'].sum()
)
return lift_table
Gain Chart¶
def plot_gain_chart(y_true, y_prob):
"""Gain Chart 시각화"""
lift_df = calculate_lift(y_true, y_prob)
# 누적 이득
cumulative_customers = np.arange(1, len(lift_df) + 1) / len(lift_df)
cumulative_gain = lift_df['cumulative_pct'].values
# Random 모델 비교
random_line = cumulative_customers
# 이득 면적 (Gini 유사)
gain_area = np.trapz(cumulative_gain, cumulative_customers)
random_area = 0.5
gini = (gain_area - random_area) / (1 - random_area)
return {
'cumulative_customers': cumulative_customers,
'cumulative_gain': cumulative_gain,
'gini_coefficient': round(gini, 4)
}
스코어링 및 활용¶
1. 배치 스코어링¶
def batch_scoring(
model,
scaler,
customer_features: pd.DataFrame,
threshold: float = 0.5
) -> pd.DataFrame:
"""고객 배치 스코어링"""
# 피처 준비
X = customer_features.fillna(0)
X_scaled = scaler.transform(X)
# 확률 예측
probabilities = model.predict_proba(X_scaled)[:, 1]
# 결과 데이터프레임
results = pd.DataFrame({
'customer_id': customer_features.index,
'propensity_score': probabilities,
'decile': pd.qcut(probabilities, 10, labels=range(10, 0, -1)),
'target_flag': (probabilities >= threshold).astype(int)
})
return results.sort_values('propensity_score', ascending=False)
2. 세그먼트 전략¶
| 세그먼트 | 스코어 범위 | 전략 |
|---|---|---|
| Hot | 0.7 - 1.0 | 즉시 전환 유도, 한정 오퍼 |
| Warm | 0.4 - 0.7 | 리타겟팅, 추가 인센티브 |
| Lukewarm | 0.2 - 0.4 | 인지도 향상, 콘텐츠 마케팅 |
| Cold | 0.0 - 0.2 | 비용 절감, 선별적 접근 |
3. 캠페인 최적화¶
def optimize_campaign_targeting(
scored_customers: pd.DataFrame,
budget: float,
cost_per_contact: float,
conversion_value: float
) -> dict:
"""캠페인 타겟팅 최적화"""
df = scored_customers.sort_values('propensity_score', ascending=False).copy()
# 누적 ROI 계산
df['expected_conversions'] = df['propensity_score'].cumsum()
df['cumulative_cost'] = (np.arange(1, len(df) + 1)) * cost_per_contact
df['cumulative_revenue'] = df['expected_conversions'] * conversion_value
df['cumulative_profit'] = df['cumulative_revenue'] - df['cumulative_cost']
df['roi'] = df['cumulative_profit'] / df['cumulative_cost']
# 최적 타겟 수 찾기
# 방법 1: 예산 제약
budget_optimal = df[df['cumulative_cost'] <= budget]['cumulative_profit'].idxmax()
# 방법 2: ROI 최대화
roi_optimal = df['roi'].idxmax()
# 방법 3: 이익 최대화
profit_optimal = df['cumulative_profit'].idxmax()
return {
'budget_constraint': {
'target_count': budget_optimal + 1,
'expected_profit': df.loc[budget_optimal, 'cumulative_profit']
},
'roi_maximization': {
'target_count': roi_optimal + 1,
'max_roi': df.loc[roi_optimal, 'roi']
},
'profit_maximization': {
'target_count': profit_optimal + 1,
'max_profit': df.loc[profit_optimal, 'cumulative_profit']
}
}
실시간 스코어링¶
API 서빙¶
from fastapi import FastAPI
from pydantic import BaseModel
import pickle
app = FastAPI()
# 모델 로드
with open('propensity_model.pkl', 'rb') as f:
model_artifacts = pickle.load(f)
model = model_artifacts['model']
scaler = model_artifacts['scaler']
feature_names = model_artifacts['feature_names']
class CustomerFeatures(BaseModel):
recency_days: float
frequency_count: int
monetary_total: float
avg_viewed_price: float
cart_to_view_ratio: float
# ... 기타 피처
@app.post("/predict")
def predict_propensity(features: CustomerFeatures):
"""실시간 구매 성향 예측"""
X = np.array([[
features.recency_days,
features.frequency_count,
features.monetary_total,
features.avg_viewed_price,
features.cart_to_view_ratio
]])
X_scaled = scaler.transform(X)
probability = model.predict_proba(X_scaled)[0, 1]
return {
"propensity_score": round(probability, 4),
"decile": int(probability * 10) + 1,
"recommendation": "high_intent" if probability > 0.7 else "nurture"
}
모델 모니터링¶
드리프트 감지¶
from scipy import stats
def detect_drift(
reference_scores: np.ndarray,
current_scores: np.ndarray,
threshold: float = 0.05
) -> dict:
"""스코어 분포 드리프트 감지"""
# KS Test
ks_stat, ks_pvalue = stats.ks_2samp(reference_scores, current_scores)
# PSI (Population Stability Index)
def calculate_psi(expected, actual, bins=10):
breakpoints = np.quantile(expected, np.linspace(0, 1, bins + 1))
expected_counts = np.histogram(expected, bins=breakpoints)[0] / len(expected)
actual_counts = np.histogram(actual, bins=breakpoints)[0] / len(actual)
# Avoid division by zero
expected_counts = np.clip(expected_counts, 0.001, None)
actual_counts = np.clip(actual_counts, 0.001, None)
psi = np.sum((actual_counts - expected_counts) *
np.log(actual_counts / expected_counts))
return psi
psi = calculate_psi(reference_scores, current_scores)
return {
'ks_statistic': round(ks_stat, 4),
'ks_pvalue': round(ks_pvalue, 4),
'psi': round(psi, 4),
'drift_detected': psi > 0.2 or ks_pvalue < threshold,
'severity': 'high' if psi > 0.25 else ('medium' if psi > 0.1 else 'low')
}
체크리스트¶
- [ ] 타겟 변수 정의 (구매, 전환, 업그레이드)
- [ ] 관측 기간 및 예측 기간 설정
- [ ] RFM, 행동, 시간, 제품 피처 생성
- [ ] 클래스 불균형 처리
- [ ] 모델 학습 및 캘리브레이션
- [ ] Lift/Gain 분석
- [ ] 타겟 세그먼트 전략 수립
- [ ] 실시간/배치 스코어링 파이프라인
- [ ] 드리프트 모니터링
참고 자료¶
마지막 업데이트: 2026-03-04