Feat/custom file format handling (#845)

* Purge filetype dependency, Rename file_extension to file_format, Introduce custom format detections

* Changed a lot

* Purge filetype dependency, Rename file_extension to file_format, Introduce custom format detections

* Fix stuff

* Fix stuff

* Simplify all the is_ and has_ methods

* Simplify all the is_ and has_ methods

* Use the new helper on more places

* Introduce are_ next to is_ and has_

* Get rid of the type-ignores

* Add more video types
This commit is contained in:
Henry Ruhs
2025-01-06 11:19:43 +01:00
committed by henryruhs
parent bb32135af2
commit 6f0675030e
36 changed files with 259 additions and 183 deletions

View File

@@ -1,5 +1,5 @@
from facefusion import state_manager from facefusion import state_manager
from facefusion.filesystem import is_image, is_video, list_directory from facefusion.filesystem import get_file_name, is_image, is_video, resolve_file_paths
from facefusion.jobs import job_store from facefusion.jobs import job_store
from facefusion.normalizer import normalize_fps, normalize_padding from facefusion.normalizer import normalize_fps, normalize_padding
from facefusion.processors.core import get_processors_modules from facefusion.processors.core import get_processors_modules
@@ -107,7 +107,7 @@ def apply_args(args : Args, apply_state_item : ApplyStateItem) -> None:
apply_state_item('output_video_fps', output_video_fps) apply_state_item('output_video_fps', output_video_fps)
apply_state_item('skip_audio', args.get('skip_audio')) apply_state_item('skip_audio', args.get('skip_audio'))
# processors # processors
available_processors = [ file.get('name') for file in list_directory('facefusion/processors/modules') ] available_processors = [ get_file_name(file_path) for file_path in resolve_file_paths('facefusion/processors/modules') ]
apply_state_item('processors', args.get('processors')) apply_state_item('processors', args.get('processors'))
for processor_module in get_processors_modules(available_processors): for processor_module in get_processors_modules(available_processors):
processor_module.apply_args(args, apply_state_item) processor_module.apply_args(args, apply_state_item)

View File

@@ -2,7 +2,7 @@ import logging
from typing import List, Sequence from typing import List, Sequence
from facefusion.common_helper import create_float_range, create_int_range from facefusion.common_helper import create_float_range, create_int_range
from facefusion.typing import Angle, DownloadProvider, DownloadProviderSet, DownloadScope, ExecutionProvider, ExecutionProviderSet, FaceDetectorModel, FaceDetectorSet, FaceLandmarkerModel, FaceMaskRegion, FaceMaskRegionSet, FaceMaskType, FaceOccluderModel, FaceParserModel, FaceSelectorMode, FaceSelectorOrder, Gender, JobStatus, LogLevel, LogLevelSet, OutputAudioEncoder, OutputVideoEncoder, OutputVideoPreset, Race, Score, TempFrameFormat, UiWorkflow, VideoMemoryStrategy from facefusion.typing import Angle, AudioFormat, AudioTypeSet, DownloadProvider, DownloadProviderSet, DownloadScope, ExecutionProvider, ExecutionProviderSet, FaceDetectorModel, FaceDetectorSet, FaceLandmarkerModel, FaceMaskRegion, FaceMaskRegionSet, FaceMaskType, FaceOccluderModel, FaceParserModel, FaceSelectorMode, FaceSelectorOrder, Gender, ImageFormat, ImageTypeSet, JobStatus, LogLevel, LogLevelSet, OutputAudioEncoder, OutputVideoEncoder, OutputVideoPreset, Race, Score, UiWorkflow, VideoFormat, VideoMemoryStrategy, VideoTypeSet
face_detector_set : FaceDetectorSet =\ face_detector_set : FaceDetectorSet =\
{ {
@@ -34,7 +34,33 @@ face_mask_region_set : FaceMaskRegionSet =\
'lower-lip': 13 'lower-lip': 13
} }
face_mask_regions : List[FaceMaskRegion] = list(face_mask_region_set.keys()) face_mask_regions : List[FaceMaskRegion] = list(face_mask_region_set.keys())
temp_frame_formats : List[TempFrameFormat] = [ 'bmp', 'jpg', 'png' ]
audio_type_set : AudioTypeSet =\
{
'mp3': 'audio/mpeg',
'ogg': 'audio/ogg',
'wav': 'audio/x-wav'
}
image_type_set : ImageTypeSet =\
{
'bmp': 'image/bmp',
'jpg': 'image/jpeg',
'png': 'image/png',
'webp': 'image/webp'
}
video_type_set : VideoTypeSet =\
{
'avi': 'video/x-msvideo',
'mkv': 'video/x-matroska',
'mp4': 'video/mp4',
'mov': 'video/quicktime',
'webm': 'video/webm'
}
audio_formats : List[AudioFormat] = list(audio_type_set.keys())
image_formats : List[ImageFormat] = list(image_type_set.keys())
video_formats : List[VideoFormat] = list(video_type_set.keys())
temp_frame_formats : List[ImageFormat] = [ 'bmp', 'jpg', 'png' ]
output_audio_encoders : List[OutputAudioEncoder] = [ 'aac', 'libmp3lame', 'libopus', 'libvorbis' ] output_audio_encoders : List[OutputAudioEncoder] = [ 'aac', 'libmp3lame', 'libopus', 'libvorbis' ]
output_video_encoders : List[OutputVideoEncoder] = [ 'libx264', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf', 'h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox' ] output_video_encoders : List[OutputVideoEncoder] = [ 'libx264', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf', 'h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox' ]
output_video_presets : List[OutputVideoPreset] = [ 'ultrafast', 'superfast', 'veryfast', 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow' ] output_video_presets : List[OutputVideoPreset] = [ 'ultrafast', 'superfast', 'veryfast', 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow' ]

View File

@@ -16,7 +16,7 @@ from facefusion.face_analyser import get_average_face, get_many_faces, get_one_f
from facefusion.face_selector import sort_and_filter_faces from facefusion.face_selector import sort_and_filter_faces
from facefusion.face_store import append_reference_face, clear_reference_faces, get_reference_faces from facefusion.face_store import append_reference_face, clear_reference_faces, get_reference_faces
from facefusion.ffmpeg import copy_image, extract_frames, finalize_image, merge_video, replace_audio, restore_audio from facefusion.ffmpeg import copy_image, extract_frames, finalize_image, merge_video, replace_audio, restore_audio
from facefusion.filesystem import filter_audio_paths, is_image, is_video, list_directory, resolve_file_pattern from facefusion.filesystem import filter_audio_paths, get_file_name, is_image, is_video, resolve_file_paths, resolve_file_pattern
from facefusion.jobs import job_helper, job_manager, job_runner from facefusion.jobs import job_helper, job_manager, job_runner
from facefusion.jobs.job_list import compose_job_list from facefusion.jobs.job_list import compose_job_list
from facefusion.memory import limit_system_memory from facefusion.memory import limit_system_memory
@@ -24,7 +24,7 @@ from facefusion.processors.core import get_processors_modules
from facefusion.program import create_program from facefusion.program import create_program
from facefusion.program_helper import validate_args from facefusion.program_helper import validate_args
from facefusion.statistics import conditional_log_statistics from facefusion.statistics import conditional_log_statistics
from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, get_temp_frame_paths, move_temp_file from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, move_temp_file, resolve_temp_frame_paths
from facefusion.typing import Args, ErrorCode from facefusion.typing import Args, ErrorCode
from facefusion.vision import get_video_frame, pack_resolution, read_image, read_static_images, restrict_image_resolution, restrict_trim_frame, restrict_video_fps, restrict_video_resolution, unpack_resolution from facefusion.vision import get_video_frame, pack_resolution, read_image, read_static_images, restrict_image_resolution, restrict_trim_frame, restrict_video_fps, restrict_video_resolution, unpack_resolution
@@ -133,7 +133,7 @@ def force_download() -> ErrorCode:
face_recognizer, face_recognizer,
voice_extractor voice_extractor
] ]
available_processors = [ file.get('name') for file in list_directory('facefusion/processors/modules') ] available_processors = [ get_file_name(file_path) for file_path in resolve_file_paths('facefusion/processors/modules') ]
processor_modules = get_processors_modules(available_processors) processor_modules = get_processors_modules(available_processors)
for module in common_modules + processor_modules: for module in common_modules + processor_modules:
@@ -413,7 +413,7 @@ def process_video(start_time : float) -> ErrorCode:
process_manager.end() process_manager.end()
return 1 return 1
# process frames # process frames
temp_frame_paths = get_temp_frame_paths(state_manager.get_item('target_path')) temp_frame_paths = resolve_temp_frame_paths(state_manager.get_item('target_path'))
if temp_frame_paths: if temp_frame_paths:
for processor_module in get_processors_modules(state_manager.get_item('processors')): for processor_module in get_processors_modules(state_manager.get_item('processors')):
logger.info(wording.get('processing'), processor_module.__name__) logger.info(wording.get('processing'), processor_module.__name__)

