"""Face analysis and embedding logic using InsightFace.""" import asyncio import logging from concurrent.futures import ThreadPoolExecutor import numpy as np from insightface.app import FaceAnalysis from app.config import DET_SIZE, MODEL_NAME, USE_TENSORRT from app.models import BBox logger = logging.getLogger("face_service") face_app: FaceAnalysis | None = None def _check_tensorrt_available() -> bool: """Check if TensorRT libraries are actually installed.""" try: import tensorrt return True except ImportError: return False def _get_providers() -> list: """Get ONNX Runtime execution providers based on configuration.""" import onnxruntime as ort available_providers = ort.get_available_providers() logger.info(f"Available ONNX providers: {available_providers}") providers = [] # Try TensorRT first if enabled and libraries are actually installed if USE_TENSORRT and "TensorrtExecutionProvider" in available_providers: if _check_tensorrt_available(): providers.append( ( "TensorrtExecutionProvider", { "trt_max_workspace_size": 2 * 1024 * 1024 * 1024, # 2GB "trt_fp16_enable": True, # FP16 for faster inference "trt_engine_cache_enable": True, # Cache TensorRT engines }, ) ) else: logger.warning("TensorRT requested but libnvinfer.so.10 not found, skipping") # CUDA fallback if "CUDAExecutionProvider" in available_providers: providers.append("CUDAExecutionProvider") # CPU fallback (always available) providers.append("CPUExecutionProvider") logger.info(f"Using providers: {[p[0] if isinstance(p, tuple) else p for p in providers]}") return providers def load_face_app() -> FaceAnalysis: """Load and initialize the FaceAnalysis model (singleton).""" global face_app if face_app is not None: return face_app providers = _get_providers() logger.info( f"Loading InsightFace model pack={MODEL_NAME}, det_size={DET_SIZE}, " f"tensorrt={USE_TENSORRT}" ) fa = FaceAnalysis( name=MODEL_NAME, providers=providers, ) fa.prepare(ctx_id=0, det_size=(DET_SIZE, DET_SIZE)) face_app = fa logger.info("FaceAnalysis initialized") return face_app async def get_faces_async( fa: FaceAnalysis, img: np.ndarray, executor: ThreadPoolExecutor, ) -> list: """Run face detection/embedding in thread pool to not block event loop.""" loop = asyncio.get_running_loop() return await loop.run_in_executor(executor, fa.get, img) def to_pixel_bbox(bbox, width: int, height: int) -> BBox: """Convert InsightFace bbox to pixel BBox with coordinate clamping.""" x1, y1, x2, y2 = bbox # Clamp coordinates to image bounds x1 = max(0, min(int(x1), width)) y1 = max(0, min(int(y1), height)) x2 = max(0, min(int(x2), width)) y2 = max(0, min(int(y2), height)) w = max(x2 - x1, 1) h = max(y2 - y1, 1) return BBox(x=x1, y=y1, w=w, h=h) def validate_embedding(embedding: np.ndarray) -> bool: """Check if embedding contains valid values (no NaN or Inf).""" return bool(np.isfinite(embedding).all()) def normalize_embedding(embedding: np.ndarray) -> np.ndarray: """Normalize embedding vector to unit length.""" emb = embedding.astype(np.float32) norm = float(np.linalg.norm(emb)) if norm > 0.0: emb = emb / norm return emb def fallback_avatar_embedding( fa: FaceAnalysis, img: np.ndarray, width: int, height: int, ) -> tuple[list[float], BBox, float] | None: """ Generate embedding from center crop when no face is detected. This fallback uses the recognition model directly on a center square crop, useful for avatar images where the face might not be detected. """ rec_model = getattr(fa, "models", {}).get("recognition") if rec_model is None: logger.warning("embed_avatar_fallback: recognition model is not available") return None side = min(height, width) if side <= 0: logger.warning( "embed_avatar_fallback: invalid image size width=%d height=%d", width, height, ) return None cy, cx = height // 2, width // 2 x1 = max(cx - side // 2, 0) y1 = max(cy - side // 2, 0) x2 = min(x1 + side, width) y2 = min(y1 + side, height) crop = img[y1:y2, x1:x2] if crop.size == 0: logger.warning("embed_avatar_fallback: empty crop region") return None try: import cv2 target_size = getattr(rec_model, "input_size", None) if not target_size: logger.warning("embed_avatar_fallback: recognition model has no input_size") return None face_img = cv2.resize(crop, target_size) except Exception as e: logger.exception("embed_avatar_fallback: failed to resize crop: %s", e) return None try: feat = rec_model.get_feat(face_img)[0] except Exception as e: logger.exception("embed_avatar_fallback: get_feat failed: %s", e) return None emb = normalize_embedding(feat) if not validate_embedding(emb): logger.warning("embed_avatar_fallback: embedding contains NaN/Inf values") return None bbox = BBox( x=int(x1), y=int(y1), w=int(x2 - x1), h=int(y2 - y1), ) score = 0.0 logger.info( "embed_avatar_fallback: generated embedding bbox=(%d,%d,%d,%d) score=%.4f len=%d", bbox.x, bbox.y, bbox.w, bbox.h, score, len(emb), ) return emb.tolist(), bbox, score