mirror of
https://github.com/hacksider/Deep-Live-Cam.git
synced 2026-04-22 15:57:20 +08:00
bcdd0ce2dd
Fix CoreML execution provider falling back to CPU silently, eliminate redundant per-frame face detection, and optimize the paste-back blend to operate on the face bounding box instead of the full frame. All changes are quality-neutral (pixel-identical output verified) and benefit non-Mac platforms via the shared detection and paste-back improvements. Changes: - Remove unsupported CoreML options (RequireStaticShapes, MaximumCacheSize) that caused ORT 1.24 to silently fall back to CPUExecutionProvider - Add _fast_paste_back(): bbox-restricted erode/blur/blend, skip dead fake_diff code in insightface's inswapper (computed but never used) - process_frame() accepts optional pre-detected target_face to avoid redundant get_one_face() call (~30-40ms saved per frame, all platforms) - In-memory pipeline detects face once and shares across processors - Fix get_face_swapper() to fall back to FP16 model when FP32 absent - Fix pre_start() to accept either model variant (was FP16-only check) - Make tensorflow import conditional (fixes crash on macOS) - Add missing tqdm dep, make tensorflow/pygrabber platform-conditional Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1247 lines
56 KiB
Python
1247 lines
56 KiB
Python
from typing import Any, List, Optional
|
|
import cv2
|
|
import insightface
|
|
import threading
|
|
import numpy as np
|
|
import platform
|
|
import modules.globals
|
|
import modules.processors.frame.core
|
|
from modules.core import update_status
|
|
from modules.face_analyser import get_one_face, get_many_faces, default_source_face
|
|
from modules.typing import Face, Frame
|
|
from modules.utilities import (
|
|
conditional_download,
|
|
is_image,
|
|
is_video,
|
|
)
|
|
from modules.cluster_analysis import find_closest_centroid
|
|
from modules.gpu_processing import gpu_gaussian_blur, gpu_sharpen, gpu_add_weighted, gpu_resize, gpu_cvt_color
|
|
import os
|
|
from collections import deque
|
|
import time
|
|
|
|
# Process-wide face-swapper model handle. It starts empty and is filled in by
# get_face_swapper(), which takes THREAD_LOCK before touching it.
FACE_SWAPPER = None
THREAD_LOCK = threading.Lock()

# Scope label passed to update_status() so messages identify this processor.
NAME = "DLC.FACE-SWAPPER"

# --- Temporal interpolation state ---
# Output frame of the previous apply_post_processing() call; the next frame is
# blended against it when interpolation is enabled.
PREVIOUS_FRAME_RESULT = None

# --- Apple Silicon (M-series Mac) tuning ---
# True only on macOS running on an arm64 CPU.
IS_APPLE_SILICON = (platform.system(), platform.machine()) == ('Darwin', 'arm64')
# NOTE(review): FRAME_CACHE, FRAME_SKIP_COUNTER and ADAPTIVE_QUALITY are not
# referenced in the part of the file reviewed here - confirm they are still used.
FRAME_CACHE = deque(maxlen=3)
# Last detection result, stored under the 'faces' and 'timestamp' keys by
# get_faces_optimized().
FACE_DETECTION_CACHE = {}
# time.time() value of the most recent real detection (0 = never ran).
LAST_DETECTION_TIME = 0
# Minimum number of seconds between two real detections (0.033 s ~ 30 per second).
DETECTION_INTERVAL = 0.033
FRAME_SKIP_COUNTER = 0
ADAPTIVE_QUALITY = True

# Directory containing this module, and the "models" directory located three
# directory levels above it.
abs_dir = os.path.dirname(os.path.abspath(__file__))
models_dir = os.path.join(
    os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))),
    "models",
)
|
|
|
|
def pre_check() -> bool:
    """Make sure the models directory exists and request the model download.

    Returns:
        False when the models directory cannot be created, True once the
        directory exists and conditional_download() has been called for the
        FP32 ``inswapper_128.onnx`` file.
    """
    # Function-scope import: this module never imports ``logging`` at the top
    # of the file, so the error path below used to raise NameError instead of
    # reporting the failure and returning False.
    import logging

    # Models are stored in models_dir, not next to this source file.
    download_directory_path = models_dir

    try:
        os.makedirs(download_directory_path, exist_ok=True)
    except OSError as e:
        logging.error(f"Failed to create directory {download_directory_path} due to permission error: {e}")
        return False

    # Direct Hugging Face URL of the FP32 model (chosen by the original author
    # for broad GPU compatibility).
    conditional_download(
        download_directory_path,
        [
            "https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128.onnx"
        ],
    )
    return True
|
|
|
|
|
|
def pre_start() -> bool:
    """Check that a swapper model file exists and that it can be loaded.

    Returns:
        True when at least one of the FP16 / FP32 model files is present in
        models_dir and get_face_swapper() yields a model; False otherwise.
    """
    # Either variant is acceptable; the FP16 file is probed first.
    candidate_paths = (
        os.path.join(models_dir, "inswapper_128_fp16.onnx"),
        os.path.join(models_dir, "inswapper_128.onnx"),
    )
    if not any(os.path.exists(candidate) for candidate in candidate_paths):
        update_status(f"Model not found in {models_dir}. Please download inswapper_128.onnx.", NAME)
        return False

    # Loading the model proves it is usable; get_face_swapper() reports its
    # own errors, so nothing more is printed here.
    return get_face_swapper() is not None
|
|
|
|
|
|
def get_face_swapper() -> Any:
    """Return the shared face-swapper model, loading it on first use.

    The whole lookup/load sequence runs while holding THREAD_LOCK.

    Returns:
        The model stored in FACE_SWAPPER, or None when no model file exists
        in models_dir or loading raised an exception.
    """
    global FACE_SWAPPER

    with THREAD_LOCK:
        if FACE_SWAPPER is not None:
            return FACE_SWAPPER

        # FP32 is tried first; per the original author, FP16 can produce NaN
        # on GPUs without Tensor Cores (e.g. GTX 16xx). FP16 is the fallback.
        candidates = (
            os.path.join(models_dir, "inswapper_128.onnx"),
            os.path.join(models_dir, "inswapper_128_fp16.onnx"),
        )
        model_path = next((path for path in candidates if os.path.exists(path)), None)
        if model_path is None:
            update_status(f"No inswapper model found in {models_dir}.", NAME)
            return None

        update_status(f"Loading face swapper model from: {model_path}", NAME)

        # Provider-specific session options; any other provider is passed
        # through by name only.
        coreml_options = {
            "ModelFormat": "MLProgram",
            "MLComputeUnits": "ALL",  # original note: Neural Engine + GPU + CPU
            "SpecializationStrategy": "FastPrediction",
            "AllowLowPrecisionAccumulationOnGPU": 1,
            "EnableOnSubgraphs": 1,
        }
        cuda_options = {
            "arena_extend_strategy": "kSameAsRequested",
            "cudnn_conv_algo_search": "EXHAUSTIVE",
            "cudnn_conv_use_max_workspace": "1",
            "do_copy_in_default_stream": "0",
        }

        try:
            providers_config = []
            for provider in modules.globals.execution_providers:
                if provider == "CoreMLExecutionProvider" and IS_APPLE_SILICON:
                    # CoreML options are only attached on Apple Silicon.
                    providers_config.append(("CoreMLExecutionProvider", dict(coreml_options)))
                elif provider == "CUDAExecutionProvider":
                    providers_config.append(("CUDAExecutionProvider", dict(cuda_options)))
                else:
                    providers_config.append(provider)

            FACE_SWAPPER = insightface.model_zoo.get_model(
                model_path,
                providers=providers_config,
            )
            update_status("Face swapper model loaded successfully.", NAME)
        except Exception as e:
            update_status(f"Error loading face swapper model: {e}", NAME)
            FACE_SWAPPER = None
            return None

        return FACE_SWAPPER