View File

@@ -9,7 +9,7 @@ from tqdm import tqdm
import facefusion.choices import facefusion.choices
from facefusion import logger, process_manager, state_manager, wording from facefusion import logger, process_manager, state_manager, wording
from facefusion.filesystem import get_file_size, is_file, remove_file from facefusion.filesystem import get_file_name, get_file_size, is_file, remove_file
from facefusion.hash_helper import validate_hash from facefusion.hash_helper import validate_hash
from facefusion.typing import DownloadProvider, DownloadSet from facefusion.typing import DownloadProvider, DownloadSet
@@ -79,10 +79,10 @@ def conditional_download_hashes(hashes : DownloadSet) -> bool:
valid_hash_paths, invalid_hash_paths = validate_hash_paths(hash_paths) valid_hash_paths, invalid_hash_paths = validate_hash_paths(hash_paths)
for valid_hash_path in valid_hash_paths: for valid_hash_path in valid_hash_paths:
valid_hash_file_name, _ = os.path.splitext(os.path.basename(valid_hash_path)) valid_hash_file_name = get_file_name(valid_hash_path)
logger.debug(wording.get('validating_hash_succeed').format(hash_file_name = valid_hash_file_name), __name__) logger.debug(wording.get('validating_hash_succeed').format(hash_file_name = valid_hash_file_name), __name__)
for invalid_hash_path in invalid_hash_paths: for invalid_hash_path in invalid_hash_paths:
invalid_hash_file_name, _ = os.path.splitext(os.path.basename(invalid_hash_path)) invalid_hash_file_name = get_file_name(invalid_hash_path)
logger.error(wording.get('validating_hash_failed').format(hash_file_name = invalid_hash_file_name), __name__) logger.error(wording.get('validating_hash_failed').format(hash_file_name = invalid_hash_file_name), __name__)
if not invalid_hash_paths: if not invalid_hash_paths:
@@ -106,10 +106,10 @@ def conditional_download_sources(sources : DownloadSet) -> bool:
valid_source_paths, invalid_source_paths = validate_source_paths(source_paths) valid_source_paths, invalid_source_paths = validate_source_paths(source_paths)
for valid_source_path in valid_source_paths: for valid_source_path in valid_source_paths:
valid_source_file_name, _ = os.path.splitext(os.path.basename(valid_source_path)) valid_source_file_name = get_file_name(valid_source_path)
logger.debug(wording.get('validating_source_succeed').format(source_file_name = valid_source_file_name), __name__) logger.debug(wording.get('validating_source_succeed').format(source_file_name = valid_source_file_name), __name__)
for invalid_source_path in invalid_source_paths: for invalid_source_path in invalid_source_paths:
invalid_source_file_name, _ = os.path.splitext(os.path.basename(invalid_source_path)) invalid_source_file_name = get_file_name(invalid_source_path)
logger.error(wording.get('validating_source_failed').format(source_file_name = invalid_source_file_name), __name__) logger.error(wording.get('validating_source_failed').format(source_file_name = invalid_source_file_name), __name__)
if remove_file(invalid_source_path): if remove_file(invalid_source_path):

View File

