"""Benchmark UI for face comparison."""
import asyncio
import base64
import logging
from io import BytesIO
import cv2
import numpy as np
from fastapi import APIRouter, File, UploadFile
from fastapi.responses import HTMLResponse
from app.face import face_area, get_faces_async, validate_embedding
from app.image import read_upload_image
from app.resources import inference_executor
logger = logging.getLogger("face_service")
router = APIRouter(prefix="/benchmark", tags=["benchmark"])
def cosine_similarity(emb1: np.ndarray, emb2: np.ndarray) -> float:
"""Compute cosine similarity between two embeddings."""
# Embeddings are already normalized, so dot product = cosine similarity
return float(np.dot(emb1, emb2))
def draw_faces_on_image(img: np.ndarray, faces: list, face_indices: list[int]) -> np.ndarray:
    """Return a copy of *img* with a green box and a "#<idx>" tag drawn per face.

    `faces` and `face_indices` are paired positionally; the original image is
    never modified.
    """
    annotated = img.copy()
    font = cv2.FONT_HERSHEY_SIMPLEX
    for index, face in zip(face_indices, faces):
        x1, y1, x2, y2 = face.bbox.astype(int)
        cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 2)
        tag = f"#{index}"
        (text_w, text_h), _ = cv2.getTextSize(tag, font, 0.8, 2)
        # Filled backdrop above the box keeps the label legible on any background.
        cv2.rectangle(annotated, (x1, y1 - text_h - 10), (x1 + text_w + 10, y1), (0, 255, 0), -1)
        cv2.putText(annotated, tag, (x1 + 5, y1 - 5), font, 0.8, (0, 0, 0), 2)
    return annotated
def encode_image_to_base64(img: np.ndarray, max_dim: int = 800) -> str:
    """Encode a BGR image as a base64 JPEG string for inline HTML display.

    The image is downscaled (aspect ratio preserved) so its longest side is at
    most ``max_dim``, then JPEG-encoded at quality 85.

    Raises:
        ValueError: if JPEG encoding fails.
    """
    h, w = img.shape[:2]
    if max(h, w) > max_dim:
        scale = max_dim / max(h, w)
        # INTER_AREA is the recommended interpolation for downscaling.
        img = cv2.resize(img, (int(w * scale), int(h * scale)), interpolation=cv2.INTER_AREA)
    # cv2.imencode consumes BGR directly, so the previous BGR->RGB conversion
    # and mid-function PIL import/round-trip are unnecessary.
    ok, buf = cv2.imencode(".jpg", img, [int(cv2.IMWRITE_JPEG_QUALITY), 85])
    if not ok:
        raise ValueError("JPEG encoding failed")
    return base64.b64encode(buf.tobytes()).decode()
def crop_face(img: np.ndarray, bbox: np.ndarray, padding: float = 0.2) -> np.ndarray:
    """Return the face region of *img*, expanded by ``padding`` (fraction of
    the box size) on every side and clamped to the image bounds."""
    height, width = img.shape[:2]
    left, top, right, bottom = (int(v) for v in bbox)
    # Expand the box proportionally to its own size.
    pad_w = int((right - left) * padding)
    pad_h = int((bottom - top) * padding)
    top = max(0, top - pad_h)
    left = max(0, left - pad_w)
    bottom = min(height, bottom + pad_h)
    right = min(width, right + pad_w)
    return img[top:bottom, left:right]
@router.get("/", response_class=HTMLResponse)
async def benchmark_ui():
"""Serve the benchmark UI."""
return """
Face Benchmark
Face Comparison Benchmark
"""
@router.post("/compare")
async def compare_faces(
image1: UploadFile = File(...),
image2: UploadFile = File(...),
):
"""
Compare faces between two uploaded images.
Returns:
- Annotated images with face bounding boxes
- Similarity matrix between all detected faces
- Best matches for each face in image1
"""
logger.info("benchmark/compare: image1=%s image2=%s", image1.filename, image2.filename)
# Read both images
data1 = await image1.read()
data2 = await image2.read()
img1 = read_upload_image(data1, image1.filename or "image1")
img2 = read_upload_image(data2, image2.filename or "image2")
# Detect faces in both images concurrently
faces1, faces2 = await asyncio.gather(
get_faces_async(img1, inference_executor),
get_faces_async(img2, inference_executor),
)
logger.info(
"benchmark/compare: detected %d faces in image1, %d faces in image2",
len(faces1), len(faces2)
)
# Sort faces by area (largest first)
faces1.sort(key=face_area, reverse=True)
faces2.sort(key=face_area, reverse=True)
# Draw faces on images
face1_indices = list(range(len(faces1)))
face2_indices = list(range(len(faces2)))
img1_annotated = draw_faces_on_image(img1, faces1, face1_indices)
img2_annotated = draw_faces_on_image(img2, faces2, face2_indices)
# Encode images for response
img1_b64 = encode_image_to_base64(img1_annotated)
img2_b64 = encode_image_to_base64(img2_annotated)
# Compute similarity matrix
similarity_matrix: list[list[float]] = []
best_matches: list[dict] = []
for i, f1 in enumerate(faces1):
emb1 = f1.normed_embedding.astype(np.float32)
if not validate_embedding(emb1):
similarity_matrix.append([0.0] * len(faces2))
continue
row = []
best_sim = -1.0
best_j = -1
for j, f2 in enumerate(faces2):
emb2 = f2.normed_embedding.astype(np.float32)
if not validate_embedding(emb2):
row.append(0.0)
continue
sim = cosine_similarity(emb1, emb2)
row.append(sim)
if sim > best_sim:
best_sim = sim
best_j = j
similarity_matrix.append(row)
if best_j >= 0:
best_matches.append({
"face1_idx": i,
"face2_idx": best_j,
"similarity": best_sim,
})
# Sort best matches by similarity and keep top 3
best_matches.sort(key=lambda m: m["similarity"], reverse=True)
best_matches = best_matches[:3]
# Add cropped face images for top 3 matches
for match in best_matches:
i, j = match["face1_idx"], match["face2_idx"]
crop1 = crop_face(img1, faces1[i].bbox)
crop2 = crop_face(img2, faces2[j].bbox)
match["face1_crop"] = encode_image_to_base64(crop1, max_dim=150)
match["face2_crop"] = encode_image_to_base64(crop2, max_dim=150)
return {
"image1_faces": len(faces1),
"image2_faces": len(faces2),
"image1_annotated": img1_b64,
"image2_annotated": img2_b64,
"similarity_matrix": similarity_matrix,
"similarities": [
{"face1": i, "face2": j, "score": similarity_matrix[i][j]}
for i in range(len(faces1))
for j in range(len(faces2))
],
"best_matches": best_matches,
}