Feature Store¶
Feature Store는 ML 특성(Feature)의 중앙 저장소로, 특성의 생성, 저장, 검색, 서빙을 관리함. 학습과 추론 간의 일관성을 보장하고 특성 재사용을 촉진함.
1. Feature Store가 필요한 이유¶
1.1 기존 문제점¶
| 문제 | 증상 | 결과 |
|---|---|---|
| 특성 중복 개발 | 팀마다 동일 특성 재구현 | 시간 낭비, 불일치 |
| Training-Serving Skew | 학습/추론 시 다른 로직 | 성능 저하 |
| 특성 발견 어려움 | "이 특성 어디서 만들어?" | 생산성 저하 |
| 데이터 누수 | 미래 정보 사용 | 과적합 |
| 점 시간 재현 불가 | 과거 시점 데이터 재현 불가 | 디버깅 어려움 |
1.2 Feature Store 해결책¶
Feature Store는 학습(Training)과 추론(Inference) 간의 일관성을 보장하는 중앙 특성 저장소다.
- 동일한 특성 정의: 한 번 정의하면 학습/추론에서 동일하게 사용
- 중앙 관리: 특성 발견, 재사용, 버전 관리
- Training-Serving Skew 방지: 학습과 서빙에서 같은 로직 보장
2. Feature Store 구성 요소¶
2.1 핵심 개념¶
| 개념 | 설명 | 예시 |
|---|---|---|
| Entity | 특성의 주체 | user_id, product_id, transaction_id |
| Feature | 개별 속성 값 | user_age, product_price, click_count |
| Feature View | 특성 그룹 | user_features, product_features |
| Feature Service | 서빙용 특성 조합 | recommendation_features |
| Offline Store | 배치 학습용 (분석/학습) | BigQuery, S3, Parquet |
| Online Store | 실시간 추론용 (저지연 <10ms) | Redis, DynamoDB |
2.2 아키텍처¶
| ^
v |
+------------------+ +------------------+
| Transformation | -----------------> | Feature Store |
| (Spark, Flink) | +------------------+
+------------------+ | - Registry |
| - Offline Store |
| - Online Store |
+------------------+
---
## 3. Feast (Feature Store)
### 3.1 설치 및 초기화
```bash
pip install feast
# 프로젝트 초기화
feast init my_feature_repo
cd my_feature_repo
# 구조
my_feature_repo/
├── feature_repo/
│ ├── __init__.py
│ ├── entities.py
│ ├── features.py
│ └── feature_services.py
├── feature_store.yaml
└── data/
└── driver_stats.parquet
3.2 Feature Store 설정¶
# feature_store.yaml
project: my_ml_project
registry: data/registry.db
provider: local
online_store:
type: redis
connection_string: "localhost:6379"
offline_store:
type: file # 또는 bigquery, snowflake, redshift
entity_key_serialization_version: 2
3.3 Entity 정의¶
# entities.py
from feast import Entity
# 사용자 엔티티
user = Entity(
name="user",
join_keys=["user_id"],
description="User entity for customer features",
)
# 상품 엔티티
product = Entity(
name="product",
join_keys=["product_id"],
description="Product entity",
)
# 거래 엔티티
transaction = Entity(
name="transaction",
join_keys=["transaction_id"],
description="Transaction entity",
)
3.4 Feature View 정의¶
# features.py
from datetime import timedelta
from feast import FeatureView, Field, FileSource, PushSource
from feast.types import Float32, Int64, String
# 데이터 소스 정의
user_stats_source = FileSource(
name="user_stats_source",
path="data/user_stats.parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created_timestamp",
)
# Feature View 정의
user_stats_fv = FeatureView(
name="user_stats",
entities=[user],
ttl=timedelta(days=365),
schema=[
Field(name="total_purchases", dtype=Int64),
Field(name="avg_purchase_amount", dtype=Float32),
Field(name="purchase_count_30d", dtype=Int64),
Field(name="last_purchase_category", dtype=String),
],
source=user_stats_source,
online=True, # Online Store에 저장
tags={"team": "ml", "version": "v1"},
)
# 실시간 특성 (Push Source)
user_realtime_source = PushSource(
name="user_realtime_source",
batch_source=user_stats_source,
)
user_realtime_fv = FeatureView(
name="user_realtime",
entities=[user],
ttl=timedelta(hours=24),
schema=[
Field(name="session_duration_sec", dtype=Float32),
Field(name="page_views", dtype=Int64),
Field(name="cart_items", dtype=Int64),
],
source=user_realtime_source,
online=True,
)
3.5 Feature Service 정의¶
# feature_services.py
from feast import FeatureService
# 추천 모델용 특성 서비스
recommendation_fs = FeatureService(
name="recommendation_features",
features=[
user_stats_fv[["total_purchases", "avg_purchase_amount"]],
user_realtime_fv[["session_duration_sec", "page_views"]],
],
tags={"model": "recommendation"},
)
# 이탈 예측 모델용 특성 서비스
churn_prediction_fs = FeatureService(
name="churn_prediction_features",
features=[
user_stats_fv, # 모든 특성
],
tags={"model": "churn"},
)
3.6 적용 및 사용¶
# Feature Store 적용 (Registry 업데이트, Online Store 로드)
feast apply
# 특성 목록 조회
feast feature-views list
feast feature-services list
# Materialization (Offline → Online Store)
feast materialize-incremental $(date +%Y-%m-%dT%H:%M:%S)
# 또는 특정 기간
feast materialize 2024-01-01T00:00:00 2024-02-01T00:00:00
3.7 학습용 특성 조회 (Offline)¶
from feast import FeatureStore
import pandas as pd
store = FeatureStore(repo_path=".")
# 학습 데이터 (Entity DataFrame)
entity_df = pd.DataFrame({
"user_id": [1001, 1002, 1003, 1004, 1005],
"event_timestamp": pd.to_datetime([
"2024-01-15", "2024-01-16", "2024-01-17",
"2024-01-18", "2024-01-19"
]),
})
# 특성 조회 (Point-in-Time Join)
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"user_stats:total_purchases",
"user_stats:avg_purchase_amount",
"user_stats:purchase_count_30d",
],
).to_df()
print(training_df)
# Output:
# user_id event_timestamp total_purchases avg_purchase_amount purchase_count_30d
# 0 1001 2024-01-15 25 150.50 5
# 1 1002 2024-01-16 12 85.30 3
# ...
3.8 추론용 특성 조회 (Online)¶
from feast import FeatureStore
store = FeatureStore(repo_path=".")
# 단일 엔티티
feature_vector = store.get_online_features(
features=[
"user_stats:total_purchases",
"user_stats:avg_purchase_amount",
],
entity_rows=[{"user_id": 1001}],
).to_dict()
print(feature_vector)
# {'user_id': [1001], 'total_purchases': [25], 'avg_purchase_amount': [150.5]}
# 배치 조회
batch_features = store.get_online_features(
features=[
"user_stats:total_purchases",
"user_stats:avg_purchase_amount",
],
entity_rows=[
{"user_id": 1001},
{"user_id": 1002},
{"user_id": 1003},
],
).to_dict()
3.9 실시간 특성 Push¶
from feast import FeatureStore
import pandas as pd
from datetime import datetime
store = FeatureStore(repo_path=".")
# 실시간 이벤트 데이터
realtime_df = pd.DataFrame({
"user_id": [1001],
"session_duration_sec": [245.5],
"page_views": [12],
"cart_items": [3],
"event_timestamp": [datetime.now()],
"created_timestamp": [datetime.now()],
})
# Online Store에 Push
store.push("user_realtime_source", realtime_df)
4. 특성 엔지니어링 패턴¶
4.1 집계 특성¶
# Spark를 이용한 집계 특성 생성
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.getOrCreate()
transactions = spark.read.parquet("data/transactions.parquet")
# 윈도우 정의
user_window_30d = Window.partitionBy("user_id").orderBy("event_timestamp").rangeBetween(-30*24*60*60, 0)
user_window_7d = Window.partitionBy("user_id").orderBy("event_timestamp").rangeBetween(-7*24*60*60, 0)
# 집계 특성 생성
user_features = transactions.withColumn(
"purchase_count_30d", F.count("*").over(user_window_30d)
).withColumn(
"purchase_sum_30d", F.sum("amount").over(user_window_30d)
).withColumn(
"purchase_avg_7d", F.avg("amount").over(user_window_7d)
).withColumn(
"last_purchase_days_ago", F.datediff(
F.current_date(),
F.max("event_timestamp").over(user_window_30d)
)
)
4.2 시계열 특성¶
# 시간 기반 특성
user_features = user_features.withColumn(
"hour_of_day", F.hour("event_timestamp")
).withColumn(
"day_of_week", F.dayofweek("event_timestamp")
).withColumn(
"is_weekend", F.when(F.dayofweek("event_timestamp").isin([1, 7]), 1).otherwise(0)
).withColumn(
"month", F.month("event_timestamp")
)
4.3 교차 특성¶
# 사용자-상품 상호작용 특성
user_product_features = transactions.groupBy("user_id", "product_category").agg(
F.count("*").alias("category_purchase_count"),
F.avg("amount").alias("category_avg_amount"),
)
# 사용자 선호 카테고리
user_preferred_category = user_product_features.withColumn(
"rank", F.row_number().over(
Window.partitionBy("user_id").orderBy(F.desc("category_purchase_count"))
)
).filter(F.col("rank") == 1).select("user_id", "product_category")
5. Point-in-Time Join¶
5.1 개념¶
Point-in-Time Join: 미래 정보 누출 방지를 위한 시간 기반 조인
예시:
- 학습 데이터: 2024-01-15 시점의 예측
- 특성: 2024-01-15 이전 데이터만 사용해야 함
잘못된 예:
Entity: user_1, 2024-01-15
Feature: user_1, 2024-01-20의 특성 사용 # 미래 정보!
올바른 예:
Entity: user_1, 2024-01-15
Feature: user_1, 2024-01-14의 특성 사용 # 과거 정보
5.2 구현¶
# Feast는 자동으로 Point-in-Time Join 수행
training_df = store.get_historical_features(
entity_df=entity_df, # event_timestamp 포함
features=[...],
).to_df()
# 수동 구현 (Spark)
def point_in_time_join(entities_df, features_df, entity_key, time_col):
"""
entities_df: 엔티티와 타임스탬프
features_df: 특성과 타임스탬프
"""
# 조인
joined = entities_df.join(
features_df,
on=[entity_key],
how="left"
)
# 미래 데이터 필터링
filtered = joined.filter(
F.col(f"features_{time_col}") <= F.col(f"entities_{time_col}")
)
# 가장 최근 특성 선택
window = Window.partitionBy(entity_key, f"entities_{time_col}").orderBy(
F.desc(f"features_{time_col}")
)
result = filtered.withColumn("rank", F.row_number().over(window)).filter(
F.col("rank") == 1
).drop("rank")
return result
6. 프로덕션 배포¶
6.1 AWS 배포 예시¶
# feature_store.yaml (AWS)
project: production_ml
registry: s3://my-bucket/feast/registry.db
provider: aws
online_store:
type: dynamodb
region: ap-northeast-2
table_name: feast_online_store
offline_store:
type: redshift
cluster_id: my-redshift-cluster
database: ml_features
user: feast_user
s3_staging_location: s3://my-bucket/feast/staging/
iam_role: arn:aws:iam::123456789:role/feast-redshift-role
6.2 특성 서빙 API¶
from fastapi import FastAPI
from feast import FeatureStore
app = FastAPI()
store = FeatureStore(repo_path="/app/feature_repo")
@app.get("/features/{user_id}")
async def get_features(user_id: int):
features = store.get_online_features(
features=[
"user_stats:total_purchases",
"user_stats:avg_purchase_amount",
"user_realtime:session_duration_sec",
],
entity_rows=[{"user_id": user_id}],
).to_dict()
return features
@app.post("/features/batch")
async def get_batch_features(user_ids: list[int]):
entity_rows = [{"user_id": uid} for uid in user_ids]
features = store.get_online_features(
features=[...],
entity_rows=entity_rows,
).to_dict()
return features
6.3 Materialization 스케줄링 (Airflow)¶
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
with DAG(
dag_id='feast_materialization',
schedule_interval='@hourly',
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
materialize = BashOperator(
task_id='materialize_features',
bash_command='''
cd /app/feature_repo && \
feast materialize-incremental $(date -u +%Y-%m-%dT%H:%M:%S)
''',
)
7. 모범 사례¶
7.1 특성 명명 규칙¶
형식: {entity}_{timeframe}_{aggregation}_{attribute}
예시:
- user_30d_sum_purchase_amount
- user_7d_avg_session_duration
- product_lifetime_count_views
- user_product_30d_count_interactions
7.2 특성 문서화¶
user_stats_fv = FeatureView(
name="user_stats",
description="사용자 구매 통계 특성. 일별 배치로 업데이트.",
entities=[user],
schema=[
Field(
name="total_purchases",
dtype=Int64,
description="전체 구매 횟수",
),
Field(
name="avg_purchase_amount",
dtype=Float32,
description="평균 구매 금액 (원)",
),
],
tags={
"team": "ml-platform",
"owner": "data-team@company.com",
"update_frequency": "daily",
"data_source": "transactions_db",
},
)
7.3 모니터링¶
# 특성 통계 모니터링
from evidently.metrics import DataDriftTable
from evidently.report import Report
def monitor_feature_drift(reference_df, current_df):
report = Report(metrics=[DataDriftTable()])
report.run(reference_data=reference_df, current_data=current_df)
# 드리프트 감지 시 알림
drift_results = report.as_dict()
for feature, stats in drift_results["metrics"][0]["result"]["drift_by_columns"].items():
if stats["drift_detected"]:
send_alert(f"Feature drift detected: {feature}")
8. 실무 케이스 스터디¶
8.1 케이스 1: 추천 시스템 특성 파이프라인¶
상황: 이커머스 추천 시스템에서 실시간 사용자 행동과 배치 집계 특성 통합 필요
# feature_repo/features/recommendation_features.py
from feast import Entity, FeatureView, Field, PushSource, FileSource
from feast.types import Float32, Int64, String
from datetime import timedelta
# 엔티티 정의
user = Entity(name="user", join_keys=["user_id"])
product = Entity(name="product", join_keys=["product_id"])
# 배치 특성: 일별 집계
user_daily_source = FileSource(
name="user_daily_stats",
path="s3://feature-store/user_daily_stats/",
timestamp_field="event_date",
file_format="parquet",
)
user_daily_fv = FeatureView(
name="user_daily_stats",
entities=[user],
ttl=timedelta(days=30),
schema=[
Field(name="purchase_count_7d", dtype=Int64),
Field(name="purchase_count_30d", dtype=Int64),
Field(name="avg_order_value_30d", dtype=Float32),
Field(name="favorite_category", dtype=String),
Field(name="churn_risk_score", dtype=Float32),
],
source=user_daily_source,
online=True,
tags={"team": "recommendation", "update_frequency": "daily"},
)
# 실시간 특성: 세션 기반
user_realtime_source = PushSource(
name="user_realtime_source",
batch_source=user_daily_source,
)
user_session_fv = FeatureView(
name="user_session_features",
entities=[user],
ttl=timedelta(hours=24),
schema=[
Field(name="current_session_duration", dtype=Float32),
Field(name="pages_viewed_session", dtype=Int64),
Field(name="items_in_cart", dtype=Int64),
Field(name="last_viewed_category", dtype=String),
],
source=user_realtime_source,
online=True,
)
# 상품-사용자 교차 특성
user_product_source = FileSource(
name="user_product_interactions",
path="s3://feature-store/user_product_interactions/",
timestamp_field="interaction_date",
)
user_product_fv = FeatureView(
name="user_product_affinity",
entities=[user, product],
ttl=timedelta(days=90),
schema=[
Field(name="view_count", dtype=Int64),
Field(name="purchase_count", dtype=Int64),
Field(name="affinity_score", dtype=Float32),
],
source=user_product_source,
online=True,
)
# 실시간 특성 업데이트 (Kafka Consumer)
from feast import FeatureStore
import pandas as pd
from kafka import KafkaConsumer
import json
store = FeatureStore(repo_path="/app/feature_repo")
consumer = KafkaConsumer(
"user-events",
bootstrap_servers=["kafka:9092"],
value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)
def process_user_event(event):
"""사용자 이벤트 처리 및 특성 업데이트"""
user_id = event["user_id"]
# 세션 상태 조회 (Redis 등에서)
session_state = get_session_state(user_id)
# 특성 계산
features_df = pd.DataFrame([{
"user_id": user_id,
"current_session_duration": session_state["duration"],
"pages_viewed_session": session_state["page_views"],
"items_in_cart": session_state["cart_items"],
"last_viewed_category": event.get("category", "unknown"),
"event_timestamp": pd.Timestamp.now(),
"created_timestamp": pd.Timestamp.now(),
}])
# Feature Store에 Push
store.push("user_realtime_source", features_df)
for message in consumer:
process_user_event(message.value)
8.2 케이스 2: 사기 탐지 특성 엔지니어링¶
# Spark를 이용한 복잡한 특성 계산
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.getOrCreate()
def compute_fraud_features(transactions_df):
"""사기 탐지용 특성 계산"""
# 시간 윈도우 정의
user_window_1h = Window.partitionBy("user_id").orderBy(
F.col("timestamp").cast("long")
).rangeBetween(-3600, 0)
user_window_24h = Window.partitionBy("user_id").orderBy(
F.col("timestamp").cast("long")
).rangeBetween(-86400, 0)
# 속도 기반 특성 (Velocity Features)
velocity_features = transactions_df.withColumn(
"tx_count_1h", F.count("*").over(user_window_1h)
).withColumn(
"tx_count_24h", F.count("*").over(user_window_24h)
).withColumn(
"tx_amount_1h", F.sum("amount").over(user_window_1h)
).withColumn(
"tx_amount_24h", F.sum("amount").over(user_window_24h)
).withColumn(
"unique_merchants_1h", F.countDistinct("merchant_id").over(user_window_1h)
).withColumn(
"unique_countries_24h", F.countDistinct("country").over(user_window_24h)
)
# 이상치 특성
user_stats = transactions_df.groupBy("user_id").agg(
F.avg("amount").alias("user_avg_amount"),
F.stddev("amount").alias("user_std_amount"),
)
anomaly_features = velocity_features.join(
user_stats, on="user_id", how="left"
).withColumn(
"amount_zscore",
(F.col("amount") - F.col("user_avg_amount")) / F.col("user_std_amount")
).withColumn(
"is_amount_outlier",
F.when(F.abs(F.col("amount_zscore")) > 3, 1).otherwise(0)
)
# 시간 기반 특성
time_features = anomaly_features.withColumn(
"hour_of_day", F.hour("timestamp")
).withColumn(
"is_night", F.when(
(F.hour("timestamp") >= 0) & (F.hour("timestamp") < 6), 1
).otherwise(0)
).withColumn(
"is_weekend", F.when(
F.dayofweek("timestamp").isin([1, 7]), 1
).otherwise(0)
)
return time_features
# 배치 실행 및 Feature Store 저장
features = compute_fraud_features(transactions)
features.write.parquet("s3://feature-store/fraud_features/")
8.3 케이스 3: Feature Store 마이그레이션¶
상황: 수동 특성 파이프라인에서 Feast로 마이그레이션
# 마이그레이션 체크리스트 및 검증
import pandas as pd
from feast import FeatureStore
import numpy as np
def validate_migration(legacy_features_df, feast_store, entity_df):
"""기존 특성과 Feast 특성 비교 검증"""
# Feast에서 특성 조회
feast_features = feast_store.get_historical_features(
entity_df=entity_df,
features=[
"user_stats:purchase_count_30d",
"user_stats:avg_order_value",
],
).to_df()
# 데이터 정합성 검증
merged = legacy_features_df.merge(
feast_features,
on=["user_id", "event_timestamp"],
suffixes=("_legacy", "_feast"),
)
validation_results = {}
for feature in ["purchase_count_30d", "avg_order_value"]:
legacy_col = f"{feature}_legacy"
feast_col = f"{feature}_feast"
# 정확히 일치하는 비율
exact_match = (merged[legacy_col] == merged[feast_col]).mean()
# 근사 일치 (수치형의 경우)
if merged[legacy_col].dtype in [np.float64, np.float32]:
approx_match = np.isclose(
merged[legacy_col],
merged[feast_col],
rtol=1e-5
).mean()
else:
approx_match = exact_match
validation_results[feature] = {
"exact_match_rate": exact_match,
"approx_match_rate": approx_match,
"sample_mismatches": merged[
merged[legacy_col] != merged[feast_col]
][[legacy_col, feast_col]].head(5).to_dict(),
}
return validation_results
# 점진적 마이그레이션 전략
"""
Phase 1: Shadow Mode
- Feast 병렬 운영, 기존 시스템 유지
- 두 시스템 결과 비교
Phase 2: Canary
- 일부 트래픽만 Feast 사용
- 모니터링 강화
Phase 3: Full Migration
- 전체 트래픽 Feast로 전환
- 기존 시스템 유지 (롤백용)
Phase 4: Decommission
- 기존 시스템 제거
"""
9. 트러블슈팅 가이드¶
9.1 일반적인 문제¶
| 문제 | 원인 | 해결책 |
|---|---|---|
| Online 조회 지연 | Redis 연결 문제 | 커넥션 풀 확인, 타임아웃 조정 |
| 특성 불일치 | Point-in-Time 조인 오류 | 타임스탬프 형식 검증 |
| Materialization 실패 | 데이터 스키마 변경 | Feature View 스키마 업데이트 |
| 메모리 부족 | 대용량 특성 로드 | 배치 크기 조정, 특성 선택 |
| 특성 누락 | TTL 만료 | TTL 연장, 재 Materialization |
9.2 디버깅 스크립트¶
# scripts/debug_feature_store.py
from feast import FeatureStore
from datetime import datetime, timedelta
import pandas as pd
def diagnose_feature_store():
"""Feature Store 진단"""
store = FeatureStore(repo_path=".")
diagnostics = {
"timestamp": datetime.now().isoformat(),
"registry": {},
"online_store": {},
"offline_store": {},
}
# Registry 상태
try:
fvs = store.list_feature_views()
diagnostics["registry"]["feature_views"] = [fv.name for fv in fvs]
diagnostics["registry"]["status"] = "healthy"
except Exception as e:
diagnostics["registry"]["status"] = f"error: {str(e)}"
# Online Store 연결
try:
test_entity = pd.DataFrame({"user_id": [1]})
result = store.get_online_features(
features=["user_stats:purchase_count_30d"],
entity_rows=[{"user_id": 1}],
)
diagnostics["online_store"]["status"] = "healthy"
diagnostics["online_store"]["sample_result"] = result.to_dict()
except Exception as e:
diagnostics["online_store"]["status"] = f"error: {str(e)}"
# Offline Store 연결
try:
entity_df = pd.DataFrame({
"user_id": [1],
"event_timestamp": [datetime.now()],
})
result = store.get_historical_features(
entity_df=entity_df,
features=["user_stats:purchase_count_30d"],
).to_df()
diagnostics["offline_store"]["status"] = "healthy"
except Exception as e:
diagnostics["offline_store"]["status"] = f"error: {str(e)}"
return diagnostics
def check_feature_freshness(store, feature_view_name, max_age_hours=24):
"""특성 신선도 체크"""
fv = store.get_feature_view(feature_view_name)
# 최신 Materialization 시간 확인
# (실제 구현은 Feature Store 백엔드에 따라 다름)
issues = []
# TTL 체크
if fv.ttl and fv.ttl < timedelta(hours=max_age_hours):
issues.append(f"TTL ({fv.ttl}) is shorter than max_age ({max_age_hours}h)")
return {
"feature_view": feature_view_name,
"ttl": str(fv.ttl),
"online_enabled": fv.online,
"issues": issues,
}
9.3 성능 최적화¶
# 배치 조회 최적화
def optimized_batch_lookup(store, entity_ids, features, batch_size=1000):
"""대량 엔티티 조회 최적화"""
results = []
for i in range(0, len(entity_ids), batch_size):
batch = entity_ids[i:i+batch_size]
entity_rows = [{"user_id": uid} for uid in batch]
batch_result = store.get_online_features(
features=features,
entity_rows=entity_rows,
).to_dict()
results.append(pd.DataFrame(batch_result))
return pd.concat(results, ignore_index=True)
# 캐싱 레이어 추가
from functools import lru_cache
import hashlib
class CachedFeatureStore:
def __init__(self, store, cache_ttl=300):
self.store = store
self.cache_ttl = cache_ttl
self._cache = {}
def get_online_features(self, features, entity_rows):
# 캐시 키 생성
cache_key = hashlib.md5(
str(sorted(features) + sorted([str(e) for e in entity_rows])).encode()
).hexdigest()
# 캐시 확인
if cache_key in self._cache:
cached_time, cached_result = self._cache[cache_key]
if (datetime.now() - cached_time).seconds < self.cache_ttl:
return cached_result
# 새로 조회
result = self.store.get_online_features(features, entity_rows)
self._cache[cache_key] = (datetime.now(), result)
return result