Files
Deep-Live-Cam/modules/processors/frame/face_swapper.py
T
Max Buckley 646b0f816f Move hot-path imports to module scope
Address Sourcery review feedback: move face_align and get_one_face
imports from inside per-frame functions to module-level to avoid
repeated attribute lookup overhead in the processing loop.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-09 14:34:53 +02:00

1247 lines
56 KiB
Python

from typing import Any, List, Optional
import cv2
import insightface
from insightface.utils import face_align
import threading
import numpy as np
import platform
import modules.globals
import modules.processors.frame.core
from modules.core import update_status
from modules.face_analyser import get_one_face, get_many_faces, default_source_face
from modules.typing import Face, Frame
from modules.utilities import (
conditional_download,
is_image,
is_video,
)
from modules.cluster_analysis import find_closest_centroid
from modules.gpu_processing import gpu_gaussian_blur, gpu_sharpen, gpu_add_weighted, gpu_resize, gpu_cvt_color
import os
from collections import deque
import time
# Lazily-loaded insightface swapper model; populated by get_face_swapper().
FACE_SWAPPER = None
# Serializes model loading across worker threads.
THREAD_LOCK = threading.Lock()
NAME = "DLC.FACE-SWAPPER"
# --- START: Added for Interpolation ---
PREVIOUS_FRAME_RESULT = None  # Stores the final processed frame from the previous step
# --- END: Added for Interpolation ---
# --- START: Mac M1-M5 Optimizations ---
IS_APPLE_SILICON = platform.system() == 'Darwin' and platform.machine() == 'arm64'
FRAME_CACHE = deque(maxlen=3)  # Cache for frame reuse
FACE_DETECTION_CACHE = {}  # Cache face detections (keys: 'faces', 'timestamp')
LAST_DETECTION_TIME = 0  # time.time() of the most recent detection pass
DETECTION_INTERVAL = 0.033  # ~30 FPS detection rate for live mode
FRAME_SKIP_COUNTER = 0
ADAPTIVE_QUALITY = True
# --- END: Mac M1-M5 Optimizations ---
# Models are stored three directory levels above this file, in <root>/models.
abs_dir = os.path.dirname(os.path.abspath(__file__))
models_dir = os.path.join(
    os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models"
)
def pre_check() -> bool:
    """Ensure the models directory exists and trigger the model download.

    Returns:
        True when the models directory is usable (the swapper model download
        is attempted via conditional_download); False when the directory
        cannot be created.
    """
    # Use models_dir instead of abs_dir to save to the correct location
    download_directory_path = models_dir
    # Make sure the models directory exists, catch permission errors if they occur
    try:
        os.makedirs(download_directory_path, exist_ok=True)
    except OSError as e:
        # BUGFIX: this module never imports `logging`, so the previous
        # `logging.error(...)` call raised NameError whenever directory
        # creation failed. Report via update_status like the rest of the file.
        update_status(
            f"Failed to create directory {download_directory_path} due to permission error: {e}",
            NAME,
        )
        return False
    # Use the direct download URL from Hugging Face (FP32 model for broad GPU compatibility)
    conditional_download(
        download_directory_path,
        [
            "https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128.onnx"
        ],
    )
    return True
def pre_start() -> bool:
    """Verify an inswapper model file exists and loads before processing starts."""
    # Either the FP16 or the FP32 model variant is acceptable.
    candidate_names = ("inswapper_128_fp16.onnx", "inswapper_128.onnx")
    model_present = any(
        os.path.exists(os.path.join(models_dir, name)) for name in candidate_names
    )
    if not model_present:
        update_status(f"Model not found in {models_dir}. Please download inswapper_128.onnx.", NAME)
        return False
    # get_face_swapper reports its own error message when loading fails.
    return get_face_swapper() is not None
def get_face_swapper() -> Any:
    """Return the lazily-loaded inswapper model, loading it on first call.

    Thread-safe via THREAD_LOCK. Prefers the FP32 model file and attaches
    tuned provider options for CoreML (on Apple Silicon) and CUDA. Returns
    None (after reporting via update_status) when no model file exists or
    loading raises.
    """
    global FACE_SWAPPER
    with THREAD_LOCK:
        if FACE_SWAPPER is None:
            # Prefer FP32 for broad GPU compatibility (FP16 can produce NaN
            # on GPUs without Tensor Cores, e.g. GTX 16xx). Fall back to
            # FP16 when FP32 is not available.
            fp32_path = os.path.join(models_dir, "inswapper_128.onnx")
            fp16_path = os.path.join(models_dir, "inswapper_128_fp16.onnx")
            if os.path.exists(fp32_path):
                model_path = fp32_path
            elif os.path.exists(fp16_path):
                model_path = fp16_path
            else:
                update_status(f"No inswapper model found in {models_dir}.", NAME)
                return None
            update_status(f"Loading face swapper model from: {model_path}", NAME)
            try:
                # Optimized provider configuration for Apple Silicon
                providers_config = []
                for p in modules.globals.execution_providers:
                    if p == "CoreMLExecutionProvider" and IS_APPLE_SILICON:
                        # Enhanced CoreML configuration for M1-M5
                        providers_config.append((
                            "CoreMLExecutionProvider",
                            {
                                "ModelFormat": "MLProgram",
                                "MLComputeUnits": "ALL",  # Use Neural Engine + GPU + CPU
                                "SpecializationStrategy": "FastPrediction",
                                "AllowLowPrecisionAccumulationOnGPU": 1,
                                "EnableOnSubgraphs": 1,
                            }
                        ))
                    elif p == "CUDAExecutionProvider":
                        providers_config.append((
                            "CUDAExecutionProvider",
                            {
                                "arena_extend_strategy": "kSameAsRequested",
                                "cudnn_conv_algo_search": "EXHAUSTIVE",
                                "cudnn_conv_use_max_workspace": "1",
                                "do_copy_in_default_stream": "0",
                            }
                        ))
                    else:
                        # Any other provider is passed through by name, unconfigured.
                        providers_config.append(p)
                FACE_SWAPPER = insightface.model_zoo.get_model(
                    model_path,
                    providers=providers_config,
                )
                update_status("Face swapper model loaded successfully.", NAME)
            except Exception as e:
                update_status(f"Error loading face swapper model: {e}", NAME)
                FACE_SWAPPER = None
                return None
    return FACE_SWAPPER
