콘텐츠로 이동
Data Prep
상세

구매 성향 예측 (Purchase Propensity Modeling)

개요

구매 성향 예측은 고객이 특정 행동(구매, 전환, 업그레이드)을 할 확률을 예측하는 분석 방법론이다. 마케팅 타겟팅, 개인화, 리소스 최적화에 핵심적으로 활용된다.

분석 프레임워크

모델링 파이프라인

┌─────────────────────────────────────────────────────────────────┐
│                     데이터 수집                                  │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐         │
│  │ 행동 로그 │  │ 거래 이력 │  │ 고객 속성 │  │ 외부 데이터│        │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘         │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│                     피처 엔지니어링                              │
│  RFM, 세션 패턴, 이탈 신호, 제품 관심도, 시간 기반 피처           │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│                     모델 학습                                    │
│  Logistic Regression, XGBoost, LightGBM, Neural Network         │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│                     스코어링 및 활용                             │
│  세그먼트 분류, 캠페인 타겟팅, 개인화 추천                        │
└─────────────────────────────────────────────────────────────────┘

피처 엔지니어링

1. RFM 기반 피처

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

def create_rfm_features(
    transactions: pd.DataFrame,
    customer_id: str = 'customer_id',
    date_col: str = 'order_date',
    amount_col: str = 'amount',
    reference_date: datetime = None
) -> pd.DataFrame:
    """RFM 피처 생성"""

    if reference_date is None:
        reference_date = transactions[date_col].max() + timedelta(days=1)

    rfm = transactions.groupby(customer_id).agg({
        date_col: lambda x: (reference_date - x.max()).days,  # Recency
        amount_col: ['count', 'sum', 'mean', 'std']  # Frequency, Monetary
    })

    rfm.columns = [
        'recency_days',
        'frequency_count',
        'monetary_total',
        'monetary_mean',
        'monetary_std'
    ]

    # 추가 파생 피처
    rfm['monetary_std'] = rfm['monetary_std'].fillna(0)
    rfm['avg_days_between_orders'] = rfm['recency_days'] / (rfm['frequency_count'] + 1)
    rfm['monetary_cv'] = rfm['monetary_std'] / (rfm['monetary_mean'] + 1)  # 변동계수

    return rfm

2. 행동 기반 피처

def create_behavioral_features(
    events: pd.DataFrame,
    customer_id: str = 'customer_id',
    event_type: str = 'event_type',
    timestamp: str = 'timestamp',
    lookback_days: int = 30
) -> pd.DataFrame:
    """행동 로그 기반 피처"""

    cutoff = events[timestamp].max() - timedelta(days=lookback_days)
    recent = events[events[timestamp] >= cutoff]

    features = recent.groupby(customer_id).agg({
        timestamp: ['count', 'nunique'],  # 총 이벤트, 활성 일수
        event_type: lambda x: x.value_counts().to_dict()
    })

    features.columns = ['total_events', 'active_days', 'event_breakdown']

    # 이벤트 타입별 카운트 언팩
    event_types = ['page_view', 'add_to_cart', 'search', 'product_view', 'wishlist']
    for et in event_types:
        features[f'count_{et}'] = features['event_breakdown'].apply(
            lambda x: x.get(et, 0) if isinstance(x, dict) else 0
        )

    # 전환 퍼널 피처
    features['cart_to_view_ratio'] = (
        features['count_add_to_cart'] / (features['count_product_view'] + 1)
    )
    features['search_intensity'] = features['count_search'] / (features['active_days'] + 1)

    return features.drop(columns=['event_breakdown'])

3. 시간 기반 피처

def create_temporal_features(
    events: pd.DataFrame,
    customer_id: str = 'customer_id',
    timestamp: str = 'timestamp'
) -> pd.DataFrame:
    """시간 패턴 피처"""

    events['hour'] = pd.to_datetime(events[timestamp]).dt.hour
    events['dayofweek'] = pd.to_datetime(events[timestamp]).dt.dayofweek
    events['is_weekend'] = events['dayofweek'].isin([5, 6]).astype(int)

    features = events.groupby(customer_id).agg({
        'hour': ['mean', 'std'],
        'is_weekend': 'mean',
        'dayofweek': lambda x: x.mode().iloc[0] if len(x) > 0 else -1
    })

    features.columns = [
        'avg_hour',
        'hour_std',
        'weekend_ratio',
        'preferred_day'
    ]

    # 시간대 분류
    features['is_evening_user'] = (features['avg_hour'] >= 18).astype(int)
    features['is_morning_user'] = (features['avg_hour'] <= 10).astype(int)

    return features

