운영체제 (Operating Systems)¶
하드웨어 자원을 관리하고 응용 프로그램에 서비스를 제공하는 시스템 소프트웨어. LLM/VLM에서는 GPU 관리, 메모리 최적화, 병렬 처리에 직접적으로 관련됨.
왜 운영체제를 알아야 하는가¶
- 자원 관리: CPU, 메모리, I/O 동작 원리 이해
- 성능 최적화: 병목 지점 파악, 병렬화 전략
- 디버깅: 데드락, 메모리 누수, 성능 저하 원인 분석
- GPU 프로그래밍: CUDA, 메모리 계층 이해
프로세스와 스레드¶
프로세스 (Process)¶
실행 중인 프로그램의 인스턴스. 독립된 메모리 공간을 가짐.
프로세스 메모리 구조:
높은 주소
+------------------+
| Stack | <- 지역 변수, 함수 호출 (아래로 성장)
| | |
| v |
| |
| ^ |
| | |
| Heap | <- 동적 할당 (위로 성장)
+------------------+
| BSS (미초기화) | <- 초기화 안된 전역/정적 변수 (0으로 초기화)
+------------------+
| Data (초기화) | <- 초기화된 전역/정적 변수
+------------------+
| Code (Text) | <- 실행 코드 (읽기 전용)
+------------------+
낮은 주소
프로세스 상태:
생성
|
v
+-------+ 디스패치 +--------+
| | ---------> | |
| Ready | | Running|
| | <--------- | |
+-------+ 타임아웃 +--------+
^ |
| I/O 완료 | I/O 요청
| v
| +--------+
+-------------- | Waiting|
+--------+
프로세스 생성:
import os
import multiprocessing
# fork (Unix 계열)
# 부모 프로세스를 복제하여 자식 생성
pid = os.fork()
if pid == 0:
print(f"자식 프로세스: {os.getpid()}")
else:
print(f"부모 프로세스: {os.getpid()}, 자식: {pid}")
# multiprocessing 모듈 (크로스 플랫폼)
def worker(name):
print(f"Worker {name}, PID: {os.getpid()}")
if __name__ == "__main__":
processes = []
for i in range(4):
p = multiprocessing.Process(target=worker, args=(i,))
processes.append(p)
p.start()
for p in processes:
p.join() # 자식 프로세스 종료 대기
스레드 (Thread)¶
프로세스 내의 실행 단위. 코드, 데이터, 힙 영역을 공유함.
프로세스 vs 스레드:
| 항목 | 프로세스 | 스레드 |
|---|---|---|
| 메모리 | 독립 | 공유 (스택만 개별) |
| 생성 비용 | 높음 | 낮음 |
| 컨텍스트 스위칭 | 느림 (TLB 플러시 등) | 빠름 |
| 통신 | IPC 필요 (파이프, 소켓, 공유 메모리) | 직접 공유 |
| 안정성 | 높음 (격리) | 낮음 (하나가 죽으면 전체 영향) |
| 동기화 | 불필요 (대부분) | 필요 (공유 자원) |
import threading
counter = 0
lock = threading.Lock()
def worker(name):
global counter
for _ in range(100000):
with lock: # 임계 영역 보호
counter += 1
print(f"Thread {name} done, TID: {threading.current_thread().ident}")
threads = []
for i in range(4):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"Counter: {counter}") # lock 없으면 400000 미만
Python GIL (Global Interpreter Lock)¶
CPython에서 한 번에 하나의 스레드만 파이썬 바이트코드 실행 가능.
GIL이 존재하는 이유: - CPython의 메모리 관리(참조 카운팅)가 스레드 안전하지 않음 - 단순성과 C 확장 호환성
GIL의 영향:
# CPU 바운드 작업: 멀티스레딩 효과 없음 -> 멀티프로세싱 사용
from multiprocessing import Pool
def cpu_intensive(x):
return sum(i*i for i in range(x))
with Pool(4) as p:
results = p.map(cpu_intensive, [10**6]*4)
# I/O 바운드 작업: 멀티스레딩 효과 있음 (I/O 대기 중 GIL 해제)
import asyncio
async def fetch_data(url):
await asyncio.sleep(1) # I/O 대기 시뮬레이션
return f"Data from {url}"
async def main():
urls = ["url1", "url2", "url3"]
results = await asyncio.gather(*[fetch_data(url) for url in urls])
print(results)
asyncio.run(main())
LLM 관점에서 GIL: - GPU 연산은 GIL 밖에서 실행 -> PyTorch/TensorFlow에서 영향 적음 - 데이터 로딩: 멀티프로세싱 DataLoader 사용 - API 서빙: asyncio로 동시 요청 처리
프로세스 vs 스레드 선택 기준¶
멀티프로세싱 선택:
- CPU 바운드 작업
- 격리가 필요한 경우 (안정성)
- GIL 우회 필요
멀티스레딩 선택:
- I/O 바운드 작업
- 공유 메모리 필요
- 가벼운 동시성
asyncio 선택:
- I/O 바운드 + 많은 동시 연결
- 이벤트 기반 프로그래밍
메모리 관리¶
가상 메모리 (Virtual Memory)¶
물리 메모리보다 큰 주소 공간 제공. 페이지 단위로 관리.
왜 가상 메모리가 필요한가? 1. 프로세스 격리: 각 프로세스가 독립적인 주소 공간 2. 메모리 보호: 잘못된 접근 방지 3. 물리 메모리보다 큰 프로그램 실행 4. 메모리 공유 효율화
주소 변환:
가상 주소 (CPU)
|
v
+-------+
| MMU | <- Memory Management Unit
+-------+
|
v
+---------+
| TLB | <- Translation Lookaside Buffer (캐시)
+---------+
|
miss?
|
v
+-----------+
| Page Table|
+-----------+
|
v
물리 주소 (RAM)
페이지 테이블:
가상 주소: [페이지 번호 | 오프셋]
|
v
Page Table
+---------+
| Frame 0 |
| Frame 3 |
| Invalid |
| Frame 7 |
+---------+
|
v
물리 주소: [프레임 번호 | 오프셋]
페이지 폴트 (Page Fault)¶
접근하려는 페이지가 물리 메모리에 없을 때 발생.
1. 페이지 테이블에서 valid bit = 0 확인
2. 트랩 발생 -> OS로 제어 이동
3. 디스크에서 페이지 로드 (I/O 발생)
4. 페이지 테이블 업데이트
5. 명령어 재실행
페이지 교체 알고리즘¶
| 알고리즘 | 설명 | 장단점 |
|---|---|---|
| FIFO | 가장 먼저 들어온 페이지 교체 | 단순, Belady's Anomaly 발생 가능 |
| LRU | 가장 오래 사용 안된 페이지 | 좋은 성능, 구현 복잡 |
| LFU | 가장 적게 사용된 페이지 | 초기 사용 후 안 쓰는 페이지 문제 |
| Clock | LRU 근사, 원형 큐 + 참조 비트 | 효율적 구현 |
| Optimal | 미래에 가장 늦게 사용될 페이지 | 이론적 최적 (구현 불가) |
# LRU 캐시 구현 (OrderedDict 활용)
from collections import OrderedDict
class LRUCache:
def __init__(self, capacity):
self.capacity = capacity
self.cache = OrderedDict()
def get(self, key):
if key not in self.cache:
return -1
# 최근 사용으로 이동
self.cache.move_to_end(key)
return self.cache[key]
def put(self, key, value):
if key in self.cache:
self.cache.move_to_end(key)
self.cache[key] = value
if len(self.cache) > self.capacity:
self.cache.popitem(last=False) # 가장 오래된 것 제거
GPU 메모리 관리¶
LLM에서 가장 중요한 메모리 관리 영역.
GPU 메모리 계층:
+------------------+
| Global Memory | <- 가장 큼, 가장 느림 (모델 파라미터)
+------------------+
| Shared Memory | <- 블록 내 공유, 빠름
+------------------+
| L2 Cache |
+------------------+
| L1 Cache |
+------------------+
| Registers | <- 가장 빠름
+------------------+
메모리 모니터링과 최적화:
import torch
# GPU 메모리 확인
print(f"할당된 메모리: {torch.cuda.memory_allocated() / 1e9:.2f} GB")
print(f"캐시된 메모리: {torch.cuda.memory_reserved() / 1e9:.2f} GB")
print(f"최대 할당: {torch.cuda.max_memory_allocated() / 1e9:.2f} GB")
# 메모리 해제
torch.cuda.empty_cache()
# 메모리 스냅샷 (디버깅용)
torch.cuda.memory._record_memory_history(enabled=True)
# ... 작업 ...
torch.cuda.memory._dump_snapshot("memory_snapshot.pickle")
# 메모리 효율적 모델 로딩
model = AutoModelForCausalLM.from_pretrained(
"model_name",
torch_dtype=torch.float16, # FP32 -> FP16
device_map="auto", # 자동 디바이스 배치
low_cpu_mem_usage=True, # CPU 메모리 절약
)
# 8비트 양자화
model = AutoModelForCausalLM.from_pretrained(
"model_name",
load_in_8bit=True,
device_map="auto",
)
LLM 메모리 구성:
+------------------------+
| 모델 가중치 | <- 파라미터 수 * precision
| | 7B * 2 bytes (FP16) = 14GB
+------------------------+
| KV Cache | <- 2 * batch * layers * seq * dim * precision
| | 추론 시 시퀀스 길이에 비례 증가
+------------------------+
| 활성화 값 | <- 순전파 중간 결과
| | 배치 크기에 비례
+------------------------+
| 그래디언트 | <- 학습 시에만, 파라미터와 동일 크기
+------------------------+
| 옵티마이저 상태 | <- Adam: 파라미터 * 2 (m, v)
+------------------------+
메모리 추정:
def estimate_model_memory(
num_params,
batch_size,
seq_len,
num_layers,
hidden_dim,
num_heads,
precision_bytes=2, # FP16
training=False
):
"""LLM 메모리 사용량 추정"""
# 모델 가중치
model_memory = num_params * precision_bytes
# KV Cache (추론 시)
kv_cache = 2 * batch_size * num_layers * seq_len * hidden_dim * precision_bytes
# 활성화 값 (대략적 추정)
activation = batch_size * seq_len * hidden_dim * num_layers * precision_bytes * 2
# 학습 시 추가
if training:
gradients = num_params * precision_bytes
optimizer = num_params * 8 # Adam: 2 * FP32
total = model_memory + activation + gradients + optimizer
else:
total = model_memory + kv_cache + activation
return total / (1024**3) # GB
# 예시: Llama-7B 추론
memory = estimate_model_memory(
num_params=7e9,
batch_size=1,
seq_len=4096,
num_layers=32,
hidden_dim=4096,
num_heads=32,
training=False
)
print(f"예상 메모리: {memory:.2f} GB")
동기화와 병행성¶
경쟁 조건 (Race Condition)¶
여러 스레드/프로세스가 공유 자원에 동시 접근할 때 발생.
# 문제 상황
counter = 0
def increment():
global counter
temp = counter # 읽기
temp += 1 # 연산
counter = temp # 쓰기
# 이 사이에 다른 스레드가 끼어들 수 있음
# 해결: 락 사용
import threading
lock = threading.Lock()
def safe_increment():
global counter
with lock: # 임계 영역
counter += 1
동기화 기법¶
| 기법 | 설명 | 사용 사례 |
|---|---|---|
| Mutex | 상호 배제, 한 번에 하나만 | 임계 영역 보호 |
| Semaphore | 카운팅, n개까지 허용 | 리소스 풀 |
| Condition Variable | 조건 대기 | 생산자-소비자 |
| RWLock | 읽기 여럿, 쓰기 하나 | 읽기 많은 캐시 |
| Barrier | 모두 도착할 때까지 대기 | 병렬 계산 동기화 |
from threading import Semaphore, Condition, Barrier
import queue
# 세마포어: 동시 접근 수 제한
gpu_semaphore = Semaphore(2) # 최대 2개 GPU 동시 사용
def use_gpu(job):
with gpu_semaphore:
# GPU 작업 (최대 2개 스레드만 동시 실행)
process_on_gpu(job)
# Condition Variable: 생산자-소비자
class BoundedBuffer:
def __init__(self, capacity):
self.buffer = []
self.capacity = capacity
self.lock = threading.Lock()
self.not_full = threading.Condition(self.lock)
self.not_empty = threading.Condition(self.lock)
def put(self, item):
with self.not_full:
while len(self.buffer) >= self.capacity:
self.not_full.wait() # 빈 공간 대기
self.buffer.append(item)
self.not_empty.notify()
def get(self):
with self.not_empty:
while len(self.buffer) == 0:
self.not_empty.wait() # 항목 대기
item = self.buffer.pop(0)
self.not_full.notify()
return item
# Barrier: 동기화 지점
barrier = Barrier(4) # 4개 스레드 대기
def worker(id):
print(f"Worker {id} doing part 1")
barrier.wait() # 모두 완료할 때까지 대기
print(f"Worker {id} doing part 2")
데드락 (Deadlock)¶
두 개 이상의 프로세스가 서로의 자원을 기다리며 무한 대기.
데드락 발생 조건 (4가지 모두 충족 시): 1. 상호 배제 (Mutual Exclusion): 자원을 동시에 사용 불가 2. 점유 대기 (Hold and Wait): 자원 보유하면서 다른 자원 대기 3. 비선점 (No Preemption): 자원 강제 회수 불가 4. 순환 대기 (Circular Wait): 프로세스들이 원형으로 대기
데드락 예시:
# 데드락 발생
lock_a = threading.Lock()
lock_b = threading.Lock()
def thread1():
lock_a.acquire()
time.sleep(0.1)
lock_b.acquire() # thread2가 보유 중 -> 대기
lock_b.release()
lock_a.release()
def thread2():
lock_b.acquire()
time.sleep(0.1)
lock_a.acquire() # thread1이 보유 중 -> 대기
lock_a.release()
lock_b.release()
# 해결: 락 순서 고정
def thread1_fixed():
lock_a.acquire() # 항상 a 먼저
lock_b.acquire()
# ...
lock_b.release()
lock_a.release()
def thread2_fixed():
lock_a.acquire() # 항상 a 먼저
lock_b.acquire()
# ...
lock_b.release()
lock_a.release()
데드락 방지 전략:
1. 예방 (Prevention): 4가지 조건 중 하나 제거
- 자원 순서 정하기 (순환 대기 방지)
- 한 번에 모든 자원 요청 (점유 대기 방지)
2. 회피 (Avoidance): 은행원 알고리즘
- 안전 상태인 경우만 자원 할당
3. 탐지 및 복구 (Detection & Recovery)
- 주기적으로 탐지, 프로세스 종료로 복구
4. 무시 (Ostrich Algorithm)
- 데드락 발생 확률 낮으면 무시 (Linux, Windows)
CPU 스케줄링¶
스케줄링 알고리즘¶
| 알고리즘 | 설명 | 장단점 |
|---|---|---|
| FCFS | 선입선출 | 단순, 호송 효과 |
| SJF | 최단 작업 우선 | 평균 대기 시간 최소, 기아 가능 |
| SRTF | 최단 잔여 시간 우선 | SJF의 선점형 |
| Round Robin | 시간 할당량 순환 | 공정, 응답 시간 좋음 |
| Priority | 우선순위 기반 | 기아 문제 (에이징으로 해결) |
| Multilevel Queue | 큐 분리 | 작업 유형별 처리 |
| CFS | Completely Fair Scheduler | Linux 기본, 레드-블랙 트리 |
시간 할당량 (Time Quantum) 선택:
컨텍스트 스위칭 (Context Switch)¶
CPU를 한 프로세스/스레드에서 다른 것으로 전환.
저장해야 할 상태: - 프로그램 카운터 (PC) - 레지스터 - 스택 포인터 - 메모리 관리 정보 (페이지 테이블 등)
비용: - 직접 비용: 상태 저장/복원 - 간접 비용: TLB, 캐시 무효화
LLM 서빙에서의 스케줄링¶
# vLLM의 연속 배칭 개념
class ContinuousBatcher:
"""
동적 배칭: 완료된 요청 제거, 새 요청 추가
vs 정적 배칭: 모든 요청이 완료될 때까지 대기
"""
def __init__(self, max_batch_size, max_tokens):
self.pending = []
self.running = []
self.max_batch_size = max_batch_size
self.max_tokens = max_tokens
def add_request(self, request):
self.pending.append(request)
def schedule(self):
# 완료된 요청 제거
self.running = [r for r in self.running if not r.is_finished]
# 새 요청 추가 (제약 조건 내에서)
current_tokens = sum(r.current_len for r in self.running)
while self.pending:
if len(self.running) >= self.max_batch_size:
break
next_req = self.pending[0]
if current_tokens + next_req.input_len > self.max_tokens:
break
self.pending.pop(0)
self.running.append(next_req)
current_tokens += next_req.input_len
return self.running
파일 시스템¶
파일 시스템 구조¶
+------------------+
| Boot Block | <- 부팅 정보
+------------------+
| Super Block | <- 파일 시스템 메타데이터
+------------------+
| Inode Table | <- 파일 메타데이터 (권한, 크기, 블록 포인터)
+------------------+
| Data Blocks | <- 실제 파일 내용
+------------------+
Inode 구조:
+---------------+
| 파일 모드 |
| 소유자 UID |
| 크기 |
| 타임스탬프 |
+---------------+
| 직접 블록 포인터 (12개) |
| 간접 블록 포인터 |
| 이중 간접 블록 포인터 |
| 삼중 간접 블록 포인터 |
+---------------+
파일 I/O 최적화¶
# 버퍼링 I/O
with open('large_file.txt', 'r', buffering=8192) as f:
for line in f: # 라인 단위 버퍼링
process(line)
# 메모리 맵 I/O (대용량 파일)
import mmap
with open('large_file.bin', 'r+b') as f:
mm = mmap.mmap(f.fileno(), 0)
# 파일을 메모리처럼 접근 (Lazy Loading)
data = mm[0:100]
mm.close()
# 비동기 I/O
import aiofiles
async def async_read():
async with aiofiles.open('file.txt', 'r') as f:
content = await f.read()
return content
LLM 모델 로딩:
import torch
from safetensors.torch import load_file
# 일반 로딩 (전체를 메모리에 로드)
model_state = torch.load('model.pt')
# 메모리 맵 로딩 (필요한 부분만)
model_state = torch.load('model.pt', map_location='cpu', mmap=True)
# Safetensors (메모리 맵 + 보안)
model_state = load_file('model.safetensors')
# 점진적 로딩 (레이어 단위)
from accelerate import load_checkpoint_and_dispatch
model = load_checkpoint_and_dispatch(
model,
checkpoint='./model',
device_map='auto',
no_split_module_classes=['LlamaDecoderLayer']
)
입출력 시스템¶
I/O 방식¶
| 방식 | 설명 | 장단점 |
|---|---|---|
| Blocking I/O | 완료까지 대기 | 단순, 자원 비효율 |
| Non-blocking I/O | 즉시 반환 | 폴링 필요 |
| I/O Multiplexing | 여러 I/O 모니터링 | 단일 스레드로 다중 처리 |
| Async I/O | 완료 시 콜백/알림 | 복잡, 고성능 |
select/poll/epoll:
import select
import socket
# select: 비트맵 기반, O(n) 스캔
# poll: 배열 기반, FD 수 제한 없음
# epoll: 이벤트 기반, O(1), Linux 전용
# asyncio는 내부적으로 epoll 사용 (Linux)
import asyncio
async def handle_client(reader, writer):
data = await reader.read(1024)
writer.write(data)
await writer.drain()
writer.close()
async def main():
server = await asyncio.start_server(handle_client, '0.0.0.0', 8080)
async with server:
await server.serve_forever()
GPU I/O¶
import torch
# CPU -> GPU 전송
data_cpu = torch.randn(1000, 1000)
data_gpu = data_cpu.to('cuda') # 동기 전송
# 비동기 전송 (오버랩 가능)
data_gpu = data_cpu.to('cuda', non_blocking=True)
torch.cuda.synchronize() # 명시적 동기화 필요
# 핀드 메모리 (전송 속도 향상)
# 페이지 아웃 방지 -> DMA 전송 가능
data_pinned = torch.randn(1000, 1000, pin_memory=True)
data_gpu = data_pinned.to('cuda', non_blocking=True)
# CUDA 스트림 (연산 병렬화)
stream1 = torch.cuda.Stream()
stream2 = torch.cuda.Stream()
# 서로 다른 스트림의 연산은 병렬 실행 가능
with torch.cuda.stream(stream1):
output1 = model1(input1)
with torch.cuda.stream(stream2):
output2 = model2(input2)
torch.cuda.synchronize() # 모든 스트림 대기
분산 시스템 기초¶
프로세스 간 통신 (IPC)¶
from multiprocessing import Process, Queue, Pipe, Value, Array
# Queue (다대다, 스레드/프로세스 안전)
def producer(q):
for i in range(10):
q.put(f"item_{i}")
def consumer(q):
while True:
item = q.get()
if item is None:
break
print(f"Got: {item}")
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
q.put(None) # 종료 신호
p2.join()
# Pipe (일대일, 양방향)
parent_conn, child_conn = Pipe()
def child_process(conn):
conn.send("Hello from child")
print(conn.recv())
conn.close()
p = Process(target=child_process, args=(child_conn,))
p.start()
print(parent_conn.recv())
parent_conn.send("Hello from parent")
p.join()
# 공유 메모리 (Value, Array)
counter = Value('i', 0) # 정수
arr = Array('d', [0.0, 0.0, 0.0]) # double 배열
def increment(counter, lock):
for _ in range(10000):
with lock:
counter.value += 1
분산 학습 기초¶
import torch.distributed as dist
# 초기화 (NCCL: GPU, Gloo: CPU)
dist.init_process_group(
backend='nccl',
init_method='env://', # MASTER_ADDR, MASTER_PORT 환경 변수
world_size=4,
rank=0
)
# 집합 통신 (Collective Communication)
tensor = torch.ones(10).cuda() * rank
# All-Reduce: 모든 값을 집계하여 모든 노드에 배포
dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
# All-Gather: 모든 텐서를 모든 노드에 수집
gathered = [torch.zeros_like(tensor) for _ in range(world_size)]
dist.all_gather(gathered, tensor)
# Broadcast: 한 노드에서 모든 노드로 전송
dist.broadcast(tensor, src=0)
# Reduce: 모든 값을 집계하여 한 노드에만
dist.reduce(tensor, dst=0, op=dist.ReduceOp.SUM)
# 정리
dist.destroy_process_group()
핵심 개념¶
자주 나오는 질문¶
- 프로세스와 스레드의 차이점은?
- 메모리: 프로세스는 독립, 스레드는 공유
- 생성 비용: 프로세스 > 스레드
-
통신: 프로세스는 IPC 필요
-
데드락의 4가지 조건과 해결 방법은?
- 상호 배제, 점유 대기, 비선점, 순환 대기
-
해결: 자원 순서 고정, 타임아웃, 은행원 알고리즘
-
가상 메모리란? 페이지 폴트란?
- 가상 메모리: 물리 메모리 추상화, 프로세스별 독립 주소 공간
-
페이지 폴트: 접근하려는 페이지가 물리 메모리에 없을 때
-
LRU 캐시 구현하기
- 해시맵 + 이중 연결 리스트
-
OrderedDict 활용
-
컨텍스트 스위칭이란? 비용은?
- CPU를 다른 프로세스/스레드로 전환
-
비용: 상태 저장/복원 + 캐시/TLB 무효화
-
동기 vs 비동기, 블로킹 vs 논블로킹
- 동기/비동기: 작업 완료 확인 방식
- 블로킹/논블로킹: 호출 시 대기 여부
실무 연결¶
1. "왜 LLM 추론 시 배치 크기를 늘리면 처리량이 증가하는가?"
-> GPU 병렬성 활용, 커널 런칭 오버헤드 분산
2. "메모리 부족 시 어떻게 대응하는가?"
-> 양자화, 그래디언트 체크포인팅, 오프로딩
3. "API 서버에서 많은 동시 요청을 처리하려면?"
-> asyncio, 연속 배칭, 로드 밸런싱
4. "분산 학습 시 통신 병목을 줄이려면?"
-> 그래디언트 압축, 비동기 업데이트, 효율적 토폴로지