def _fast_paste_back(target_img: Frame, bgr_fake: np.ndarray, aimg: np.ndarray, M: np.ndarray) -> Frame:
    """Optimized paste-back that restricts blending to the face bounding box.

    Same visual output as insightface's built-in paste_back, but:
    - Skips dead fake_diff code (computed but unused in insightface)
    - Runs erosion, blur, and blend on the face bbox instead of the full frame

    Args:
        target_img: Full destination frame (uint8 BGR).
        bgr_fake: Swapped face crop returned by the model.
        aimg: Aligned input crop (only its shape is used, for the mask size).
        M: 2x3 affine transform from frame coordinates to the aligned crop.

    Returns:
        A new frame with the swapped face blended in; the original frame is
        returned unchanged when the warped mask is empty.
    """
    h, w = target_img.shape[:2]
    IM = cv2.invertAffineTransform(M)
    # Warp swapped face and mask to full frame (fast: ~0.4ms each)
    bgr_fake_full = cv2.warpAffine(bgr_fake, IM, (w, h), borderValue=0.0)
    img_white = np.full((aimg.shape[0], aimg.shape[1]), 255, dtype=np.float32)
    img_white_full = cv2.warpAffine(img_white, IM, (w, h), borderValue=0.0)
    # Find tight bounding box of the warped face mask
    rows = np.any(img_white_full > 20, axis=1)
    cols = np.any(img_white_full > 20, axis=0)
    row_idx = np.where(rows)[0]
    col_idx = np.where(cols)[0]
    if len(row_idx) == 0 or len(col_idx) == 0:
        # Mask warped entirely off-frame: nothing to blend.
        return target_img
    y1, y2 = row_idx[0], row_idx[-1]
    x1, x2 = col_idx[0], col_idx[-1]
    # Compute mask/blur kernel sizes from the full mask extent
    mask_h = y2 - y1
    mask_w = x2 - x1
    mask_size = int(np.sqrt(mask_h * mask_w))
    k_erode = max(mask_size // 10, 10)
    k_blur = max(mask_size // 20, 5)
    # Add padding for erosion + blur kernels, then crop
    pad = k_erode + k_blur + 2
    y1p, y2p = max(0, y1 - pad), min(h, y2 + pad + 1)
    x1p, x2p = max(0, x1 - pad), min(w, x2 + pad + 1)
    # Work on cropped region only (mask_crop is a view into img_white_full,
    # which is not used again, so in-place thresholding is safe).
    mask_crop = img_white_full[y1p:y2p, x1p:x2p]
    mask_crop[mask_crop > 20] = 255
    kernel = np.ones((k_erode, k_erode), np.uint8)
    mask_crop = cv2.erode(mask_crop, kernel, iterations=1)
    blur_size = tuple(2 * i + 1 for i in (k_blur, k_blur))
    mask_crop = cv2.GaussianBlur(mask_crop, blur_size, 0)
    mask_crop /= 255.0
    # Blend only within the crop
    mask_3d = mask_crop[:, :, np.newaxis]
    fake_crop = bgr_fake_full[y1p:y2p, x1p:x2p].astype(np.float32)
    target_crop = target_img[y1p:y2p, x1p:x2p].astype(np.float32)
    blended = mask_3d * fake_crop + (1.0 - mask_3d) * target_crop
    result = target_img.copy()
    result[y1p:y2p, x1p:x2p] = np.clip(blended, 0, 255).astype(np.uint8)
    return result
def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
    """Optimized face swapping with better memory management and performance.

    Swaps source_face onto target_face within temp_frame, then applies the
    optional mouth-mask, Poisson-blend and opacity post-steps controlled by
    modules.globals. Returns the input frame unchanged when the model is
    unavailable, inputs are invalid, or the swap fails.
    """
    face_swapper = get_face_swapper()
    if face_swapper is None:
        update_status("Face swapper model not loaded or failed to load. Skipping swap.", NAME)
        return temp_frame
    # Safety check for faces
    if source_face is None or target_face is None:
        return temp_frame
    if not hasattr(source_face, 'normed_embedding') or source_face.normed_embedding is None:
        return temp_frame
    # Store a copy of the original frame before swapping for opacity blending and mouth mask.
    # When neither feature is active, original_frame simply aliases temp_frame (no copy cost).
    opacity = getattr(modules.globals, "opacity", 1.0)
    opacity = max(0.0, min(1.0, opacity))
    mouth_mask_enabled = getattr(modules.globals, "mouth_mask", False)
    original_frame = temp_frame.copy() if (opacity < 1.0 or mouth_mask_enabled) else temp_frame
    if temp_frame.dtype != np.uint8:
        temp_frame = np.clip(temp_frame, 0, 255).astype(np.uint8)
    try:
        if not temp_frame.flags['C_CONTIGUOUS']:
            temp_frame = np.ascontiguousarray(temp_frame)
        # Use paste_back=False and our optimized paste-back (_fast_paste_back below).
        # The DirectML path is serialized via dml_lock — presumably the DML session
        # is not safe for concurrent .get() calls (NOTE(review): confirm).
        if any("DmlExecutionProvider" in p for p in modules.globals.execution_providers):
            with modules.globals.dml_lock:
                bgr_fake, M = face_swapper.get(
                    temp_frame, target_face, source_face, paste_back=False
                )
        else:
            bgr_fake, M = face_swapper.get(
                temp_frame, target_face, source_face, paste_back=False
            )
        if bgr_fake is None:
            return original_frame
        if not isinstance(bgr_fake, np.ndarray):
            return original_frame
        # Get the aligned input crop for the mask (same as insightface does internally)
        aimg, _ = face_align.norm_crop2(temp_frame, target_face.kps, face_swapper.input_size[0])
        swapped_frame = _fast_paste_back(temp_frame, bgr_fake, aimg, M)
        swapped_frame = np.clip(swapped_frame, 0, 255).astype(np.uint8)
    except Exception as e:
        print(f"Error during face swap: {e}")
        return original_frame
    # --- Post-swap Processing (Masking, Opacity, etc.) ---
    # Now, work with the guaranteed uint8 'swapped_frame'
    if mouth_mask_enabled:  # Check if mouth_mask is enabled
        # Create a mask for the target face
        face_mask = create_face_mask(target_face, original_frame)  # Use original_frame for mask creation geometry
        # Create the mouth mask using the ORIGINAL frame (before swap) for cutout
        mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon = (
            create_lower_mouth_mask(target_face, original_frame)  # Use original_frame for real mouth cutout
        )
        # Apply the mouth area only if mouth_cutout exists
        if mouth_cutout is not None and mouth_box != (0,0,0,0):
            # Apply mouth area (from original) onto the 'swapped_frame'
            swapped_frame = apply_mouth_area(
                swapped_frame, mouth_cutout, mouth_box, face_mask, lower_lip_polygon
            )
        # Draw bounding box only while slider is being dragged
        if getattr(modules.globals, "show_mouth_mask_box", False):
            mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon)
            swapped_frame = draw_mouth_mask_visualization(
                swapped_frame, target_face, mouth_mask_data
            )
    # --- Poisson Blending ---
    if getattr(modules.globals, "poisson_blend", False):
        face_mask = create_face_mask(target_face, temp_frame)
        if face_mask is not None:
            # Find bounding box of the mask
            y_indices, x_indices = np.where(face_mask > 0)
            if len(x_indices) > 0 and len(y_indices) > 0:
                x_min, x_max = np.min(x_indices), np.max(x_indices)
                y_min, y_max = np.min(y_indices), np.max(y_indices)
                # Calculate center
                center = (int((x_min + x_max) / 2), int((y_min + y_max) / 2))
                # Crop src and mask
                src_crop = swapped_frame[y_min : y_max + 1, x_min : x_max + 1]
                mask_crop = face_mask[y_min : y_max + 1, x_min : x_max + 1]
                try:
                    # Use original_frame as destination to blend the swapped face onto it
                    swapped_frame = cv2.seamlessClone(
                        src_crop,
                        original_frame,
                        mask_crop,
                        center,
                        cv2.NORMAL_CLONE,
                    )
                except Exception as e:
                    # Best-effort: keep the unblended swap when seamlessClone rejects input.
                    print(f"Poisson blending failed: {e}")
    # Apply opacity blend between the original frame and the swapped frame
    if opacity >= 1.0:
        return swapped_frame.astype(np.uint8)
    # Blend the original_frame with the (potentially mouth-masked) swapped_frame
    final_swapped_frame = gpu_add_weighted(original_frame.astype(np.uint8), 1 - opacity, swapped_frame.astype(np.uint8), opacity, 0)
    return final_swapped_frame.astype(np.uint8)
# --- START: Mac M1-M5 Optimized Face Detection ---
def get_faces_optimized(frame: Frame, use_cache: bool = True) -> Optional[List[Face]]:
    """Optimized face detection for live mode on Apple Silicon.

    Throttles detection to DETECTION_INTERVAL and serves cached results in
    between; on other platforms (or with use_cache=False) it detects on
    every call.
    """
    global LAST_DETECTION_TIME, FACE_DETECTION_CACHE

    def _detect() -> Optional[List[Face]]:
        # Shared detection path: every face, or a single best face wrapped in a list.
        if modules.globals.many_faces:
            return get_many_faces(frame)
        single = get_one_face(frame)
        return [single] if single else None

    # Caching is only worthwhile on the Apple Silicon live pipeline.
    if not use_cache or not IS_APPLE_SILICON:
        return _detect()

    now = time.time()
    # Serve the cached detection while still within the detection interval.
    if (now - LAST_DETECTION_TIME) < DETECTION_INTERVAL and FACE_DETECTION_CACHE:
        return FACE_DETECTION_CACHE.get('faces')

    LAST_DETECTION_TIME = now
    faces = _detect()
    FACE_DETECTION_CACHE['faces'] = faces
    FACE_DETECTION_CACHE['timestamp'] = now
    return faces
# --- END: Mac M1-M5 Optimized Face Detection ---
# --- START: Helper function for interpolation and sharpening ---
def apply_post_processing(current_frame: Frame, swapped_face_bboxes: List[np.ndarray]) -> Frame:
    """Applies sharpening and interpolation with Apple Silicon optimizations.

    Args:
        current_frame: Frame after any face swaps have been applied.
        swapped_face_bboxes: Integer (x1, y1, x2, y2) boxes of swapped faces;
            sharpening is restricted to these regions.

    Returns:
        The post-processed frame. Also maintains the module-level
        PREVIOUS_FRAME_RESULT used for temporal interpolation between calls.
    """
    global PREVIOUS_FRAME_RESULT
    processed_frame = current_frame.copy()
    # 1. Apply Sharpening (if enabled) with optimized kernel for Apple Silicon
    sharpness_value = getattr(modules.globals, "sharpness", 0.0)
    if sharpness_value > 0.0 and swapped_face_bboxes:
        height, width = processed_frame.shape[:2]
        for bbox in swapped_face_bboxes:
            # Ensure bbox is iterable and has 4 elements
            if not hasattr(bbox, '__iter__') or len(bbox) != 4:
                # print(f"Warning: Invalid bbox format for sharpening: {bbox}") # Debug
                continue
            x1, y1, x2, y2 = bbox
            # Ensure coordinates are integers and within bounds
            try:
                x1, y1 = max(0, int(x1)), max(0, int(y1))
                x2, y2 = min(width, int(x2)), min(height, int(y2))
            except ValueError:
                # print(f"Warning: Could not convert bbox coordinates to int: {bbox}") # Debug
                continue
            if x2 <= x1 or y2 <= y1:
                continue
            face_region = processed_frame[y1:y2, x1:x2]
            if face_region.size == 0: continue
            # Apply sharpening (GPU-accelerated when CUDA OpenCV is available)
            try:
                # Smaller Gaussian sigma on Apple Silicon keeps the kernel cheaper.
                sigma = 2 if IS_APPLE_SILICON else 3
                sharpened_region = gpu_sharpen(face_region, strength=sharpness_value, sigma=sigma)
                processed_frame[y1:y2, x1:x2] = sharpened_region
            except cv2.error:
                # Best-effort: leave this region unsharpened on OpenCV errors.
                pass
    # 2. Apply Interpolation (if enabled)
    enable_interpolation = getattr(modules.globals, "enable_interpolation", False)
    interpolation_weight = getattr(modules.globals, "interpolation_weight", 0.2)
    final_frame = processed_frame  # Start with the current (potentially sharpened) frame
    if enable_interpolation and 0 < interpolation_weight < 1:
        if PREVIOUS_FRAME_RESULT is not None and PREVIOUS_FRAME_RESULT.shape == processed_frame.shape and PREVIOUS_FRAME_RESULT.dtype == processed_frame.dtype:
            # Perform interpolation
            try:
                final_frame = gpu_add_weighted(
                    PREVIOUS_FRAME_RESULT, 1.0 - interpolation_weight,
                    processed_frame, interpolation_weight,
                    0
                )
                # Ensure final frame is uint8
                final_frame = np.clip(final_frame, 0, 255).astype(np.uint8)
            except cv2.error as interp_e:
                # print(f"Warning: OpenCV error during interpolation: {interp_e}") # Debug
                final_frame = processed_frame  # Use current frame if interpolation fails
                PREVIOUS_FRAME_RESULT = None  # Reset state if error occurs
            # Update the state for the next frame *with the interpolated result*
            # (overwrites the error reset above, which happens before this line runs).
            PREVIOUS_FRAME_RESULT = final_frame.copy()
        else:
            # If previous frame invalid or doesn't match, use current frame and update state
            if PREVIOUS_FRAME_RESULT is not None and PREVIOUS_FRAME_RESULT.shape != processed_frame.shape:
                # print("Info: Frame shape changed, resetting interpolation state.") # Debug
                pass
            PREVIOUS_FRAME_RESULT = processed_frame.copy()
    else:
        # Interpolation is off or weight is invalid — no need to cache
        PREVIOUS_FRAME_RESULT = None
    return final_frame
# --- END: Helper function for interpolation and sharpening ---
def process_frame(source_face: Face, temp_frame: Frame, target_face: Face = None) -> Frame:
    """Process a single frame, swapping source_face onto detected target(s).

    Args:
        source_face: The face to paste onto targets.
        temp_frame: The frame to process.
        target_face: Pre-detected target face. When provided, skips the
            internal face detection call (saves ~30-40ms per frame).
            Ignored when many_faces mode is active.

    Returns:
        The swapped and post-processed frame.
    """
    global PREVIOUS_FRAME_RESULT
    if getattr(modules.globals, "opacity", 1.0) == 0:
        # Fully transparent swap: skip all work and drop interpolation state
        # so a stale frame cannot leak in once opacity is raised again.
        PREVIOUS_FRAME_RESULT = None
        return temp_frame

    result = temp_frame
    bboxes = []

    if modules.globals.many_faces:
        detected = get_many_faces(result)
        if detected:
            working = result.copy()
            for face in detected:
                working = swap_face(source_face, face, working)
                if face is not None and getattr(face, "bbox", None) is not None:
                    bboxes.append(face.bbox.astype(int))
            result = working
    else:
        face = target_face if target_face is not None else get_one_face(result)
        if face:
            result = swap_face(source_face, face, result)
            if getattr(face, "bbox", None) is not None:
                bboxes.append(face.bbox.astype(int))

    # Sharpening + temporal interpolation on the swapped regions.
    return apply_post_processing(result, bboxes)
def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame:
    """Handles complex mapping scenarios (map_faces=True) and live streams.

    Args:
        temp_frame: Frame to process.
        temp_frame_path: Path of the frame on disk; used to look up the
            pre-analyzed face map for video targets.

    Returns:
        The swapped and post-processed frame.
    """
    if getattr(modules.globals, "opacity", 1.0) == 0:
        # If opacity is 0, no swap happens, so no post-processing needed.
        # Also reset interpolation state if it was active.
        global PREVIOUS_FRAME_RESULT
        PREVIOUS_FRAME_RESULT = None
        return temp_frame
    processed_frame = temp_frame  # Start with the input frame
    swapped_face_bboxes = []  # Keep track of where swaps happened
    # Determine source/target pairs based on mode
    source_target_pairs = []
    # Ensure maps exist before accessing them
    source_target_map = getattr(modules.globals, "source_target_map", None)
    simple_map = getattr(modules.globals, "simple_map", None)
    # Check if target is a file path (image or video) or live stream
    is_file_target = modules.globals.target_path and (is_image(modules.globals.target_path) or is_video(modules.globals.target_path))
    if is_file_target:
        # Processing specific image or video file with pre-analyzed maps
        if source_target_map:
            if modules.globals.many_faces:
                source_face = default_source_face()  # Use default source for all targets
                if source_face:
                    for map_data in source_target_map:
                        if is_image(modules.globals.target_path):
                            target_info = map_data.get("target", {})
                            if target_info:  # Check if target info exists
                                target_face = target_info.get("face")
                                if target_face:
                                    source_target_pairs.append((source_face, target_face))
                        elif is_video(modules.globals.target_path):
                            # Find faces for the current frame_path in video map
                            target_frames_data = map_data.get("target_faces_in_frame", [])
                            if target_frames_data:  # Check if frame data exists
                                target_frames = [f for f in target_frames_data if f and f.get("location") == temp_frame_path]
                                for frame_data in target_frames:
                                    faces_in_frame = frame_data.get("faces", [])
                                    if faces_in_frame:  # Check if faces exist
                                        for target_face in faces_in_frame:
                                            source_target_pairs.append((source_face, target_face))
            else:  # Single face or specific mapping
                for map_data in source_target_map:
                    source_info = map_data.get("source", {})
                    if not source_info: continue  # Skip if no source info
                    source_face = source_info.get("face")
                    if not source_face: continue  # Skip if no source defined for this map entry
                    if is_image(modules.globals.target_path):
                        target_info = map_data.get("target", {})
                        if target_info:
                            target_face = target_info.get("face")
                            if target_face:
                                source_target_pairs.append((source_face, target_face))
                    elif is_video(modules.globals.target_path):
                        target_frames_data = map_data.get("target_faces_in_frame", [])
                        if target_frames_data:
                            target_frames = [f for f in target_frames_data if f and f.get("location") == temp_frame_path]
                            for frame_data in target_frames:
                                faces_in_frame = frame_data.get("faces", [])
                                if faces_in_frame:
                                    for target_face in faces_in_frame:
                                        source_target_pairs.append((source_face, target_face))
    else:
        # Live stream or webcam processing (analyze faces on the fly)
        detected_faces = get_many_faces(processed_frame)
        if detected_faces:
            if modules.globals.many_faces:
                source_face = default_source_face()  # Use default source for all detected targets
                if source_face:
                    for target_face in detected_faces:
                        source_target_pairs.append((source_face, target_face))
            elif simple_map:
                # Use simple_map (source_faces <-> target_embeddings)
                source_faces = simple_map.get("source_faces", [])
                target_embeddings = simple_map.get("target_embeddings", [])
                if source_faces and target_embeddings and len(source_faces) == len(target_embeddings):
                    # Match detected faces to the closest target embedding
                    if len(detected_faces) <= len(target_embeddings):
                        # More targets defined than detected - match each detected face
                        for detected_face in detected_faces:
                            if detected_face.normed_embedding is None: continue
                            closest_idx, _ = find_closest_centroid(target_embeddings, detected_face.normed_embedding)
                            if 0 <= closest_idx < len(source_faces):
                                source_target_pairs.append((source_faces[closest_idx], detected_face))
                    else:
                        # More faces detected than targets defined - match each target embedding to closest detected face
                        detected_embeddings = [f.normed_embedding for f in detected_faces if f.normed_embedding is not None]
                        detected_faces_with_embedding = [f for f in detected_faces if f.normed_embedding is not None]
                        if not detected_embeddings: return processed_frame  # No embeddings to match
                        for i, target_embedding in enumerate(target_embeddings):
                            if 0 <= i < len(source_faces):  # Ensure source face exists for this embedding
                                closest_idx, _ = find_closest_centroid(detected_embeddings, target_embedding)
                                if 0 <= closest_idx < len(detected_faces_with_embedding):
                                    source_target_pairs.append((source_faces[i], detected_faces_with_embedding[closest_idx]))
            else:  # Fallback: if no map, use default source for the single detected face (if any)
                source_face = default_source_face()
                target_face = get_one_face(processed_frame, detected_faces)  # Use faces already detected
                if source_face and target_face:
                    source_target_pairs.append((source_face, target_face))
    # Perform swaps based on the collected pairs
    current_swap_target = processed_frame.copy()  # Apply swaps sequentially
    for source_face, target_face in source_target_pairs:
        if source_face and target_face:
            current_swap_target = swap_face(source_face, target_face, current_swap_target)
            if target_face is not None and hasattr(target_face, "bbox") and target_face.bbox is not None:
                swapped_face_bboxes.append(target_face.bbox.astype(int))
    processed_frame = current_swap_target  # Assign final result
    # Apply sharpening and interpolation
    final_frame = apply_post_processing(processed_frame, swapped_face_bboxes)
    return final_frame
def process_frames(
    source_path: str, temp_frame_paths: List[str], progress: Any = None
) -> None:
    """
    Processes a list of frame paths (typically for video).
    Optimized with better memory management and caching.
    Iterates through frames, applies the appropriate swapping logic based on globals,
    and saves the result back to the frame path. Handles multi-threading via caller.

    Args:
        source_path: Path to the source face image (used only in simple mode).
        temp_frame_paths: Frame files to process in place.
        progress: Optional tqdm-like progress object with an update() method.
    """
    # Determine which processing function to use based on map_faces global setting
    use_v2 = getattr(modules.globals, "map_faces", False)
    source_face = None  # Initialize source_face
    # --- Pre-load source face only if needed (Simple Mode: map_faces=False) ---
    if not use_v2:
        if not source_path or not os.path.exists(source_path):
            update_status(f"Error: Source path invalid or not provided for simple mode: {source_path}", NAME)
            # Log the error but allow proceeding; subsequent check will stop processing.
        else:
            try:
                source_img = cv2.imread(source_path)
                if source_img is None:
                    # Specific error for file reading failure
                    update_status(f"Error reading source image file {source_path}. Please check the path and file integrity.", NAME)
                else:
                    source_face = get_one_face(source_img)
                    if source_face is None:
                        # Specific message for no face detected after successful read
                        update_status(f"Warning: Successfully read source image {source_path}, but no face was detected. Swaps will be skipped.", NAME)
                # Free memory immediately after extracting face
                del source_img
            except Exception as e:
                # Print the specific exception caught
                import traceback
                print(f"{NAME}: Caught exception during source image processing for {source_path}:")
                traceback.print_exc()  # Print the full traceback
                update_status(f"Error during source image reading or analysis {source_path}: {e}", NAME)
                # Log general exception during the process
    total_frames = len(temp_frame_paths)
    # update_status(f"Processing {total_frames} frames. Use V2 (map_faces): {use_v2}", NAME) # Optional Debug
    # --- Stop processing entirely if in Simple Mode and source face is invalid ---
    if not use_v2 and source_face is None:
        update_status(f"Halting video processing: Invalid or no face detected in source image for simple mode.", NAME)
        if progress:
            # Ensure the progress bar completes if it was started
            remaining_updates = total_frames - progress.n if hasattr(progress, 'n') else total_frames
            if remaining_updates > 0:
                progress.update(remaining_updates)
        return  # Exit the function entirely
    # --- Process each frame path provided in the list ---
    # Note: In the current core.py multi_process_frame, temp_frame_paths will usually contain only ONE path per call.
    for i, temp_frame_path in enumerate(temp_frame_paths):
        # update_status(f"Processing frame {i+1}/{total_frames}: {os.path.basename(temp_frame_path)}", NAME) # Optional Debug
        # Read the target frame
        temp_frame = None
        try:
            temp_frame = cv2.imread(temp_frame_path)
            if temp_frame is None:
                print(f"{NAME}: Error: Could not read frame: {temp_frame_path}, skipping.")
                if progress: progress.update(1)
                continue  # Skip this frame if read fails
        except Exception as read_e:
            print(f"{NAME}: Error reading frame {temp_frame_path}: {read_e}, skipping.")
            if progress: progress.update(1)
            continue
        # Select processing function and execute
        result_frame = None
        try:
            if use_v2:
                # V2 uses global maps and needs the frame path for lookup in video mode
                # update_status(f"Using process_frame_v2 for: {os.path.basename(temp_frame_path)}", NAME) # Optional Debug
                result_frame = process_frame_v2(temp_frame, temp_frame_path)
            else:
                # Simple mode uses the pre-loaded source_face (already checked for validity above)
                # update_status(f"Using process_frame (simple) for: {os.path.basename(temp_frame_path)}", NAME) # Optional Debug
                result_frame = process_frame(source_face, temp_frame)  # source_face is guaranteed to be valid here
            # Check if processing actually returned a frame
            if result_frame is None:
                print(f"{NAME}: Warning: Processing returned None for frame {temp_frame_path}. Using original.")
                result_frame = temp_frame
        except Exception as proc_e:
            print(f"{NAME}: Error processing frame {temp_frame_path}: {proc_e}")
            # import traceback # Optional for detailed debugging
            # traceback.print_exc()
            result_frame = temp_frame  # Use original frame on processing error
        # Write the result back to the same frame path with optimized compression
        try:
            # Use PNG compression level 3 (faster) instead of default 9
            write_success = cv2.imwrite(temp_frame_path, result_frame, [cv2.IMWRITE_PNG_COMPRESSION, 3])
            if not write_success:
                print(f"{NAME}: Error: Failed to write processed frame to {temp_frame_path}")
        except Exception as write_e:
            print(f"{NAME}: Error writing frame {temp_frame_path}: {write_e}")
        # Free memory immediately after processing
        del temp_frame
        if result_frame is not None:
            del result_frame
        # Update progress bar
        if progress:
            progress.update(1)
        # else: # Basic console progress (optional)
        #     if (i + 1) % 10 == 0 or (i + 1) == total_frames: # Update every 10 frames or on last frame
        #         update_status(f"Processed frame {i+1}/{total_frames}", NAME)
def process_image(source_path: str, target_path: str, output_path: str) -> None:
    """Processes a single target image and writes the result to output_path."""
    global PREVIOUS_FRAME_RESULT
    # Single-image runs never interpolate against a previous frame.
    PREVIOUS_FRAME_RESULT = None

    use_v2 = getattr(modules.globals, "map_faces", False)

    def _load_source_face():
        # Read and analyse the source image; report problems and yield None.
        try:
            source_img = cv2.imread(source_path)
            if source_img is None:
                update_status(f"Error: Could not read source image: {source_path}", NAME)
                return None
            source_face = get_one_face(source_img)
            if not source_face:
                update_status(f"Error: No face found in source image: {source_path}", NAME)
                return None
            return source_face
        except Exception as src_e:
            update_status(f"Error reading or analyzing source image {source_path}: {src_e}", NAME)
            return None

    # Read the target first; nothing to do when it cannot be loaded.
    try:
        target_frame = cv2.imread(target_path)
        if target_frame is None:
            update_status(f"Error: Could not read target image: {target_path}", NAME)
            return
    except Exception as read_e:
        update_status(f"Error reading target image {target_path}: {read_e}", NAME)
        return

    try:
        if use_v2:
            if getattr(modules.globals, "many_faces", False):
                update_status("Processing image with 'map_faces' and 'many_faces'. Using pre-analysis map.", NAME)
            # V2 relies on pre-populated global maps; target_path is used for lookup.
            result = process_frame_v2(target_frame, target_path)
        else:
            # Simple mode: need a valid face from the source image.
            source_face = _load_source_face()
            if source_face is None:
                return
            result = process_frame(source_face, target_frame)

        # Persist the result when processing produced a frame.
        if result is not None:
            if cv2.imwrite(output_path, result):
                update_status(f"Output image saved to: {output_path}", NAME)
            else:
                update_status(f"Error: Failed to write output image to {output_path}", NAME)
        else:
            # This case might occur if process_frame/v2 returns None unexpectedly.
            update_status("Image processing failed (result was None).", NAME)
    except Exception as proc_e:
        update_status(f"Error during image processing: {proc_e}", NAME)
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
    """Sets up and calls the frame processing for video."""
    global PREVIOUS_FRAME_RESULT
    # Fresh video run: discard interpolation state left over from a previous run.
    PREVIOUS_FRAME_RESULT = None

    map_faces_enabled = getattr(modules.globals, "map_faces", False)
    mode_desc = "'map_faces'" if map_faces_enabled else "'simple'"
    if map_faces_enabled and getattr(modules.globals, "many_faces", False):
        mode_desc += " and 'many_faces'. Using pre-analysis map."
    update_status(f"Processing video with {mode_desc} mode.", NAME)

    # Delegate frame iteration and threading to the frame-processor core;
    # process_frames handles both simple and map modes per frame (and needs
    # source_path for simple mode).
    modules.processors.frame.core.process_video(
        source_path, temp_frame_paths, process_frames
    )
# ==========================
# MASKING FUNCTIONS (Mostly unchanged, added safety checks and minor improvements)
# ==========================
def create_lower_mouth_mask(
    face: Face, frame: Frame
) -> tuple:
    """Build a feathered mask over the mouth region of `face` in `frame`.

    Returns a 4-tuple:
        mask: uint8 array, same HxW as `frame`; 255 inside the blurred mouth polygon.
        mouth_cutout: BGR crop of the original frame under the mouth box, or None
            when no valid region was produced.
        mouth_box: (min_x, min_y, max_x, max_y) of the padded, frame-clamped mouth
            region; (0,0,0,0) on failure.
        lower_lip_polygon: expanded mouth polygon in frame coordinates (int32),
            or None on failure.

    NOTE(review): `frame` is dereferenced (frame.shape) before any None check —
    confirm no caller can pass frame=None.
    """
    mask = np.zeros(frame.shape[:2], dtype=np.uint8)
    mouth_cutout = None
    lower_lip_polygon = None  # stays None unless a valid polygon is produced
    mouth_box = (0,0,0,0)  # default box returned on any failure path
    # Validate the face object and its landmark array before any geometry.
    if face is None or not hasattr(face, 'landmark_2d_106'):
        return mask, mouth_cutout, mouth_box, lower_lip_polygon
    landmarks = face.landmark_2d_106
    if landmarks is None or not isinstance(landmarks, np.ndarray) or landmarks.shape[0] < 106:
        return mask, mouth_cutout, mouth_box, lower_lip_polygon
    try:  # any unexpected failure falls through to the default return values
        # Outer mouth landmark indices 52-71 (per original author: covers both
        # upper and lower lips so the whole mouth can be preserved).
        lower_lip_order = list(range(52, 72))
        # Redundant with the < 106 check above, but kept as a safety net.
        if max(lower_lip_order) >= landmarks.shape[0]:
            return mask, mouth_cutout, mouth_box, lower_lip_polygon
        lower_lip_landmarks = landmarks[lower_lip_order].astype(np.float32)
        # Reject NaN/Inf landmark coordinates before computing geometry.
        if not np.all(np.isfinite(lower_lip_landmarks)):
            return mask, mouth_cutout, mouth_box, lower_lip_polygon
        center = np.mean(lower_lip_landmarks, axis=0)
        if not np.all(np.isfinite(center)):
            return mask, mouth_cutout, mouth_box, lower_lip_polygon
        # mouth_mask_size is a 0-100 UI slider:
        # 0 = tight lip outline, 50 = covers mouth area, 100 = mouth to chin.
        mouth_mask_size = getattr(modules.globals, "mouth_mask_size", 0.0)
        expansion_factor = 1 + (mouth_mask_size / 100.0) * 2.5
        # Expand landmarks radially from the mouth center.
        offsets = lower_lip_landmarks - center
        # Points below the center (y offset > 0, i.e. toward the chin) get an
        # extra vertical stretch so the mask can extend down to the chin.
        chin_bias = 1 + (mouth_mask_size / 100.0) * 1.5
        scale_y = np.where(offsets[:, 1] > 0, expansion_factor * chin_bias, expansion_factor)
        expanded_landmarks = lower_lip_landmarks.copy()
        expanded_landmarks[:, 0] = center[0] + offsets[:, 0] * expansion_factor
        expanded_landmarks[:, 1] = center[1] + offsets[:, 1] * scale_y
        # Expansion arithmetic could produce non-finite values; bail out if so.
        if not np.all(np.isfinite(expanded_landmarks)):
            return mask, mouth_cutout, mouth_box, lower_lip_polygon
        expanded_landmarks = expanded_landmarks.astype(np.int32)
        min_x, min_y = np.min(expanded_landmarks, axis=0)
        max_x, max_y = np.max(expanded_landmarks, axis=0)
        # Pad the bounding box by 10% of its own extent in each axis.
        padding_ratio = 0.1
        padding_x = int((max_x - min_x) * padding_ratio)
        padding_y = int((max_y - min_y) * padding_ratio)
        # Apply padding, then clamp the box to the frame boundaries.
        frame_h, frame_w = frame.shape[:2]
        min_x = max(0, min_x - padding_x)
        min_y = max(0, min_y - padding_y)
        max_x = min(frame_w, max_x + padding_x)
        max_y = min(frame_h, max_y + padding_y)
        if max_x > min_x and max_y > min_y:
            # Build the mask inside a small ROI, then paste it into the full mask.
            mask_roi_h = max_y - min_y
            mask_roi_w = max_x - min_x
            mask_roi = np.zeros((mask_roi_h, mask_roi_w), dtype=np.uint8)
            # Shift polygon coordinates so they are relative to the ROI origin.
            polygon_relative_to_roi = expanded_landmarks - [min_x, min_y]
            cv2.fillPoly(mask_roi, [polygon_relative_to_roi], 255)
            # Feather the mask edges; kernel size forced odd
            # (GPU-accelerated blur when available).
            blur_k_size = getattr(modules.globals, "mask_blur_kernel", 15)
            blur_k_size = max(1, blur_k_size // 2 * 2 + 1)
            mask_roi = gpu_gaussian_blur(mask_roi, (blur_k_size, blur_k_size), 0)
            mask[min_y:max_y, min_x:max_x] = mask_roi
            # Keep a copy of the *original* frame pixels under the mask so the
            # caller can paste the real mouth back after swapping.
            mouth_cutout = frame[min_y:max_y, min_x:max_x].copy()
            lower_lip_polygon = expanded_landmarks  # frame coordinates
            mouth_box = (min_x, min_y, max_x, max_y)
        else:
            # Box collapsed after padding/clamping; return defaults.
            pass
    except IndexError as idx_e:
        # Landmark indexing failed; return defaults rather than crash.
        pass
    except Exception as e:
        print(f"Error in create_lower_mouth_mask: {e}")
        pass
    # Defaults are returned for any failure path above.
    return mask, mouth_cutout, mouth_box, lower_lip_polygon
def draw_mouth_mask_visualization(
    frame: Frame, face: Face, mouth_mask_data: tuple
) -> Frame:
    """Return a copy of `frame` with the mouth-mask polygon (green), its bounding
    box (red) and a text label drawn on for debugging. The input frame is
    returned untouched whenever the data cannot be drawn."""
    # Reject missing or malformed inputs up front.
    if frame is None or face is None or mouth_mask_data is None or len(mouth_mask_data) != 4:
        return frame
    _mask, _cutout, box, polygon = mouth_mask_data
    (min_x, min_y, max_x, max_y) = box
    # An outline needs at least three vertices.
    if polygon is None or not isinstance(polygon, np.ndarray) or len(polygon) < 3:
        return frame
    canvas = frame.copy()
    height, width = canvas.shape[:2]
    # Coerce box coordinates to ints clamped inside the frame.
    try:
        min_x = max(0, int(min_x))
        min_y = max(0, int(min_y))
        max_x = min(width, int(max_x))
        max_y = min(height, int(max_y))
    except ValueError:
        return frame
    if max_x <= min_x or max_y <= min_y:
        return frame
    # Green polygon outline, with every vertex clipped onto the canvas first.
    try:
        clipped = polygon.copy()
        clipped[:, 0] = np.clip(clipped[:, 0], 0, width - 1)
        clipped[:, 1] = np.clip(clipped[:, 1], 0, height - 1)
        cv2.polylines(canvas, [clipped.astype(np.int32)], isClosed=True, color=(0, 255, 0), thickness=2)
    except Exception as e:
        print(f"Error drawing polygon for visualization: {e}")
    # Red bounding box around the mask region.
    cv2.rectangle(canvas, (min_x, min_y), (max_x, max_y), (0, 0, 255), 2)
    # Label above the box when there is headroom, otherwise below it.
    label_y = min_y - 10 if min_y > 20 else max_y + 15
    try:
        cv2.putText(canvas, "Mouth Mask", (min_x, label_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv2.LINE_AA)
    except Exception:
        pass
    return canvas
def apply_mouth_area(
    frame: np.ndarray,
    mouth_cutout: np.ndarray,
    mouth_box: tuple,
    face_mask: np.ndarray,  # full face mask; see NOTE below
    mouth_polygon: np.ndarray,  # polygon delimiting the mouth area itself
) -> np.ndarray:
    """Paste the original-frame mouth (`mouth_cutout`) back onto the swapped
    `frame` inside `mouth_box`, feathered along `mouth_polygon` for a smooth
    blend. Modifies `frame` in place and also returns it.

    NOTE(review): `face_mask` is only None/size-validated here and is not used
    in the blend itself — confirm whether edge blending against it was intended.
    """
    # Reject any missing input outright.
    if (frame is None or mouth_cutout is None or mouth_box is None or
        face_mask is None or mouth_polygon is None):
        return frame
    # Reject empty arrays and degenerate polygons (< 3 vertices).
    if (mouth_cutout.size == 0 or face_mask.size == 0 or len(mouth_polygon) < 3):
        return frame
    try:  # never crash the pipeline; return the frame unmodified on failure
        min_x, min_y, max_x, max_y = map(int, mouth_box)  # force integer coords
        box_width = max_x - min_x
        box_height = max_y - min_y
        # Box must have positive area before clamping.
        if box_width <= 0 or box_height <= 0:
            return frame
        # Clamp the ROI strictly inside the (swapped) frame.
        frame_h, frame_w = frame.shape[:2]
        min_y, max_y = max(0, min_y), min(frame_h, max_y)
        min_x, max_x = max(0, min_x), min(frame_w, max_x)
        # Recompute dimensions after clamping; the box may have collapsed.
        box_width = max_x - min_x
        box_height = max_y - min_y
        if box_width <= 0 or box_height <= 0:
            return frame
        roi = frame[min_y:max_y, min_x:max_x]
        if roi.size == 0:
            return frame
        # Resize the cutout (taken from the original frame) to match the ROI.
        resized_mouth_cutout = None
        if roi.shape[:2] != mouth_cutout.shape[:2]:
            # A zero-dimension cutout cannot be resized.
            if mouth_cutout.shape[0] > 0 and mouth_cutout.shape[1] > 0:
                resized_mouth_cutout = gpu_resize(mouth_cutout, (box_width, box_height), interpolation=cv2.INTER_LINEAR)
            else:
                return frame
        else:
            resized_mouth_cutout = mouth_cutout
        if resized_mouth_cutout is None or resized_mouth_cutout.size == 0:
            return frame
        # --- Mask creation: fill the mouth polygon relative to the ROI ---
        polygon_mask_roi = np.zeros(roi.shape[:2], dtype=np.uint8)
        adjusted_polygon = mouth_polygon - [min_x, min_y]
        cv2.fillPoly(polygon_mask_roi, [adjusted_polygon.astype(np.int32)], 255)
        # Feather radius scales with box size, clamped to [1, 30] pixels.
        feather_amount = max(1, min(30, min(box_width, box_height) // 8))
        kernel_size = 2 * feather_amount + 1  # Gaussian kernel must be odd
        feathered_mask = cv2.GaussianBlur(polygon_mask_roi.astype(np.float32), (kernel_size, kernel_size), 0)
        # Normalize the feathered mask to [0.0, 1.0]; treat an all-zero blur as empty.
        max_val = feathered_mask.max()
        if max_val > 1e-6:
            feathered_mask = feathered_mask / max_val
        else:
            feathered_mask.fill(0.0)
        # --- Blending: only performed for 3-channel BGR frames ---
        if len(frame.shape) == 3 and frame.shape[2] == 3:
            mask_3ch = feathered_mask[:, :, np.newaxis].astype(np.float32)
            inv_mask = 1.0 - mask_3ch
            # Blend: (original_mouth * mask) + (swapped_face * (1 - mask)).
            blended_roi = (resized_mouth_cutout.astype(np.float32) * mask_3ch +
                           roi.astype(np.float32) * inv_mask)
            frame[min_y:max_y, min_x:max_x] = np.clip(blended_roi, 0, 255).astype(np.uint8)
    except Exception as e:
        print(f"Error applying mouth area: {e}")
        pass  # deliberate best-effort: return the frame as-is
    return frame
def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
    """Create a feathered uint8 mask covering the whole face area of `face` in `frame`.

    The mask is the filled convex hull of the 106-landmark face outline
    (indices 0-32), extended upward with estimated forehead points so blending
    also covers the forehead, then feathered with a Gaussian blur.

    Args:
        face: Detected face carrying a `landmark_2d_106` (106x2) array; may be None.
        frame: BGR frame the mask must match in size; may be None.

    Returns:
        uint8 mask with the same HxW as `frame` (255 inside the face, 0 outside).
        An all-zero mask is returned for invalid inputs; a 0x0 mask if `frame`
        is None.
    """
    # Fix: check `frame` BEFORE touching frame.shape. The previous version
    # allocated the mask first, so frame=None raised AttributeError instead of
    # hitting the intended validation guard.
    if frame is None:
        return np.zeros((0, 0), dtype=np.uint8)
    mask = np.zeros(frame.shape[:2], dtype=np.uint8)
    # Validate the face object and its landmark array.
    if face is None or not hasattr(face, 'landmark_2d_106'):
        return mask
    landmarks = face.landmark_2d_106
    if landmarks is None or not isinstance(landmarks, np.ndarray) or landmarks.shape[0] < 106:
        return mask
    try:
        # Reject landmark sets containing NaN/Inf before doing any geometry.
        if not np.all(np.isfinite(landmarks)):
            return mask
        landmarks_int = landmarks.astype(np.int32)
        # Face outline (jawline), indices 0-32 of the 106-landmark layout.
        face_outline = landmarks_int[0:33]
        # Estimate forehead points so the mask covers the whole face — critical
        # for Poisson blending to work correctly on the forehead.
        eyebrows = landmarks_int[33:43]
        if eyebrows.shape[0] > 0:
            chin = landmarks_int[16]  # assumes index 16 is the chin point — TODO confirm layout
            eyebrow_center = np.mean(eyebrows, axis=0)
            # Unit vector pointing from the chin toward the eyebrows (upwards).
            up_vector = eyebrow_center - chin
            norm = np.linalg.norm(up_vector)
            if norm > 0:
                up_vector /= norm
                # Extend upward by 1.0x the chin-to-eyebrow distance
                # (aggressive coverage of the entire forehead).
                forehead_offset = up_vector * (norm * 1.0)
                forehead_points = eyebrows + forehead_offset
                # Spread the new top points 20% outward from their center so the
                # mask also covers the forehead corners.
                top_center = np.mean(forehead_points, axis=0)
                forehead_points = (forehead_points - top_center) * 1.2 + top_center
                face_outline = np.concatenate((face_outline, forehead_points.astype(np.int32)), axis=0)
        # convexHull can fail on degenerate input, so guard it separately.
        try:
            hull = cv2.convexHull(face_outline.astype(np.float32))  # float input for accuracy
            if hull is None or len(hull) < 3:
                return mask
            cv2.fillConvexPoly(mask, hull.astype(np.int32), 255)
        except Exception as hull_e:
            print(f"Error creating convex hull for face mask: {hull_e}")
            return mask
        # Feather the mask edges; kernel size forced odd and positive
        # (GPU-accelerated blur when available).
        blur_k_size = getattr(modules.globals, "face_mask_blur", 31)
        blur_k_size = max(1, blur_k_size // 2 * 2 + 1)
        mask = gpu_gaussian_blur(mask, (blur_k_size, blur_k_size), 0)
    except IndexError:
        # Landmark indexing failed; fall through with whatever mask exists.
        pass
    except Exception as e:
        print(f"Error creating face mask: {e}")
    return mask
def apply_color_transfer(source, target):
    """Match the per-channel color statistics of `source` to those of `target`
    in LAB space.

    Both inputs are coerced to 3-channel uint8 BGR when possible. On any
    failure the (possibly coerced) source image is returned unchanged.
    """
    def _coerce_bgr_u8(image):
        """Best-effort conversion to 3-channel uint8 BGR; None when impossible."""
        if len(image.shape) == 3 and image.shape[2] == 3 and image.dtype == np.uint8:
            return image
        try:
            if len(image.shape) == 2:  # grayscale -> BGR
                image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
            image = np.clip(image, 0, 255).astype(np.uint8)
            if len(image.shape) != 3 or image.shape[2] != 3:
                raise ValueError("Conversion failed")
            return image
        except Exception:
            return None

    # Nothing to do for missing or empty images.
    if source is None or target is None or source.size == 0 or target.size == 0:
        return source
    coerced_source = _coerce_bgr_u8(source)
    if coerced_source is None:
        return source
    source = coerced_source
    coerced_target = _coerce_bgr_u8(target)
    if coerced_target is None:
        # Target unusable: return the (possibly coerced) source unchanged.
        return source
    try:
        # Convert to float32 [0, 1] before LAB — the range cv2 expects for floats.
        source_lab = cv2.cvtColor(source.astype(np.float32) / 255.0, cv2.COLOR_BGR2LAB)
        target_lab = cv2.cvtColor(coerced_target.astype(np.float32) / 255.0, cv2.COLOR_BGR2LAB)
        src_mean, src_std = cv2.meanStdDev(source_lab)
        tgt_mean, tgt_std = cv2.meanStdDev(target_lab)
        # Reshape stats for per-channel broadcasting.
        src_mean = src_mean.reshape((1, 1, 3))
        # Epsilon guards against division by a (near-)zero source deviation;
        # a small target deviation is acceptable.
        src_std = np.maximum(src_std.reshape((1, 1, 3)), 1e-6)
        tgt_mean = tgt_mean.reshape((1, 1, 3))
        tgt_std = tgt_std.reshape((1, 1, 3))
        # Rescale each LAB channel to carry the target's mean and deviation.
        result_lab = (source_lab - src_mean) * (tgt_std / src_std) + tgt_mean
        # Back to BGR, clipped to [0, 1] before scaling up to uint8.
        result = np.clip(cv2.cvtColor(result_lab, cv2.COLOR_LAB2BGR), 0.0, 1.0)
        return (result * 255.0).astype("uint8")
    except Exception:
        # Any conversion/statistics failure: fall back to the source image.
        return source