콘텐츠로 이동
Data Prep
상세

Agentic RAG 아키텍처

개요

Agentic RAG는 전통적인 RAG의 단일 검색-생성 패턴을 넘어, LLM 에이전트가 동적으로 검색 전략을 선택하고 다단계 워크플로우를 실행하는 아키텍처다.

핵심 개념

Traditional RAG vs Agentic RAG

구분 Traditional RAG Agentic RAG
워크플로우 단일 검색 → 생성 계획 → 다단계 검색 → 추론 → 생성
검색 전략 고정 동적 선택
자체 평가 없음 결과 검증 및 재검색
복잡도 낮음 높음
지연시간 낮음 중간~높음
복잡한 질문 제한적 강점

아키텍처 패턴

1. Router Agent Pattern

Router Agent Pattern

라우터 에이전트가 쿼리 특성에 따라 최적의 데이터 소스를 선택한다.

장점: - 데이터 소스 특성에 맞는 최적 검색 - 불필요한 검색 호출 감소 - 확장 가능한 구조

2. ReAct Pattern (Reason + Act)

agentic-rag diagram

단계 역할 예시
Thought 다음 행동 계획 "사용자가 2024년 매출을 물었으니 SQL 쿼리가 필요"
Action 도구 실행 query_database(SELECT SUM(revenue)...)
Observation 결과 확인 "총 매출: 150억원"

3. Plan-and-Execute Pattern

agentic-rag diagram

4. Self-Reflection Pattern

agentic-rag diagram


핵심 구성요소

1. Query Understanding Module

from dataclasses import dataclass
from typing import List, Literal
from enum import Enum

class QueryIntent(Enum):
    FACTUAL = "factual"           # 단순 사실 확인
    ANALYTICAL = "analytical"      # 분석/비교 필요
    PROCEDURAL = "procedural"      # 절차/방법 설명
    CONVERSATIONAL = "conversational"  # 대화/잡담

class QueryComplexity(Enum):
    SINGLE = "single"              # 단일 검색으로 해결
    MULTI_HOP = "multi_hop"        # 여러 단계 추론 필요
    AGGREGATION = "aggregation"    # 집계/종합 필요

@dataclass
class QueryAnalysis:
    intent: QueryIntent
    complexity: QueryComplexity
    domains: List[str]
    time_scope: str
    required_tools: List[str]
    confidence: float


class QueryAnalyzer:
    """
    쿼리 분석기

    사용자 질문을 분석하여 최적의 처리 전략 결정
    """

    def __init__(self, llm_client):
        self.llm = llm_client

    def analyze(self, query: str) -> QueryAnalysis:
        """
        쿼리 분석 수행

        Args:
            query: 사용자 질문

        Returns:
            QueryAnalysis 객체
        """
        # LLM에게 쿼리 분석 요청
        prompt = f"""
        Analyze this query and return JSON:
        Query: {query}

        Return:
        - intent: factual/analytical/procedural/conversational
        - complexity: single/multi_hop/aggregation
        - domains: list of relevant domains
        - time_scope: relevant time range
        - required_tools: list of tools needed (vector_search, sql_query, web_search, calculator)
        """

        response = self.llm.generate(prompt)
        # Parse JSON response...

        return QueryAnalysis(
            intent=QueryIntent.ANALYTICAL,
            complexity=QueryComplexity.MULTI_HOP,
            domains=["sales", "marketing"],
            time_scope="2024-Q1 to Q4",
            required_tools=["sql_query", "vector_search"],
            confidence=0.85
        )


# 사용 예시
analyzer = QueryAnalyzer(llm_client)
analysis = analyzer.analyze("2024년 분기별 매출과 원인 분석해줘")
print(f"Intent: {analysis.intent}")
print(f"Complexity: {analysis.complexity}")
print(f"Tools needed: {analysis.required_tools}")

2. Tool Registry

도구 용도 입력 출력
vector_search 문서 검색 쿼리, top_k 문서 청크 리스트
sql_query 구조화 데이터 SQL문 테이블 결과
web_search 최신 정보 검색어 웹 스니펫
calculator 수치 계산 수식 계산 결과
code_executor 코드 실행 Python 코드 실행 결과
from typing import Callable, Dict, Any, List
from dataclasses import dataclass
import json

@dataclass
class Tool:
    name: str
    description: str
    parameters: Dict[str, Any]
    function: Callable

class ToolRegistry:
    """
    도구 레지스트리

    에이전트가 사용할 수 있는 도구들을 등록하고 관리
    """

    def __init__(self):
        self.tools: Dict[str, Tool] = {}

    def register(self, tool: Tool):
        """도구 등록"""
        self.tools[tool.name] = tool

    def get(self, name: str) -> Tool:
        """도구 조회"""
        if name not in self.tools:
            raise ValueError(f"Tool not found: {name}")
        return self.tools[name]

    def execute(self, name: str, **kwargs) -> Any:
        """도구 실행"""
        tool = self.get(name)
        return tool.function(**kwargs)

    def get_tool_descriptions(self) -> str:
        """LLM에 전달할 도구 설명 생성"""
        descriptions = []
        for name, tool in self.tools.items():
            desc = f"- {name}: {tool.description}"
            desc += f"\n  Parameters: {json.dumps(tool.parameters)}"
            descriptions.append(desc)
        return "\n".join(descriptions)