@@ -4,12 +4,11 @@ import subprocess
import tempfile import tempfile
from typing import List, Optional from typing import List, Optional
import filetype
from tqdm import tqdm from tqdm import tqdm
from facefusion import logger, process_manager, state_manager, wording from facefusion import logger, process_manager, state_manager, wording
from facefusion.filesystem import remove_file from facefusion.filesystem import get_file_format, remove_file
from facefusion.temp_helper import get_temp_file_path, get_temp_frame_paths, get_temp_frames_pattern from facefusion.temp_helper import get_temp_file_path, get_temp_frames_pattern, resolve_temp_frame_paths
from facefusion.typing import AudioBuffer, Fps, OutputVideoPreset, UpdateProgress from facefusion.typing import AudioBuffer, Fps, OutputVideoPreset, UpdateProgress
from facefusion.vision import count_trim_frame_total, detect_video_duration, restrict_video_fps from facefusion.vision import count_trim_frame_total, detect_video_duration, restrict_video_fps
@@ -99,13 +98,12 @@ def merge_video(target_path : str, output_video_resolution : str, output_video_f
output_video_encoder = state_manager.get_item('output_video_encoder') output_video_encoder = state_manager.get_item('output_video_encoder')
output_video_quality = state_manager.get_item('output_video_quality') output_video_quality = state_manager.get_item('output_video_quality')
output_video_preset = state_manager.get_item('output_video_preset') output_video_preset = state_manager.get_item('output_video_preset')
merge_frame_total = len(get_temp_frame_paths(target_path)) merge_frame_total = len(resolve_temp_frame_paths(target_path))
temp_video_fps = restrict_video_fps(target_path, output_video_fps) temp_video_fps = restrict_video_fps(target_path, output_video_fps)
temp_file_path = get_temp_file_path(target_path) temp_file_path = get_temp_file_path(target_path)
temp_frames_pattern = get_temp_frames_pattern(target_path, '%08d') temp_frames_pattern = get_temp_frames_pattern(target_path, '%08d')
is_webm = filetype.guess_mime(target_path) == 'video/webm'
if is_webm: if get_file_format(target_path) == 'webm':
output_video_encoder = 'libvpx-vp9' output_video_encoder = 'libvpx-vp9'
commands = [ '-r', str(temp_video_fps), '-i', temp_frames_pattern, '-s', str(output_video_resolution), '-c:v', output_video_encoder ] commands = [ '-r', str(temp_video_fps), '-i', temp_frames_pattern, '-s', str(output_video_resolution), '-c:v', output_video_encoder ]
if output_video_encoder in [ 'libx264', 'libx265' ]: if output_video_encoder in [ 'libx264', 'libx265' ]:
@@ -161,8 +159,7 @@ def finalize_image(target_path : str, output_path : str, output_image_resolution
def calc_image_compression(image_path : str, image_quality : int) -> int: def calc_image_compression(image_path : str, image_quality : int) -> int:
is_webp = filetype.guess_mime(image_path) == 'image/webp' if get_file_format(image_path) == 'webm':
if is_webp:
image_quality = 100 - image_quality image_quality = 100 - image_quality
return round(31 - (image_quality * 0.31)) return round(31 - (image_quality * 0.31))

View File

@@ -1,12 +1,9 @@
import glob import glob
import os import os
import shutil import shutil
from pathlib import Path
from typing import List, Optional from typing import List, Optional
import filetype import facefusion.choices
from facefusion.typing import File
def get_file_size(file_path : str) -> int: def get_file_size(file_path : str) -> int:
@@ -15,44 +12,63 @@ def get_file_size(file_path : str) -> int:
return 0 return 0
def same_file_extension(file_paths : List[str]) -> bool: def get_file_name(file_path : str) -> Optional[str]:
file_extensions : List[str] = [] file_name, _ = os.path.splitext(os.path.basename(file_path))
for file_path in file_paths: if file_name:
_, file_extension = os.path.splitext(file_path.lower()) return file_name
return None
if file_extensions and file_extension not in file_extensions:
def get_file_extension(file_path : str) -> Optional[str]:
_, file_extension = os.path.splitext(file_path)
if file_extension:
return file_extension
return None
def get_file_format(file_path : str) -> Optional[str]:
file_extension = get_file_extension(file_path)
if file_extension:
return file_extension.lower().lstrip('.')
return None
def same_file_extension(first_file_path : str, second_file_path : str) -> bool:
first_file_extension = get_file_extension(first_file_path)
second_file_extension = get_file_extension(second_file_path)
if first_file_extension and second_file_extension:
return get_file_extension(first_file_path) == get_file_extension(second_file_path)
return False return False
file_extensions.append(file_extension)
return True
def is_file(file_path : str) -> bool: def is_file(file_path : str) -> bool:
return bool(file_path and os.path.isfile(file_path)) if file_path:
return os.path.isfile(file_path)
def is_directory(directory_path : str) -> bool:
return bool(directory_path and os.path.isdir(directory_path))
def in_directory(file_path : str) -> bool:
if file_path and not is_directory(file_path):
return is_directory(os.path.dirname(file_path))
return False return False
def is_audio(audio_path : str) -> bool: def is_audio(audio_path : str) -> bool:
return is_file(audio_path) and filetype.helpers.is_audio(audio_path) return is_file(audio_path) and get_file_format(audio_path) in facefusion.choices.audio_formats
def has_audio(audio_paths : List[str]) -> bool: def has_audio(audio_paths : List[str]) -> bool:
if audio_paths: if audio_paths:
return any(is_audio(audio_path) for audio_path in audio_paths) return any(map(is_audio, audio_paths))
return False
def are_audios(audio_paths : List[str]) -> bool:
if audio_paths:
return all(map(is_audio, audio_paths))
return False return False
def is_image(image_path : str) -> bool: def is_image(image_path : str) -> bool:
return is_file(image_path) and filetype.helpers.is_image(image_path) return is_file(image_path) and get_file_format(image_path) in facefusion.choices.image_formats
def has_image(image_paths : List[str]) -> bool: def has_image(image_paths : List[str]) -> bool:
@@ -61,8 +77,26 @@ def has_image(image_paths: List[str]) -> bool:
return False return False
def are_images(image_paths : List[str]) -> bool:
if image_paths:
return all(map(is_image, image_paths))
return False
def is_video(video_path : str) -> bool: def is_video(video_path : str) -> bool:
return is_file(video_path) and filetype.helpers.is_video(video_path) return is_file(video_path) and get_file_format(video_path) in facefusion.choices.video_formats
def has_video(video_paths : List[str]) -> bool:
if video_paths:
return any(map(is_video, video_paths))
return False
def are_videos(video_paths : List[str]) -> bool:
if video_paths:
return any(map(is_video, video_paths))
return False
def filter_audio_paths(paths : List[str]) -> List[str]: def filter_audio_paths(paths : List[str]) -> List[str]:
@@ -77,10 +111,6 @@ def filter_image_paths(paths : List[str]) -> List[str]:
return [] return []
def resolve_relative_path(path : str) -> str:
return os.path.abspath(os.path.join(os.path.dirname(__file__), path))
def copy_file(file_path : str, move_path : str) -> bool: def copy_file(file_path : str, move_path : str) -> bool:
if is_file(file_path): if is_file(file_path):
shutil.copy(file_path, move_path) shutil.copy(file_path, move_path)
@@ -102,31 +132,18 @@ def remove_file(file_path : str) -> bool:
return False return False
def create_directory(directory_path : str) -> bool: def resolve_file_paths(directory_path : str) -> List[str]:
if directory_path and not is_file(directory_path): file_paths : List[str] = []
Path(directory_path).mkdir(parents = True, exist_ok = True)
return is_directory(directory_path)
return False
def list_directory(directory_path : str) -> Optional[List[File]]:
if is_directory(directory_path): if is_directory(directory_path):
file_paths = sorted(os.listdir(directory_path)) file_names_and_extensions = sorted(os.listdir(directory_path))
files : List[File] = []
for file_path in file_paths: for file_name_and_extension in file_names_and_extensions:
file_name, file_extension = os.path.splitext(file_path) if not file_name_and_extension.startswith(('.', '__')):
file_path = os.path.join(directory_path, file_name_and_extension)
file_paths.append(file_path)
if not file_name.startswith(('.', '__')): return file_paths
files.append(
{
'name': file_name,
'extension': file_extension,
'path': os.path.join(directory_path, file_path)
})
return files
return None
def resolve_file_pattern(file_pattern : str) -> List[str]: def resolve_file_pattern(file_pattern : str) -> List[str]:
@@ -135,8 +152,33 @@ def resolve_file_pattern(file_pattern : str) -> List[str]:
return [] return []
def is_directory(directory_path : str) -> bool:
if directory_path:
return os.path.isdir(directory_path)
return False
def in_directory(file_path : str) -> bool:
if file_path:
directory_path = os.path.dirname(file_path)
if directory_path:
return not is_directory(file_path) and is_directory(directory_path)
return False
def create_directory(directory_path : str) -> bool:
if directory_path and not is_file(directory_path):
os.makedirs(directory_path, exist_ok = True)
return is_directory(directory_path)
return False
def remove_directory(directory_path : str) -> bool: def remove_directory(directory_path : str) -> bool:
if is_directory(directory_path): if is_directory(directory_path):
shutil.rmtree(directory_path, ignore_errors = True) shutil.rmtree(directory_path, ignore_errors = True)
return not is_directory(directory_path) return not is_directory(directory_path)
return False return False
def resolve_relative_path(path : str) -> str:
return os.path.abspath(os.path.join(os.path.dirname(__file__), path))

View File

@@ -2,7 +2,7 @@ import os
import zlib import zlib
from typing import Optional from typing import Optional
from facefusion.filesystem import is_file from facefusion.filesystem import get_file_name, is_file
def create_hash(content : bytes) -> str: def create_hash(content : bytes) -> str:
@@ -25,8 +25,8 @@ def validate_hash(validate_path : str) -> bool:
def get_hash_path(validate_path : str) -> Optional[str]: def get_hash_path(validate_path : str) -> Optional[str]:
if is_file(validate_path): if is_file(validate_path):
validate_directory_path, _ = os.path.split(validate_path) validate_directory_path, file_name_and_extension = os.path.split(validate_path)
validate_file_name, _ = os.path.splitext(_) validate_file_name = get_file_name(file_name_and_extension)
return os.path.join(validate_directory_path, validate_file_name + '.hash') return os.path.join(validate_directory_path, validate_file_name + '.hash')
return None return None

View File

@@ -2,11 +2,14 @@ import os
from datetime import datetime from datetime import datetime
from typing import Optional from typing import Optional
from facefusion.filesystem import get_file_extension, get_file_name
def get_step_output_path(job_id : str, step_index : int, output_path : str) -> Optional[str]: def get_step_output_path(job_id : str, step_index : int, output_path : str) -> Optional[str]:
if output_path: if output_path:
output_directory_path, _ = os.path.split(output_path) output_directory_path, _ = os.path.split(output_path)
output_file_name, output_file_extension = os.path.splitext(_) output_file_name = get_file_name(_)
output_file_extension = get_file_extension(_)
return os.path.join(output_directory_path, output_file_name + '-' + job_id + '-' + str(step_index) + output_file_extension) return os.path.join(output_directory_path, output_file_name + '-' + job_id + '-' + str(step_index) + output_file_extension)
return None return None

View File

@@ -4,7 +4,7 @@ from typing import List, Optional
import facefusion.choices import facefusion.choices
from facefusion.date_helper import get_current_date_time from facefusion.date_helper import get_current_date_time
from facefusion.filesystem import create_directory, is_directory, is_file, move_file, remove_directory, remove_file, resolve_file_pattern from facefusion.filesystem import create_directory, get_file_name, is_directory, is_file, move_file, remove_directory, remove_file, resolve_file_pattern
from facefusion.jobs.job_helper import get_step_output_path from facefusion.jobs.job_helper import get_step_output_path
from facefusion.json import read_json, write_json from facefusion.json import read_json, write_json
from facefusion.typing import Args, Job, JobSet, JobStatus, JobStep, JobStepStatus from facefusion.typing import Args, Job, JobSet, JobStatus, JobStep, JobStepStatus
@@ -90,7 +90,7 @@ def find_job_ids(job_status : JobStatus) -> List[str]:
job_ids = [] job_ids = []
for job_path in job_paths: for job_path in job_paths:
job_id, _ = os.path.splitext(os.path.basename(job_path)) job_id = get_file_name(job_path)
job_ids.append(job_id) job_ids.append(job_id)
return job_ids return job_ids

View File

@@ -1,5 +1,5 @@
from facefusion.ffmpeg import concat_video from facefusion.ffmpeg import concat_video
from facefusion.filesystem import is_image, is_video, move_file, remove_file from facefusion.filesystem import are_images, are_videos, move_file, remove_file
from facefusion.jobs import job_helper, job_manager from facefusion.jobs import job_helper, job_manager
from facefusion.typing import JobOutputSet, JobStep, ProcessStep from facefusion.typing import JobOutputSet, JobStep, ProcessStep
@@ -73,10 +73,10 @@ def finalize_steps(job_id : str) -> bool:
output_set = collect_output_set(job_id) output_set = collect_output_set(job_id)
for output_path, temp_output_paths in output_set.items(): for output_path, temp_output_paths in output_set.items():
if all(map(is_video, temp_output_paths)): if are_videos(temp_output_paths):
if not concat_video(output_path, temp_output_paths): if not concat_video(output_path, temp_output_paths):
return False return False
if any(map(is_image, temp_output_paths)): if are_images(temp_output_paths):
for temp_output_path in temp_output_paths: for temp_output_path in temp_output_paths:
if not move_file(temp_output_path, output_path): if not move_file(temp_output_path, output_path):
return False return False

View File

@@ -1,7 +1,7 @@
from typing import List, Sequence from typing import List, Sequence
from facefusion.common_helper import create_float_range, create_int_range from facefusion.common_helper import create_float_range, create_int_range
from facefusion.filesystem import list_directory, resolve_relative_path from facefusion.filesystem import get_file_name, resolve_file_paths, resolve_relative_path
from facefusion.processors.typing import AgeModifierModel, DeepSwapperModel, ExpressionRestorerModel, FaceDebuggerItem, FaceEditorModel, FaceEnhancerModel, FaceSwapperModel, FaceSwapperSet, FrameColorizerModel, FrameEnhancerModel, LipSyncerModel from facefusion.processors.typing import AgeModifierModel, DeepSwapperModel, ExpressionRestorerModel, FaceDebuggerItem, FaceEditorModel, FaceEnhancerModel, FaceSwapperModel, FaceSwapperSet, FrameColorizerModel, FrameEnhancerModel, LipSyncerModel
age_modifier_models : List[AgeModifierModel] = [ 'styleganex_age' ] age_modifier_models : List[AgeModifierModel] = [ 'styleganex_age' ]
@@ -157,12 +157,12 @@ deep_swapper_models : List[DeepSwapperModel] =\
'rumateus/taylor_swift_224' 'rumateus/taylor_swift_224'
] ]
custom_model_files = list_directory(resolve_relative_path('../.assets/models/custom')) custom_model_file_paths = resolve_file_paths(resolve_relative_path('../.assets/models/custom'))
if custom_model_files: if custom_model_file_paths:
for model_file in custom_model_files: for model_file_path in custom_model_file_paths:
model_id = '/'.join([ 'custom', model_file.get('name') ]) model_id = '/'.join([ 'custom', get_file_name(model_file_path) ])
deep_swapper_models.append(model_id) deep_swapper_models.append(model_id)
expression_restorer_models : List[ExpressionRestorerModel] = [ 'live_portrait' ] expression_restorer_models : List[ExpressionRestorerModel] = [ 'live_portrait' ]

