FastAPI 내부 구조, 의존성 주입, Celery
[FastAPI 해부하기] 내부 구조부터 비동기 동작 원리, DI, 그리고 Celery 연동까지
FastAPI의 내부 동작 원리, 특히 비동기(Async) 처리와 이벤트 루프, 의존성 주입(DI), 그리고 실제 운영에 필요한 백그라운드 작업 처리 방식까지 논리적인 흐름에 따라 깊이 있게 파헤쳐 보겠습니다.
1. FastAPI의 뼈대
FastAPI는 ASGI, Starlette, 그리고 Pydanticㄹㄹ 이용하여 구현되었습니다.
과거의 WSGI와 현재의 ASGI
기존 파이썬 웹 표준인 WSGI(Django, Flask 등)는 한 번에 하나의 요청을 순차적으로 처리하는 동기식 구조였습니다. 반면 ASGI(Asynchronous Server Gateway Interface)는 async/await를 네이티브로 지원하여, DB나 네트워크 응답을 기다리는 동안 다른 요청을 동시에 처리할 수 있는 비동기 웹 표준입니다. FastAPI는 이 ASGI 기반의 초경량 프레임워크인 Starlette 위에서 동작합니다.
요청의 라이프사이클 (Uvicorn과 Router)
클라이언트의 요청이 들어오면 다음과 같은 흐름을 거칩니다.
- Uvicorn (ASGI 서버 / 문지기): 클라이언트와 네트워크 연결을 맺고 HTTP 요청을 수신하여 파이썬 딕셔너리로 파싱합니다.
- Router (교통경찰): FastAPI(Starlette) 내부에서 요청된 URL 경로와 HTTP 메서드를 확인하여 실행할 엔드포인트(함수)를 찾습니다.
- Pydantic (데이터 검증): 파이썬 타입 힌트를 기반으로 요청 데이터(Body, Query 등)의 유효성을 검사합니다.
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
# Pydantic을 이용한 데이터 검증 모델
class User(BaseModel):
name: str
age: int
# Router가 '/users/' POST 요청을 이 함수로 매핑
@app.post("/users/")
async def create_user(user: User):
# Uvicorn이 요청을 받고 -> Router가 연결하고 -> Pydantic이 user 타입을 검증함
return {"message": f"User {user.name} created!"}
2. 단일 스레드와 이벤트 루프(Event Loop)
FastAPI의 핵심 성능은 단일 스레드(Single Thread) + 이벤트 루프(Event Loop) 구조에서 나옵니다.
웹 서버 작업의 90%는 CPU 연산이 아니라 DB 쿼리나 외부 API 응답을 기다리는 I/O Bound 작업입니다. 기존 멀티 스레드 방식은 대기 시간 동안 스레드가 아무것도 하지 않고 멈춰있어 자원 낭비가 심했습니다.
하지만 이벤트 루프 방식은 “혼자 일하는 엄청나게 빠른 웨이터”와 같습니다. 주방(DB)에 주문을 넣고 요리가 나올 때까지 기다리지 않고, 즉시 다른 테이블(클라이언트)의 주문을 받으러 갑니다. 요리가 완성되었다는 알림(Event)이 오면 그때 다시 서빙을 이어갑니다.
치명적인 실수: 이벤트 루프 블로킹 (Event Loop Blocking)
가장 주의해야 할 점은 단일 스레드이기 때문에, 이벤트 루프의 통제권을 넘겨주지 않고(await 생략) 혼자서 오래 걸리는 동기 작업을 수행하면 전체 서버가 멈춰버린다는 것입니다.
import time
import asyncio
from fastapi import FastAPI
app = FastAPI()
# 🚨 나쁜 예시: 이벤트 루프 블로킹
@app.get("/bad")
async def bad_endpoint():
time.sleep(5) # 5초 동안 단일 스레드가 멈춤. 다른 접속자들도 무한 대기!
return {"status": "bad"}
# ✅ 좋은 예시: 제어권 반환
@app.get("/good")
async def good_endpoint():
await asyncio.sleep(5) # 대기하는 동안 이벤트 루프가 다른 요청을 처리함
return {"status": "good"}
3. 멀티 스레딩을 품은 비동기: def vs async def
“그렇다면 기존의 동기식 라이브러리(requests, 일반 SQLAlchemy)는 FastAPI에서 아예 못 쓰나요? 멀티 스레딩은 불가능한가요?”
그렇지 않습니다. FastAPI는 함수를 선언하는 방식에 따라 스레드를 영리하게 분배합니다.
async def: 메인 스레드의 이벤트 루프 위에서 비동기적으로 실행됩니다.def: 메인 스레드가 블로킹되는 것을 막기 위해, 내부적으로AnyIO라이브러리를 통해 외부 스레드 풀(Thread Pool)의 별도 스레드에 작업을 위임합니다. (내부적으로anyio.to_thread.run_sync()혹은asyncio.to_thread()와 동일한 원리로 동작합니다.)
import requests
import httpx
from fastapi import FastAPI
app = FastAPI()
# 1. 동기 라이브러리 사용 시 -> 일반 def 사용 (스레드 풀에서 멀티 스레드로 실행)
@app.get("/sync-call")
def sync_call():
# 메인 이벤트 루프를 막지 않기 위해 FastAPI가 알아서 다른 스레드로 던져서 실행함
response = requests.get("[https://example.com](https://example.com)")
return response.json()
# 2. 비동기 라이브러리 사용 시 -> async def 사용 (메인 이벤트 루프에서 실행)
@app.get("/async-call")
async def async_call():
async with httpx.AsyncClient() as client:
response = await client.get("[https://example.com](https://example.com)")
return response.json()
단, 워커 스레드 풀의 개수에는 한계가 있으므로 트래픽이 높은 서비스라면 I/O 작업은 가급적 async def와 비동기 라이브러리를 사용하는 것이 성능을 극대화하는 길입니다.
4. 의존성 주입(DI)과 싱글톤(Singleton) 객체 관리
FastAPI는 자체적으로 훌륭한 DI(Depends) 시스템을 갖추고 있습니다. 하지만 이는 주로 요청(Request) 스코프에 맞춰져 있어, 요청이 들어올 때마다 객체를 새로 생성합니다.
만약 DB 커넥션 풀처럼 애플리케이션 수명 주기 동안 단 하나의 객체만 유지하는 싱글톤(Singleton) 패턴이나, 더 정교한 IoC 컨테이너가 필요하다면 외부 라이브러리인 dependency_injector를 결합하여 사용합니다.
from fastapi import FastAPI, Depends
from dependency_injector import containers, providers
from dependency_injector.wiring import inject, Provide
# 1. 사용할 서비스 객체
class DatabaseService:
def __init__(self):
print("DB 연결 객체 생성됨! (최초 1회만 출력되어야 함)")
self.status = "connected"
# 2. DI 컨테이너 정의 (Singleton Provider 사용)
class Container(containers.DeclarativeContainer):
db_service = providers.Singleton(DatabaseService)
app = FastAPI()
# 3. 엔드포인트에 주입
@app.get("/status")
@inject
def get_status(db: DatabaseService = Depends(Provide[Container.db_service])):
# 요청이 100번 들어와도 DatabaseService는 단 1개만 생성되어 재사용됨
return {"db_status": db.status}
# 4. 앱 시작 시 컨테이너 연결
container = Container()
container.wire(modules=[__name__])
5. 백그라운드 작업 분리: BackgroundTasks와 Celery
API 응답은 0.1초 만에 나가야 하는데, 데이터 파이프라인 트리거, 이메일 발송, 영상 인코딩처럼 오래 걸리는 작업이 있다면 어떻게 해야 할까요? 작업의 무거운 정도에 따라 두 가지 전략을 취할 수 있습니다.
A. 가벼운 작업: 내장 BackgroundTasks
FastAPI 프로세스 내에서 응답을 보낸 직후 남은 작업을 마저 처리합니다. 설정이 필요 없어 매우 간편합니다.
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def write_log(message: str):
with open("log.txt", mode="a") as f:
f.write(message)
@app.post("/log")
async def log_message(msg: str, bg_tasks: BackgroundTasks):
# 응답은 즉시 반환하고, 파일 쓰기는 백그라운드에서 실행
bg_tasks.add_task(write_log, f"Log: {msg}\n")
return {"message": "Log will be written in background"}
B. 무거운 작업 (CPU 집약적): Celery + Redis 연동
몇 분 이상 걸리거나, 실패 시 재시도가 필요한 무거운 작업은 메인 웹 서버(Uvicorn/Gunicorn)와 워커를 완전히 분리해야 합니다. 이때 메시지 브로커(Redis)와 비동기 작업 큐(Celery)를 사용합니다.
# --- celery_app.py (워커 정의) ---
from celery import Celery
import time
celery_app = Celery("tasks", broker="redis://localhost:6379/0")
@celery_app.task
def heavy_computation(data: int):
time.sleep(10) # 10초 걸리는 무거운 작업
return data * 100
# --- main.py (FastAPI) ---
from fastapi import FastAPI
from celery_app import heavy_computation
app = FastAPI()
@app.post("/start-computation/")
async def start_computation(data: int):
# Redis 큐에 작업 지시서만 던져놓고 즉시 빠져나옴 (대기 X)
task = heavy_computation.delay(data)
return {"message": "작업 시작됨", "task_id": task.id}
이 구조를 통해 FastAPI는 클라이언트와 HTTP 통신을 빠르게 주고받는 데만 집중하고, 무거운 짐은 뒤에 있는 Celery 워커들이 나누어 지게 됩니다.
마무리
FastAPI는 단순한 도구를 넘어 파이썬 백엔드 개발의 패러다임을 동기에서 비동기로 옮겨가는 데 큰 역할을 하고 있습니다.
- 가벼운 I/O 작업은 단일 스레드 + 이벤트 루프로 극강의 효율을 내고
- 동기식 구형 라이브러리는 스레드 풀(
def)로 유연하게 감싸 안으며 - 무거운 CPU/데이터 처리 연산은 Celery 같은 외부 워커로 분리하는 아키텍처
이러한 내부 동작의 원리를 명확히 이해한다면, 확장성 있고 견고한 현대적 데이터/웹 서비스 백엔드 시스템을 성공적으로 구축할 수 있을 것입니다.