스트리밍 LLM 서빙¶
개요¶
스트리밍 서빙은 LLM 응답을 토큰 단위로 실시간 전송하는 패턴이다. 체감 레이턴시 감소, UX 향상, 긴 응답 처리에 필수적인 아키텍처다.
스트리밍 프로토콜¶
1. Server-Sent Events (SSE)¶
가장 널리 사용되는 방식. HTTP/1.1 기반, 단방향 스트림.
┌──────────┐ HTTP GET (Accept: text/event-stream) ┌──────────┐
│ Client │ ──────────────────────────────────────────▶ │ Server │
│ │ ◀────────────────────────────────────────── │ │
└──────────┘ data: {"token": "안"} └──────────┘
data: {"token": "녕"}
data: {"token": "하"}
data: [DONE]
서버 (FastAPI):
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from typing import AsyncGenerator
import asyncio
import json
app = FastAPI()
async def generate_stream(prompt: str) -> AsyncGenerator[str, None]:
"""토큰 스트림 생성기"""
# 실제로는 LLM 호출
tokens = ["안녕", "하세요", "!", " ", "무엇을", " ", "도와", "드릴까요", "?"]
for token in tokens:
# SSE 포맷
data = json.dumps({"token": token, "done": False})
yield f"data: {data}\n\n"
await asyncio.sleep(0.1) # 토큰 생성 시뮬레이션
# 종료 신호
yield "data: [DONE]\n\n"
@app.get("/v1/chat/completions/stream")
async def stream_completion(prompt: str):
return StreamingResponse(
generate_stream(prompt),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Nginx 버퍼링 비활성화
}
)
클라이언트 (JavaScript):
async function streamChat(prompt) {
const response = await fetch(`/v1/chat/completions/stream?prompt=${encodeURIComponent(prompt)}`, {
method: 'GET',
headers: { 'Accept': 'text/event-stream' }
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop(); // 불완전한 라인 보존
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') {
console.log('Stream completed');
return;
}
const parsed = JSON.parse(data);
process.stdout.write(parsed.token);
}
}
}
}
2. WebSocket¶
양방향 통신, 복잡한 인터랙션에 적합.
┌──────────┐ WebSocket Upgrade ┌──────────┐
│ Client │ ◀─────────────────────▶ │ Server │
│ │ 양방향 메시지 │ │
└──────────┘ └──────────┘
서버:
from fastapi import FastAPI, WebSocket
from fastapi.websockets import WebSocketDisconnect
import json
app = FastAPI()
class ConnectionManager:
def __init__(self):
self.active_connections: list[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_token(self, websocket: WebSocket, token: str):
await websocket.send_json({"type": "token", "content": token})
manager = ConnectionManager()
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
# 클라이언트 메시지 수신
data = await websocket.receive_json()
prompt = data.get("prompt", "")
# 스트리밍 응답
async for token in generate_tokens(prompt):
await manager.send_token(websocket, token)
# 완료 신호
await websocket.send_json({"type": "done"})
except WebSocketDisconnect:
manager.disconnect(websocket)
async def generate_tokens(prompt: str):
"""LLM 토큰 생성 (pseudo)"""
# 실제로는 vLLM, TGI 등 호출
import asyncio
tokens = ["응답", " ", "생성", " ", "중", "..."]
for token in tokens:
yield token
await asyncio.sleep(0.05)
클라이언트:
class StreamingChat {
constructor(url) {
this.ws = new WebSocket(url);
this.responseBuffer = '';
this.ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'token') {
this.responseBuffer += data.content;
this.onToken(data.content);
} else if (data.type === 'done') {
this.onComplete(this.responseBuffer);
this.responseBuffer = '';
}
};
}
send(prompt) {
this.ws.send(JSON.stringify({ prompt }));
}
onToken(token) {
// 오버라이드 가능
document.getElementById('output').textContent += token;
}
onComplete(fullResponse) {
console.log('Complete:', fullResponse);
}
}
3. gRPC Streaming¶
고성능, 타입 안전, 마이크로서비스 간 통신에 적합.
// chat.proto
syntax = "proto3";
service ChatService {
// 서버 스트리밍 RPC
rpc StreamChat(ChatRequest) returns (stream ChatResponse);
// 양방향 스트리밍
rpc BiStreamChat(stream ChatRequest) returns (stream ChatResponse);
}
message ChatRequest {
string prompt = 1;
int32 max_tokens = 2;
}
message ChatResponse {
string token = 1;
bool done = 2;
float latency_ms = 3;
}
vLLM 스트리밍 통합¶
AsyncLLMEngine 사용¶
from vllm import AsyncLLMEngine, SamplingParams
from vllm.engine.arg_utils import AsyncEngineArgs
import asyncio
class VLLMStreamer:
def __init__(self, model: str = "meta-llama/Llama-3.2-3B-Instruct"):
engine_args = AsyncEngineArgs(
model=model,
tensor_parallel_size=1,
max_model_len=4096,
)
self.engine = AsyncLLMEngine.from_engine_args(engine_args)
async def stream(self, prompt: str, request_id: str = None):
"""토큰 스트리밍"""
if request_id is None:
request_id = f"req-{asyncio.get_event_loop().time()}"
sampling_params = SamplingParams(
temperature=0.7,
max_tokens=512,
stream=True,
)
results_generator = self.engine.generate(
prompt,
sampling_params,
request_id,
)
async for request_output in results_generator:
for output in request_output.outputs:
# 새로 생성된 토큰만 반환
yield output.text
# FastAPI 연동
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
streamer = VLLMStreamer()
@app.get("/stream")
async def stream_endpoint(prompt: str):
async def event_generator():
async for token in streamer.stream(prompt):
yield f"data: {json.dumps({'token': token})}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream"
)
OpenAI 호환 스트리밍¶
import openai
client = openai.OpenAI(
base_url="http://localhost:8000/v1",
api_key="dummy"
)
# 스트리밍 요청
stream = client.chat.completions.create(
model="llama-3.2-3b",
messages=[{"role": "user", "content": "안녕하세요"}],
stream=True,
)
for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)
프록시 구성¶
Nginx SSE 설정¶
upstream llm_backend {
server 127.0.0.1:8000;
keepalive 32;
}
server {
listen 443 ssl http2;
server_name api.example.com;
location /v1/chat/completions {
proxy_pass http://llm_backend;
# SSE 필수 설정
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_buffering off;
proxy_cache off;
# 타임아웃 설정 (긴 응답 대비)
proxy_read_timeout 300s;
proxy_send_timeout 300s;
# 청크 전송
chunked_transfer_encoding on;
# Nginx 버퍼 비활성화
proxy_set_header X-Accel-Buffering no;
}
}
Cloudflare Workers 프록시¶
export default {
async fetch(request, env) {
const url = new URL(request.url);
if (url.pathname.startsWith('/stream')) {
const backendUrl = `${env.BACKEND_URL}${url.pathname}${url.search}`;
const response = await fetch(backendUrl, {
method: request.method,
headers: request.headers,
});
// 스트리밍 응답 전달
return new Response(response.body, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
}
});
}
return new Response('Not Found', { status: 404 });
}
};
에러 처리¶
연결 끊김 복구¶
class ResilientStreamClient:
def __init__(self, base_url: str, max_retries: int = 3):
self.base_url = base_url
self.max_retries = max_retries
async def stream_with_retry(self, prompt: str):
"""재연결 로직 포함 스트리밍"""
received_tokens = []
retry_count = 0
while retry_count < self.max_retries:
try:
async with aiohttp.ClientSession() as session:
# 이미 받은 토큰 수 전달 (서버에서 이어서 전송)
params = {
"prompt": prompt,
"offset": len(received_tokens)
}
async with session.get(
f"{self.base_url}/stream",
params=params
) as response:
async for line in response.content:
line = line.decode().strip()
if line.startswith("data: "):
data = line[6:]
if data == "[DONE]":
return received_tokens
token = json.loads(data)["token"]
received_tokens.append(token)
yield token
return received_tokens
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
retry_count += 1
await asyncio.sleep(2 ** retry_count) # 지수 백오프
raise Exception("Max retries exceeded")
타임아웃 처리¶
async def stream_with_timeout(prompt: str, token_timeout: float = 30.0):
"""토큰 간 타임아웃 처리"""
last_token_time = asyncio.get_event_loop().time()
async for token in streamer.stream(prompt):
current_time = asyncio.get_event_loop().time()
if current_time - last_token_time > token_timeout:
raise TimeoutError(f"No token received for {token_timeout}s")
last_token_time = current_time
yield token
성능 최적화¶
배치 스트리밍¶
여러 요청을 동시에 처리하면서 각각 스트리밍:
class BatchStreamManager:
def __init__(self, engine: AsyncLLMEngine):
self.engine = engine
self.active_streams: dict[str, asyncio.Queue] = {}
async def add_request(self, request_id: str, prompt: str):
"""요청 추가 및 스트림 큐 생성"""
queue = asyncio.Queue()
self.active_streams[request_id] = queue
# 백그라운드에서 생성 시작
asyncio.create_task(self._generate(request_id, prompt))
return queue
async def _generate(self, request_id: str, prompt: str):
"""토큰 생성 및 큐에 추가"""
async for token in self.engine.generate(prompt, request_id):
await self.active_streams[request_id].put(token)
# 완료 신호
await self.active_streams[request_id].put(None)
메모리 관리¶
import gc
class StreamingMemoryManager:
def __init__(self, max_concurrent: int = 100):
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
self.active_count = 0
async def stream(self, prompt: str):
async with self.semaphore:
self.active_count += 1
try:
async for token in generate_tokens(prompt):
yield token
finally:
self.active_count -= 1
# 주기적 GC
if self.active_count == 0:
gc.collect()
모니터링¶
메트릭¶
| 메트릭 | 설명 | 수집 방법 |
|---|---|---|
| ttft (Time to First Token) | 첫 토큰까지 시간 | 요청 시작 ~ 첫 토큰 |
| tpot (Time per Output Token) | 토큰당 생성 시간 | 총 시간 / 토큰 수 |
| stream_duration | 총 스트림 시간 | 시작 ~ [DONE] |
| connection_drops | 연결 끊김 횟수 | WebSocket/SSE 에러 카운트 |
| concurrent_streams | 동시 스트림 수 | 활성 연결 수 |
from prometheus_client import Histogram, Counter, Gauge
import time
ttft_histogram = Histogram('llm_ttft_seconds', 'Time to first token')
tpot_histogram = Histogram('llm_tpot_seconds', 'Time per output token')
concurrent_streams = Gauge('llm_concurrent_streams', 'Number of active streams')
async def monitored_stream(prompt: str):
start_time = time.time()
first_token_time = None
token_count = 0
concurrent_streams.inc()
try:
async for token in generate_tokens(prompt):
if first_token_time is None:
first_token_time = time.time()
ttft_histogram.observe(first_token_time - start_time)
token_count += 1
yield token
# TPOT 계산
if token_count > 0:
total_time = time.time() - first_token_time
tpot_histogram.observe(total_time / token_count)
finally:
concurrent_streams.dec()
체크리스트¶
- [ ] SSE 또는 WebSocket 프로토콜 선택
- [ ] Nginx/로드밸런서 버퍼링 비활성화
- [ ] 연결 끊김 복구 로직 구현
- [ ] 토큰 타임아웃 처리
- [ ] 동시 연결 수 제한
- [ ] TTFT/TPOT 메트릭 수집
- [ ] 메모리 누수 모니터링
참고 자료¶
마지막 업데이트: 2026-03-04