View File

@@ -104,7 +104,7 @@ def pre_process(mode : ProcessMode) -> bool:
if mode == 'output' and not in_directory(state_manager.get_item('output_path')): if mode == 'output' and not in_directory(state_manager.get_item('output_path')):
logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__) logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__)
return False return False
if mode == 'output' and not same_file_extension([ state_manager.get_item('target_path'), state_manager.get_item('output_path') ]): if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')):
logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__) logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__)
return False return False
return True return True

View File

@@ -17,7 +17,7 @@ from facefusion.face_helper import paste_back, warp_face_by_face_landmark_5
from facefusion.face_masker import create_occlusion_mask, create_region_mask, create_static_box_mask from facefusion.face_masker import create_occlusion_mask, create_region_mask, create_static_box_mask
from facefusion.face_selector import find_similar_faces, sort_and_filter_faces from facefusion.face_selector import find_similar_faces, sort_and_filter_faces
from facefusion.face_store import get_reference_faces from facefusion.face_store import get_reference_faces
from facefusion.filesystem import in_directory, is_image, is_video, list_directory, resolve_relative_path, same_file_extension from facefusion.filesystem import get_file_name, in_directory, is_image, is_video, resolve_file_paths, resolve_relative_path, same_file_extension
from facefusion.processors import choices as processors_choices from facefusion.processors import choices as processors_choices
from facefusion.processors.typing import DeepSwapperInputs, DeepSwapperMorph from facefusion.processors.typing import DeepSwapperInputs, DeepSwapperMorph
from facefusion.program_helper import find_argument_group from facefusion.program_helper import find_argument_group
@@ -216,12 +216,12 @@ def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
'template': 'dfl_whole_face' 'template': 'dfl_whole_face'
} }
custom_model_files = list_directory(resolve_relative_path('../.assets/models/custom')) custom_model_file_paths = resolve_file_paths(resolve_relative_path('../.assets/models/custom'))
if custom_model_files: if custom_model_file_paths:
for model_file in custom_model_files: for model_file_path in custom_model_file_paths:
model_id = '/'.join([ 'custom', model_file.get('name') ]) model_id = '/'.join([ 'custom', get_file_name(model_file_path) ])
model_set[model_id] =\ model_set[model_id] =\
{ {
@@ -229,7 +229,7 @@ def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
{ {
'deep_swapper': 'deep_swapper':
{ {
'path': resolve_relative_path(model_file.get('path')) 'path': resolve_relative_path(model_file_path)
} }
}, },
'template': 'dfl_whole_face' 'template': 'dfl_whole_face'
@@ -290,7 +290,7 @@ def pre_process(mode : ProcessMode) -> bool:
if mode == 'output' and not in_directory(state_manager.get_item('output_path')): if mode == 'output' and not in_directory(state_manager.get_item('output_path')):
logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__) logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__)
return False return False
if mode == 'output' and not same_file_extension([ state_manager.get_item('target_path'), state_manager.get_item('output_path') ]): if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')):
logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__) logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__)
return False return False
return True return True

