Initial commit: Face embedding service with modular structure

- FastAPI-based face embedding service using InsightFace
- Modular package structure (app/config, app/models, app/face, app/image, app/routes)
- Async HTTP downloads with retry logic (httpx + tenacity)
- Image size/dimension limits and decompression bomb protection
- Comprehensive image decoding with color mode handling
- BBox coordinate clamping and embedding validation
- Production-ready structure with TODOs for security features
Author: Hung Luu
Date: 2025-12-01 11:38:50 +07:00
Commit: d34b27ecc5

12 changed files with 807 additions and 0 deletions

.gitignore (vendored, new file, 35 lines)

@@ -0,0 +1,35 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# Virtual environments
.venv/
venv/
ENV/
# Environment variables
.env
.env.local
# Logs
*.log
# IDE
.vscode/
.idea/
*.swp
*.swo
# InsightFace models (usually in ~/.insightface, but just in case)
models/
*.onnx
# OS files
.DS_Store
Thumbs.db
# Test/temp files
*.tmp
*.bak

README.md (new file, 101 lines)

@@ -0,0 +1,101 @@
## Genealog Face Service
FastAPI-based face embedding and matching microservice using InsightFace + ONNX Runtime GPU. This service is designed to be called from the `genealog-api` backend via HTTP.
### Endpoints
- `GET /healthz`: basic health check and model info.
- `POST /embed-avatar`: JSON body `{ "image_url": "https://..." }`; returns the single best face embedding for an avatar image.
- `POST /embed-image`: JSON body `{ "image_url": "https://..." }`; returns all detected faces and their embeddings.
- `POST /test-avatar`: multipart form with fields:
  - `tag`: string tag for logging / correlation
  - `avatar`: avatar image file (face to match)
  - `image`: target image file (search space)

All embeddings are normalized float vectors suitable for cosine-similarity comparison.
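Since the vectors are unit-normalized, cosine similarity reduces to a plain dot product. A minimal comparison sketch (illustrative, not part of the service):

```python
import numpy as np

def cosine_similarity(a: list[float], b: list[float]) -> float:
    # Embeddings from this service are already unit-length,
    # so cosine similarity is just the dot product.
    return float(np.dot(np.asarray(a, dtype=np.float32),
                        np.asarray(b, dtype=np.float32)))

# e.g. pick the best match among faces returned by /embed-image:
# best = max(faces, key=lambda f: cosine_similarity(avatar["embedding"], f["embedding"]))
```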
`/embed-avatar` notes:
- Images are decoded with Pillow, and EXIF orientation is applied (e.g. for iPhone photos) before detection runs.
- If no face is detected, the service falls back to a center square crop and runs the recognition model on it directly so an embedding can still be produced. In this case the `score` field is `0.0` and `bbox` is the crop region that was used.
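For reference, a minimal client-side call sketch using `httpx` (host, port, and image URL below are placeholders; the response fields match the service's response model):

```python
import httpx

resp = httpx.post(
    "http://localhost:18081/embed-avatar",  # placeholder host/port
    json={"image_url": "https://example.com/avatar.jpg"},  # placeholder URL
    timeout=30.0,
)
resp.raise_for_status()
data = resp.json()
# data == {"embedding": [...], "bbox": {"x": ..., "y": ..., "w": ..., "h": ...}, "score": ...}
```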
### Installation (WSL2, Python venv)
From `/home/hung/genealog-face`:
```bash
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```
GPU support assumes:
- WSL2 with GPU enabled.
- NVIDIA drivers installed on Windows.
- `nvidia-smi` works inside WSL.
The service uses `insightface` with `CUDAExecutionProvider` first, falling back to CPU if needed.
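To verify which execution providers ONNX Runtime actually sees inside WSL, a quick sanity check using the standard ONNX Runtime API:

```python
import onnxruntime as ort

# "CUDAExecutionProvider" should appear here if the GPU stack is working;
# otherwise the service will silently run on "CPUExecutionProvider".
print(ort.get_available_providers())
```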
### Running the service
Use the helper script (recommended):
```bash
cd /home/hung/genealog-face
./run_face_service.sh
```
Defaults:
- Host: `0.0.0.0`
- Port: `18081`
- Model: `buffalo_l`
- Detection size: `1024`
- Workers: `nproc` (one worker per detected CPU core)
You can override via environment variables:
```bash
PORT=18081 \
FACE_MODEL_NAME=buffalo_l \
FACE_DET_SIZE=1024 \
UVICORN_WORKERS=20 \
./run_face_service.sh
```
To run in the background:
```bash
nohup ./run_face_service.sh > face_service.log 2>&1 &
```
Logs are written to `face_service.log` in the repo root.
### Integration with genealog-api (Docker)
The `genealog-api` service expects this face service to be reachable at:
- `FACE_SERVICE_URL: http://host.docker.internal:18081`
You only need to ensure the service is running in WSL on port `18081` before starting the Docker stack.
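One way to confirm the service is reachable before bringing the Docker stack up is to poll `/healthz` (a sketch, assuming the default port):

```python
import httpx

r = httpx.get("http://localhost:18081/healthz", timeout=5.0)
r.raise_for_status()
print(r.json())  # e.g. {"status": "ok", "model": "buffalo_l", "det_size": 1024}
```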
### Autostart on Windows reboot (via WSL2)
You can have Windows start this service automatically at logon using Task Scheduler:
1. Open **Task Scheduler** → **Create Task…**.
2. **General** tab:
- Name: `GenealogFaceService`.
- Configure to run for your Windows user.
3. **Triggers** tab:
- New → Begin the task: **At log on**.
4. **Actions** tab:
- Program/script: `wsl.exe`
- Arguments:
```text
-d Ubuntu -- bash -lc "cd /home/hung/genealog-face && nohup ./run_face_service.sh >> face_service.log 2>&1"
```
5. Save the task (provide credentials if prompted).
After this, logging into Windows will start WSL and launch the face service in the background, ready to be used by `genealog-api`.

app/__init__.py (new file, 2 lines)

@@ -0,0 +1,2 @@
"""Face Service - Face embedding microservice using InsightFace."""

app/config.py (new file, 27 lines)

@@ -0,0 +1,27 @@
"""Configuration and environment variables."""
import os
# Model configuration
MODEL_NAME = os.getenv("FACE_MODEL_NAME", "buffalo_l")
DET_SIZE = int(os.getenv("FACE_DET_SIZE", "1024"))
# Image processing limits
MAX_DOWNLOAD_SIZE = int(os.getenv("MAX_DOWNLOAD_SIZE", 20 * 1024 * 1024)) # 20MB
MAX_IMAGE_DIMENSION = int(os.getenv("MAX_IMAGE_DIMENSION", 8192)) # 8192px
MIN_IMAGE_DIMENSION = int(os.getenv("MIN_IMAGE_DIMENSION", 32)) # 32px
# HTTP client settings
DOWNLOAD_TIMEOUT = float(os.getenv("DOWNLOAD_TIMEOUT", 15.0)) # 15 seconds
MAX_RETRIES = int(os.getenv("MAX_RETRIES", 3))
# TODO [PROD]: Add URL allowlist for SSRF protection
# ALLOWED_URL_PATTERNS = os.getenv("ALLOWED_URL_PATTERNS", "").split(",")
# TODO [PROD]: Add API key authentication
# API_KEY = os.getenv("API_KEY", "")
# TODO [PROD]: Add rate limiting configuration
# RATE_LIMIT_REQUESTS = int(os.getenv("RATE_LIMIT_REQUESTS", 100))
# RATE_LIMIT_WINDOW = int(os.getenv("RATE_LIMIT_WINDOW", 60))

app/face.py (new file, 142 lines)

@@ -0,0 +1,142 @@
"""Face analysis and embedding logic using InsightFace."""
import logging
import numpy as np
from insightface.app import FaceAnalysis
from app.config import DET_SIZE, MODEL_NAME
from app.models import BBox
logger = logging.getLogger("face_service")
face_app: FaceAnalysis | None = None
def load_face_app() -> FaceAnalysis:
"""Load and initialize the FaceAnalysis model (singleton)."""
global face_app
if face_app is not None:
return face_app
logger.info(f"Loading InsightFace model pack={MODEL_NAME}, det_size={DET_SIZE}")
fa = FaceAnalysis(
name=MODEL_NAME,
providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
)
fa.prepare(ctx_id=0, det_size=(DET_SIZE, DET_SIZE))
face_app = fa
logger.info("FaceAnalysis initialized")
return face_app
def to_pixel_bbox(bbox, width: int, height: int) -> BBox:
"""Convert InsightFace bbox to pixel BBox with coordinate clamping."""
x1, y1, x2, y2 = bbox
# Clamp coordinates to image bounds
x1 = max(0, min(int(x1), width))
y1 = max(0, min(int(y1), height))
x2 = max(0, min(int(x2), width))
y2 = max(0, min(int(y2), height))
w = max(x2 - x1, 1)
h = max(y2 - y1, 1)
return BBox(x=x1, y=y1, w=w, h=h)
def validate_embedding(embedding: np.ndarray) -> bool:
"""Check if embedding contains valid values (no NaN or Inf)."""
return bool(np.isfinite(embedding).all())
def normalize_embedding(embedding: np.ndarray) -> np.ndarray:
"""Normalize embedding vector to unit length."""
emb = embedding.astype(np.float32)
norm = float(np.linalg.norm(emb))
if norm > 0.0:
emb = emb / norm
return emb
def fallback_avatar_embedding(
fa: FaceAnalysis,
img: np.ndarray,
width: int,
height: int,
) -> tuple[list[float], BBox, float] | None:
"""
Generate embedding from center crop when no face is detected.
This fallback uses the recognition model directly on a center square crop,
useful for avatar images where the face might not be detected.
"""
rec_model = getattr(fa, "models", {}).get("recognition")
if rec_model is None:
logger.warning("embed_avatar_fallback: recognition model is not available")
return None
side = min(height, width)
if side <= 0:
logger.warning(
"embed_avatar_fallback: invalid image size width=%d height=%d",
width,
height,
)
return None
cy, cx = height // 2, width // 2
x1 = max(cx - side // 2, 0)
y1 = max(cy - side // 2, 0)
x2 = min(x1 + side, width)
y2 = min(y1 + side, height)
crop = img[y1:y2, x1:x2]
if crop.size == 0:
logger.warning("embed_avatar_fallback: empty crop region")
return None
try:
import cv2
target_size = getattr(rec_model, "input_size", None)
if not target_size:
logger.warning("embed_avatar_fallback: recognition model has no input_size")
return None
face_img = cv2.resize(crop, target_size)
except Exception as e:
logger.exception("embed_avatar_fallback: failed to resize crop: %s", e)
return None
try:
feat = rec_model.get_feat(face_img)[0]
except Exception as e:
logger.exception("embed_avatar_fallback: get_feat failed: %s", e)
return None
emb = normalize_embedding(feat)
if not validate_embedding(emb):
logger.warning("embed_avatar_fallback: embedding contains NaN/Inf values")
return None
bbox = BBox(
x=int(x1),
y=int(y1),
w=int(x2 - x1),
h=int(y2 - y1),
)
score = 0.0
logger.info(
"embed_avatar_fallback: generated embedding bbox=(%d,%d,%d,%d) score=%.4f len=%d",
bbox.x,
bbox.y,
bbox.w,
bbox.h,
score,
len(emb),
)
return emb.tolist(), bbox, score

app/image.py (new file, 221 lines)

@@ -0,0 +1,221 @@
"""Image download, decoding, and validation utilities."""
import logging
from io import BytesIO
import cv2
import httpx
import numpy as np
from fastapi import HTTPException
from PIL import Image, ImageOps
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
from app.config import (
DOWNLOAD_TIMEOUT,
MAX_DOWNLOAD_SIZE,
MAX_IMAGE_DIMENSION,
MAX_RETRIES,
MIN_IMAGE_DIMENSION,
)
logger = logging.getLogger("face_service")
# Set Pillow's decompression bomb limit
Image.MAX_IMAGE_PIXELS = MAX_IMAGE_DIMENSION * MAX_IMAGE_DIMENSION
class ImageDownloadError(Exception):
"""Raised when image download fails."""
pass
class ImageDecodeError(Exception):
"""Raised when image decoding fails."""
pass
class ImageValidationError(Exception):
"""Raised when image validation fails."""
pass
def _decode_image_bytes(data: bytes, source: str) -> np.ndarray:
"""
Decode image bytes to BGR numpy array.
Handles:
- EXIF orientation correction
- All color modes (RGB, RGBA, L, LA, PA, CMYK, I, F)
- Truncated/corrupted image detection
- Dimension validation
"""
try:
pil_image = Image.open(BytesIO(data))
except Exception as e:
logger.exception("Could not open image from %s", source)
raise ImageDecodeError(f"Could not decode image: {e}")
# Force load to detect truncated/corrupted images
try:
pil_image.load()
except Exception as e:
logger.exception("Image data is corrupted or truncated from %s", source)
raise ImageDecodeError(f"Image data is corrupted or truncated: {e}")
# Apply EXIF orientation
try:
pil_image = ImageOps.exif_transpose(pil_image)
except Exception:
logger.warning("Failed to apply EXIF orientation for %s", source)
# Validate dimensions
width, height = pil_image.size
if width < MIN_IMAGE_DIMENSION or height < MIN_IMAGE_DIMENSION:
raise ImageValidationError(
f"Image too small: {width}x{height}, minimum is {MIN_IMAGE_DIMENSION}x{MIN_IMAGE_DIMENSION}"
)
if width > MAX_IMAGE_DIMENSION or height > MAX_IMAGE_DIMENSION:
raise ImageValidationError(
f"Image too large: {width}x{height}, maximum is {MAX_IMAGE_DIMENSION}x{MAX_IMAGE_DIMENSION}"
)
# Convert to RGB, handling all color modes
mode = pil_image.mode
if mode in ("RGBA", "LA", "PA"):
# Has alpha channel - composite on white background
background = Image.new("RGB", pil_image.size, (255, 255, 255))
if mode == "LA":
pil_image = pil_image.convert("RGBA")
elif mode == "PA":
pil_image = pil_image.convert("RGBA")
background.paste(pil_image, mask=pil_image.split()[-1])
pil_image = background
elif mode == "CMYK":
pil_image = pil_image.convert("RGB")
elif mode in ("I", "F"):
# 16-bit or floating point - normalize to 8-bit
arr = np.array(pil_image)
if mode == "F":
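            # Assumes float pixel data lies in [0, 1]; out-of-range values are clipped.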
arr = (arr * 255).clip(0, 255).astype(np.uint8)
else:
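            # Mode "I" is 32-bit integer; assume a 16-bit source range when scaling to 8-bit.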
arr = (arr / 256).clip(0, 255).astype(np.uint8)
pil_image = Image.fromarray(arr, mode="L").convert("RGB")
elif mode == "L":
pil_image = pil_image.convert("RGB")
elif mode != "RGB":
pil_image = pil_image.convert("RGB")
# Convert to BGR for OpenCV/InsightFace
img = np.array(pil_image)
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
logger.info(
"decode_image_bytes: source=%s shape=%s mode=%s",
source,
img.shape,
mode,
)
return img
@retry(
retry=retry_if_exception_type((httpx.TimeoutException, httpx.NetworkError)),
stop=stop_after_attempt(MAX_RETRIES),
wait=wait_exponential(multiplier=1, min=1, max=10),
reraise=True,
)
async def _download_with_retry(client: httpx.AsyncClient, url: str) -> bytes:
"""Download image with retry logic for transient failures."""
# TODO [PROD]: Add URL validation for SSRF protection
# - Block internal IPs (10.x, 172.16-31.x, 192.168.x, 127.x, 169.254.x)
# - Block cloud metadata endpoints
# - Validate against allowlist if configured
logger.info("download_image: url=%s", url) # TODO [PROD]: Redact query params
response = await client.get(url, follow_redirects=True)
response.raise_for_status()
# Check content length if provided
content_length = response.headers.get("content-length")
if content_length and int(content_length) > MAX_DOWNLOAD_SIZE:
raise ImageDownloadError(
f"Image too large: {int(content_length)} bytes, maximum is {MAX_DOWNLOAD_SIZE} bytes"
)
# Read content and check actual size
content = response.content
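    # NOTE: response.content buffers the full body, so this check runs after the
    # download completes; a streaming read would be needed to cap bytes on the wire.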
if len(content) > MAX_DOWNLOAD_SIZE:
raise ImageDownloadError(
f"Image too large: {len(content)} bytes, maximum is {MAX_DOWNLOAD_SIZE} bytes"
)
return content
async def download_image(image_url: str) -> np.ndarray:
"""
Download and decode an image from URL.
Features:
- Async HTTP with connection pooling
- Retry with exponential backoff for transient failures
- Size validation before and after download
- Comprehensive image decoding
"""
try:
async with httpx.AsyncClient(timeout=DOWNLOAD_TIMEOUT) as client:
data = await _download_with_retry(client, image_url)
except httpx.TimeoutException:
logger.exception("Timeout downloading image")
raise HTTPException(status_code=408, detail="Timeout downloading image")
except httpx.HTTPStatusError as e:
logger.exception("HTTP error downloading image")
raise HTTPException(
status_code=400,
detail=f"Failed to download image: HTTP {e.response.status_code}"
)
except ImageDownloadError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.exception("Failed to download image")
raise HTTPException(status_code=400, detail=f"Failed to download image: {e}")
try:
img = _decode_image_bytes(data, image_url)
except (ImageDecodeError, ImageValidationError) as e:
raise HTTPException(status_code=400, detail=str(e))
logger.info(
"download_image: success url=%s shape=%s",
image_url,
img.shape,
)
return img
def read_upload_image(data: bytes, filename: str) -> np.ndarray:
"""
Decode an uploaded image file.
Args:
data: Raw image bytes
filename: Original filename for logging
Returns:
BGR numpy array
"""
try:
img = _decode_image_bytes(data, filename or "<upload>")
except (ImageDecodeError, ImageValidationError) as e:
raise HTTPException(status_code=400, detail=str(e))
if img is None:
raise HTTPException(status_code=400, detail="Could not decode uploaded image")
return img

app/main.py (new file, 44 lines)

@@ -0,0 +1,44 @@
"""FastAPI application entry point."""
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.config import DET_SIZE, MODEL_NAME
from app.face import load_face_app
from app.routes import embed
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("face_service")
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan handler - load models on startup."""
logger.info("Starting face service...")
load_face_app()
logger.info("Face service ready")
yield
logger.info("Shutting down face service...")
app = FastAPI(
title="Face Service",
version="1.0.0",
lifespan=lifespan,
)
# Include routers
app.include_router(embed.router)
@app.get("/healthz")
def healthz():
"""Health check endpoint."""
return {
"status": "ok",
"model": MODEL_NAME,
"det_size": DET_SIZE,
}

app/models.py (new file, 40 lines)

@@ -0,0 +1,40 @@
"""Pydantic models for request/response schemas."""
from typing import List
from pydantic import BaseModel, HttpUrl
class EmbedRequest(BaseModel):
image_url: HttpUrl
class BBox(BaseModel):
x: int
y: int
w: int
h: int
class FaceEmbedding(BaseModel):
bbox: BBox
score: float
embedding: List[float]
class EmbedAvatarResponse(BaseModel):
embedding: List[float]
bbox: BBox
score: float
class EmbedImageResponse(BaseModel):
faces: List[FaceEmbedding]
class MatchResult(BaseModel):
tag: str
found: bool
score: float | None = None
bbox: BBox | None = None

app/routes/__init__.py (new file, 2 lines)

@@ -0,0 +1,2 @@
"""API route modules."""

app/routes/embed.py (new file, 148 lines)

@@ -0,0 +1,148 @@
"""Face embedding endpoints."""
import logging
from typing import List
import numpy as np
from fastapi import APIRouter, HTTPException
from app.face import (
fallback_avatar_embedding,
load_face_app,
to_pixel_bbox,
validate_embedding,
)
from app.image import download_image
from app.models import (
BBox,
EmbedAvatarResponse,
EmbedImageResponse,
EmbedRequest,
FaceEmbedding,
)
logger = logging.getLogger("face_service")
router = APIRouter()
@router.post("/embed-avatar", response_model=EmbedAvatarResponse)
async def embed_avatar(req: EmbedRequest):
"""
Extract face embedding from an avatar image.
Returns the largest detected face. If no face is detected,
falls back to center crop embedding with score=0.0.
"""
logger.info("embed_avatar: image_url=%s", req.image_url)
fa = load_face_app()
img = await download_image(str(req.image_url))
h, w = img.shape[:2]
faces = fa.get(img)
if len(faces) == 0:
logger.warning(
"embed_avatar: no faces detected image_url=%s size=%dx%d, using fallback",
req.image_url,
w,
h,
)
fallback = fallback_avatar_embedding(fa, img, w, h)
if fallback is None:
raise HTTPException(
status_code=422,
detail="No face detected in avatar image",
)
emb, bbox, score = fallback
logger.info(
"embed_avatar: using fallback bbox=%s score=%.4f embedding_len=%d",
bbox,
score,
len(emb),
)
return EmbedAvatarResponse(embedding=emb, bbox=bbox, score=score)
# Sort by face area (largest first)
faces.sort(
key=lambda f: (f.bbox[2] - f.bbox[0]) * (f.bbox[3] - f.bbox[1]),
reverse=True,
)
face = faces[0]
emb = face.normed_embedding.astype(np.float32)
# Validate embedding
if not validate_embedding(emb):
logger.error("embed_avatar: embedding contains NaN/Inf values")
raise HTTPException(
status_code=422,
detail="Failed to generate valid face embedding",
)
emb_list = emb.tolist()
bbox = to_pixel_bbox(face.bbox, w, h)
score = float(getattr(face, "det_score", 1.0))
logger.info(
"embed_avatar: using face bbox=%s score=%.4f embedding_len=%d",
face.bbox,
score,
len(emb_list),
)
return EmbedAvatarResponse(embedding=emb_list, bbox=bbox, score=score)
@router.post("/embed-image", response_model=EmbedImageResponse)
async def embed_image(req: EmbedRequest):
"""
Extract face embeddings from all faces in an image.
Returns all detected faces sorted by detection score (highest first).
Returns empty list if no faces detected.
"""
fa = load_face_app()
img = await download_image(str(req.image_url))
h, w = img.shape[:2]
faces = fa.get(img)
if len(faces) == 0:
logger.warning(
"embed_image: no faces detected image_url=%s size=%dx%d",
req.image_url,
w,
h,
)
return EmbedImageResponse(faces=[])
logger.info(
"embed_image: detected %d faces image_url=%s size=%dx%d",
len(faces),
req.image_url,
w,
h,
)
# Sort by detection score (highest first)
faces.sort(
key=lambda f: float(getattr(f, "det_score", 1.0)),
reverse=True,
)
result: List[FaceEmbedding] = []
for f in faces:
emb = f.normed_embedding.astype(np.float32)
# Skip faces with invalid embeddings
if not validate_embedding(emb):
logger.warning("embed_image: skipping face with NaN/Inf embedding")
continue
emb_list = emb.tolist()
bbox = to_pixel_bbox(f.bbox, w, h)
score = float(getattr(f, "det_score", 1.0))
result.append(FaceEmbedding(bbox=bbox, score=score, embedding=emb_list))
return EmbedImageResponse(faces=result)

requirements.txt (new file, 10 lines)

@@ -0,0 +1,10 @@
fastapi
uvicorn[standard]
insightface>=0.7.3
onnxruntime-gpu
opencv-python-headless
numpy
httpx
tenacity
pydantic
Pillow

run_face_service.sh (executable, new file, 35 lines)

@@ -0,0 +1,35 @@
#!/usr/bin/env bash
set -euo pipefail
# Optional: activate local virtualenv if present
if [ -d ".venv" ]; then
# shellcheck disable=SC1091
source ".venv/bin/activate"
fi
# Model configuration (can be overridden via env)
export FACE_MODEL_NAME="${FACE_MODEL_NAME:-buffalo_l}"
export FACE_DET_SIZE="${FACE_DET_SIZE:-1024}"
# Tune CPU thread usage and workers
CPU_CORES="$(nproc || echo 4)"
DEFAULT_WORKERS="${CPU_CORES}"
if [ "$DEFAULT_WORKERS" -lt 2 ]; then
DEFAULT_WORKERS=2
fi
export OMP_NUM_THREADS="${OMP_NUM_THREADS:-2}"
export MKL_NUM_THREADS="${MKL_NUM_THREADS:-2}"
WORKERS="${UVICORN_WORKERS:-$DEFAULT_WORKERS}"
# Match genealog-api FACE_SERVICE_URL: http://host.docker.internal:18081
PORT="${PORT:-18081}"
echo "Starting face service on port ${PORT} with ${WORKERS} workers (CPU cores: ${CPU_CORES})"
exec uvicorn app.main:app \
--host 0.0.0.0 \
--port "${PORT}" \
--workers "${WORKERS}" \
--loop uvloop \
--http httptools