콘텐츠로 이동
Data Prep
상세

스트리밍 LLM 서빙

개요

스트리밍 서빙은 LLM 응답을 토큰 단위로 실시간 전송하는 패턴이다. 체감 레이턴시 감소, UX 향상, 긴 응답 처리에 필수적인 아키텍처다.

스트리밍 프로토콜

1. Server-Sent Events (SSE)

가장 널리 사용되는 방식. HTTP/1.1 기반, 단방향 스트림.

┌──────────┐    HTTP GET (Accept: text/event-stream)    ┌──────────┐
│  Client  │ ──────────────────────────────────────────▶ │  Server  │
│          │ ◀────────────────────────────────────────── │          │
└──────────┘         data: {"token": "안"}               └──────────┘
                     data: {"token": "녕"}
                     data: {"token": "하"}
                     data: [DONE]

서버 (FastAPI):

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from typing import AsyncGenerator
import asyncio
import json

app = FastAPI()

async def generate_stream(prompt: str) -> AsyncGenerator[str, None]:
    """토큰 스트림 생성기"""
    # 실제로는 LLM 호출
    tokens = ["안녕", "하세요", "!", " ", "무엇을", " ", "도와", "드릴까요", "?"]

    for token in tokens:
        # SSE 포맷
        data = json.dumps({"token": token, "done": False})
        yield f"data: {data}\n\n"
        await asyncio.sleep(0.1)  # 토큰 생성 시뮬레이션

    # 종료 신호
    yield "data: [DONE]\n\n"

@app.get("/v1/chat/completions/stream")
async def stream_completion(prompt: str):
    return StreamingResponse(
        generate_stream(prompt),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Nginx 버퍼링 비활성화
        }
    )

클라이언트 (JavaScript):