View File

@@ -119,7 +119,7 @@ def pre_process(mode : ProcessMode) -> bool:
if mode == 'output' and not in_directory(state_manager.get_item('output_path')): if mode == 'output' and not in_directory(state_manager.get_item('output_path')):
logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__) logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__)
return False return False
if mode == 'output' and not same_file_extension([ state_manager.get_item('target_path'), state_manager.get_item('output_path') ]): if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')):
logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__) logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__)
return False return False
return True return True

View File

@@ -48,7 +48,7 @@ def pre_process(mode : ProcessMode) -> bool:
if mode == 'output' and not in_directory(state_manager.get_item('output_path')): if mode == 'output' and not in_directory(state_manager.get_item('output_path')):
logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__) logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__)
return False return False
if mode == 'output' and not same_file_extension([ state_manager.get_item('target_path'), state_manager.get_item('output_path') ]): if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')):
logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__) logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__)
return False return False
return True return True

View File

@@ -171,7 +171,7 @@ def pre_process(mode : ProcessMode) -> bool:
if mode == 'output' and not in_directory(state_manager.get_item('output_path')): if mode == 'output' and not in_directory(state_manager.get_item('output_path')):
logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__) logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__)
return False return False
if mode == 'output' and not same_file_extension([ state_manager.get_item('target_path'), state_manager.get_item('output_path') ]): if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')):
logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__) logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__)
return False return False
return True return True

View File

@@ -264,7 +264,7 @@ def pre_process(mode : ProcessMode) -> bool:
if mode == 'output' and not in_directory(state_manager.get_item('output_path')): if mode == 'output' and not in_directory(state_manager.get_item('output_path')):
logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__) logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__)
return False return False
if mode == 'output' and not same_file_extension([ state_manager.get_item('target_path'), state_manager.get_item('output_path') ]): if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')):
logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__) logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__)
return False return False
return True return True

View File

@@ -390,7 +390,7 @@ def pre_process(mode : ProcessMode) -> bool:
if mode == 'output' and not in_directory(state_manager.get_item('output_path')): if mode == 'output' and not in_directory(state_manager.get_item('output_path')):
logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__) logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__)
return False return False
if mode == 'output' and not same_file_extension([ state_manager.get_item('target_path'), state_manager.get_item('output_path') ]): if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')):
logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__) logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__)
return False return False
return True return True

View File

@@ -170,7 +170,7 @@ def pre_process(mode : ProcessMode) -> bool:
if mode == 'output' and not in_directory(state_manager.get_item('output_path')): if mode == 'output' and not in_directory(state_manager.get_item('output_path')):
logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__) logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__)
return False return False
if mode == 'output' and not same_file_extension([ state_manager.get_item('target_path'), state_manager.get_item('output_path') ]): if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')):
logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__) logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__)
return False return False
return True return True

View File

@@ -434,7 +434,7 @@ def pre_process(mode : ProcessMode) -> bool:
if mode == 'output' and not in_directory(state_manager.get_item('output_path')): if mode == 'output' and not in_directory(state_manager.get_item('output_path')):
logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__) logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__)
return False return False
if mode == 'output' and not same_file_extension([ state_manager.get_item('target_path'), state_manager.get_item('output_path') ]): if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')):
logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__) logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__)
return False return False
return True return True

View File

@@ -115,7 +115,7 @@ def pre_process(mode : ProcessMode) -> bool:
if mode == 'output' and not in_directory(state_manager.get_item('output_path')): if mode == 'output' and not in_directory(state_manager.get_item('output_path')):
logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__) logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__)
return False return False
if mode == 'output' and not same_file_extension([ state_manager.get_item('target_path'), state_manager.get_item('output_path') ]): if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')):
logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__) logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__)
return False return False
return True return True

View File

@@ -5,7 +5,7 @@ import facefusion.choices
from facefusion import config, metadata, state_manager, wording from facefusion import config, metadata, state_manager, wording
from facefusion.common_helper import create_float_metavar, create_int_metavar, get_last from facefusion.common_helper import create_float_metavar, create_int_metavar, get_last
from facefusion.execution import get_available_execution_providers from facefusion.execution import get_available_execution_providers
from facefusion.filesystem import list_directory from facefusion.filesystem import get_file_name, resolve_file_paths
from facefusion.jobs import job_store from facefusion.jobs import job_store
from facefusion.processors.core import get_processors_modules from facefusion.processors.core import get_processors_modules
@@ -171,7 +171,7 @@ def create_output_creation_program() -> ArgumentParser:
def create_processors_program() -> ArgumentParser: def create_processors_program() -> ArgumentParser:
program = ArgumentParser(add_help = False) program = ArgumentParser(add_help = False)
available_processors = [ file.get('name') for file in list_directory('facefusion/processors/modules') ] available_processors = [ get_file_name(file_path) for file_path in resolve_file_paths('facefusion/processors/modules') ]
group_processors = program.add_argument_group('processors') group_processors = program.add_argument_group('processors')
group_processors.add_argument('--processors', help = wording.get('help.processors').format(choices = ', '.join(available_processors)), default = config.get_str_list('processors.processors', 'face_swapper'), nargs = '+') group_processors.add_argument('--processors', help = wording.get('help.processors').format(choices = ', '.join(available_processors)), default = config.get_str_list('processors.processors', 'face_swapper'), nargs = '+')
job_store.register_step_keys([ 'processors' ]) job_store.register_step_keys([ 'processors' ])
@@ -182,7 +182,7 @@ def create_processors_program() -> ArgumentParser:
def create_uis_program() -> ArgumentParser: def create_uis_program() -> ArgumentParser:
program = ArgumentParser(add_help = False) program = ArgumentParser(add_help = False)
available_ui_layouts = [ file.get('name') for file in list_directory('facefusion/uis/layouts') ] available_ui_layouts = [ get_file_name(file_path) for file_path in resolve_file_paths('facefusion/uis/layouts') ]
group_uis = program.add_argument_group('uis') group_uis = program.add_argument_group('uis')
group_uis.add_argument('--open-browser', help = wording.get('help.open_browser'), action = 'store_true', default = config.get_bool_value('uis.open_browser')) group_uis.add_argument('--open-browser', help = wording.get('help.open_browser'), action = 'store_true', default = config.get_bool_value('uis.open_browser'))
group_uis.add_argument('--ui-layouts', help = wording.get('help.ui_layouts').format(choices = ', '.join(available_ui_layouts)), default = config.get_str_list('uis.ui_layouts', 'default'), nargs = '+') group_uis.add_argument('--ui-layouts', help = wording.get('help.ui_layouts').format(choices = ', '.join(available_ui_layouts)), default = config.get_str_list('uis.ui_layouts', 'default'), nargs = '+')