4. 제품/카테고리 피처

def create_product_features(
    views: pd.DataFrame,
    purchases: pd.DataFrame,
    customer_id: str = 'customer_id',
    category: str = 'category',
    price: str = 'price'
) -> pd.DataFrame:
    """제품 관심도 피처"""

    # 조회 기반
    view_features = views.groupby(customer_id).agg({
        category: 'nunique',  # 관심 카테고리 다양성
        price: ['mean', 'max', 'min']  # 가격대 관심
    })
    view_features.columns = [
        'category_diversity',
        'avg_viewed_price',
        'max_viewed_price',
        'min_viewed_price'
    ]

    # 구매 기반
    if len(purchases) > 0:
        purchase_features = purchases.groupby(customer_id).agg({
            category: lambda x: x.mode().iloc[0] if len(x) > 0 else 'unknown',
            price: ['mean', 'sum']
        })
        purchase_features.columns = [
            'preferred_category',
            'avg_purchase_price',
            'total_purchase_amount'
        ]

        # 조인
        features = view_features.join(purchase_features, how='left')
    else:
        features = view_features

    # 가격 민감도 지표
    features['price_range_interest'] = (
        features['max_viewed_price'] - features['min_viewed_price']
    )

    return features

모델 학습

1. 데이터 준비

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE

def prepare_data(
    features: pd.DataFrame,
    target: pd.Series,
    test_size: float = 0.2,
    handle_imbalance: bool = True
) -> tuple:
    """학습 데이터 준비"""

    # 결측치 처리
    features = features.fillna(0)

    # 분할
    X_train, X_test, y_train, y_test = train_test_split(
        features, target, 
        test_size=test_size, 
        stratify=target,
        random_state=42
    )

    # 스케일링
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # 클래스 불균형 처리
    if handle_imbalance:
        smote = SMOTE(random_state=42)
        X_train_scaled, y_train = smote.fit_resample(X_train_scaled, y_train)

    return X_train_scaled, X_test_scaled, y_train, y_test, scaler

2. 모델 비교

모델 장점 단점 적합 상황
Logistic Regression 해석 가능, 빠름 비선형 패턴 약함 베이스라인, 해석 필요
XGBoost 고성능, 피처 중요도 튜닝 필요 정확도 중시
LightGBM 빠름, 대용량 처리 과적합 위험 대규모 데이터
CatBoost 범주형 처리 우수 느림 많은 범주형 피처
Neural Network 복잡한 패턴 해석 어려움 대규모, 복잡 관계
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import GradientBoostingClassifier
import lightgbm as lgb
from sklearn.metrics import roc_auc_score, precision_recall_curve, auc

def train_propensity_models(
    X_train, X_test, y_train, y_test
) -> dict:
    """여러 모델 학습 및 비교"""

    models = {
        'logistic': LogisticRegression(max_iter=1000, random_state=42),
        'gbm': GradientBoostingClassifier(n_estimators=100, random_state=42),
        'lightgbm': lgb.LGBMClassifier(n_estimators=100, random_state=42)
    }

    results = {}

    for name, model in models.items():
        model.fit(X_train, y_train)

        # 예측
        y_prob = model.predict_proba(X_test)[:, 1]

        # 평가
        roc_auc = roc_auc_score(y_test, y_prob)
        precision, recall, _ = precision_recall_curve(y_test, y_prob)
        pr_auc = auc(recall, precision)

        results[name] = {
            'model': model,
            'roc_auc': round(roc_auc, 4),
            'pr_auc': round(pr_auc, 4)
        }

    return results

3. 모델 캘리브레이션

구매 확률 예측의 정확한 확률 추정을 위해:

from sklearn.calibration import CalibratedClassifierCV, calibration_curve
import matplotlib.pyplot as plt

