diff --git a/facefusion/audio.py b/facefusion/audio.py index abe52c8..383e225 100644 --- a/facefusion/audio.py +++ b/facefusion/audio.py @@ -3,7 +3,7 @@ from typing import Any, List, Optional import numpy import scipy -from numpy._typing import NDArray +from numpy.typing import NDArray from facefusion.ffmpeg import read_audio_buffer from facefusion.filesystem import is_audio @@ -17,11 +17,12 @@ def read_static_audio(audio_path : str, fps : Fps) -> Optional[List[AudioFrame]] def read_audio(audio_path : str, fps : Fps) -> Optional[List[AudioFrame]]: - sample_rate = 48000 - channel_total = 2 + audio_sample_rate = 48000 + audio_sample_size = 16 + audio_channel_total = 2 if is_audio(audio_path): - audio_buffer = read_audio_buffer(audio_path, sample_rate, channel_total) + audio_buffer = read_audio_buffer(audio_path, audio_sample_rate, audio_sample_size, audio_channel_total) audio = numpy.frombuffer(audio_buffer, dtype = numpy.int16).reshape(-1, 2) audio = prepare_audio(audio) spectrogram = create_spectrogram(audio) @@ -36,15 +37,16 @@ def read_static_voice(audio_path : str, fps : Fps) -> Optional[List[AudioFrame]] def read_voice(audio_path : str, fps : Fps) -> Optional[List[AudioFrame]]: - sample_rate = 48000 - channel_total = 2 - chunk_size = 240 * 1024 - step_size = 180 * 1024 + voice_sample_rate = 48000 + voice_sample_size = 16 + voice_channel_total = 2 + voice_chunk_size = 240 * 1024 + voice_step_size = 180 * 1024 if is_audio(audio_path): - audio_buffer = read_audio_buffer(audio_path, sample_rate, channel_total) + audio_buffer = read_audio_buffer(audio_path, voice_sample_rate, voice_sample_size, voice_channel_total) audio = numpy.frombuffer(audio_buffer, dtype = numpy.int16).reshape(-1, 2) - audio = batch_extract_voice(audio, chunk_size, step_size) + audio = batch_extract_voice(audio, voice_chunk_size, voice_step_size) audio = prepare_voice(audio) spectrogram = create_spectrogram(audio) audio_frames = extract_audio_frames(spectrogram, fps) @@ -60,6 +62,20 @@ def get_audio_frame(audio_path : str, fps : Fps, frame_number : int = 0) -> Opti return None +def extract_audio_frames(spectrogram: Spectrogram, fps: Fps) -> List[AudioFrame]: + audio_frames = [] + mel_filter_total = 80 + audio_step_size = 16 + indices = numpy.arange(0, spectrogram.shape[1], mel_filter_total / fps).astype(numpy.int16) + indices = indices[indices >= audio_step_size] + + for index in indices: + start = max(0, index - audio_step_size) + audio_frames.append(spectrogram[:, start:index]) + + return audio_frames + + def get_voice_frame(audio_path : str, fps : Fps, frame_number : int = 0) -> Optional[AudioFrame]: if is_audio(audio_path): voice_frames = read_static_voice(audio_path, fps) @@ -70,8 +86,8 @@ def get_voice_frame(audio_path : str, fps : Fps, frame_number : int = 0) -> Opti def create_empty_audio_frame() -> AudioFrame: mel_filter_total = 80 - step_size = 16 - audio_frame = numpy.zeros((mel_filter_total, step_size)).astype(numpy.int16) + audio_step_size = 16 + audio_frame = numpy.zeros((mel_filter_total, audio_step_size)).astype(numpy.int16) return audio_frame @@ -84,10 +100,10 @@ def prepare_audio(audio : Audio) -> Audio: def prepare_voice(audio : Audio) -> Audio: - sample_rate = 48000 - resample_rate = 16000 - - audio = scipy.signal.resample(audio, int(len(audio) * resample_rate / sample_rate)) + audio_sample_rate = 48000 + audio_resample_rate = 16000 + audio_resample_factor = round(len(audio) * audio_resample_rate / audio_sample_rate) + audio = scipy.signal.resample(audio, audio_resample_factor) audio = prepare_audio(audio) return audio @@ -101,19 +117,20 @@ def convert_mel_to_hertz(mel : Mel) -> NDArray[Any]: def create_mel_filter_bank() -> MelFilterBank: + audio_sample_rate = 16000 + audio_min_frequency = 55.0 + audio_max_frequency = 7600.0 mel_filter_total = 80 mel_bin_total = 800 - sample_rate = 16000 - min_frequency = 55.0 - max_frequency = 7600.0 mel_filter_bank = numpy.zeros((mel_filter_total, mel_bin_total // 2 + 1)) - mel_frequency_range = numpy.linspace(convert_hertz_to_mel(min_frequency), convert_hertz_to_mel(max_frequency), mel_filter_total + 2) - indices = numpy.floor((mel_bin_total + 1) * convert_mel_to_hertz(mel_frequency_range) / sample_rate).astype(numpy.int16) + mel_frequency_range = numpy.linspace(convert_hertz_to_mel(audio_min_frequency), convert_hertz_to_mel(audio_max_frequency), mel_filter_total + 2) + indices = numpy.floor((mel_bin_total + 1) * convert_mel_to_hertz(mel_frequency_range) / audio_sample_rate).astype(numpy.int16) for index in range(mel_filter_total): start = indices[index] end = indices[index + 1] mel_filter_bank[index, start:end] = scipy.signal.windows.triang(end - start) + return mel_filter_bank @@ -124,16 +141,3 @@ def create_spectrogram(audio : Audio) -> Spectrogram: spectrogram = scipy.signal.stft(audio, nperseg = mel_bin_total, nfft = mel_bin_total, noverlap = mel_bin_overlap)[2] spectrogram = numpy.dot(mel_filter_bank, numpy.abs(spectrogram)) return spectrogram - - -def extract_audio_frames(spectrogram : Spectrogram, fps : Fps) -> List[AudioFrame]: - mel_filter_total = 80 - step_size = 16 - audio_frames = [] - indices = numpy.arange(0, spectrogram.shape[1], mel_filter_total / fps).astype(numpy.int16) - indices = indices[indices >= step_size] - - for index in indices: - start = max(0, index - step_size) - audio_frames.append(spectrogram[:, start:index]) - return audio_frames diff --git a/facefusion/choices.py b/facefusion/choices.py index baf79d4..9a448f8 100755 --- a/facefusion/choices.py +++ b/facefusion/choices.py @@ -2,7 +2,7 @@ import logging from typing import List, Sequence from facefusion.common_helper import create_float_range, create_int_range -from facefusion.typing import Angle, AudioFormat, AudioTypeSet, DownloadProvider, DownloadProviderSet, DownloadScope, ExecutionProvider, ExecutionProviderSet, FaceDetectorModel, FaceDetectorSet, FaceLandmarkerModel, FaceMaskRegion, FaceMaskRegionSet, FaceMaskType, FaceOccluderModel, FaceParserModel, FaceSelectorMode, FaceSelectorOrder, Gender, ImageFormat, ImageTypeSet, JobStatus, LogLevel, LogLevelSet, OutputAudioEncoder, OutputVideoEncoder, OutputVideoPreset, Race, Score, UiWorkflow, VideoFormat, VideoMemoryStrategy, VideoTypeSet +from facefusion.typing import Angle, AudioEncoder, AudioFormat, AudioTypeSet, DownloadProvider, DownloadProviderSet, DownloadScope, ExecutionProvider, ExecutionProviderSet, FaceDetectorModel, FaceDetectorSet, FaceLandmarkerModel, FaceMaskRegion, FaceMaskRegionSet, FaceMaskType, FaceOccluderModel, FaceParserModel, FaceSelectorMode, FaceSelectorOrder, Gender, ImageFormat, ImageTypeSet, JobStatus, LogLevel, LogLevelSet, Race, Score, UiWorkflow, VideoEncoder, VideoFormat, VideoMemoryStrategy, VideoPreset, VideoTypeSet, WebcamMode face_detector_set : FaceDetectorSet =\ { @@ -61,13 +61,16 @@ image_formats : List[ImageFormat] = list(image_type_set.keys()) video_formats : List[VideoFormat] = list(video_type_set.keys()) temp_frame_formats : List[ImageFormat] = [ 'bmp', 'jpg', 'png' ] -output_audio_encoders : List[OutputAudioEncoder] = [ 'aac', 'libmp3lame', 'libopus', 'libvorbis' ] -output_video_encoders : List[OutputVideoEncoder] = [ 'libx264', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf', 'h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox' ] -output_video_presets : List[OutputVideoPreset] = [ 'ultrafast', 'superfast', 'veryfast', 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow' ] +output_audio_encoders : List[AudioEncoder] = [ 'aac', 'libmp3lame', 'libopus', 'libvorbis' ] +output_video_encoders : List[VideoEncoder] = [ 'libx264', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf', 'h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox' ] +output_video_presets : List[VideoPreset] = [ 'ultrafast', 'superfast', 'veryfast', 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow' ] image_template_sizes : List[float] = [ 0.25, 0.5, 0.75, 1, 1.5, 2, 2.5, 3, 3.5, 4 ] video_template_sizes : List[int] = [ 240, 360, 480, 540, 720, 1080, 1440, 2160, 4320 ] +webcam_modes : List[WebcamMode] = [ 'inline', 'udp', 'v4l2' ] +webcam_resolutions : List[str] = [ '320x240', '640x480', '800x600', '1024x768', '1280x720', '1280x960', '1920x1080', '2560x1440', '3840x2160' ] + execution_provider_set : ExecutionProviderSet =\ { 'cpu': 'CPUExecutionProvider', diff --git a/facefusion/curl_builder.py b/facefusion/curl_builder.py new file mode 100644 index 0000000..7b60dc4 --- /dev/null +++ b/facefusion/curl_builder.py @@ -0,0 +1,27 @@ +import itertools +import shutil + +from facefusion import metadata +from facefusion.typing import Commands + + +def run(commands : Commands) -> Commands: + user_agent = metadata.get('name') + '/' + metadata.get('version') + + return [ shutil.which('curl'), '--user-agent', user_agent, '--insecure', '--location', '--silent' ] + commands + + +def chain(*commands : Commands) -> Commands: + return list(itertools.chain(*commands)) + + +def head(url : str) -> Commands: + return [ '-I', url ] + + +def download(url : str, download_file_path : str) -> Commands: + return [ '--create-dirs', '--continue-at', '-', '--output', download_file_path, url ] + + +def set_timeout(timeout : int) -> Commands: + return [ '--connect-timeout', str(timeout) ] diff --git a/facefusion/download.py b/facefusion/download.py index 25662d4..f56f515 100644 --- a/facefusion/download.py +++ b/facefusion/download.py @@ -1,5 +1,4 @@ import os -import shutil import subprocess from functools import lru_cache from typing import List, Optional, Tuple @@ -8,14 +7,14 @@ from urllib.parse import urlparse from tqdm import tqdm import facefusion.choices -from facefusion import logger, process_manager, state_manager, wording +from facefusion import curl_builder, logger, process_manager, state_manager, wording from facefusion.filesystem import get_file_name, get_file_size, is_file, remove_file from facefusion.hash_helper import validate_hash from facefusion.typing import Commands, DownloadProvider, DownloadSet def open_curl(commands : Commands) -> subprocess.Popen[bytes]: - commands = [ shutil.which('curl'), '--silent', '--insecure', '--location' ] + commands + commands = curl_builder.run(commands) return subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE) @@ -28,7 +27,10 @@ def conditional_download(download_directory_path : str, urls : List[str]) -> Non if initial_size < download_size: with tqdm(total = download_size, initial = initial_size, desc = wording.get('downloading'), unit = 'B', unit_scale = True, unit_divisor = 1024, ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress: - commands = [ '--create-dirs', '--continue-at', '-', '--output', download_file_path, url, '--connect-timeout', '10' ] + commands = curl_builder.chain( + curl_builder.download(url, download_file_path), + curl_builder.set_timeout(10) + ) open_curl(commands) current_size = initial_size progress.set_postfix(download_providers = state_manager.get_item('download_providers'), file_name = download_file_name) @@ -41,7 +43,10 @@ def conditional_download(download_directory_path : str, urls : List[str]) -> Non @lru_cache(maxsize = None) def get_static_download_size(url : str) -> int: - commands = [ '-I', url, '--connect-timeout', '5' ] + commands = curl_builder.chain( + curl_builder.head(url), + curl_builder.set_timeout(5) + ) process = open_curl(commands) lines = reversed(process.stdout.readlines()) @@ -56,7 +61,10 @@ def get_static_download_size(url : str) -> int: @lru_cache(maxsize = None) def ping_static_url(url : str) -> bool: - commands = [ '-I', url, '--connect-timeout', '5' ] + commands = curl_builder.chain( + curl_builder.head(url), + curl_builder.set_timeout(5) + ) process = open_curl(commands) process.communicate() return process.returncode == 0 diff --git a/facefusion/ffmpeg.py b/facefusion/ffmpeg.py index 919db20..0463534 100644 --- a/facefusion/ffmpeg.py +++ b/facefusion/ffmpeg.py @@ -1,21 +1,22 @@ import os -import shutil import subprocess import tempfile from typing import List, Optional from tqdm import tqdm -from facefusion import logger, process_manager, state_manager, wording +from facefusion import ffmpeg_builder, logger, process_manager, state_manager, wording from facefusion.filesystem import get_file_format, remove_file from facefusion.temp_helper import get_temp_file_path, get_temp_frames_pattern, resolve_temp_frame_paths -from facefusion.typing import AudioBuffer, Commands, Fps, OutputVideoPreset, UpdateProgress +from facefusion.typing import AudioBuffer, Commands, Fps, UpdateProgress from facefusion.vision import count_trim_frame_total, detect_video_duration, restrict_video_fps def run_ffmpeg_with_progress(commands : Commands, update_progress : UpdateProgress) -> subprocess.Popen[bytes]: log_level = state_manager.get_item('log_level') - commands = [ shutil.which('ffmpeg'), '-hide_banner', '-nostats', '-loglevel', 'error', '-progress', '-' ] + commands + commands.extend(ffmpeg_builder.set_progress()) + commands.extend(ffmpeg_builder.cast_stream()) + commands = ffmpeg_builder.run(commands) process = subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE) while process_manager.is_processing(): @@ -40,7 +41,7 @@ def run_ffmpeg_with_progress(commands : Commands, update_progress : UpdateProgre def run_ffmpeg(commands : Commands) -> subprocess.Popen[bytes]: log_level = state_manager.get_item('log_level') - commands = [ shutil.which('ffmpeg'), '-hide_banner', '-nostats', '-loglevel', 'error' ] + commands + commands = ffmpeg_builder.run(commands) process = subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE) while process_manager.is_processing(): @@ -58,7 +59,7 @@ def run_ffmpeg(commands : Commands) -> subprocess.Popen[bytes]: def open_ffmpeg(commands : Commands) -> subprocess.Popen[bytes]: - commands = [ shutil.which('ffmpeg'), '-loglevel', 'quiet' ] + commands + commands = ffmpeg_builder.run(commands) return subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE) @@ -74,17 +75,14 @@ def log_debug(process : subprocess.Popen[bytes]) -> None: def extract_frames(target_path : str, temp_video_resolution : str, temp_video_fps : Fps, trim_frame_start : int, trim_frame_end : int) -> bool: extract_frame_total = count_trim_frame_total(target_path, trim_frame_start, trim_frame_end) temp_frames_pattern = get_temp_frames_pattern(target_path, '%08d') - commands = [ '-i', target_path, '-s', str(temp_video_resolution), '-q:v', '0' ] - - if isinstance(trim_frame_start, int) and isinstance(trim_frame_end, int): - commands.extend([ '-vf', 'trim=start_frame=' + str(trim_frame_start) + ':end_frame=' + str(trim_frame_end) + ',fps=' + str(temp_video_fps) ]) - elif isinstance(trim_frame_start, int): - commands.extend([ '-vf', 'trim=start_frame=' + str(trim_frame_start) + ',fps=' + str(temp_video_fps) ]) - elif isinstance(trim_frame_end, int): - commands.extend([ '-vf', 'trim=end_frame=' + str(trim_frame_end) + ',fps=' + str(temp_video_fps) ]) - else: - commands.extend([ '-vf', 'fps=' + str(temp_video_fps) ]) - commands.extend([ '-vsync', '0', temp_frames_pattern ]) + commands = ffmpeg_builder.chain( + ffmpeg_builder.set_input(target_path), + ffmpeg_builder.set_media_resolution(temp_video_resolution), + ffmpeg_builder.set_frame_quality(0), + ffmpeg_builder.select_frame_range(trim_frame_start, trim_frame_end, temp_video_fps), + ffmpeg_builder.prevent_frame_drop(), + ffmpeg_builder.set_output(temp_frames_pattern) + ) with tqdm(total = extract_frame_total, desc = wording.get('extracting'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress: process = run_ffmpeg_with_progress(commands, lambda frame_number: progress.update(frame_number - progress.n)) @@ -93,27 +91,37 @@ def extract_frames(target_path : str, temp_video_resolution : str, temp_video_fp def copy_image(target_path : str, temp_image_resolution : str) -> bool: temp_file_path = get_temp_file_path(target_path) - if get_file_format(target_path) == 'webp': - output_image_compression = 100 - else: - output_image_compression = 1 - commands = [ '-i', target_path, '-s', str(temp_image_resolution), '-q:v', str(output_image_compression), '-y', temp_file_path ] + commands = ffmpeg_builder.chain( + ffmpeg_builder.set_input(target_path), + ffmpeg_builder.set_media_resolution(temp_image_resolution), + ffmpeg_builder.set_image_quality(target_path, 100), + ffmpeg_builder.force_output(temp_file_path) + ) return run_ffmpeg(commands).returncode == 0 def finalize_image(target_path : str, output_path : str, output_image_resolution : str) -> bool: output_image_quality = state_manager.get_item('output_image_quality') temp_file_path = get_temp_file_path(target_path) - if get_file_format(target_path) == 'webp': - output_image_compression = output_image_quality - else: - output_image_compression = round(31 - (output_image_quality * 0.31)) - commands = [ '-i', temp_file_path, '-s', str(output_image_resolution), '-q:v', str(output_image_compression), '-y', output_path ] + commands = ffmpeg_builder.chain( + ffmpeg_builder.set_input(temp_file_path), + ffmpeg_builder.set_media_resolution(output_image_resolution), + ffmpeg_builder.set_image_quality(target_path, output_image_quality), + ffmpeg_builder.force_output(output_path) + ) return run_ffmpeg(commands).returncode == 0 -def read_audio_buffer(target_path : str, sample_rate : int, channel_total : int) -> Optional[AudioBuffer]: - commands = [ '-i', target_path, '-vn', '-f', 's16le', '-acodec', 'pcm_s16le', '-ar', str(sample_rate), '-ac', str(channel_total), '-' ] +def read_audio_buffer(target_path : str, audio_sample_rate : int, audio_sample_size : int, audio_channel_total : int) -> Optional[AudioBuffer]: + commands = ffmpeg_builder.chain( + ffmpeg_builder.set_input(target_path), + ffmpeg_builder.ignore_video_stream(), + ffmpeg_builder.set_audio_sample_rate(audio_sample_rate), + ffmpeg_builder.set_audio_sample_size(audio_sample_size), + ffmpeg_builder.set_audio_channel_total(audio_channel_total), + ffmpeg_builder.cast_stream() + ) + process = open_ffmpeg(commands) audio_buffer, _ = process.communicate() if process.returncode == 0: @@ -127,26 +135,20 @@ def restore_audio(target_path : str, output_path : str, output_video_fps : Fps, output_audio_volume = state_manager.get_item('output_audio_volume') temp_file_path = get_temp_file_path(target_path) temp_video_duration = detect_video_duration(temp_file_path) - commands = [ '-i', temp_file_path ] - if isinstance(trim_frame_start, int): - start_time = trim_frame_start / output_video_fps - commands.extend([ '-ss', str(start_time) ]) - if isinstance(trim_frame_end, int): - end_time = trim_frame_end / output_video_fps - commands.extend([ '-to', str(end_time) ]) - commands.extend([ '-i', target_path, '-c:v', 'copy', '-c:a', output_audio_encoder ]) - if output_audio_encoder in [ 'aac' ]: - output_audio_compression = round(10 - (output_audio_quality * 0.9)) - commands.extend([ '-q:a', str(output_audio_compression) ]) - if output_audio_encoder in [ 'libmp3lame' ]: - output_audio_compression = round(9 - (output_audio_quality * 0.9)) - commands.extend([ '-q:a', str(output_audio_compression) ]) - if output_audio_encoder in [ 'libopus', 'libvorbis' ]: - output_audio_compression = round((100 - output_audio_quality) / 10) - commands.extend([ '-q:a', str(output_audio_compression) ]) - output_audio_volume = output_audio_volume / 100 - commands.extend([ '-filter:a', 'volume=' + str(output_audio_volume), '-map', '0:v:0', '-map', '1:a:0', '-t', str(temp_video_duration), '-y', output_path ]) + commands = ffmpeg_builder.chain( + ffmpeg_builder.set_input(temp_file_path), + ffmpeg_builder.select_media_range(trim_frame_start, trim_frame_end, output_video_fps), + ffmpeg_builder.set_input(target_path), + ffmpeg_builder.copy_video_encoder(), + ffmpeg_builder.set_audio_encoder(output_audio_encoder), + ffmpeg_builder.set_audio_quality(output_audio_encoder, output_audio_quality), + ffmpeg_builder.set_audio_volume(output_audio_volume), + ffmpeg_builder.select_media_stream('0:v:0'), + ffmpeg_builder.select_media_stream('1:a:0'), + ffmpeg_builder.set_video_duration(temp_video_duration), + ffmpeg_builder.force_output(output_path) + ) return run_ffmpeg(commands).returncode == 0 @@ -156,19 +158,17 @@ def replace_audio(target_path : str, audio_path : str, output_path : str) -> boo output_audio_volume = state_manager.get_item('output_audio_volume') temp_file_path = get_temp_file_path(target_path) temp_video_duration = detect_video_duration(temp_file_path) - commands = [ '-i', temp_file_path, '-i', audio_path, '-c:v', 'copy', '-c:a', output_audio_encoder ] - if output_audio_encoder in [ 'aac' ]: - output_audio_compression = round(10 - (output_audio_quality * 0.9)) - commands.extend([ '-q:a', str(output_audio_compression) ]) - if output_audio_encoder in [ 'libmp3lame' ]: - output_audio_compression = round(9 - (output_audio_quality * 0.9)) - commands.extend([ '-q:a', str(output_audio_compression) ]) - if output_audio_encoder in [ 'libopus', 'libvorbis' ]: - output_audio_compression = round((100 - output_audio_quality) / 10) - commands.extend([ '-q:a', str(output_audio_compression) ]) - output_audio_volume = output_audio_volume / 100 - commands.extend([ '-filter:a', 'volume=' + str(output_audio_volume), '-t', str(temp_video_duration), '-y', output_path ]) + commands = ffmpeg_builder.chain( + ffmpeg_builder.set_input(temp_file_path), + ffmpeg_builder.set_input(audio_path), + ffmpeg_builder.copy_video_encoder(), + ffmpeg_builder.set_audio_encoder(output_audio_encoder), + ffmpeg_builder.set_audio_quality(output_audio_encoder, output_audio_quality), + ffmpeg_builder.set_audio_volume(output_audio_volume), + ffmpeg_builder.set_video_duration(temp_video_duration), + ffmpeg_builder.force_output(output_path) + ) return run_ffmpeg(commands).returncode == 0 @@ -183,22 +183,19 @@ def merge_video(target_path : str, output_video_resolution : str, output_video_f if get_file_format(target_path) == 'webm': output_video_encoder = 'libvpx-vp9' - commands = [ '-r', str(temp_video_fps), '-i', temp_frames_pattern, '-s', str(output_video_resolution), '-c:v', output_video_encoder ] - if output_video_encoder in [ 'libx264', 'libx265' ]: - output_video_compression = round(51 - (output_video_quality * 0.51)) - commands.extend([ '-crf', str(output_video_compression), '-preset', output_video_preset ]) - if output_video_encoder in [ 'libvpx-vp9' ]: - output_video_compression = round(63 - (output_video_quality * 0.63)) - commands.extend([ '-crf', str(output_video_compression) ]) - if output_video_encoder in [ 'h264_nvenc', 'hevc_nvenc' ]: - output_video_compression = round(51 - (output_video_quality * 0.51)) - commands.extend([ '-cq', str(output_video_compression), '-preset', map_nvenc_preset(output_video_preset) ]) - if output_video_encoder in [ 'h264_amf', 'hevc_amf' ]: - output_video_compression = round(51 - (output_video_quality * 0.51)) - commands.extend([ '-qp_i', str(output_video_compression), '-qp_p', str(output_video_compression), '-quality', map_amf_preset(output_video_preset) ]) - if output_video_encoder in [ 'h264_videotoolbox', 'hevc_videotoolbox' ]: - commands.extend([ '-q:v', str(output_video_quality) ]) - commands.extend([ '-vf', 'framerate=fps=' + str(output_video_fps), '-pix_fmt', 'yuv420p', '-colorspace', 'bt709', '-y', temp_file_path ]) + + commands = ffmpeg_builder.chain( + ffmpeg_builder.set_conditional_fps(temp_video_fps), + ffmpeg_builder.set_input(temp_frames_pattern), + ffmpeg_builder.set_video_encoder(output_video_encoder), + ffmpeg_builder.set_media_resolution(output_video_resolution), + ffmpeg_builder.set_video_quality(output_video_encoder, output_video_quality), + ffmpeg_builder.set_video_preset(output_video_encoder, output_video_preset), + ffmpeg_builder.set_video_fps(output_video_fps), + ffmpeg_builder.set_pixel_format('yuv420p'), + ffmpeg_builder.set_video_colorspace('bt709'), + ffmpeg_builder.force_output(temp_file_path) + ) with tqdm(total = merge_frame_total, desc = wording.get('merging'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress: process = run_ffmpeg_with_progress(commands, lambda frame_number: progress.update(frame_number - progress.n)) @@ -213,38 +210,16 @@ def concat_video(output_path : str, temp_output_paths : List[str]) -> bool: concat_video_file.write('file \'' + os.path.abspath(temp_output_path) + '\'' + os.linesep) concat_video_file.flush() concat_video_file.close() - commands = [ '-f', 'concat', '-safe', '0', '-i', concat_video_file.name, '-c:v', 'copy', '-c:a', 'copy', '-y', os.path.abspath(output_path) ] + + output_path = os.path.abspath(output_path) + commands = ffmpeg_builder.chain( + ffmpeg_builder.unsafe_concat(), + ffmpeg_builder.set_input(concat_video_file.name), + ffmpeg_builder.copy_video_encoder(), + ffmpeg_builder.copy_audio_encoder(), + ffmpeg_builder.force_output(output_path) + ) process = run_ffmpeg(commands) process.communicate() remove_file(concat_video_path) return process.returncode == 0 - - -def map_nvenc_preset(output_video_preset : OutputVideoPreset) -> Optional[str]: - if output_video_preset in [ 'ultrafast', 'superfast', 'veryfast', 'faster', 'fast' ]: - return 'fast' - if output_video_preset == 'medium': - return 'medium' - if output_video_preset in [ 'slow', 'slower', 'veryslow' ]: - return 'slow' - return None - - -def map_amf_preset(output_video_preset : OutputVideoPreset) -> Optional[str]: - if output_video_preset in [ 'ultrafast', 'superfast', 'veryfast' ]: - return 'speed' - if output_video_preset in [ 'faster', 'fast', 'medium' ]: - return 'balanced' - if output_video_preset in [ 'slow', 'slower', 'veryslow' ]: - return 'quality' - return None - - -def map_qsv_preset(output_video_preset : OutputVideoPreset) -> Optional[str]: - if output_video_preset in [ 'ultrafast', 'superfast', 'veryfast', 'faster', 'fast' ]: - return 'fast' - if output_video_preset == 'medium': - return 'medium' - if output_video_preset in [ 'slow', 'slower', 'veryslow' ]: - return 'slow' - return None diff --git a/facefusion/ffmpeg_builder.py b/facefusion/ffmpeg_builder.py new file mode 100644 index 0000000..755f0e1 --- /dev/null +++ b/facefusion/ffmpeg_builder.py @@ -0,0 +1,226 @@ +import itertools +import shutil +from typing import Optional + +from facefusion.filesystem import get_file_format +from facefusion.typing import AudioEncoder, Commands, Duration, Fps, StreamMode, VideoEncoder, VideoPreset + + +def run(commands : Commands) -> Commands: + return [ shutil.which('ffmpeg'), '-loglevel', 'error' ] + commands + + +def chain(*commands : Commands) -> Commands: + return list(itertools.chain(*commands)) + + +def set_progress() -> Commands: + return [ '-progress' ] + + +def set_input(input_path : str) -> Commands: + return [ '-i', input_path ] + + +def set_conditional_fps(conditional_fps : Fps) -> Commands: + return [ '-r', str(conditional_fps) ] + + +def set_output(output_path : str) -> Commands: + return [ output_path ] + + +def force_output(output_path : str) -> Commands: + return [ '-y', output_path ] + + +def cast_stream() -> Commands: + return [ '-' ] + + +def set_stream_mode(stream_mode : StreamMode) -> Commands: + if stream_mode == 'udp': + return [ '-f', 'mpegts' ] + if stream_mode == 'v4l2': + return [ '-f', 'v4l2' ] + return [] + + +def unsafe_concat() -> Commands: + return [ '-f', 'concat', '-safe', '0' ] + + +def set_pixel_format(pixel_format : str) -> Commands: + return [ '-pix_fmt', pixel_format ] + + +def set_frame_quality(frame_quality : int) -> Commands: + return [ '-q:v', str(frame_quality) ] + + +def select_frame_range(frame_start : int, frame_end : int, video_fps : Fps) -> Commands: + if isinstance(frame_start, int) and isinstance(frame_end, int): + return [ '-vf', 'trim=start_frame=' + str(frame_start) + ':end_frame=' + str(frame_end) + ',fps=' + str(video_fps) ] + if isinstance(frame_start, int): + return [ '-vf', 'trim=start_frame=' + str(frame_start) + ',fps=' + str(video_fps) ] + if isinstance(frame_end, int): + return [ '-vf', 'trim=end_frame=' + str(frame_end) + ',fps=' + str(video_fps) ] + return [ '-vf', 'fps=' + str(video_fps) ] + + +def prevent_frame_drop() -> Commands: + return [ '-vsync', '0' ] + + +def select_media_range(frame_start : int, frame_end : int, media_fps : Fps) -> Commands: + commands = [] + + if isinstance(frame_start, int): + commands.extend([ '-ss', str(frame_start / media_fps) ]) + if isinstance(frame_end, int): + commands.extend([ '-to', str(frame_end / media_fps) ]) + return commands + + +def select_media_stream(media_stream : str) -> Commands: + return [ '-map', media_stream ] + + +def set_media_resolution(video_resolution : str) -> Commands: + return [ '-s', video_resolution ] + + +def set_image_quality(image_path : str, image_quality : int) -> Commands: + if get_file_format(image_path) == 'webp': + image_compression = image_quality + else: + image_compression = round(31 - (image_quality * 0.31)) + return [ '-q:v', str(image_compression) ] + + +def set_audio_encoder(audio_codec : str) -> Commands: + return [ '-c:a', audio_codec ] + + +def copy_audio_encoder() -> Commands: + return set_audio_encoder('copy') + + +def set_audio_sample_rate(audio_sample_rate : int) -> Commands: + return [ '-ar', str(audio_sample_rate) ] + + +def set_audio_sample_size(audio_sample_size : int) -> Commands: + if audio_sample_size == 16: + return [ '-f', 's16le', '-acodec', 'pcm_s16le' ] + if audio_sample_size == 32: + return [ '-f', 's32le', '-acodec', 'pcm_s32le' ] + return [] + + +def set_audio_channel_total(audio_channel_total : int) -> Commands: + return [ '-ac', str(audio_channel_total) ] + + +def set_audio_quality(audio_encoder : AudioEncoder, audio_quality : int) -> Commands: + if audio_encoder == 'aac': + audio_compression = round(10 - (audio_quality * 0.9)) + return [ '-q:a', str(audio_compression) ] + if audio_encoder == 'libmp3lame': + audio_compression = round(9 - (audio_quality * 0.9)) + return [ '-q:a', str(audio_compression) ] + if audio_encoder in [ 'libopus', 'libvorbis' ]: + audio_compression = round((100 - audio_quality) / 10) + return [ '-q:a', str(audio_compression) ] + return [] + + +def set_audio_volume(audio_volume : int) -> Commands: + return [ '-filter:a', 'volume=' + str(audio_volume / 100) ] + + +def set_video_encoder(video_encoder : str) -> Commands: + return [ '-c:v', video_encoder ] + + +def copy_video_encoder() -> Commands: + return set_video_encoder('copy') + + +def set_video_quality(video_encoder : VideoEncoder, video_quality : int) -> Commands: + if video_encoder in [ 'libx264', 'libx265' ]: + video_compression = round(51 - (video_quality * 0.51)) + return [ '-crf', str(video_compression) ] + if video_encoder == 'libvpx-vp9': + video_compression = round(63 - (video_quality * 0.63)) + return [ '-crf', str(video_compression) ] + if video_encoder in [ 'h264_nvenc', 'hevc_nvenc' ]: + video_compression = round(51 - (video_quality * 0.51)) + return [ '-cq', str(video_compression) ] + if video_encoder in [ 'h264_amf', 'hevc_amf' ]: + video_compression = round(51 - (video_quality * 0.51)) + return [ '-qp_i', str(video_compression), '-qp_p', str(video_compression) ] + if video_encoder in [ 'h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox' ]: + video_compression = round(51 - (video_quality * 0.51)) + return [ '-q:v', str(video_compression) ] + return [ '-q:v', str(video_quality) ] + + +def set_video_preset(video_encoder : VideoEncoder, video_preset : VideoPreset) -> Commands: + if video_encoder in [ 'libx264', 'libx265' ]: + return [ '-preset', video_preset ] + if video_encoder in [ 'h264_nvenc', 'hevc_nvenc' ]: + return [ '-preset', map_nvenc_preset(video_preset) ] + if video_encoder in [ 'h264_amf', 'hevc_amf' ]: + return [ '-quality', map_amf_preset(video_preset) ] + if video_encoder in [ 'h264_qsv', 'hevc_qsv' ]: + return [ '-preset', map_qsv_preset(video_preset) ] + return [] + + +def set_video_colorspace(video_colorspace : str) -> Commands: + return [ '-colorspace', video_colorspace ] + + +def set_video_fps(video_fps : Fps) -> Commands: + return [ '-vf', 'framerate=fps=' + str(video_fps) ] + + +def set_video_duration(video_duration : Duration) -> Commands: + return [ '-t', str(video_duration) ] + + +def capture_video() -> Commands: + return [ '-f', 'rawvideo' ] + + +def ignore_video_stream() -> Commands: + return [ '-vn' ] + + +def map_nvenc_preset(video_preset : VideoPreset) -> Optional[str]: + if video_preset in [ 'ultrafast', 'superfast', 'veryfast', 'faster', 'fast' ]: + return 'fast' + if video_preset == 'medium': + return 'medium' + if video_preset in [ 'slow', 'slower', 'veryslow' ]: + return 'slow' + return None + + +def map_amf_preset(video_preset : VideoPreset) -> Optional[str]: + if video_preset in [ 'ultrafast', 'superfast', 'veryfast' ]: + return 'speed' + if video_preset in [ 'faster', 'fast', 'medium' ]: + return 'balanced' + if video_preset in [ 'slow', 'slower', 'veryslow' ]: + return 'quality' + return None + + +def map_qsv_preset(video_preset : VideoPreset) -> Optional[str]: + if video_preset in [ 'ultrafast', 'superfast', 'veryfast' ]: + return 'veryfast' + if video_preset in [ 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow' ]: + return video_preset + return None diff --git a/facefusion/processors/modules/expression_restorer.py b/facefusion/processors/modules/expression_restorer.py index 19e40f4..e5a276a 100755 --- a/facefusion/processors/modules/expression_restorer.py +++ b/facefusion/processors/modules/expression_restorer.py @@ -19,8 +19,7 @@ from facefusion.face_store import get_reference_faces from facefusion.filesystem import in_directory, is_image, is_video, resolve_relative_path, same_file_extension from facefusion.processors import choices as processors_choices from facefusion.processors.live_portrait import create_rotation, limit_expression -from facefusion.processors.typing import ExpressionRestorerInputs -from facefusion.processors.typing import LivePortraitExpression, LivePortraitFeatureVolume, LivePortraitMotionPoints, LivePortraitPitch, LivePortraitRoll, LivePortraitScale, LivePortraitTranslation, LivePortraitYaw +from facefusion.processors.typing import ExpressionRestorerInputs, LivePortraitExpression, LivePortraitFeatureVolume, LivePortraitMotionPoints, LivePortraitPitch, LivePortraitRoll, LivePortraitScale, LivePortraitTranslation, LivePortraitYaw from facefusion.program_helper import find_argument_group from facefusion.thread_helper import conditional_thread_semaphore, thread_semaphore from facefusion.typing import ApplyStateItem, Args, DownloadScope, Face, InferencePool, ModelOptions, ModelSet, ProcessMode, QueuePayload, UpdateProgress, VisionFrame diff --git a/facefusion/typing.py b/facefusion/typing.py index 21ec66c..1c7977f 100755 --- a/facefusion/typing.py +++ b/facefusion/typing.py @@ -117,9 +117,12 @@ AudioTypeSet = Dict[AudioFormat, str] ImageTypeSet = Dict[ImageFormat, str] VideoTypeSet = Dict[VideoFormat, str] -OutputAudioEncoder = Literal['aac', 'libmp3lame', 'libopus', 'libvorbis'] -OutputVideoEncoder = Literal['libx264', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf','h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox'] -OutputVideoPreset = Literal['ultrafast', 'superfast', 'veryfast', 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow'] +AudioEncoder = Literal['aac', 'libmp3lame', 'libopus', 'libvorbis'] +VideoEncoder = Literal['libx264', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf', 'h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox'] +VideoPreset = Literal['ultrafast', 'superfast', 'veryfast', 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow'] + +WebcamMode = Literal['inline', 'udp', 'v4l2'] +StreamMode = Literal['udp', 'v4l2'] ModelOptions = Dict[str, Any] ModelSet = Dict[str, ModelOptions] @@ -317,11 +320,11 @@ State = TypedDict('State', 'keep_temp' : bool, 'output_image_quality' : int, 'output_image_resolution' : str, - 'output_audio_encoder' : OutputAudioEncoder, + 'output_audio_encoder' : AudioEncoder, 'output_audio_quality' : int, 'output_audio_volume' : int, - 'output_video_encoder' : OutputVideoEncoder, - 'output_video_preset' : OutputVideoPreset, + 'output_video_encoder' : VideoEncoder, + 'output_video_preset' : VideoPreset, 'output_video_quality' : int, 'output_video_resolution' : str, 'output_video_fps' : float, diff --git a/facefusion/uis/choices.py b/facefusion/uis/choices.py index be98eb8..4d36dc4 100644 --- a/facefusion/uis/choices.py +++ b/facefusion/uis/choices.py @@ -1,11 +1,9 @@ from typing import List -from facefusion.uis.typing import JobManagerAction, JobRunnerAction, WebcamMode +from facefusion.uis.typing import JobManagerAction, JobRunnerAction job_manager_actions : List[JobManagerAction] = [ 'job-create', 'job-submit', 'job-delete', 'job-add-step', 'job-remix-step', 'job-insert-step', 'job-remove-step' ] job_runner_actions : List[JobRunnerAction] = [ 'job-run', 'job-run-all', 'job-retry', 'job-retry-all' ] common_options : List[str] = [ 'keep-temp' ] -webcam_modes : List[WebcamMode] = [ 'inline', 'udp', 'v4l2' ] -webcam_resolutions : List[str] = [ '320x240', '640x480', '800x600', '1024x768', '1280x720', '1280x960', '1920x1080', '2560x1440', '3840x2160' ] diff --git a/facefusion/uis/components/output_options.py b/facefusion/uis/components/output_options.py index 3011f2b..69175a2 100644 --- a/facefusion/uis/components/output_options.py +++ b/facefusion/uis/components/output_options.py @@ -6,7 +6,7 @@ import facefusion.choices from facefusion import state_manager, wording from facefusion.common_helper import calc_int_step from facefusion.filesystem import is_image, is_video -from facefusion.typing import Fps, OutputAudioEncoder, OutputVideoEncoder, OutputVideoPreset +from facefusion.typing import AudioEncoder, Fps, VideoEncoder, VideoPreset from facefusion.uis.core import get_ui_components, register_ui_component from facefusion.vision import create_image_resolutions, create_video_resolutions, detect_image_resolution, detect_video_fps, detect_video_resolution, pack_resolution @@ -159,7 +159,7 @@ def update_output_image_resolution(output_image_resolution : str) -> None: state_manager.set_item('output_image_resolution', output_image_resolution) -def update_output_audio_encoder(output_audio_encoder : OutputAudioEncoder) -> None: +def update_output_audio_encoder(output_audio_encoder : AudioEncoder) -> None: state_manager.set_item('output_audio_encoder', output_audio_encoder) @@ -171,11 +171,11 @@ def update_output_audio_volume(output_audio_volume: float) -> None: state_manager.set_item('output_audio_volume', int(output_audio_volume)) -def update_output_video_encoder(output_video_encoder : OutputVideoEncoder) -> None: +def update_output_video_encoder(output_video_encoder : VideoEncoder) -> None: state_manager.set_item('output_video_encoder', output_video_encoder) -def update_output_video_preset(output_video_preset : OutputVideoPreset) -> None: +def update_output_video_preset(output_video_preset : VideoPreset) -> None: state_manager.set_item('output_video_preset', output_video_preset) diff --git a/facefusion/uis/components/webcam.py b/facefusion/uis/components/webcam.py index 46ba38d..11b1e68 100644 --- a/facefusion/uis/components/webcam.py +++ b/facefusion/uis/components/webcam.py @@ -8,17 +8,16 @@ import cv2 import gradio from tqdm import tqdm -from facefusion import logger, state_manager, wording +from facefusion import ffmpeg_builder, logger, state_manager, wording from facefusion.audio import create_empty_audio_frame -from facefusion.common_helper import get_first, is_windows +from facefusion.common_helper import is_windows from facefusion.content_analyser import analyse_stream from facefusion.face_analyser import get_average_face, get_many_faces from facefusion.ffmpeg import open_ffmpeg -from facefusion.filesystem import filter_image_paths +from facefusion.filesystem import filter_image_paths, is_directory from facefusion.processors.core import get_processors_modules -from facefusion.typing import Face, Fps, VisionFrame +from facefusion.typing import Face, Fps, StreamMode, VisionFrame, WebcamMode from facefusion.uis.core import get_ui_component -from facefusion.uis.typing import StreamMode, WebcamMode from facefusion.vision import normalize_frame_color, read_static_images, unpack_resolution WEBCAM_CAPTURE : Optional[cv2.VideoCapture] = None @@ -164,17 +163,32 @@ def process_stream_frame(source_face : Face, target_vision_frame : VisionFrame) def open_stream(stream_mode : StreamMode, stream_resolution : str, stream_fps : Fps) -> subprocess.Popen[bytes]: - commands = [ '-f', 'rawvideo', '-pix_fmt', 'bgr24', '-s', stream_resolution, '-r', str(stream_fps), '-i', '-'] + commands = ffmpeg_builder.chain( + ffmpeg_builder.capture_video(), + ffmpeg_builder.set_media_resolution(stream_resolution), + ffmpeg_builder.set_conditional_fps(stream_fps) + ) if stream_mode == 'udp': - commands.extend([ '-b:v', '2000k', '-f', 'mpegts', 'udp://localhost:27000?pkt_size=1316' ]) + commands.extend(ffmpeg_builder.set_input('-')) + commands.extend(ffmpeg_builder.set_stream_mode('udp')) + commands.extend(ffmpeg_builder.set_output('udp://localhost:27000?pkt_size=1316')) + if stream_mode == 'v4l2': - try: - device_name = get_first(os.listdir('/sys/devices/virtual/video4linux')) - if device_name: - commands.extend([ '-f', 'v4l2', '/dev/' + device_name ]) - except FileNotFoundError: + device_directory_path = '/sys/devices/virtual/video4linux' + + commands.extend(ffmpeg_builder.set_input('-')) + commands.extend(ffmpeg_builder.set_stream_mode('v4l2')) + if is_directory(device_directory_path): + device_names = os.listdir(device_directory_path) + + for device_name in device_names: + device_path = '/dev/' + device_name + commands.extend(ffmpeg_builder.set_output(device_path)) + + else: logger.error(wording.get('stream_not_loaded').format(stream_mode = stream_mode), __name__) + return open_ffmpeg(commands) diff --git a/facefusion/uis/components/webcam_options.py b/facefusion/uis/components/webcam_options.py index ec8a4d3..b7971c2 100644 --- a/facefusion/uis/components/webcam_options.py +++ b/facefusion/uis/components/webcam_options.py @@ -2,9 +2,9 @@ from typing import Optional import gradio +import facefusion.choices from facefusion import wording from facefusion.common_helper import get_first -from facefusion.uis import choices as uis_choices from facefusion.uis.components.webcam import get_available_webcam_ids from facefusion.uis.core import register_ui_component @@ -28,13 +28,13 @@ def render() -> None: ) WEBCAM_MODE_RADIO = gradio.Radio( label = wording.get('uis.webcam_mode_radio'), - choices = uis_choices.webcam_modes, + choices = facefusion.choices.webcam_modes, value = 'inline' ) WEBCAM_RESOLUTION_DROPDOWN = gradio.Dropdown( label = wording.get('uis.webcam_resolution_dropdown'), - choices = uis_choices.webcam_resolutions, - value = uis_choices.webcam_resolutions[0] + choices = facefusion.choices.webcam_resolutions, + value = facefusion.choices.webcam_resolutions[0] ) WEBCAM_FPS_SLIDER = gradio.Slider( label = wording.get('uis.webcam_fps_slider'), diff --git a/facefusion/uis/typing.py b/facefusion/uis/typing.py index 6de5730..7b564d2 100644 --- a/facefusion/uis/typing.py +++ b/facefusion/uis/typing.py @@ -81,6 +81,3 @@ ComponentName = Literal\ JobManagerAction = Literal['job-create', 'job-submit', 'job-delete', 'job-add-step', 'job-remix-step', 'job-insert-step', 'job-remove-step'] JobRunnerAction = Literal['job-run', 'job-run-all', 'job-retry', 'job-retry-all'] - -WebcamMode = Literal['inline', 'udp', 'v4l2'] -StreamMode = Literal['udp', 'v4l2'] diff --git a/tests/test_curl_builder.py b/tests/test_curl_builder.py new file mode 100644 index 0000000..21367e2 --- /dev/null +++ b/tests/test_curl_builder.py @@ -0,0 +1,17 @@ +from shutil import which + +from facefusion import curl_builder, metadata + + +def test_run() -> None: + user_agent = metadata.get('name') + '/' + metadata.get('version') + + assert curl_builder.run([]) == [ which('curl'), '--user-agent', user_agent, '--insecure', '--location', '--silent' ] + + +def test_chain() -> None: + commands = curl_builder.chain( + curl_builder.head(metadata.get('url')) + ) + + assert commands == [ '-I', metadata.get('url') ] diff --git a/tests/test_ffmpeg.py b/tests/test_ffmpeg.py index 5506885..855cfff 100644 --- a/tests/test_ffmpeg.py +++ b/tests/test_ffmpeg.py @@ -76,9 +76,9 @@ def test_concat_video() -> None: def test_read_audio_buffer() -> None: - assert isinstance(read_audio_buffer(get_test_example_file('source.mp3'), 1, 1), bytes) - assert isinstance(read_audio_buffer(get_test_example_file('source.wav'), 1, 1), bytes) - assert read_audio_buffer(get_test_example_file('invalid.mp3'), 1, 1) is None + assert isinstance(read_audio_buffer(get_test_example_file('source.mp3'), 1, 16, 1), bytes) + assert isinstance(read_audio_buffer(get_test_example_file('source.wav'), 1, 16, 1), bytes) + assert read_audio_buffer(get_test_example_file('invalid.mp3'), 1, 16, 1) is None def test_restore_audio() -> None: diff --git a/tests/test_ffmpeg_builder.py b/tests/test_ffmpeg_builder.py new file mode 100644 index 0000000..e180a86 --- /dev/null +++ b/tests/test_ffmpeg_builder.py @@ -0,0 +1,32 @@ +from shutil import which + +from facefusion import ffmpeg_builder + + +def test_run() -> None: + assert ffmpeg_builder.run([]) == [ which('ffmpeg'), '-loglevel', 'error' ] + + +def test_chain() -> None: + commands = ffmpeg_builder.chain( + ffmpeg_builder.set_progress() + ) + + assert commands == [ '-progress' ] + + +def test_stream_mode() -> None: + assert ffmpeg_builder.set_stream_mode('udp') == [ '-f', 'mpegts' ] + assert ffmpeg_builder.set_stream_mode('v4l2') == [ '-f', 'v4l2' ] + + +def test_select_frame_range() -> None: + assert ffmpeg_builder.select_frame_range(0, None, 30) == [ '-vf', 'trim=start_frame=0,fps=30' ] + assert ffmpeg_builder.select_frame_range(None, 100, 30) == [ '-vf', 'trim=end_frame=100,fps=30' ] + assert ffmpeg_builder.select_frame_range(0, 100, 30) == [ '-vf', 'trim=start_frame=0:end_frame=100,fps=30' ] + assert ffmpeg_builder.select_frame_range(None, None, 30) == [ '-vf', 'fps=30' ] + + +def test_audio_sample_size() -> None: + assert ffmpeg_builder.set_audio_sample_size(16) == [ '-f', 's16le', '-acodec', 'pcm_s16le' ] + assert ffmpeg_builder.set_audio_sample_size(32) == [ '-f', 's32le', '-acodec', 'pcm_s32le' ]