View File

@@ -2,12 +2,12 @@ import os
from typing import List from typing import List
from facefusion import state_manager from facefusion import state_manager
from facefusion.filesystem import create_directory, move_file, remove_directory, resolve_file_pattern from facefusion.filesystem import create_directory, get_file_extension, get_file_name, move_file, remove_directory, resolve_file_pattern
def get_temp_file_path(file_path : str) -> str: def get_temp_file_path(file_path : str) -> str:
_, temp_file_extension = os.path.splitext(os.path.basename(file_path))
temp_directory_path = get_temp_directory_path(file_path) temp_directory_path = get_temp_directory_path(file_path)
temp_file_extension = get_file_extension(file_path)
return os.path.join(temp_directory_path, 'temp' + temp_file_extension) return os.path.join(temp_directory_path, 'temp' + temp_file_extension)
@@ -16,8 +16,18 @@ def move_temp_file(file_path : str, move_path : str) -> bool:
return move_file(temp_file_path, move_path) return move_file(temp_file_path, move_path)
def resolve_temp_frame_paths(target_path : str) -> List[str]:
temp_frames_pattern = get_temp_frames_pattern(target_path, '*')
return resolve_file_pattern(temp_frames_pattern)
def get_temp_frames_pattern(target_path : str, temp_frame_prefix : str) -> str:
temp_directory_path = get_temp_directory_path(target_path)
return os.path.join(temp_directory_path, temp_frame_prefix + '.' + state_manager.get_item('temp_frame_format'))
def get_temp_directory_path(file_path : str) -> str: def get_temp_directory_path(file_path : str) -> str:
temp_file_name, _ = os.path.splitext(os.path.basename(file_path)) temp_file_name = get_file_name(file_path)
return os.path.join(state_manager.get_item('temp_path'), 'facefusion', temp_file_name) return os.path.join(state_manager.get_item('temp_path'), 'facefusion', temp_file_name)
@@ -31,13 +41,3 @@ def clear_temp_directory(file_path : str) -> bool:
temp_directory_path = get_temp_directory_path(file_path) temp_directory_path = get_temp_directory_path(file_path)
return remove_directory(temp_directory_path) return remove_directory(temp_directory_path)
return True return True
def get_temp_frame_paths(target_path : str) -> List[str]:
temp_frames_pattern = get_temp_frames_pattern(target_path, '*')
return resolve_file_pattern(temp_frames_pattern)
def get_temp_frames_pattern(target_path : str, temp_frame_prefix : str) -> str:
temp_directory_path = get_temp_directory_path(target_path)
return os.path.join(temp_directory_path, temp_frame_prefix + '.' + state_manager.get_item('temp_frame_format'))

View File

@@ -106,7 +106,15 @@ FaceParserModel = Literal['bisenet_resnet_18', 'bisenet_resnet_34']
FaceMaskType = Literal['box', 'occlusion', 'region'] FaceMaskType = Literal['box', 'occlusion', 'region']
FaceMaskRegion = Literal['skin', 'left-eyebrow', 'right-eyebrow', 'left-eye', 'right-eye', 'glasses', 'nose', 'mouth', 'upper-lip', 'lower-lip'] FaceMaskRegion = Literal['skin', 'left-eyebrow', 'right-eyebrow', 'left-eye', 'right-eye', 'glasses', 'nose', 'mouth', 'upper-lip', 'lower-lip']
FaceMaskRegionSet = Dict[FaceMaskRegion, int] FaceMaskRegionSet = Dict[FaceMaskRegion, int]
AudioFormat = Literal['mp3', 'ogg', 'wav']
ImageFormat = Literal['bmp', 'jpg', 'png', 'webp']
VideoFormat = Literal['avi', 'mkv', 'mov', 'mp4', 'webm']
TempFrameFormat = Literal['bmp', 'jpg', 'png'] TempFrameFormat = Literal['bmp', 'jpg', 'png']
AudioTypeSet = Dict[AudioFormat, str]
ImageTypeSet = Dict[ImageFormat, str]
VideoTypeSet = Dict[VideoFormat, str]
OutputAudioEncoder = Literal['aac', 'libmp3lame', 'libopus', 'libvorbis'] OutputAudioEncoder = Literal['aac', 'libmp3lame', 'libopus', 'libvorbis']
OutputVideoEncoder = Literal['libx264', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf','h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox'] OutputVideoEncoder = Literal['libx264', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf','h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox']
OutputVideoPreset = Literal['ultrafast', 'superfast', 'veryfast', 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow'] OutputVideoPreset = Literal['ultrafast', 'superfast', 'veryfast', 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow']
@@ -174,14 +182,6 @@ Download = TypedDict('Download',
DownloadSet = Dict[str, Download] DownloadSet = Dict[str, Download]
VideoMemoryStrategy = Literal['strict', 'moderate', 'tolerant'] VideoMemoryStrategy = Literal['strict', 'moderate', 'tolerant']
File = TypedDict('File',
{
'name' : str,
'extension' : str,
'path': str
})
AppContext = Literal['cli', 'ui'] AppContext = Literal['cli', 'ui']
InferencePool = Dict[str, InferenceSession] InferencePool = Dict[str, InferenceSession]

View File

@@ -9,7 +9,7 @@ import gradio
from facefusion import state_manager, wording from facefusion import state_manager, wording
from facefusion.core import conditional_process from facefusion.core import conditional_process
from facefusion.filesystem import is_video from facefusion.filesystem import get_file_extension, is_video
from facefusion.memory import limit_system_memory from facefusion.memory import limit_system_memory
from facefusion.uis.core import get_ui_component from facefusion.uis.core import get_ui_component
from facefusion.vision import count_video_frame_total, detect_video_fps, detect_video_resolution, pack_resolution from facefusion.vision import count_video_frame_total, detect_video_fps, detect_video_resolution, pack_resolution
@@ -72,8 +72,8 @@ def listen() -> None:
def suggest_output_path(target_path : str) -> Optional[str]: def suggest_output_path(target_path : str) -> Optional[str]:
if is_video(target_path): if is_video(target_path):
_, target_extension = os.path.splitext(target_path) target_file_extension = get_file_extension(target_path)
return os.path.join(tempfile.gettempdir(), hashlib.sha1().hexdigest()[:8] + target_extension) return os.path.join(tempfile.gettempdir(), hashlib.sha1().hexdigest()[:8] + target_file_extension)
return None return None

View File

@@ -4,7 +4,7 @@ import gradio
import facefusion.choices import facefusion.choices
from facefusion import content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, state_manager, voice_extractor, wording from facefusion import content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, state_manager, voice_extractor, wording
from facefusion.filesystem import list_directory from facefusion.filesystem import get_file_name, resolve_file_paths
from facefusion.processors.core import get_processors_modules from facefusion.processors.core import get_processors_modules
from facefusion.typing import DownloadProvider from facefusion.typing import DownloadProvider
@@ -36,7 +36,7 @@ def update_download_providers(download_providers : List[DownloadProvider]) -> gr
face_masker, face_masker,
voice_extractor voice_extractor
] ]
available_processors = [ file.get('name') for file in list_directory('facefusion/processors/modules') ] available_processors = [ get_file_name(file_path) for file_path in resolve_file_paths('facefusion/processors/modules') ]
processor_modules = get_processors_modules(available_processors) processor_modules = get_processors_modules(available_processors)
for module in common_modules + processor_modules: for module in common_modules + processor_modules:

