콘텐츠로 이동
Data Prep
상세

네트워크 (Networks)

컴퓨터 간 데이터 통신을 위한 기술. LLM/VLM에서는 API 서빙, 분산 학습, 스트리밍 응답 등에 필수적.


왜 네트워크를 알아야 하는가

  1. API 설계: RESTful, gRPC, WebSocket 선택과 최적화
  2. 디버깅: 지연 시간, 타임아웃, 연결 문제 해결
  3. 분산 시스템: 분산 학습, 모델 서빙 아키텍처
  4. 보안: HTTPS, 인증, Rate Limiting

OSI 7계층 모델

계층 이름 역할 프로토콜 PDU
7 응용 (Application) 사용자 인터페이스 HTTP, FTP, SMTP, DNS 데이터
6 표현 (Presentation) 데이터 변환/암호화 SSL/TLS, JPEG, JSON 데이터
5 세션 (Session) 연결 관리 NetBIOS, RPC 데이터
4 전송 (Transport) 종단 간 전송 TCP, UDP 세그먼트/데이터그램
3 네트워크 (Network) 라우팅 IP, ICMP, ARP 패킷
2 데이터 링크 (Data Link) 프레임 전송 Ethernet, Wi-Fi 프레임
1 물리 (Physical) 비트 전송 케이블, 무선 비트

실무에서는 TCP/IP 4계층 모델:

+-----------------+  HTTP, gRPC, DNS
|    응용 계층     |  (OSI 5-7)
+-----------------+
|    전송 계층     |  TCP, UDP
+-----------------+
|   인터넷 계층    |  IP
+-----------------+
| 네트워크 접근 계층|  Ethernet, Wi-Fi
+-----------------+

TCP/IP

IP (Internet Protocol)

비연결성, 비신뢰성 프로토콜. 패킷을 목적지까지 전달.

IPv4 주소:

192.168.1.100
= 32비트 (4 옥텟)

클래스:
A: 0.0.0.0 ~ 127.255.255.255 (대규모)
B: 128.0.0.0 ~ 191.255.255.255 (중규모)
C: 192.0.0.0 ~ 223.255.255.255 (소규모)

사설 IP:
10.0.0.0/8
172.16.0.0/12
192.168.0.0/16

CIDR 표기법:

192.168.1.0/24
= 192.168.1.0 ~ 192.168.1.255 (256개)
= 서브넷 마스크 255.255.255.0

/24: 2^8 = 256 호스트
/16: 2^16 = 65,536 호스트

TCP (Transmission Control Protocol)

연결 지향적, 신뢰성 있는 전송.

3-way Handshake (연결 수립):

클라이언트              서버
    |                    |
    |------- SYN ------->|  seq=x
    |                    |
    |<---- SYN+ACK ------|  seq=y, ack=x+1
    |                    |
    |------- ACK ------->|  seq=x+1, ack=y+1
    |                    |
    |     연결 수립       |

4-way Handshake (연결 종료):

클라이언트              서버
    |                    |
    |------- FIN ------->|  "보낼 거 다 보냄"
    |                    |
    |<------ ACK --------|  "알겠음"
    |                    |
    |<------ FIN --------|  "나도 다 보냄"
    |                    |
    |------- ACK ------->|  "알겠음"
    |                    |
    |   TIME_WAIT (2MSL) |  <- 지연된 패킷 처리 대기

TIME_WAIT 상태:

- 2 * MSL(Maximum Segment Lifetime) 동안 유지
- 지연된 패킷 처리, ACK 손실 대비
- 서버에서 많은 연결 종료 시 포트 고갈 가능
- 해결: SO_REUSEADDR 옵션

TCP 특징:

특징 설명 구현 방법
순서 보장 보낸 순서대로 전달 시퀀스 번호
신뢰성 손실 시 재전송 ACK, 타임아웃, 중복 ACK
흐름 제어 수신자 처리 속도 맞춤 슬라이딩 윈도우
혼잡 제어 네트워크 혼잡 대응 느린 시작, 혼잡 회피, 빠른 복구