async function streamChat(prompt) {
    const response = await fetch(`/v1/chat/completions/stream?prompt=${encodeURIComponent(prompt)}`, {
        method: 'GET',
        headers: { 'Accept': 'text/event-stream' }
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    let buffer = '';

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop(); // 불완전한 라인 보존

        for (const line of lines) {
            if (line.startsWith('data: ')) {
                const data = line.slice(6);
                if (data === '[DONE]') {
                    console.log('Stream completed');
                    return;
                }
                const parsed = JSON.parse(data);
                process.stdout.write(parsed.token);
            }
        }
    }
}

2. WebSocket

양방향 통신, 복잡한 인터랙션에 적합.

┌──────────┐    WebSocket Upgrade    ┌──────────┐
│  Client  │ ◀─────────────────────▶ │  Server  │
│          │    양방향 메시지        │          │
└──────────┘                          └──────────┘

서버:

from fastapi import FastAPI, WebSocket
from fastapi.websockets import WebSocketDisconnect
import json

app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_token(self, websocket: WebSocket, token: str):
        await websocket.send_json({"type": "token", "content": token})

manager = ConnectionManager()

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await manager.connect(websocket)

    try:
        while True:
            # 클라이언트 메시지 수신
            data = await websocket.receive_json()
            prompt = data.get("prompt", "")

            # 스트리밍 응답
            async for token in generate_tokens(prompt):
                await manager.send_token(websocket, token)

            # 완료 신호
            await websocket.send_json({"type": "done"})

    except WebSocketDisconnect:
        manager.disconnect(websocket)

async def generate_tokens(prompt: str):
    """LLM 토큰 생성 (pseudo)"""
    # 실제로는 vLLM, TGI 등 호출
    import asyncio
    tokens = ["응답", " ", "생성", " ", "중", "..."]
    for token in tokens:
        yield token
        await asyncio.sleep(0.05)

클라이언트:

class StreamingChat {
    constructor(url) {
        this.ws = new WebSocket(url);
        this.responseBuffer = '';

        this.ws.onmessage = (event) => {
            const data = JSON.parse(event.data);

            if (data.type === 'token') {
                this.responseBuffer += data.content;
                this.onToken(data.content);
            } else if (data.type === 'done') {
                this.onComplete(this.responseBuffer);
                this.responseBuffer = '';
            }
        };
    }

    send(prompt) {
        this.ws.send(JSON.stringify({ prompt }));
    }

    onToken(token) {
        // 오버라이드 가능
        document.getElementById('output').textContent += token;
    }

    onComplete(fullResponse) {
        console.log('Complete:', fullResponse);
    }
}

3. gRPC Streaming

고성능, 타입 안전, 마이크로서비스 간 통신에 적합.

// chat.proto
syntax = "proto3";

service ChatService {
    // 서버 스트리밍 RPC
    rpc StreamChat(ChatRequest) returns (stream ChatResponse);

    // 양방향 스트리밍
    rpc BiStreamChat(stream ChatRequest) returns (stream ChatResponse);
}

message ChatRequest {
    string prompt = 1;
    int32 max_tokens = 2;
}

message ChatResponse {
    string token = 1;
    bool done = 2;
    float latency_ms = 3;
}

vLLM 스트리밍 통합

AsyncLLMEngine 사용

from vllm import AsyncLLMEngine, SamplingParams
from vllm.engine.arg_utils import AsyncEngineArgs
import asyncio

class VLLMStreamer:
    def __init__(self, model: str = "meta-llama/Llama-3.2-3B-Instruct"):
        engine_args = AsyncEngineArgs(
            model=model,
            tensor_parallel_size=1,
            max_model_len=4096,
        )
        self.engine = AsyncLLMEngine.from_engine_args(engine_args)

    async def stream(self, prompt: str, request_id: str = None):
        """토큰 스트리밍"""
        if request_id is None:
            request_id = f"req-{asyncio.get_event_loop().time()}"

        sampling_params = SamplingParams(
            temperature=0.7,
            max_tokens=512,
            stream=True,
        )

        results_generator = self.engine.generate(
            prompt,
            sampling_params,
            request_id,
        )

        async for request_output in results_generator:
            for output in request_output.outputs:
                # 새로 생성된 토큰만 반환
                yield output.text

# FastAPI 연동
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
streamer = VLLMStreamer()

@app.get("/stream")
async def stream_endpoint(prompt: str):
    async def event_generator():
        async for token in streamer.stream(prompt):
            yield f"data: {json.dumps({'token': token})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )

OpenAI 호환 스트리밍

import openai

client = openai.OpenAI(
    base_url="http://localhost:8000/v1",
    api_key="dummy"
)

# 스트리밍 요청
stream = client.chat.completions.create(
    model="llama-3.2-3b",
    messages=[{"role": "user", "content": "안녕하세요"}],
    stream=True,
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

프록시 구성

Nginx SSE 설정

upstream llm_backend {
    server 127.0.0.1:8000;
    keepalive 32;
}

server {
    listen 443 ssl http2;
    server_name api.example.com;

    location /v1/chat/completions {
        proxy_pass http://llm_backend;

        # SSE 필수 설정
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_buffering off;
        proxy_cache off;

        # 타임아웃 설정 (긴 응답 대비)
        proxy_read_timeout 300s;
        proxy_send_timeout 300s;

        # 청크 전송
        chunked_transfer_encoding on;

        # Nginx 버퍼 비활성화
        proxy_set_header X-Accel-Buffering no;
    }
}

Cloudflare Workers 프록시

export default {
    async fetch(request, env) {
        const url = new URL(request.url);

        if (url.pathname.startsWith('/stream')) {
            const backendUrl = `${env.BACKEND_URL}${url.pathname}${url.search}`;

            const response = await fetch(backendUrl, {
                method: request.method,
                headers: request.headers,
            });

            // 스트리밍 응답 전달
            return new Response(response.body, {
                headers: {
                    'Content-Type': 'text/event-stream',
                    'Cache-Control': 'no-cache',
                    'Connection': 'keep-alive',
                }
            });
        }

        return new Response('Not Found', { status: 404 });
    }
};

에러 처리

연결 끊김 복구

class ResilientStreamClient:
    def __init__(self, base_url: str, max_retries: int = 3):
        self.base_url = base_url
        self.max_retries = max_retries

    async def stream_with_retry(self, prompt: str):
        """재연결 로직 포함 스트리밍"""
        received_tokens = []
        retry_count = 0

        while retry_count < self.max_retries:
            try:
                async with aiohttp.ClientSession() as session:
                    # 이미 받은 토큰 수 전달 (서버에서 이어서 전송)
                    params = {
                        "prompt": prompt,
                        "offset": len(received_tokens)
                    }

                    async with session.get(
                        f"{self.base_url}/stream",
                        params=params
                    ) as response:
                        async for line in response.content:
                            line = line.decode().strip()
                            if line.startswith("data: "):
                                data = line[6:]
                                if data == "[DONE]":
                                    return received_tokens

                                token = json.loads(data)["token"]
                                received_tokens.append(token)
                                yield token

                return received_tokens

            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                retry_count += 1
                await asyncio.sleep(2 ** retry_count)  # 지수 백오프

        raise Exception("Max retries exceeded")

타임아웃 처리

async def stream_with_timeout(prompt: str, token_timeout: float = 30.0):
    """토큰 간 타임아웃 처리"""
    last_token_time = asyncio.get_event_loop().time()

    async for token in streamer.stream(prompt):
        current_time = asyncio.get_event_loop().time()

        if current_time - last_token_time > token_timeout:
            raise TimeoutError(f"No token received for {token_timeout}s")

        last_token_time = current_time
        yield token

성능 최적화

배치 스트리밍

여러 요청을 동시에 처리하면서 각각 스트리밍:

class BatchStreamManager:
    def __init__(self, engine: AsyncLLMEngine):
        self.engine = engine
        self.active_streams: dict[str, asyncio.Queue] = {}

    async def add_request(self, request_id: str, prompt: str):
        """요청 추가 및 스트림 큐 생성"""
        queue = asyncio.Queue()
        self.active_streams[request_id] = queue

        # 백그라운드에서 생성 시작
        asyncio.create_task(self._generate(request_id, prompt))

        return queue

    async def _generate(self, request_id: str, prompt: str):
        """토큰 생성 및 큐에 추가"""
        async for token in self.engine.generate(prompt, request_id):
            await self.active_streams[request_id].put(token)

        # 완료 신호
        await self.active_streams[request_id].put(None)

메모리 관리

import gc

class StreamingMemoryManager:
    def __init__(self, max_concurrent: int = 100):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.active_count = 0

    async def stream(self, prompt: str):
        async with self.semaphore:
            self.active_count += 1
            try:
                async for token in generate_tokens(prompt):
                    yield token
            finally:
                self.active_count -= 1

                # 주기적 GC
                if self.active_count == 0:
                    gc.collect()

모니터링

메트릭

메트릭 설명 수집 방법
ttft (Time to First Token) 첫 토큰까지 시간 요청 시작 ~ 첫 토큰
tpot (Time per Output Token) 토큰당 생성 시간 총 시간 / 토큰 수
stream_duration 총 스트림 시간 시작 ~ [DONE]
connection_drops 연결 끊김 횟수 WebSocket/SSE 에러 카운트
concurrent_streams 동시 스트림 수 활성 연결 수
from prometheus_client import Histogram, Counter, Gauge
import time

ttft_histogram = Histogram('llm_ttft_seconds', 'Time to first token')
tpot_histogram = Histogram('llm_tpot_seconds', 'Time per output token')
concurrent_streams = Gauge('llm_concurrent_streams', 'Number of active streams')

async def monitored_stream(prompt: str):
    start_time = time.time()
    first_token_time = None
    token_count = 0

    concurrent_streams.inc()

    try:
        async for token in generate_tokens(prompt):
            if first_token_time is None:
                first_token_time = time.time()
                ttft_histogram.observe(first_token_time - start_time)

            token_count += 1
            yield token

        # TPOT 계산
        if token_count > 0:
            total_time = time.time() - first_token_time
            tpot_histogram.observe(total_time / token_count)

    finally:
        concurrent_streams.dec()

체크리스트

  • [ ] SSE 또는 WebSocket 프로토콜 선택
  • [ ] Nginx/로드밸런서 버퍼링 비활성화
  • [ ] 연결 끊김 복구 로직 구현
  • [ ] 토큰 타임아웃 처리
  • [ ] 동시 연결 수 제한
  • [ ] TTFT/TPOT 메트릭 수집
  • [ ] 메모리 누수 모니터링

참고 자료


마지막 업데이트: 2026-03-04