"""Benchmark UI for face comparison.

Exposes two routes under ``/benchmark``:

* ``GET /benchmark/`` returns the benchmark page.
* ``POST /benchmark/compare`` detects faces in two uploaded images and
  reports pairwise embedding similarities plus the best matches.
"""

import asyncio
import base64
import logging
from io import BytesIO

import cv2
import numpy as np
from fastapi import APIRouter, File, UploadFile
from fastapi.responses import HTMLResponse

from app.face import face_area, get_faces_async, validate_embedding
from app.image import read_upload_image
from app.resources import inference_executor

logger = logging.getLogger("face_service")

router = APIRouter(prefix="/benchmark", tags=["benchmark"])


def cosine_similarity(emb1: np.ndarray, emb2: np.ndarray) -> float:
    """Return the cosine similarity of two embeddings.

    The embeddings are assumed to be L2-normalized already, in which case
    the plain dot product equals the cosine similarity. No normalization is
    performed here.
    """
    return float(np.dot(emb1, emb2))


def draw_faces_on_image(img: np.ndarray, faces: list, face_indices: list[int]) -> np.ndarray:
    """Return a copy of ``img`` with a box and ``#index`` label per face.

    Args:
        img: Image to annotate; it is copied, never modified in place.
        faces: Detected faces; each must expose a ``bbox`` array that
            unpacks into ``x1, y1, x2, y2``.
        face_indices: Index shown in the label of the corresponding face.
            Paired with ``faces`` positionally via ``zip``.

    Returns:
        The annotated copy of ``img``.
    """
    green = (0, 255, 0)
    black = (0, 0, 0)
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.8
    thickness = 2

    canvas = img.copy()
    for idx, face in zip(face_indices, faces):
        # Plain Python ints keep the OpenCV drawing calls happy regardless
        # of the bbox dtype.
        x1, y1, x2, y2 = face.bbox.astype(int).tolist()

        cv2.rectangle(canvas, (x1, y1), (x2, y2), green, thickness)

        label = f"#{idx}"
        (tw, th), _ = cv2.getTextSize(label, font, font_scale, thickness)

        # The label normally sits directly above the box. When the face
        # touches the top edge there is no room there, so place the label
        # just inside the box instead of letting it fall off the image.
        label_top = y1 - th - 10
        if label_top < 0:
            label_top = max(y1, 0)
        label_bottom = label_top + th + 10

        cv2.rectangle(canvas, (x1, label_top), (x1 + tw + 10, label_bottom), green, -1)
        cv2.putText(canvas, label, (x1 + 5, label_bottom - 5), font, font_scale, black, thickness)

    return canvas


def encode_image_to_base64(img: np.ndarray, max_dim: int = 800) -> str:
    """Encode an image as a base64 JPEG string for embedding in HTML.

    The image is downscaled (aspect ratio preserved) when its longer side
    exceeds ``max_dim``, converted from BGR to RGB, and saved as JPEG at
    quality 85.

    Args:
        img: Image array; the channel order is treated as BGR.
        max_dim: Maximum allowed length of the longer side, in pixels.

    Returns:
        The base64 text of the JPEG bytes, or ``""`` when ``img`` contains
        no pixels (e.g. a crop that fell entirely outside the image).
    """
    # An empty array cannot be colour-converted or JPEG-encoded.
    if img.size == 0:
        return ""

    h, w = img.shape[:2]
    longest = max(h, w)
    if longest > max_dim:
        scale = max_dim / longest
        # Never let the short side round down to 0 for extreme aspect
        # ratios; cv2.resize rejects a zero-sized target.
        target = (max(1, int(w * scale)), max(1, int(h * scale)))
        img = cv2.resize(img, target)

    # Convert BGR to RGB for proper display.
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    from PIL import Image

    buffer = BytesIO()
    Image.fromarray(img_rgb).save(buffer, format="JPEG", quality=85)
    return base64.b64encode(buffer.getvalue()).decode()


def crop_face(img: np.ndarray, bbox: np.ndarray, padding: float = 0.2) -> np.ndarray:
    """Crop a face region from ``img``, enlarged by a relative padding.

    Args:
        img: Source image.
        bbox: Array that unpacks into ``x1, y1, x2, y2`` pixel coordinates.
        padding: Fraction of the box width/height added on each side.

    Returns:
        The slice of ``img`` covering the padded box, clipped to the image.
        The result is empty when the box lies entirely outside the image.
    """
    h, w = img.shape[:2]
    x1, y1, x2, y2 = bbox.astype(int).tolist()

    pad_x = int((x2 - x1) * padding)
    pad_y = int((y2 - y1) * padding)

    # Clamp every edge to the image on both sides. Clamping only one side
    # per edge would let a negative right/bottom coordinate through, and a
    # negative slice index wraps around and selects the wrong region.
    left = min(max(0, x1 - pad_x), w)
    top = min(max(0, y1 - pad_y), h)
    right = min(max(0, x2 + pad_x), w)
    bottom = min(max(0, y2 + pad_y), h)

    return img[top:bottom, left:right]


