* Rename calcXXX to calculateXXX

* Add migraphx support

* Add migraphx support

* Add migraphx support

* Add migraphx support

* Add migraphx support

* Add migraphx support

* Use True for the flags

* Add migraphx support

* add face-swapper-weight

* add face-swapper-weight to facefusion.ini

* changes

* change choice

* Fix typing for xxxWeight

* Feat/log inference session (#906)

* Log inference session, Introduce time helper

* Log inference session, Introduce time helper

* Log inference session, Introduce time helper

* Log inference session, Introduce time helper
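
The "Introduce time helper" commits suggest timing instrumentation around inference sessions. A minimal sketch of such a helper (the name `measure_time` and the print-based logging are assumptions, not the project's actual code) could look like:

```python
import time
from contextlib import contextmanager


@contextmanager
def measure_time(label):
	# Hypothetical timing helper: measures how long the wrapped block took
	# using a monotonic high-resolution clock, then logs the duration.
	start = time.perf_counter()
	try:
		yield
	finally:
		elapsed = time.perf_counter() - start
		print(f'{label} took {elapsed:.4f} seconds')
```

Such a context manager can wrap any inference call without altering its return value.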

* Mark as NEXT

* Follow industry standard x1, x2, y1 and y2

* Follow industry standard x1, x2, y1 and y2

* Follow industry standard in terms of naming (#908)

* Follow industry standard in terms of naming
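
The x1, x2, y1, y2 naming refers to corner-style bounding boxes (top-left and bottom-right corners), the convention used by most detection frameworks. As a hedged illustration (the helper name is made up), converting from a width/height-style box:

```python
def convert_xywh_to_x1y1x2y2(x, y, width, height):
	# (x, y) is the top-left corner; returns top-left (x1, y1)
	# and bottom-right (x2, y2) corner coordinates.
	return x, y, x + width, y + height
```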

* Improve xxx_embedding naming

* Fix norm vs. norms
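
"Fix norm vs. norms" and "Improve xxx_embedding naming" touch embedding handling. A common pattern in face recognition, shown here only as a sketch (not the project's actual code), is dividing an embedding by its L2 norm so cosine similarity reduces to a plain dot product:

```python
import math


def normalize_embedding(embedding):
	# Compute the L2 norm and scale each component by it,
	# producing a unit-length embedding vector.
	norm = math.sqrt(sum(value * value for value in embedding))
	return [ value / norm for value in embedding ]
```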

* Reduce timeout to 5

* Sort out voice_extractor once again

* changes

* Introduce many to the occlusion mask (#910)

* Introduce many to the occlusion mask

* Then we use minimum
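
"Then we use minimum" refers to combining several face masks by taking the element-wise minimum, as the diff later does with `numpy.minimum.reduce(crop_masks).clip(0, 1)`. A pure-Python 1-D illustration of the same idea (real masks are 2-D float arrays):

```python
# Hypothetical 1-D "masks" for illustration only.
box_mask = [1.0, 1.0, 0.5, 0.0]
occlusion_mask = [1.0, 0.3, 0.8, 0.2]
crop_masks = [ box_mask, occlusion_mask ]

# Element-wise minimum across all masks, then clamp to [0, 1]
# (mirrors numpy.minimum.reduce(crop_masks).clip(0, 1)).
crop_mask = [ max(0.0, min(1.0, min(values))) for values in zip(*crop_masks) ]
```

Taking the minimum means a pixel stays masked out if any contributing mask excludes it.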

* Add support for wmv

* Run platform tests before has_execution_provider (#911)

* Add support for wmv

* Introduce benchmark mode (#912)

* Honestly makes no difference to me

* Honestly makes no difference to me

* Fix wording

* Bring back YuNet (#922)

* Reintroduce YuNet without cv2 dependency

* Fix variable naming

* Avoid RGB to YUV colorshift using libx264rgb

* Avoid RGB to YUV colorshift using libx264rgb

* Make libx264 the default again

* Make libx264 the default again

* Fix types in ffmpeg builder

* Fix quality stuff in ffmpeg builder

* Fix quality stuff in ffmpeg builder

* Add libx264rgb to test
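
The libx264rgb commits address a color shift: libx264 converts RGB input to YUV, while libx264rgb encodes RGB directly (at the cost of far lower player compatibility, which is why libx264 was made the default again). A hedged sketch of an ffmpeg argument builder around this choice (function name and the quality-to-CRF mapping are assumptions):

```python
def build_video_encoder_args(output_video_encoder, output_video_quality):
	# Hypothetical sketch: build the encoder portion of an ffmpeg command.
	# libx264rgb keeps RGB content in RGB instead of converting to YUV,
	# avoiding a color shift on RGB sources.
	commands = [ '-c:v', output_video_encoder ]
	if output_video_encoder in ( 'libx264', 'libx264rgb' ):
		# Map a 0-100 quality onto x264's CRF scale (51 = worst, 0 = lossless).
		crf = round(51 - (output_video_quality * 0.51))
		commands.extend([ '-crf', str(crf) ])
	return commands
```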

* Revamp Processors (#923)

* Introduce new concept of pure target frames

* Radical refactoring of process flow

* Introduce new concept of pure target frames

* Fix webcam

* Minor improvements

* Minor improvements

* Use deque for video processing

* Use deque for video processing

* Extend the video manager

* Polish deque

* Polish deque

* Deque is not even used

* Improve speed with multiple futures

* Fix temp frame mutation and

* Fix RAM usage
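
The deque and multiple-futures commits describe bounding RAM while keeping the thread pool busy: submit frames ahead, but drain finished futures from a deque before the backlog grows. A minimal sketch of that pattern (names and the backlog limit are assumptions, not the project's actual implementation):

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor


def process_frames(frame_paths, process_frame, max_workers = 4):
	# Submit frames to a thread pool; drain completed futures from the left
	# of a deque so memory stays bounded on long videos.
	futures = deque()
	results = []

	with ThreadPoolExecutor(max_workers = max_workers) as executor:
		for frame_path in frame_paths:
			futures.append(executor.submit(process_frame, frame_path))
			# Keep at most a small backlog of in-flight frames.
			while len(futures) > max_workers * 2:
				results.append(futures.popleft().result())
		while futures:
			results.append(futures.popleft().result())
	return results
```

Draining from the left preserves frame order while still overlapping work across threads.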

* Remove old types and manage method

* Remove execution_queue_count

* Use init_state for benchmarker to avoid issues

* add voice extractor option

* Change the order of voice extractor in code

* Use official download urls

* Use official download urls

* add gui

* fix preview

* Add remote updates for voice extractor

* fix crash on headless-run

* update test_job_helper.py

* Fix it for good

* Remove pointless method

* Fix types and unused imports

* Revamp reference (#925)

* Initial revamp of face references

* Initial revamp of face references

* Initial revamp of face references

* Terminate find_similar_faces

* Improve find mutant faces

* Improve find mutant faces

* Move sort where it belongs

* Forward reference vision frame

* Forward reference vision frame also in preview

* Fix reference selection

* Use static video frame

* Fix CI

* Remove reference type from frame processors

* Improve some naming

* Fix types and unused imports

* Fix find mutant faces

* Fix find mutant faces

* Fix imports

* Correct naming

* Correct naming

* simplify pad

* Improve webcam performance on highres

* Camera manager (#932)

* Introduce webcam manager

* Fix order

* Rename to camera manager, improve video manager

* Fix CI

* Remove optional

* Fix naming in webcam options

* Avoid using temp faces (#933)

* output video scale

* Fix imports

* output image scale

* upscale fix (not limiter)

* add unit test scale_resolution & remove unused methods

* fix and add test

* fix

* change pack_resolution

* fix tests

* Simplify output scale testing
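
The output-scale commits add `scale_resolution` and `pack_resolution` plus unit tests. A hedged sketch of what such helpers might look like (signatures are assumptions; most encoders require even dimensions, hence the snapping):

```python
def scale_resolution(resolution, scale_factor):
	# Scale (width, height) by a factor and snap each dimension
	# to the nearest even value.
	width, height = resolution
	return (round(width * scale_factor / 2) * 2, round(height * scale_factor / 2) * 2)


def pack_resolution(resolution):
	# Serialize (width, height) into the conventional 'WIDTHxHEIGHT' string.
	width, height = resolution
	return str(width) + 'x' + str(height)
```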

* Fix benchmark UI

* Fix benchmark UI

* Update dependencies

* Introduce REAL multi gpu support using multi dimensional inference pool (#935)

* Introduce REAL multi gpu support using multi dimensional inference pool

* Remove the MULTI:GPU flag

* Restore "processing stop"

* Restore "processing stop"

* Remove old templates

* Go fill in with caching

* add expression restorer areas

* re-arrange

* rename method

* Fix stop for extract frames and merge video

* Replace arcface_converter models with latest crossface models

* Replace arcface_converter models with latest crossface models

* Move module logs to debug mode

* Refactor/streamer (#938)

* Introduce webcam manager

* Fix order

* Rename to camera manager, improve video manager

* Fix CI

* Fix naming in webcam options

* Move logic over to streamer

* Fix streamer, improve webcam experience

* Improve webcam experience

* Revert method

* Revert method

* Improve webcam again

* Use release on capture instead

* Only forward valid frames

* Fix resolution logging

* Add AVIF support

* Add AVIF support

* Limit avif to unix systems

* Drop avif

* Drop avif

* Drop avif

* Default to Documents in the UI if output path is not set

* Update wording.py (#939)

"succeed" is grammatically incorrect in the given context. To succeed is the infinitive form of the verb. Correct would be either "succeeded" or alternatively a form involving the noun "success".

* Fix more grammar issue

* Fix more grammar issue

* Sort out caching

* Move webcam choices back to UI

* Move preview options to own file (#940)

* Fix Migraphx execution provider

* Fix benchmark

* Reuse blend frame method

* Fix CI

* Fix CI

* Fix CI

* Hotfix missing check in face debugger, Enable logger for preview

* Fix reference selection (#942)

* Fix reference selection

* Fix reference selection

* Fix reference selection

* Fix reference selection

* Side by side preview (#941)

* Initial side by side preview

* More work on preview, remove UI only stuff from vision.py

* Improve more

* Use fit frame

* Add different fit methods for vision

* Improve preview part2

* Improve preview part3

* Improve preview part4

* Remove none as choice

* Remove useless methods

* Fix CI

* Fix naming

* use 1024 as preview resolution default

* Fix fit_cover_frame

* Uniform fit_xxx_frame methods
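
The `fit_xxx_frame` methods resize a frame into a target box with different strategies. The core of a "contain" fit, shown as a sketch with an invented helper name, is scaling by the smaller of the two axis ratios so the frame fits entirely inside the target while preserving aspect ratio:

```python
def calculate_fit_contain_size(frame_size, target_size):
	# Hypothetical sketch of the fit-contain computation: pick the scale
	# that makes the frame fit inside the target on both axes.
	frame_width, frame_height = frame_size
	target_width, target_height = target_size
	scale = min(target_width / frame_width, target_height / frame_height)
	return (round(frame_width * scale), round(frame_height * scale))
```

A "cover" fit would use `max` instead of `min`, filling the target and cropping the overflow.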

* Add back disabled logger

* Use ui choices alias

* Extract select face logic from processors (#943)

* Extract select face logic from processors to use it for face by face in preview

* Fix order

* Remove old code

* Merge methods

* Refactor face debugger (#944)

* Refactor huge method of face debugger

* Remove text metrics from face debugger

* Remove useless copy of temp frame

* Resort methods

* Fix spacing

* Remove old method

* Fix hard exit to work without signals

* Prevent upscaling for face-by-face

* Switch to version

* Improve exiting

---------

Co-authored-by: harisreedhar <h4harisreedhar.s.s@gmail.com>
Co-authored-by: Harisreedhar <46858047+harisreedhar@users.noreply.github.com>
Co-authored-by: Rafael Tappe Maestro <rafael@tappemaestro.com>
commit da0da3a4b4
parent 7b8bea4e0a
Author: Henry Ruhs
Date: 2025-09-08 10:43:58 +02:00
Committed by: GitHub

97 changed files with 2113 additions and 1934 deletions


@@ -1,6 +1,5 @@
 from argparse import ArgumentParser
 from functools import lru_cache
-from typing import List
 import cv2
 import numpy
@@ -8,26 +7,23 @@ import numpy
 import facefusion.choices
 import facefusion.jobs.job_manager
 import facefusion.jobs.job_store
-import facefusion.processors.core as processors
-from facefusion import config, content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, inference_manager, logger, process_manager, state_manager, video_manager, wording
-from facefusion.common_helper import create_int_metavar
+from facefusion import config, content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, inference_manager, logger, state_manager, video_manager, wording
+from facefusion.common_helper import create_int_metavar, is_macos
 from facefusion.download import conditional_download_hashes, conditional_download_sources, resolve_download_url
 from facefusion.execution import has_execution_provider
-from facefusion.face_analyser import get_many_faces, get_one_face
 from facefusion.face_helper import merge_matrix, paste_back, scale_face_landmark_5, warp_face_by_face_landmark_5
 from facefusion.face_masker import create_box_mask, create_occlusion_mask
-from facefusion.face_selector import find_similar_faces, sort_and_filter_faces
-from facefusion.face_store import get_reference_faces
+from facefusion.face_selector import select_faces
 from facefusion.filesystem import in_directory, is_image, is_video, resolve_relative_path, same_file_extension
 from facefusion.processors import choices as processors_choices
 from facefusion.processors.types import AgeModifierDirection, AgeModifierInputs
 from facefusion.program_helper import find_argument_group
 from facefusion.thread_helper import thread_semaphore
-from facefusion.types import ApplyStateItem, Args, DownloadScope, Face, InferencePool, ModelOptions, ModelSet, ProcessMode, QueuePayload, UpdateProgress, VisionFrame
-from facefusion.vision import match_frame_color, read_image, read_static_image, write_image
+from facefusion.types import ApplyStateItem, Args, DownloadScope, Face, InferencePool, ModelOptions, ModelSet, ProcessMode, VisionFrame
+from facefusion.vision import match_frame_color, read_static_image, read_static_video_frame
 
-@lru_cache(maxsize = None)
+@lru_cache()
 def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
 	return\
 	{
@@ -115,6 +111,7 @@ def pre_process(mode : ProcessMode) -> bool:
 def post_process() -> None:
 	read_static_image.cache_clear()
+	read_static_video_frame.cache_clear()
 	video_manager.clear_video_pool()
 	if state_manager.get_item('video_memory_strategy') in [ 'strict', 'moderate' ]:
 		clear_inference_pool()
@@ -143,8 +140,8 @@ def modify_age(target_face : Face, temp_vision_frame : VisionFrame) -> VisionFra
 	if 'occlusion' in state_manager.get_item('face_mask_types'):
 		occlusion_mask = create_occlusion_mask(crop_vision_frame)
-		combined_matrix = merge_matrix([ extend_affine_matrix, cv2.invertAffineTransform(affine_matrix) ])
-		occlusion_mask = cv2.warpAffine(occlusion_mask, combined_matrix, model_sizes.get('target_with_background'))
+		temp_matrix = merge_matrix([ extend_affine_matrix, cv2.invertAffineTransform(affine_matrix) ])
+		occlusion_mask = cv2.warpAffine(occlusion_mask, temp_matrix, model_sizes.get('target_with_background'))
 		crop_masks.append(occlusion_mask)
 	crop_vision_frame = prepare_vision_frame(crop_vision_frame)
@@ -164,7 +161,7 @@ def forward(crop_vision_frame : VisionFrame, extend_vision_frame : VisionFrame,
 	age_modifier = get_inference_pool().get('age_modifier')
 	age_modifier_inputs = {}
 
-	if has_execution_provider('coreml'):
+	if is_macos() and has_execution_provider('coreml'):
 		age_modifier.set_providers([ facefusion.choices.execution_provider_set.get('cpu') ])
 
 	for age_modifier_input in age_modifier.get_inputs():
@@ -199,56 +196,14 @@ def normalize_extend_frame(extend_vision_frame : VisionFrame) -> VisionFrame:
 	return extend_vision_frame
 
 def get_reference_frame(source_face : Face, target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
 	return modify_age(target_face, temp_vision_frame)
 
 def process_frame(inputs : AgeModifierInputs) -> VisionFrame:
-	reference_faces = inputs.get('reference_faces')
+	reference_vision_frame = inputs.get('reference_vision_frame')
 	target_vision_frame = inputs.get('target_vision_frame')
-	many_faces = sort_and_filter_faces(get_many_faces([ target_vision_frame ]))
+	temp_vision_frame = inputs.get('temp_vision_frame')
+	target_faces = select_faces(reference_vision_frame, target_vision_frame)
 
-	if state_manager.get_item('face_selector_mode') == 'many':
-		if many_faces:
-			for target_face in many_faces:
-				target_vision_frame = modify_age(target_face, target_vision_frame)
-	if state_manager.get_item('face_selector_mode') == 'one':
-		target_face = get_one_face(many_faces)
-		if target_face:
-			target_vision_frame = modify_age(target_face, target_vision_frame)
-	if state_manager.get_item('face_selector_mode') == 'reference':
-		similar_faces = find_similar_faces(many_faces, reference_faces, state_manager.get_item('reference_face_distance'))
-		if similar_faces:
-			for similar_face in similar_faces:
-				target_vision_frame = modify_age(similar_face, target_vision_frame)
-	return target_vision_frame
+	if target_faces:
+		for target_face in target_faces:
+			temp_vision_frame = modify_age(target_face, temp_vision_frame)
 
-def process_frames(source_path : List[str], queue_payloads : List[QueuePayload], update_progress : UpdateProgress) -> None:
-	reference_faces = get_reference_faces() if 'reference' in state_manager.get_item('face_selector_mode') else None
-	for queue_payload in process_manager.manage(queue_payloads):
-		target_vision_path = queue_payload['frame_path']
-		target_vision_frame = read_image(target_vision_path)
-		output_vision_frame = process_frame(
-		{
-			'reference_faces': reference_faces,
-			'target_vision_frame': target_vision_frame
-		})
-		write_image(target_vision_path, output_vision_frame)
-		update_progress(1)
 
-def process_image(source_path : str, target_path : str, output_path : str) -> None:
-	reference_faces = get_reference_faces() if 'reference' in state_manager.get_item('face_selector_mode') else None
-	target_vision_frame = read_static_image(target_path)
-	output_vision_frame = process_frame(
-	{
-		'reference_faces': reference_faces,
-		'target_vision_frame': target_vision_frame
-	})
-	write_image(output_path, output_vision_frame)
 
-def process_video(source_paths : List[str], temp_frame_paths : List[str]) -> None:
-	processors.multi_process_frames(None, temp_frame_paths, process_frames)
+	return temp_vision_frame


@@ -1,6 +1,6 @@
 from argparse import ArgumentParser
 from functools import lru_cache
-from typing import List, Tuple
+from typing import Tuple
 import cv2
 import numpy
@@ -8,25 +8,22 @@ from cv2.typing import Size
 import facefusion.jobs.job_manager
 import facefusion.jobs.job_store
-import facefusion.processors.core as processors
-from facefusion import config, content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, inference_manager, logger, process_manager, state_manager, video_manager, wording
+from facefusion import config, content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, inference_manager, logger, state_manager, video_manager, wording
 from facefusion.common_helper import create_int_metavar
 from facefusion.download import conditional_download_hashes, conditional_download_sources, resolve_download_url_by_provider
-from facefusion.face_analyser import get_many_faces, get_one_face
 from facefusion.face_helper import paste_back, warp_face_by_face_landmark_5
 from facefusion.face_masker import create_area_mask, create_box_mask, create_occlusion_mask, create_region_mask
-from facefusion.face_selector import find_similar_faces, sort_and_filter_faces
-from facefusion.face_store import get_reference_faces
+from facefusion.face_selector import select_faces
 from facefusion.filesystem import get_file_name, in_directory, is_image, is_video, resolve_file_paths, resolve_relative_path, same_file_extension
 from facefusion.processors import choices as processors_choices
 from facefusion.processors.types import DeepSwapperInputs, DeepSwapperMorph
 from facefusion.program_helper import find_argument_group
 from facefusion.thread_helper import thread_semaphore
-from facefusion.types import ApplyStateItem, Args, DownloadScope, Face, InferencePool, Mask, ModelOptions, ModelSet, ProcessMode, QueuePayload, UpdateProgress, VisionFrame
-from facefusion.vision import conditional_match_frame_color, read_image, read_static_image, write_image
+from facefusion.types import ApplyStateItem, Args, DownloadScope, Face, InferencePool, Mask, ModelOptions, ModelSet, ProcessMode, VisionFrame
+from facefusion.vision import conditional_match_frame_color, read_static_image, read_static_video_frame
 
-@lru_cache(maxsize = None)
+@lru_cache()
 def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
 	model_config = []
@@ -311,6 +308,7 @@ def pre_process(mode : ProcessMode) -> bool:
 def post_process() -> None:
 	read_static_image.cache_clear()
+	read_static_video_frame.cache_clear()
 	video_manager.clear_video_pool()
 	if state_manager.get_item('video_memory_strategy') in [ 'strict', 'moderate' ]:
 		clear_inference_pool()
@@ -409,56 +407,16 @@ def prepare_crop_mask(crop_source_mask : Mask, crop_target_mask : Mask) -> Mask:
 	return crop_mask
 
 def get_reference_frame(source_face : Face, target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
 	return swap_face(target_face, temp_vision_frame)
 
 def process_frame(inputs : DeepSwapperInputs) -> VisionFrame:
-	reference_faces = inputs.get('reference_faces')
+	reference_vision_frame = inputs.get('reference_vision_frame')
 	target_vision_frame = inputs.get('target_vision_frame')
-	many_faces = sort_and_filter_faces(get_many_faces([ target_vision_frame ]))
+	temp_vision_frame = inputs.get('temp_vision_frame')
+	target_faces = select_faces(reference_vision_frame, target_vision_frame)
 
-	if state_manager.get_item('face_selector_mode') == 'many':
-		if many_faces:
-			for target_face in many_faces:
-				target_vision_frame = swap_face(target_face, target_vision_frame)
-	if state_manager.get_item('face_selector_mode') == 'one':
-		target_face = get_one_face(many_faces)
-		if target_face:
-			target_vision_frame = swap_face(target_face, target_vision_frame)
-	if state_manager.get_item('face_selector_mode') == 'reference':
-		similar_faces = find_similar_faces(many_faces, reference_faces, state_manager.get_item('reference_face_distance'))
-		if similar_faces:
-			for similar_face in similar_faces:
-				target_vision_frame = swap_face(similar_face, target_vision_frame)
-	return target_vision_frame
+	if target_faces:
+		for target_face in target_faces:
+			temp_vision_frame = swap_face(target_face, temp_vision_frame)
+	return temp_vision_frame
 
-def process_frames(source_path : List[str], queue_payloads : List[QueuePayload], update_progress : UpdateProgress) -> None:
-	reference_faces = get_reference_faces() if 'reference' in state_manager.get_item('face_selector_mode') else None
-	for queue_payload in process_manager.manage(queue_payloads):
-		target_vision_path = queue_payload['frame_path']
-		target_vision_frame = read_image(target_vision_path)
-		output_vision_frame = process_frame(
-		{
-			'reference_faces': reference_faces,
-			'target_vision_frame': target_vision_frame
-		})
-		write_image(target_vision_path, output_vision_frame)
-		update_progress(1)
 
-def process_image(source_path : str, target_path : str, output_path : str) -> None:
-	reference_faces = get_reference_faces() if 'reference' in state_manager.get_item('face_selector_mode') else None
-	target_vision_frame = read_static_image(target_path)
-	output_vision_frame = process_frame(
-	{
-		'reference_faces': reference_faces,
-		'target_vision_frame': target_vision_frame
-	})
-	write_image(output_path, output_vision_frame)
 
-def process_video(source_paths : List[str], temp_frame_paths : List[str]) -> None:
-	processors.multi_process_frames(None, temp_frame_paths, process_frames)


@@ -1,32 +1,29 @@
from argparse import ArgumentParser
from functools import lru_cache
from typing import List, Tuple
from typing import Tuple
import cv2
import numpy
import facefusion.jobs.job_manager
import facefusion.jobs.job_store
import facefusion.processors.core as processors
from facefusion import config, content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, inference_manager, logger, process_manager, state_manager, video_manager, wording
from facefusion import config, content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, inference_manager, logger, state_manager, video_manager, wording
from facefusion.common_helper import create_int_metavar
from facefusion.download import conditional_download_hashes, conditional_download_sources, resolve_download_url
from facefusion.face_analyser import get_many_faces, get_one_face
from facefusion.face_helper import paste_back, warp_face_by_face_landmark_5
from facefusion.face_masker import create_box_mask, create_occlusion_mask
from facefusion.face_selector import find_similar_faces, sort_and_filter_faces
from facefusion.face_store import get_reference_faces
from facefusion.face_selector import select_faces
from facefusion.filesystem import in_directory, is_image, is_video, resolve_relative_path, same_file_extension
from facefusion.processors import choices as processors_choices
from facefusion.processors.live_portrait import create_rotation, limit_expression
from facefusion.processors.types import ExpressionRestorerInputs, LivePortraitExpression, LivePortraitFeatureVolume, LivePortraitMotionPoints, LivePortraitPitch, LivePortraitRoll, LivePortraitScale, LivePortraitTranslation, LivePortraitYaw
from facefusion.program_helper import find_argument_group
from facefusion.thread_helper import conditional_thread_semaphore, thread_semaphore
from facefusion.types import ApplyStateItem, Args, DownloadScope, Face, InferencePool, ModelOptions, ModelSet, ProcessMode, QueuePayload, UpdateProgress, VisionFrame
from facefusion.vision import read_image, read_static_image, read_video_frame, write_image
from facefusion.types import ApplyStateItem, Args, DownloadScope, Face, InferencePool, ModelOptions, ModelSet, ProcessMode, VisionFrame
from facefusion.vision import read_static_image, read_static_video_frame
@lru_cache(maxsize = None)
@lru_cache()
def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
return\
{
@@ -96,12 +93,14 @@ def register_args(program : ArgumentParser) -> None:
if group_processors:
group_processors.add_argument('--expression-restorer-model', help = wording.get('help.expression_restorer_model'), default = config.get_str_value('processors', 'expression_restorer_model', 'live_portrait'), choices = processors_choices.expression_restorer_models)
group_processors.add_argument('--expression-restorer-factor', help = wording.get('help.expression_restorer_factor'), type = int, default = config.get_int_value('processors', 'expression_restorer_factor', '80'), choices = processors_choices.expression_restorer_factor_range, metavar = create_int_metavar(processors_choices.expression_restorer_factor_range))
facefusion.jobs.job_store.register_step_keys([ 'expression_restorer_model', 'expression_restorer_factor' ])
group_processors.add_argument('--expression-restorer-areas', help = wording.get('help.expression_restorer_areas').format(choices = ', '.join(processors_choices.expression_restorer_areas)), default = config.get_str_list('processors', 'expression_restorer_areas', ' '.join(processors_choices.expression_restorer_areas)), choices = processors_choices.expression_restorer_areas, nargs = '+', metavar = 'EXPRESSION_RESTORER_AREAS')
facefusion.jobs.job_store.register_step_keys([ 'expression_restorer_model', 'expression_restorer_factor', 'expression_restorer_areas' ])
def apply_args(args : Args, apply_state_item : ApplyStateItem) -> None:
apply_state_item('expression_restorer_model', args.get('expression_restorer_model'))
apply_state_item('expression_restorer_factor', args.get('expression_restorer_factor'))
apply_state_item('expression_restorer_areas', args.get('expression_restorer_areas'))
def pre_check() -> bool:
@@ -129,6 +128,7 @@ def pre_process(mode : ProcessMode) -> bool:
def post_process() -> None:
read_static_image.cache_clear()
read_static_video_frame.cache_clear()
video_manager.clear_video_pool()
if state_manager.get_item('video_memory_strategy') in [ 'strict', 'moderate' ]:
clear_inference_pool()
@@ -141,46 +141,58 @@ def post_process() -> None:
face_recognizer.clear_inference_pool()
def restore_expression(source_vision_frame : VisionFrame, target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
def restore_expression(target_face : Face, target_vision_frame : VisionFrame, temp_vision_frame : VisionFrame) -> VisionFrame:
model_template = get_model_options().get('template')
model_size = get_model_options().get('size')
expression_restorer_factor = float(numpy.interp(float(state_manager.get_item('expression_restorer_factor')), [ 0, 100 ], [ 0, 1.2 ]))
source_vision_frame = cv2.resize(source_vision_frame, temp_vision_frame.shape[:2][::-1])
source_crop_vision_frame, _ = warp_face_by_face_landmark_5(source_vision_frame, target_face.landmark_set.get('5/68'), model_template, model_size)
target_crop_vision_frame, affine_matrix = warp_face_by_face_landmark_5(temp_vision_frame, target_face.landmark_set.get('5/68'), model_template, model_size)
box_mask = create_box_mask(target_crop_vision_frame, state_manager.get_item('face_mask_blur'), (0, 0, 0, 0))
target_crop_vision_frame, _ = warp_face_by_face_landmark_5(target_vision_frame, target_face.landmark_set.get('5/68'), model_template, model_size)
temp_crop_vision_frame, affine_matrix = warp_face_by_face_landmark_5(temp_vision_frame, target_face.landmark_set.get('5/68'), model_template, model_size)
box_mask = create_box_mask(temp_crop_vision_frame, state_manager.get_item('face_mask_blur'), (0, 0, 0, 0))
crop_masks =\
[
box_mask
]
if 'occlusion' in state_manager.get_item('face_mask_types'):
occlusion_mask = create_occlusion_mask(target_crop_vision_frame)
occlusion_mask = create_occlusion_mask(temp_crop_vision_frame)
crop_masks.append(occlusion_mask)
source_crop_vision_frame = prepare_crop_frame(source_crop_vision_frame)
target_crop_vision_frame = prepare_crop_frame(target_crop_vision_frame)
target_crop_vision_frame = apply_restore(source_crop_vision_frame, target_crop_vision_frame, expression_restorer_factor)
target_crop_vision_frame = normalize_crop_frame(target_crop_vision_frame)
temp_crop_vision_frame = prepare_crop_frame(temp_crop_vision_frame)
temp_crop_vision_frame = apply_restore(target_crop_vision_frame, temp_crop_vision_frame, expression_restorer_factor)
temp_crop_vision_frame = normalize_crop_frame(temp_crop_vision_frame)
crop_mask = numpy.minimum.reduce(crop_masks).clip(0, 1)
temp_vision_frame = paste_back(temp_vision_frame, target_crop_vision_frame, crop_mask, affine_matrix)
return temp_vision_frame
paste_vision_frame = paste_back(temp_vision_frame, temp_crop_vision_frame, crop_mask, affine_matrix)
return paste_vision_frame
def apply_restore(source_crop_vision_frame : VisionFrame, target_crop_vision_frame : VisionFrame, expression_restorer_factor : float) -> VisionFrame:
feature_volume = forward_extract_feature(target_crop_vision_frame)
source_expression = forward_extract_motion(source_crop_vision_frame)[5]
pitch, yaw, roll, scale, translation, target_expression, motion_points = forward_extract_motion(target_crop_vision_frame)
def apply_restore(target_crop_vision_frame : VisionFrame, temp_crop_vision_frame : VisionFrame, expression_restorer_factor : float) -> VisionFrame:
feature_volume = forward_extract_feature(temp_crop_vision_frame)
target_expression = forward_extract_motion(target_crop_vision_frame)[5]
pitch, yaw, roll, scale, translation, temp_expression, motion_points = forward_extract_motion(temp_crop_vision_frame)
rotation = create_rotation(pitch, yaw, roll)
source_expression[:, [ 0, 4, 5, 8, 9 ]] = target_expression[:, [ 0, 4, 5, 8, 9 ]]
source_expression = source_expression * expression_restorer_factor + target_expression * (1 - expression_restorer_factor)
source_expression = limit_expression(source_expression)
source_motion_points = scale * (motion_points @ rotation.T + source_expression) + translation
target_expression = restrict_expression_areas(temp_expression, target_expression)
target_expression = target_expression * expression_restorer_factor + temp_expression * (1 - expression_restorer_factor)
target_expression = limit_expression(target_expression)
target_motion_points = scale * (motion_points @ rotation.T + target_expression) + translation
crop_vision_frame = forward_generate_frame(feature_volume, source_motion_points, target_motion_points)
temp_motion_points = scale * (motion_points @ rotation.T + temp_expression) + translation
crop_vision_frame = forward_generate_frame(feature_volume, target_motion_points, temp_motion_points)
return crop_vision_frame
def restrict_expression_areas(temp_expression : LivePortraitExpression, target_expression : LivePortraitExpression) -> LivePortraitExpression:
expression_restorer_areas = state_manager.get_item('expression_restorer_areas')
if 'upper-face' not in expression_restorer_areas:
target_expression[:, [1, 2, 6, 10, 11, 12, 13, 15, 16]] = temp_expression[:, [1, 2, 6, 10, 11, 12, 13, 15, 16]]
if 'lower-face' not in expression_restorer_areas:
target_expression[:, [3, 7, 14, 17, 18, 19, 20]] = temp_expression[:, [3, 7, 14, 17, 18, 19, 20]]
target_expression[:, [0, 4, 5, 8, 9]] = temp_expression[:, [0, 4, 5, 8, 9]]
return target_expression
def forward_extract_feature(crop_vision_frame : VisionFrame) -> LivePortraitFeatureVolume:
feature_extractor = get_inference_pool().get('feature_extractor')
@@ -205,15 +217,15 @@ def forward_extract_motion(crop_vision_frame : VisionFrame) -> Tuple[LivePortrai
return pitch, yaw, roll, scale, translation, expression, motion_points
def forward_generate_frame(feature_volume : LivePortraitFeatureVolume, source_motion_points : LivePortraitMotionPoints, target_motion_points : LivePortraitMotionPoints) -> VisionFrame:
def forward_generate_frame(feature_volume : LivePortraitFeatureVolume, target_motion_points : LivePortraitMotionPoints, temp_motion_points : LivePortraitMotionPoints) -> VisionFrame:
generator = get_inference_pool().get('generator')
with thread_semaphore():
crop_vision_frame = generator.run(None,
{
'feature_volume': feature_volume,
'source': source_motion_points,
'target': target_motion_points
'source': target_motion_points,
'target': temp_motion_points
})[0][0]
return crop_vision_frame
@@ -235,64 +247,14 @@ def normalize_crop_frame(crop_vision_frame : VisionFrame) -> VisionFrame:
return crop_vision_frame
def get_reference_frame(source_face : Face, target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
pass
def process_frame(inputs : ExpressionRestorerInputs) -> VisionFrame:
reference_faces = inputs.get('reference_faces')
source_vision_frame = inputs.get('source_vision_frame')
reference_vision_frame = inputs.get('reference_vision_frame')
target_vision_frame = inputs.get('target_vision_frame')
many_faces = sort_and_filter_faces(get_many_faces([ target_vision_frame ]))
temp_vision_frame = inputs.get('temp_vision_frame')
target_faces = select_faces(reference_vision_frame, target_vision_frame)
if state_manager.get_item('face_selector_mode') == 'many':
if many_faces:
for target_face in many_faces:
target_vision_frame = restore_expression(source_vision_frame, target_face, target_vision_frame)
if state_manager.get_item('face_selector_mode') == 'one':
target_face = get_one_face(many_faces)
if target_face:
target_vision_frame = restore_expression(source_vision_frame, target_face, target_vision_frame)
if state_manager.get_item('face_selector_mode') == 'reference':
similar_faces = find_similar_faces(many_faces, reference_faces, state_manager.get_item('reference_face_distance'))
if similar_faces:
for similar_face in similar_faces:
target_vision_frame = restore_expression(source_vision_frame, similar_face, target_vision_frame)
return target_vision_frame
if target_faces:
for target_face in target_faces:
temp_vision_frame = restore_expression(target_face, target_vision_frame, temp_vision_frame)
def process_frames(source_path : List[str], queue_payloads : List[QueuePayload], update_progress : UpdateProgress) -> None:
reference_faces = get_reference_faces() if 'reference' in state_manager.get_item('face_selector_mode') else None
for queue_payload in process_manager.manage(queue_payloads):
frame_number = queue_payload.get('frame_number')
if state_manager.get_item('trim_frame_start'):
frame_number += state_manager.get_item('trim_frame_start')
source_vision_frame = read_video_frame(state_manager.get_item('target_path'), frame_number)
target_vision_path = queue_payload.get('frame_path')
target_vision_frame = read_image(target_vision_path)
output_vision_frame = process_frame(
{
'reference_faces': reference_faces,
'source_vision_frame': source_vision_frame,
'target_vision_frame': target_vision_frame
})
write_image(target_vision_path, output_vision_frame)
update_progress(1)
def process_image(source_path : str, target_path : str, output_path : str) -> None:
reference_faces = get_reference_faces() if 'reference' in state_manager.get_item('face_selector_mode') else None
source_vision_frame = read_static_image(state_manager.get_item('target_path'))
target_vision_frame = read_static_image(target_path)
output_vision_frame = process_frame(
{
'reference_faces': reference_faces,
'source_vision_frame': source_vision_frame,
'target_vision_frame': target_vision_frame
})
write_image(output_path, output_vision_frame)
def process_video(source_paths : List[str], temp_frame_paths : List[str]) -> None:
processors.multi_process_frames(None, temp_frame_paths, process_frames)
return temp_vision_frame
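The selector branches removed above (and repeated in every processor below) all follow the same shape. A minimal, self-contained sketch of that dispatch, using plain numbers as stand-ins for Face objects and an assumed `select_targets` helper that does not exist in FaceFusion, could look like:

```python
# Hypothetical sketch of the face_selector_mode dispatch from the removed
# process_frame branches. Faces are plain numbers here; the real code passes
# Face objects through get_one_face / find_similar_faces instead.
def select_targets(mode, many_faces, reference_faces, reference_face_distance):
	if mode == 'many':
		return many_faces
	if mode == 'one':
		# stand-in for get_one_face(): first detected face, if any
		return many_faces[:1]
	if mode == 'reference':
		# stand-in for find_similar_faces(): keep faces within the distance threshold
		return [ face for face in many_faces if any(abs(face - reference_face) <= reference_face_distance for reference_face in reference_faces) ]
	return []
```

The refactor replaces this per-processor branching with a single `select_faces()` call, which is why the same block disappears from every file in this diff.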


@@ -1,24 +1,20 @@
from argparse import ArgumentParser
from typing import List
import cv2
import numpy
import facefusion.jobs.job_manager
import facefusion.jobs.job_store
import facefusion.processors.core as processors
from facefusion import config, content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, logger, process_manager, state_manager, video_manager, wording
from facefusion.face_analyser import get_many_faces, get_one_face
from facefusion import config, content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, logger, state_manager, video_manager, wording
from facefusion.face_helper import warp_face_by_face_landmark_5
from facefusion.face_masker import create_area_mask, create_box_mask, create_occlusion_mask, create_region_mask
from facefusion.face_selector import find_similar_faces, sort_and_filter_faces
from facefusion.face_store import get_reference_faces
from facefusion.filesystem import in_directory, same_file_extension
from facefusion.face_selector import select_faces
from facefusion.filesystem import in_directory, is_image, is_video, same_file_extension
from facefusion.processors import choices as processors_choices
from facefusion.processors.types import FaceDebuggerInputs
from facefusion.program_helper import find_argument_group
from facefusion.types import ApplyStateItem, Args, Face, InferencePool, ProcessMode, QueuePayload, UpdateProgress, VisionFrame
from facefusion.vision import read_image, read_static_image, write_image
from facefusion.types import ApplyStateItem, Args, Face, InferencePool, ProcessMode, VisionFrame
from facefusion.vision import read_static_image, read_static_video_frame
def get_inference_pool() -> InferencePool:
@@ -45,6 +41,9 @@ def pre_check() -> bool:
def pre_process(mode : ProcessMode) -> bool:
if mode in [ 'output', 'preview' ] and not is_image(state_manager.get_item('target_path')) and not is_video(state_manager.get_item('target_path')):
logger.error(wording.get('choose_image_or_video_target') + wording.get('exclamation_mark'), __name__)
return False
if mode == 'output' and not in_directory(state_manager.get_item('output_path')):
logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__)
return False
@@ -56,6 +55,7 @@ def pre_process(mode : ProcessMode) -> bool:
def post_process() -> None:
read_static_image.cache_clear()
read_static_video_frame.cache_clear()
video_manager.clear_video_pool()
if state_manager.get_item('video_memory_strategy') == 'strict':
content_analyser.clear_inference_pool()
@@ -67,162 +67,159 @@ def post_process() -> None:
def debug_face(target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
primary_color = (0, 0, 255)
primary_light_color = (100, 100, 255)
secondary_color = (0, 255, 0)
tertiary_color = (255, 255, 0)
bounding_box = target_face.bounding_box.astype(numpy.int32)
temp_vision_frame = temp_vision_frame.copy()
has_face_landmark_5_fallback = numpy.array_equal(target_face.landmark_set.get('5'), target_face.landmark_set.get('5/68'))
has_face_landmark_68_fallback = numpy.array_equal(target_face.landmark_set.get('68'), target_face.landmark_set.get('68/5'))
face_debugger_items = state_manager.get_item('face_debugger_items')
if 'bounding-box' in face_debugger_items:
x1, y1, x2, y2 = bounding_box
cv2.rectangle(temp_vision_frame, (x1, y1), (x2, y2), primary_color, 2)
if target_face.angle == 0:
cv2.line(temp_vision_frame, (x1, y1), (x2, y1), primary_light_color, 3)
if target_face.angle == 180:
cv2.line(temp_vision_frame, (x1, y2), (x2, y2), primary_light_color, 3)
if target_face.angle == 90:
cv2.line(temp_vision_frame, (x2, y1), (x2, y2), primary_light_color, 3)
if target_face.angle == 270:
cv2.line(temp_vision_frame, (x1, y1), (x1, y2), primary_light_color, 3)
temp_vision_frame = draw_bounding_box(target_face, temp_vision_frame)
if 'face-mask' in face_debugger_items:
crop_vision_frame, affine_matrix = warp_face_by_face_landmark_5(temp_vision_frame, target_face.landmark_set.get('5/68'), 'arcface_128', (512, 512))
inverse_matrix = cv2.invertAffineTransform(affine_matrix)
temp_size = temp_vision_frame.shape[:2][::-1]
crop_masks = []
temp_vision_frame = draw_face_mask(target_face, temp_vision_frame)
if 'box' in state_manager.get_item('face_mask_types'):
box_mask = create_box_mask(crop_vision_frame, 0, state_manager.get_item('face_mask_padding'))
crop_masks.append(box_mask)
if 'face-landmark-5' in face_debugger_items:
temp_vision_frame = draw_face_landmark_5(target_face, temp_vision_frame)
if 'occlusion' in state_manager.get_item('face_mask_types'):
occlusion_mask = create_occlusion_mask(crop_vision_frame)
crop_masks.append(occlusion_mask)
if 'face-landmark-5/68' in face_debugger_items:
temp_vision_frame = draw_face_landmark_5_68(target_face, temp_vision_frame)
if 'area' in state_manager.get_item('face_mask_types'):
face_landmark_68 = cv2.transform(target_face.landmark_set.get('68').reshape(1, -1, 2), affine_matrix).reshape(-1, 2)
area_mask = create_area_mask(crop_vision_frame, face_landmark_68, state_manager.get_item('face_mask_areas'))
crop_masks.append(area_mask)
if 'face-landmark-68' in face_debugger_items:
temp_vision_frame = draw_face_landmark_68(target_face, temp_vision_frame)
if 'region' in state_manager.get_item('face_mask_types'):
region_mask = create_region_mask(crop_vision_frame, state_manager.get_item('face_mask_regions'))
crop_masks.append(region_mask)
crop_mask = numpy.minimum.reduce(crop_masks).clip(0, 1)
crop_mask = (crop_mask * 255).astype(numpy.uint8)
inverse_vision_frame = cv2.warpAffine(crop_mask, inverse_matrix, temp_size)
inverse_vision_frame = cv2.threshold(inverse_vision_frame, 100, 255, cv2.THRESH_BINARY)[1]
inverse_vision_frame[inverse_vision_frame > 0] = 255 #type:ignore[operator]
inverse_contours = cv2.findContours(inverse_vision_frame, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE)[0]
cv2.drawContours(temp_vision_frame, inverse_contours, -1, tertiary_color if has_face_landmark_5_fallback else secondary_color, 2)
if 'face-landmark-5' in face_debugger_items and numpy.any(target_face.landmark_set.get('5')):
face_landmark_5 = target_face.landmark_set.get('5').astype(numpy.int32)
for index in range(face_landmark_5.shape[0]):
cv2.circle(temp_vision_frame, (face_landmark_5[index][0], face_landmark_5[index][1]), 3, primary_color, -1)
if 'face-landmark-5/68' in face_debugger_items and numpy.any(target_face.landmark_set.get('5/68')):
face_landmark_5_68 = target_face.landmark_set.get('5/68').astype(numpy.int32)
for index in range(face_landmark_5_68.shape[0]):
cv2.circle(temp_vision_frame, (face_landmark_5_68[index][0], face_landmark_5_68[index][1]), 3, tertiary_color if has_face_landmark_5_fallback else secondary_color, -1)
if 'face-landmark-68' in face_debugger_items and numpy.any(target_face.landmark_set.get('68')):
face_landmark_68 = target_face.landmark_set.get('68').astype(numpy.int32)
for index in range(face_landmark_68.shape[0]):
cv2.circle(temp_vision_frame, (face_landmark_68[index][0], face_landmark_68[index][1]), 3, tertiary_color if has_face_landmark_68_fallback else secondary_color, -1)
if 'face-landmark-68/5' in face_debugger_items and numpy.any(target_face.landmark_set.get('68/5')):
face_landmark_68 = target_face.landmark_set.get('68/5').astype(numpy.int32)
for index in range(face_landmark_68.shape[0]):
cv2.circle(temp_vision_frame, (face_landmark_68[index][0], face_landmark_68[index][1]), 3, tertiary_color, -1)
if bounding_box[3] - bounding_box[1] > 50 and bounding_box[2] - bounding_box[0] > 50:
top = bounding_box[1]
left = bounding_box[0] - 20
if 'face-detector-score' in face_debugger_items:
face_score_text = str(round(target_face.score_set.get('detector'), 2))
top = top + 20
cv2.putText(temp_vision_frame, face_score_text, (left, top), cv2.FONT_HERSHEY_SIMPLEX, 0.5, primary_color, 2)
if 'face-landmarker-score' in face_debugger_items:
face_score_text = str(round(target_face.score_set.get('landmarker'), 2))
top = top + 20
cv2.putText(temp_vision_frame, face_score_text, (left, top), cv2.FONT_HERSHEY_SIMPLEX, 0.5, tertiary_color if has_face_landmark_5_fallback else secondary_color, 2)
if 'age' in face_debugger_items:
face_age_text = str(target_face.age.start) + '-' + str(target_face.age.stop)
top = top + 20
cv2.putText(temp_vision_frame, face_age_text, (left, top), cv2.FONT_HERSHEY_SIMPLEX, 0.5, primary_color, 2)
if 'gender' in face_debugger_items:
face_gender_text = target_face.gender
top = top + 20
cv2.putText(temp_vision_frame, face_gender_text, (left, top), cv2.FONT_HERSHEY_SIMPLEX, 0.5, primary_color, 2)
if 'race' in face_debugger_items:
face_race_text = target_face.race
top = top + 20
cv2.putText(temp_vision_frame, face_race_text, (left, top), cv2.FONT_HERSHEY_SIMPLEX, 0.5, primary_color, 2)
if 'face-landmark-68/5' in face_debugger_items:
temp_vision_frame = draw_face_landmark_68_5(target_face, temp_vision_frame)
return temp_vision_frame
def get_reference_frame(source_face : Face, target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
pass
def draw_bounding_box(target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
box_color = 0, 0, 255
border_color = 100, 100, 255
bounding_box = target_face.bounding_box.astype(numpy.int32)
x1, y1, x2, y2 = bounding_box
cv2.rectangle(temp_vision_frame, (x1, y1), (x2, y2), box_color, 2)
if target_face.angle == 0:
cv2.line(temp_vision_frame, (x1, y1), (x2, y1), border_color, 3)
if target_face.angle == 180:
cv2.line(temp_vision_frame, (x1, y2), (x2, y2), border_color, 3)
if target_face.angle == 90:
cv2.line(temp_vision_frame, (x2, y1), (x2, y2), border_color, 3)
if target_face.angle == 270:
cv2.line(temp_vision_frame, (x1, y1), (x1, y2), border_color, 3)
return temp_vision_frame
def draw_face_mask(target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
crop_masks = []
face_landmark_5 = target_face.landmark_set.get('5')
face_landmark_68 = target_face.landmark_set.get('68')
face_landmark_5_68 = target_face.landmark_set.get('5/68')
crop_vision_frame, affine_matrix = warp_face_by_face_landmark_5(temp_vision_frame, face_landmark_5_68, 'arcface_128', (512, 512))
inverse_matrix = cv2.invertAffineTransform(affine_matrix)
temp_size = temp_vision_frame.shape[:2][::-1]
mask_color = 0, 255, 0
if numpy.array_equal(face_landmark_5, face_landmark_5_68):
mask_color = 255, 255, 0
if 'box' in state_manager.get_item('face_mask_types'):
box_mask = create_box_mask(crop_vision_frame, 0, state_manager.get_item('face_mask_padding'))
crop_masks.append(box_mask)
if 'occlusion' in state_manager.get_item('face_mask_types'):
occlusion_mask = create_occlusion_mask(crop_vision_frame)
crop_masks.append(occlusion_mask)
if 'area' in state_manager.get_item('face_mask_types'):
face_landmark_68 = cv2.transform(face_landmark_68.reshape(1, -1, 2), affine_matrix).reshape(-1, 2)
area_mask = create_area_mask(crop_vision_frame, face_landmark_68, state_manager.get_item('face_mask_areas'))
crop_masks.append(area_mask)
if 'region' in state_manager.get_item('face_mask_types'):
region_mask = create_region_mask(crop_vision_frame, state_manager.get_item('face_mask_regions'))
crop_masks.append(region_mask)
crop_mask = numpy.minimum.reduce(crop_masks).clip(0, 1)
crop_mask = (crop_mask * 255).astype(numpy.uint8)
inverse_vision_frame = cv2.warpAffine(crop_mask, inverse_matrix, temp_size)
inverse_vision_frame = cv2.threshold(inverse_vision_frame, 100, 255, cv2.THRESH_BINARY)[1]
inverse_contours, _ = cv2.findContours(inverse_vision_frame, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE)
cv2.drawContours(temp_vision_frame, inverse_contours, -1, mask_color, 2)
return temp_vision_frame
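The mask combination inside `draw_face_mask` can be tried in isolation. The stand-in masks below are assumptions (the real ones come from `create_box_mask`, `create_occlusion_mask`, `create_area_mask` and `create_region_mask`), but the minimum-reduce and uint8 conversion steps match the diff:

```python
import numpy

# stand-in masks with values in [0, 1]; real masks come from the face_masker helpers
box_mask = numpy.full((4, 4), 0.9, dtype = numpy.float32)
occlusion_mask = numpy.full((4, 4), 0.4, dtype = numpy.float32)
region_mask = numpy.full((4, 4), 0.7, dtype = numpy.float32)

# pixel-wise minimum keeps only the area every mask agrees on
crop_mask = numpy.minimum.reduce([ box_mask, occlusion_mask, region_mask ]).clip(0, 1)
crop_mask = (crop_mask * 255).astype(numpy.uint8)
```

Taking the minimum (rather than, say, the mean) means any single mask can veto a pixel, which is the conservative behavior you want when an occlusion mask flags a hand in front of the face.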
def draw_face_landmark_5(target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
face_landmark_5 = target_face.landmark_set.get('5')
point_color = 0, 0, 255
if numpy.any(face_landmark_5):
face_landmark_5 = face_landmark_5.astype(numpy.int32)
for point in face_landmark_5:
cv2.circle(temp_vision_frame, tuple(point), 3, point_color, -1)
return temp_vision_frame
def draw_face_landmark_5_68(target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
face_landmark_5 = target_face.landmark_set.get('5')
face_landmark_5_68 = target_face.landmark_set.get('5/68')
point_color = 0, 255, 0
if numpy.array_equal(face_landmark_5, face_landmark_5_68):
point_color = 255, 255, 0
if numpy.any(face_landmark_5_68):
face_landmark_5_68 = face_landmark_5_68.astype(numpy.int32)
for point in face_landmark_5_68:
cv2.circle(temp_vision_frame, tuple(point), 3, point_color, -1)
return temp_vision_frame
def draw_face_landmark_68(target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
face_landmark_68 = target_face.landmark_set.get('68')
face_landmark_68_5 = target_face.landmark_set.get('68/5')
point_color = 0, 255, 0
if numpy.array_equal(face_landmark_68, face_landmark_68_5):
point_color = 255, 255, 0
if numpy.any(face_landmark_68):
face_landmark_68 = face_landmark_68.astype(numpy.int32)
for point in face_landmark_68:
cv2.circle(temp_vision_frame, tuple(point), 3, point_color, -1)
return temp_vision_frame
def draw_face_landmark_68_5(target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
face_landmark_68_5 = target_face.landmark_set.get('68/5')
point_color = 255, 255, 0
if numpy.any(face_landmark_68_5):
face_landmark_68_5 = face_landmark_68_5.astype(numpy.int32)
for point in face_landmark_68_5:
cv2.circle(temp_vision_frame, tuple(point), 3, point_color, -1)
return temp_vision_frame
def process_frame(inputs : FaceDebuggerInputs) -> VisionFrame:
reference_faces = inputs.get('reference_faces')
reference_vision_frame = inputs.get('reference_vision_frame')
target_vision_frame = inputs.get('target_vision_frame')
many_faces = sort_and_filter_faces(get_many_faces([ target_vision_frame ]))
temp_vision_frame = inputs.get('temp_vision_frame')
target_faces = select_faces(reference_vision_frame, target_vision_frame)
if state_manager.get_item('face_selector_mode') == 'many':
if many_faces:
for target_face in many_faces:
target_vision_frame = debug_face(target_face, target_vision_frame)
if state_manager.get_item('face_selector_mode') == 'one':
target_face = get_one_face(many_faces)
if target_face:
target_vision_frame = debug_face(target_face, target_vision_frame)
if state_manager.get_item('face_selector_mode') == 'reference':
similar_faces = find_similar_faces(many_faces, reference_faces, state_manager.get_item('reference_face_distance'))
if similar_faces:
for similar_face in similar_faces:
target_vision_frame = debug_face(similar_face, target_vision_frame)
return target_vision_frame
if target_faces:
for target_face in target_faces:
temp_vision_frame = debug_face(target_face, temp_vision_frame)
return temp_vision_frame
def process_frames(source_paths : List[str], queue_payloads : List[QueuePayload], update_progress : UpdateProgress) -> None:
reference_faces = get_reference_faces() if 'reference' in state_manager.get_item('face_selector_mode') else None
for queue_payload in process_manager.manage(queue_payloads):
target_vision_path = queue_payload['frame_path']
target_vision_frame = read_image(target_vision_path)
output_vision_frame = process_frame(
{
'reference_faces': reference_faces,
'target_vision_frame': target_vision_frame
})
write_image(target_vision_path, output_vision_frame)
update_progress(1)
def process_image(source_paths : List[str], target_path : str, output_path : str) -> None:
reference_faces = get_reference_faces() if 'reference' in state_manager.get_item('face_selector_mode') else None
target_vision_frame = read_static_image(target_path)
output_vision_frame = process_frame(
{
'reference_faces': reference_faces,
'target_vision_frame': target_vision_frame
})
write_image(output_path, output_vision_frame)
def process_video(source_paths : List[str], temp_frame_paths : List[str]) -> None:
processors.multi_process_frames(source_paths, temp_frame_paths, process_frames)


@@ -1,32 +1,29 @@
from argparse import ArgumentParser
from functools import lru_cache
from typing import List, Tuple
from typing import Tuple
import cv2
import numpy
import facefusion.jobs.job_manager
import facefusion.jobs.job_store
import facefusion.processors.core as processors
from facefusion import config, content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, inference_manager, logger, process_manager, state_manager, video_manager, wording
from facefusion import config, content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, inference_manager, logger, state_manager, video_manager, wording
from facefusion.common_helper import create_float_metavar
from facefusion.download import conditional_download_hashes, conditional_download_sources, resolve_download_url
from facefusion.face_analyser import get_many_faces, get_one_face
from facefusion.face_helper import paste_back, scale_face_landmark_5, warp_face_by_face_landmark_5
from facefusion.face_masker import create_box_mask
from facefusion.face_selector import find_similar_faces, sort_and_filter_faces
from facefusion.face_store import get_reference_faces
from facefusion.face_selector import select_faces
from facefusion.filesystem import in_directory, is_image, is_video, resolve_relative_path, same_file_extension
from facefusion.processors import choices as processors_choices
from facefusion.processors.live_portrait import create_rotation, limit_euler_angles, limit_expression
from facefusion.processors.live_portrait import create_rotation, limit_angle, limit_expression
from facefusion.processors.types import FaceEditorInputs, LivePortraitExpression, LivePortraitFeatureVolume, LivePortraitMotionPoints, LivePortraitPitch, LivePortraitRoll, LivePortraitRotation, LivePortraitScale, LivePortraitTranslation, LivePortraitYaw
from facefusion.program_helper import find_argument_group
from facefusion.thread_helper import conditional_thread_semaphore, thread_semaphore
from facefusion.types import ApplyStateItem, Args, DownloadScope, Face, FaceLandmark68, InferencePool, ModelOptions, ModelSet, ProcessMode, QueuePayload, UpdateProgress, VisionFrame
from facefusion.vision import read_image, read_static_image, write_image
from facefusion.types import ApplyStateItem, Args, DownloadScope, Face, FaceLandmark68, InferencePool, ModelOptions, ModelSet, ProcessMode, VisionFrame
from facefusion.vision import read_static_image, read_static_video_frame
@lru_cache(maxsize = None)
@lru_cache()
def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
return\
{
@@ -182,6 +179,7 @@ def pre_process(mode : ProcessMode) -> bool:
def post_process() -> None:
read_static_image.cache_clear()
read_static_video_frame.cache_clear()
video_manager.clear_video_pool()
if state_manager.get_item('video_memory_strategy') in [ 'strict', 'moderate' ]:
clear_inference_pool()
@@ -203,8 +201,8 @@ def edit_face(target_face : Face, temp_vision_frame : VisionFrame) -> VisionFram
crop_vision_frame = prepare_crop_frame(crop_vision_frame)
crop_vision_frame = apply_edit(crop_vision_frame, target_face.landmark_set.get('68'))
crop_vision_frame = normalize_crop_frame(crop_vision_frame)
temp_vision_frame = paste_back(temp_vision_frame, crop_vision_frame, box_mask, affine_matrix)
return temp_vision_frame
paste_vision_frame = paste_back(temp_vision_frame, crop_vision_frame, box_mask, affine_matrix)
return paste_vision_frame
def apply_edit(crop_vision_frame : VisionFrame, face_landmark_68 : FaceLandmark68) -> VisionFrame:
@@ -342,8 +340,8 @@ def edit_eye_gaze(expression : LivePortraitExpression) -> LivePortraitExpression
def edit_eye_open(motion_points : LivePortraitMotionPoints, face_landmark_68 : FaceLandmark68) -> LivePortraitMotionPoints:
face_editor_eye_open_ratio = state_manager.get_item('face_editor_eye_open_ratio')
left_eye_ratio = calc_distance_ratio(face_landmark_68, 37, 40, 39, 36)
right_eye_ratio = calc_distance_ratio(face_landmark_68, 43, 46, 45, 42)
left_eye_ratio = calculate_distance_ratio(face_landmark_68, 37, 40, 39, 36)
right_eye_ratio = calculate_distance_ratio(face_landmark_68, 43, 46, 45, 42)
if face_editor_eye_open_ratio < 0:
eye_motion_points = numpy.concatenate([ motion_points.ravel(), [ left_eye_ratio, right_eye_ratio, 0.0 ] ])
@@ -357,7 +355,7 @@ def edit_eye_open(motion_points : LivePortraitMotionPoints, face_landmark_68 : F
def edit_lip_open(motion_points : LivePortraitMotionPoints, face_landmark_68 : FaceLandmark68) -> LivePortraitMotionPoints:
face_editor_lip_open_ratio = state_manager.get_item('face_editor_lip_open_ratio')
lip_ratio = calc_distance_ratio(face_landmark_68, 62, 66, 54, 48)
lip_ratio = calculate_distance_ratio(face_landmark_68, 62, 66, 54, 48)
if face_editor_lip_open_ratio < 0:
lip_motion_points = numpy.concatenate([ motion_points.ravel(), [ lip_ratio, 0.0 ] ])
@@ -450,12 +448,12 @@ def edit_head_rotation(pitch : LivePortraitPitch, yaw : LivePortraitYaw, roll :
edit_pitch = pitch + float(numpy.interp(face_editor_head_pitch, [ -1, 1 ], [ 20, -20 ]))
edit_yaw = yaw + float(numpy.interp(face_editor_head_yaw, [ -1, 1 ], [ 60, -60 ]))
edit_roll = roll + float(numpy.interp(face_editor_head_roll, [ -1, 1 ], [ -15, 15 ]))
edit_pitch, edit_yaw, edit_roll = limit_euler_angles(pitch, yaw, roll, edit_pitch, edit_yaw, edit_roll)
edit_pitch, edit_yaw, edit_roll = limit_angle(pitch, yaw, roll, edit_pitch, edit_yaw, edit_roll)
rotation = create_rotation(edit_pitch, edit_yaw, edit_roll)
return rotation
def calc_distance_ratio(face_landmark_68 : FaceLandmark68, top_index : int, bottom_index : int, left_index : int, right_index : int) -> float:
def calculate_distance_ratio(face_landmark_68 : FaceLandmark68, top_index : int, bottom_index : int, left_index : int, right_index : int) -> float:
vertical_direction = face_landmark_68[top_index] - face_landmark_68[bottom_index]
horizontal_direction = face_landmark_68[left_index] - face_landmark_68[right_index]
distance_ratio = float(numpy.linalg.norm(vertical_direction) / (numpy.linalg.norm(horizontal_direction) + 1e-6))
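The renamed `calculate_distance_ratio` is easy to check against synthetic landmarks. This sketch reproduces the formula from the diff with a fabricated landmark array (the indices mirror the left-eye call site above):

```python
import numpy

def calculate_distance_ratio(face_landmark_68, top_index, bottom_index, left_index, right_index):
	vertical_direction = face_landmark_68[top_index] - face_landmark_68[bottom_index]
	horizontal_direction = face_landmark_68[left_index] - face_landmark_68[right_index]
	# epsilon keeps the ratio finite for degenerate (zero-width) landmarks
	return float(numpy.linalg.norm(vertical_direction) / (numpy.linalg.norm(horizontal_direction) + 1e-6))

# synthetic landmarks: vertical gap of 3, horizontal gap of 6 -> ratio close to 0.5
face_landmark_68 = numpy.zeros((68, 2))
face_landmark_68[37] = (0, 3)
face_landmark_68[39] = (6, 0)
```

This is the same vertical-over-horizontal construction as the classic eye-aspect ratio: a large value means an open eye (or mouth), a small one means it is closed.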
@@ -478,56 +476,14 @@ def normalize_crop_frame(crop_vision_frame : VisionFrame) -> VisionFrame:
return crop_vision_frame
def get_reference_frame(source_face : Face, target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
pass
def process_frame(inputs : FaceEditorInputs) -> VisionFrame:
reference_faces = inputs.get('reference_faces')
reference_vision_frame = inputs.get('reference_vision_frame')
target_vision_frame = inputs.get('target_vision_frame')
many_faces = sort_and_filter_faces(get_many_faces([ target_vision_frame ]))
temp_vision_frame = inputs.get('temp_vision_frame')
target_faces = select_faces(reference_vision_frame, target_vision_frame)
if state_manager.get_item('face_selector_mode') == 'many':
if many_faces:
for target_face in many_faces:
target_vision_frame = edit_face(target_face, target_vision_frame)
if state_manager.get_item('face_selector_mode') == 'one':
target_face = get_one_face(many_faces)
if target_face:
target_vision_frame = edit_face(target_face, target_vision_frame)
if state_manager.get_item('face_selector_mode') == 'reference':
similar_faces = find_similar_faces(many_faces, reference_faces, state_manager.get_item('reference_face_distance'))
if similar_faces:
for similar_face in similar_faces:
target_vision_frame = edit_face(similar_face, target_vision_frame)
return target_vision_frame
if target_faces:
for target_face in target_faces:
temp_vision_frame = edit_face(target_face, temp_vision_frame)
def process_frames(source_paths : List[str], queue_payloads : List[QueuePayload], update_progress : UpdateProgress) -> None:
reference_faces = get_reference_faces() if 'reference' in state_manager.get_item('face_selector_mode') else None
for queue_payload in process_manager.manage(queue_payloads):
target_vision_path = queue_payload['frame_path']
target_vision_frame = read_image(target_vision_path)
output_vision_frame = process_frame(
{
'reference_faces': reference_faces,
'target_vision_frame': target_vision_frame
})
write_image(target_vision_path, output_vision_frame)
update_progress(1)
def process_image(source_path : str, target_path : str, output_path : str) -> None:
reference_faces = get_reference_faces() if 'reference' in state_manager.get_item('face_selector_mode') else None
target_vision_frame = read_static_image(target_path)
output_vision_frame = process_frame(
{
'reference_faces': reference_faces,
'target_vision_frame': target_vision_frame
})
write_image(output_path, output_vision_frame)
def process_video(source_paths : List[str], temp_frame_paths : List[str]) -> None:
processors.multi_process_frames(None, temp_frame_paths, process_frames)
return temp_vision_frame


@@ -1,31 +1,26 @@
from argparse import ArgumentParser
from functools import lru_cache
from typing import List
import cv2
import numpy
import facefusion.jobs.job_manager
import facefusion.jobs.job_store
import facefusion.processors.core as processors
from facefusion import config, content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, inference_manager, logger, process_manager, state_manager, video_manager, wording
from facefusion import config, content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, inference_manager, logger, state_manager, video_manager, wording
from facefusion.common_helper import create_float_metavar, create_int_metavar
from facefusion.download import conditional_download_hashes, conditional_download_sources, resolve_download_url
from facefusion.face_analyser import get_many_faces, get_one_face
from facefusion.face_helper import paste_back, warp_face_by_face_landmark_5
from facefusion.face_masker import create_box_mask, create_occlusion_mask
from facefusion.face_selector import find_similar_faces, sort_and_filter_faces
from facefusion.face_store import get_reference_faces
from facefusion.face_selector import select_faces
from facefusion.filesystem import in_directory, is_image, is_video, resolve_relative_path, same_file_extension
from facefusion.processors import choices as processors_choices
from facefusion.processors.types import FaceEnhancerInputs, FaceEnhancerWeight
from facefusion.program_helper import find_argument_group
from facefusion.thread_helper import thread_semaphore
from facefusion.types import ApplyStateItem, Args, DownloadScope, Face, InferencePool, ModelOptions, ModelSet, ProcessMode, QueuePayload, UpdateProgress, VisionFrame
from facefusion.vision import read_image, read_static_image, write_image
from facefusion.types import ApplyStateItem, Args, DownloadScope, Face, InferencePool, ModelOptions, ModelSet, ProcessMode, VisionFrame
from facefusion.vision import blend_frame, read_static_image, read_static_video_frame
@lru_cache(maxsize = None)
@lru_cache()
def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
return\
{
@@ -243,7 +238,7 @@ def register_args(program : ArgumentParser) -> None:
if group_processors:
group_processors.add_argument('--face-enhancer-model', help = wording.get('help.face_enhancer_model'), default = config.get_str_value('processors', 'face_enhancer_model', 'gfpgan_1.4'), choices = processors_choices.face_enhancer_models)
group_processors.add_argument('--face-enhancer-blend', help = wording.get('help.face_enhancer_blend'), type = int, default = config.get_int_value('processors', 'face_enhancer_blend', '80'), choices = processors_choices.face_enhancer_blend_range, metavar = create_int_metavar(processors_choices.face_enhancer_blend_range))
group_processors.add_argument('--face-enhancer-weight', help = wording.get('help.face_enhancer_weight'), type = float, default = config.get_float_value('processors', 'face_enhancer_weight', '1.0'), choices = processors_choices.face_enhancer_weight_range, metavar = create_float_metavar(processors_choices.face_enhancer_weight_range))
group_processors.add_argument('--face-enhancer-weight', help = wording.get('help.face_enhancer_weight'), type = float, default = config.get_float_value('processors', 'face_enhancer_weight', '0.5'), choices = processors_choices.face_enhancer_weight_range, metavar = create_float_metavar(processors_choices.face_enhancer_weight_range))
facefusion.jobs.job_store.register_step_keys([ 'face_enhancer_model', 'face_enhancer_blend', 'face_enhancer_weight' ])
@@ -275,6 +270,7 @@ def pre_process(mode : ProcessMode) -> bool:
def post_process() -> None:
read_static_image.cache_clear()
read_static_video_frame.cache_clear()
video_manager.clear_video_pool()
if state_manager.get_item('video_memory_strategy') in [ 'strict', 'moderate' ]:
clear_inference_pool()
@@ -307,7 +303,7 @@ def enhance_face(target_face : Face, temp_vision_frame : VisionFrame) -> VisionF
crop_vision_frame = normalize_crop_frame(crop_vision_frame)
crop_mask = numpy.minimum.reduce(crop_masks).clip(0, 1)
paste_vision_frame = paste_back(temp_vision_frame, crop_vision_frame, crop_mask, affine_matrix)
temp_vision_frame = blend_frame(temp_vision_frame, paste_vision_frame)
temp_vision_frame = blend_paste_frame(temp_vision_frame, paste_vision_frame)
return temp_vision_frame
@@ -353,62 +349,20 @@ def normalize_crop_frame(crop_vision_frame : VisionFrame) -> VisionFrame:
return crop_vision_frame
def blend_frame(temp_vision_frame : VisionFrame, paste_vision_frame : VisionFrame) -> VisionFrame:
def blend_paste_frame(temp_vision_frame : VisionFrame, paste_vision_frame : VisionFrame) -> VisionFrame:
face_enhancer_blend = 1 - (state_manager.get_item('face_enhancer_blend') / 100)
temp_vision_frame = cv2.addWeighted(temp_vision_frame, face_enhancer_blend, paste_vision_frame, 1 - face_enhancer_blend, 0)
temp_vision_frame = blend_frame(temp_vision_frame, paste_vision_frame, 1 - face_enhancer_blend)
return temp_vision_frame
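The new `blend_paste_frame` delegates to a shared `blend_frame` in `facefusion.vision`; its weighting can be sketched in plain numpy. The helper below is an assumption for illustration, not the real `vision.blend_frame`:

```python
import numpy

def blend_frame(temp_vision_frame, paste_vision_frame, blend):
	# weighted sum equivalent to cv2.addWeighted(temp, 1 - blend, paste, blend, 0)
	output = temp_vision_frame.astype(numpy.float32) * (1.0 - blend) + paste_vision_frame.astype(numpy.float32) * blend
	return output.clip(0, 255).astype(numpy.uint8)

# a face_enhancer_blend of 80 gives the enhanced (pasted) frame a 0.8 weight
temp_vision_frame = numpy.zeros((2, 2, 3), dtype = numpy.uint8)
paste_vision_frame = numpy.full((2, 2, 3), 100, dtype = numpy.uint8)
blended = blend_frame(temp_vision_frame, paste_vision_frame, 80 / 100)
```

The double inversion in the diff (`1 - blend / 100`, then `1 - face_enhancer_blend`) cancels out, so the slider value divided by 100 ends up as the weight of the enhanced frame.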
def get_reference_frame(source_face : Face, target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
return enhance_face(target_face, temp_vision_frame)
def process_frame(inputs : FaceEnhancerInputs) -> VisionFrame:
reference_faces = inputs.get('reference_faces')
reference_vision_frame = inputs.get('reference_vision_frame')
target_vision_frame = inputs.get('target_vision_frame')
many_faces = sort_and_filter_faces(get_many_faces([ target_vision_frame ]))
temp_vision_frame = inputs.get('temp_vision_frame')
target_faces = select_faces(reference_vision_frame, target_vision_frame)
if state_manager.get_item('face_selector_mode') == 'many':
if many_faces:
for target_face in many_faces:
target_vision_frame = enhance_face(target_face, target_vision_frame)
if state_manager.get_item('face_selector_mode') == 'one':
target_face = get_one_face(many_faces)
if target_face:
target_vision_frame = enhance_face(target_face, target_vision_frame)
if state_manager.get_item('face_selector_mode') == 'reference':
similar_faces = find_similar_faces(many_faces, reference_faces, state_manager.get_item('reference_face_distance'))
if similar_faces:
for similar_face in similar_faces:
target_vision_frame = enhance_face(similar_face, target_vision_frame)
return target_vision_frame
if target_faces:
for target_face in target_faces:
temp_vision_frame = enhance_face(target_face, temp_vision_frame)
def process_frames(source_paths : List[str], queue_payloads : List[QueuePayload], update_progress : UpdateProgress) -> None:
reference_faces = get_reference_faces() if 'reference' in state_manager.get_item('face_selector_mode') else None
for queue_payload in process_manager.manage(queue_payloads):
target_vision_path = queue_payload['frame_path']
target_vision_frame = read_image(target_vision_path)
output_vision_frame = process_frame(
{
'reference_faces': reference_faces,
'target_vision_frame': target_vision_frame
})
write_image(target_vision_path, output_vision_frame)
update_progress(1)
def process_image(source_path : str, target_path : str, output_path : str) -> None:
reference_faces = get_reference_faces() if 'reference' in state_manager.get_item('face_selector_mode') else None
target_vision_frame = read_static_image(target_path)
output_vision_frame = process_frame(
{
'reference_faces': reference_faces,
'target_vision_frame': target_vision_frame
})
write_image(output_path, output_vision_frame)
def process_video(source_paths : List[str], temp_frame_paths : List[str]) -> None:
processors.multi_process_frames(None, temp_frame_paths, process_frames)
return temp_vision_frame
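The face_enhancer hunk above swaps an inline `cv2.addWeighted` call for a shared `blend_frame` helper imported from `facefusion.vision`. The helper itself is not part of this diff; the sketch below is a minimal numpy reconstruction of the behavior the call sites imply (the name matches the import, but the signature is inferred, not confirmed by the source):

```python
import numpy

def blend_frame(temp_vision_frame, paste_vision_frame, blend_weight):
	# Weighted average of two frames, mirroring cv2.addWeighted(a, 1 - w, b, w, 0).
	# blend_weight is the share of paste_vision_frame in the result.
	blend_vision_frame = temp_vision_frame * (1 - blend_weight) + paste_vision_frame * blend_weight
	return blend_vision_frame.clip(0, 255).astype(numpy.uint8)
```

With this shape, the call `blend_frame(temp_vision_frame, paste_vision_frame, 1 - face_enhancer_blend)` reproduces the removed `cv2.addWeighted` line exactly: the second frame receives weight `1 - face_enhancer_blend` in both versions.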


@@ -1,6 +1,6 @@
from argparse import ArgumentParser
from functools import lru_cache
from typing import List, Tuple
from typing import List, Optional, Tuple
import cv2
import numpy
@@ -8,16 +8,14 @@ import numpy
import facefusion.choices
import facefusion.jobs.job_manager
import facefusion.jobs.job_store
import facefusion.processors.core as processors
from facefusion import config, content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, inference_manager, logger, process_manager, state_manager, video_manager, wording
from facefusion.common_helper import get_first
from facefusion import config, content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, inference_manager, logger, state_manager, video_manager, wording
from facefusion.common_helper import get_first, is_macos
from facefusion.download import conditional_download_hashes, conditional_download_sources, resolve_download_url
from facefusion.execution import has_execution_provider
from facefusion.face_analyser import get_average_face, get_many_faces, get_one_face
from facefusion.face_helper import paste_back, warp_face_by_face_landmark_5
from facefusion.face_masker import create_area_mask, create_box_mask, create_occlusion_mask, create_region_mask
from facefusion.face_selector import find_similar_faces, sort_and_filter_faces, sort_faces_by_order
from facefusion.face_store import get_reference_faces
from facefusion.face_selector import select_faces, sort_faces_by_order
from facefusion.filesystem import filter_image_paths, has_image, in_directory, is_image, is_video, resolve_relative_path, same_file_extension
from facefusion.model_helper import get_static_model_initializer
from facefusion.processors import choices as processors_choices
@@ -25,11 +23,11 @@ from facefusion.processors.pixel_boost import explode_pixel_boost, implode_pixel
from facefusion.processors.types import FaceSwapperInputs
from facefusion.program_helper import find_argument_group
from facefusion.thread_helper import conditional_thread_semaphore
from facefusion.types import ApplyStateItem, Args, DownloadScope, Embedding, Face, InferencePool, ModelOptions, ModelSet, ProcessMode, QueuePayload, UpdateProgress, VisionFrame
from facefusion.vision import read_image, read_static_image, read_static_images, unpack_resolution, write_image
from facefusion.types import ApplyStateItem, Args, DownloadScope, Embedding, Face, InferencePool, ModelOptions, ModelSet, ProcessMode, VisionFrame
from facefusion.vision import read_static_image, read_static_images, read_static_video_frame, unpack_resolution
@lru_cache(maxsize = None)
@lru_cache()
def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
return\
{
@@ -68,8 +66,8 @@ def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
},
'embedding_converter':
{
'url': resolve_download_url('models-3.0.0', 'arcface_converter_ghost.hash'),
'path': resolve_relative_path('../.assets/models/arcface_converter_ghost.hash')
'url': resolve_download_url('models-3.4.0', 'crossface_ghost.hash'),
'path': resolve_relative_path('../.assets/models/crossface_ghost.hash')
}
},
'sources':
@@ -81,8 +79,8 @@ def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
},
'embedding_converter':
{
'url': resolve_download_url('models-3.0.0', 'arcface_converter_ghost.onnx'),
'path': resolve_relative_path('../.assets/models/arcface_converter_ghost.onnx')
'url': resolve_download_url('models-3.4.0', 'crossface_ghost.onnx'),
'path': resolve_relative_path('../.assets/models/crossface_ghost.onnx')
}
},
'type': 'ghost',
@@ -102,8 +100,8 @@ def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
},
'embedding_converter':
{
'url': resolve_download_url('models-3.0.0', 'arcface_converter_ghost.hash'),
'path': resolve_relative_path('../.assets/models/arcface_converter_ghost.hash')
'url': resolve_download_url('models-3.4.0', 'crossface_ghost.hash'),
'path': resolve_relative_path('../.assets/models/crossface_ghost.hash')
}
},
'sources':
@@ -115,8 +113,8 @@ def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
},
'embedding_converter':
{
'url': resolve_download_url('models-3.0.0', 'arcface_converter_ghost.onnx'),
'path': resolve_relative_path('../.assets/models/arcface_converter_ghost.onnx')
'url': resolve_download_url('models-3.4.0', 'crossface_ghost.onnx'),
'path': resolve_relative_path('../.assets/models/crossface_ghost.onnx')
}
},
'type': 'ghost',
@@ -136,8 +134,8 @@ def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
},
'embedding_converter':
{
'url': resolve_download_url('models-3.0.0', 'arcface_converter_ghost.hash'),
'path': resolve_relative_path('../.assets/models/arcface_converter_ghost.hash')
'url': resolve_download_url('models-3.4.0', 'crossface_ghost.hash'),
'path': resolve_relative_path('../.assets/models/crossface_ghost.hash')
}
},
'sources':
@@ -149,8 +147,8 @@ def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
},
'embedding_converter':
{
'url': resolve_download_url('models-3.0.0', 'arcface_converter_ghost.onnx'),
'path': resolve_relative_path('../.assets/models/arcface_converter_ghost.onnx')
'url': resolve_download_url('models-3.4.0', 'crossface_ghost.onnx'),
'path': resolve_relative_path('../.assets/models/crossface_ghost.onnx')
}
},
'type': 'ghost',
@@ -170,8 +168,8 @@ def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
},
'embedding_converter':
{
'url': resolve_download_url('models-3.1.0', 'arcface_converter_hififace.hash'),
'path': resolve_relative_path('../.assets/models/arcface_converter_hififace.hash')
'url': resolve_download_url('models-3.4.0', 'crossface_hififace.hash'),
'path': resolve_relative_path('../.assets/models/crossface_hififace.hash')
}
},
'sources':
@@ -183,8 +181,8 @@ def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
},
'embedding_converter':
{
'url': resolve_download_url('models-3.1.0', 'arcface_converter_hififace.onnx'),
'path': resolve_relative_path('../.assets/models/arcface_converter_hififace.onnx')
'url': resolve_download_url('models-3.4.0', 'crossface_hififace.onnx'),
'path': resolve_relative_path('../.assets/models/crossface_hififace.onnx')
}
},
'type': 'hififace',
@@ -324,8 +322,8 @@ def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
},
'embedding_converter':
{
'url': resolve_download_url('models-3.0.0', 'arcface_converter_simswap.hash'),
'path': resolve_relative_path('../.assets/models/arcface_converter_simswap.hash')
'url': resolve_download_url('models-3.4.0', 'crossface_simswap.hash'),
'path': resolve_relative_path('../.assets/models/crossface_simswap.hash')
}
},
'sources':
@@ -337,8 +335,8 @@ def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
},
'embedding_converter':
{
'url': resolve_download_url('models-3.0.0', 'arcface_converter_simswap.onnx'),
'path': resolve_relative_path('../.assets/models/arcface_converter_simswap.onnx')
'url': resolve_download_url('models-3.4.0', 'crossface_simswap.onnx'),
'path': resolve_relative_path('../.assets/models/crossface_simswap.onnx')
}
},
'type': 'simswap',
@@ -358,8 +356,8 @@ def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
},
'embedding_converter':
{
'url': resolve_download_url('models-3.0.0', 'arcface_converter_simswap.hash'),
'path': resolve_relative_path('../.assets/models/arcface_converter_simswap.hash')
'url': resolve_download_url('models-3.4.0', 'crossface_simswap.hash'),
'path': resolve_relative_path('../.assets/models/crossface_simswap.hash')
}
},
'sources':
@@ -371,8 +369,8 @@ def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
},
'embedding_converter':
{
'url': resolve_download_url('models-3.0.0', 'arcface_converter_simswap.onnx'),
'path': resolve_relative_path('../.assets/models/arcface_converter_simswap.onnx')
'url': resolve_download_url('models-3.4.0', 'crossface_simswap.onnx'),
'path': resolve_relative_path('../.assets/models/crossface_simswap.onnx')
}
},
'type': 'simswap',
@@ -428,7 +426,7 @@ def get_model_options() -> ModelOptions:
def get_model_name() -> str:
model_name = state_manager.get_item('face_swapper_model')
if has_execution_provider('coreml') and model_name == 'inswapper_128_fp16':
if is_macos() and has_execution_provider('coreml') and model_name == 'inswapper_128_fp16':
return 'inswapper_128'
return model_name
@@ -440,12 +438,14 @@ def register_args(program : ArgumentParser) -> None:
known_args, _ = program.parse_known_args()
face_swapper_pixel_boost_choices = processors_choices.face_swapper_set.get(known_args.face_swapper_model)
group_processors.add_argument('--face-swapper-pixel-boost', help = wording.get('help.face_swapper_pixel_boost'), default = config.get_str_value('processors', 'face_swapper_pixel_boost', get_first(face_swapper_pixel_boost_choices)), choices = face_swapper_pixel_boost_choices)
facefusion.jobs.job_store.register_step_keys([ 'face_swapper_model', 'face_swapper_pixel_boost' ])
group_processors.add_argument('--face-swapper-weight', help = wording.get('help.face_swapper_weight'), type = float, default = config.get_float_value('processors', 'face_swapper_weight', '0.5'), choices = processors_choices.face_swapper_weight_range)
facefusion.jobs.job_store.register_step_keys([ 'face_swapper_model', 'face_swapper_pixel_boost', 'face_swapper_weight' ])
def apply_args(args : Args, apply_state_item : ApplyStateItem) -> None:
apply_state_item('face_swapper_model', args.get('face_swapper_model'))
apply_state_item('face_swapper_pixel_boost', args.get('face_swapper_pixel_boost'))
apply_state_item('face_swapper_weight', args.get('face_swapper_weight'))
def pre_check() -> bool:
@@ -459,26 +459,33 @@ def pre_process(mode : ProcessMode) -> bool:
if not has_image(state_manager.get_item('source_paths')):
logger.error(wording.get('choose_image_source') + wording.get('exclamation_mark'), __name__)
return False
source_image_paths = filter_image_paths(state_manager.get_item('source_paths'))
source_frames = read_static_images(source_image_paths)
source_faces = get_many_faces(source_frames)
if not get_one_face(source_faces):
logger.error(wording.get('no_source_face_detected') + wording.get('exclamation_mark'), __name__)
return False
if mode in [ 'output', 'preview' ] and not is_image(state_manager.get_item('target_path')) and not is_video(state_manager.get_item('target_path')):
logger.error(wording.get('choose_image_or_video_target') + wording.get('exclamation_mark'), __name__)
return False
if mode == 'output' and not in_directory(state_manager.get_item('output_path')):
logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__)
return False
if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')):
logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__)
return False
return True
def post_process() -> None:
read_static_image.cache_clear()
read_static_video_frame.cache_clear()
video_manager.clear_video_pool()
if state_manager.get_item('video_memory_strategy') in [ 'strict', 'moderate' ]:
get_static_model_initializer.cache_clear()
@@ -512,7 +519,7 @@ def swap_face(source_face : Face, target_face : Face, temp_vision_frame : Vision
pixel_boost_vision_frames = implode_pixel_boost(crop_vision_frame, pixel_boost_total, model_size)
for pixel_boost_vision_frame in pixel_boost_vision_frames:
pixel_boost_vision_frame = prepare_crop_frame(pixel_boost_vision_frame)
pixel_boost_vision_frame = forward_swap_face(source_face, pixel_boost_vision_frame)
pixel_boost_vision_frame = forward_swap_face(source_face, target_face, pixel_boost_vision_frame)
pixel_boost_vision_frame = normalize_crop_frame(pixel_boost_vision_frame)
temp_vision_frames.append(pixel_boost_vision_frame)
crop_vision_frame = explode_pixel_boost(temp_vision_frames, pixel_boost_total, model_size, pixel_boost_size)
@@ -527,16 +534,16 @@ def swap_face(source_face : Face, target_face : Face, temp_vision_frame : Vision
crop_masks.append(region_mask)
crop_mask = numpy.minimum.reduce(crop_masks).clip(0, 1)
temp_vision_frame = paste_back(temp_vision_frame, crop_vision_frame, crop_mask, affine_matrix)
return temp_vision_frame
paste_vision_frame = paste_back(temp_vision_frame, crop_vision_frame, crop_mask, affine_matrix)
return paste_vision_frame
def forward_swap_face(source_face : Face, crop_vision_frame : VisionFrame) -> VisionFrame:
def forward_swap_face(source_face : Face, target_face : Face, crop_vision_frame : VisionFrame) -> VisionFrame:
face_swapper = get_inference_pool().get('face_swapper')
model_type = get_model_options().get('type')
face_swapper_inputs = {}
if has_execution_provider('coreml') and model_type in [ 'ghost', 'uniface' ]:
if is_macos() and has_execution_provider('coreml') and model_type in [ 'ghost', 'uniface' ]:
face_swapper.set_providers([ facefusion.choices.execution_provider_set.get('cpu') ])
for face_swapper_input in face_swapper.get_inputs():
@@ -544,7 +551,9 @@ def forward_swap_face(source_face : Face, crop_vision_frame : VisionFrame) -> Vi
if model_type in [ 'blendswap', 'uniface' ]:
face_swapper_inputs[face_swapper_input.name] = prepare_source_frame(source_face)
else:
face_swapper_inputs[face_swapper_input.name] = prepare_source_embedding(source_face)
source_embedding = prepare_source_embedding(source_face)
source_embedding = balance_source_embedding(source_embedding, target_face.embedding)
face_swapper_inputs[face_swapper_input.name] = source_embedding
if face_swapper_input.name == 'target':
face_swapper_inputs[face_swapper_input.name] = crop_vision_frame
@@ -554,16 +563,16 @@ def forward_swap_face(source_face : Face, crop_vision_frame : VisionFrame) -> Vi
return crop_vision_frame
def forward_convert_embedding(embedding : Embedding) -> Embedding:
def forward_convert_embedding(face_embedding : Embedding) -> Embedding:
embedding_converter = get_inference_pool().get('embedding_converter')
with conditional_thread_semaphore():
embedding = embedding_converter.run(None,
face_embedding = embedding_converter.run(None,
{
'input': embedding
'input': face_embedding
})[0]
return embedding
return face_embedding
def prepare_source_frame(source_face : Face) -> VisionFrame:
@@ -572,8 +581,10 @@ def prepare_source_frame(source_face : Face) -> VisionFrame:
if model_type == 'blendswap':
source_vision_frame, _ = warp_face_by_face_landmark_5(source_vision_frame, source_face.landmark_set.get('5/68'), 'arcface_112_v2', (112, 112))
if model_type == 'uniface':
source_vision_frame, _ = warp_face_by_face_landmark_5(source_vision_frame, source_face.landmark_set.get('5/68'), 'ffhq_512', (256, 256))
source_vision_frame = source_vision_frame[:, :, ::-1] / 255.0
source_vision_frame = source_vision_frame.transpose(2, 0, 1)
source_vision_frame = numpy.expand_dims(source_vision_frame, axis = 0).astype(numpy.float32)
@@ -584,12 +595,13 @@ def prepare_source_embedding(source_face : Face) -> Embedding:
model_type = get_model_options().get('type')
if model_type == 'ghost':
source_embedding, _ = convert_embedding(source_face)
source_embedding = source_face.embedding.reshape(-1, 512)
source_embedding, _ = convert_source_embedding(source_embedding)
source_embedding = source_embedding.reshape(1, -1)
return source_embedding
if model_type == 'hyperswap':
source_embedding = source_face.normed_embedding.reshape((1, -1))
source_embedding = source_face.embedding_norm.reshape((1, -1))
return source_embedding
if model_type == 'inswapper':
@@ -599,17 +611,31 @@ def prepare_source_embedding(source_face : Face) -> Embedding:
source_embedding = numpy.dot(source_embedding, model_initializer) / numpy.linalg.norm(source_embedding)
return source_embedding
_, source_normed_embedding = convert_embedding(source_face)
source_embedding = source_normed_embedding.reshape(1, -1)
source_embedding = source_face.embedding.reshape(-1, 512)
_, source_embedding_norm = convert_source_embedding(source_embedding)
source_embedding = source_embedding_norm.reshape(1, -1)
return source_embedding
def convert_embedding(source_face : Face) -> Tuple[Embedding, Embedding]:
embedding = source_face.embedding.reshape(-1, 512)
embedding = forward_convert_embedding(embedding)
embedding = embedding.ravel()
normed_embedding = embedding / numpy.linalg.norm(embedding)
return embedding, normed_embedding
def balance_source_embedding(source_embedding : Embedding, target_embedding : Embedding) -> Embedding:
model_type = get_model_options().get('type')
face_swapper_weight = state_manager.get_item('face_swapper_weight')
face_swapper_weight = numpy.interp(face_swapper_weight, [ 0, 1 ], [ 0.35, -0.35 ]).astype(numpy.float32)
if model_type in [ 'hififace', 'hyperswap', 'inswapper', 'simswap' ]:
target_embedding = target_embedding / numpy.linalg.norm(target_embedding)
source_embedding = source_embedding.reshape(1, -1)
target_embedding = target_embedding.reshape(1, -1)
source_embedding = source_embedding * (1 - face_swapper_weight) + target_embedding * face_swapper_weight
return source_embedding
def convert_source_embedding(source_embedding : Embedding) -> Tuple[Embedding, Embedding]:
source_embedding = forward_convert_embedding(source_embedding)
source_embedding = source_embedding.ravel()
source_embedding_norm = source_embedding / numpy.linalg.norm(source_embedding)
return source_embedding, source_embedding_norm
def prepare_crop_frame(crop_vision_frame : VisionFrame) -> VisionFrame:
@@ -629,84 +655,39 @@ def normalize_crop_frame(crop_vision_frame : VisionFrame) -> VisionFrame:
model_standard_deviation = get_model_options().get('standard_deviation')
crop_vision_frame = crop_vision_frame.transpose(1, 2, 0)
if model_type in [ 'ghost', 'hififace', 'hyperswap', 'uniface' ]:
crop_vision_frame = crop_vision_frame * model_standard_deviation + model_mean
crop_vision_frame = crop_vision_frame.clip(0, 1)
crop_vision_frame = crop_vision_frame[:, :, ::-1] * 255
return crop_vision_frame
def get_reference_frame(source_face : Face, target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
return swap_face(source_face, target_face, temp_vision_frame)
def extract_source_face(source_vision_frames : List[VisionFrame]) -> Optional[Face]:
source_faces = []
if source_vision_frames:
for source_vision_frame in source_vision_frames:
temp_faces = get_many_faces([source_vision_frame])
temp_faces = sort_faces_by_order(temp_faces, 'large-small')
if temp_faces:
source_faces.append(get_first(temp_faces))
return get_average_face(source_faces)
def process_frame(inputs : FaceSwapperInputs) -> VisionFrame:
reference_faces = inputs.get('reference_faces')
source_face = inputs.get('source_face')
reference_vision_frame = inputs.get('reference_vision_frame')
source_vision_frames = inputs.get('source_vision_frames')
target_vision_frame = inputs.get('target_vision_frame')
many_faces = sort_and_filter_faces(get_many_faces([ target_vision_frame ]))
temp_vision_frame = inputs.get('temp_vision_frame')
source_face = extract_source_face(source_vision_frames)
target_faces = select_faces(reference_vision_frame, target_vision_frame)
if state_manager.get_item('face_selector_mode') == 'many':
if many_faces:
for target_face in many_faces:
target_vision_frame = swap_face(source_face, target_face, target_vision_frame)
if state_manager.get_item('face_selector_mode') == 'one':
target_face = get_one_face(many_faces)
if target_face:
target_vision_frame = swap_face(source_face, target_face, target_vision_frame)
if state_manager.get_item('face_selector_mode') == 'reference':
similar_faces = find_similar_faces(many_faces, reference_faces, state_manager.get_item('reference_face_distance'))
if similar_faces:
for similar_face in similar_faces:
target_vision_frame = swap_face(source_face, similar_face, target_vision_frame)
return target_vision_frame
if source_face and target_faces:
for target_face in target_faces:
temp_vision_frame = swap_face(source_face, target_face, temp_vision_frame)
def process_frames(source_paths : List[str], queue_payloads : List[QueuePayload], update_progress : UpdateProgress) -> None:
reference_faces = get_reference_faces() if 'reference' in state_manager.get_item('face_selector_mode') else None
source_frames = read_static_images(source_paths)
source_faces = []
for source_frame in source_frames:
temp_faces = get_many_faces([ source_frame ])
temp_faces = sort_faces_by_order(temp_faces, 'large-small')
if temp_faces:
source_faces.append(get_first(temp_faces))
source_face = get_average_face(source_faces)
for queue_payload in process_manager.manage(queue_payloads):
target_vision_path = queue_payload['frame_path']
target_vision_frame = read_image(target_vision_path)
output_vision_frame = process_frame(
{
'reference_faces': reference_faces,
'source_face': source_face,
'target_vision_frame': target_vision_frame
})
write_image(target_vision_path, output_vision_frame)
update_progress(1)
def process_image(source_paths : List[str], target_path : str, output_path : str) -> None:
reference_faces = get_reference_faces() if 'reference' in state_manager.get_item('face_selector_mode') else None
source_frames = read_static_images(source_paths)
source_faces = []
for source_frame in source_frames:
temp_faces = get_many_faces([ source_frame ])
temp_faces = sort_faces_by_order(temp_faces, 'large-small')
if temp_faces:
source_faces.append(get_first(temp_faces))
source_face = get_average_face(source_faces)
target_vision_frame = read_static_image(target_path)
output_vision_frame = process_frame(
{
'reference_faces': reference_faces,
'source_face': source_face,
'target_vision_frame': target_vision_frame
})
write_image(output_path, output_vision_frame)
def process_video(source_paths : List[str], temp_frame_paths : List[str]) -> None:
processors.multi_process_frames(source_paths, temp_frame_paths, process_frames)
return temp_vision_frame
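The new `balance_source_embedding` is the core of the `--face-swapper-weight` option introduced in the face_swapper hunks above: it maps the user-facing weight from [0, 1] onto an interpolation factor in [0.35, -0.35] and linearly mixes the source embedding with the target embedding. A simplified, self-contained version (the target normalization corresponds to the hififace/hyperswap/inswapper/simswap branch; the standalone signature is for illustration only):

```python
import numpy

def balance_source_embedding(source_embedding, target_embedding, face_swapper_weight, normalize_target = True):
	# Map the user-facing weight (0..1) onto an interpolation factor (0.35..-0.35).
	factor = numpy.interp(face_swapper_weight, [ 0, 1 ], [ 0.35, -0.35 ]).astype(numpy.float32)
	if normalize_target:
		# Models that consume unit-norm embeddings get a normalized target.
		target_embedding = target_embedding / numpy.linalg.norm(target_embedding)
	source_embedding = source_embedding.reshape(1, -1)
	target_embedding = target_embedding.reshape(1, -1)
	# Lerp toward the target for weights below 0.5, away from it above 0.5.
	return source_embedding * (1 - factor) + target_embedding * factor
```

At a weight of 0.5 the factor is exactly 0, so the source embedding passes through unchanged; below 0.5 the result leans toward the target identity, and above 0.5 the negative factor extrapolates away from it, exaggerating the source identity.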


@@ -7,9 +7,8 @@ import numpy
import facefusion.jobs.job_manager
import facefusion.jobs.job_store
import facefusion.processors.core as processors
from facefusion import config, content_analyser, inference_manager, logger, process_manager, state_manager, video_manager, wording
from facefusion.common_helper import create_int_metavar
from facefusion import config, content_analyser, inference_manager, logger, state_manager, video_manager, wording
from facefusion.common_helper import create_int_metavar, is_macos
from facefusion.download import conditional_download_hashes, conditional_download_sources, resolve_download_url
from facefusion.execution import has_execution_provider
from facefusion.filesystem import in_directory, is_image, is_video, resolve_relative_path, same_file_extension
@@ -17,11 +16,11 @@ from facefusion.processors import choices as processors_choices
from facefusion.processors.types import FrameColorizerInputs
from facefusion.program_helper import find_argument_group
from facefusion.thread_helper import thread_semaphore
from facefusion.types import ApplyStateItem, Args, DownloadScope, ExecutionProvider, Face, InferencePool, ModelOptions, ModelSet, ProcessMode, QueuePayload, UpdateProgress, VisionFrame
from facefusion.vision import read_image, read_static_image, unpack_resolution, write_image
from facefusion.types import ApplyStateItem, Args, DownloadScope, ExecutionProvider, InferencePool, ModelOptions, ModelSet, ProcessMode, VisionFrame
from facefusion.vision import blend_frame, read_static_image, read_static_video_frame, unpack_resolution
@lru_cache(maxsize = None)
@lru_cache()
def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
return\
{
@@ -141,7 +140,7 @@ def clear_inference_pool() -> None:
def resolve_execution_providers() -> List[ExecutionProvider]:
if has_execution_provider('coreml'):
if is_macos() and has_execution_provider('coreml'):
return [ 'cpu' ]
return state_manager.get_item('execution_providers')
@@ -188,6 +187,7 @@ def pre_process(mode : ProcessMode) -> bool:
def post_process() -> None:
read_static_image.cache_clear()
read_static_video_frame.cache_clear()
video_manager.clear_video_pool()
if state_manager.get_item('video_memory_strategy') in [ 'strict', 'moderate' ]:
clear_inference_pool()
@@ -199,7 +199,7 @@ def colorize_frame(temp_vision_frame : VisionFrame) -> VisionFrame:
color_vision_frame = prepare_temp_frame(temp_vision_frame)
color_vision_frame = forward(color_vision_frame)
color_vision_frame = merge_color_frame(temp_vision_frame, color_vision_frame)
color_vision_frame = blend_frame(temp_vision_frame, color_vision_frame)
color_vision_frame = blend_color_frame(temp_vision_frame, color_vision_frame)
return color_vision_frame
@@ -255,41 +255,12 @@ def merge_color_frame(temp_vision_frame : VisionFrame, color_vision_frame : Visi
return color_vision_frame
def blend_frame(temp_vision_frame : VisionFrame, paste_vision_frame : VisionFrame) -> VisionFrame:
def blend_color_frame(temp_vision_frame : VisionFrame, color_vision_frame : VisionFrame) -> VisionFrame:
frame_colorizer_blend = 1 - (state_manager.get_item('frame_colorizer_blend') / 100)
temp_vision_frame = cv2.addWeighted(temp_vision_frame, frame_colorizer_blend, paste_vision_frame, 1 - frame_colorizer_blend, 0)
temp_vision_frame = blend_frame(temp_vision_frame, color_vision_frame, 1 - frame_colorizer_blend)
return temp_vision_frame
def get_reference_frame(source_face : Face, target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
pass
def process_frame(inputs : FrameColorizerInputs) -> VisionFrame:
target_vision_frame = inputs.get('target_vision_frame')
return colorize_frame(target_vision_frame)
def process_frames(source_paths : List[str], queue_payloads : List[QueuePayload], update_progress : UpdateProgress) -> None:
for queue_payload in process_manager.manage(queue_payloads):
target_vision_path = queue_payload['frame_path']
target_vision_frame = read_image(target_vision_path)
output_vision_frame = process_frame(
{
'target_vision_frame': target_vision_frame
})
write_image(target_vision_path, output_vision_frame)
update_progress(1)
def process_image(source_paths : List[str], target_path : str, output_path : str) -> None:
target_vision_frame = read_static_image(target_path)
output_vision_frame = process_frame(
{
'target_vision_frame': target_vision_frame
})
write_image(output_path, output_vision_frame)
def process_video(source_paths : List[str], temp_frame_paths : List[str]) -> None:
processors.multi_process_frames(None, temp_frame_paths, process_frames)
temp_vision_frame = inputs.get('temp_vision_frame')
return colorize_frame(temp_vision_frame)
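Several hunks above swap `@lru_cache(maxsize = None)` for the bare `@lru_cache()` decorator. Worth noting: the two are not equivalent — `lru_cache()` defaults to a bounded cache of 128 entries with LRU eviction, while `maxsize = None` caches without bound:

```python
from functools import lru_cache

@lru_cache()
def bounded(value : int) -> int:
	# Default decorator: at most 128 cached entries, least-recently-used evicted first.
	return value * 2

@lru_cache(maxsize = None)
def unbounded(value : int) -> int:
	# maxsize = None: every distinct argument stays cached forever.
	return value * 2
```

For a factory like `create_static_model_set(download_scope)`, which is keyed on a small fixed set of `download_scope` values, the two variants behave the same in practice, so the change is cosmetic here.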


@@ -1,15 +1,13 @@
from argparse import ArgumentParser
from functools import lru_cache
from typing import List
import cv2
import numpy
import facefusion.jobs.job_manager
import facefusion.jobs.job_store
import facefusion.processors.core as processors
from facefusion import config, content_analyser, inference_manager, logger, process_manager, state_manager, video_manager, wording
from facefusion.common_helper import create_int_metavar
from facefusion import config, content_analyser, inference_manager, logger, state_manager, video_manager, wording
from facefusion.common_helper import create_int_metavar, is_macos
from facefusion.download import conditional_download_hashes, conditional_download_sources, resolve_download_url
from facefusion.execution import has_execution_provider
from facefusion.filesystem import in_directory, is_image, is_video, resolve_relative_path, same_file_extension
@@ -17,11 +15,11 @@ from facefusion.processors import choices as processors_choices
from facefusion.processors.types import FrameEnhancerInputs
from facefusion.program_helper import find_argument_group
from facefusion.thread_helper import conditional_thread_semaphore
from facefusion.types import ApplyStateItem, Args, DownloadScope, Face, InferencePool, ModelOptions, ModelSet, ProcessMode, QueuePayload, UpdateProgress, VisionFrame
from facefusion.vision import create_tile_frames, merge_tile_frames, read_image, read_static_image, write_image
from facefusion.types import ApplyStateItem, Args, DownloadScope, InferencePool, ModelOptions, ModelSet, ProcessMode, VisionFrame
from facefusion.vision import blend_frame, create_tile_frames, merge_tile_frames, read_static_image, read_static_video_frame
@lru_cache(maxsize = None)
@lru_cache()
def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
return\
{
@@ -426,7 +424,7 @@ def get_model_options() -> ModelOptions:
def get_frame_enhancer_model() -> str:
frame_enhancer_model = state_manager.get_item('frame_enhancer_model')
if has_execution_provider('coreml'):
if is_macos() and has_execution_provider('coreml'):
if frame_enhancer_model == 'real_esrgan_x2_fp16':
return 'real_esrgan_x2'
if frame_enhancer_model == 'real_esrgan_x4_fp16':
@@ -471,6 +469,7 @@ def pre_process(mode : ProcessMode) -> bool:
def post_process() -> None:
read_static_image.cache_clear()
read_static_video_frame.cache_clear()
video_manager.clear_video_pool()
if state_manager.get_item('video_memory_strategy') in [ 'strict', 'moderate' ]:
clear_inference_pool()
@@ -490,7 +489,7 @@ def enhance_frame(temp_vision_frame : VisionFrame) -> VisionFrame:
tile_vision_frames[index] = normalize_tile_frame(tile_vision_frame)
merge_vision_frame = merge_tile_frames(tile_vision_frames, temp_width * model_scale, temp_height * model_scale, pad_width * model_scale, pad_height * model_scale, (model_size[0] * model_scale, model_size[1] * model_scale, model_size[2] * model_scale))
temp_vision_frame = blend_frame(temp_vision_frame, merge_vision_frame)
temp_vision_frame = blend_merge_frame(temp_vision_frame, merge_vision_frame)
return temp_vision_frame
@@ -506,55 +505,26 @@ def forward(tile_vision_frame : VisionFrame) -> VisionFrame:
return tile_vision_frame
def prepare_tile_frame(vision_tile_frame : VisionFrame) -> VisionFrame:
vision_tile_frame = numpy.expand_dims(vision_tile_frame[:, :, ::-1], axis = 0)
vision_tile_frame = vision_tile_frame.transpose(0, 3, 1, 2)
vision_tile_frame = vision_tile_frame.astype(numpy.float32) / 255.0
return vision_tile_frame
def prepare_tile_frame(tile_vision_frame : VisionFrame) -> VisionFrame:
tile_vision_frame = numpy.expand_dims(tile_vision_frame[:, :, ::-1], axis = 0)
tile_vision_frame = tile_vision_frame.transpose(0, 3, 1, 2)
tile_vision_frame = tile_vision_frame.astype(numpy.float32) / 255.0
return tile_vision_frame
def normalize_tile_frame(vision_tile_frame : VisionFrame) -> VisionFrame:
vision_tile_frame = vision_tile_frame.transpose(0, 2, 3, 1).squeeze(0) * 255
vision_tile_frame = vision_tile_frame.clip(0, 255).astype(numpy.uint8)[:, :, ::-1]
return vision_tile_frame
def normalize_tile_frame(tile_vision_frame : VisionFrame) -> VisionFrame:
tile_vision_frame = tile_vision_frame.transpose(0, 2, 3, 1).squeeze(0) * 255
tile_vision_frame = tile_vision_frame.clip(0, 255).astype(numpy.uint8)[:, :, ::-1]
return tile_vision_frame
def blend_frame(temp_vision_frame : VisionFrame, merge_vision_frame : VisionFrame) -> VisionFrame:
def blend_merge_frame(temp_vision_frame : VisionFrame, merge_vision_frame : VisionFrame) -> VisionFrame:
frame_enhancer_blend = 1 - (state_manager.get_item('frame_enhancer_blend') / 100)
temp_vision_frame = cv2.resize(temp_vision_frame, (merge_vision_frame.shape[1], merge_vision_frame.shape[0]))
temp_vision_frame = cv2.addWeighted(temp_vision_frame, frame_enhancer_blend, merge_vision_frame, 1 - frame_enhancer_blend, 0)
temp_vision_frame = blend_frame(temp_vision_frame, merge_vision_frame, 1 - frame_enhancer_blend)
return temp_vision_frame
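The blend step inverts the user-facing percentage, so `frame_enhancer_blend = 100` means the enhanced frame dominates. A sketch of the weighting arithmetic in plain NumPy (the percentage is a hypothetical stand-in for `state_manager.get_item('frame_enhancer_blend')`; in the module, `cv2.addWeighted` computes the same weighted sum):

```python
import numpy

# hypothetical stand-in for state_manager.get_item('frame_enhancer_blend')
frame_enhancer_blend_percent = 80
weight = 1 - (frame_enhancer_blend_percent / 100)  # 0.2 of the original frame is kept

original = numpy.full((4, 4, 3), 100, dtype = numpy.uint8)
enhanced = numpy.full((4, 4, 3), 200, dtype = numpy.uint8)
# equivalent of cv2.addWeighted(original, weight, enhanced, 1 - weight, 0)
blended = (original * weight + enhanced * (1 - weight)).round().astype(numpy.uint8)
```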
def get_reference_frame(source_face : Face, target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
pass
def process_frame(inputs : FrameEnhancerInputs) -> VisionFrame:
target_vision_frame = inputs.get('target_vision_frame')
return enhance_frame(target_vision_frame)
def process_frames(source_paths : List[str], queue_payloads : List[QueuePayload], update_progress : UpdateProgress) -> None:
for queue_payload in process_manager.manage(queue_payloads):
target_vision_path = queue_payload['frame_path']
target_vision_frame = read_image(target_vision_path)
output_vision_frame = process_frame(
{
'target_vision_frame': target_vision_frame
})
write_image(target_vision_path, output_vision_frame)
update_progress(1)
def process_image(source_paths : List[str], target_path : str, output_path : str) -> None:
target_vision_frame = read_static_image(target_path)
output_vision_frame = process_frame(
{
'target_vision_frame': target_vision_frame
})
write_image(output_path, output_vision_frame)
def process_video(source_paths : List[str], temp_frame_paths : List[str]) -> None:
processors.multi_process_frames(None, temp_frame_paths, process_frames)
temp_vision_frame = inputs.get('temp_vision_frame')
return enhance_frame(temp_vision_frame)
@@ -1,33 +1,28 @@
from argparse import ArgumentParser
from functools import lru_cache
from typing import List
import cv2
import numpy
import facefusion.jobs.job_manager
import facefusion.jobs.job_store
import facefusion.processors.core as processors
from facefusion import config, content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, inference_manager, logger, process_manager, state_manager, video_manager, voice_extractor, wording
from facefusion.audio import create_empty_audio_frame, get_voice_frame, read_static_voice
from facefusion import config, content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, inference_manager, logger, state_manager, video_manager, voice_extractor, wording
from facefusion.audio import read_static_voice
from facefusion.common_helper import create_float_metavar
from facefusion.common_helper import get_first
from facefusion.download import conditional_download_hashes, conditional_download_sources, resolve_download_url
from facefusion.face_analyser import get_many_faces, get_one_face
from facefusion.face_helper import create_bounding_box, paste_back, warp_face_by_bounding_box, warp_face_by_face_landmark_5
from facefusion.face_masker import create_area_mask, create_box_mask, create_occlusion_mask
from facefusion.face_selector import find_similar_faces, sort_and_filter_faces
from facefusion.face_store import get_reference_faces
from facefusion.filesystem import filter_audio_paths, has_audio, in_directory, is_image, is_video, resolve_relative_path, same_file_extension
from facefusion.face_selector import select_faces
from facefusion.filesystem import has_audio, resolve_relative_path
from facefusion.processors import choices as processors_choices
from facefusion.processors.types import LipSyncerInputs, LipSyncerWeight
from facefusion.program_helper import find_argument_group
from facefusion.thread_helper import conditional_thread_semaphore
from facefusion.types import ApplyStateItem, Args, AudioFrame, BoundingBox, DownloadScope, Face, InferencePool, ModelOptions, ModelSet, ProcessMode, QueuePayload, UpdateProgress, VisionFrame
from facefusion.vision import read_image, read_static_image, restrict_video_fps, write_image
from facefusion.types import ApplyStateItem, Args, AudioFrame, DownloadScope, Face, InferencePool, ModelOptions, ModelSet, ProcessMode, VisionFrame
from facefusion.vision import read_static_image, read_static_video_frame
@lru_cache(maxsize = None)
@lru_cache()
def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
return\
{
@@ -138,20 +133,12 @@ def pre_process(mode : ProcessMode) -> bool:
if not has_audio(state_manager.get_item('source_paths')):
logger.error(wording.get('choose_audio_source') + wording.get('exclamation_mark'), __name__)
return False
if mode in [ 'output', 'preview' ] and not is_image(state_manager.get_item('target_path')) and not is_video(state_manager.get_item('target_path')):
logger.error(wording.get('choose_image_or_video_target') + wording.get('exclamation_mark'), __name__)
return False
if mode == 'output' and not in_directory(state_manager.get_item('output_path')):
logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__)
return False
if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')):
logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__)
return False
return True
def post_process() -> None:
read_static_image.cache_clear()
read_static_video_frame.cache_clear()
read_static_voice.cache_clear()
video_manager.clear_video_pool()
if state_manager.get_item('video_memory_strategy') in [ 'strict', 'moderate' ]:
@@ -166,10 +153,10 @@ def post_process() -> None:
voice_extractor.clear_inference_pool()
def sync_lip(target_face : Face, temp_audio_frame : AudioFrame, temp_vision_frame : VisionFrame) -> VisionFrame:
def sync_lip(target_face : Face, source_voice_frame : AudioFrame, temp_vision_frame : VisionFrame) -> VisionFrame:
model_type = get_model_options().get('type')
model_size = get_model_options().get('size')
temp_audio_frame = prepare_audio_frame(temp_audio_frame)
source_voice_frame = prepare_audio_frame(source_voice_frame)
crop_vision_frame, affine_matrix = warp_face_by_face_landmark_5(temp_vision_frame, target_face.landmark_set.get('5/68'), 'ffhq_512', (512, 512))
crop_masks = []
@@ -182,17 +169,17 @@ def sync_lip(target_face : Face, temp_audio_frame : AudioFrame, temp_vision_fram
box_mask = create_box_mask(crop_vision_frame, state_manager.get_item('face_mask_blur'), state_manager.get_item('face_mask_padding'))
crop_masks.append(box_mask)
crop_vision_frame = prepare_crop_frame(crop_vision_frame)
crop_vision_frame = forward_edtalk(temp_audio_frame, crop_vision_frame, lip_syncer_weight)
crop_vision_frame = forward_edtalk(source_voice_frame, crop_vision_frame, lip_syncer_weight)
crop_vision_frame = normalize_crop_frame(crop_vision_frame)
if model_type == 'wav2lip':
face_landmark_68 = cv2.transform(target_face.landmark_set.get('68').reshape(1, -1, 2), affine_matrix).reshape(-1, 2)
area_mask = create_area_mask(crop_vision_frame, face_landmark_68, [ 'lower-face' ])
crop_masks.append(area_mask)
bounding_box = create_bounding_box(face_landmark_68)
bounding_box = resize_bounding_box(bounding_box, 1 / 8)
area_vision_frame, area_matrix = warp_face_by_bounding_box(crop_vision_frame, bounding_box, model_size)
area_vision_frame = prepare_crop_frame(area_vision_frame)
area_vision_frame = forward_wav2lip(temp_audio_frame, area_vision_frame)
area_vision_frame = forward_wav2lip(source_voice_frame, area_vision_frame)
area_vision_frame = normalize_crop_frame(area_vision_frame)
crop_vision_frame = cv2.warpAffine(area_vision_frame, cv2.invertAffineTransform(area_matrix), (512, 512), borderMode = cv2.BORDER_REPLICATE)
@@ -249,23 +236,17 @@ def prepare_crop_frame(crop_vision_frame : VisionFrame) -> VisionFrame:
crop_vision_frame = cv2.resize(crop_vision_frame, model_size, interpolation = cv2.INTER_AREA)
crop_vision_frame = crop_vision_frame[:, :, ::-1] / 255.0
crop_vision_frame = numpy.expand_dims(crop_vision_frame.transpose(2, 0, 1), axis = 0).astype(numpy.float32)
if model_type == 'wav2lip':
crop_vision_frame = numpy.expand_dims(crop_vision_frame, axis = 0)
prepare_vision_frame = crop_vision_frame.copy()
prepare_vision_frame[:, model_size[0] // 2:] = 0
crop_vision_frame = numpy.concatenate((prepare_vision_frame, crop_vision_frame), axis = 3)
crop_vision_frame = crop_vision_frame.transpose(0, 3, 1, 2).astype('float32') / 255.0
crop_vision_frame = crop_vision_frame.transpose(0, 3, 1, 2).astype(numpy.float32) / 255.0
return crop_vision_frame
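For the wav2lip branch above, the model receives the reference frame with its lower half zeroed, concatenated channel-wise with the untouched frame, before the NHWC to NCHW transpose. A standalone sketch of that shape manipulation, assuming a square `model_size` of 96 and a frame already scaled to [0, 1]:

```python
import numpy

model_size = (96, 96)
crop_vision_frame = numpy.random.rand(96, 96, 3).astype(numpy.float32)

# stack a copy with the lower half masked, then concatenate along the channel axis
crop_vision_frame = numpy.expand_dims(crop_vision_frame, axis = 0)
prepare_vision_frame = crop_vision_frame.copy()
prepare_vision_frame[:, model_size[0] // 2:] = 0
crop_vision_frame = numpy.concatenate((prepare_vision_frame, crop_vision_frame), axis = 3)
crop_vision_frame = crop_vision_frame.transpose(0, 3, 1, 2).astype(numpy.float32)
```

The result is a (1, 6, 96, 96) batch: channels 0 to 2 hold the masked reference, channels 3 to 5 the full frame.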
def resize_bounding_box(bounding_box : BoundingBox, aspect_ratio : float) -> BoundingBox:
x1, y1, x2, y2 = bounding_box
y1 -= numpy.abs(y2 - y1) * aspect_ratio
bounding_box[1] = max(y1, 0)
return bounding_box
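The new `resize_bounding_box` helper grows the box upward by a fraction of its height, clamping the top edge at zero. A quick sketch, assuming the bounding box is a NumPy array of `x1, y1, x2, y2`:

```python
import numpy

def resize_bounding_box(bounding_box, aspect_ratio):
	# move the top edge up by aspect_ratio of the box height, clamped at 0
	x1, y1, x2, y2 = bounding_box
	y1 -= numpy.abs(y2 - y1) * aspect_ratio
	bounding_box[1] = max(y1, 0)
	return bounding_box

box = numpy.array([ 10.0, 40.0, 90.0, 120.0 ])
box = resize_bounding_box(box, 1 / 8)  # height is 80, so the top moves up by 10
```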
def normalize_crop_frame(crop_vision_frame : VisionFrame) -> VisionFrame:
model_type = get_model_options().get('type')
crop_vision_frame = crop_vision_frame[0].transpose(1, 2, 0)
@@ -279,70 +260,16 @@ def normalize_crop_frame(crop_vision_frame : VisionFrame) -> VisionFrame:
return crop_vision_frame
def get_reference_frame(source_face : Face, target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
pass
def process_frame(inputs : LipSyncerInputs) -> VisionFrame:
reference_faces = inputs.get('reference_faces')
source_audio_frame = inputs.get('source_audio_frame')
reference_vision_frame = inputs.get('reference_vision_frame')
source_voice_frame = inputs.get('source_voice_frame')
target_vision_frame = inputs.get('target_vision_frame')
many_faces = sort_and_filter_faces(get_many_faces([ target_vision_frame ]))
temp_vision_frame = inputs.get('temp_vision_frame')
target_faces = select_faces(reference_vision_frame, target_vision_frame)
if state_manager.get_item('face_selector_mode') == 'many':
if many_faces:
for target_face in many_faces:
target_vision_frame = sync_lip(target_face, source_audio_frame, target_vision_frame)
if state_manager.get_item('face_selector_mode') == 'one':
target_face = get_one_face(many_faces)
if target_face:
target_vision_frame = sync_lip(target_face, source_audio_frame, target_vision_frame)
if state_manager.get_item('face_selector_mode') == 'reference':
similar_faces = find_similar_faces(many_faces, reference_faces, state_manager.get_item('reference_face_distance'))
if similar_faces:
for similar_face in similar_faces:
target_vision_frame = sync_lip(similar_face, source_audio_frame, target_vision_frame)
return target_vision_frame
if target_faces:
for target_face in target_faces:
temp_vision_frame = sync_lip(target_face, source_voice_frame, temp_vision_frame)
return temp_vision_frame
def process_frames(source_paths : List[str], queue_payloads : List[QueuePayload], update_progress : UpdateProgress) -> None:
reference_faces = get_reference_faces() if 'reference' in state_manager.get_item('face_selector_mode') else None
source_audio_path = get_first(filter_audio_paths(source_paths))
temp_video_fps = restrict_video_fps(state_manager.get_item('target_path'), state_manager.get_item('output_video_fps'))
for queue_payload in process_manager.manage(queue_payloads):
frame_number = queue_payload.get('frame_number')
target_vision_path = queue_payload.get('frame_path')
source_audio_frame = get_voice_frame(source_audio_path, temp_video_fps, frame_number)
if not numpy.any(source_audio_frame):
source_audio_frame = create_empty_audio_frame()
target_vision_frame = read_image(target_vision_path)
output_vision_frame = process_frame(
{
'reference_faces': reference_faces,
'source_audio_frame': source_audio_frame,
'target_vision_frame': target_vision_frame
})
write_image(target_vision_path, output_vision_frame)
update_progress(1)
def process_image(source_paths : List[str], target_path : str, output_path : str) -> None:
reference_faces = get_reference_faces() if 'reference' in state_manager.get_item('face_selector_mode') else None
source_audio_frame = create_empty_audio_frame()
target_vision_frame = read_static_image(target_path)
output_vision_frame = process_frame(
{
'reference_faces': reference_faces,
'source_audio_frame': source_audio_frame,
'target_vision_frame': target_vision_frame
})
write_image(output_path, output_vision_frame)
def process_video(source_paths : List[str], temp_frame_paths : List[str]) -> None:
source_audio_paths = filter_audio_paths(state_manager.get_item('source_paths'))
temp_video_fps = restrict_video_fps(state_manager.get_item('target_path'), state_manager.get_item('output_video_fps'))
for source_audio_path in source_audio_paths:
read_static_voice(source_audio_path, temp_video_fps)
processors.multi_process_frames(source_paths, temp_frame_paths, process_frames)