슬라이딩 윈도우:

송신자:
[1][2][3][4][5][6][7][8][9][10]
|<-- 전송됨 -->|<- 전송 가능 ->|<- 대기 ->|

윈도우 크기 = min(수신 윈도우, 혼잡 윈도우)

혼잡 제어:

1. 느린 시작 (Slow Start)
   - cwnd를 1 MSS부터 시작, 매 RTT마다 2배
   - ssthresh 도달 시 혼잡 회피로 전환

2. 혼잡 회피 (Congestion Avoidance)
   - cwnd를 매 RTT마다 1 MSS 증가

3. 빠른 재전송 (Fast Retransmit)
   - 3 중복 ACK 수신 시 즉시 재전송

4. 빠른 복구 (Fast Recovery)
   - cwnd를 절반으로, 느린 시작 생략

UDP (User Datagram Protocol)

비연결성, 빠른 전송.

특성 TCP UDP
연결 연결 지향 비연결
신뢰성 보장 비보장
순서 보장 비보장
속도 느림 빠름
헤더 크기 20+ bytes 8 bytes
용도 HTTP, SSH, DB DNS, VoIP, 게임, 스트리밍
import socket

# UDP 서버
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(('0.0.0.0', 8080))

data, addr = server.recvfrom(1024)
server.sendto(b"Response", addr)

# UDP 클라이언트
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client.sendto(b"Hello", ('localhost', 8080))
data, _ = client.recvfrom(1024)

언제 UDP를 쓰는가?

- 실시간성 > 신뢰성: 스트리밍, VoIP, 게임
- 간단한 요청-응답: DNS
- 브로드캐스트/멀티캐스트
- 애플리케이션에서 직접 재전송 구현

DNS (Domain Name System)

도메인 이름을 IP 주소로 변환.

DNS 계층 구조:

                   . (root)
                    |
    +-------+-------+-------+
    |       |       |       |
   com     org     net     kr
    |
  google
    |
   www

DNS 조회 과정:

1. 브라우저 캐시 확인
2. OS 캐시 확인 (/etc/hosts)
3. 로컬 DNS 서버 (ISP)
4. 루트 DNS 서버
5. TLD DNS 서버 (.com)
6. 권한 DNS 서버 (google.com)
7. IP 주소 반환

DNS 레코드 타입:

타입 설명 예시
A IPv4 주소 example.com -> 93.184.216.34
AAAA IPv6 주소 example.com -> 2606:2800:...
CNAME 별칭 www.example.com -> example.com
MX 메일 서버 example.com -> mail.example.com
TXT 텍스트 SPF, DKIM 검증
NS 네임 서버 example.com -> ns1.example.com
# DNS 조회
$ nslookup google.com
$ dig google.com
$ host google.com

# 특정 DNS 서버로 조회
$ dig @8.8.8.8 google.com

HTTP/HTTPS

HTTP 메서드

메서드 설명 멱등성 안전 캐시
GET 리소스 조회 O O O
POST 리소스 생성/처리 X X X
PUT 리소스 전체 수정 O X X
PATCH 리소스 부분 수정 X X X
DELETE 리소스 삭제 O X X
HEAD 헤더만 조회 O O O
OPTIONS 지원 메서드 확인 O O X

멱등성 (Idempotency): 같은 요청을 여러 번 해도 결과가 같음

HTTP 상태 코드

범위 분류 주요 코드
1xx 정보 100 Continue
2xx 성공 200 OK, 201 Created, 204 No Content
3xx 리다이렉션 301 Moved Permanently, 302 Found, 304 Not Modified
4xx 클라이언트 오류 400 Bad Request, 401 Unauthorized, 403 Forbidden, 404 Not Found, 429 Too Many Requests
5xx 서버 오류 500 Internal Server Error, 502 Bad Gateway, 503 Service Unavailable