def calibrate_model(model, X_train, y_train, X_test, y_test):
    """모델 캘리브레이션"""

    # Platt Scaling (sigmoid)
    calibrated = CalibratedClassifierCV(model, method='sigmoid', cv=5)
    calibrated.fit(X_train, y_train)

    # 캘리브레이션 전후 비교
    prob_uncal = model.predict_proba(X_test)[:, 1]
    prob_cal = calibrated.predict_proba(X_test)[:, 1]

    # Reliability diagram
    fraction_pos_uncal, mean_pred_uncal = calibration_curve(y_test, prob_uncal, n_bins=10)
    fraction_pos_cal, mean_pred_cal = calibration_curve(y_test, prob_cal, n_bins=10)

    return {
        'calibrated_model': calibrated,
        'brier_before': np.mean((prob_uncal - y_test) ** 2),
        'brier_after': np.mean((prob_cal - y_test) ** 2)
    }

모델 평가

평가 지표

지표 설명 목표
ROC-AUC 순위 정확도 > 0.75
PR-AUC 불균형 데이터 성능 > 0.3 (불균형 시)
Brier Score 확률 정확도 < 0.1
Lift@10% 상위 10% 리프트 > 3x
Precision@K 상위 K개 정밀도 캠페인별

Lift Chart 분석

def calculate_lift(y_true, y_prob, n_bins: int = 10) -> pd.DataFrame:
    """Lift Chart 계산"""

    df = pd.DataFrame({'y_true': y_true, 'y_prob': y_prob})
    df['decile'] = pd.qcut(df['y_prob'], n_bins, labels=False, duplicates='drop')
    df['decile'] = n_bins - df['decile']  # 높은 확률 = 1

    lift_table = df.groupby('decile').agg({
        'y_true': ['sum', 'count', 'mean']
    })
    lift_table.columns = ['conversions', 'customers', 'conversion_rate']

    overall_rate = y_true.mean()
    lift_table['lift'] = lift_table['conversion_rate'] / overall_rate
    lift_table['cumulative_conversions'] = lift_table['conversions'].cumsum()
    lift_table['cumulative_pct'] = (
        lift_table['cumulative_conversions'] / lift_table['conversions'].sum()
    )

    return lift_table

Gain Chart

def plot_gain_chart(y_true, y_prob):
    """Gain Chart 시각화"""

    lift_df = calculate_lift(y_true, y_prob)

    # 누적 이득
    cumulative_customers = np.arange(1, len(lift_df) + 1) / len(lift_df)
    cumulative_gain = lift_df['cumulative_pct'].values

    # Random 모델 비교
    random_line = cumulative_customers

    # 이득 면적 (Gini 유사)
    gain_area = np.trapz(cumulative_gain, cumulative_customers)
    random_area = 0.5
    gini = (gain_area - random_area) / (1 - random_area)

    return {
        'cumulative_customers': cumulative_customers,
        'cumulative_gain': cumulative_gain,
        'gini_coefficient': round(gini, 4)
    }

스코어링 및 활용

1. 배치 스코어링

def batch_scoring(
    model,
    scaler,
    customer_features: pd.DataFrame,
    threshold: float = 0.5
) -> pd.DataFrame:
    """고객 배치 스코어링"""

    # 피처 준비
    X = customer_features.fillna(0)
    X_scaled = scaler.transform(X)

    # 확률 예측
    probabilities = model.predict_proba(X_scaled)[:, 1]

    # 결과 데이터프레임
    results = pd.DataFrame({
        'customer_id': customer_features.index,
        'propensity_score': probabilities,
        'decile': pd.qcut(probabilities, 10, labels=range(10, 0, -1)),
        'target_flag': (probabilities >= threshold).astype(int)
    })

    return results.sort_values('propensity_score', ascending=False)

2. 세그먼트 전략

세그먼트 스코어 범위 전략
Hot 0.7 - 1.0 즉시 전환 유도, 한정 오퍼
Warm 0.4 - 0.7 리타겟팅, 추가 인센티브
Lukewarm 0.2 - 0.4 인지도 향상, 콘텐츠 마케팅
Cold 0.0 - 0.2 비용 절감, 선별적 접근

3. 캠페인 최적화

