콘텐츠로 이동
Data Prep
상세

Feature Store

Feature Store는 ML 특성(Feature)의 중앙 저장소로, 특성의 생성, 저장, 검색, 서빙을 관리함. 학습과 추론 간의 일관성을 보장하고 특성 재사용을 촉진함.


1. Feature Store가 필요한 이유

1.1 기존 문제점

문제 증상 결과
특성 중복 개발 팀마다 동일 특성 재구현 시간 낭비, 불일치
Training-Serving Skew 학습/추론 시 다른 로직 성능 저하
특성 발견 어려움 "이 특성 어디서 만들어?" 생산성 저하
데이터 누수 미래 정보 사용 과적합
점 시간 재현 불가 과거 시점 데이터 재현 불가 디버깅 어려움

1.2 Feature Store 해결책

Feature Store는 학습(Training)추론(Inference) 간의 일관성을 보장하는 중앙 특성 저장소다.

  • 동일한 특성 정의: 한 번 정의하면 학습/추론에서 동일하게 사용
  • 중앙 관리: 특성 발견, 재사용, 버전 관리
  • Training-Serving Skew 방지: 학습과 서빙에서 같은 로직 보장

2. Feature Store 구성 요소

2.1 핵심 개념

개념 설명 예시
Entity 특성의 주체 user_id, product_id, transaction_id
Feature 개별 속성 값 user_age, product_price, click_count
Feature View 특성 그룹 user_features, product_features
Feature Service 서빙용 특성 조합 recommendation_features
Offline Store 배치 학습용 (분석/학습) BigQuery, S3, Parquet
Online Store 실시간 추론용 (저지연 <10ms) Redis, DynamoDB

2.2 아키텍처

Feature Store 아키텍처 | ^ v | +------------------+ +------------------+ | Transformation | -----------------> | Feature Store | | (Spark, Flink) | +------------------+ +------------------+ | - Registry | | - Offline Store | | - Online Store | +------------------+

---

## 3. Feast (Feature Store)

### 3.1 설치 및 초기화