LLM API 관련:

200: 정상 응답
400: 잘못된 프롬프트 형식
401: API 키 오류
429: Rate Limit 초과 (Retry-After 헤더 확인)
500: 모델 오류
503: 서버 과부하 (재시도 필요)

HTTP/1.1 vs HTTP/2 vs HTTP/3

특성 HTTP/1.1 HTTP/2 HTTP/3
다중화 X (커넥션당 1개) O (스트림) O (스트림)
헤더 압축 X HPACK QPACK
서버 푸시 X O O
전송 프로토콜 TCP TCP QUIC (UDP)
HOL 블로킹 응용 + 전송 전송 계층만 없음

HTTP/2 다중화:

HTTP/1.1:
  요청1 -----> 응답1
  요청2 -----> 응답2
  요청3 -----> 응답3
  (순차적)

HTTP/2:
  요청1 -+
  요청2 -+---> 응답2, 응답1, 응답3
  요청3 -+
  (병렬)

QUIC (HTTP/3):

- UDP 기반이지만 TCP 수준의 신뢰성
- 연결 수립 0-RTT (재연결 시)
- 스트림별 독립적 전송 (HOL 블로킹 없음)
- 내장 암호화 (TLS 1.3)

HTTPS와 TLS

TLS 핸드셰이크 (1.2):

클라이언트              서버
    |                    |
    |--- ClientHello --->|  지원 암호화 목록
    |                    |
    |<-- ServerHello ----|  선택된 암호화
    |<-- Certificate ----|  서버 인증서
    |<-- ServerHelloDone-|
    |                    |
    |-- ClientKeyExch -->|  Pre-master secret
    |-- ChangeCipherSpec->|
    |-- Finished ------->|
    |                    |
    |<- ChangeCipherSpec-|
    |<-- Finished -------|
    |                    |
    |  암호화 통신 시작   |

TLS 1.3 개선점:

- 1-RTT 핸드셰이크 (1.2는 2-RTT)
- 0-RTT 재연결
- 불안전한 암호화 제거
- 더 강력한 PFS (Perfect Forward Secrecy)

LLM API 설계

REST API 설계

from fastapi import FastAPI, HTTPException, Security
from fastapi.security import APIKeyHeader
from pydantic import BaseModel
from typing import List, Optional
import time
import uuid

app = FastAPI(title="LLM API")

# 모델 정의
class Message(BaseModel):
    role: str  # "system", "user", "assistant"
    content: str

class ChatRequest(BaseModel):
    model: str
    messages: List[Message]
    temperature: Optional[float] = 1.0
    max_tokens: Optional[int] = 100
    stream: Optional[bool] = False
    top_p: Optional[float] = 1.0
    frequency_penalty: Optional[float] = 0.0

class Choice(BaseModel):
    index: int
    message: Message
    finish_reason: str

class Usage(BaseModel):
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int

class ChatResponse(BaseModel):
    id: str
    object: str = "chat.completion"
    created: int
    model: str
    choices: List[Choice]
    usage: Usage

# API 키 인증
api_key_header = APIKeyHeader(name="Authorization")

async def verify_api_key(api_key: str = Security(api_key_header)):
    if not api_key.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Invalid API key format")
    token = api_key[7:]
    if not is_valid_token(token):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return token

# 엔드포인트
@app.post("/v1/chat/completions", response_model=ChatResponse)
async def chat_completions(
    request: ChatRequest,
    api_key: str = Security(verify_api_key)
):
    # 모델 추론
    response_text = await generate_response(request)

    return ChatResponse(
        id=f"chatcmpl-{uuid.uuid4().hex[:8]}",
        created=int(time.time()),
        model=request.model,
        choices=[
            Choice(
                index=0,
                message=Message(role="assistant", content=response_text),
                finish_reason="stop"
            )
        ],
        usage=Usage(
            prompt_tokens=count_tokens(request.messages),
            completion_tokens=count_tokens(response_text),
            total_tokens=0  # 계산 필요
        )
    )