View File

@@ -4,7 +4,7 @@ import gradio
from facefusion import content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, state_manager, voice_extractor, wording from facefusion import content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, state_manager, voice_extractor, wording
from facefusion.execution import get_available_execution_providers from facefusion.execution import get_available_execution_providers
from facefusion.filesystem import list_directory from facefusion.filesystem import get_file_name, resolve_file_paths
from facefusion.processors.core import get_processors_modules from facefusion.processors.core import get_processors_modules
from facefusion.typing import ExecutionProvider from facefusion.typing import ExecutionProvider
@@ -36,7 +36,7 @@ def update_execution_providers(execution_providers : List[ExecutionProvider]) ->
face_recognizer, face_recognizer,
voice_extractor voice_extractor
] ]
available_processors = [ file.get('name') for file in list_directory('facefusion/processors/modules') ] available_processors = [ get_file_name(file_path) for file_path in resolve_file_paths('facefusion/processors/modules') ]
processor_modules = get_processors_modules(available_processors) processor_modules = get_processors_modules(available_processors)
for module in common_modules + processor_modules: for module in common_modules + processor_modules:

View File

@@ -3,7 +3,7 @@ from typing import List, Optional
import gradio import gradio
from facefusion import state_manager, wording from facefusion import state_manager, wording
from facefusion.filesystem import list_directory from facefusion.filesystem import get_file_name, resolve_file_paths
from facefusion.processors.core import get_processors_modules from facefusion.processors.core import get_processors_modules
from facefusion.uis.core import register_ui_component from facefusion.uis.core import register_ui_component
@@ -39,5 +39,5 @@ def update_processors(processors : List[str]) -> gradio.CheckboxGroup:
def sort_processors(processors : List[str]) -> List[str]: def sort_processors(processors : List[str]) -> List[str]:
available_processors = [ file.get('name') for file in list_directory('facefusion/processors/modules') ] available_processors = [ get_file_name(file_path) for file_path in resolve_file_paths('facefusion/processors/modules') ]
return sorted(available_processors, key = lambda processor : processors.index(processor) if processor in processors else len(processors)) return sorted(available_processors, key = lambda processor : processors.index(processor) if processor in processors else len(processors))

View File

@@ -23,11 +23,6 @@ def render() -> None:
SOURCE_FILE = gradio.File( SOURCE_FILE = gradio.File(
label = wording.get('uis.source_file'), label = wording.get('uis.source_file'),
file_count = 'multiple', file_count = 'multiple',
file_types =
[
'audio',
'image'
],
value = state_manager.get_item('source_paths') if has_source_audio or has_source_image else None value = state_manager.get_item('source_paths') if has_source_audio or has_source_image else None
) )
source_file_names = [ source_file_value.get('path') for source_file_value in SOURCE_FILE.value ] if SOURCE_FILE.value else None source_file_names = [ source_file_value.get('path') for source_file_value in SOURCE_FILE.value ] if SOURCE_FILE.value else None

View File

@@ -26,11 +26,6 @@ def render() -> None:
TARGET_FILE = gradio.File( TARGET_FILE = gradio.File(
label = wording.get('uis.target_file'), label = wording.get('uis.target_file'),
file_count = 'single', file_count = 'single',
file_types =
[
'image',
'video'
],
value = state_manager.get_item('target_path') if is_target_image or is_target_video else None value = state_manager.get_item('target_path') if is_target_image or is_target_video else None
) )
target_image_options : ComponentOptions =\ target_image_options : ComponentOptions =\

View File

@@ -3,7 +3,7 @@ import os
from typing import Optional from typing import Optional
from facefusion import state_manager from facefusion import state_manager
from facefusion.filesystem import is_image, is_video from facefusion.filesystem import get_file_extension, is_image, is_video
def convert_int_none(value : int) -> Optional[int]: def convert_int_none(value : int) -> Optional[int]:
@@ -20,7 +20,7 @@ def convert_str_none(value : str) -> Optional[str]:
def suggest_output_path(output_directory_path : str, target_path : str) -> Optional[str]: def suggest_output_path(output_directory_path : str, target_path : str) -> Optional[str]:
if is_image(target_path) or is_video(target_path): if is_image(target_path) or is_video(target_path):
_, target_extension = os.path.splitext(target_path) output_file_name = hashlib.sha1(str(state_manager.get_state()).encode()).hexdigest()[:8]
output_name = hashlib.sha1(str(state_manager.get_state()).encode()).hexdigest()[:8] target_file_extension = get_file_extension(target_path)
return os.path.join(output_directory_path, output_name + target_extension) return os.path.join(output_directory_path, output_file_name + target_file_extension)
return None return None

View File

@@ -1,4 +1,3 @@
import os
from functools import lru_cache from functools import lru_cache
from typing import List, Optional, Tuple from typing import List, Optional, Tuple
@@ -8,7 +7,7 @@ from cv2.typing import Size
import facefusion.choices import facefusion.choices
from facefusion.common_helper import is_windows from facefusion.common_helper import is_windows
from facefusion.filesystem import is_image, is_video from facefusion.filesystem import get_file_extension, is_image, is_video
from facefusion.typing import Duration, Fps, Orientation, Resolution, VisionFrame from facefusion.typing import Duration, Fps, Orientation, Resolution, VisionFrame
@@ -38,8 +37,8 @@ def read_image(image_path : str) -> Optional[VisionFrame]:
def write_image(image_path : str, vision_frame : VisionFrame) -> bool: def write_image(image_path : str, vision_frame : VisionFrame) -> bool:
if image_path: if image_path:
if is_windows(): if is_windows():
_, file_extension = os.path.splitext(image_path) image_file_extension = get_file_extension(image_path)
_, vision_frame = cv2.imencode(file_extension, vision_frame) _, vision_frame = cv2.imencode(image_file_extension, vision_frame)
vision_frame.tofile(image_path) vision_frame.tofile(image_path)
return is_image(image_path) return is_image(image_path)
return cv2.imwrite(image_path, vision_frame) return cv2.imwrite(image_path, vision_frame)

View File

@@ -1,4 +1,3 @@
filetype==1.2.0
gradio==5.9.1 gradio==5.9.1
gradio-rangeslider==0.0.8 gradio-rangeslider==0.0.8
numpy==2.2.0 numpy==2.2.0

View File

@@ -7,7 +7,7 @@ from facefusion import process_manager, state_manager
from facefusion.download import conditional_download from facefusion.download import conditional_download
from facefusion.ffmpeg import concat_video, extract_frames, read_audio_buffer, replace_audio, restore_audio from facefusion.ffmpeg import concat_video, extract_frames, read_audio_buffer, replace_audio, restore_audio
from facefusion.filesystem import copy_file from facefusion.filesystem import copy_file
from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, get_temp_frame_paths from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, resolve_temp_frame_paths
from .helper import get_test_example_file, get_test_examples_directory, get_test_output_file, prepare_test_output_directory from .helper import get_test_example_file, get_test_examples_directory, get_test_output_file, prepare_test_output_directory
@@ -57,7 +57,7 @@ def test_extract_frames() -> None:
create_temp_directory(target_path) create_temp_directory(target_path)
assert extract_frames(target_path, '452x240', 30.0, trim_frame_start, trim_frame_end) is True assert extract_frames(target_path, '452x240', 30.0, trim_frame_start, trim_frame_end) is True
assert len(get_temp_frame_paths(target_path)) == frame_total assert len(resolve_temp_frame_paths(target_path)) == frame_total
clear_temp_directory(target_path) clear_temp_directory(target_path)

