From 6f0675030e8245a3d591c59283e10cbeab806838 Mon Sep 17 00:00:00 2001 From: Henry Ruhs Date: Mon, 6 Jan 2025 11:19:43 +0100 Subject: [PATCH] Feat/custom file format handling (#845) * Purge filetype dependency, Rename file_extension to file_format, Introduce custom format detections * Changed a lot * Purge filetype dependency, Rename file_extension to file_format, Introduce custom format detections * Fix stuff * Fix stuff * Simplify all the is_ and has_ methods * Simplify all the is_ and has_ methods * Use the new helper on more places * Introduce are_ next to is_ and has_ * Get rid of the type-ignores * Add more video types --- facefusion/args.py | 4 +- facefusion/choices.py | 30 +++- facefusion/core.py | 8 +- facefusion/download.py | 10 +- facefusion/ffmpeg.py | 13 +- facefusion/filesystem.py | 146 +++++++++++------- facefusion/hash_helper.py | 6 +- facefusion/jobs/job_helper.py | 5 +- facefusion/jobs/job_manager.py | 4 +- facefusion/jobs/job_runner.py | 6 +- facefusion/processors/choices.py | 10 +- facefusion/processors/modules/age_modifier.py | 2 +- facefusion/processors/modules/deep_swapper.py | 14 +- .../processors/modules/expression_restorer.py | 2 +- .../processors/modules/face_debugger.py | 2 +- facefusion/processors/modules/face_editor.py | 2 +- .../processors/modules/face_enhancer.py | 2 +- facefusion/processors/modules/face_swapper.py | 2 +- .../processors/modules/frame_colorizer.py | 2 +- .../processors/modules/frame_enhancer.py | 2 +- facefusion/processors/modules/lip_syncer.py | 2 +- facefusion/program.py | 6 +- facefusion/temp_helper.py | 26 ++-- facefusion/typing.py | 16 +- facefusion/uis/components/benchmark.py | 6 +- facefusion/uis/components/download.py | 4 +- facefusion/uis/components/execution.py | 4 +- facefusion/uis/components/processors.py | 4 +- facefusion/uis/components/source.py | 5 - facefusion/uis/components/target.py | 5 - facefusion/uis/ui_helper.py | 8 +- facefusion/vision.py | 7 +- requirements.txt | 1 - tests/test_ffmpeg.py | 4 +- tests/test_filesystem.py | 70 ++++++--- tests/test_vision.py | 2 +- 36 files changed, 259 insertions(+), 183 deletions(-) diff --git a/facefusion/args.py b/facefusion/args.py index f8ee798..778c335 100644 --- a/facefusion/args.py +++ b/facefusion/args.py @@ -1,5 +1,5 @@ from facefusion import state_manager -from facefusion.filesystem import is_image, is_video, list_directory +from facefusion.filesystem import get_file_name, is_image, is_video, resolve_file_paths from facefusion.jobs import job_store from facefusion.normalizer import normalize_fps, normalize_padding from facefusion.processors.core import get_processors_modules @@ -107,7 +107,7 @@ def apply_args(args : Args, apply_state_item : ApplyStateItem) -> None: apply_state_item('output_video_fps', output_video_fps) apply_state_item('skip_audio', args.get('skip_audio')) # processors - available_processors = [ file.get('name') for file in list_directory('facefusion/processors/modules') ] + available_processors = [ get_file_name(file_path) for file_path in resolve_file_paths('facefusion/processors/modules') ] apply_state_item('processors', args.get('processors')) for processor_module in get_processors_modules(available_processors): processor_module.apply_args(args, apply_state_item) diff --git a/facefusion/choices.py b/facefusion/choices.py index 6f12865..b4ba7a6 100755 --- a/facefusion/choices.py +++ b/facefusion/choices.py @@ -2,7 +2,7 @@ import logging from typing import List, Sequence from facefusion.common_helper import create_float_range, create_int_range -from facefusion.typing import Angle, DownloadProvider, DownloadProviderSet, DownloadScope, ExecutionProvider, ExecutionProviderSet, FaceDetectorModel, FaceDetectorSet, FaceLandmarkerModel, FaceMaskRegion, FaceMaskRegionSet, FaceMaskType, FaceOccluderModel, FaceParserModel, FaceSelectorMode, FaceSelectorOrder, Gender, JobStatus, LogLevel, LogLevelSet, OutputAudioEncoder, OutputVideoEncoder, OutputVideoPreset, Race, Score, TempFrameFormat, UiWorkflow, VideoMemoryStrategy +from facefusion.typing import Angle, AudioFormat, AudioTypeSet, DownloadProvider, DownloadProviderSet, DownloadScope, ExecutionProvider, ExecutionProviderSet, FaceDetectorModel, FaceDetectorSet, FaceLandmarkerModel, FaceMaskRegion, FaceMaskRegionSet, FaceMaskType, FaceOccluderModel, FaceParserModel, FaceSelectorMode, FaceSelectorOrder, Gender, ImageFormat, ImageTypeSet, JobStatus, LogLevel, LogLevelSet, OutputAudioEncoder, OutputVideoEncoder, OutputVideoPreset, Race, Score, UiWorkflow, VideoFormat, VideoMemoryStrategy, VideoTypeSet face_detector_set : FaceDetectorSet =\ { @@ -34,7 +34,33 @@ face_mask_region_set : FaceMaskRegionSet =\ 'lower-lip': 13 } face_mask_regions : List[FaceMaskRegion] = list(face_mask_region_set.keys()) -temp_frame_formats : List[TempFrameFormat] = [ 'bmp', 'jpg', 'png' ] + +audio_type_set : AudioTypeSet =\ +{ + 'mp3': 'audio/mpeg', + 'ogg': 'audio/ogg', + 'wav': 'audio/x-wav' +} +image_type_set : ImageTypeSet =\ +{ + 'bmp': 'image/bmp', + 'jpg': 'image/jpeg', + 'png': 'image/png', + 'webp': 'image/webp' +} +video_type_set : VideoTypeSet =\ +{ + 'avi': 'video/x-msvideo', + 'mkv': 'video/x-matroska', + 'mp4': 'video/mp4', + 'mov': 'video/quicktime', + 'webm': 'video/webm' +} +audio_formats : List[AudioFormat] = list(audio_type_set.keys()) +image_formats : List[ImageFormat] = list(image_type_set.keys()) +video_formats : List[VideoFormat] = list(video_type_set.keys()) +temp_frame_formats : List[ImageFormat] = [ 'bmp', 'jpg', 'png' ] + output_audio_encoders : List[OutputAudioEncoder] = [ 'aac', 'libmp3lame', 'libopus', 'libvorbis' ] output_video_encoders : List[OutputVideoEncoder] = [ 'libx264', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf', 'h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox' ] output_video_presets : List[OutputVideoPreset] = [ 'ultrafast', 'superfast', 'veryfast', 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow' ] diff --git a/facefusion/core.py b/facefusion/core.py index 38d64db..4df5410 100755 --- a/facefusion/core.py +++ b/facefusion/core.py @@ -16,7 +16,7 @@ from facefusion.face_analyser import get_average_face, get_many_faces, get_one_f from facefusion.face_selector import sort_and_filter_faces from facefusion.face_store import append_reference_face, clear_reference_faces, get_reference_faces from facefusion.ffmpeg import copy_image, extract_frames, finalize_image, merge_video, replace_audio, restore_audio -from facefusion.filesystem import filter_audio_paths, is_image, is_video, list_directory, resolve_file_pattern +from facefusion.filesystem import filter_audio_paths, get_file_name, is_image, is_video, resolve_file_paths, resolve_file_pattern from facefusion.jobs import job_helper, job_manager, job_runner from facefusion.jobs.job_list import compose_job_list from facefusion.memory import limit_system_memory @@ -24,7 +24,7 @@ from facefusion.processors.core import get_processors_modules from facefusion.program import create_program from facefusion.program_helper import validate_args from facefusion.statistics import conditional_log_statistics -from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, get_temp_frame_paths, move_temp_file +from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, move_temp_file, resolve_temp_frame_paths from facefusion.typing import Args, ErrorCode from facefusion.vision import get_video_frame, pack_resolution, read_image, read_static_images, restrict_image_resolution, restrict_trim_frame, restrict_video_fps, restrict_video_resolution, unpack_resolution @@ -133,7 +133,7 @@ def force_download() -> ErrorCode: face_recognizer, voice_extractor ] - available_processors = [ file.get('name') for file in list_directory('facefusion/processors/modules') ] + available_processors = [ get_file_name(file_path) for file_path in resolve_file_paths('facefusion/processors/modules') ] processor_modules = get_processors_modules(available_processors) for module in common_modules + processor_modules: @@ -413,7 +413,7 @@ def process_video(start_time : float) -> ErrorCode: process_manager.end() return 1 # process frames - temp_frame_paths = get_temp_frame_paths(state_manager.get_item('target_path')) + temp_frame_paths = resolve_temp_frame_paths(state_manager.get_item('target_path')) if temp_frame_paths: for processor_module in get_processors_modules(state_manager.get_item('processors')): logger.info(wording.get('processing'), processor_module.__name__) diff --git a/facefusion/download.py b/facefusion/download.py index 85f576e..273aa69 100644 --- a/facefusion/download.py +++ b/facefusion/download.py @@ -9,7 +9,7 @@ from tqdm import tqdm import facefusion.choices from facefusion import logger, process_manager, state_manager, wording -from facefusion.filesystem import get_file_size, is_file, remove_file +from facefusion.filesystem import get_file_name, get_file_size, is_file, remove_file from facefusion.hash_helper import validate_hash from facefusion.typing import DownloadProvider, DownloadSet @@ -79,10 +79,10 @@ def conditional_download_hashes(hashes : DownloadSet) -> bool: valid_hash_paths, invalid_hash_paths = validate_hash_paths(hash_paths) for valid_hash_path in valid_hash_paths: - valid_hash_file_name, _ = os.path.splitext(os.path.basename(valid_hash_path)) + valid_hash_file_name = get_file_name(valid_hash_path) logger.debug(wording.get('validating_hash_succeed').format(hash_file_name = valid_hash_file_name), __name__) for invalid_hash_path in invalid_hash_paths: - invalid_hash_file_name, _ = os.path.splitext(os.path.basename(invalid_hash_path)) + invalid_hash_file_name = get_file_name(invalid_hash_path) logger.error(wording.get('validating_hash_failed').format(hash_file_name = invalid_hash_file_name), __name__) if not invalid_hash_paths: @@ -106,10 +106,10 @@ def conditional_download_sources(sources : DownloadSet) -> bool: valid_source_paths, invalid_source_paths = validate_source_paths(source_paths) for valid_source_path in valid_source_paths: - valid_source_file_name, _ = os.path.splitext(os.path.basename(valid_source_path)) + valid_source_file_name = get_file_name(valid_source_path) logger.debug(wording.get('validating_source_succeed').format(source_file_name = valid_source_file_name), __name__) for invalid_source_path in invalid_source_paths: - invalid_source_file_name, _ = os.path.splitext(os.path.basename(invalid_source_path)) + invalid_source_file_name = get_file_name(invalid_source_path) logger.error(wording.get('validating_source_failed').format(source_file_name = invalid_source_file_name), __name__) if remove_file(invalid_source_path): diff --git a/facefusion/ffmpeg.py b/facefusion/ffmpeg.py index 0cd3c1f..85fe581 100644 --- a/facefusion/ffmpeg.py +++ b/facefusion/ffmpeg.py @@ -4,12 +4,11 @@ import subprocess import tempfile from typing import List, Optional -import filetype from tqdm import tqdm from facefusion import logger, process_manager, state_manager, wording -from facefusion.filesystem import remove_file -from facefusion.temp_helper import get_temp_file_path, get_temp_frame_paths, get_temp_frames_pattern +from facefusion.filesystem import get_file_format, remove_file +from facefusion.temp_helper import get_temp_file_path, get_temp_frames_pattern, resolve_temp_frame_paths from facefusion.typing import AudioBuffer, Fps, OutputVideoPreset, UpdateProgress from facefusion.vision import count_trim_frame_total, detect_video_duration, restrict_video_fps @@ -99,13 +98,12 @@ def merge_video(target_path : str, output_video_resolution : str, output_video_f output_video_encoder = state_manager.get_item('output_video_encoder') output_video_quality = state_manager.get_item('output_video_quality') output_video_preset = state_manager.get_item('output_video_preset') - merge_frame_total = len(get_temp_frame_paths(target_path)) + merge_frame_total = len(resolve_temp_frame_paths(target_path)) temp_video_fps = restrict_video_fps(target_path, output_video_fps) temp_file_path = get_temp_file_path(target_path) temp_frames_pattern = get_temp_frames_pattern(target_path, '%08d') - is_webm = filetype.guess_mime(target_path) == 'video/webm' - if is_webm: + if get_file_format(target_path) == 'webm': output_video_encoder = 'libvpx-vp9' commands = [ '-r', str(temp_video_fps), '-i', temp_frames_pattern, '-s', str(output_video_resolution), '-c:v', output_video_encoder ] if output_video_encoder in [ 'libx264', 'libx265' ]: @@ -161,8 +159,7 @@ def finalize_image(target_path : str, output_path : str, output_image_resolution def calc_image_compression(image_path : str, image_quality : int) -> int: - is_webp = filetype.guess_mime(image_path) == 'image/webp' - if is_webp: + if get_file_format(image_path) == 'webm': image_quality = 100 - image_quality return round(31 - (image_quality * 0.31)) diff --git a/facefusion/filesystem.py b/facefusion/filesystem.py index 8ea5b26..901e7c8 100644 --- a/facefusion/filesystem.py +++ b/facefusion/filesystem.py @@ -1,12 +1,9 @@ import glob import os import shutil -from pathlib import Path from typing import List, Optional -import filetype - -from facefusion.typing import File +import facefusion.choices def get_file_size(file_path : str) -> int: @@ -15,54 +12,91 @@ def get_file_size(file_path : str) -> int: return 0 -def same_file_extension(file_paths : List[str]) -> bool: - file_extensions : List[str] = [] +def get_file_name(file_path : str) -> Optional[str]: + file_name, _ = os.path.splitext(os.path.basename(file_path)) - for file_path in file_paths: - _, file_extension = os.path.splitext(file_path.lower()) + if file_name: + return file_name + return None - if file_extensions and file_extension not in file_extensions: - return False - file_extensions.append(file_extension) - return True + +def get_file_extension(file_path : str) -> Optional[str]: + _, file_extension = os.path.splitext(file_path) + + if file_extension: + return file_extension + return None + + +def get_file_format(file_path : str) -> Optional[str]: + file_extension = get_file_extension(file_path) + + if file_extension: + return file_extension.lower().lstrip('.') + return None + + +def same_file_extension(first_file_path : str, second_file_path : str) -> bool: + first_file_extension = get_file_extension(first_file_path) + second_file_extension = get_file_extension(second_file_path) + + if first_file_extension and second_file_extension: + return get_file_extension(first_file_path) == get_file_extension(second_file_path) + return False def is_file(file_path : str) -> bool: - return bool(file_path and os.path.isfile(file_path)) - - -def is_directory(directory_path : str) -> bool: - return bool(directory_path and os.path.isdir(directory_path)) - - -def in_directory(file_path : str) -> bool: - if file_path and not is_directory(file_path): - return is_directory(os.path.dirname(file_path)) + if file_path: + return os.path.isfile(file_path) return False def is_audio(audio_path : str) -> bool: - return is_file(audio_path) and filetype.helpers.is_audio(audio_path) + return is_file(audio_path) and get_file_format(audio_path) in facefusion.choices.audio_formats def has_audio(audio_paths : List[str]) -> bool: if audio_paths: - return any(is_audio(audio_path) for audio_path in audio_paths) + return any(map(is_audio, audio_paths)) + return False + + +def are_audios(audio_paths : List[str]) -> bool: + if audio_paths: + return all(map(is_audio, audio_paths)) return False def is_image(image_path : str) -> bool: - return is_file(image_path) and filetype.helpers.is_image(image_path) + return is_file(image_path) and get_file_format(image_path) in facefusion.choices.image_formats -def has_image(image_paths: List[str]) -> bool: +def has_image(image_paths : List[str]) -> bool: if image_paths: return any(is_image(image_path) for image_path in image_paths) return False +def are_images(image_paths : List[str]) -> bool: + if image_paths: + return all(map(is_image, image_paths)) + return False + + def is_video(video_path : str) -> bool: - return is_file(video_path) and filetype.helpers.is_video(video_path) + return is_file(video_path) and get_file_format(video_path) in facefusion.choices.video_formats + + +def has_video(video_paths : List[str]) -> bool: + if video_paths: + return any(map(is_video, video_paths)) + return False + + +def are_videos(video_paths : List[str]) -> bool: + if video_paths: + return any(map(is_video, video_paths)) + return False def filter_audio_paths(paths : List[str]) -> List[str]: @@ -77,10 +111,6 @@ def filter_image_paths(paths : List[str]) -> List[str]: return [] -def resolve_relative_path(path : str) -> str: - return os.path.abspath(os.path.join(os.path.dirname(__file__), path)) - - def copy_file(file_path : str, move_path : str) -> bool: if is_file(file_path): shutil.copy(file_path, move_path) @@ -102,31 +132,18 @@ def remove_file(file_path : str) -> bool: return False -def create_directory(directory_path : str) -> bool: - if directory_path and not is_file(directory_path): - Path(directory_path).mkdir(parents = True, exist_ok = True) - return is_directory(directory_path) - return False +def resolve_file_paths(directory_path : str) -> List[str]: + file_paths : List[str] = [] - -def list_directory(directory_path : str) -> Optional[List[File]]: if is_directory(directory_path): - file_paths = sorted(os.listdir(directory_path)) - files : List[File] = [] + file_names_and_extensions = sorted(os.listdir(directory_path)) - for file_path in file_paths: - file_name, file_extension = os.path.splitext(file_path) + for file_name_and_extension in file_names_and_extensions: + if not file_name_and_extension.startswith(('.', '__')): + file_path = os.path.join(directory_path, file_name_and_extension) + file_paths.append(file_path) - if not file_name.startswith(('.', '__')): - files.append( - { - 'name': file_name, - 'extension': file_extension, - 'path': os.path.join(directory_path, file_path) - }) - - return files - return None + return file_paths def resolve_file_pattern(file_pattern : str) -> List[str]: @@ -135,8 +152,33 @@ def resolve_file_pattern(file_pattern : str) -> List[str]: return [] +def is_directory(directory_path : str) -> bool: + if directory_path: + return os.path.isdir(directory_path) + return False + + +def in_directory(file_path : str) -> bool: + if file_path: + directory_path = os.path.dirname(file_path) + if directory_path: + return not is_directory(file_path) and is_directory(directory_path) + return False + + +def create_directory(directory_path : str) -> bool: + if directory_path and not is_file(directory_path): + os.makedirs(directory_path, exist_ok = True) + return is_directory(directory_path) + return False + + def remove_directory(directory_path : str) -> bool: if is_directory(directory_path): shutil.rmtree(directory_path, ignore_errors = True) return not is_directory(directory_path) return False + + +def resolve_relative_path(path : str) -> str: + return os.path.abspath(os.path.join(os.path.dirname(__file__), path)) diff --git a/facefusion/hash_helper.py b/facefusion/hash_helper.py index 9d334b9..f326ecf 100644 --- a/facefusion/hash_helper.py +++ b/facefusion/hash_helper.py @@ -2,7 +2,7 @@ import os import zlib from typing import Optional -from facefusion.filesystem import is_file +from facefusion.filesystem import get_file_name, is_file def create_hash(content : bytes) -> str: @@ -25,8 +25,8 @@ def validate_hash(validate_path : str) -> bool: def get_hash_path(validate_path : str) -> Optional[str]: if is_file(validate_path): - validate_directory_path, _ = os.path.split(validate_path) - validate_file_name, _ = os.path.splitext(_) + validate_directory_path, file_name_and_extension = os.path.split(validate_path) + validate_file_name = get_file_name(file_name_and_extension) return os.path.join(validate_directory_path, validate_file_name + '.hash') return None diff --git a/facefusion/jobs/job_helper.py b/facefusion/jobs/job_helper.py index 26f468e..6e3139b 100644 --- a/facefusion/jobs/job_helper.py +++ b/facefusion/jobs/job_helper.py @@ -2,11 +2,14 @@ import os from datetime import datetime from typing import Optional +from facefusion.filesystem import get_file_extension, get_file_name + def get_step_output_path(job_id : str, step_index : int, output_path : str) -> Optional[str]: if output_path: output_directory_path, _ = os.path.split(output_path) - output_file_name, output_file_extension = os.path.splitext(_) + output_file_name = get_file_name(_) + output_file_extension = get_file_extension(_) return os.path.join(output_directory_path, output_file_name + '-' + job_id + '-' + str(step_index) + output_file_extension) return None diff --git a/facefusion/jobs/job_manager.py b/facefusion/jobs/job_manager.py index 6b783b2..dedad90 100644 --- a/facefusion/jobs/job_manager.py +++ b/facefusion/jobs/job_manager.py @@ -4,7 +4,7 @@ from typing import List, Optional import facefusion.choices from facefusion.date_helper import get_current_date_time -from facefusion.filesystem import create_directory, is_directory, is_file, move_file, remove_directory, remove_file, resolve_file_pattern +from facefusion.filesystem import create_directory, get_file_name, is_directory, is_file, move_file, remove_directory, remove_file, resolve_file_pattern from facefusion.jobs.job_helper import get_step_output_path from facefusion.json import read_json, write_json from facefusion.typing import Args, Job, JobSet, JobStatus, JobStep, JobStepStatus @@ -90,7 +90,7 @@ def find_job_ids(job_status : JobStatus) -> List[str]: job_ids = [] for job_path in job_paths: - job_id, _ = os.path.splitext(os.path.basename(job_path)) + job_id = get_file_name(job_path) job_ids.append(job_id) return job_ids diff --git a/facefusion/jobs/job_runner.py b/facefusion/jobs/job_runner.py index e1adddb..33a2faf 100644 --- a/facefusion/jobs/job_runner.py +++ b/facefusion/jobs/job_runner.py @@ -1,5 +1,5 @@ from facefusion.ffmpeg import concat_video -from facefusion.filesystem import is_image, is_video, move_file, remove_file +from facefusion.filesystem import are_images, are_videos, move_file, remove_file from facefusion.jobs import job_helper, job_manager from facefusion.typing import JobOutputSet, JobStep, ProcessStep @@ -73,10 +73,10 @@ def finalize_steps(job_id : str) -> bool: output_set = collect_output_set(job_id) for output_path, temp_output_paths in output_set.items(): - if all(map(is_video, temp_output_paths)): + if are_videos(temp_output_paths): if not concat_video(output_path, temp_output_paths): return False - if any(map(is_image, temp_output_paths)): + if are_images(temp_output_paths): for temp_output_path in temp_output_paths: if not move_file(temp_output_path, output_path): return False diff --git a/facefusion/processors/choices.py b/facefusion/processors/choices.py index e0008af..0a2a276 100755 --- a/facefusion/processors/choices.py +++ b/facefusion/processors/choices.py @@ -1,7 +1,7 @@ from typing import List, Sequence from facefusion.common_helper import create_float_range, create_int_range -from facefusion.filesystem import list_directory, resolve_relative_path +from facefusion.filesystem import get_file_name, resolve_file_paths, resolve_relative_path from facefusion.processors.typing import AgeModifierModel, DeepSwapperModel, ExpressionRestorerModel, FaceDebuggerItem, FaceEditorModel, FaceEnhancerModel, FaceSwapperModel, FaceSwapperSet, FrameColorizerModel, FrameEnhancerModel, LipSyncerModel age_modifier_models : List[AgeModifierModel] = [ 'styleganex_age' ] @@ -157,12 +157,12 @@ deep_swapper_models : List[DeepSwapperModel] =\ 'rumateus/taylor_swift_224' ] -custom_model_files = list_directory(resolve_relative_path('../.assets/models/custom')) +custom_model_file_paths = resolve_file_paths(resolve_relative_path('../.assets/models/custom')) -if custom_model_files: +if custom_model_file_paths: - for model_file in custom_model_files: - model_id = '/'.join([ 'custom', model_file.get('name') ]) + for model_file_path in custom_model_file_paths: + model_id = '/'.join([ 'custom', get_file_name(model_file_path) ]) deep_swapper_models.append(model_id) expression_restorer_models : List[ExpressionRestorerModel] = [ 'live_portrait' ] diff --git a/facefusion/processors/modules/age_modifier.py b/facefusion/processors/modules/age_modifier.py index ed4e931..783bd1c 100755 --- a/facefusion/processors/modules/age_modifier.py +++ b/facefusion/processors/modules/age_modifier.py @@ -104,7 +104,7 @@ def pre_process(mode : ProcessMode) -> bool: if mode == 'output' and not in_directory(state_manager.get_item('output_path')): logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__) return False - if mode == 'output' and not same_file_extension([ state_manager.get_item('target_path'), state_manager.get_item('output_path') ]): + if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')): logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__) return False return True diff --git a/facefusion/processors/modules/deep_swapper.py b/facefusion/processors/modules/deep_swapper.py index 71706cb..879e95b 100755 --- a/facefusion/processors/modules/deep_swapper.py +++ b/facefusion/processors/modules/deep_swapper.py @@ -17,7 +17,7 @@ from facefusion.face_helper import paste_back, warp_face_by_face_landmark_5 from facefusion.face_masker import create_occlusion_mask, create_region_mask, create_static_box_mask from facefusion.face_selector import find_similar_faces, sort_and_filter_faces from facefusion.face_store import get_reference_faces -from facefusion.filesystem import in_directory, is_image, is_video, list_directory, resolve_relative_path, same_file_extension +from facefusion.filesystem import get_file_name, in_directory, is_image, is_video, resolve_file_paths, resolve_relative_path, same_file_extension from facefusion.processors import choices as processors_choices from facefusion.processors.typing import DeepSwapperInputs, DeepSwapperMorph from facefusion.program_helper import find_argument_group @@ -216,12 +216,12 @@ def create_static_model_set(download_scope : DownloadScope) -> ModelSet: 'template': 'dfl_whole_face' } - custom_model_files = list_directory(resolve_relative_path('../.assets/models/custom')) + custom_model_file_paths = resolve_file_paths(resolve_relative_path('../.assets/models/custom')) - if custom_model_files: + if custom_model_file_paths: - for model_file in custom_model_files: - model_id = '/'.join([ 'custom', model_file.get('name') ]) + for model_file_path in custom_model_file_paths: + model_id = '/'.join([ 'custom', get_file_name(model_file_path) ]) model_set[model_id] =\ { @@ -229,7 +229,7 @@ def create_static_model_set(download_scope : DownloadScope) -> ModelSet: { 'deep_swapper': { - 'path': resolve_relative_path(model_file.get('path')) + 'path': resolve_relative_path(model_file_path) } }, 'template': 'dfl_whole_face' @@ -290,7 +290,7 @@ def pre_process(mode : ProcessMode) -> bool: if mode == 'output' and not in_directory(state_manager.get_item('output_path')): logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__) return False - if mode == 'output' and not same_file_extension([ state_manager.get_item('target_path'), state_manager.get_item('output_path') ]): + if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')): logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__) return False return True diff --git a/facefusion/processors/modules/expression_restorer.py b/facefusion/processors/modules/expression_restorer.py index 2dd0547..19e40f4 100755 --- a/facefusion/processors/modules/expression_restorer.py +++ b/facefusion/processors/modules/expression_restorer.py @@ -119,7 +119,7 @@ def pre_process(mode : ProcessMode) -> bool: if mode == 'output' and not in_directory(state_manager.get_item('output_path')): logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__) return False - if mode == 'output' and not same_file_extension([ state_manager.get_item('target_path'), state_manager.get_item('output_path') ]): + if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')): logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__) return False return True diff --git a/facefusion/processors/modules/face_debugger.py b/facefusion/processors/modules/face_debugger.py index d1f1f3f..dd0c142 100755 --- a/facefusion/processors/modules/face_debugger.py +++ b/facefusion/processors/modules/face_debugger.py @@ -48,7 +48,7 @@ def pre_process(mode : ProcessMode) -> bool: if mode == 'output' and not in_directory(state_manager.get_item('output_path')): logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__) return False - if mode == 'output' and not same_file_extension([ state_manager.get_item('target_path'), state_manager.get_item('output_path') ]): + if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')): logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__) return False return True diff --git a/facefusion/processors/modules/face_editor.py b/facefusion/processors/modules/face_editor.py index e55b942..a0d2bcc 100755 --- a/facefusion/processors/modules/face_editor.py +++ b/facefusion/processors/modules/face_editor.py @@ -171,7 +171,7 @@ def pre_process(mode : ProcessMode) -> bool: if mode == 'output' and not in_directory(state_manager.get_item('output_path')): logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__) return False - if mode == 'output' and not same_file_extension([ state_manager.get_item('target_path'), state_manager.get_item('output_path') ]): + if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')): logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__) return False return True diff --git a/facefusion/processors/modules/face_enhancer.py b/facefusion/processors/modules/face_enhancer.py index ce8de8a..d47ec42 100755 --- a/facefusion/processors/modules/face_enhancer.py +++ b/facefusion/processors/modules/face_enhancer.py @@ -264,7 +264,7 @@ def pre_process(mode : ProcessMode) -> bool: if mode == 'output' and not in_directory(state_manager.get_item('output_path')): logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__) return False - if mode == 'output' and not same_file_extension([ state_manager.get_item('target_path'), state_manager.get_item('output_path') ]): + if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')): logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__) return False return True diff --git a/facefusion/processors/modules/face_swapper.py b/facefusion/processors/modules/face_swapper.py index f73795b..16f5932 100755 --- a/facefusion/processors/modules/face_swapper.py +++ b/facefusion/processors/modules/face_swapper.py @@ -390,7 +390,7 @@ def pre_process(mode : ProcessMode) -> bool: if mode == 'output' and not in_directory(state_manager.get_item('output_path')): logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__) return False - if mode == 'output' and not same_file_extension([ state_manager.get_item('target_path'), state_manager.get_item('output_path') ]): + if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')): logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__) return False return True diff --git a/facefusion/processors/modules/frame_colorizer.py b/facefusion/processors/modules/frame_colorizer.py index f61cc27..3f53e9a 100644 --- a/facefusion/processors/modules/frame_colorizer.py +++ b/facefusion/processors/modules/frame_colorizer.py @@ -170,7 +170,7 @@ def pre_process(mode : ProcessMode) -> bool: if mode == 'output' and not in_directory(state_manager.get_item('output_path')): logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__) return False - if mode == 'output' and not same_file_extension([ state_manager.get_item('target_path'), state_manager.get_item('output_path') ]): + if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')): logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__) return False return True diff --git a/facefusion/processors/modules/frame_enhancer.py b/facefusion/processors/modules/frame_enhancer.py index b6dea11..1c8abe4 100644 --- a/facefusion/processors/modules/frame_enhancer.py +++ b/facefusion/processors/modules/frame_enhancer.py @@ -434,7 +434,7 @@ def pre_process(mode : ProcessMode) -> bool: if mode == 'output' and not in_directory(state_manager.get_item('output_path')): logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__) return False - if mode == 'output' and not same_file_extension([ state_manager.get_item('target_path'), state_manager.get_item('output_path') ]): + if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')): logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__) return False return True diff --git a/facefusion/processors/modules/lip_syncer.py b/facefusion/processors/modules/lip_syncer.py index 2779456..a92ed49 100755 --- a/facefusion/processors/modules/lip_syncer.py +++ b/facefusion/processors/modules/lip_syncer.py @@ -115,7 +115,7 @@ def pre_process(mode : ProcessMode) -> bool: if mode == 'output' and not in_directory(state_manager.get_item('output_path')): logger.error(wording.get('specify_image_or_video_output') + wording.get('exclamation_mark'), __name__) return False - if mode == 'output' and not same_file_extension([ state_manager.get_item('target_path'), state_manager.get_item('output_path') ]): + if mode == 'output' and not same_file_extension(state_manager.get_item('target_path'), state_manager.get_item('output_path')): logger.error(wording.get('match_target_and_output_extension') + wording.get('exclamation_mark'), __name__) return False return True diff --git a/facefusion/program.py b/facefusion/program.py index 1a25aab..71e1def 100755 --- a/facefusion/program.py +++ b/facefusion/program.py @@ -5,7 +5,7 @@ import facefusion.choices from facefusion import config, metadata, state_manager, wording from facefusion.common_helper import create_float_metavar, create_int_metavar, get_last from facefusion.execution import get_available_execution_providers -from facefusion.filesystem import list_directory +from facefusion.filesystem import get_file_name, resolve_file_paths from facefusion.jobs import job_store from facefusion.processors.core import get_processors_modules @@ -171,7 +171,7 @@ def create_output_creation_program() -> ArgumentParser: def create_processors_program() -> ArgumentParser: program = ArgumentParser(add_help = False) - available_processors = [ file.get('name') for file in list_directory('facefusion/processors/modules') ] + available_processors = [ get_file_name(file_path) for file_path in resolve_file_paths('facefusion/processors/modules') ] group_processors = program.add_argument_group('processors') group_processors.add_argument('--processors', help = wording.get('help.processors').format(choices = ', '.join(available_processors)), default = config.get_str_list('processors.processors', 'face_swapper'), nargs = '+') job_store.register_step_keys([ 'processors' ]) @@ -182,7 +182,7 @@ def create_processors_program() -> ArgumentParser: def create_uis_program() -> ArgumentParser: program = ArgumentParser(add_help = False) - available_ui_layouts = [ file.get('name') for file in list_directory('facefusion/uis/layouts') ] + available_ui_layouts = [ get_file_name(file_path) for file_path in resolve_file_paths('facefusion/uis/layouts') ] group_uis = program.add_argument_group('uis') group_uis.add_argument('--open-browser', help = wording.get('help.open_browser'), action = 'store_true', default = config.get_bool_value('uis.open_browser')) group_uis.add_argument('--ui-layouts', help = wording.get('help.ui_layouts').format(choices = ', '.join(available_ui_layouts)), default = config.get_str_list('uis.ui_layouts', 'default'), nargs = '+') diff --git a/facefusion/temp_helper.py b/facefusion/temp_helper.py index 16e9c60..9622621 100644 --- a/facefusion/temp_helper.py +++ b/facefusion/temp_helper.py @@ -2,12 +2,12 @@ import os from typing import List from facefusion import state_manager -from facefusion.filesystem import create_directory, move_file, remove_directory, resolve_file_pattern +from facefusion.filesystem import create_directory, get_file_extension, get_file_name, move_file, remove_directory, resolve_file_pattern def get_temp_file_path(file_path : str) -> str: - _, temp_file_extension = os.path.splitext(os.path.basename(file_path)) temp_directory_path = get_temp_directory_path(file_path) + temp_file_extension = get_file_extension(file_path) return os.path.join(temp_directory_path, 'temp' + temp_file_extension) @@ -16,8 +16,18 @@ def move_temp_file(file_path : str, move_path : str) -> bool: return move_file(temp_file_path, move_path) +def resolve_temp_frame_paths(target_path : str) -> List[str]: + temp_frames_pattern = get_temp_frames_pattern(target_path, '*') + return resolve_file_pattern(temp_frames_pattern) + + +def get_temp_frames_pattern(target_path : str, temp_frame_prefix : str) -> str: + temp_directory_path = get_temp_directory_path(target_path) + return os.path.join(temp_directory_path, temp_frame_prefix + '.' + state_manager.get_item('temp_frame_format')) + + def get_temp_directory_path(file_path : str) -> str: - temp_file_name, _ = os.path.splitext(os.path.basename(file_path)) + temp_file_name = get_file_name(file_path) return os.path.join(state_manager.get_item('temp_path'), 'facefusion', temp_file_name) @@ -31,13 +41,3 @@ def clear_temp_directory(file_path : str) -> bool: temp_directory_path = get_temp_directory_path(file_path) return remove_directory(temp_directory_path) return True - - -def get_temp_frame_paths(target_path : str) -> List[str]: - temp_frames_pattern = get_temp_frames_pattern(target_path, '*') - return resolve_file_pattern(temp_frames_pattern) - - -def get_temp_frames_pattern(target_path : str, temp_frame_prefix : str) -> str: - temp_directory_path = get_temp_directory_path(target_path) - return os.path.join(temp_directory_path, temp_frame_prefix + '.' + state_manager.get_item('temp_frame_format')) diff --git a/facefusion/typing.py b/facefusion/typing.py index 4b2f441..5c1b540 100755 --- a/facefusion/typing.py +++ b/facefusion/typing.py @@ -106,7 +106,15 @@ FaceParserModel = Literal['bisenet_resnet_18', 'bisenet_resnet_34'] FaceMaskType = Literal['box', 'occlusion', 'region'] FaceMaskRegion = Literal['skin', 'left-eyebrow', 'right-eyebrow', 'left-eye', 'right-eye', 'glasses', 'nose', 'mouth', 'upper-lip', 'lower-lip'] FaceMaskRegionSet = Dict[FaceMaskRegion, int] + +AudioFormat = Literal['mp3', 'ogg', 'wav'] +ImageFormat = Literal['bmp', 'jpg', 'png', 'webp'] +VideoFormat = Literal['avi', 'mkv', 'mov', 'mp4', 'webm'] TempFrameFormat = Literal['bmp', 'jpg', 'png'] +AudioTypeSet = Dict[AudioFormat, str] +ImageTypeSet = Dict[ImageFormat, str] +VideoTypeSet = Dict[VideoFormat, str] + OutputAudioEncoder = Literal['aac', 'libmp3lame', 'libopus', 'libvorbis'] OutputVideoEncoder = Literal['libx264', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf','h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox'] OutputVideoPreset = Literal['ultrafast', 'superfast', 'veryfast', 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow'] @@ -174,14 +182,6 @@ Download = TypedDict('Download', DownloadSet = Dict[str, Download] VideoMemoryStrategy = Literal['strict', 'moderate', 'tolerant'] - -File = TypedDict('File', -{ - 'name' : str, - 'extension' : str, - 'path': str -}) - AppContext = Literal['cli', 'ui'] InferencePool = Dict[str, InferenceSession] diff --git a/facefusion/uis/components/benchmark.py b/facefusion/uis/components/benchmark.py index 7e30ffe..183ebd1 100644 --- a/facefusion/uis/components/benchmark.py +++ b/facefusion/uis/components/benchmark.py @@ -9,7 +9,7 @@ import gradio from facefusion import state_manager, wording from facefusion.core import conditional_process -from facefusion.filesystem import is_video +from facefusion.filesystem import get_file_extension, is_video from facefusion.memory import limit_system_memory from facefusion.uis.core import get_ui_component from facefusion.vision import count_video_frame_total, detect_video_fps, detect_video_resolution, pack_resolution @@ -72,8 +72,8 @@ def listen() -> None: def suggest_output_path(target_path : str) -> Optional[str]: if is_video(target_path): - _, target_extension = os.path.splitext(target_path) - return os.path.join(tempfile.gettempdir(), hashlib.sha1().hexdigest()[:8] + target_extension) + target_file_extension = get_file_extension(target_path) + return os.path.join(tempfile.gettempdir(), hashlib.sha1().hexdigest()[:8] + target_file_extension) return None diff --git a/facefusion/uis/components/download.py b/facefusion/uis/components/download.py index 7fa8faf..8943c7b 100644 --- a/facefusion/uis/components/download.py +++ b/facefusion/uis/components/download.py @@ -4,7 +4,7 @@ import gradio import facefusion.choices from facefusion import content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, state_manager, voice_extractor, wording -from facefusion.filesystem import list_directory +from facefusion.filesystem import get_file_name, resolve_file_paths from facefusion.processors.core import get_processors_modules from facefusion.typing import DownloadProvider @@ -36,7 +36,7 @@ def update_download_providers(download_providers : List[DownloadProvider]) -> gr face_masker, voice_extractor ] - available_processors = [ file.get('name') for file in list_directory('facefusion/processors/modules') ] + available_processors = [ get_file_name(file_path) for file_path in resolve_file_paths('facefusion/processors/modules') ] processor_modules = get_processors_modules(available_processors) for module in common_modules + processor_modules: diff --git a/facefusion/uis/components/execution.py b/facefusion/uis/components/execution.py index 53275ad..841f420 100644 --- a/facefusion/uis/components/execution.py +++ b/facefusion/uis/components/execution.py @@ -4,7 +4,7 @@ import gradio from facefusion import content_analyser, face_classifier, face_detector, face_landmarker, face_masker, face_recognizer, state_manager, voice_extractor, wording from facefusion.execution import get_available_execution_providers -from facefusion.filesystem import list_directory +from facefusion.filesystem import get_file_name, resolve_file_paths from facefusion.processors.core import get_processors_modules from facefusion.typing import ExecutionProvider @@ -36,7 +36,7 @@ def update_execution_providers(execution_providers : List[ExecutionProvider]) -> face_recognizer, voice_extractor ] - available_processors = [ file.get('name') for file in list_directory('facefusion/processors/modules') ] + available_processors = [ get_file_name(file_path) for file_path in resolve_file_paths('facefusion/processors/modules') ] processor_modules = get_processors_modules(available_processors) for module in common_modules + processor_modules: diff --git a/facefusion/uis/components/processors.py b/facefusion/uis/components/processors.py index 990006f..85ebc9f 100644 --- a/facefusion/uis/components/processors.py +++ b/facefusion/uis/components/processors.py @@ -3,7 +3,7 @@ from typing import List, Optional import gradio from facefusion import state_manager, wording -from facefusion.filesystem import list_directory +from facefusion.filesystem import get_file_name, resolve_file_paths from facefusion.processors.core import get_processors_modules from facefusion.uis.core import register_ui_component @@ -39,5 +39,5 @@ def update_processors(processors : List[str]) -> gradio.CheckboxGroup: def sort_processors(processors : List[str]) -> List[str]: - available_processors = [ file.get('name') for file in list_directory('facefusion/processors/modules') ] + available_processors = [ get_file_name(file_path) for file_path in resolve_file_paths('facefusion/processors/modules') ] return sorted(available_processors, key = lambda processor : processors.index(processor) if processor in processors else len(processors)) diff --git a/facefusion/uis/components/source.py b/facefusion/uis/components/source.py index 4b7d572..0762166 100644 --- a/facefusion/uis/components/source.py +++ b/facefusion/uis/components/source.py @@ -23,11 +23,6 @@ def render() -> None: SOURCE_FILE = gradio.File( label = wording.get('uis.source_file'), file_count = 'multiple', - file_types = - [ - 'audio', - 'image' - ], value = state_manager.get_item('source_paths') if has_source_audio or has_source_image else None ) source_file_names = [ source_file_value.get('path') for source_file_value in SOURCE_FILE.value ] if SOURCE_FILE.value else None diff --git a/facefusion/uis/components/target.py b/facefusion/uis/components/target.py index 63a0d70..a87cf00 100644 --- a/facefusion/uis/components/target.py +++ b/facefusion/uis/components/target.py @@ -26,11 +26,6 @@ def render() -> None: TARGET_FILE = gradio.File( label = wording.get('uis.target_file'), file_count = 'single', - file_types = - [ - 'image', - 'video' - ], value = state_manager.get_item('target_path') if is_target_image or is_target_video else None ) target_image_options : ComponentOptions =\ diff --git a/facefusion/uis/ui_helper.py b/facefusion/uis/ui_helper.py index 4e729e9..27bc265 100644 --- a/facefusion/uis/ui_helper.py +++ b/facefusion/uis/ui_helper.py @@ -3,7 +3,7 @@ import os from typing import Optional from facefusion import state_manager -from facefusion.filesystem import is_image, is_video +from facefusion.filesystem import get_file_extension, is_image, is_video def convert_int_none(value : int) -> Optional[int]: @@ -20,7 +20,7 @@ def convert_str_none(value : str) -> Optional[str]: def suggest_output_path(output_directory_path : str, target_path : str) -> Optional[str]: if is_image(target_path) or is_video(target_path): - _, target_extension = os.path.splitext(target_path) - output_name = hashlib.sha1(str(state_manager.get_state()).encode()).hexdigest()[:8] - return os.path.join(output_directory_path, output_name + target_extension) + output_file_name = hashlib.sha1(str(state_manager.get_state()).encode()).hexdigest()[:8] + target_file_extension = get_file_extension(target_path) + return os.path.join(output_directory_path, output_file_name + target_file_extension) return None diff --git a/facefusion/vision.py b/facefusion/vision.py index e9b1a14..2ca3662 100644 --- a/facefusion/vision.py +++ b/facefusion/vision.py @@ -1,4 +1,3 @@ -import os from functools import lru_cache from typing import List, Optional, Tuple @@ -8,7 +7,7 @@ from cv2.typing import Size import facefusion.choices from facefusion.common_helper import is_windows -from facefusion.filesystem import is_image, is_video +from facefusion.filesystem import get_file_extension, is_image, is_video from facefusion.typing import Duration, Fps, Orientation, Resolution, VisionFrame @@ -38,8 +37,8 @@ def read_image(image_path : str) -> Optional[VisionFrame]: def write_image(image_path : str, vision_frame : VisionFrame) -> bool: if image_path: if is_windows(): - _, file_extension = os.path.splitext(image_path) - _, vision_frame = cv2.imencode(file_extension, vision_frame) + image_file_extension = get_file_extension(image_path) + _, vision_frame = cv2.imencode(image_file_extension, vision_frame) vision_frame.tofile(image_path) return is_image(image_path) return cv2.imwrite(image_path, vision_frame) diff --git a/requirements.txt b/requirements.txt index 180f1a4..9bc9e37 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ -filetype==1.2.0 gradio==5.9.1 gradio-rangeslider==0.0.8 numpy==2.2.0 diff --git a/tests/test_ffmpeg.py b/tests/test_ffmpeg.py index a703ef4..514b6ed 100644 --- a/tests/test_ffmpeg.py +++ b/tests/test_ffmpeg.py @@ -7,7 +7,7 @@ from facefusion import process_manager, state_manager from facefusion.download import conditional_download from facefusion.ffmpeg import concat_video, extract_frames, read_audio_buffer, replace_audio, restore_audio from facefusion.filesystem import copy_file -from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, get_temp_frame_paths +from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, resolve_temp_frame_paths from .helper import get_test_example_file, get_test_examples_directory, get_test_output_file, prepare_test_output_directory @@ -57,7 +57,7 @@ def test_extract_frames() -> None: create_temp_directory(target_path) assert extract_frames(target_path, '452x240', 30.0, trim_frame_start, trim_frame_end) is True - assert len(get_temp_frame_paths(target_path)) == frame_total + assert len(resolve_temp_frame_paths(target_path)) == frame_total clear_temp_directory(target_path) diff --git a/tests/test_filesystem.py b/tests/test_filesystem.py index 70878fa..3c84952 100644 --- a/tests/test_filesystem.py +++ b/tests/test_filesystem.py @@ -3,7 +3,7 @@ import os.path import pytest from facefusion.download import conditional_download -from facefusion.filesystem import create_directory, filter_audio_paths, filter_image_paths, get_file_size, has_audio, has_image, in_directory, is_audio, is_directory, is_file, is_image, is_video, list_directory, remove_directory, same_file_extension +from facefusion.filesystem import create_directory, filter_audio_paths, filter_image_paths, get_file_extension, get_file_format, get_file_size, has_audio, has_image, has_video, in_directory, is_audio, is_directory, is_file, is_image, is_video, remove_directory, resolve_file_paths, same_file_extension from .helper import get_test_example_file, get_test_examples_directory, get_test_outputs_directory @@ -18,13 +18,26 @@ def before_all() -> None: def test_get_file_size() -> None: - assert get_file_size(get_test_example_file('source.jpg')) > 0 + assert get_file_size(get_test_example_file('source.jpg')) == 549458 assert get_file_size('invalid') == 0 +def test_get_file_extension() -> None: + assert get_file_extension('source.jpg') == '.jpg' + assert get_file_extension('source.mp3') == '.mp3' + assert get_file_extension('invalid') is None + + +def test_get_file_format() -> None: + assert get_file_format('source.jpg') == 'jpg' + assert get_file_format('source.mp3') == 'mp3' + assert get_file_format('invalid') is None + + def test_same_file_extension() -> None: - assert same_file_extension([ 'target.jpg', 'output.jpg' ]) is True - assert same_file_extension([ 'target.jpg', 'output.mp4' ]) is False + assert same_file_extension('source.jpg', 'source.jpg') is True + assert same_file_extension('source.jpg', 'source.mp3') is False + assert same_file_extension('invalid', 'invalid') is False def test_is_file() -> None: @@ -33,18 +46,6 @@ def test_is_file() -> None: assert is_file('invalid') is False -def test_is_directory() -> None: - assert is_directory(get_test_examples_directory()) is True - assert is_directory(get_test_example_file('source.jpg')) is False - assert is_directory('invalid') is False - - -def test_in_directory() -> None: - assert in_directory(get_test_example_file('source.jpg')) is True - assert in_directory('source.jpg') is False - assert in_directory('invalid') is False - - def test_is_audio() -> None: assert is_audio(get_test_example_file('source.mp3')) is True assert is_audio(get_test_example_file('source.jpg')) is False @@ -77,6 +78,13 @@ def test_is_video() -> None: assert is_video('invalid') is False +def test_has_video() -> None: + assert has_video([ get_test_example_file('target-240p.mp4') ]) is True + assert has_video([ get_test_example_file('target-240p.mp4'), get_test_example_file('source.mp3') ]) is True + assert has_video([ get_test_example_file('source.mp3'), get_test_example_file('source.mp3') ]) is False + assert has_video([ 'invalid' ]) is False + + def test_filter_audio_paths() -> None: assert filter_audio_paths([ get_test_example_file('source.jpg'), get_test_example_file('source.mp3') ]) == [ get_test_example_file('source.mp3') ] assert filter_audio_paths([ get_test_example_file('source.jpg'), get_test_example_file('source.jpg') ]) == [] @@ -89,6 +97,15 @@ def test_filter_image_paths() -> None: assert filter_audio_paths([ 'invalid' ]) == [] +def test_resolve_file_paths() -> None: + file_paths = resolve_file_paths(get_test_examples_directory()) + + for file_path in file_paths: + assert file_path == get_test_example_file(file_path) + + assert resolve_file_paths('invalid') == [] + + def test_create_directory() -> None: create_directory_path = os.path.join(get_test_outputs_directory(), 'create_directory') @@ -96,15 +113,6 @@ def test_create_directory() -> None: assert create_directory(get_test_example_file('source.jpg')) is False -def test_list_directory() -> None: - files = list_directory(get_test_examples_directory()) - - for file in files: - assert file.get('path') == get_test_example_file(file.get('name') + file.get('extension')) - - assert list_directory('invalid') is None - - def test_remove_directory() -> None: remove_directory_path = os.path.join(get_test_outputs_directory(), 'remove_directory') create_directory(remove_directory_path) @@ -112,3 +120,15 @@ def test_remove_directory() -> None: assert remove_directory(remove_directory_path) is True assert remove_directory(get_test_example_file('source.jpg')) is False assert remove_directory('invalid') is False + + +def test_is_directory() -> None: + assert is_directory(get_test_examples_directory()) is True + assert is_directory(get_test_example_file('source.jpg')) is False + assert is_directory('invalid') is False + + +def test_in_directory() -> None: + assert in_directory(get_test_example_file('source.jpg')) is True + assert in_directory('source.jpg') is False + assert in_directory('invalid') is False diff --git a/tests/test_vision.py b/tests/test_vision.py index 6d104d0..bd16476 100644 --- a/tests/test_vision.py +++ b/tests/test_vision.py @@ -4,7 +4,7 @@ import pytest from facefusion.download import conditional_download from facefusion.vision import calc_histogram_difference, count_trim_frame_total, count_video_frame_total, create_image_resolutions, create_video_resolutions, detect_image_resolution, detect_video_duration, detect_video_fps, detect_video_resolution, get_video_frame, match_frame_color, normalize_resolution, pack_resolution, read_image, restrict_image_resolution, restrict_trim_frame, restrict_video_fps, restrict_video_resolution, unpack_resolution, write_image -from .helper import get_test_example_file, get_test_examples_directory, prepare_test_output_directory, get_test_output_file +from .helper import get_test_example_file, get_test_examples_directory, get_test_output_file, prepare_test_output_directory @pytest.fixture(scope = 'module', autouse = True)