|
|
|
|
|
|
def _fast_paste_back(target_img: Frame, bgr_fake: np.ndarray, aimg: np.ndarray, M: np.ndarray) -> Frame:
    """Blend the swapped face crop back into the frame, working on a sub-region.

    Per the original author this is meant to match insightface's built-in
    paste-back while skipping its unused ``fake_diff`` computation; erosion,
    blur and blending are limited to the padded bounding box of the warped mask.

    Args:
        target_img: Frame that receives the face. It is not modified.
        bgr_fake: Swapped face crop in the aligned-crop coordinate system.
        aimg: Aligned input crop; only its height and width are read.
        M: 2x3 affine matrix mapping the frame to the aligned crop.

    Returns:
        A copy of ``target_img`` with the face blended in, or ``target_img``
        itself when the warped mask covers no pixel of the frame.
    """
    frame_h, frame_w = target_img.shape[:2]
    inverse_affine = cv2.invertAffineTransform(M)
    frame_size = (frame_w, frame_h)

    # Project the swapped crop and an all-white crop-sized mask into frame space.
    warped_fake = cv2.warpAffine(bgr_fake, inverse_affine, frame_size, borderValue=0.0)
    white_crop = np.full((aimg.shape[0], aimg.shape[1]), 255, dtype=np.float32)
    warped_mask = cv2.warpAffine(white_crop, inverse_affine, frame_size, borderValue=0.0)

    # Tight bounding box of the mask pixels brighter than the threshold.
    covered = warped_mask > 20
    row_idx = np.flatnonzero(np.any(covered, axis=1))
    col_idx = np.flatnonzero(np.any(covered, axis=0))
    if len(row_idx) == 0 or len(col_idx) == 0:
        return target_img
    top, bottom = row_idx[0], row_idx[-1]
    left, right = col_idx[0], col_idx[-1]

    # Erosion / blur kernel sizes scale with the geometric mean of the box sides.
    mask_size = int(np.sqrt((bottom - top) * (right - left)))
    erode_k = max(mask_size // 10, 10)
    blur_k = max(mask_size // 20, 5)

    # Grow the box so both kernels have room, clamped to the frame.
    margin = erode_k + blur_k + 2
    y_start, y_stop = max(0, top - margin), min(frame_h, bottom + margin + 1)
    x_start, x_stop = max(0, left - margin), min(frame_w, right + margin + 1)

    # Binarise the mask inside the region, then shrink it and feather its edge.
    alpha = warped_mask[y_start:y_stop, x_start:x_stop]
    alpha[alpha > 20] = 255
    alpha = cv2.erode(alpha, np.ones((erode_k, erode_k), np.uint8), iterations=1)
    gaussian_size = (2 * blur_k + 1, 2 * blur_k + 1)
    alpha = cv2.GaussianBlur(alpha, gaussian_size, 0)
    alpha /= 255.0

    # Alpha-blend in float32, restricted to the region.
    alpha_3d = alpha[:, :, np.newaxis]
    fake_region = warped_fake[y_start:y_stop, x_start:x_stop].astype(np.float32)
    target_region = target_img[y_start:y_stop, x_start:x_stop].astype(np.float32)
    mixed = alpha_3d * fake_region + (1.0 - alpha_3d) * target_region

    output = target_img.copy()
    output[y_start:y_stop, x_start:x_stop] = np.clip(mixed, 0, 255).astype(np.uint8)
    return output
|
|
|
|
|
|
def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
    """Swap ``source_face`` onto ``target_face`` inside ``temp_frame``.

    Runs the swapper model, pastes the result back with _fast_paste_back(),
    then applies the optional mouth mask, Poisson blending and opacity blend
    selected through ``modules.globals``.

    Args:
        source_face: Face providing the identity; needs ``normed_embedding``.
        target_face: Face in ``temp_frame`` to be replaced.
        temp_frame: Frame to process.

    Returns:
        The processed uint8 frame. The unmodified input is returned when the
        model is unavailable or a face is missing; the pre-swap frame is
        returned when the swap itself raises.
    """
    swapper = get_face_swapper()
    if swapper is None:
        update_status("Face swapper model not loaded or failed to load. Skipping swap.", NAME)
        return temp_frame

    # Nothing can be swapped without both faces and a source embedding.
    if source_face is None or target_face is None:
        return temp_frame
    if getattr(source_face, 'normed_embedding', None) is None:
        return temp_frame

    # Clamp the opacity setting to [0, 1].
    opacity = max(0.0, min(1.0, getattr(modules.globals, "opacity", 1.0)))
    mouth_mask_enabled = getattr(modules.globals, "mouth_mask", False)

    # A pristine copy is only required when it will be blended back later
    # (partial opacity) or sampled for the real mouth (mouth mask).
    if opacity < 1.0 or mouth_mask_enabled:
        original_frame = temp_frame.copy()
    else:
        original_frame = temp_frame

    if temp_frame.dtype != np.uint8:
        temp_frame = np.clip(temp_frame, 0, 255).astype(np.uint8)

    try:
        if not temp_frame.flags['C_CONTIGUOUS']:
            temp_frame = np.ascontiguousarray(temp_frame)

        def _run_model():
            # paste_back=False: the model returns the swapped crop and the
            # affine matrix; pasting is done by _fast_paste_back() below.
            return swapper.get(temp_frame, target_face, source_face, paste_back=False)

        uses_directml = any("DmlExecutionProvider" in p for p in modules.globals.execution_providers)
        if uses_directml:
            # DirectML inference runs while holding the global dml_lock.
            with modules.globals.dml_lock:
                bgr_fake, M = _run_model()
        else:
            bgr_fake, M = _run_model()

        # Covers both a None result and any non-array result.
        if not isinstance(bgr_fake, np.ndarray):
            return original_frame

        # Aligned crop of the target; _fast_paste_back reads only its size.
        from insightface.utils import face_align
        aimg, _ = face_align.norm_crop2(temp_frame, target_face.kps, swapper.input_size[0])

        swapped_frame = _fast_paste_back(temp_frame, bgr_fake, aimg, M)
        swapped_frame = np.clip(swapped_frame, 0, 255).astype(np.uint8)

    except Exception as e:
        print(f"Error during face swap: {e}")
        return original_frame

    # --- Post-swap processing on the uint8 'swapped_frame' ---

    if mouth_mask_enabled:
        # Both masks are derived from the pre-swap frame so the real mouth
        # pixels are the ones pasted over the swapped face.
        face_mask = create_face_mask(target_face, original_frame)
        mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon = create_lower_mouth_mask(
            target_face, original_frame
        )

        if mouth_cutout is not None and mouth_box != (0,0,0,0):
            swapped_frame = apply_mouth_area(
                swapped_frame, mouth_cutout, mouth_box, face_mask, lower_lip_polygon
            )

            # Original note: the box is drawn only while the slider is dragged.
            if getattr(modules.globals, "show_mouth_mask_box", False):
                mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon)
                swapped_frame = draw_mouth_mask_visualization(
                    swapped_frame, target_face, mouth_mask_data
                )

    # --- Poisson blending ---
    if getattr(modules.globals, "poisson_blend", False):
        face_mask = create_face_mask(target_face, temp_frame)
        if face_mask is not None:
            ys, xs = np.where(face_mask > 0)
            if len(xs) > 0 and len(ys) > 0:
                x_min, x_max = np.min(xs), np.max(xs)
                y_min, y_max = np.min(ys), np.max(ys)

                # Centre of the mask's bounding box, in frame coordinates.
                center = (int((x_min + x_max) / 2), int((y_min + y_max) / 2))

                src_crop = swapped_frame[y_min : y_max + 1, x_min : x_max + 1]
                mask_crop = face_mask[y_min : y_max + 1, x_min : x_max + 1]

                try:
                    # The pre-swap frame is the destination of the clone.
                    swapped_frame = cv2.seamlessClone(
                        src_crop,
                        original_frame,
                        mask_crop,
                        center,
                        cv2.NORMAL_CLONE,
                    )
                except Exception as e:
                    print(f"Poisson blending failed: {e}")

    # --- Opacity ---
    if opacity >= 1.0:
        return swapped_frame.astype(np.uint8)

    # Weighted mix of the pre-swap frame and the swapped result.
    final_swapped_frame = gpu_add_weighted(
        original_frame.astype(np.uint8), 1 - opacity, swapped_frame.astype(np.uint8), opacity, 0
    )
    return final_swapped_frame.astype(np.uint8)
|
|
|
|
|
|
# --- START: Mac M1-M5 Optimized Face Detection ---
|
|
def get_faces_optimized(frame: Frame, use_cache: bool = True) -> Optional[List[Face]]:
    """Detect faces, reusing the previous result when called in quick succession.

    Throttling applies only when ``use_cache`` is true and IS_APPLE_SILICON is
    set; otherwise every call runs a detection.

    Args:
        frame: Frame to analyse.
        use_cache: Allow returning the cached result of an earlier call.

    Returns:
        Whatever get_many_faces() returns in many-faces mode; otherwise a
        one-element list with the detected face, or None when none is found.
    """
    global LAST_DETECTION_TIME, FACE_DETECTION_CACHE

    def _detect():
        # Detection mode follows the global many_faces switch.
        if modules.globals.many_faces:
            return get_many_faces(frame)
        single = get_one_face(frame)
        return [single] if single else None

    if not (use_cache and IS_APPLE_SILICON):
        return _detect()

    now = time.time()

    # Called again before DETECTION_INTERVAL elapsed: hand back the cached faces.
    if now - LAST_DETECTION_TIME < DETECTION_INTERVAL and FACE_DETECTION_CACHE:
        return FACE_DETECTION_CACHE.get('faces')

    LAST_DETECTION_TIME = now
    faces = _detect()

    # Remember the result (even None) for the next throttled call.
    FACE_DETECTION_CACHE['faces'] = faces
    FACE_DETECTION_CACHE['timestamp'] = now

    return faces
|
|
# --- END: Mac M1-M5 Optimized Face Detection ---
|
|
|
|
# --- START: Helper function for interpolation and sharpening ---
|
|
def apply_post_processing(current_frame: Frame, swapped_face_bboxes: List[np.ndarray]) -> Frame:
    """Sharpen the swapped face regions and blend with the previous output frame.

    Both steps are optional and driven by ``modules.globals`` (``sharpness``,
    ``enable_interpolation``, ``interpolation_weight``). The function updates
    PREVIOUS_FRAME_RESULT as a side effect.

    Args:
        current_frame: Frame after swapping. It is copied, not modified.
        swapped_face_bboxes: ``(x1, y1, x2, y2)`` boxes of the swapped faces.

    Returns:
        The post-processed frame.
    """
    global PREVIOUS_FRAME_RESULT

    processed_frame = current_frame.copy()

    # --- 1. Sharpening, limited to the swapped face boxes ---
    sharpness_value = getattr(modules.globals, "sharpness", 0.0)
    if sharpness_value > 0.0 and swapped_face_bboxes:
        height, width = processed_frame.shape[:2]
        # A smaller blur sigma is used on Apple Silicon.
        sigma = 2 if IS_APPLE_SILICON else 3

        for bbox in swapped_face_bboxes:
            # Skip anything that is not a 4-element iterable.
            if not hasattr(bbox, '__iter__') or len(bbox) != 4:
                continue
            try:
                left, top, right, bottom = (int(value) for value in bbox)
            except ValueError:
                continue

            # Clamp the box to the frame and drop empty boxes.
            x1, y1 = max(0, left), max(0, top)
            x2, y2 = min(width, right), min(height, bottom)
            if x2 <= x1 or y2 <= y1:
                continue

            face_region = processed_frame[y1:y2, x1:x2]
            if face_region.size == 0:
                continue

            try:
                processed_frame[y1:y2, x1:x2] = gpu_sharpen(
                    face_region, strength=sharpness_value, sigma=sigma
                )
            except cv2.error:
                # Best effort: leave this region unsharpened.
                pass

    # --- 2. Temporal interpolation ---
    enable_interpolation = getattr(modules.globals, "enable_interpolation", False)
    interpolation_weight = getattr(modules.globals, "interpolation_weight", 0.2)

    if not (enable_interpolation and 0 < interpolation_weight < 1):
        # Interpolation is off or the weight is out of range: keep no history.
        PREVIOUS_FRAME_RESULT = None
        return processed_frame

    previous = PREVIOUS_FRAME_RESULT
    history_usable = (
        previous is not None
        and previous.shape == processed_frame.shape
        and previous.dtype == processed_frame.dtype
    )
    if not history_usable:
        # First frame, or shape/dtype changed: restart history from this frame.
        PREVIOUS_FRAME_RESULT = processed_frame.copy()
        return processed_frame

    try:
        blended = gpu_add_weighted(
            previous, 1.0 - interpolation_weight,
            processed_frame, interpolation_weight,
            0,
        )
        final_frame = np.clip(blended, 0, 255).astype(np.uint8)
    except cv2.error:
        # Blend failed: fall back to the current frame.
        final_frame = processed_frame

    # The next call blends against what was actually returned this time.
    PREVIOUS_FRAME_RESULT = final_frame.copy()
    return final_frame
|
|
# --- END: Helper function for interpolation and sharpening ---
|
|
|
|
|
|
def process_frame(source_face: Face, temp_frame: Frame, target_face: Face = None) -> Frame:
    """Swap ``source_face`` onto the face(s) found in one frame.

    Args:
        source_face: Face whose identity is applied.
        temp_frame: Frame to process.
        target_face: Optional already-detected target. When given in
            single-face mode, the get_one_face() call is skipped (the original
            author measured ~30-40ms saved per frame). It is not used in
            many_faces mode.

    Returns:
        The frame after swapping and apply_post_processing(); the input frame
        unchanged when the global opacity is 0.
    """
    global PREVIOUS_FRAME_RESULT

    if getattr(modules.globals, "opacity", 1.0) == 0:
        # Fully transparent swap: nothing to do, and interpolation history is dropped.
        PREVIOUS_FRAME_RESULT = None
        return temp_frame

    swapped_face_bboxes = []
    processed_frame = temp_frame

    if not modules.globals.many_faces:
        # Single-face mode: detect only when the caller did not supply a face.
        if target_face is None:
            target_face = get_one_face(processed_frame)
        if target_face:
            processed_frame = swap_face(source_face, target_face, processed_frame)
            if hasattr(target_face, "bbox") and target_face.bbox is not None:
                swapped_face_bboxes.append(target_face.bbox.astype(int))
    else:
        detected = get_many_faces(processed_frame)
        if detected:
            # Swaps are applied one after another on a private copy.
            working_frame = processed_frame.copy()
            for face in detected:
                working_frame = swap_face(source_face, face, working_frame)
                if face is not None and hasattr(face, "bbox") and face.bbox is not None:
                    swapped_face_bboxes.append(face.bbox.astype(int))
            processed_frame = working_frame

    return apply_post_processing(processed_frame, swapped_face_bboxes)
|
|
|
|
|
|
def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame:
    """Swap faces in one frame using the face maps stored in ``modules.globals``.

    Used when ``map_faces`` is enabled. For an image or video target the
    pre-analysed ``source_target_map`` selects the (source, target) pairs; for
    any other target (live stream) faces are detected on the fly and paired
    through ``many_faces``, ``simple_map`` or the default source face.

    Args:
        temp_frame: Frame to process.
        temp_frame_path: Path of the frame; in video mode it is compared with
            the ``location`` entries of the map to find this frame's faces.

    Returns:
        The frame after all swaps and apply_post_processing(); the input frame
        unchanged when the global opacity is 0.
    """
    global PREVIOUS_FRAME_RESULT

    if getattr(modules.globals, "opacity", 1.0) == 0:
        # No swap is visible at opacity 0; also drop any interpolation history.
        PREVIOUS_FRAME_RESULT = None
        return temp_frame

    processed_frame = temp_frame
    swapped_face_bboxes = []  # boxes of the faces that were swapped
    source_target_pairs = []  # (source_face, target_face) tuples to apply

    source_target_map = getattr(modules.globals, "source_target_map", None)
    simple_map = getattr(modules.globals, "simple_map", None)

    # Classify the target once instead of once per map entry. is_video() is
    # only consulted when the target is not an image, as before.
    target_path = modules.globals.target_path
    target_is_image = bool(target_path) and bool(is_image(target_path))
    target_is_video = bool(target_path) and not target_is_image and bool(is_video(target_path))
    is_file_target = target_is_image or target_is_video

    if is_file_target:
        # Image / video file: rely on the pre-analysed map.
        if source_target_map:
            if modules.globals.many_faces:
                # One default source face is applied to every mapped target.
                source_face = default_source_face()
                if source_face:
                    for map_data in source_target_map:
                        for target_face in _mapped_target_faces(map_data, target_is_image, temp_frame_path):
                            source_target_pairs.append((source_face, target_face))
            else:
                # Each map entry carries its own source face.
                for map_data in source_target_map:
                    source_info = map_data.get("source", {})
                    source_face = source_info.get("face") if source_info else None
                    if not source_face:
                        continue
                    for target_face in _mapped_target_faces(map_data, target_is_image, temp_frame_path):
                        source_target_pairs.append((source_face, target_face))
    else:
        # Live stream / webcam: analyse the frame now.
        detected_faces = get_many_faces(processed_frame)
        if detected_faces:
            if modules.globals.many_faces:
                source_face = default_source_face()
                if source_face:
                    for target_face in detected_faces:
                        source_target_pairs.append((source_face, target_face))
            elif simple_map:
                # simple_map pairs source_faces[i] with target_embeddings[i].
                source_faces = simple_map.get("source_faces", [])
                target_embeddings = simple_map.get("target_embeddings", [])

                if source_faces and target_embeddings and len(source_faces) == len(target_embeddings):
                    if len(detected_faces) <= len(target_embeddings):
                        # No more detections than mapped targets: give each
                        # detected face the source of its closest embedding.
                        for detected_face in detected_faces:
                            if detected_face.normed_embedding is None:
                                continue
                            closest_idx, _ = find_closest_centroid(target_embeddings, detected_face.normed_embedding)
                            if 0 <= closest_idx < len(source_faces):
                                source_target_pairs.append((source_faces[closest_idx], detected_face))
                    else:
                        # More detections than mapped targets: for each mapped
                        # embedding pick the closest detected face.
                        faces_with_embedding = [f for f in detected_faces if f.normed_embedding is not None]
                        detected_embeddings = [f.normed_embedding for f in faces_with_embedding]
                        # With no usable embedding nothing is paired, but the
                        # frame still goes through post-processing below so
                        # the interpolation state stays consistent (the
                        # previous early return skipped that step).
                        if detected_embeddings:
                            for mapped_source, target_embedding in zip(source_faces, target_embeddings):
                                closest_idx, _ = find_closest_centroid(detected_embeddings, target_embedding)
                                if 0 <= closest_idx < len(faces_with_embedding):
                                    source_target_pairs.append((mapped_source, faces_with_embedding[closest_idx]))
            else:
                # No map at all: default source onto a single detected face,
                # reusing the detections made above.
                source_face = default_source_face()
                target_face = get_one_face(processed_frame, detected_faces)
                if source_face and target_face:
                    source_target_pairs.append((source_face, target_face))

    # Apply the swaps sequentially on a private copy of the frame.
    current_swap_target = processed_frame.copy()
    for source_face, target_face in source_target_pairs:
        if source_face and target_face:
            current_swap_target = swap_face(source_face, target_face, current_swap_target)
            if target_face is not None and hasattr(target_face, "bbox") and target_face.bbox is not None:
                swapped_face_bboxes.append(target_face.bbox.astype(int))
    processed_frame = current_swap_target

    # Sharpening and interpolation.
    return apply_post_processing(processed_frame, swapped_face_bboxes)


def _mapped_target_faces(map_data: Any, target_is_image: bool, temp_frame_path: str) -> List[Any]:
    """Return the target faces one ``source_target_map`` entry yields for a frame.

    Args:
        map_data: One map entry (read through ``.get``).
        target_is_image: True for an image target, False for a video target.
        temp_frame_path: Frame path matched against ``location`` in video mode.

    Returns:
        For an image: the entry's ``target`` face as a one-element list, or an
        empty list when it is missing. For a video: every face listed under
        ``target_faces_in_frame`` for frames whose ``location`` equals
        ``temp_frame_path``.
    """
    if target_is_image:
        target_info = map_data.get("target", {})
        target_face = target_info.get("face") if target_info else None
        return [target_face] if target_face else []

    faces: List[Any] = []
    for frame_data in map_data.get("target_faces_in_frame", []) or []:
        if frame_data and frame_data.get("location") == temp_frame_path:
            faces.extend(frame_data.get("faces", []) or [])
    return faces
|
|
|
|
|
|
def process_frames(
    source_path: str, temp_frame_paths: List[str], progress: Any = None
) -> None:
    """Process frame files in place, one after another.

    Each frame is read, passed to process_frame() (simple mode) or
    process_frame_v2() (``map_faces`` mode), and written back to the same
    path. A frame that fails to process is written back unchanged.

    Args:
        source_path: Source image path; only read in simple mode.
        temp_frame_paths: Paths of the frame files to process.
        progress: Optional progress object; ``update()`` is called on it once
            per frame, and ``n`` is read if it exists.
    """
    use_v2 = getattr(modules.globals, "map_faces", False)
    source_face = None

    # --- Simple mode: extract the source face once, up front ---
    if not use_v2:
        if source_path and os.path.exists(source_path):
            try:
                source_img = cv2.imread(source_path)
                if source_img is not None:
                    source_face = get_one_face(source_img)
                    if source_face is None:
                        # The file was readable but contains no detectable face.
                        update_status(f"Warning: Successfully read source image {source_path}, but no face was detected. Swaps will be skipped.", NAME)
                    # The image is no longer needed once the face is extracted.
                    del source_img
                else:
                    update_status(f"Error reading source image file {source_path}. Please check the path and file integrity.", NAME)
            except Exception as e:
                import traceback
                print(f"{NAME}: Caught exception during source image processing for {source_path}:")
                traceback.print_exc()
                update_status(f"Error during source image reading or analysis {source_path}: {e}", NAME)
        else:
            # Only reported here; the check below stops the run.
            update_status(f"Error: Source path invalid or not provided for simple mode: {source_path}", NAME)

    total_frames = len(temp_frame_paths)

    # --- Simple mode cannot proceed without a source face ---
    if not use_v2 and source_face is None:
        update_status("Halting video processing: Invalid or no face detected in source image for simple mode.", NAME)
        if progress:
            # Advance the bar to the end so it does not appear stuck.
            if hasattr(progress, 'n'):
                remaining_updates = total_frames - progress.n
            else:
                remaining_updates = total_frames
            if remaining_updates > 0:
                progress.update(remaining_updates)
        return

    # --- Per-frame loop ---
    # Original author's note: core.py's multi_process_frame usually passes a
    # single path per call.
    for temp_frame_path in temp_frame_paths:
        # Read the frame; unreadable frames are skipped but still counted.
        temp_frame = None
        try:
            temp_frame = cv2.imread(temp_frame_path)
            if temp_frame is None:
                print(f"{NAME}: Error: Could not read frame: {temp_frame_path}, skipping.")
                if progress:
                    progress.update(1)
                continue
        except Exception as read_e:
            print(f"{NAME}: Error reading frame {temp_frame_path}: {read_e}, skipping.")
            if progress:
                progress.update(1)
            continue

        # Run the swap; on any failure keep the original pixels.
        result_frame = None
        try:
            if use_v2:
                # v2 needs the path to look the frame up in the video map.
                result_frame = process_frame_v2(temp_frame, temp_frame_path)
            else:
                # source_face was validated above.
                result_frame = process_frame(source_face, temp_frame)

            if result_frame is None:
                print(f"{NAME}: Warning: Processing returned None for frame {temp_frame_path}. Using original.")
                result_frame = temp_frame
        except Exception as proc_e:
            print(f"{NAME}: Error processing frame {temp_frame_path}: {proc_e}")
            result_frame = temp_frame

        # Write back over the input file, with PNG compression level 3
        # (chosen by the original author for speed).
        try:
            write_success = cv2.imwrite(temp_frame_path, result_frame, [cv2.IMWRITE_PNG_COMPRESSION, 3])
            if not write_success:
                print(f"{NAME}: Error: Failed to write processed frame to {temp_frame_path}")
        except Exception as write_e:
            print(f"{NAME}: Error writing frame {temp_frame_path}: {write_e}")

        # Drop the frame buffers before the next iteration allocates new ones.
        del temp_frame
        if result_frame is not None:
            del result_frame

        if progress:
            progress.update(1)
|
|
|
|
|
|
def process_image(source_path: str, target_path: str, output_path: str) -> None:
    """Swap faces in a single target image and write the result.

    Args:
        source_path: Source face image; only read in simple mode.
        target_path: Image to process.
        output_path: Where the processed image is written.

    Every failure is reported through update_status(); nothing is raised to
    the caller from the processing step.
    """
    global PREVIOUS_FRAME_RESULT
    # A single image has no previous frame to interpolate with.
    PREVIOUS_FRAME_RESULT = None

    use_v2 = getattr(modules.globals, "map_faces", False)

    # The target is read first; without it nothing else matters.
    try:
        target_frame = cv2.imread(target_path)
        if target_frame is None:
            update_status(f"Error: Could not read target image: {target_path}", NAME)
            return
    except Exception as read_e:
        update_status(f"Error reading target image {target_path}: {read_e}", NAME)
        return

    result = None
    try:
        if not use_v2:
            # Simple mode: one face from the source image is applied.
            try:
                source_img = cv2.imread(source_path)
                if source_img is None:
                    update_status(f"Error: Could not read source image: {source_path}", NAME)
                    return
                source_face = get_one_face(source_img)
                if not source_face:
                    update_status(f"Error: No face found in source image: {source_path}", NAME)
                    return
            except Exception as src_e:
                update_status(f"Error reading or analyzing source image {source_path}: {src_e}", NAME)
                return

            result = process_frame(source_face, target_frame)
        else:
            if getattr(modules.globals, "many_faces", False):
                update_status("Processing image with 'map_faces' and 'many_faces'. Using pre-analysis map.", NAME)
            # Map mode works from the global maps; source_path is not needed.
            result = process_frame_v2(target_frame, target_path)

        if result is None:
            # Not expected from process_frame / process_frame_v2, handled anyway.
            update_status("Image processing failed (result was None).", NAME)
        elif cv2.imwrite(output_path, result):
            update_status(f"Output image saved to: {output_path}", NAME)
        else:
            update_status(f"Error: Failed to write output image to {output_path}", NAME)

    except Exception as proc_e:
        update_status(f"Error during image processing: {proc_e}", NAME)
|
|
|
|
|
|
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
    """Reset per-video state, report the active mode and run the video pipeline.

    Args:
        source_path: Path of the source image; forwarded unchanged to the core
            video pipeline.
        temp_frame_paths: Paths of the extracted frames; forwarded unchanged.

    The actual work is delegated to
    ``modules.processors.frame.core.process_video`` with ``process_frames``
    as the frame-processing callable.
    """
    # Reset interpolation state so nothing from a previous run leaks in.
    global PREVIOUS_FRAME_RESULT
    PREVIOUS_FRAME_RESULT = None

    map_faces = getattr(modules.globals, "map_faces", False)
    many_faces = getattr(modules.globals, "many_faces", False)
    uses_pre_analysis_map = bool(map_faces and many_faces)

    mode_desc = "'map_faces'" if map_faces else "'simple'"
    if uses_pre_analysis_map:
        mode_desc += " and 'many_faces'"

    # Build the sentence so that "mode." closes the mode list; the hint about
    # the pre-analysis map is a separate sentence (it used to be spliced in
    # before " mode.", yielding "...pre-analysis map. mode.").
    status = f"Processing video with {mode_desc} mode."
    if uses_pre_analysis_map:
        status += " Using pre-analysis map."
    update_status(status, NAME)

    modules.processors.frame.core.process_video(
        source_path, temp_frame_paths, process_frames
    )
|
|
|
|
# ==========================
|
|
# MASKING FUNCTIONS (Mostly unchanged, added safety checks and minor improvements)
|
|
# ==========================
|
|
|
|
def create_lower_mouth_mask(
    face: Face, frame: Frame
) -> (np.ndarray, np.ndarray, tuple, np.ndarray):
    """Build a feathered mouth mask and cut the matching patch out of ``frame``.

    The polygon is taken from landmarks 52-71 of ``face.landmark_2d_106``
    (despite the "lower" in the name, this range is used as the full outer
    mouth outline), scaled about its centroid according to
    ``modules.globals.mouth_mask_size`` (read with a default of 0.0), with
    points below the centroid stretched further.

    Args:
        face: Object exposing ``landmark_2d_106`` as an ndarray with at least
            106 rows.
        frame: Image the mask is sized from and the cutout is copied from.

    Returns:
        ``(mask, mouth_cutout, mouth_box, lower_lip_polygon)`` where

        * ``mask`` is a uint8 array of ``frame.shape[:2]`` (shape ``(0, 0)``
          when ``frame`` is None),
        * ``mouth_cutout`` is a copy of ``frame`` inside ``mouth_box`` or None,
        * ``mouth_box`` is ``(min_x, min_y, max_x, max_y)`` as plain ints,
        * ``lower_lip_polygon`` is the expanded int32 polygon in frame
          coordinates or None.

        On invalid input or any caught error the defaults
        ``(zero mask, None, (0, 0, 0, 0), None)`` are returned.
    """
    mouth_cutout = None
    lower_lip_polygon = None
    mouth_box = (0, 0, 0, 0)

    # No frame means no shape to size the mask from; hand back an empty mask
    # instead of failing on ``frame.shape``.
    if frame is None:
        return np.zeros((0, 0), dtype=np.uint8), mouth_cutout, mouth_box, lower_lip_polygon

    mask = np.zeros(frame.shape[:2], dtype=np.uint8)

    # Validate face and landmarks.
    if face is None or not hasattr(face, 'landmark_2d_106'):
        return mask, mouth_cutout, mouth_box, lower_lip_polygon

    landmarks = face.landmark_2d_106
    if landmarks is None or not isinstance(landmarks, np.ndarray) or landmarks.shape[0] < 106:
        return mask, mouth_cutout, mouth_box, lower_lip_polygon

    try:
        # Rows 52-71 are guaranteed to exist by the >= 106 check above.
        lower_lip_landmarks = landmarks[52:72].astype(np.float32)
        if not np.all(np.isfinite(lower_lip_landmarks)):
            return mask, mouth_cutout, mouth_box, lower_lip_polygon

        center = np.mean(lower_lip_landmarks, axis=0)
        if not np.all(np.isfinite(center)):
            return mask, mouth_cutout, mouth_box, lower_lip_polygon

        # 0 = tight lip outline, 50 = covers mouth area, 100 = mouth to chin.
        mouth_mask_size = getattr(modules.globals, "mouth_mask_size", 0.0)
        expansion_factor = 1 + (mouth_mask_size / 100.0) * 2.5
        # Extra vertical stretch for points below the centroid (toward chin).
        chin_bias = 1 + (mouth_mask_size / 100.0) * 1.5

        offsets = lower_lip_landmarks - center
        scale_y = np.where(offsets[:, 1] > 0, expansion_factor * chin_bias, expansion_factor)
        expanded_landmarks = lower_lip_landmarks.copy()
        expanded_landmarks[:, 0] = center[0] + offsets[:, 0] * expansion_factor
        expanded_landmarks[:, 1] = center[1] + offsets[:, 1] * scale_y

        if not np.all(np.isfinite(expanded_landmarks)):
            return mask, mouth_cutout, mouth_box, lower_lip_polygon

        expanded_landmarks = expanded_landmarks.astype(np.int32)

        # Plain ints keep the returned box free of NumPy scalar types.
        min_x, min_y = (int(v) for v in np.min(expanded_landmarks, axis=0))
        max_x, max_y = (int(v) for v in np.max(expanded_landmarks, axis=0))

        # Pad by 10% of the box extent on each axis, then clamp to the frame.
        padding_ratio = 0.1
        padding_x = int((max_x - min_x) * padding_ratio)
        padding_y = int((max_y - min_y) * padding_ratio)

        frame_h, frame_w = frame.shape[:2]
        min_x = max(0, min_x - padding_x)
        min_y = max(0, min_y - padding_y)
        max_x = min(frame_w, max_x + padding_x)
        max_y = min(frame_h, max_y + padding_y)

        if max_x > min_x and max_y > min_y:
            mask_roi = np.zeros((max_y - min_y, max_x - min_x), dtype=np.uint8)

            # Shift the polygon so it is relative to the ROI's top-left corner.
            polygon_relative_to_roi = (expanded_landmarks - [min_x, min_y]).astype(np.int32)
            cv2.fillPoly(mask_roi, [polygon_relative_to_roi], 255)

            # Feather the edge; the kernel size is forced to be odd and >= 1.
            blur_k_size = getattr(modules.globals, "mask_blur_kernel", 15)
            blur_k_size = max(1, blur_k_size // 2 * 2 + 1)
            mask_roi = gpu_gaussian_blur(mask_roi, (blur_k_size, blur_k_size), 0)

            mask[min_y:max_y, min_x:max_x] = mask_roi

            # The cutout comes from the frame as passed in (the caller is
            # expected to pass the un-swapped frame).
            mouth_cutout = frame[min_y:max_y, min_x:max_x].copy()
            lower_lip_polygon = expanded_landmarks
            mouth_box = (min_x, min_y, max_x, max_y)

    except IndexError:
        # Malformed landmark array: fall through and return what we have.
        pass
    except Exception as e:
        print(f"Error in create_lower_mouth_mask: {e}")

    return mask, mouth_cutout, mouth_box, lower_lip_polygon
|
|
|
|
|
|
def draw_mouth_mask_visualization(
    frame: Frame, face: Face, mouth_mask_data: tuple
) -> Frame:
    """Return a copy of ``frame`` annotated with the mouth polygon and its box.

    Args:
        frame: Image to annotate; it is never modified in place.
        face: Only checked for None; no attribute of it is read here.
        mouth_mask_data: 4-tuple ``(mask, mouth_cutout, box, polygon)``; only
            ``box`` (``min_x, min_y, max_x, max_y``) and ``polygon`` are used.

    Returns:
        The annotated copy, or the original ``frame`` object when any input is
        missing or malformed (bad tuple length, unusable box, polygon with
        fewer than three points).
    """
    if frame is None or face is None or mouth_mask_data is None or len(mouth_mask_data) != 4:
        return frame

    # Mask and cutout belong to the tuple layout but are not needed to draw.
    _mask, _mouth_cutout, box, lower_lip_polygon = mouth_mask_data

    if (
        lower_lip_polygon is None
        or not isinstance(lower_lip_polygon, np.ndarray)
        or len(lower_lip_polygon) < 3
    ):
        return frame

    # A None box, a box of the wrong length, or NaN/inf coordinates must not
    # crash the preview: TypeError / ValueError / OverflowError respectively.
    try:
        min_x, min_y, max_x, max_y = (int(value) for value in box)
    except (TypeError, ValueError, OverflowError):
        return frame

    height, width = frame.shape[:2]
    min_x, min_y = max(0, min_x), max(0, min_y)
    max_x, max_y = min(width, max_x), min(height, max_y)
    if max_x <= min_x or max_y <= min_y:
        return frame

    vis_frame = frame.copy()

    # Mouth polygon: green outline, clipped to the frame first.
    try:
        safe_polygon = lower_lip_polygon.copy()
        safe_polygon[:, 0] = np.clip(safe_polygon[:, 0], 0, width - 1)
        safe_polygon[:, 1] = np.clip(safe_polygon[:, 1], 0, height - 1)
        cv2.polylines(
            vis_frame, [safe_polygon.astype(np.int32)],
            isClosed=True, color=(0, 255, 0), thickness=2,
        )
    except Exception as e:
        print(f"Error drawing polygon for visualization: {e}")

    # Bounding box: red rectangle.
    cv2.rectangle(vis_frame, (min_x, min_y), (max_x, max_y), (0, 0, 255), 2)

    # Put the label above the box when there is room, otherwise below it.
    label_pos_y = min_y - 10 if min_y > 20 else max_y + 15
    label_pos_x = min_x
    try:
        cv2.putText(
            vis_frame, "Mouth Mask", (label_pos_x, label_pos_y),
            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv2.LINE_AA,
        )
    except Exception:
        # The label is purely cosmetic; keep the rest of the overlay.
        pass

    return vis_frame
|
|
|
|
|
|
def apply_mouth_area(
    frame: np.ndarray,
    mouth_cutout: np.ndarray,
    mouth_box: tuple,
    face_mask: np.ndarray,  # Full face mask (for blending edges)
    mouth_polygon: np.ndarray,  # Specific polygon for the mouth area itself
) -> np.ndarray:
    """Blend a saved mouth patch back into ``frame`` inside ``mouth_box``.

    The patch is resized to the (frame-clamped) box if needed, weighted by a
    Gaussian-feathered fill of ``mouth_polygon`` and alpha-blended over the
    existing pixels. ``frame`` is modified in place and also returned.

    Args:
        frame: Image to paste into; only 3-channel frames are blended.
        mouth_cutout: Patch to paste.
        mouth_box: ``(min_x, min_y, max_x, max_y)`` in frame coordinates.
        face_mask: Must be non-None and non-empty, but is not otherwise used
            by the blend.
        mouth_polygon: Polygon (frame coordinates) defining the blend region;
            needs at least three points.

    Returns:
        ``frame``; unchanged when an input is invalid or an error is caught.
    """
    required = (frame, mouth_cutout, mouth_box, face_mask, mouth_polygon)
    if any(item is None for item in required):
        return frame
    if mouth_cutout.size == 0 or face_mask.size == 0 or len(mouth_polygon) < 3:
        return frame

    try:
        left, top, right, bottom = (int(v) for v in mouth_box)
        if right - left <= 0 or bottom - top <= 0:
            return frame

        # Clamp the box to the frame and re-check that something is left.
        rows, cols = frame.shape[:2]
        top, bottom = max(0, top), min(rows, bottom)
        left, right = max(0, left), min(cols, right)
        roi_w = right - left
        roi_h = bottom - top
        if roi_w <= 0 or roi_h <= 0:
            return frame

        target_region = frame[top:bottom, left:right]
        if target_region.size == 0:
            return frame

        # Bring the patch to the ROI size (no-op when it already matches).
        if target_region.shape[:2] == mouth_cutout.shape[:2]:
            patch = mouth_cutout
        elif mouth_cutout.shape[0] > 0 and mouth_cutout.shape[1] > 0:
            patch = gpu_resize(mouth_cutout, (roi_w, roi_h), interpolation=cv2.INTER_LINEAR)
        else:
            return frame
        if patch is None or patch.size == 0:
            return frame

        # Rasterise the polygon in ROI-local coordinates.
        polygon_mask = np.zeros(target_region.shape[:2], dtype=np.uint8)
        local_polygon = (mouth_polygon - [left, top]).astype(np.int32)
        cv2.fillPoly(polygon_mask, [local_polygon], 255)

        # Feather radius scales with the box (1..30 px); kernel must be odd.
        feather = max(1, min(30, min(roi_w, roi_h) // 8))
        ksize = 2 * feather + 1
        soft_mask = cv2.GaussianBlur(polygon_mask.astype(np.float32), (ksize, ksize), 0)

        # Normalise so the strongest weight is exactly 1.0.
        peak = soft_mask.max()
        if peak > 1e-6:
            soft_mask = soft_mask / peak
        else:
            soft_mask.fill(0.0)

        if len(frame.shape) == 3 and frame.shape[2] == 3:
            alpha = soft_mask[:, :, np.newaxis].astype(np.float32)
            keep = 1.0 - alpha
            # patch * alpha + existing * (1 - alpha)
            blended = patch.astype(np.float32) * alpha + target_region.astype(np.float32) * keep
            frame[top:bottom, left:right] = np.clip(blended, 0, 255).astype(np.uint8)

    except Exception as e:
        # Best effort: never let a blend failure take down the pipeline.
        print(f"Error applying mouth area: {e}")

    return frame
|
|
|
|
|
|
def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
    """Create a feathered uint8 mask covering the whole face, forehead included.

    The mask is the filled convex hull of landmarks 0-32 of
    ``face.landmark_2d_106`` plus synthetic forehead points (landmarks 33-42
    shifted along the chin-to-eyebrow direction and widened by 20%), blurred
    with a kernel taken from ``modules.globals.face_mask_blur`` (default 31).

    Args:
        face: Object exposing ``landmark_2d_106`` as an ndarray with at least
            106 rows.
        frame: Image whose first two dimensions size the mask.

    Returns:
        A uint8 mask of ``frame.shape[:2]``. It is all zeros when the face or
        its landmarks are unusable, and has shape ``(0, 0)`` when ``frame`` is
        None.
    """
    # Check the frame *before* reading its shape; otherwise a None frame
    # raises AttributeError and the validation below is never reached.
    if frame is None:
        return np.zeros((0, 0), dtype=np.uint8)

    mask = np.zeros(frame.shape[:2], dtype=np.uint8)

    if face is None or not hasattr(face, 'landmark_2d_106'):
        return mask

    landmarks = face.landmark_2d_106
    if landmarks is None or not isinstance(landmarks, np.ndarray) or landmarks.shape[0] < 106:
        return mask

    try:
        if not np.all(np.isfinite(landmarks)):
            return mask

        landmarks_int = landmarks.astype(np.int32)

        # Face outline landmarks (0-32).
        face_outline = landmarks_int[0:33]

        # Estimate forehead points so the mask also covers the forehead
        # (the original notes this matters for blending on the forehead).
        eyebrows = landmarks_int[33:43]
        if eyebrows.shape[0] > 0:
            chin = landmarks_int[16]
            eyebrow_center = np.mean(eyebrows, axis=0)

            # Vector from chin to eyebrows (upwards).
            up_vector = eyebrow_center - chin
            norm = np.linalg.norm(up_vector)
            if norm > 0:
                up_vector /= norm

                # Extend upwards by 1.0x the chin-to-eyebrow distance.
                forehead_offset = up_vector * (norm * 1.0)
                forehead_points = eyebrows + forehead_offset

                # Widen the new top points by 20% about their centre so the
                # forehead corners are covered as well.
                top_center = np.mean(forehead_points, axis=0)
                forehead_points = (forehead_points - top_center) * 1.2 + top_center

                face_outline = np.concatenate(
                    (face_outline, forehead_points.astype(np.int32)), axis=0
                )

        # convexHull can fail on degenerate input, hence its own guard.
        try:
            hull = cv2.convexHull(face_outline.astype(np.float32))
            if hull is None or len(hull) < 3:
                return mask
            cv2.fillConvexPoly(mask, hull.astype(np.int32), 255)
        except Exception as hull_e:
            print(f"Error creating convex hull for face mask: {hull_e}")
            return mask

        # Feather the edge; the kernel size is forced to be odd and >= 1.
        blur_k_size = getattr(modules.globals, "face_mask_blur", 31)
        blur_k_size = max(1, blur_k_size // 2 * 2 + 1)
        mask = gpu_gaussian_blur(mask, (blur_k_size, blur_k_size), 0)

    except IndexError:
        # Malformed landmark array: return whatever mask we have so far.
        pass
    except Exception as e:
        print(f"Error creating face mask: {e}")

    return mask
|
|
|
|
|
|
def _coerce_to_bgr_uint8(image):
    """Best-effort conversion of ``image`` to a 3-channel uint8 array.

    Returns ``(image, ok)``. ``image`` is the input itself when it already
    qualifies, otherwise whatever the conversion produced; ``ok`` is False when
    the result is still not 3-channel or the conversion raised.
    """
    if len(image.shape) == 3 and image.shape[2] == 3 and image.dtype == np.uint8:
        return image, True
    try:
        if len(image.shape) == 2:  # Grayscale
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
        image = np.clip(image, 0, 255).astype(np.uint8)
    except Exception:
        return image, False
    return image, len(image.shape) == 3 and image.shape[2] == 3


def apply_color_transfer(source, target):
    """Match the LAB colour statistics of ``source`` to those of ``target``.

    Each LAB channel of ``source`` is shifted and scaled so that its mean and
    standard deviation equal those of ``target``.

    Args:
        source: Image to recolour (expected BGR uint8; grayscale and other
            dtypes are converted on a best-effort basis).
        target: Image providing the reference colour statistics.

    Returns:
        The recoloured BGR uint8 image. ``source`` (possibly after the
        best-effort uint8 conversion) is returned instead when an input is
        None/empty, cannot be brought to 3-channel uint8, or the conversion
        fails.
    """
    if source is None or target is None or source.size == 0 or target.size == 0:
        return source

    source, source_ok = _coerce_to_bgr_uint8(source)
    if not source_ok:
        return source
    target, target_ok = _coerce_to_bgr_uint8(target)
    if not target_ok:
        return source

    try:
        # Float32 in [0, 1] is the range cvtColor expects for float LAB input.
        source_lab = cv2.cvtColor(source.astype(np.float32) / 255.0, cv2.COLOR_BGR2LAB)
        target_lab = cv2.cvtColor(target.astype(np.float32) / 255.0, cv2.COLOR_BGR2LAB)

        # Per-channel statistics, reshaped for broadcasting over H x W x 3.
        source_mean, source_std = (
            stat.reshape((1, 1, 3)) for stat in cv2.meanStdDev(source_lab)
        )
        target_mean, target_std = (
            stat.reshape((1, 1, 3)) for stat in cv2.meanStdDev(target_lab)
        )

        # Guard the division; a tiny target std is fine (it only flattens).
        source_std = np.maximum(source_std, 1e-6)

        result_lab = (source_lab - source_mean) * (target_std / source_std) + target_mean

        # meanStdDev yields float64, which promotes the expression above to
        # float64. cvtColor does not accept 64-bit float input, so without
        # this cast the conversion raised and the transfer was silently
        # skipped on every call.
        result_lab = result_lab.astype(np.float32)

        result_bgr_float = cv2.cvtColor(result_lab, cv2.COLOR_LAB2BGR)
        result_bgr_float = np.clip(result_bgr_float, 0.0, 1.0)
        return (result_bgr_float * 255.0).astype("uint8")

    except Exception as e:
        # Best effort: report the problem and fall back to the source image.
        print(f"Error during color transfer: {e}")
        return source
|