View File

@@ -3,7 +3,7 @@ import os.path
import pytest import pytest
from facefusion.download import conditional_download from facefusion.download import conditional_download
from facefusion.filesystem import create_directory, filter_audio_paths, filter_image_paths, get_file_size, has_audio, has_image, in_directory, is_audio, is_directory, is_file, is_image, is_video, list_directory, remove_directory, same_file_extension from facefusion.filesystem import create_directory, filter_audio_paths, filter_image_paths, get_file_extension, get_file_format, get_file_size, has_audio, has_image, has_video, in_directory, is_audio, is_directory, is_file, is_image, is_video, remove_directory, resolve_file_paths, same_file_extension
from .helper import get_test_example_file, get_test_examples_directory, get_test_outputs_directory from .helper import get_test_example_file, get_test_examples_directory, get_test_outputs_directory
@@ -18,13 +18,26 @@ def before_all() -> None:
def test_get_file_size() -> None: def test_get_file_size() -> None:
assert get_file_size(get_test_example_file('source.jpg')) > 0 assert get_file_size(get_test_example_file('source.jpg')) == 549458
assert get_file_size('invalid') == 0 assert get_file_size('invalid') == 0
def test_get_file_extension() -> None:
assert get_file_extension('source.jpg') == '.jpg'
assert get_file_extension('source.mp3') == '.mp3'
assert get_file_extension('invalid') is None
def test_get_file_format() -> None:
assert get_file_format('source.jpg') == 'jpg'
assert get_file_format('source.mp3') == 'mp3'
assert get_file_format('invalid') is None
def test_same_file_extension() -> None: def test_same_file_extension() -> None:
assert same_file_extension([ 'target.jpg', 'output.jpg' ]) is True assert same_file_extension('source.jpg', 'source.jpg') is True
assert same_file_extension([ 'target.jpg', 'output.mp4' ]) is False assert same_file_extension('source.jpg', 'source.mp3') is False
assert same_file_extension('invalid', 'invalid') is False
def test_is_file() -> None: def test_is_file() -> None:
@@ -33,18 +46,6 @@ def test_is_file() -> None:
assert is_file('invalid') is False assert is_file('invalid') is False
def test_is_directory() -> None:
assert is_directory(get_test_examples_directory()) is True
assert is_directory(get_test_example_file('source.jpg')) is False
assert is_directory('invalid') is False
def test_in_directory() -> None:
assert in_directory(get_test_example_file('source.jpg')) is True
assert in_directory('source.jpg') is False
assert in_directory('invalid') is False
def test_is_audio() -> None: def test_is_audio() -> None:
assert is_audio(get_test_example_file('source.mp3')) is True assert is_audio(get_test_example_file('source.mp3')) is True
assert is_audio(get_test_example_file('source.jpg')) is False assert is_audio(get_test_example_file('source.jpg')) is False
@@ -77,6 +78,13 @@ def test_is_video() -> None:
assert is_video('invalid') is False assert is_video('invalid') is False
def test_has_video() -> None:
assert has_video([ get_test_example_file('target-240p.mp4') ]) is True
assert has_video([ get_test_example_file('target-240p.mp4'), get_test_example_file('source.mp3') ]) is True
assert has_video([ get_test_example_file('source.mp3'), get_test_example_file('source.mp3') ]) is False
assert has_video([ 'invalid' ]) is False
def test_filter_audio_paths() -> None: def test_filter_audio_paths() -> None:
assert filter_audio_paths([ get_test_example_file('source.jpg'), get_test_example_file('source.mp3') ]) == [ get_test_example_file('source.mp3') ] assert filter_audio_paths([ get_test_example_file('source.jpg'), get_test_example_file('source.mp3') ]) == [ get_test_example_file('source.mp3') ]
assert filter_audio_paths([ get_test_example_file('source.jpg'), get_test_example_file('source.jpg') ]) == [] assert filter_audio_paths([ get_test_example_file('source.jpg'), get_test_example_file('source.jpg') ]) == []
@@ -89,6 +97,15 @@ def test_filter_image_paths() -> None:
assert filter_audio_paths([ 'invalid' ]) == [] assert filter_audio_paths([ 'invalid' ]) == []
def test_resolve_file_paths() -> None:
file_paths = resolve_file_paths(get_test_examples_directory())
for file_path in file_paths:
assert file_path == get_test_example_file(file_path)
assert resolve_file_paths('invalid') == []
def test_create_directory() -> None: def test_create_directory() -> None:
create_directory_path = os.path.join(get_test_outputs_directory(), 'create_directory') create_directory_path = os.path.join(get_test_outputs_directory(), 'create_directory')
@@ -96,15 +113,6 @@ def test_create_directory() -> None:
assert create_directory(get_test_example_file('source.jpg')) is False assert create_directory(get_test_example_file('source.jpg')) is False
def test_list_directory() -> None:
files = list_directory(get_test_examples_directory())
for file in files:
assert file.get('path') == get_test_example_file(file.get('name') + file.get('extension'))
assert list_directory('invalid') is None
def test_remove_directory() -> None: def test_remove_directory() -> None:
remove_directory_path = os.path.join(get_test_outputs_directory(), 'remove_directory') remove_directory_path = os.path.join(get_test_outputs_directory(), 'remove_directory')
create_directory(remove_directory_path) create_directory(remove_directory_path)
@@ -112,3 +120,15 @@ def test_remove_directory() -> None:
assert remove_directory(remove_directory_path) is True assert remove_directory(remove_directory_path) is True
assert remove_directory(get_test_example_file('source.jpg')) is False assert remove_directory(get_test_example_file('source.jpg')) is False
assert remove_directory('invalid') is False assert remove_directory('invalid') is False
def test_is_directory() -> None:
assert is_directory(get_test_examples_directory()) is True
assert is_directory(get_test_example_file('source.jpg')) is False
assert is_directory('invalid') is False
def test_in_directory() -> None:
assert in_directory(get_test_example_file('source.jpg')) is True
assert in_directory('source.jpg') is False
assert in_directory('invalid') is False

View File

@@ -4,7 +4,7 @@ import pytest
from facefusion.download import conditional_download from facefusion.download import conditional_download
from facefusion.vision import calc_histogram_difference, count_trim_frame_total, count_video_frame_total, create_image_resolutions, create_video_resolutions, detect_image_resolution, detect_video_duration, detect_video_fps, detect_video_resolution, get_video_frame, match_frame_color, normalize_resolution, pack_resolution, read_image, restrict_image_resolution, restrict_trim_frame, restrict_video_fps, restrict_video_resolution, unpack_resolution, write_image from facefusion.vision import calc_histogram_difference, count_trim_frame_total, count_video_frame_total, create_image_resolutions, create_video_resolutions, detect_image_resolution, detect_video_duration, detect_video_fps, detect_video_resolution, get_video_frame, match_frame_color, normalize_resolution, pack_resolution, read_image, restrict_image_resolution, restrict_trim_frame, restrict_video_fps, restrict_video_resolution, unpack_resolution, write_image
from .helper import get_test_example_file, get_test_examples_directory, prepare_test_output_directory, get_test_output_file from .helper import get_test_example_file, get_test_examples_directory, get_test_output_file, prepare_test_output_directory
@pytest.fixture(scope = 'module', autouse = True) @pytest.fixture(scope = 'module', autouse = True)