def optimize_campaign_targeting(
    scored_customers: pd.DataFrame,
    budget: float,
    cost_per_contact: float,
    conversion_value: float
) -> dict:
    """캠페인 타겟팅 최적화"""

    df = scored_customers.sort_values('propensity_score', ascending=False).copy()

    # 누적 ROI 계산
    df['expected_conversions'] = df['propensity_score'].cumsum()
    df['cumulative_cost'] = (np.arange(1, len(df) + 1)) * cost_per_contact
    df['cumulative_revenue'] = df['expected_conversions'] * conversion_value
    df['cumulative_profit'] = df['cumulative_revenue'] - df['cumulative_cost']
    df['roi'] = df['cumulative_profit'] / df['cumulative_cost']

    # 최적 타겟 수 찾기
    # 방법 1: 예산 제약
    budget_optimal = df[df['cumulative_cost'] <= budget]['cumulative_profit'].idxmax()

    # 방법 2: ROI 최대화
    roi_optimal = df['roi'].idxmax()

    # 방법 3: 이익 최대화
    profit_optimal = df['cumulative_profit'].idxmax()

    return {
        'budget_constraint': {
            'target_count': budget_optimal + 1,
            'expected_profit': df.loc[budget_optimal, 'cumulative_profit']
        },
        'roi_maximization': {
            'target_count': roi_optimal + 1,
            'max_roi': df.loc[roi_optimal, 'roi']
        },
        'profit_maximization': {
            'target_count': profit_optimal + 1,
            'max_profit': df.loc[profit_optimal, 'cumulative_profit']
        }
    }

실시간 스코어링

API 서빙

from fastapi import FastAPI
from pydantic import BaseModel
import pickle

app = FastAPI()

# 모델 로드
with open('propensity_model.pkl', 'rb') as f:
    model_artifacts = pickle.load(f)
    model = model_artifacts['model']
    scaler = model_artifacts['scaler']
    feature_names = model_artifacts['feature_names']

class CustomerFeatures(BaseModel):
    recency_days: float
    frequency_count: int
    monetary_total: float
    avg_viewed_price: float
    cart_to_view_ratio: float
    # ... 기타 피처

@app.post("/predict")
def predict_propensity(features: CustomerFeatures):
    """실시간 구매 성향 예측"""

    X = np.array([[
        features.recency_days,
        features.frequency_count,
        features.monetary_total,
        features.avg_viewed_price,
        features.cart_to_view_ratio
    ]])

    X_scaled = scaler.transform(X)
    probability = model.predict_proba(X_scaled)[0, 1]

    return {
        "propensity_score": round(probability, 4),
        "decile": int(probability * 10) + 1,
        "recommendation": "high_intent" if probability > 0.7 else "nurture"
    }

모델 모니터링

드리프트 감지

from scipy import stats

def detect_drift(
    reference_scores: np.ndarray,
    current_scores: np.ndarray,
    threshold: float = 0.05
) -> dict:
    """스코어 분포 드리프트 감지"""

    # KS Test
    ks_stat, ks_pvalue = stats.ks_2samp(reference_scores, current_scores)

    # PSI (Population Stability Index)
    def calculate_psi(expected, actual, bins=10):
        breakpoints = np.quantile(expected, np.linspace(0, 1, bins + 1))
        expected_counts = np.histogram(expected, bins=breakpoints)[0] / len(expected)
        actual_counts = np.histogram(actual, bins=breakpoints)[0] / len(actual)

        # Avoid division by zero
        expected_counts = np.clip(expected_counts, 0.001, None)
        actual_counts = np.clip(actual_counts, 0.001, None)

        psi = np.sum((actual_counts - expected_counts) * 
                     np.log(actual_counts / expected_counts))
        return psi

    psi = calculate_psi(reference_scores, current_scores)

    return {
        'ks_statistic': round(ks_stat, 4),
        'ks_pvalue': round(ks_pvalue, 4),
        'psi': round(psi, 4),
        'drift_detected': psi > 0.2 or ks_pvalue < threshold,
        'severity': 'high' if psi > 0.25 else ('medium' if psi > 0.1 else 'low')
    }

체크리스트

  • [ ] 타겟 변수 정의 (구매, 전환, 업그레이드)
  • [ ] 관측 기간 및 예측 기간 설정
  • [ ] RFM, 행동, 시간, 제품 피처 생성
  • [ ] 클래스 불균형 처리
  • [ ] 모델 학습 및 캘리브레이션
  • [ ] Lift/Gain 분석
  • [ ] 타겟 세그먼트 전략 수립
  • [ ] 실시간/배치 스코어링 파이프라인
  • [ ] 드리프트 모니터링

참고 자료


마지막 업데이트: 2026-03-04