```bash
pip install feast

# 프로젝트 초기화
feast init my_feature_repo
cd my_feature_repo

# 구조
my_feature_repo/
├── feature_repo/
│   ├── __init__.py
│   ├── entities.py
│   ├── features.py
│   └── feature_services.py
├── feature_store.yaml
└── data/
    └── driver_stats.parquet

3.2 Feature Store 설정

# feature_store.yaml
project: my_ml_project
registry: data/registry.db
provider: local

online_store:
  type: redis
  connection_string: "localhost:6379"

offline_store:
  type: file  # 또는 bigquery, snowflake, redshift

entity_key_serialization_version: 2

3.3 Entity 정의

# entities.py
from feast import Entity

# 사용자 엔티티
user = Entity(
    name="user",
    join_keys=["user_id"],
    description="User entity for customer features",
)

# 상품 엔티티
product = Entity(
    name="product",
    join_keys=["product_id"],
    description="Product entity",
)

# 거래 엔티티
transaction = Entity(
    name="transaction",
    join_keys=["transaction_id"],
    description="Transaction entity",
)

3.4 Feature View 정의

# features.py
from datetime import timedelta
from feast import FeatureView, Field, FileSource, PushSource
from feast.types import Float32, Int64, String

# 데이터 소스 정의
user_stats_source = FileSource(
    name="user_stats_source",
    path="data/user_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

# Feature View 정의
user_stats_fv = FeatureView(
    name="user_stats",
    entities=[user],
    ttl=timedelta(days=365),
    schema=[
        Field(name="total_purchases", dtype=Int64),
        Field(name="avg_purchase_amount", dtype=Float32),
        Field(name="purchase_count_30d", dtype=Int64),
        Field(name="last_purchase_category", dtype=String),
    ],
    source=user_stats_source,
    online=True,  # Online Store에 저장
    tags={"team": "ml", "version": "v1"},
)

# 실시간 특성 (Push Source)
user_realtime_source = PushSource(
    name="user_realtime_source",
    batch_source=user_stats_source,
)

user_realtime_fv = FeatureView(
    name="user_realtime",
    entities=[user],
    ttl=timedelta(hours=24),
    schema=[
        Field(name="session_duration_sec", dtype=Float32),
        Field(name="page_views", dtype=Int64),
        Field(name="cart_items", dtype=Int64),
    ],
    source=user_realtime_source,
    online=True,
)

3.5 Feature Service 정의

# feature_services.py
from feast import FeatureService

# 추천 모델용 특성 서비스
recommendation_fs = FeatureService(
    name="recommendation_features",
    features=[
        user_stats_fv[["total_purchases", "avg_purchase_amount"]],
        user_realtime_fv[["session_duration_sec", "page_views"]],
    ],
    tags={"model": "recommendation"},
)

# 이탈 예측 모델용 특성 서비스
churn_prediction_fs = FeatureService(
    name="churn_prediction_features",
    features=[
        user_stats_fv,  # 모든 특성
    ],
    tags={"model": "churn"},
)

3.6 적용 및 사용

# Feature Store 적용 (Registry 업데이트, Online Store 로드)
feast apply

# 특성 목록 조회
feast feature-views list
feast feature-services list

# Materialization (Offline → Online Store)
feast materialize-incremental $(date +%Y-%m-%dT%H:%M:%S)

# 또는 특정 기간
feast materialize 2024-01-01T00:00:00 2024-02-01T00:00:00

3.7 학습용 특성 조회 (Offline)

from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path=".")

# 학습 데이터 (Entity DataFrame)
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003, 1004, 1005],
    "event_timestamp": pd.to_datetime([
        "2024-01-15", "2024-01-16", "2024-01-17", 
        "2024-01-18", "2024-01-19"
    ]),
})

# 특성 조회 (Point-in-Time Join)
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_stats:total_purchases",
        "user_stats:avg_purchase_amount",
        "user_stats:purchase_count_30d",
    ],
).to_df()

print(training_df)
# Output:
#    user_id event_timestamp  total_purchases  avg_purchase_amount  purchase_count_30d
# 0     1001      2024-01-15              25              150.50                   5
# 1     1002      2024-01-16              12               85.30                   3
# ...

3.8 추론용 특성 조회 (Online)

from feast import FeatureStore

store = FeatureStore(repo_path=".")

# 단일 엔티티
feature_vector = store.get_online_features(
    features=[
        "user_stats:total_purchases",
        "user_stats:avg_purchase_amount",
    ],
    entity_rows=[{"user_id": 1001}],
).to_dict()

print(feature_vector)
# {'user_id': [1001], 'total_purchases': [25], 'avg_purchase_amount': [150.5]}

# 배치 조회
batch_features = store.get_online_features(
    features=[
        "user_stats:total_purchases",
        "user_stats:avg_purchase_amount",
    ],
    entity_rows=[
        {"user_id": 1001},
        {"user_id": 1002},
        {"user_id": 1003},
    ],
).to_dict()

3.9 실시간 특성 Push

from feast import FeatureStore
import pandas as pd
from datetime import datetime

store = FeatureStore(repo_path=".")

# 실시간 이벤트 데이터
realtime_df = pd.DataFrame({
    "user_id": [1001],
    "session_duration_sec": [245.5],
    "page_views": [12],
    "cart_items": [3],
    "event_timestamp": [datetime.now()],
    "created_timestamp": [datetime.now()],
})

# Online Store에 Push
store.push("user_realtime_source", realtime_df)

4. 특성 엔지니어링 패턴

4.1 집계 특성

# Spark를 이용한 집계 특성 생성
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

transactions = spark.read.parquet("data/transactions.parquet")

# 윈도우 정의
user_window_30d = Window.partitionBy("user_id").orderBy("event_timestamp").rangeBetween(-30*24*60*60, 0)
user_window_7d = Window.partitionBy("user_id").orderBy("event_timestamp").rangeBetween(-7*24*60*60, 0)

# 집계 특성 생성
user_features = transactions.withColumn(
    "purchase_count_30d", F.count("*").over(user_window_30d)
).withColumn(
    "purchase_sum_30d", F.sum("amount").over(user_window_30d)
).withColumn(
    "purchase_avg_7d", F.avg("amount").over(user_window_7d)
).withColumn(
    "last_purchase_days_ago", F.datediff(
        F.current_date(), 
        F.max("event_timestamp").over(user_window_30d)
    )
)

4.2 시계열 특성

# 시간 기반 특성
user_features = user_features.withColumn(
    "hour_of_day", F.hour("event_timestamp")
).withColumn(
    "day_of_week", F.dayofweek("event_timestamp")
).withColumn(
    "is_weekend", F.when(F.dayofweek("event_timestamp").isin([1, 7]), 1).otherwise(0)
).withColumn(
    "month", F.month("event_timestamp")
)

4.3 교차 특성

# 사용자-상품 상호작용 특성
user_product_features = transactions.groupBy("user_id", "product_category").agg(
    F.count("*").alias("category_purchase_count"),
    F.avg("amount").alias("category_avg_amount"),
)

# 사용자 선호 카테고리
user_preferred_category = user_product_features.withColumn(
    "rank", F.row_number().over(
        Window.partitionBy("user_id").orderBy(F.desc("category_purchase_count"))
    )
).filter(F.col("rank") == 1).select("user_id", "product_category")

5. Point-in-Time Join

5.1 개념

Point-in-Time Join: 미래 정보 누출 방지를 위한 시간 기반 조인

예시:
- 학습 데이터: 2024-01-15 시점의 예측
- 특성: 2024-01-15 이전 데이터만 사용해야 함

잘못된 예:
  Entity: user_1, 2024-01-15
  Feature: user_1, 2024-01-20의 특성 사용  # 미래 정보!

올바른 예:
  Entity: user_1, 2024-01-15
  Feature: user_1, 2024-01-14의 특성 사용  # 과거 정보

5.2 구현

# Feast는 자동으로 Point-in-Time Join 수행
training_df = store.get_historical_features(
    entity_df=entity_df,  # event_timestamp 포함
    features=[...],
).to_df()

# 수동 구현 (Spark)
def point_in_time_join(entities_df, features_df, entity_key, time_col):
    """
    entities_df: 엔티티와 타임스탬프
    features_df: 특성과 타임스탬프
    """
    # 조인
    joined = entities_df.join(
        features_df,
        on=[entity_key],
        how="left"
    )

    # 미래 데이터 필터링
    filtered = joined.filter(
        F.col(f"features_{time_col}") <= F.col(f"entities_{time_col}")
    )

    # 가장 최근 특성 선택
    window = Window.partitionBy(entity_key, f"entities_{time_col}").orderBy(
        F.desc(f"features_{time_col}")
    )

    result = filtered.withColumn("rank", F.row_number().over(window)).filter(
        F.col("rank") == 1
    ).drop("rank")

    return result

6. 프로덕션 배포

6.1 AWS 배포 예시

# feature_store.yaml (AWS)
project: production_ml
registry: s3://my-bucket/feast/registry.db
provider: aws

online_store:
  type: dynamodb
  region: ap-northeast-2
  table_name: feast_online_store

offline_store:
  type: redshift
  cluster_id: my-redshift-cluster
  database: ml_features
  user: feast_user
  s3_staging_location: s3://my-bucket/feast/staging/
  iam_role: arn:aws:iam::123456789:role/feast-redshift-role

6.2 특성 서빙 API

from fastapi import FastAPI
from feast import FeatureStore

app = FastAPI()
store = FeatureStore(repo_path="/app/feature_repo")

@app.get("/features/{user_id}")
async def get_features(user_id: int):
    features = store.get_online_features(
        features=[
            "user_stats:total_purchases",
            "user_stats:avg_purchase_amount",
            "user_realtime:session_duration_sec",
        ],
        entity_rows=[{"user_id": user_id}],
    ).to_dict()

    return features

@app.post("/features/batch")
async def get_batch_features(user_ids: list[int]):
    entity_rows = [{"user_id": uid} for uid in user_ids]

    features = store.get_online_features(
        features=[...],
        entity_rows=entity_rows,
    ).to_dict()

    return features

6.3 Materialization 스케줄링 (Airflow)

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

with DAG(
    dag_id='feast_materialization',
    schedule_interval='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    materialize = BashOperator(
        task_id='materialize_features',
        bash_command='''
            cd /app/feature_repo && \
            feast materialize-incremental $(date -u +%Y-%m-%dT%H:%M:%S)
        ''',
    )

7. 모범 사례

7.1 특성 명명 규칙

형식: {entity}_{timeframe}_{aggregation}_{attribute}

예시:
- user_30d_sum_purchase_amount
- user_7d_avg_session_duration
- product_lifetime_count_views
- user_product_30d_count_interactions

7.2 특성 문서화

user_stats_fv = FeatureView(
    name="user_stats",
    description="사용자 구매 통계 특성. 일별 배치로 업데이트.",
    entities=[user],
    schema=[
        Field(
            name="total_purchases",
            dtype=Int64,
            description="전체 구매 횟수",
        ),
        Field(
            name="avg_purchase_amount",
            dtype=Float32,
            description="평균 구매 금액 (원)",
        ),
    ],
    tags={
        "team": "ml-platform",
        "owner": "data-team@company.com",
        "update_frequency": "daily",
        "data_source": "transactions_db",
    },
)

7.3 모니터링

# 특성 통계 모니터링
from evidently.metrics import DataDriftTable
from evidently.report import Report

def monitor_feature_drift(reference_df, current_df):
    report = Report(metrics=[DataDriftTable()])
    report.run(reference_data=reference_df, current_data=current_df)

    # 드리프트 감지 시 알림
    drift_results = report.as_dict()
    for feature, stats in drift_results["metrics"][0]["result"]["drift_by_columns"].items():
        if stats["drift_detected"]:
            send_alert(f"Feature drift detected: {feature}")

8. 실무 케이스 스터디

8.1 케이스 1: 추천 시스템 특성 파이프라인

상황: 이커머스 추천 시스템에서 실시간 사용자 행동과 배치 집계 특성 통합 필요

# feature_repo/features/recommendation_features.py
from feast import Entity, FeatureView, Field, PushSource, FileSource
from feast.types import Float32, Int64, String
from datetime import timedelta

# 엔티티 정의
user = Entity(name="user", join_keys=["user_id"])
product = Entity(name="product", join_keys=["product_id"])

# 배치 특성: 일별 집계
user_daily_source = FileSource(
    name="user_daily_stats",
    path="s3://feature-store/user_daily_stats/",
    timestamp_field="event_date",
    file_format="parquet",
)

user_daily_fv = FeatureView(
    name="user_daily_stats",
    entities=[user],
    ttl=timedelta(days=30),
    schema=[
        Field(name="purchase_count_7d", dtype=Int64),
        Field(name="purchase_count_30d", dtype=Int64),
        Field(name="avg_order_value_30d", dtype=Float32),
        Field(name="favorite_category", dtype=String),
        Field(name="churn_risk_score", dtype=Float32),
    ],
    source=user_daily_source,
    online=True,
    tags={"team": "recommendation", "update_frequency": "daily"},
)

# 실시간 특성: 세션 기반
user_realtime_source = PushSource(
    name="user_realtime_source",
    batch_source=user_daily_source,
)

user_session_fv = FeatureView(
    name="user_session_features",
    entities=[user],
    ttl=timedelta(hours=24),
    schema=[
        Field(name="current_session_duration", dtype=Float32),
        Field(name="pages_viewed_session", dtype=Int64),
        Field(name="items_in_cart", dtype=Int64),
        Field(name="last_viewed_category", dtype=String),
    ],
    source=user_realtime_source,
    online=True,
)

# 상품-사용자 교차 특성
user_product_source = FileSource(
    name="user_product_interactions",
    path="s3://feature-store/user_product_interactions/",
    timestamp_field="interaction_date",
)

user_product_fv = FeatureView(
    name="user_product_affinity",
    entities=[user, product],
    ttl=timedelta(days=90),
    schema=[
        Field(name="view_count", dtype=Int64),
        Field(name="purchase_count", dtype=Int64),
        Field(name="affinity_score", dtype=Float32),
    ],
    source=user_product_source,
    online=True,
)
# 실시간 특성 업데이트 (Kafka Consumer)
from feast import FeatureStore
import pandas as pd
from kafka import KafkaConsumer
import json

store = FeatureStore(repo_path="/app/feature_repo")
consumer = KafkaConsumer(
    "user-events",
    bootstrap_servers=["kafka:9092"],
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

def process_user_event(event):
    """사용자 이벤트 처리 및 특성 업데이트"""
    user_id = event["user_id"]

    # 세션 상태 조회 (Redis 등에서)
    session_state = get_session_state(user_id)

    # 특성 계산
    features_df = pd.DataFrame([{
        "user_id": user_id,
        "current_session_duration": session_state["duration"],
        "pages_viewed_session": session_state["page_views"],
        "items_in_cart": session_state["cart_items"],
        "last_viewed_category": event.get("category", "unknown"),
        "event_timestamp": pd.Timestamp.now(),
        "created_timestamp": pd.Timestamp.now(),
    }])

    # Feature Store에 Push
    store.push("user_realtime_source", features_df)

for message in consumer:
    process_user_event(message.value)

8.2 케이스 2: 사기 탐지 특성 엔지니어링

# Spark를 이용한 복잡한 특성 계산
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

def compute_fraud_features(transactions_df):
    """사기 탐지용 특성 계산"""

    # 시간 윈도우 정의
    user_window_1h = Window.partitionBy("user_id").orderBy(
        F.col("timestamp").cast("long")
    ).rangeBetween(-3600, 0)

    user_window_24h = Window.partitionBy("user_id").orderBy(
        F.col("timestamp").cast("long")
    ).rangeBetween(-86400, 0)

    # 속도 기반 특성 (Velocity Features)
    velocity_features = transactions_df.withColumn(
        "tx_count_1h", F.count("*").over(user_window_1h)
    ).withColumn(
        "tx_count_24h", F.count("*").over(user_window_24h)
    ).withColumn(
        "tx_amount_1h", F.sum("amount").over(user_window_1h)
    ).withColumn(
        "tx_amount_24h", F.sum("amount").over(user_window_24h)
    ).withColumn(
        "unique_merchants_1h", F.countDistinct("merchant_id").over(user_window_1h)
    ).withColumn(
        "unique_countries_24h", F.countDistinct("country").over(user_window_24h)
    )

    # 이상치 특성
    user_stats = transactions_df.groupBy("user_id").agg(
        F.avg("amount").alias("user_avg_amount"),
        F.stddev("amount").alias("user_std_amount"),
    )

    anomaly_features = velocity_features.join(
        user_stats, on="user_id", how="left"
    ).withColumn(
        "amount_zscore", 
        (F.col("amount") - F.col("user_avg_amount")) / F.col("user_std_amount")
    ).withColumn(
        "is_amount_outlier",
        F.when(F.abs(F.col("amount_zscore")) > 3, 1).otherwise(0)
    )

    # 시간 기반 특성
    time_features = anomaly_features.withColumn(
        "hour_of_day", F.hour("timestamp")
    ).withColumn(
        "is_night", F.when(
            (F.hour("timestamp") >= 0) & (F.hour("timestamp") < 6), 1
        ).otherwise(0)
    ).withColumn(
        "is_weekend", F.when(
            F.dayofweek("timestamp").isin([1, 7]), 1
        ).otherwise(0)
    )

    return time_features

# 배치 실행 및 Feature Store 저장
features = compute_fraud_features(transactions)
features.write.parquet("s3://feature-store/fraud_features/")

8.3 케이스 3: Feature Store 마이그레이션

상황: 수동 특성 파이프라인에서 Feast로 마이그레이션

# 마이그레이션 체크리스트 및 검증
import pandas as pd
from feast import FeatureStore
import numpy as np

def validate_migration(legacy_features_df, feast_store, entity_df):
    """기존 특성과 Feast 특성 비교 검증"""

    # Feast에서 특성 조회
    feast_features = feast_store.get_historical_features(
        entity_df=entity_df,
        features=[
            "user_stats:purchase_count_30d",
            "user_stats:avg_order_value",
        ],
    ).to_df()

    # 데이터 정합성 검증
    merged = legacy_features_df.merge(
        feast_features,
        on=["user_id", "event_timestamp"],
        suffixes=("_legacy", "_feast"),
    )

    validation_results = {}

    for feature in ["purchase_count_30d", "avg_order_value"]:
        legacy_col = f"{feature}_legacy"
        feast_col = f"{feature}_feast"

        # 정확히 일치하는 비율
        exact_match = (merged[legacy_col] == merged[feast_col]).mean()

        # 근사 일치 (수치형의 경우)
        if merged[legacy_col].dtype in [np.float64, np.float32]:
            approx_match = np.isclose(
                merged[legacy_col], 
                merged[feast_col], 
                rtol=1e-5
            ).mean()
        else:
            approx_match = exact_match

        validation_results[feature] = {
            "exact_match_rate": exact_match,
            "approx_match_rate": approx_match,
            "sample_mismatches": merged[
                merged[legacy_col] != merged[feast_col]
            ][[legacy_col, feast_col]].head(5).to_dict(),
        }

    return validation_results

# 점진적 마이그레이션 전략
"""
Phase 1: Shadow Mode
- Feast 병렬 운영, 기존 시스템 유지
- 두 시스템 결과 비교

Phase 2: Canary
- 일부 트래픽만 Feast 사용
- 모니터링 강화

Phase 3: Full Migration
- 전체 트래픽 Feast로 전환
- 기존 시스템 유지 (롤백용)

Phase 4: Decommission
- 기존 시스템 제거
"""

9. 트러블슈팅 가이드

9.1 일반적인 문제

문제 원인 해결책
Online 조회 지연 Redis 연결 문제 커넥션 풀 확인, 타임아웃 조정
특성 불일치 Point-in-Time 조인 오류 타임스탬프 형식 검증
Materialization 실패 데이터 스키마 변경 Feature View 스키마 업데이트
메모리 부족 대용량 특성 로드 배치 크기 조정, 특성 선택
특성 누락 TTL 만료 TTL 연장, 재 Materialization

9.2 디버깅 스크립트

# scripts/debug_feature_store.py
from feast import FeatureStore
from datetime import datetime, timedelta
import pandas as pd

def diagnose_feature_store():
    """Feature Store 진단"""
    store = FeatureStore(repo_path=".")

    diagnostics = {
        "timestamp": datetime.now().isoformat(),
        "registry": {},
        "online_store": {},
        "offline_store": {},
    }

    # Registry 상태
    try:
        fvs = store.list_feature_views()
        diagnostics["registry"]["feature_views"] = [fv.name for fv in fvs]
        diagnostics["registry"]["status"] = "healthy"
    except Exception as e:
        diagnostics["registry"]["status"] = f"error: {str(e)}"

    # Online Store 연결
    try:
        test_entity = pd.DataFrame({"user_id": [1]})
        result = store.get_online_features(
            features=["user_stats:purchase_count_30d"],
            entity_rows=[{"user_id": 1}],
        )
        diagnostics["online_store"]["status"] = "healthy"
        diagnostics["online_store"]["sample_result"] = result.to_dict()
    except Exception as e:
        diagnostics["online_store"]["status"] = f"error: {str(e)}"

    # Offline Store 연결
    try:
        entity_df = pd.DataFrame({
            "user_id": [1],
            "event_timestamp": [datetime.now()],
        })
        result = store.get_historical_features(
            entity_df=entity_df,
            features=["user_stats:purchase_count_30d"],
        ).to_df()
        diagnostics["offline_store"]["status"] = "healthy"
    except Exception as e:
        diagnostics["offline_store"]["status"] = f"error: {str(e)}"

    return diagnostics

def check_feature_freshness(store, feature_view_name, max_age_hours=24):
    """특성 신선도 체크"""
    fv = store.get_feature_view(feature_view_name)

    # 최신 Materialization 시간 확인
    # (실제 구현은 Feature Store 백엔드에 따라 다름)

    issues = []

    # TTL 체크
    if fv.ttl and fv.ttl < timedelta(hours=max_age_hours):
        issues.append(f"TTL ({fv.ttl}) is shorter than max_age ({max_age_hours}h)")

    return {
        "feature_view": feature_view_name,
        "ttl": str(fv.ttl),
        "online_enabled": fv.online,
        "issues": issues,
    }

9.3 성능 최적화

# 배치 조회 최적화
def optimized_batch_lookup(store, entity_ids, features, batch_size=1000):
    """대량 엔티티 조회 최적화"""
    results = []

    for i in range(0, len(entity_ids), batch_size):
        batch = entity_ids[i:i+batch_size]
        entity_rows = [{"user_id": uid} for uid in batch]

        batch_result = store.get_online_features(
            features=features,
            entity_rows=entity_rows,
        ).to_dict()

        results.append(pd.DataFrame(batch_result))

    return pd.concat(results, ignore_index=True)

# 캐싱 레이어 추가
from functools import lru_cache
import hashlib

class CachedFeatureStore:
    def __init__(self, store, cache_ttl=300):
        self.store = store
        self.cache_ttl = cache_ttl
        self._cache = {}

    def get_online_features(self, features, entity_rows):
        # 캐시 키 생성
        cache_key = hashlib.md5(
            str(sorted(features) + sorted([str(e) for e in entity_rows])).encode()
        ).hexdigest()

        # 캐시 확인
        if cache_key in self._cache:
            cached_time, cached_result = self._cache[cache_key]
            if (datetime.now() - cached_time).seconds < self.cache_ttl:
                return cached_result

        # 새로 조회
        result = self.store.get_online_features(features, entity_rows)
        self._cache[cache_key] = (datetime.now(), result)

        return result

참고 자료