@router.get("/", response_class=HTMLResponse)
async def benchmark_ui():
    """Serve the benchmark UI."""
    # NOTE(review): the page below contains only text and no HTML tags; the
    # markup may have been lost upstream - confirm against the intended
    # template.
    return """ Face Benchmark

Face Comparison Benchmark

Image 1 (Source)

Upload an image to find faces from

Image 2 (Target)

Upload an image to search for matching faces

"""


@router.post("/compare")
async def compare_faces(
    image1: UploadFile = File(...),
    image2: UploadFile = File(...),
):
    """
    Compare faces between two uploaded images.

    Faces in each image are sorted by area (largest first); the indices in
    the response and on the annotated images refer to that order.

    Returns:
    - Annotated images with face bounding boxes (base64 JPEG)
    - Similarity matrix between all detected faces (rows: image1 faces,
      columns: image2 faces; pairs with an invalid embedding score 0.0)
    - Best matches: the top 3 image1 faces by best similarity, each with
      cropped thumbnails of both faces
    """
    logger.info("benchmark/compare: image1=%s image2=%s", image1.filename, image2.filename)

    # Read both images
    data1 = await image1.read()
    data2 = await image2.read()

    img1 = read_upload_image(data1, image1.filename or "image1")
    img2 = read_upload_image(data2, image2.filename or "image2")

    # Detect faces in both images concurrently
    faces1, faces2 = await asyncio.gather(
        get_faces_async(img1, inference_executor),
        get_faces_async(img2, inference_executor),
    )

    logger.info(
        "benchmark/compare: detected %d faces in image1, %d faces in image2",
        len(faces1),
        len(faces2),
    )

    # Sort faces by area (largest first)
    faces1.sort(key=face_area, reverse=True)
    faces2.sort(key=face_area, reverse=True)

    # Draw faces on images
    img1_annotated = draw_faces_on_image(img1, faces1, list(range(len(faces1))))
    img2_annotated = draw_faces_on_image(img2, faces2, list(range(len(faces2))))

    # Encode images for response
    img1_b64 = encode_image_to_base64(img1_annotated)
    img2_b64 = encode_image_to_base64(img2_annotated)

    # Cast and validate the image2 embeddings once, instead of once per
    # image1 face; None marks an embedding that failed validation.
    # NOTE(review): assumes validate_embedding has no per-call side effects
    # that callers rely on - confirm.
    embeddings2: list[np.ndarray | None] = []
    for f2 in faces2:
        emb2 = f2.normed_embedding.astype(np.float32)
        embeddings2.append(emb2 if validate_embedding(emb2) else None)

    # Compute similarity matrix
    similarity_matrix: list[list[float]] = []
    best_matches: list[dict] = []

    for i, f1 in enumerate(faces1):
        emb1 = f1.normed_embedding.astype(np.float32)
        if not validate_embedding(emb1):
            # Keep the matrix rectangular even for unusable faces.
            similarity_matrix.append([0.0] * len(faces2))
            continue

        row: list[float] = []
        best_sim = -1.0
        best_j = -1

        for j, emb2 in enumerate(embeddings2):
            if emb2 is None:
                row.append(0.0)
                continue

            sim = cosine_similarity(emb1, emb2)
            row.append(sim)

            if sim > best_sim:
                best_sim = sim
                best_j = j

        similarity_matrix.append(row)

        if best_j >= 0:
            best_matches.append({
                "face1_idx": i,
                "face2_idx": best_j,
                "similarity": best_sim,
            })

    # Sort best matches by similarity and keep top 3
    best_matches.sort(key=lambda m: m["similarity"], reverse=True)
    best_matches = best_matches[:3]

    # Add cropped face images for top 3 matches
    for match in best_matches:
        i, j = match["face1_idx"], match["face2_idx"]
        crop1 = crop_face(img1, faces1[i].bbox)
        crop2 = crop_face(img2, faces2[j].bbox)
        match["face1_crop"] = encode_image_to_base64(crop1, max_dim=150)
        match["face2_crop"] = encode_image_to_base64(crop2, max_dim=150)

    return {
        "image1_faces": len(faces1),
        "image2_faces": len(faces2),
        "image1_annotated": img1_b64,
        "image2_annotated": img2_b64,
        "similarity_matrix": similarity_matrix,
        # Flattened view of the matrix; every row has len(faces2) entries.
        "similarities": [
            {"face1": i, "face2": j, "score": score}
            for i, row in enumerate(similarity_matrix)
            for j, score in enumerate(row)
        ],
        "best_matches": best_matches,
    }