Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
FROM python:3.10-slim
RUN apt-get update && apt-get install -y curl && rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
144 changes: 144 additions & 0 deletions api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# api.py

import os
import pickle
import json
import time
from functools import lru_cache
import requests # Importante: para hacer peticiones HTTP
import pandas as pd
from fastapi import FastAPI, HTTPException, Response
from pydantic import BaseModel
from typing import Dict, Any, List

from prometheus_client import Histogram, generate_latest

# --- Modelos Pydantic (se mantienen igual) ---
class PredictionRequest(BaseModel):
features: Dict[str, float]

class PredictionResponse(BaseModel):
prediction: Any
model_details: Dict[str, str]

# --- Métricas de Prometheus (se mantienen igual) ---
INFERENCE_LATENCY = Histogram(
'inference_latency_seconds',
'Latencia de las peticiones de inferencia (en segundos)',
['dataset', 'model_type']
)

# --- Aplicación FastAPI (SIN LIFESPAN) ---
app = FastAPI(
title="API de Inferencia (v3 - Desacoplada)",
version="3.0.0",
description="Provee acceso a modelos de ML consumiendo la API de Registro."
)

# URL del servicio de gestión, leído del entorno
MANAGEMENT_API_URL = os.environ.get("MANAGEMENT_API_URL", "http://localhost:9000")

# --- NUEVAS Funciones de Ayuda ---
@lru_cache(maxsize=32)
def get_artifact_from_management_api(dataset: str, model_type: str, artifact_name: str):
"""
Llama a la management-api para obtener un artefacto, lo deserializa y lo cachea.
"""
print(f"CACHE MISS: Pidiendo a management-api: {dataset}/{model_type}/{artifact_name}")
try:
url = f"{MANAGEMENT_API_URL}/registry/artifacts/{dataset}/{model_type}/{artifact_name}"
response = requests.get(url, timeout=10)

response.raise_for_status() # Lanza una excepción para códigos de error (4xx o 5xx)

serialized_obj_bytes = response.content

# Deserializar según el tipo
if artifact_name == "metrics":
return json.loads(serialized_obj_bytes.decode('utf-8'))
else:
return pickle.loads(serialized_obj_bytes)

except requests.exceptions.HTTPError as e:
status_code = e.response.status_code if e.response else 503
try:
# Intentar parsear el detalle del error que viene de management-api
detail = e.response.json().get("detail", e.response.text)
except json.JSONDecodeError:
detail = e.response.text
raise HTTPException(status_code=status_code, detail=f"Error desde management-api: {detail}")
except requests.exceptions.RequestException as e:
raise HTTPException(status_code=503, detail=f"No se pudo contactar a management-api: {e}")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error inesperado al procesar artefacto: {e}")

# --- Endpoints de la API (REESCRITOS PARA USAR LA NUEVA LÓGICA) ---

@app.get("/models", summary="Listar todos los modelos disponibles")
def list_models():
"""Consulta a la management-api para obtener la lista de modelos."""
try:
url = f"{MANAGEMENT_API_URL}/registry/models"
response = requests.get(url, timeout=10)
response.raise_for_status()
return response.json()
except Exception as e:
raise HTTPException(status_code=503, detail=f"No se pudo conectar a management-api para listar modelos: {e}")

@app.get("/models/{dataset}/{model_type}/features", summary="Obtener características de un modelo")
def get_model_features(dataset: str, model_type: str):
"""Obtiene la lista de características desde la management-api."""
try:
features = get_artifact_from_management_api(dataset, model_type, "feature_names")
return {"features": features}
except HTTPException as e:
raise e # Re-lanzar la excepción que ya viene formateada

@app.get("/models/{dataset}/{model_type}/metrics", summary="Obtener métricas de entrenamiento")
def get_model_metrics(dataset: str, model_type: str):
"""Obtiene las métricas del modelo desde la management-api."""
try:
metrics = get_artifact_from_management_api(dataset, model_type, "metrics")
return metrics
except HTTPException as e:
raise e

@app.post("/predict/{dataset}/{model_type}", response_model=PredictionResponse, summary="Realizar una predicción")
def predict(dataset: str, model_type: str, request: PredictionRequest):
"""Realiza una predicción usando un modelo y registra la latencia."""
start_time = time.time()
required_features = [] # Para el mensaje de error
try:
pipeline = get_artifact_from_management_api(dataset, model_type, "pipeline")
required_features = get_artifact_from_management_api(dataset, model_type, "feature_names")

