Como Fazer Deploy de Aplicações com LLMs
Guia completo do notebook para produção
•18 min de leitura
Fazer um chatbot funcionar no notebook é fácil. Colocar em produção com performance, segurança e custos controlados é outro nível.
Arquitetura de Produção
┌─────────────────────────────────────────────────────────┐
│ PRODUÇÃO │
├─────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────────────┐ │
│ │ Cliente │───▶│ CDN │───▶│ Load Balancer │ │
│ └─────────┘ └─────────┘ └────────┬────────┘ │
│ │ │
│ ┌──────────┴──────────┐ │
│ │ │ │
│ ┌────▼────┐ ┌────▼────┤
│ │ API 1 │ │ API 2 │
│ └────┬────┘ └────┬────┘
│ │ │ │
│ ┌─────────┴─────────────────────┘ │
│ │ │
│ ┌─────────▼─────────┐ ┌───────────────┐ │
│ │ Cache │ │ Rate Limiter│ │
│ │ (Redis) │ │ │ │
│ └─────────┬─────────┘ └───────────────┘ │
│ │ │
│ ┌─────────▼─────────┐ ┌───────────────┐ │
│ │ OpenAI/Claude │ │ Vector DB │ │
│ │ API │ │ (Pinecone) │ │
│ └───────────────────┘ └───────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘1. API com FastAPI
# main.py
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from openai import OpenAI
import os
app = FastAPI(title="AI Chat API")
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Configure para seu domínio
allow_methods=["*"],
allow_headers=["*"],
)
# Client OpenAI
client = OpenAI()
class ChatRequest(BaseModel):
message: str
conversation_id: str | None = None
class ChatResponse(BaseModel):
response: str
conversation_id: str
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
try:
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "Você é um assistente útil."},
{"role": "user", "content": request.message}
]
)
return ChatResponse(
response=response.choices[0].message.content,
conversation_id=request.conversation_id or "new"
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health():
return {"status": "healthy"}2. Streaming de Respostas
from fastapi.responses import StreamingResponse
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
async def generate():
stream = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "Você é um assistente útil."},
{"role": "user", "content": request.message}
],
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content:
yield f"data: {chunk.choices[0].delta.content}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream"
)3. Cache com Redis
import redis
import hashlib
import json
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def get_cache_key(messages: list) -> str:
"""Gera key única baseada nas mensagens."""
content = json.dumps(messages, sort_keys=True)
return hashlib.md5(content.encode()).hexdigest()
def get_cached_response(messages: list) -> str | None:
"""Busca resposta em cache."""
key = get_cache_key(messages)
cached = redis_client.get(key)
if cached:
return cached.decode()
return None
def cache_response(messages: list, response: str, ttl: int = 3600):
"""Salva resposta em cache."""
key = get_cache_key(messages)
redis_client.setex(key, ttl, response)
# Usar no endpoint
@app.post("/chat")
async def chat(request: ChatRequest):
messages = [
{"role": "system", "content": "Você é um assistente."},
{"role": "user", "content": request.message}
]
# Verificar cache
cached = get_cached_response(messages)
if cached:
return ChatResponse(response=cached, cached=True)
# Chamar API
response = client.chat.completions.create(
model="gpt-4",
messages=messages
)
result = response.choices[0].message.content
# Salvar em cache
cache_response(messages, result)
return ChatResponse(response=result, cached=False)4. Rate Limiting
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.post("/chat")
@limiter.limit("10/minute") # 10 requests por minuto por IP
async def chat(request: Request, chat_request: ChatRequest):
# ...
# Rate limit por usuário (com autenticação)
def get_user_id(request: Request) -> str:
return request.headers.get("X-User-ID", get_remote_address(request))
limiter = Limiter(key_func=get_user_id)5. Dockerfile
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Instalar dependências
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copiar código
COPY . .
# Expor porta
EXPOSE 8000
# Comando para rodar
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]# docker-compose.yml
version: '3.8'
services:
api:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- REDIS_URL=redis://redis:6379
depends_on:
- redis
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
volumes:
redis_data:6. Monitoramento
import logging
import time
from functools import wraps
# Configurar logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Decorator para medir tempo
def measure_time(func):
@wraps(func)
async def wrapper(*args, **kwargs):
start = time.time()
result = await func(*args, **kwargs)
elapsed = time.time() - start
logger.info(f"{func.__name__} took {elapsed:.2f}s")
return result
return wrapper
# Métricas com Prometheus
from prometheus_client import Counter, Histogram
request_count = Counter(
'chat_requests_total',
'Total chat requests',
['status']
)
request_latency = Histogram(
'chat_request_latency_seconds',
'Chat request latency'
)
@app.post("/chat")
@measure_time
async def chat(request: ChatRequest):
with request_latency.time():
try:
response = await process_chat(request)
request_count.labels(status='success').inc()
return response
except Exception as e:
request_count.labels(status='error').inc()
raise7. Gerenciamento de Secrets
# Nunca coloque API keys no código!
# Opção 1: Variáveis de ambiente
import os
openai_key = os.environ["OPENAI_API_KEY"]
# Opção 2: .env com python-dotenv
from dotenv import load_dotenv
load_dotenv()
# Opção 3: Secret manager (produção)
# AWS Secrets Manager, GCP Secret Manager, etc.
# Opção 4: Vault
# HashiCorp Vault para secrets em escalaOpções de Deploy
| Plataforma | Complexidade | Custo | Melhor para |
|---|---|---|---|
| Railway | Baixa | $ | MVPs, startups |
| Render | Baixa | $ | Apps simples |
| Vercel | Baixa | $ | Serverless, Next.js |
| AWS ECS | Média | $$ | Escala, enterprise |
| GCP Cloud Run | Baixa | $$ | Containers serverless |
| Kubernetes | Alta | $$$ | Grande escala |
Checklist de Produção
Antes de deployar:
□ API keys em variáveis de ambiente
□ Rate limiting configurado
□ Cache implementado
□ Logging estruturado
□ Health check endpoint
□ CORS configurado
□ HTTPS habilitado
□ Error handling robusto
□ Timeouts configurados
□ Retry logic para APIs externas
□ Monitoramento de custos
□ Alertas para erros
□ Backup de dados (se aplicável)
□ Documentação da API
□ Testes automatizadosCustos Típicos
Estimativa para 10.000 requests/dia:
API OpenAI (GPT-4):
- ~500 tokens/request médio
- 5M tokens/dia = ~$50-150/dia
Infraestrutura:
- Railway/Render: $20-50/mês
- AWS ECS: $50-200/mês
- Redis: $15-30/mês
Vector DB (se usar RAG):
- Pinecone: $70+/mês
- Chroma (self-hosted): custo de servidor
Total estimado: $200-500/mês para 10k req/dia
Otimizações:
- Cache pode reduzir 30-50% dos custos de API
- Modelo menor (GPT-3.5) para queries simples
- Batch requests quando possível