스트리밍 응답 (SSE)

Server-Sent Events 방식으로 토큰 단위 전송.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json
import asyncio

@app.post("/v1/chat/completions")
async def chat_completions_stream(request: ChatRequest):
    if request.stream:
        return StreamingResponse(
            generate_stream(request),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
            }
        )
    return await generate_response(request)

async def generate_stream(request: ChatRequest):
    """토큰 단위로 스트리밍"""
    async for token in model.generate_tokens_async(request):
        data = {
            "id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": request.model,
            "choices": [{
                "index": 0,
                "delta": {"content": token},
                "finish_reason": None
            }]
        }
        yield f"data: {json.dumps(data)}\n\n"
        await asyncio.sleep(0)  # 이벤트 루프 양보

    # 종료 청크
    final_data = {
        "id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
        "object": "chat.completion.chunk",
        "choices": [{
            "index": 0,
            "delta": {},
            "finish_reason": "stop"
        }]
    }
    yield f"data: {json.dumps(final_data)}\n\n"
    yield "data: [DONE]\n\n"

클라이언트 측 스트리밍 처리:

import httpx

async def stream_chat(messages):
    async with httpx.AsyncClient() as client:
        async with client.stream(
            'POST',
            'http://localhost:8000/v1/chat/completions',
            json={
                "model": "llama",
                "messages": messages,
                "stream": True
            },
            headers={"Authorization": "Bearer xxx"},
            timeout=60.0
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    data = line[6:]
                    if data == "[DONE]":
                        break
                    chunk = json.loads(data)
                    content = chunk["choices"][0]["delta"].get("content", "")
                    if content:
                        print(content, end="", flush=True)

WebSocket

양방향, 전이중 통신. 실시간 상호작용에 적합.

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import json

app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await manager.connect(websocket)

    try:
        while True:
            # 메시지 수신
            data = await websocket.receive_text()
            request = json.loads(data)

            # 스트리밍 응답
            async for token in model.generate_stream(request):
                await websocket.send_text(json.dumps({
                    "type": "token",
                    "content": token
                }))

            # 완료 신호
            await websocket.send_text(json.dumps({
                "type": "done"
            }))

    except WebSocketDisconnect:
        manager.disconnect(websocket)

WebSocket vs SSE:

특성 WebSocket SSE
방향 양방향 서버 -> 클라이언트
프로토콜 ws:// / wss:// HTTP
재연결 수동 자동
복잡성 높음 낮음
용도 채팅, 실시간 협업 알림, 스트리밍 응답

gRPC

고성능 RPC 프레임워크. Protocol Buffers 사용.

Proto 정의:

// chat.proto
syntax = "proto3";

package chat;

service ChatService {
    // Unary RPC
    rpc Chat(ChatRequest) returns (ChatResponse);

    // Server streaming RPC
    rpc ChatStream(ChatRequest) returns (stream ChatChunk);

    // Bidirectional streaming RPC
    rpc ChatBidirectional(stream ChatRequest) returns (stream ChatChunk);
}

message ChatRequest {
    string model = 1;
    repeated Message messages = 2;
    float temperature = 3;
    int32 max_tokens = 4;
}

message Message {
    string role = 1;
    string content = 2;
}

message ChatResponse {
    string id = 1;
    repeated Choice choices = 2;
    Usage usage = 3;
}

message ChatChunk {
    string content = 1;
    bool is_final = 2;
}

message Choice {
    int32 index = 1;
    Message message = 2;
    string finish_reason = 3;
}

message Usage {
    int32 prompt_tokens = 1;
    int32 completion_tokens = 2;
    int32 total_tokens = 3;
}

gRPC 서버:

import grpc
from concurrent import futures
import chat_pb2
import chat_pb2_grpc

class ChatServicer(chat_pb2_grpc.ChatServiceServicer):
    def Chat(self, request, context):
        response = generate_response(request)
        return chat_pb2.ChatResponse(
            id="xxx",
            choices=[chat_pb2.Choice(
                index=0,
                message=chat_pb2.Message(
                    role="assistant",
                    content=response
                ),
                finish_reason="stop"
            )],
            usage=chat_pb2.Usage(
                prompt_tokens=100,
                completion_tokens=50,
                total_tokens=150
            )
        )

    def ChatStream(self, request, context):
        for token in model.generate_tokens(request):
            yield chat_pb2.ChatChunk(
                content=token,
                is_final=False
            )
        yield chat_pb2.ChatChunk(content="", is_final=True)

def serve():
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        options=[
            ('grpc.max_send_message_length', 50 * 1024 * 1024),
            ('grpc.max_receive_message_length', 50 * 1024 * 1024),
        ]
    )
    chat_pb2_grpc.add_ChatServiceServicer_to_server(ChatServicer(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

gRPC vs REST:

특성 gRPC REST
프로토콜 HTTP/2 HTTP/1.1 or 2
직렬화 Protocol Buffers JSON
스트리밍 네이티브 지원 SSE/WebSocket
성능 높음 중간
브라우저 지원 제한적 (grpc-web) 완전

분산 학습 네트워크

통신 패턴

All-Reduce:

모든 노드의 데이터를 집계하여 모든 노드에 배포

초기:
[GPU0: 1] [GPU1: 2] [GPU2: 3] [GPU3: 4]

All-Reduce (SUM):
[GPU0: 10] [GPU1: 10] [GPU2: 10] [GPU3: 10]

용도: 동기적 데이터 병렬 학습의 그래디언트 평균

Ring All-Reduce:

+------+    +------+    +------+    +------+
| GPU0 | -> | GPU1 | -> | GPU2 | -> | GPU3 |
+------+    +------+    +------+    +------+
    ^                                   |
    +-----------------------------------+

단계:
1. Scatter-Reduce: N-1번 전송으로 각 GPU에 부분 합
2. All-Gather: N-1번 전송으로 최종 결과 공유

통신량: 2 * (N-1) / N * data_size ≈ 2 * data_size
GPU 수에 독립적 (대역폭 최적)

NCCL (NVIDIA Collective Communications Library)

GPU 간 고속 통신.

import torch
import torch.distributed as dist

# 환경 변수 설정 후 초기화
# MASTER_ADDR, MASTER_PORT, WORLD_SIZE, RANK
dist.init_process_group(backend='nccl')

rank = dist.get_rank()
world_size = dist.get_world_size()

# 텐서 생성
tensor = torch.ones(1000).cuda() * rank

# All-Reduce
dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
# 결과: 모든 GPU에 합계

# Broadcast
if rank == 0:
    tensor = torch.randn(1000).cuda()
else:
    tensor = torch.zeros(1000).cuda()
dist.broadcast(tensor, src=0)
# 결과: 모든 GPU에 rank 0의 텐서

# All-Gather
tensor_list = [torch.zeros(1000).cuda() for _ in range(world_size)]
dist.all_gather(tensor_list, tensor)
# 결과: 모든 GPU의 텐서 수집

# Reduce
dist.reduce(tensor, dst=0, op=dist.ReduceOp.SUM)
# 결과: rank 0에만 합계

# 정리
dist.destroy_process_group()

분산 학습 전략

데이터 병렬 (Data Parallel):

각 GPU에 모델 복제, 데이터 분할
1. Forward: 각 GPU에서 독립적
2. Backward: 각 GPU에서 독립적
3. All-Reduce: 그래디언트 평균
4. Update: 동기적으로 파라미터 업데이트

PyTorch DDP (DistributedDataParallel) 권장

모델 병렬 (Model Parallel):

모델을 여러 GPU에 분할
- 파이프라인 병렬: 레이어 단위 분할
- 텐서 병렬: 레이어 내 분할

큰 모델 학습에 필수 (메모리 부족)

보안

Rate Limiting

from fastapi import Request, HTTPException
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/v1/chat/completions")
@limiter.limit("60/minute")  # 분당 60회
async def chat(request: Request, body: ChatRequest):
    return await generate_response(body)

# 사용자별 Rate Limit
def get_user_from_token(request: Request):
    auth = request.headers.get("Authorization", "")
    if auth.startswith("Bearer "):
        return decode_token(auth[7:])["user_id"]
    return get_remote_address(request)

user_limiter = Limiter(key_func=get_user_from_token)

연결 관리

import httpx

# Connection Pool 사용
limits = httpx.Limits(
    max_keepalive_connections=20,  # Keep-Alive 연결 수
    max_connections=100,            # 최대 연결 수
    keepalive_expiry=30            # Keep-Alive 만료 시간
)

timeout = httpx.Timeout(
    connect=5.0,    # 연결 타임아웃
    read=30.0,      # 읽기 타임아웃
    write=10.0,     # 쓰기 타임아웃
    pool=5.0        # 풀에서 연결 대기 타임아웃
)

async with httpx.AsyncClient(
    limits=limits,
    timeout=timeout,
    http2=True  # HTTP/2 사용
) as client:
    responses = await asyncio.gather(*[
        client.post(url, json=data) for data in batch
    ])

네트워크 성능 최적화

지연 시간 분석

총 지연 = 전파 지연 + 전송 지연 + 처리 지연 + 큐잉 지연

전파 지연: 물리적 거리 / 빛의 속도
  - 서울-미국 서부: ~50ms RTT

전송 지연: 패킷 크기 / 대역폭
  - 1MB / 1Gbps = 8ms

처리 지연: 라우터/서버 처리 시간
  - 보통 < 1ms

큐잉 지연: 버퍼 대기 시간
  - 혼잡 시 증가

최적화 기법

기법 설명 적용
Connection Pooling 연결 재사용 HTTP, DB
Keep-Alive 연결 유지 HTTP/1.1
Compression gzip, br 대용량 전송
CDN 엣지 캐싱 정적 리소스
배치 처리 요청 묶음 API 호출
Prefetching 미리 로드 예측 가능한 요청

핵심 개념

자주 나오는 질문

  1. TCP와 UDP의 차이점은?
  2. TCP: 연결 지향, 신뢰성, 순서 보장, 느림
  3. UDP: 비연결, 비신뢰성, 빠름

  4. 3-way Handshake를 설명하시오

  5. SYN -> SYN+ACK -> ACK
  6. 양방향 연결 확립

  7. HTTP/1.1, HTTP/2, HTTP/3의 차이점은?

  8. HTTP/2: 다중화, 헤더 압축
  9. HTTP/3: QUIC(UDP), HOL 블로킹 해결

  10. DNS 조회 과정을 설명하시오

  11. 캐시 -> 로컬 DNS -> 루트 -> TLD -> 권한 DNS

  12. HTTPS의 동작 원리는?

  13. TLS 핸드셰이크: 인증서 검증, 대칭키 교환
  14. 이후 대칭키로 암호화 통신

  15. 브라우저에 URL을 입력하면 어떤 일이 일어나는가?

  16. DNS 조회 -> TCP 연결 -> TLS 핸드셰이크 -> HTTP 요청 -> 응답 파싱 -> 렌더링

실무 연결

1. "LLM API의 지연 시간을 줄이려면?"
   -> HTTP/2 사용, Keep-Alive, 지리적 가까운 서버, CDN

2. "스트리밍 응답은 왜 SSE를 쓰는가?"
   -> 단방향으로 충분, HTTP 호환, 브라우저 지원

3. "gRPC는 언제 사용하는가?"
   -> 내부 마이크로서비스 통신, 높은 성능 필요 시

4. "Rate Limiting은 어떻게 구현하는가?"
   -> Token Bucket, Sliding Window, Redis 기반

참고 자료