diff --git a/.gitignore b/.gitignore
index 7d413ff..45c2a32 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,6 +25,10 @@ ENV/
 models/
 *.onnx
 
+# TensorRT artifacts
+*.engine
+*.profile
+
 # OS files
 .DS_Store
 Thumbs.db
@@ -32,4 +36,3 @@ Thumbs.db
 # Test/temp files
 *.tmp
 *.bak
-
diff --git a/README.md b/README.md
index 2780411..48da52b 100644
--- a/README.md
+++ b/README.md
@@ -1,18 +1,23 @@
 ## Genealog Face Service
 
-FastAPI-based face embedding and matching microservice using InsightFace + ONNX Runtime GPU. This service is designed to be called from the `genealog-api` backend via HTTP.
+FastAPI-based face embedding microservice using InsightFace + ONNX Runtime GPU. This service generates face embeddings from images and is designed to be called from the `genealog-api` backend via HTTP.
 
 ### Endpoints
 
 - `GET /healthz` – basic health check and model info.
 - `POST /embed-avatar` – JSON body: `{ "image_url": "https://..." }`, returns a single best face embedding for an avatar image.
 - `POST /embed-image` – JSON body: `{ "image_url": "https://..." }`, returns all detected faces and embeddings.
-- `POST /test-avatar` – multipart form with fields:
-  - `tag`: string tag for logging / correlation
-  - `avatar`: avatar image file (face to match)
-  - `image`: target image file (search space)
 
-All embeddings are normalized float vectors suitable for cosine-similarity comparison.
+All embeddings are normalized float vectors suitable for cosine-similarity comparison. Face matching/comparison is handled by the calling service (`genealog-api`).
+
+### Features
+
+- **Async HTTP downloads** with retry logic (httpx + tenacity)
+- **Image validation**: size limits (20MB max), dimension limits (32px-4096px), decompression bomb protection
+- **Robust image decoding**: handles all color modes (RGB, RGBA, L, LA, PA, CMYK, I, F), EXIF orientation correction
+- **Face detection fallback**: if no face is detected in `/embed-avatar`, falls back to center crop embedding
+- **Embedding validation**: checks for NaN/Inf values before returning
+- **Modular structure**: clean separation of concerns (config, models, face processing, image handling, routes)
 
 `/embed-avatar` notes:
 
@@ -29,6 +34,14 @@ source .venv/bin/activate
 pip install -r requirements.txt
 ```
 
+**Dependencies:**
+- `fastapi`, `uvicorn` - Web framework
+- `insightface` - Face detection and recognition
+- `onnxruntime-gpu` - GPU-accelerated inference
+- `httpx` - Async HTTP client for image downloads
+- `tenacity` - Retry logic
+- `opencv-python-headless`, `numpy`, `Pillow` - Image processing
+
 GPU support assumes:
 
 - WSL2 with GPU enabled.
@@ -60,6 +73,10 @@ You can override via environment variables:
 PORT=18081 \
 FACE_MODEL_NAME=buffalo_l \
 FACE_DET_SIZE=1024 \
+MAX_DOWNLOAD_SIZE=20971520 \
+MAX_IMAGE_DIMENSION=4096 \
+DOWNLOAD_TIMEOUT=15.0 \
+MAX_RETRIES=3 \
 UVICORN_WORKERS=20 \
 ./run_face_service.sh
 ```
@@ -72,6 +89,26 @@ nohup ./run_face_service.sh > face_service.log 2>&1 &
 ```
 
 Logs are written to `face_service.log` in the repo root.
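+
+### Example: calling the service
+
+A minimal sketch of a client (not shipped in this repo): it posts to
+`/embed-avatar` and compares two avatars via cosine similarity. The response
+field name `embedding` and the example URLs are assumptions here; check
+`app/models.py` for the actual response schema.
+
+```python
+import httpx
+import numpy as np
+
+FACE_SERVICE_URL = "http://host.docker.internal:18081"
+
+
+def embed_avatar(image_url: str) -> np.ndarray:
+    """POST an image URL and return the embedding vector."""
+    resp = httpx.post(
+        f"{FACE_SERVICE_URL}/embed-avatar",
+        json={"image_url": image_url},
+        timeout=30.0,
+    )
+    resp.raise_for_status()
+    # "embedding" is an assumed field name; see app/models.py
+    return np.asarray(resp.json()["embedding"], dtype=np.float32)
+
+
+a = embed_avatar("https://example.com/avatar-1.jpg")
+b = embed_avatar("https://example.com/avatar-2.jpg")
+# Embeddings are already L2-normalized, so cosine similarity is a dot product.
+print("cosine similarity:", float(np.dot(a, b)))
+```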
+### Project Structure + +``` +genealog-face/ +├── app/ +│ ├── __init__.py +│ ├── main.py # FastAPI app, lifespan handler, health endpoint +│ ├── config.py # Environment variables and constants +│ ├── models.py # Pydantic request/response models +│ ├── face.py # FaceAnalysis loading, embedding logic +│ ├── image.py # Image download, decode, validation +│ └── routes/ +│ ├── __init__.py +│ └── embed.py # /embed-avatar, /embed-image endpoints +├── .gitignore +├── requirements.txt +├── run_face_service.sh +└── README.md +``` + ### Integration with genealog-api (Docker) The `genealog-api` service expects this face service to be reachable at: diff --git a/app/config.py b/app/config.py index 5198cbd..3feecf9 100644 --- a/app/config.py +++ b/app/config.py @@ -5,16 +5,25 @@ import os # Model configuration MODEL_NAME = os.getenv("FACE_MODEL_NAME", "buffalo_l") DET_SIZE = int(os.getenv("FACE_DET_SIZE", "1024")) +USE_TENSORRT = os.getenv("USE_TENSORRT", "true").lower() in ("true", "1", "yes") # Image processing limits MAX_DOWNLOAD_SIZE = int(os.getenv("MAX_DOWNLOAD_SIZE", 20 * 1024 * 1024)) # 20MB -MAX_IMAGE_DIMENSION = int(os.getenv("MAX_IMAGE_DIMENSION", 8192)) # 8192px +MAX_IMAGE_DIMENSION = int(os.getenv("MAX_IMAGE_DIMENSION", 4096)) # 4096px (reduced from 8192) MIN_IMAGE_DIMENSION = int(os.getenv("MIN_IMAGE_DIMENSION", 32)) # 32px +TARGET_MAX_DIMENSION = int(os.getenv("TARGET_MAX_DIMENSION", 2048)) # Downscale large images # HTTP client settings DOWNLOAD_TIMEOUT = float(os.getenv("DOWNLOAD_TIMEOUT", 15.0)) # 15 seconds MAX_RETRIES = int(os.getenv("MAX_RETRIES", 3)) +# HTTP connection pool settings +HTTP_POOL_MAX_CONNECTIONS = int(os.getenv("HTTP_POOL_MAX_CONNECTIONS", 100)) +HTTP_POOL_MAX_KEEPALIVE = int(os.getenv("HTTP_POOL_MAX_KEEPALIVE", 20)) + +# Thread pool for blocking operations (GPU inference, image decode) +INFERENCE_THREADS = int(os.getenv("INFERENCE_THREADS", 4)) + # TODO [PROD]: Add URL allowlist for SSRF protection # ALLOWED_URL_PATTERNS = os.getenv("ALLOWED_URL_PATTERNS", "").split(",") diff --git a/app/face.py b/app/face.py index 6836fd4..9163a8e 100644 --- a/app/face.py +++ b/app/face.py @@ -1,11 +1,13 @@ """Face analysis and embedding logic using InsightFace.""" +import asyncio import logging +from concurrent.futures import ThreadPoolExecutor import numpy as np from insightface.app import FaceAnalysis -from app.config import DET_SIZE, MODEL_NAME +from app.config import DET_SIZE, MODEL_NAME, USE_TENSORRT from app.models import BBox logger = logging.getLogger("face_service") @@ -13,16 +15,65 @@ logger = logging.getLogger("face_service") face_app: FaceAnalysis | None = None +def _check_tensorrt_available() -> bool: + """Check if TensorRT libraries are actually installed.""" + try: + import tensorrt + return True + except ImportError: + return False + + +def _get_providers() -> list: + """Get ONNX Runtime execution providers based on configuration.""" + import onnxruntime as ort + + available_providers = ort.get_available_providers() + logger.info(f"Available ONNX providers: {available_providers}") + + providers = [] + + # Try TensorRT first if enabled and libraries are actually installed + if USE_TENSORRT and "TensorrtExecutionProvider" in available_providers: + if _check_tensorrt_available(): + providers.append( + ( + "TensorrtExecutionProvider", + { + "trt_max_workspace_size": 2 * 1024 * 1024 * 1024, # 2GB + "trt_fp16_enable": True, # FP16 for faster inference + "trt_engine_cache_enable": True, # Cache TensorRT engines + }, + ) + ) + else: + logger.warning("TensorRT requested but 
libnvinfer.so.10 not found, skipping") + + # CUDA fallback + if "CUDAExecutionProvider" in available_providers: + providers.append("CUDAExecutionProvider") + + # CPU fallback (always available) + providers.append("CPUExecutionProvider") + + logger.info(f"Using providers: {[p[0] if isinstance(p, tuple) else p for p in providers]}") + return providers + + def load_face_app() -> FaceAnalysis: """Load and initialize the FaceAnalysis model (singleton).""" global face_app if face_app is not None: return face_app - logger.info(f"Loading InsightFace model pack={MODEL_NAME}, det_size={DET_SIZE}") + providers = _get_providers() + logger.info( + f"Loading InsightFace model pack={MODEL_NAME}, det_size={DET_SIZE}, " + f"tensorrt={USE_TENSORRT}" + ) fa = FaceAnalysis( name=MODEL_NAME, - providers=["CUDAExecutionProvider", "CPUExecutionProvider"], + providers=providers, ) fa.prepare(ctx_id=0, det_size=(DET_SIZE, DET_SIZE)) face_app = fa @@ -30,6 +81,16 @@ def load_face_app() -> FaceAnalysis: return face_app +async def get_faces_async( + fa: FaceAnalysis, + img: np.ndarray, + executor: ThreadPoolExecutor, +) -> list: + """Run face detection/embedding in thread pool to not block event loop.""" + loop = asyncio.get_running_loop() + return await loop.run_in_executor(executor, fa.get, img) + + def to_pixel_bbox(bbox, width: int, height: int) -> BBox: """Convert InsightFace bbox to pixel BBox with coordinate clamping.""" x1, y1, x2, y2 = bbox diff --git a/app/image.py b/app/image.py index d5c6945..2e994bc 100644 --- a/app/image.py +++ b/app/image.py @@ -1,6 +1,8 @@ """Image download, decoding, and validation utilities.""" +import asyncio import logging +from concurrent.futures import ThreadPoolExecutor from io import BytesIO import cv2 @@ -21,6 +23,7 @@ from app.config import ( MAX_IMAGE_DIMENSION, MAX_RETRIES, MIN_IMAGE_DIMENSION, + TARGET_MAX_DIMENSION, ) logger = logging.getLogger("face_service") @@ -114,6 +117,9 @@ def _decode_image_bytes(data: bytes, source: str) -> np.ndarray: img = np.array(pil_image) img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) + # Downscale large images for faster processing + img = _maybe_downscale(img) + logger.info( "decode_image_bytes: source=%s shape=%s mode=%s", source, @@ -123,6 +129,25 @@ def _decode_image_bytes(data: bytes, source: str) -> np.ndarray: return img +def _maybe_downscale(img: np.ndarray, max_dim: int = TARGET_MAX_DIMENSION) -> np.ndarray: + """Downscale image if larger than max_dim while preserving aspect ratio.""" + h, w = img.shape[:2] + if max(h, w) <= max_dim: + return img + + scale = max_dim / max(h, w) + new_w = int(w * scale) + new_h = int(h * scale) + + logger.info( + "downscaling image from %dx%d to %dx%d (scale=%.2f)", + w, h, new_w, new_h, scale, + ) + + # Use INTER_AREA for downscaling (best quality) + return cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_AREA) + + @retry( retry=retry_if_exception_type((httpx.TimeoutException, httpx.NetworkError)), stop=stop_after_attempt(MAX_RETRIES), @@ -158,19 +183,46 @@ async def _download_with_retry(client: httpx.AsyncClient, url: str) -> bytes: return content -async def download_image(image_url: str) -> np.ndarray: +async def download_image( + image_url: str, + client: httpx.AsyncClient | None = None, + executor: ThreadPoolExecutor | None = None, +) -> np.ndarray: """ Download and decode an image from URL. 
- + Features: - - Async HTTP with connection pooling + - Async HTTP with connection pooling (uses shared client if provided) - Retry with exponential backoff for transient failures - Size validation before and after download - - Comprehensive image decoding + - Async image decoding in thread pool + + Args: + image_url: URL to download image from + client: Shared httpx client (falls back to creating new one if None) + executor: Thread pool for blocking decode (runs sync if None) """ + # Use shared client or create temporary one + if client is None: + from app.resources import http_client + client = http_client + + # Fallback to temporary client if still None (e.g., during tests) + if client is None: + async with httpx.AsyncClient(timeout=DOWNLOAD_TIMEOUT) as temp_client: + return await _download_and_decode(temp_client, image_url, executor) + + return await _download_and_decode(client, image_url, executor) + + +async def _download_and_decode( + client: httpx.AsyncClient, + image_url: str, + executor: ThreadPoolExecutor | None, +) -> np.ndarray: + """Internal helper to download and decode image.""" try: - async with httpx.AsyncClient(timeout=DOWNLOAD_TIMEOUT) as client: - data = await _download_with_retry(client, image_url) + data = await _download_with_retry(client, image_url) except httpx.TimeoutException: logger.exception("Timeout downloading image") raise HTTPException(status_code=408, detail="Timeout downloading image") @@ -186,8 +238,15 @@ async def download_image(image_url: str) -> np.ndarray: logger.exception("Failed to download image") raise HTTPException(status_code=400, detail=f"Failed to download image: {e}") + # Decode in thread pool to avoid blocking event loop try: - img = _decode_image_bytes(data, image_url) + if executor is not None: + loop = asyncio.get_running_loop() + img = await loop.run_in_executor( + executor, _decode_image_bytes, data, image_url + ) + else: + img = _decode_image_bytes(data, image_url) except (ImageDecodeError, ImageValidationError) as e: raise HTTPException(status_code=400, detail=str(e)) diff --git a/app/main.py b/app/main.py index d80f554..824ac1e 100644 --- a/app/main.py +++ b/app/main.py @@ -1,11 +1,22 @@ """FastAPI application entry point.""" import logging +from concurrent.futures import ThreadPoolExecutor from contextlib import asynccontextmanager +import httpx from fastapi import FastAPI -from app.config import DET_SIZE, MODEL_NAME +import app.resources as resources +from app.config import ( + DET_SIZE, + DOWNLOAD_TIMEOUT, + HTTP_POOL_MAX_CONNECTIONS, + HTTP_POOL_MAX_KEEPALIVE, + INFERENCE_THREADS, + MODEL_NAME, + USE_TENSORRT, +) from app.face import load_face_app from app.routes import embed @@ -15,12 +26,42 @@ logger = logging.getLogger("face_service") @asynccontextmanager async def lifespan(app: FastAPI): - """Application lifespan handler - load models on startup.""" + """Application lifespan handler - load models and shared resources on startup.""" logger.info("Starting face service...") + + # Create HTTP client with connection pooling + limits = httpx.Limits( + max_connections=HTTP_POOL_MAX_CONNECTIONS, + max_keepalive_connections=HTTP_POOL_MAX_KEEPALIVE, + ) + resources.http_client = httpx.AsyncClient( + timeout=DOWNLOAD_TIMEOUT, + limits=limits, + # http2=True requires 'h2' package - disable for now + ) + logger.info( + f"HTTP client initialized (max_conn={HTTP_POOL_MAX_CONNECTIONS}, " + f"keepalive={HTTP_POOL_MAX_KEEPALIVE})" + ) + + # Create thread pool for blocking operations (GPU inference, image decode) + 
resources.inference_executor = ThreadPoolExecutor(
+        max_workers=INFERENCE_THREADS,
+        thread_name_prefix="inference",
+    )
+    logger.info(f"Thread pool initialized (workers={INFERENCE_THREADS})")
+
+    # Load face model (may take time if TensorRT engines need building)
     load_face_app()
+
     logger.info("Face service ready")
     yield
+
+    # Cleanup
    logger.info("Shutting down face service...")
+    await resources.http_client.aclose()
+    resources.inference_executor.shutdown(wait=True)
+    logger.info("Cleanup complete")
 
 
 app = FastAPI(
@@ -40,5 +81,6 @@ def healthz():
         "status": "ok",
         "model": MODEL_NAME,
         "det_size": DET_SIZE,
+        "tensorrt": USE_TENSORRT,
     }
 
diff --git a/app/resources.py b/app/resources.py
new file mode 100644
index 0000000..09530fe
--- /dev/null
+++ b/app/resources.py
@@ -0,0 +1,9 @@
+"""Shared application resources (HTTP client, thread pool, etc.)."""
+
+from concurrent.futures import ThreadPoolExecutor
+
+import httpx
+
+# Global shared resources (initialized in app lifespan)
+http_client: httpx.AsyncClient | None = None
+inference_executor: ThreadPoolExecutor | None = None
diff --git a/app/routes/embed.py b/app/routes/embed.py
index 2209ea1..62f2aec 100644
--- a/app/routes/embed.py
+++ b/app/routes/embed.py
@@ -8,11 +8,13 @@ from fastapi import APIRouter, HTTPException
 
 from app.face import (
     fallback_avatar_embedding,
+    get_faces_async,
     load_face_app,
     to_pixel_bbox,
     validate_embedding,
 )
 from app.image import download_image
+import app.resources as resources  # attributes are set during lifespan; a from-import would bind None
 from app.models import (
     BBox,
     EmbedAvatarResponse,
@@ -30,16 +32,16 @@ router = APIRouter()
 async def embed_avatar(req: EmbedRequest):
     """
     Extract face embedding from an avatar image.
-    
+
     Returns the largest detected face. If no face is detected,
     falls back to center crop embedding with score=0.0.
     """
     logger.info("embed_avatar: image_url=%s", req.image_url)
     fa = load_face_app()
-    img = await download_image(str(req.image_url))
+    img = await download_image(str(req.image_url), resources.http_client, resources.inference_executor)
     h, w = img.shape[:2]
 
-    faces = fa.get(img)
+    faces = await get_faces_async(fa, img, resources.inference_executor)
     if len(faces) == 0:
         logger.warning(
             "embed_avatar: no faces detected image_url=%s size=%dx%d, using fallback",
@@ -98,15 +100,15 @@ async def embed_image(req: EmbedRequest):
     """
     Extract face embeddings from all faces in an image.
-    
+
     Returns all detected faces sorted by detection score (highest first).
     Returns empty list if no faces detected.
""" fa = load_face_app() - img = await download_image(str(req.image_url)) + img = await download_image(str(req.image_url), http_client, inference_executor) h, w = img.shape[:2] - faces = fa.get(img) + faces = await get_faces_async(fa, img, inference_executor) if len(faces) == 0: logger.warning( "embed_image: no faces detected image_url=%s size=%dx%d", diff --git a/run_face_service.sh b/run_face_service.sh index 510ed4c..525c4a3 100755 --- a/run_face_service.sh +++ b/run_face_service.sh @@ -5,27 +5,66 @@ set -euo pipefail if [ -d ".venv" ]; then # shellcheck disable=SC1091 source ".venv/bin/activate" + + # Add TensorRT libs to library path if installed via pip + TENSORRT_LIBS=".venv/lib/python3.12/site-packages/tensorrt_libs" + if [ -d "$TENSORRT_LIBS" ]; then + export LD_LIBRARY_PATH="${TENSORRT_LIBS}:${LD_LIBRARY_PATH:-}" + fi fi -# Model configuration (can be overridden via env) +# ============================================================================= +# Model Configuration +# ============================================================================= export FACE_MODEL_NAME="${FACE_MODEL_NAME:-buffalo_l}" export FACE_DET_SIZE="${FACE_DET_SIZE:-1024}" -# Tune CPU thread usage and workers +# TensorRT acceleration (2-3x faster inference) +# First startup is slow (~30-60s) while TensorRT builds optimized engines +# Engines are cached in ~/.cache/onnxruntime/ for subsequent runs +export USE_TENSORRT="${USE_TENSORRT:-true}" + +# ============================================================================= +# Performance Tuning +# ============================================================================= CPU_CORES="$(nproc || echo 4)" -DEFAULT_WORKERS="${CPU_CORES}" -if [ "$DEFAULT_WORKERS" -lt 2 ]; then - DEFAULT_WORKERS=2 -fi -export OMP_NUM_THREADS="${OMP_NUM_THREADS:-2}" -export MKL_NUM_THREADS="${MKL_NUM_THREADS:-2}" +# GPU inference is the bottleneck - use 1 worker to avoid loading multiple +# copies of the model into GPU memory. Concurrency is handled via thread pool. 
+DEFAULT_WORKERS=1
+
+# Thread pool for blocking operations (GPU inference, image decode)
+# 4 threads allow overlapping download/decode I/O with GPU work
+export INFERENCE_THREADS="${INFERENCE_THREADS:-4}"
+
+# CPU threading for numpy/BLAS operations
+export OMP_NUM_THREADS="${OMP_NUM_THREADS:-4}"
+export MKL_NUM_THREADS="${MKL_NUM_THREADS:-4}"
+
+# =============================================================================
+# HTTP Connection Pool
+# =============================================================================
+export HTTP_POOL_MAX_CONNECTIONS="${HTTP_POOL_MAX_CONNECTIONS:-100}"
+export HTTP_POOL_MAX_KEEPALIVE="${HTTP_POOL_MAX_KEEPALIVE:-20}"
+
+# =============================================================================
+# Image Processing
+# =============================================================================
+# Max dimension for input images (reject larger)
+export MAX_IMAGE_DIMENSION="${MAX_IMAGE_DIMENSION:-4096}"
+# Downscale large images to this size before processing (det_size handles the rest)
+export TARGET_MAX_DIMENSION="${TARGET_MAX_DIMENSION:-2048}"
+
+# =============================================================================
+# Server Configuration
+# =============================================================================
 WORKERS="${UVICORN_WORKERS:-$DEFAULT_WORKERS}"
-# Match genealog-api FACE_SERVICE_URL: http://host.docker.internal:18081
 PORT="${PORT:-18081}"
 
-echo "Starting face service on port ${PORT} with ${WORKERS} workers (CPU cores: ${CPU_CORES})"
+echo "Starting face service on port ${PORT} with ${WORKERS} workers"
+echo "  Model: ${FACE_MODEL_NAME}, det_size: ${FACE_DET_SIZE}, TensorRT: ${USE_TENSORRT}"
+echo "  Thread pool: ${INFERENCE_THREADS} workers"
+echo "  CPU cores: ${CPU_CORES}"
 
 exec uvicorn app.main:app \
     --host 0.0.0.0 \