# 도구 함수 정의
def vector_search(query: str, top_k: int = 5) -> List[Dict]:
    """벡터 DB 검색"""
    # 실제 구현: vectordb.search(query, top_k)
    return [{"content": "...", "score": 0.95}]

def sql_query(query: str) -> List[Dict]:
    """SQL 쿼리 실행"""
    # 실제 구현: db.execute(query)
    return [{"revenue": 35000000, "quarter": "Q1"}]

def web_search(query: str) -> List[Dict]:
    """웹 검색"""
    # 실제 구현: search_api.search(query)
    return [{"title": "...", "snippet": "...", "url": "..."}]


# 레지스트리 설정
registry = ToolRegistry()

registry.register(Tool(
    name="vector_search",
    description="문서 데이터베이스에서 관련 문서 검색",
    parameters={"query": "str", "top_k": "int"},
    function=vector_search
))

registry.register(Tool(
    name="sql_query",
    description="SQL 데이터베이스에서 구조화된 데이터 조회",
    parameters={"query": "str"},
    function=sql_query
))

registry.register(Tool(
    name="web_search",
    description="웹에서 최신 정보 검색",
    parameters={"query": "str"},
    function=web_search
))

3. Memory & State Management

agentic-rag diagram

from typing import Any, Dict, List, Optional
from dataclasses import dataclass, field
from datetime import datetime
import json

@dataclass
class MemoryEntry:
    """메모리 항목"""
    key: str
    value: Any
    timestamp: datetime = field(default_factory=datetime.now)
    ttl_seconds: Optional[int] = None  # None = 영구 저장


class AgentMemory:
    """
    에이전트 메모리 관리

    단기/장기 메모리를 분리하여 관리
    """

    def __init__(self):
        # 단기 메모리: 현재 대화 세션용
        self.short_term: Dict[str, MemoryEntry] = {}

        # 장기 메모리: 세션 간 유지
        self.long_term: Dict[str, MemoryEntry] = {}

        # 검색 결과 캐시
        self.search_cache: Dict[str, Any] = {}

        # 도구 실행 이력
        self.tool_history: List[Dict] = []

    def store_short_term(self, key: str, value: Any, ttl: int = 3600):
        """단기 메모리에 저장"""
        self.short_term[key] = MemoryEntry(
            key=key, 
            value=value, 
            ttl_seconds=ttl
        )

    def store_long_term(self, key: str, value: Any):
        """장기 메모리에 저장"""
        self.long_term[key] = MemoryEntry(key=key, value=value)

    def get(self, key: str) -> Optional[Any]:
        """메모리에서 값 조회 (단기 우선)"""
        if key in self.short_term:
            entry = self.short_term[key]
            if entry.ttl_seconds:
                age = (datetime.now() - entry.timestamp).seconds
                if age > entry.ttl_seconds:
                    del self.short_term[key]
                    return None
            return entry.value

        if key in self.long_term:
            return self.long_term[key].value

        return None

    def cache_search(self, query: str, results: Any):
        """검색 결과 캐싱"""
        self.search_cache[query] = {
            "results": results,
            "timestamp": datetime.now()
        }

    def get_cached_search(self, query: str, max_age_seconds: int = 300) -> Optional[Any]:
        """캐시된 검색 결과 조회"""
        if query in self.search_cache:
            cached = self.search_cache[query]
            age = (datetime.now() - cached["timestamp"]).seconds
            if age <= max_age_seconds:
                return cached["results"]
        return None

    def log_tool_use(self, tool_name: str, inputs: Dict, outputs: Any):
        """도구 사용 로깅"""
        self.tool_history.append({
            "tool": tool_name,
            "inputs": inputs,
            "outputs": outputs,
            "timestamp": datetime.now().isoformat()
        })

    def get_recent_context(self, n: int = 5) -> str:
        """최근 컨텍스트 요약"""
        recent = self.tool_history[-n:]
        return json.dumps(recent, indent=2, default=str)

    def clear_session(self):
        """세션 종료 시 단기 메모리 정리"""
        self.short_term.clear()
        self.search_cache.clear()
        self.tool_history.clear()

구현 고려사항

지연시간 최적화

전략 설명 예상 효과
병렬 검색 독립적인 검색을 동시 실행 30-50% 감소
스트리밍 중간 결과 즉시 반환 체감 지연 감소
캐싱 동일 쿼리 결과 재사용 70-90% 감소
조기 종료 충분한 정보 확보시 중단 20-40% 감소
import asyncio
from typing import List, Dict

async def parallel_search(queries: List[Dict], registry: ToolRegistry) -> List[Any]:
    """
    독립적인 검색을 병렬로 실행
    """
    async def execute_single(q: Dict) -> Any:
        tool_name = q["tool"]
        params = q["params"]
        return registry.execute(tool_name, **params)

    tasks = [execute_single(q) for q in queries]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    return results