input_df = pd.DataFrame([request.features])[required_features]
prediction = pipeline.predict(input_df)

prediction_value = prediction[0].item() if hasattr(prediction[0], 'item') else prediction[0]

return PredictionResponse(prediction=prediction_value, model_details={"dataset": dataset, "model_type": model_type})

except KeyError as e:
raise HTTPException(status_code=400, detail=f"Falta la característica requerida: {e}. Se requieren: {required_features}")
except HTTPException as e:
raise e
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error durante la predicción: {e}")
finally:
latency = time.time() - start_time
INFERENCE_LATENCY.labels(dataset=dataset, model_type=model_type).observe(latency)
print(f"PREDICT LATENCY: {dataset}/{model_type} -> {latency:.4f}s")



@app.get("/health", summary="Comprobación de Salud", status_code=200)
def health_check():
"""
Endpoint simple para que Docker Healthcheck pueda verificar que el servicio está vivo.
"""
return {"status": "ok"}

@app.get("/metrics", summary="Exponer métricas para Prometheus")
def get_prometheus_metrics():
return Response(content=generate_latest(), media_type="text/plain")
78 changes: 78 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# docker-compose.yml (VERSIÓN FINAL - RESILIENTE)

services:
# Los servicios de Ray no cambian. ray-head sigue siendo el SPOF principal por diseño.
ray-head:
build: .
# Ya no usamos container_name para facilitar el escalado, aunque en head no es estrictamente necesario.
ports:
- "8265:8265"
- "10001:10001"
command: |
bash -c "
ray start --head --num-cpus=1 --port=6379 --dashboard-host=0.0.0.0 --include-dashboard=true --ray-client-server-port=10001 --resources='{\"is_head_node\": 1}' && \
echo 'Nodo HEAD en marcha...' && \
tail -f /dev/null
"
shm_size: 2.5gb

ray-worker: # Unificamos los workers para facilitar el escalado
build: .
depends_on:
- ray-head
deploy:
replicas: 2 # Podemos escalar los workers de Ray fácilmente
command: |
bash -c "
ray start --address=ray-head:6379 --num-cpus=2 && \
echo 'Worker conectado al HEAD' && \
tail -f /dev/null
"
shm_size: 2.5gb

# SOLUCIÓN #1: Replicación del servicio crítico
management-api:
build: .
# Ya no se usa container_name para permitir la replicación.
# Ya no se exponen puertos, la comunicación es interna a través de la GUI.
deploy:
replicas: 3 # Creamos un grupo de 3 réplicas para alta disponibilidad.
environment:
- RAY_ADDRESS=ray://ray-head:10001
- SERVICE_NAME=management-api # Para que el cliente resiliente sepa a quién buscar
command: uvicorn management_api:app --host 0.0.0.0 --port 9000
# Quitamos el healthcheck porque la GUI ahora es responsable de manejar los fallos.

# Hacemos lo mismo para el servicio de inferencia para consistencia
api-service:
build: .
deploy:
replicas: 3 # También replicamos el servicio de inferencia.
# ports:
# Exponemos este puerto para que sea más fácil de probar externamente (ej. con curl)
# Pero idealmente, la GUI también usaría un cliente resiliente para este.
# - "8000:8000"
environment:
# El api-service también se beneficia del balanceo de carga interno de Docker
# al hablar con las réplicas de management-api.
- MANAGEMENT_API_URL=http://management-api:9000
command: uvicorn api:app --host 0.0.0.0 --port 8000

# SOLUCIÓN #2: La GUI ahora es el cliente inteligente
gui-service:
build: .
container_name: gui-service # La GUI es única, así que puede tener nombre.
depends_on:
- management-api
- api-service
ports:
- "8501:8501"
environment:
# Pasamos la información necesaria para que el ResilientClient haga su trabajo.
- MANAGEMENT_API_SERVICE_NAME=management-api
- MANAGEMENT_API_PORT=9000
# La URL de inferencia puede apuntar al nombre del servicio; Docker hará un balanceo de carga simple (round-robin).
- INFERENCE_API_URL=http://api-service:8000
- RAY_DASHBOARD_URL=http://ray-head:8265
- RAY_ADDRESS=ray://ray-head:10001
command: streamlit run gui.py --server.port=8501 --server.address=0.0.0.0
Loading