# 사용 예시
queries = [
    {"tool": "vector_search", "params": {"query": "2024 매출", "top_k": 5}},
    {"tool": "sql_query", "params": {"query": "SELECT * FROM sales WHERE year=2024"}},
    {"tool": "web_search", "params": {"query": "시장 동향 2024"}}
]

# 병렬 실행으로 3개 검색을 동시에
results = asyncio.run(parallel_search(queries, registry))

에러 처리

from tenacity import retry, stop_after_attempt, wait_exponential

class AgentError(Exception):
    """에이전트 에러 기본 클래스"""
    pass

class ToolExecutionError(AgentError):
    """도구 실행 실패"""
    pass

class MaxRetriesExceeded(AgentError):
    """최대 재시도 초과"""
    pass


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
def execute_with_retry(tool_name: str, registry: ToolRegistry, **kwargs):
    """
    재시도 로직이 포함된 도구 실행

    1. 재시도 (최대 3회, 지수 백오프)
    2. 대체 도구로 전환
    3. 부분 결과로 진행
    4. 사용자에게 한계 명시
    """
    try:
        return registry.execute(tool_name, **kwargs)
    except Exception as e:
        # 로깅
        print(f"Tool {tool_name} failed: {e}")
        raise ToolExecutionError(f"Tool execution failed: {e}")


def execute_with_fallback(
    primary_tool: str, 
    fallback_tool: str,
    registry: ToolRegistry,
    **kwargs
) -> Any:
    """
    대체 도구를 사용한 에러 처리
    """
    try:
        return execute_with_retry(primary_tool, registry, **kwargs)
    except (ToolExecutionError, MaxRetriesExceeded):
        print(f"Falling back to {fallback_tool}")
        return registry.execute(fallback_tool, **kwargs)

비용 관리

항목 최적화 방안
LLM 호출 작은 모델로 라우팅/판단, 큰 모델로 생성
검색 횟수 배치 처리, 관련성 점수 기반 조기 종료
토큰 수 컨텍스트 압축, 불필요 정보 제거
class CostAwareAgent:
    """
    비용을 고려한 에이전트
    """

    def __init__(self, small_llm, large_llm, budget_limit: float):
        self.small_llm = small_llm  # 라우팅, 평가용
        self.large_llm = large_llm  # 최종 생성용
        self.budget_limit = budget_limit
        self.current_cost = 0.0

    def route_query(self, query: str) -> str:
        """작은 모델로 라우팅 결정"""
        # 비용: ~$0.001
        return self.small_llm.generate(
            f"Classify query type: {query}"
        )

    def evaluate_response(self, response: str, query: str) -> bool:
        """작은 모델로 응답 평가"""
        # 비용: ~$0.001
        return self.small_llm.generate(
            f"Is this response adequate for '{query}'? Response: {response}"
        )

    def generate_final(self, context: str, query: str) -> str:
        """큰 모델로 최종 응답 생성"""
        # 비용: ~$0.01-0.05
        if self.current_cost + 0.05 > self.budget_limit:
            raise Exception("Budget exceeded")

        response = self.large_llm.generate(
            f"Context: {context}\nQuery: {query}\nAnswer:"
        )
        self.current_cost += 0.03  # 예상 비용
        return response

프레임워크 비교

프레임워크 특징 적합 케이스 GitHub Stars
LangChain 범용, 풍부한 통합 빠른 프로토타입 75k+
LlamaIndex 데이터 연결 특화 RAG 중심 30k+
Haystack 프로덕션 지향 엔터프라이즈 13k+
AutoGen 멀티에이전트 협업 워크플로우 25k+
CrewAI 역할 기반 에이전트 복잡한 태스크 분업 15k+

적용 시나리오

적합한 케이스

시나리오 이유
복잡한 분석 질문 다중 데이터 소스 통합
동적 정보 요구 실시간 + 저장 데이터
다단계 추론 필요 순차적 정보 수집
사용자 의도 불명확 명확화 대화

부적합한 케이스

시나리오 이유 대안
단순 FAQ 오버헤드 직접 검색
실시간 응답 (< 1초) 지연시간 캐싱 + 단순 RAG
단일 데이터 소스 라우팅 불필요 표준 RAG
대량 처리 비용 배치 파이프라인

트러블슈팅

흔한 문제와 해결책

문제 원인 해결책
무한 루프 종료 조건 미흡 max_iterations 설정
잘못된 도구 선택 라우팅 실패 프롬프트 개선, 예시 추가
할루시네이션 검색 실패 시 생성 검색 결과 검증 필수화
높은 지연 순차 실행 병렬화, 캐싱
비용 초과 불필요한 LLM 호출 작은 모델 활용, 조기 종료

참고 자료

자료 링크 설명
Lewis et al. NeurIPS 2020 RAG 원논문
Yao et al. ICLR 2023 ReAct 논문
LangChain Docs docs.langchain.com 공식 문서
LlamaIndex Docs docs.llamaindex.ai 공식 문서
NVIDIA Blog developer.nvidia.com Production RAG 가이드