Feat/commands builder (#852)
* Protype for ffmpeg builder * Protype for ffmpeg builder * Add curl builder * Fix typing import * Adjust commands indent * Protype for ffmpeg builder part2 * Protype for ffmpeg builder part3 * Protype for ffmpeg builder part3 * Add chain() helper to the builders * Protype for ffmpeg builder part4 * Protype for ffmpeg builder part5 * Protoype for ffmpeg builder part5 * Protoype for ffmpeg builder part6 * Allow dynamic audio size * Fix testing * Protoype for ffmpeg builder part7 * Fix and polish ffmpeg builder * Hardcode the log level for ffmpeg * More ffmpeg rework * Prototype for ffmpeg builder part8 * Prototype for ffmpeg builder part9 * Fix CI * Fix Styles * Add lazy testing, User Agent for CURL * More testing * More testing
This commit is contained in:
@@ -3,7 +3,7 @@ from typing import Any, List, Optional
|
|||||||
|
|
||||||
import numpy
|
import numpy
|
||||||
import scipy
|
import scipy
|
||||||
from numpy._typing import NDArray
|
from numpy.typing import NDArray
|
||||||
|
|
||||||
from facefusion.ffmpeg import read_audio_buffer
|
from facefusion.ffmpeg import read_audio_buffer
|
||||||
from facefusion.filesystem import is_audio
|
from facefusion.filesystem import is_audio
|
||||||
@@ -17,11 +17,12 @@ def read_static_audio(audio_path : str, fps : Fps) -> Optional[List[AudioFrame]]
|
|||||||
|
|
||||||
|
|
||||||
def read_audio(audio_path : str, fps : Fps) -> Optional[List[AudioFrame]]:
|
def read_audio(audio_path : str, fps : Fps) -> Optional[List[AudioFrame]]:
|
||||||
sample_rate = 48000
|
audio_sample_rate = 48000
|
||||||
channel_total = 2
|
audio_sample_size = 16
|
||||||
|
audio_channel_total = 2
|
||||||
|
|
||||||
if is_audio(audio_path):
|
if is_audio(audio_path):
|
||||||
audio_buffer = read_audio_buffer(audio_path, sample_rate, channel_total)
|
audio_buffer = read_audio_buffer(audio_path, audio_sample_rate, audio_sample_size, audio_channel_total)
|
||||||
audio = numpy.frombuffer(audio_buffer, dtype = numpy.int16).reshape(-1, 2)
|
audio = numpy.frombuffer(audio_buffer, dtype = numpy.int16).reshape(-1, 2)
|
||||||
audio = prepare_audio(audio)
|
audio = prepare_audio(audio)
|
||||||
spectrogram = create_spectrogram(audio)
|
spectrogram = create_spectrogram(audio)
|
||||||
@@ -36,15 +37,16 @@ def read_static_voice(audio_path : str, fps : Fps) -> Optional[List[AudioFrame]]
|
|||||||
|
|
||||||
|
|
||||||
def read_voice(audio_path : str, fps : Fps) -> Optional[List[AudioFrame]]:
|
def read_voice(audio_path : str, fps : Fps) -> Optional[List[AudioFrame]]:
|
||||||
sample_rate = 48000
|
voice_sample_rate = 48000
|
||||||
channel_total = 2
|
voice_sample_size = 16
|
||||||
chunk_size = 240 * 1024
|
voice_channel_total = 2
|
||||||
step_size = 180 * 1024
|
voice_chunk_size = 240 * 1024
|
||||||
|
voice_step_size = 180 * 1024
|
||||||
|
|
||||||
if is_audio(audio_path):
|
if is_audio(audio_path):
|
||||||
audio_buffer = read_audio_buffer(audio_path, sample_rate, channel_total)
|
audio_buffer = read_audio_buffer(audio_path, voice_sample_rate, voice_sample_size, voice_channel_total)
|
||||||
audio = numpy.frombuffer(audio_buffer, dtype = numpy.int16).reshape(-1, 2)
|
audio = numpy.frombuffer(audio_buffer, dtype = numpy.int16).reshape(-1, 2)
|
||||||
audio = batch_extract_voice(audio, chunk_size, step_size)
|
audio = batch_extract_voice(audio, voice_chunk_size, voice_step_size)
|
||||||
audio = prepare_voice(audio)
|
audio = prepare_voice(audio)
|
||||||
spectrogram = create_spectrogram(audio)
|
spectrogram = create_spectrogram(audio)
|
||||||
audio_frames = extract_audio_frames(spectrogram, fps)
|
audio_frames = extract_audio_frames(spectrogram, fps)
|
||||||
@@ -60,6 +62,20 @@ def get_audio_frame(audio_path : str, fps : Fps, frame_number : int = 0) -> Opti
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def extract_audio_frames(spectrogram: Spectrogram, fps: Fps) -> List[AudioFrame]:
|
||||||
|
audio_frames = []
|
||||||
|
mel_filter_total = 80
|
||||||
|
audio_step_size = 16
|
||||||
|
indices = numpy.arange(0, spectrogram.shape[1], mel_filter_total / fps).astype(numpy.int16)
|
||||||
|
indices = indices[indices >= audio_step_size]
|
||||||
|
|
||||||
|
for index in indices:
|
||||||
|
start = max(0, index - audio_step_size)
|
||||||
|
audio_frames.append(spectrogram[:, start:index])
|
||||||
|
|
||||||
|
return audio_frames
|
||||||
|
|
||||||
|
|
||||||
def get_voice_frame(audio_path : str, fps : Fps, frame_number : int = 0) -> Optional[AudioFrame]:
|
def get_voice_frame(audio_path : str, fps : Fps, frame_number : int = 0) -> Optional[AudioFrame]:
|
||||||
if is_audio(audio_path):
|
if is_audio(audio_path):
|
||||||
voice_frames = read_static_voice(audio_path, fps)
|
voice_frames = read_static_voice(audio_path, fps)
|
||||||
@@ -70,8 +86,8 @@ def get_voice_frame(audio_path : str, fps : Fps, frame_number : int = 0) -> Opti
|
|||||||
|
|
||||||
def create_empty_audio_frame() -> AudioFrame:
|
def create_empty_audio_frame() -> AudioFrame:
|
||||||
mel_filter_total = 80
|
mel_filter_total = 80
|
||||||
step_size = 16
|
audio_step_size = 16
|
||||||
audio_frame = numpy.zeros((mel_filter_total, step_size)).astype(numpy.int16)
|
audio_frame = numpy.zeros((mel_filter_total, audio_step_size)).astype(numpy.int16)
|
||||||
return audio_frame
|
return audio_frame
|
||||||
|
|
||||||
|
|
||||||
@@ -84,10 +100,10 @@ def prepare_audio(audio : Audio) -> Audio:
|
|||||||
|
|
||||||
|
|
||||||
def prepare_voice(audio : Audio) -> Audio:
|
def prepare_voice(audio : Audio) -> Audio:
|
||||||
sample_rate = 48000
|
audio_sample_rate = 48000
|
||||||
resample_rate = 16000
|
audio_resample_rate = 16000
|
||||||
|
audio_resample_factor = round(len(audio) * audio_resample_rate / audio_sample_rate)
|
||||||
audio = scipy.signal.resample(audio, int(len(audio) * resample_rate / sample_rate))
|
audio = scipy.signal.resample(audio, audio_resample_factor)
|
||||||
audio = prepare_audio(audio)
|
audio = prepare_audio(audio)
|
||||||
return audio
|
return audio
|
||||||
|
|
||||||
@@ -101,19 +117,20 @@ def convert_mel_to_hertz(mel : Mel) -> NDArray[Any]:
|
|||||||
|
|
||||||
|
|
||||||
def create_mel_filter_bank() -> MelFilterBank:
|
def create_mel_filter_bank() -> MelFilterBank:
|
||||||
|
audio_sample_rate = 16000
|
||||||
|
audio_min_frequency = 55.0
|
||||||
|
audio_max_frequency = 7600.0
|
||||||
mel_filter_total = 80
|
mel_filter_total = 80
|
||||||
mel_bin_total = 800
|
mel_bin_total = 800
|
||||||
sample_rate = 16000
|
|
||||||
min_frequency = 55.0
|
|
||||||
max_frequency = 7600.0
|
|
||||||
mel_filter_bank = numpy.zeros((mel_filter_total, mel_bin_total // 2 + 1))
|
mel_filter_bank = numpy.zeros((mel_filter_total, mel_bin_total // 2 + 1))
|
||||||
mel_frequency_range = numpy.linspace(convert_hertz_to_mel(min_frequency), convert_hertz_to_mel(max_frequency), mel_filter_total + 2)
|
mel_frequency_range = numpy.linspace(convert_hertz_to_mel(audio_min_frequency), convert_hertz_to_mel(audio_max_frequency), mel_filter_total + 2)
|
||||||
indices = numpy.floor((mel_bin_total + 1) * convert_mel_to_hertz(mel_frequency_range) / sample_rate).astype(numpy.int16)
|
indices = numpy.floor((mel_bin_total + 1) * convert_mel_to_hertz(mel_frequency_range) / audio_sample_rate).astype(numpy.int16)
|
||||||
|
|
||||||
for index in range(mel_filter_total):
|
for index in range(mel_filter_total):
|
||||||
start = indices[index]
|
start = indices[index]
|
||||||
end = indices[index + 1]
|
end = indices[index + 1]
|
||||||
mel_filter_bank[index, start:end] = scipy.signal.windows.triang(end - start)
|
mel_filter_bank[index, start:end] = scipy.signal.windows.triang(end - start)
|
||||||
|
|
||||||
return mel_filter_bank
|
return mel_filter_bank
|
||||||
|
|
||||||
|
|
||||||
@@ -124,16 +141,3 @@ def create_spectrogram(audio : Audio) -> Spectrogram:
|
|||||||
spectrogram = scipy.signal.stft(audio, nperseg = mel_bin_total, nfft = mel_bin_total, noverlap = mel_bin_overlap)[2]
|
spectrogram = scipy.signal.stft(audio, nperseg = mel_bin_total, nfft = mel_bin_total, noverlap = mel_bin_overlap)[2]
|
||||||
spectrogram = numpy.dot(mel_filter_bank, numpy.abs(spectrogram))
|
spectrogram = numpy.dot(mel_filter_bank, numpy.abs(spectrogram))
|
||||||
return spectrogram
|
return spectrogram
|
||||||
|
|
||||||
|
|
||||||
def extract_audio_frames(spectrogram : Spectrogram, fps : Fps) -> List[AudioFrame]:
|
|
||||||
mel_filter_total = 80
|
|
||||||
step_size = 16
|
|
||||||
audio_frames = []
|
|
||||||
indices = numpy.arange(0, spectrogram.shape[1], mel_filter_total / fps).astype(numpy.int16)
|
|
||||||
indices = indices[indices >= step_size]
|
|
||||||
|
|
||||||
for index in indices:
|
|
||||||
start = max(0, index - step_size)
|
|
||||||
audio_frames.append(spectrogram[:, start:index])
|
|
||||||
return audio_frames
|
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ import logging
|
|||||||
from typing import List, Sequence
|
from typing import List, Sequence
|
||||||
|
|
||||||
from facefusion.common_helper import create_float_range, create_int_range
|
from facefusion.common_helper import create_float_range, create_int_range
|
||||||
from facefusion.typing import Angle, AudioFormat, AudioTypeSet, DownloadProvider, DownloadProviderSet, DownloadScope, ExecutionProvider, ExecutionProviderSet, FaceDetectorModel, FaceDetectorSet, FaceLandmarkerModel, FaceMaskRegion, FaceMaskRegionSet, FaceMaskType, FaceOccluderModel, FaceParserModel, FaceSelectorMode, FaceSelectorOrder, Gender, ImageFormat, ImageTypeSet, JobStatus, LogLevel, LogLevelSet, OutputAudioEncoder, OutputVideoEncoder, OutputVideoPreset, Race, Score, UiWorkflow, VideoFormat, VideoMemoryStrategy, VideoTypeSet
|
from facefusion.typing import Angle, AudioEncoder, AudioFormat, AudioTypeSet, DownloadProvider, DownloadProviderSet, DownloadScope, ExecutionProvider, ExecutionProviderSet, FaceDetectorModel, FaceDetectorSet, FaceLandmarkerModel, FaceMaskRegion, FaceMaskRegionSet, FaceMaskType, FaceOccluderModel, FaceParserModel, FaceSelectorMode, FaceSelectorOrder, Gender, ImageFormat, ImageTypeSet, JobStatus, LogLevel, LogLevelSet, Race, Score, UiWorkflow, VideoEncoder, VideoFormat, VideoMemoryStrategy, VideoPreset, VideoTypeSet, WebcamMode
|
||||||
|
|
||||||
face_detector_set : FaceDetectorSet =\
|
face_detector_set : FaceDetectorSet =\
|
||||||
{
|
{
|
||||||
@@ -61,13 +61,16 @@ image_formats : List[ImageFormat] = list(image_type_set.keys())
|
|||||||
video_formats : List[VideoFormat] = list(video_type_set.keys())
|
video_formats : List[VideoFormat] = list(video_type_set.keys())
|
||||||
temp_frame_formats : List[ImageFormat] = [ 'bmp', 'jpg', 'png' ]
|
temp_frame_formats : List[ImageFormat] = [ 'bmp', 'jpg', 'png' ]
|
||||||
|
|
||||||
output_audio_encoders : List[OutputAudioEncoder] = [ 'aac', 'libmp3lame', 'libopus', 'libvorbis' ]
|
output_audio_encoders : List[AudioEncoder] = [ 'aac', 'libmp3lame', 'libopus', 'libvorbis' ]
|
||||||
output_video_encoders : List[OutputVideoEncoder] = [ 'libx264', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf', 'h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox' ]
|
output_video_encoders : List[VideoEncoder] = [ 'libx264', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf', 'h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox' ]
|
||||||
output_video_presets : List[OutputVideoPreset] = [ 'ultrafast', 'superfast', 'veryfast', 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow' ]
|
output_video_presets : List[VideoPreset] = [ 'ultrafast', 'superfast', 'veryfast', 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow' ]
|
||||||
|
|
||||||
image_template_sizes : List[float] = [ 0.25, 0.5, 0.75, 1, 1.5, 2, 2.5, 3, 3.5, 4 ]
|
image_template_sizes : List[float] = [ 0.25, 0.5, 0.75, 1, 1.5, 2, 2.5, 3, 3.5, 4 ]
|
||||||
video_template_sizes : List[int] = [ 240, 360, 480, 540, 720, 1080, 1440, 2160, 4320 ]
|
video_template_sizes : List[int] = [ 240, 360, 480, 540, 720, 1080, 1440, 2160, 4320 ]
|
||||||
|
|
||||||
|
webcam_modes : List[WebcamMode] = [ 'inline', 'udp', 'v4l2' ]
|
||||||
|
webcam_resolutions : List[str] = [ '320x240', '640x480', '800x600', '1024x768', '1280x720', '1280x960', '1920x1080', '2560x1440', '3840x2160' ]
|
||||||
|
|
||||||
execution_provider_set : ExecutionProviderSet =\
|
execution_provider_set : ExecutionProviderSet =\
|
||||||
{
|
{
|
||||||
'cpu': 'CPUExecutionProvider',
|
'cpu': 'CPUExecutionProvider',
|
||||||
|
|||||||
27
facefusion/curl_builder.py
Normal file
27
facefusion/curl_builder.py
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
import itertools
|
||||||
|
import shutil
|
||||||
|
|
||||||
|
from facefusion import metadata
|
||||||
|
from facefusion.typing import Commands
|
||||||
|
|
||||||
|
|
||||||
|
def run(commands : Commands) -> Commands:
|
||||||
|
user_agent = metadata.get('name') + '/' + metadata.get('version')
|
||||||
|
|
||||||
|
return [ shutil.which('curl'), '--user-agent', user_agent, '--insecure', '--location', '--silent' ] + commands
|
||||||
|
|
||||||
|
|
||||||
|
def chain(*commands : Commands) -> Commands:
|
||||||
|
return list(itertools.chain(*commands))
|
||||||
|
|
||||||
|
|
||||||
|
def head(url : str) -> Commands:
|
||||||
|
return [ '-I', url ]
|
||||||
|
|
||||||
|
|
||||||
|
def download(url : str, download_file_path : str) -> Commands:
|
||||||
|
return [ '--create-dirs', '--continue-at', '-', '--output', download_file_path, url ]
|
||||||
|
|
||||||
|
|
||||||
|
def set_timeout(timeout : int) -> Commands:
|
||||||
|
return [ '--connect-timeout', str(timeout) ]
|
||||||
@@ -1,5 +1,4 @@
|
|||||||
import os
|
import os
|
||||||
import shutil
|
|
||||||
import subprocess
|
import subprocess
|
||||||
from functools import lru_cache
|
from functools import lru_cache
|
||||||
from typing import List, Optional, Tuple
|
from typing import List, Optional, Tuple
|
||||||
@@ -8,14 +7,14 @@ from urllib.parse import urlparse
|
|||||||
from tqdm import tqdm
|
from tqdm import tqdm
|
||||||
|
|
||||||
import facefusion.choices
|
import facefusion.choices
|
||||||
from facefusion import logger, process_manager, state_manager, wording
|
from facefusion import curl_builder, logger, process_manager, state_manager, wording
|
||||||
from facefusion.filesystem import get_file_name, get_file_size, is_file, remove_file
|
from facefusion.filesystem import get_file_name, get_file_size, is_file, remove_file
|
||||||
from facefusion.hash_helper import validate_hash
|
from facefusion.hash_helper import validate_hash
|
||||||
from facefusion.typing import Commands, DownloadProvider, DownloadSet
|
from facefusion.typing import Commands, DownloadProvider, DownloadSet
|
||||||
|
|
||||||
|
|
||||||
def open_curl(commands : Commands) -> subprocess.Popen[bytes]:
|
def open_curl(commands : Commands) -> subprocess.Popen[bytes]:
|
||||||
commands = [ shutil.which('curl'), '--silent', '--insecure', '--location' ] + commands
|
commands = curl_builder.run(commands)
|
||||||
return subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE)
|
return subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE)
|
||||||
|
|
||||||
|
|
||||||
@@ -28,7 +27,10 @@ def conditional_download(download_directory_path : str, urls : List[str]) -> Non
|
|||||||
|
|
||||||
if initial_size < download_size:
|
if initial_size < download_size:
|
||||||
with tqdm(total = download_size, initial = initial_size, desc = wording.get('downloading'), unit = 'B', unit_scale = True, unit_divisor = 1024, ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:
|
with tqdm(total = download_size, initial = initial_size, desc = wording.get('downloading'), unit = 'B', unit_scale = True, unit_divisor = 1024, ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:
|
||||||
commands = [ '--create-dirs', '--continue-at', '-', '--output', download_file_path, url, '--connect-timeout', '10' ]
|
commands = curl_builder.chain(
|
||||||
|
curl_builder.download(url, download_file_path),
|
||||||
|
curl_builder.set_timeout(10)
|
||||||
|
)
|
||||||
open_curl(commands)
|
open_curl(commands)
|
||||||
current_size = initial_size
|
current_size = initial_size
|
||||||
progress.set_postfix(download_providers = state_manager.get_item('download_providers'), file_name = download_file_name)
|
progress.set_postfix(download_providers = state_manager.get_item('download_providers'), file_name = download_file_name)
|
||||||
@@ -41,7 +43,10 @@ def conditional_download(download_directory_path : str, urls : List[str]) -> Non
|
|||||||
|
|
||||||
@lru_cache(maxsize = None)
|
@lru_cache(maxsize = None)
|
||||||
def get_static_download_size(url : str) -> int:
|
def get_static_download_size(url : str) -> int:
|
||||||
commands = [ '-I', url, '--connect-timeout', '5' ]
|
commands = curl_builder.chain(
|
||||||
|
curl_builder.head(url),
|
||||||
|
curl_builder.set_timeout(5)
|
||||||
|
)
|
||||||
process = open_curl(commands)
|
process = open_curl(commands)
|
||||||
lines = reversed(process.stdout.readlines())
|
lines = reversed(process.stdout.readlines())
|
||||||
|
|
||||||
@@ -56,7 +61,10 @@ def get_static_download_size(url : str) -> int:
|
|||||||
|
|
||||||
@lru_cache(maxsize = None)
|
@lru_cache(maxsize = None)
|
||||||
def ping_static_url(url : str) -> bool:
|
def ping_static_url(url : str) -> bool:
|
||||||
commands = [ '-I', url, '--connect-timeout', '5' ]
|
commands = curl_builder.chain(
|
||||||
|
curl_builder.head(url),
|
||||||
|
curl_builder.set_timeout(5)
|
||||||
|
)
|
||||||
process = open_curl(commands)
|
process = open_curl(commands)
|
||||||
process.communicate()
|
process.communicate()
|
||||||
return process.returncode == 0
|
return process.returncode == 0
|
||||||
|
|||||||
@@ -1,21 +1,22 @@
|
|||||||
import os
|
import os
|
||||||
import shutil
|
|
||||||
import subprocess
|
import subprocess
|
||||||
import tempfile
|
import tempfile
|
||||||
from typing import List, Optional
|
from typing import List, Optional
|
||||||
|
|
||||||
from tqdm import tqdm
|
from tqdm import tqdm
|
||||||
|
|
||||||
from facefusion import logger, process_manager, state_manager, wording
|
from facefusion import ffmpeg_builder, logger, process_manager, state_manager, wording
|
||||||
from facefusion.filesystem import get_file_format, remove_file
|
from facefusion.filesystem import get_file_format, remove_file
|
||||||
from facefusion.temp_helper import get_temp_file_path, get_temp_frames_pattern, resolve_temp_frame_paths
|
from facefusion.temp_helper import get_temp_file_path, get_temp_frames_pattern, resolve_temp_frame_paths
|
||||||
from facefusion.typing import AudioBuffer, Commands, Fps, OutputVideoPreset, UpdateProgress
|
from facefusion.typing import AudioBuffer, Commands, Fps, UpdateProgress
|
||||||
from facefusion.vision import count_trim_frame_total, detect_video_duration, restrict_video_fps
|
from facefusion.vision import count_trim_frame_total, detect_video_duration, restrict_video_fps
|
||||||
|
|
||||||
|
|
||||||
def run_ffmpeg_with_progress(commands : Commands, update_progress : UpdateProgress) -> subprocess.Popen[bytes]:
|
def run_ffmpeg_with_progress(commands : Commands, update_progress : UpdateProgress) -> subprocess.Popen[bytes]:
|
||||||
log_level = state_manager.get_item('log_level')
|
log_level = state_manager.get_item('log_level')
|
||||||
commands = [ shutil.which('ffmpeg'), '-hide_banner', '-nostats', '-loglevel', 'error', '-progress', '-' ] + commands
|
commands.extend(ffmpeg_builder.set_progress())
|
||||||
|
commands.extend(ffmpeg_builder.cast_stream())
|
||||||
|
commands = ffmpeg_builder.run(commands)
|
||||||
process = subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE)
|
process = subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE)
|
||||||
|
|
||||||
while process_manager.is_processing():
|
while process_manager.is_processing():
|
||||||
@@ -40,7 +41,7 @@ def run_ffmpeg_with_progress(commands : Commands, update_progress : UpdateProgre
|
|||||||
|
|
||||||
def run_ffmpeg(commands : Commands) -> subprocess.Popen[bytes]:
|
def run_ffmpeg(commands : Commands) -> subprocess.Popen[bytes]:
|
||||||
log_level = state_manager.get_item('log_level')
|
log_level = state_manager.get_item('log_level')
|
||||||
commands = [ shutil.which('ffmpeg'), '-hide_banner', '-nostats', '-loglevel', 'error' ] + commands
|
commands = ffmpeg_builder.run(commands)
|
||||||
process = subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE)
|
process = subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE)
|
||||||
|
|
||||||
while process_manager.is_processing():
|
while process_manager.is_processing():
|
||||||
@@ -58,7 +59,7 @@ def run_ffmpeg(commands : Commands) -> subprocess.Popen[bytes]:
|
|||||||
|
|
||||||
|
|
||||||
def open_ffmpeg(commands : Commands) -> subprocess.Popen[bytes]:
|
def open_ffmpeg(commands : Commands) -> subprocess.Popen[bytes]:
|
||||||
commands = [ shutil.which('ffmpeg'), '-loglevel', 'quiet' ] + commands
|
commands = ffmpeg_builder.run(commands)
|
||||||
return subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE)
|
return subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE)
|
||||||
|
|
||||||
|
|
||||||
@@ -74,17 +75,14 @@ def log_debug(process : subprocess.Popen[bytes]) -> None:
|
|||||||
def extract_frames(target_path : str, temp_video_resolution : str, temp_video_fps : Fps, trim_frame_start : int, trim_frame_end : int) -> bool:
|
def extract_frames(target_path : str, temp_video_resolution : str, temp_video_fps : Fps, trim_frame_start : int, trim_frame_end : int) -> bool:
|
||||||
extract_frame_total = count_trim_frame_total(target_path, trim_frame_start, trim_frame_end)
|
extract_frame_total = count_trim_frame_total(target_path, trim_frame_start, trim_frame_end)
|
||||||
temp_frames_pattern = get_temp_frames_pattern(target_path, '%08d')
|
temp_frames_pattern = get_temp_frames_pattern(target_path, '%08d')
|
||||||
commands = [ '-i', target_path, '-s', str(temp_video_resolution), '-q:v', '0' ]
|
commands = ffmpeg_builder.chain(
|
||||||
|
ffmpeg_builder.set_input(target_path),
|
||||||
if isinstance(trim_frame_start, int) and isinstance(trim_frame_end, int):
|
ffmpeg_builder.set_media_resolution(temp_video_resolution),
|
||||||
commands.extend([ '-vf', 'trim=start_frame=' + str(trim_frame_start) + ':end_frame=' + str(trim_frame_end) + ',fps=' + str(temp_video_fps) ])
|
ffmpeg_builder.set_frame_quality(0),
|
||||||
elif isinstance(trim_frame_start, int):
|
ffmpeg_builder.select_frame_range(trim_frame_start, trim_frame_end, temp_video_fps),
|
||||||
commands.extend([ '-vf', 'trim=start_frame=' + str(trim_frame_start) + ',fps=' + str(temp_video_fps) ])
|
ffmpeg_builder.prevent_frame_drop(),
|
||||||
elif isinstance(trim_frame_end, int):
|
ffmpeg_builder.set_output(temp_frames_pattern)
|
||||||
commands.extend([ '-vf', 'trim=end_frame=' + str(trim_frame_end) + ',fps=' + str(temp_video_fps) ])
|
)
|
||||||
else:
|
|
||||||
commands.extend([ '-vf', 'fps=' + str(temp_video_fps) ])
|
|
||||||
commands.extend([ '-vsync', '0', temp_frames_pattern ])
|
|
||||||
|
|
||||||
with tqdm(total = extract_frame_total, desc = wording.get('extracting'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:
|
with tqdm(total = extract_frame_total, desc = wording.get('extracting'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:
|
||||||
process = run_ffmpeg_with_progress(commands, lambda frame_number: progress.update(frame_number - progress.n))
|
process = run_ffmpeg_with_progress(commands, lambda frame_number: progress.update(frame_number - progress.n))
|
||||||
@@ -93,27 +91,37 @@ def extract_frames(target_path : str, temp_video_resolution : str, temp_video_fp
|
|||||||
|
|
||||||
def copy_image(target_path : str, temp_image_resolution : str) -> bool:
|
def copy_image(target_path : str, temp_image_resolution : str) -> bool:
|
||||||
temp_file_path = get_temp_file_path(target_path)
|
temp_file_path = get_temp_file_path(target_path)
|
||||||
if get_file_format(target_path) == 'webp':
|
commands = ffmpeg_builder.chain(
|
||||||
output_image_compression = 100
|
ffmpeg_builder.set_input(target_path),
|
||||||
else:
|
ffmpeg_builder.set_media_resolution(temp_image_resolution),
|
||||||
output_image_compression = 1
|
ffmpeg_builder.set_image_quality(target_path, 100),
|
||||||
commands = [ '-i', target_path, '-s', str(temp_image_resolution), '-q:v', str(output_image_compression), '-y', temp_file_path ]
|
ffmpeg_builder.force_output(temp_file_path)
|
||||||
|
)
|
||||||
return run_ffmpeg(commands).returncode == 0
|
return run_ffmpeg(commands).returncode == 0
|
||||||
|
|
||||||
|
|
||||||
def finalize_image(target_path : str, output_path : str, output_image_resolution : str) -> bool:
|
def finalize_image(target_path : str, output_path : str, output_image_resolution : str) -> bool:
|
||||||
output_image_quality = state_manager.get_item('output_image_quality')
|
output_image_quality = state_manager.get_item('output_image_quality')
|
||||||
temp_file_path = get_temp_file_path(target_path)
|
temp_file_path = get_temp_file_path(target_path)
|
||||||
if get_file_format(target_path) == 'webp':
|
commands = ffmpeg_builder.chain(
|
||||||
output_image_compression = output_image_quality
|
ffmpeg_builder.set_input(temp_file_path),
|
||||||
else:
|
ffmpeg_builder.set_media_resolution(output_image_resolution),
|
||||||
output_image_compression = round(31 - (output_image_quality * 0.31))
|
ffmpeg_builder.set_image_quality(target_path, output_image_quality),
|
||||||
commands = [ '-i', temp_file_path, '-s', str(output_image_resolution), '-q:v', str(output_image_compression), '-y', output_path ]
|
ffmpeg_builder.force_output(output_path)
|
||||||
|
)
|
||||||
return run_ffmpeg(commands).returncode == 0
|
return run_ffmpeg(commands).returncode == 0
|
||||||
|
|
||||||
|
|
||||||
def read_audio_buffer(target_path : str, sample_rate : int, channel_total : int) -> Optional[AudioBuffer]:
|
def read_audio_buffer(target_path : str, audio_sample_rate : int, audio_sample_size : int, audio_channel_total : int) -> Optional[AudioBuffer]:
|
||||||
commands = [ '-i', target_path, '-vn', '-f', 's16le', '-acodec', 'pcm_s16le', '-ar', str(sample_rate), '-ac', str(channel_total), '-' ]
|
commands = ffmpeg_builder.chain(
|
||||||
|
ffmpeg_builder.set_input(target_path),
|
||||||
|
ffmpeg_builder.ignore_video_stream(),
|
||||||
|
ffmpeg_builder.set_audio_sample_rate(audio_sample_rate),
|
||||||
|
ffmpeg_builder.set_audio_sample_size(audio_sample_size),
|
||||||
|
ffmpeg_builder.set_audio_channel_total(audio_channel_total),
|
||||||
|
ffmpeg_builder.cast_stream()
|
||||||
|
)
|
||||||
|
|
||||||
process = open_ffmpeg(commands)
|
process = open_ffmpeg(commands)
|
||||||
audio_buffer, _ = process.communicate()
|
audio_buffer, _ = process.communicate()
|
||||||
if process.returncode == 0:
|
if process.returncode == 0:
|
||||||
@@ -127,26 +135,20 @@ def restore_audio(target_path : str, output_path : str, output_video_fps : Fps,
|
|||||||
output_audio_volume = state_manager.get_item('output_audio_volume')
|
output_audio_volume = state_manager.get_item('output_audio_volume')
|
||||||
temp_file_path = get_temp_file_path(target_path)
|
temp_file_path = get_temp_file_path(target_path)
|
||||||
temp_video_duration = detect_video_duration(temp_file_path)
|
temp_video_duration = detect_video_duration(temp_file_path)
|
||||||
commands = [ '-i', temp_file_path ]
|
|
||||||
|
|
||||||
if isinstance(trim_frame_start, int):
|
commands = ffmpeg_builder.chain(
|
||||||
start_time = trim_frame_start / output_video_fps
|
ffmpeg_builder.set_input(temp_file_path),
|
||||||
commands.extend([ '-ss', str(start_time) ])
|
ffmpeg_builder.select_media_range(trim_frame_start, trim_frame_end, output_video_fps),
|
||||||
if isinstance(trim_frame_end, int):
|
ffmpeg_builder.set_input(target_path),
|
||||||
end_time = trim_frame_end / output_video_fps
|
ffmpeg_builder.copy_video_encoder(),
|
||||||
commands.extend([ '-to', str(end_time) ])
|
ffmpeg_builder.set_audio_encoder(output_audio_encoder),
|
||||||
commands.extend([ '-i', target_path, '-c:v', 'copy', '-c:a', output_audio_encoder ])
|
ffmpeg_builder.set_audio_quality(output_audio_encoder, output_audio_quality),
|
||||||
if output_audio_encoder in [ 'aac' ]:
|
ffmpeg_builder.set_audio_volume(output_audio_volume),
|
||||||
output_audio_compression = round(10 - (output_audio_quality * 0.9))
|
ffmpeg_builder.select_media_stream('0:v:0'),
|
||||||
commands.extend([ '-q:a', str(output_audio_compression) ])
|
ffmpeg_builder.select_media_stream('1:a:0'),
|
||||||
if output_audio_encoder in [ 'libmp3lame' ]:
|
ffmpeg_builder.set_video_duration(temp_video_duration),
|
||||||
output_audio_compression = round(9 - (output_audio_quality * 0.9))
|
ffmpeg_builder.force_output(output_path)
|
||||||
commands.extend([ '-q:a', str(output_audio_compression) ])
|
)
|
||||||
if output_audio_encoder in [ 'libopus', 'libvorbis' ]:
|
|
||||||
output_audio_compression = round((100 - output_audio_quality) / 10)
|
|
||||||
commands.extend([ '-q:a', str(output_audio_compression) ])
|
|
||||||
output_audio_volume = output_audio_volume / 100
|
|
||||||
commands.extend([ '-filter:a', 'volume=' + str(output_audio_volume), '-map', '0:v:0', '-map', '1:a:0', '-t', str(temp_video_duration), '-y', output_path ])
|
|
||||||
return run_ffmpeg(commands).returncode == 0
|
return run_ffmpeg(commands).returncode == 0
|
||||||
|
|
||||||
|
|
||||||
@@ -156,19 +158,17 @@ def replace_audio(target_path : str, audio_path : str, output_path : str) -> boo
|
|||||||
output_audio_volume = state_manager.get_item('output_audio_volume')
|
output_audio_volume = state_manager.get_item('output_audio_volume')
|
||||||
temp_file_path = get_temp_file_path(target_path)
|
temp_file_path = get_temp_file_path(target_path)
|
||||||
temp_video_duration = detect_video_duration(temp_file_path)
|
temp_video_duration = detect_video_duration(temp_file_path)
|
||||||
commands = [ '-i', temp_file_path, '-i', audio_path, '-c:v', 'copy', '-c:a', output_audio_encoder ]
|
|
||||||
|
|
||||||
if output_audio_encoder in [ 'aac' ]:
|
commands = ffmpeg_builder.chain(
|
||||||
output_audio_compression = round(10 - (output_audio_quality * 0.9))
|
ffmpeg_builder.set_input(temp_file_path),
|
||||||
commands.extend([ '-q:a', str(output_audio_compression) ])
|
ffmpeg_builder.set_input(audio_path),
|
||||||
if output_audio_encoder in [ 'libmp3lame' ]:
|
ffmpeg_builder.copy_video_encoder(),
|
||||||
output_audio_compression = round(9 - (output_audio_quality * 0.9))
|
ffmpeg_builder.set_audio_encoder(output_audio_encoder),
|
||||||
commands.extend([ '-q:a', str(output_audio_compression) ])
|
ffmpeg_builder.set_audio_quality(output_audio_encoder, output_audio_quality),
|
||||||
if output_audio_encoder in [ 'libopus', 'libvorbis' ]:
|
ffmpeg_builder.set_audio_volume(output_audio_volume),
|
||||||
output_audio_compression = round((100 - output_audio_quality) / 10)
|
ffmpeg_builder.set_video_duration(temp_video_duration),
|
||||||
commands.extend([ '-q:a', str(output_audio_compression) ])
|
ffmpeg_builder.force_output(output_path)
|
||||||
output_audio_volume = output_audio_volume / 100
|
)
|
||||||
commands.extend([ '-filter:a', 'volume=' + str(output_audio_volume), '-t', str(temp_video_duration), '-y', output_path ])
|
|
||||||
return run_ffmpeg(commands).returncode == 0
|
return run_ffmpeg(commands).returncode == 0
|
||||||
|
|
||||||
|
|
||||||
@@ -183,22 +183,19 @@ def merge_video(target_path : str, output_video_resolution : str, output_video_f
|
|||||||
|
|
||||||
if get_file_format(target_path) == 'webm':
|
if get_file_format(target_path) == 'webm':
|
||||||
output_video_encoder = 'libvpx-vp9'
|
output_video_encoder = 'libvpx-vp9'
|
||||||
commands = [ '-r', str(temp_video_fps), '-i', temp_frames_pattern, '-s', str(output_video_resolution), '-c:v', output_video_encoder ]
|
|
||||||
if output_video_encoder in [ 'libx264', 'libx265' ]:
|
commands = ffmpeg_builder.chain(
|
||||||
output_video_compression = round(51 - (output_video_quality * 0.51))
|
ffmpeg_builder.set_conditional_fps(temp_video_fps),
|
||||||
commands.extend([ '-crf', str(output_video_compression), '-preset', output_video_preset ])
|
ffmpeg_builder.set_input(temp_frames_pattern),
|
||||||
if output_video_encoder in [ 'libvpx-vp9' ]:
|
ffmpeg_builder.set_video_encoder(output_video_encoder),
|
||||||
output_video_compression = round(63 - (output_video_quality * 0.63))
|
ffmpeg_builder.set_media_resolution(output_video_resolution),
|
||||||
commands.extend([ '-crf', str(output_video_compression) ])
|
ffmpeg_builder.set_video_quality(output_video_encoder, output_video_quality),
|
||||||
if output_video_encoder in [ 'h264_nvenc', 'hevc_nvenc' ]:
|
ffmpeg_builder.set_video_preset(output_video_encoder, output_video_preset),
|
||||||
output_video_compression = round(51 - (output_video_quality * 0.51))
|
ffmpeg_builder.set_video_fps(output_video_fps),
|
||||||
commands.extend([ '-cq', str(output_video_compression), '-preset', map_nvenc_preset(output_video_preset) ])
|
ffmpeg_builder.set_pixel_format('yuv420p'),
|
||||||
if output_video_encoder in [ 'h264_amf', 'hevc_amf' ]:
|
ffmpeg_builder.set_video_colorspace('bt709'),
|
||||||
output_video_compression = round(51 - (output_video_quality * 0.51))
|
ffmpeg_builder.force_output(temp_file_path)
|
||||||
commands.extend([ '-qp_i', str(output_video_compression), '-qp_p', str(output_video_compression), '-quality', map_amf_preset(output_video_preset) ])
|
)
|
||||||
if output_video_encoder in [ 'h264_videotoolbox', 'hevc_videotoolbox' ]:
|
|
||||||
commands.extend([ '-q:v', str(output_video_quality) ])
|
|
||||||
commands.extend([ '-vf', 'framerate=fps=' + str(output_video_fps), '-pix_fmt', 'yuv420p', '-colorspace', 'bt709', '-y', temp_file_path ])
|
|
||||||
|
|
||||||
with tqdm(total = merge_frame_total, desc = wording.get('merging'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:
|
with tqdm(total = merge_frame_total, desc = wording.get('merging'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:
|
||||||
process = run_ffmpeg_with_progress(commands, lambda frame_number: progress.update(frame_number - progress.n))
|
process = run_ffmpeg_with_progress(commands, lambda frame_number: progress.update(frame_number - progress.n))
|
||||||
@@ -213,38 +210,16 @@ def concat_video(output_path : str, temp_output_paths : List[str]) -> bool:
|
|||||||
concat_video_file.write('file \'' + os.path.abspath(temp_output_path) + '\'' + os.linesep)
|
concat_video_file.write('file \'' + os.path.abspath(temp_output_path) + '\'' + os.linesep)
|
||||||
concat_video_file.flush()
|
concat_video_file.flush()
|
||||||
concat_video_file.close()
|
concat_video_file.close()
|
||||||
commands = [ '-f', 'concat', '-safe', '0', '-i', concat_video_file.name, '-c:v', 'copy', '-c:a', 'copy', '-y', os.path.abspath(output_path) ]
|
|
||||||
|
output_path = os.path.abspath(output_path)
|
||||||
|
commands = ffmpeg_builder.chain(
|
||||||
|
ffmpeg_builder.unsafe_concat(),
|
||||||
|
ffmpeg_builder.set_input(concat_video_file.name),
|
||||||
|
ffmpeg_builder.copy_video_encoder(),
|
||||||
|
ffmpeg_builder.copy_audio_encoder(),
|
||||||
|
ffmpeg_builder.force_output(output_path)
|
||||||
|
)
|
||||||
process = run_ffmpeg(commands)
|
process = run_ffmpeg(commands)
|
||||||
process.communicate()
|
process.communicate()
|
||||||
remove_file(concat_video_path)
|
remove_file(concat_video_path)
|
||||||
return process.returncode == 0
|
return process.returncode == 0
|
||||||
|
|
||||||
|
|
||||||
def map_nvenc_preset(output_video_preset : OutputVideoPreset) -> Optional[str]:
|
|
||||||
if output_video_preset in [ 'ultrafast', 'superfast', 'veryfast', 'faster', 'fast' ]:
|
|
||||||
return 'fast'
|
|
||||||
if output_video_preset == 'medium':
|
|
||||||
return 'medium'
|
|
||||||
if output_video_preset in [ 'slow', 'slower', 'veryslow' ]:
|
|
||||||
return 'slow'
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def map_amf_preset(output_video_preset : OutputVideoPreset) -> Optional[str]:
|
|
||||||
if output_video_preset in [ 'ultrafast', 'superfast', 'veryfast' ]:
|
|
||||||
return 'speed'
|
|
||||||
if output_video_preset in [ 'faster', 'fast', 'medium' ]:
|
|
||||||
return 'balanced'
|
|
||||||
if output_video_preset in [ 'slow', 'slower', 'veryslow' ]:
|
|
||||||
return 'quality'
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def map_qsv_preset(output_video_preset : OutputVideoPreset) -> Optional[str]:
|
|
||||||
if output_video_preset in [ 'ultrafast', 'superfast', 'veryfast', 'faster', 'fast' ]:
|
|
||||||
return 'fast'
|
|
||||||
if output_video_preset == 'medium':
|
|
||||||
return 'medium'
|
|
||||||
if output_video_preset in [ 'slow', 'slower', 'veryslow' ]:
|
|
||||||
return 'slow'
|
|
||||||
return None
|
|
||||||
|
|||||||
226
facefusion/ffmpeg_builder.py
Normal file
226
facefusion/ffmpeg_builder.py
Normal file
@@ -0,0 +1,226 @@
|
|||||||
|
import itertools
|
||||||
|
import shutil
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from facefusion.filesystem import get_file_format
|
||||||
|
from facefusion.typing import AudioEncoder, Commands, Duration, Fps, StreamMode, VideoEncoder, VideoPreset
|
||||||
|
|
||||||
|
|
||||||
|
def run(commands : Commands) -> Commands:
|
||||||
|
return [ shutil.which('ffmpeg'), '-loglevel', 'error' ] + commands
|
||||||
|
|
||||||
|
|
||||||
|
def chain(*commands : Commands) -> Commands:
|
||||||
|
return list(itertools.chain(*commands))
|
||||||
|
|
||||||
|
|
||||||
|
def set_progress() -> Commands:
|
||||||
|
return [ '-progress' ]
|
||||||
|
|
||||||
|
|
||||||
|
def set_input(input_path : str) -> Commands:
|
||||||
|
return [ '-i', input_path ]
|
||||||
|
|
||||||
|
|
||||||
|
def set_conditional_fps(conditional_fps : Fps) -> Commands:
|
||||||
|
return [ '-r', str(conditional_fps) ]
|
||||||
|
|
||||||
|
|
||||||
|
def set_output(output_path : str) -> Commands:
|
||||||
|
return [ output_path ]
|
||||||
|
|
||||||
|
|
||||||
|
def force_output(output_path : str) -> Commands:
|
||||||
|
return [ '-y', output_path ]
|
||||||
|
|
||||||
|
|
||||||
|
def cast_stream() -> Commands:
|
||||||
|
return [ '-' ]
|
||||||
|
|
||||||
|
|
||||||
|
def set_stream_mode(stream_mode : StreamMode) -> Commands:
|
||||||
|
if stream_mode == 'udp':
|
||||||
|
return [ '-f', 'mpegts' ]
|
||||||
|
if stream_mode == 'v4l2':
|
||||||
|
return [ '-f', 'v4l2' ]
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
def unsafe_concat() -> Commands:
|
||||||
|
return [ '-f', 'concat', '-safe', '0' ]
|
||||||
|
|
||||||
|
|
||||||
|
def set_pixel_format(pixel_format : str) -> Commands:
|
||||||
|
return [ '-pix_fmt', pixel_format ]
|
||||||
|
|
||||||
|
|
||||||
|
def set_frame_quality(frame_quality : int) -> Commands:
|
||||||
|
return [ '-q:v', str(frame_quality) ]
|
||||||
|
|
||||||
|
|
||||||
|
def select_frame_range(frame_start : int, frame_end : int, video_fps : Fps) -> Commands:
|
||||||
|
if isinstance(frame_start, int) and isinstance(frame_end, int):
|
||||||
|
return [ '-vf', 'trim=start_frame=' + str(frame_start) + ':end_frame=' + str(frame_end) + ',fps=' + str(video_fps) ]
|
||||||
|
if isinstance(frame_start, int):
|
||||||
|
return [ '-vf', 'trim=start_frame=' + str(frame_start) + ',fps=' + str(video_fps) ]
|
||||||
|
if isinstance(frame_end, int):
|
||||||
|
return [ '-vf', 'trim=end_frame=' + str(frame_end) + ',fps=' + str(video_fps) ]
|
||||||
|
return [ '-vf', 'fps=' + str(video_fps) ]
|
||||||
|
|
||||||
|
|
||||||
|
def prevent_frame_drop() -> Commands:
|
||||||
|
return [ '-vsync', '0' ]
|
||||||
|
|
||||||
|
|
||||||
|
def select_media_range(frame_start : int, frame_end : int, media_fps : Fps) -> Commands:
|
||||||
|
commands = []
|
||||||
|
|
||||||
|
if isinstance(frame_start, int):
|
||||||
|
commands.extend([ '-ss', str(frame_start / media_fps) ])
|
||||||
|
if isinstance(frame_end, int):
|
||||||
|
commands.extend([ '-to', str(frame_end / media_fps) ])
|
||||||
|
return commands
|
||||||
|
|
||||||
|
|
||||||
|
def select_media_stream(media_stream : str) -> Commands:
|
||||||
|
return [ '-map', media_stream ]
|
||||||
|
|
||||||
|
|
||||||
|
def set_media_resolution(video_resolution : str) -> Commands:
|
||||||
|
return [ '-s', video_resolution ]
|
||||||
|
|
||||||
|
|
||||||
|
def set_image_quality(image_path : str, image_quality : int) -> Commands:
|
||||||
|
if get_file_format(image_path) == 'webp':
|
||||||
|
image_compression = image_quality
|
||||||
|
else:
|
||||||
|
image_compression = round(31 - (image_quality * 0.31))
|
||||||
|
return [ '-q:v', str(image_compression) ]
|
||||||
|
|
||||||
|
|
||||||
|
def set_audio_encoder(audio_codec : str) -> Commands:
|
||||||
|
return [ '-c:a', audio_codec ]
|
||||||
|
|
||||||
|
|
||||||
|
def copy_audio_encoder() -> Commands:
|
||||||
|
return set_audio_encoder('copy')
|
||||||
|
|
||||||
|
|
||||||
|
def set_audio_sample_rate(audio_sample_rate : int) -> Commands:
|
||||||
|
return [ '-ar', str(audio_sample_rate) ]
|
||||||
|
|
||||||
|
|
||||||
|
def set_audio_sample_size(audio_sample_size : int) -> Commands:
|
||||||
|
if audio_sample_size == 16:
|
||||||
|
return [ '-f', 's16le', '-acodec', 'pcm_s16le' ]
|
||||||
|
if audio_sample_size == 32:
|
||||||
|
return [ '-f', 's32le', '-acodec', 'pcm_s32le' ]
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
def set_audio_channel_total(audio_channel_total : int) -> Commands:
|
||||||
|
return [ '-ac', str(audio_channel_total) ]
|
||||||
|
|
||||||
|
|
||||||
|
def set_audio_quality(audio_encoder : AudioEncoder, audio_quality : int) -> Commands:
|
||||||
|
if audio_encoder == 'aac':
|
||||||
|
audio_compression = round(10 - (audio_quality * 0.9))
|
||||||
|
return [ '-q:a', str(audio_compression) ]
|
||||||
|
if audio_encoder == 'libmp3lame':
|
||||||
|
audio_compression = round(9 - (audio_quality * 0.9))
|
||||||
|
return [ '-q:a', str(audio_compression) ]
|
||||||
|
if audio_encoder in [ 'libopus', 'libvorbis' ]:
|
||||||
|
audio_compression = round((100 - audio_quality) / 10)
|
||||||
|
return [ '-q:a', str(audio_compression) ]
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
def set_audio_volume(audio_volume : int) -> Commands:
|
||||||
|
return [ '-filter:a', 'volume=' + str(audio_volume / 100) ]
|
||||||
|
|
||||||
|
|
||||||
|
def set_video_encoder(video_encoder : str) -> Commands:
|
||||||
|
return [ '-c:v', video_encoder ]
|
||||||
|
|
||||||
|
|
||||||
|
def copy_video_encoder() -> Commands:
|
||||||
|
return set_video_encoder('copy')
|
||||||
|
|
||||||
|
|
||||||
|
def set_video_quality(video_encoder : VideoEncoder, video_quality : int) -> Commands:
|
||||||
|
if video_encoder in [ 'libx264', 'libx265' ]:
|
||||||
|
video_compression = round(51 - (video_quality * 0.51))
|
||||||
|
return [ '-crf', str(video_compression) ]
|
||||||
|
if video_encoder == 'libvpx-vp9':
|
||||||
|
video_compression = round(63 - (video_quality * 0.63))
|
||||||
|
return [ '-crf', str(video_compression) ]
|
||||||
|
if video_encoder in [ 'h264_nvenc', 'hevc_nvenc' ]:
|
||||||
|
video_compression = round(51 - (video_quality * 0.51))
|
||||||
|
return [ '-cq', str(video_compression) ]
|
||||||
|
if video_encoder in [ 'h264_amf', 'hevc_amf' ]:
|
||||||
|
video_compression = round(51 - (video_quality * 0.51))
|
||||||
|
return [ '-qp_i', str(video_compression), '-qp_p', str(video_compression) ]
|
||||||
|
if video_encoder in [ 'h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox' ]:
|
||||||
|
video_compression = round(51 - (video_quality * 0.51))
|
||||||
|
return [ '-q:v', str(video_compression) ]
|
||||||
|
return [ '-q:v', str(video_quality) ]
|
||||||
|
|
||||||
|
|
||||||
|
def set_video_preset(video_encoder : VideoEncoder, video_preset : VideoPreset) -> Commands:
|
||||||
|
if video_encoder in [ 'libx264', 'libx265' ]:
|
||||||
|
return [ '-preset', video_preset ]
|
||||||
|
if video_encoder in [ 'h264_nvenc', 'hevc_nvenc' ]:
|
||||||
|
return [ '-preset', map_nvenc_preset(video_preset) ]
|
||||||
|
if video_encoder in [ 'h264_amf', 'hevc_amf' ]:
|
||||||
|
return [ '-quality', map_amf_preset(video_preset) ]
|
||||||
|
if video_encoder in [ 'h264_qsv', 'hevc_qsv' ]:
|
||||||
|
return [ '-preset', map_qsv_preset(video_preset) ]
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
def set_video_colorspace(video_colorspace : str) -> Commands:
|
||||||
|
return [ '-colorspace', video_colorspace ]
|
||||||
|
|
||||||
|
|
||||||
|
def set_video_fps(video_fps : Fps) -> Commands:
|
||||||
|
return [ '-vf', 'framerate=fps=' + str(video_fps) ]
|
||||||
|
|
||||||
|
|
||||||
|
def set_video_duration(video_duration : Duration) -> Commands:
|
||||||
|
return [ '-t', str(video_duration) ]
|
||||||
|
|
||||||
|
|
||||||
|
def capture_video() -> Commands:
|
||||||
|
return [ '-f', 'rawvideo' ]
|
||||||
|
|
||||||
|
|
||||||
|
def ignore_video_stream() -> Commands:
|
||||||
|
return [ '-vn' ]
|
||||||
|
|
||||||
|
|
||||||
|
def map_nvenc_preset(video_preset : VideoPreset) -> Optional[str]:
|
||||||
|
if video_preset in [ 'ultrafast', 'superfast', 'veryfast', 'faster', 'fast' ]:
|
||||||
|
return 'fast'
|
||||||
|
if video_preset == 'medium':
|
||||||
|
return 'medium'
|
||||||
|
if video_preset in [ 'slow', 'slower', 'veryslow' ]:
|
||||||
|
return 'slow'
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def map_amf_preset(video_preset : VideoPreset) -> Optional[str]:
|
||||||
|
if video_preset in [ 'ultrafast', 'superfast', 'veryfast' ]:
|
||||||
|
return 'speed'
|
||||||
|
if video_preset in [ 'faster', 'fast', 'medium' ]:
|
||||||
|
return 'balanced'
|
||||||
|
if video_preset in [ 'slow', 'slower', 'veryslow' ]:
|
||||||
|
return 'quality'
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def map_qsv_preset(video_preset : VideoPreset) -> Optional[str]:
|
||||||
|
if video_preset in [ 'ultrafast', 'superfast', 'veryfast' ]:
|
||||||
|
return 'veryfast'
|
||||||
|
if video_preset in [ 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow' ]:
|
||||||
|
return video_preset
|
||||||
|
return None
|
||||||
@@ -19,8 +19,7 @@ from facefusion.face_store import get_reference_faces
|
|||||||
from facefusion.filesystem import in_directory, is_image, is_video, resolve_relative_path, same_file_extension
|
from facefusion.filesystem import in_directory, is_image, is_video, resolve_relative_path, same_file_extension
|
||||||
from facefusion.processors import choices as processors_choices
|
from facefusion.processors import choices as processors_choices
|
||||||
from facefusion.processors.live_portrait import create_rotation, limit_expression
|
from facefusion.processors.live_portrait import create_rotation, limit_expression
|
||||||
from facefusion.processors.typing import ExpressionRestorerInputs
|
from facefusion.processors.typing import ExpressionRestorerInputs, LivePortraitExpression, LivePortraitFeatureVolume, LivePortraitMotionPoints, LivePortraitPitch, LivePortraitRoll, LivePortraitScale, LivePortraitTranslation, LivePortraitYaw
|
||||||
from facefusion.processors.typing import LivePortraitExpression, LivePortraitFeatureVolume, LivePortraitMotionPoints, LivePortraitPitch, LivePortraitRoll, LivePortraitScale, LivePortraitTranslation, LivePortraitYaw
|
|
||||||
from facefusion.program_helper import find_argument_group
|
from facefusion.program_helper import find_argument_group
|
||||||
from facefusion.thread_helper import conditional_thread_semaphore, thread_semaphore
|
from facefusion.thread_helper import conditional_thread_semaphore, thread_semaphore
|
||||||
from facefusion.typing import ApplyStateItem, Args, DownloadScope, Face, InferencePool, ModelOptions, ModelSet, ProcessMode, QueuePayload, UpdateProgress, VisionFrame
|
from facefusion.typing import ApplyStateItem, Args, DownloadScope, Face, InferencePool, ModelOptions, ModelSet, ProcessMode, QueuePayload, UpdateProgress, VisionFrame
|
||||||
|
|||||||
@@ -117,9 +117,12 @@ AudioTypeSet = Dict[AudioFormat, str]
|
|||||||
ImageTypeSet = Dict[ImageFormat, str]
|
ImageTypeSet = Dict[ImageFormat, str]
|
||||||
VideoTypeSet = Dict[VideoFormat, str]
|
VideoTypeSet = Dict[VideoFormat, str]
|
||||||
|
|
||||||
OutputAudioEncoder = Literal['aac', 'libmp3lame', 'libopus', 'libvorbis']
|
AudioEncoder = Literal['aac', 'libmp3lame', 'libopus', 'libvorbis']
|
||||||
OutputVideoEncoder = Literal['libx264', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf','h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox']
|
VideoEncoder = Literal['libx264', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf', 'h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox']
|
||||||
OutputVideoPreset = Literal['ultrafast', 'superfast', 'veryfast', 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow']
|
VideoPreset = Literal['ultrafast', 'superfast', 'veryfast', 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow']
|
||||||
|
|
||||||
|
WebcamMode = Literal['inline', 'udp', 'v4l2']
|
||||||
|
StreamMode = Literal['udp', 'v4l2']
|
||||||
|
|
||||||
ModelOptions = Dict[str, Any]
|
ModelOptions = Dict[str, Any]
|
||||||
ModelSet = Dict[str, ModelOptions]
|
ModelSet = Dict[str, ModelOptions]
|
||||||
@@ -317,11 +320,11 @@ State = TypedDict('State',
|
|||||||
'keep_temp' : bool,
|
'keep_temp' : bool,
|
||||||
'output_image_quality' : int,
|
'output_image_quality' : int,
|
||||||
'output_image_resolution' : str,
|
'output_image_resolution' : str,
|
||||||
'output_audio_encoder' : OutputAudioEncoder,
|
'output_audio_encoder' : AudioEncoder,
|
||||||
'output_audio_quality' : int,
|
'output_audio_quality' : int,
|
||||||
'output_audio_volume' : int,
|
'output_audio_volume' : int,
|
||||||
'output_video_encoder' : OutputVideoEncoder,
|
'output_video_encoder' : VideoEncoder,
|
||||||
'output_video_preset' : OutputVideoPreset,
|
'output_video_preset' : VideoPreset,
|
||||||
'output_video_quality' : int,
|
'output_video_quality' : int,
|
||||||
'output_video_resolution' : str,
|
'output_video_resolution' : str,
|
||||||
'output_video_fps' : float,
|
'output_video_fps' : float,
|
||||||
|
|||||||
@@ -1,11 +1,9 @@
|
|||||||
from typing import List
|
from typing import List
|
||||||
|
|
||||||
from facefusion.uis.typing import JobManagerAction, JobRunnerAction, WebcamMode
|
from facefusion.uis.typing import JobManagerAction, JobRunnerAction
|
||||||
|
|
||||||
job_manager_actions : List[JobManagerAction] = [ 'job-create', 'job-submit', 'job-delete', 'job-add-step', 'job-remix-step', 'job-insert-step', 'job-remove-step' ]
|
job_manager_actions : List[JobManagerAction] = [ 'job-create', 'job-submit', 'job-delete', 'job-add-step', 'job-remix-step', 'job-insert-step', 'job-remove-step' ]
|
||||||
job_runner_actions : List[JobRunnerAction] = [ 'job-run', 'job-run-all', 'job-retry', 'job-retry-all' ]
|
job_runner_actions : List[JobRunnerAction] = [ 'job-run', 'job-run-all', 'job-retry', 'job-retry-all' ]
|
||||||
|
|
||||||
common_options : List[str] = [ 'keep-temp' ]
|
common_options : List[str] = [ 'keep-temp' ]
|
||||||
|
|
||||||
webcam_modes : List[WebcamMode] = [ 'inline', 'udp', 'v4l2' ]
|
|
||||||
webcam_resolutions : List[str] = [ '320x240', '640x480', '800x600', '1024x768', '1280x720', '1280x960', '1920x1080', '2560x1440', '3840x2160' ]
|
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import facefusion.choices
|
|||||||
from facefusion import state_manager, wording
|
from facefusion import state_manager, wording
|
||||||
from facefusion.common_helper import calc_int_step
|
from facefusion.common_helper import calc_int_step
|
||||||
from facefusion.filesystem import is_image, is_video
|
from facefusion.filesystem import is_image, is_video
|
||||||
from facefusion.typing import Fps, OutputAudioEncoder, OutputVideoEncoder, OutputVideoPreset
|
from facefusion.typing import AudioEncoder, Fps, VideoEncoder, VideoPreset
|
||||||
from facefusion.uis.core import get_ui_components, register_ui_component
|
from facefusion.uis.core import get_ui_components, register_ui_component
|
||||||
from facefusion.vision import create_image_resolutions, create_video_resolutions, detect_image_resolution, detect_video_fps, detect_video_resolution, pack_resolution
|
from facefusion.vision import create_image_resolutions, create_video_resolutions, detect_image_resolution, detect_video_fps, detect_video_resolution, pack_resolution
|
||||||
|
|
||||||
@@ -159,7 +159,7 @@ def update_output_image_resolution(output_image_resolution : str) -> None:
|
|||||||
state_manager.set_item('output_image_resolution', output_image_resolution)
|
state_manager.set_item('output_image_resolution', output_image_resolution)
|
||||||
|
|
||||||
|
|
||||||
def update_output_audio_encoder(output_audio_encoder : OutputAudioEncoder) -> None:
|
def update_output_audio_encoder(output_audio_encoder : AudioEncoder) -> None:
|
||||||
state_manager.set_item('output_audio_encoder', output_audio_encoder)
|
state_manager.set_item('output_audio_encoder', output_audio_encoder)
|
||||||
|
|
||||||
|
|
||||||
@@ -171,11 +171,11 @@ def update_output_audio_volume(output_audio_volume: float) -> None:
|
|||||||
state_manager.set_item('output_audio_volume', int(output_audio_volume))
|
state_manager.set_item('output_audio_volume', int(output_audio_volume))
|
||||||
|
|
||||||
|
|
||||||
def update_output_video_encoder(output_video_encoder : OutputVideoEncoder) -> None:
|
def update_output_video_encoder(output_video_encoder : VideoEncoder) -> None:
|
||||||
state_manager.set_item('output_video_encoder', output_video_encoder)
|
state_manager.set_item('output_video_encoder', output_video_encoder)
|
||||||
|
|
||||||
|
|
||||||
def update_output_video_preset(output_video_preset : OutputVideoPreset) -> None:
|
def update_output_video_preset(output_video_preset : VideoPreset) -> None:
|
||||||
state_manager.set_item('output_video_preset', output_video_preset)
|
state_manager.set_item('output_video_preset', output_video_preset)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -8,17 +8,16 @@ import cv2
|
|||||||
import gradio
|
import gradio
|
||||||
from tqdm import tqdm
|
from tqdm import tqdm
|
||||||
|
|
||||||
from facefusion import logger, state_manager, wording
|
from facefusion import ffmpeg_builder, logger, state_manager, wording
|
||||||
from facefusion.audio import create_empty_audio_frame
|
from facefusion.audio import create_empty_audio_frame
|
||||||
from facefusion.common_helper import get_first, is_windows
|
from facefusion.common_helper import is_windows
|
||||||
from facefusion.content_analyser import analyse_stream
|
from facefusion.content_analyser import analyse_stream
|
||||||
from facefusion.face_analyser import get_average_face, get_many_faces
|
from facefusion.face_analyser import get_average_face, get_many_faces
|
||||||
from facefusion.ffmpeg import open_ffmpeg
|
from facefusion.ffmpeg import open_ffmpeg
|
||||||
from facefusion.filesystem import filter_image_paths
|
from facefusion.filesystem import filter_image_paths, is_directory
|
||||||
from facefusion.processors.core import get_processors_modules
|
from facefusion.processors.core import get_processors_modules
|
||||||
from facefusion.typing import Face, Fps, VisionFrame
|
from facefusion.typing import Face, Fps, StreamMode, VisionFrame, WebcamMode
|
||||||
from facefusion.uis.core import get_ui_component
|
from facefusion.uis.core import get_ui_component
|
||||||
from facefusion.uis.typing import StreamMode, WebcamMode
|
|
||||||
from facefusion.vision import normalize_frame_color, read_static_images, unpack_resolution
|
from facefusion.vision import normalize_frame_color, read_static_images, unpack_resolution
|
||||||
|
|
||||||
WEBCAM_CAPTURE : Optional[cv2.VideoCapture] = None
|
WEBCAM_CAPTURE : Optional[cv2.VideoCapture] = None
|
||||||
@@ -164,17 +163,32 @@ def process_stream_frame(source_face : Face, target_vision_frame : VisionFrame)
|
|||||||
|
|
||||||
|
|
||||||
def open_stream(stream_mode : StreamMode, stream_resolution : str, stream_fps : Fps) -> subprocess.Popen[bytes]:
|
def open_stream(stream_mode : StreamMode, stream_resolution : str, stream_fps : Fps) -> subprocess.Popen[bytes]:
|
||||||
commands = [ '-f', 'rawvideo', '-pix_fmt', 'bgr24', '-s', stream_resolution, '-r', str(stream_fps), '-i', '-']
|
commands = ffmpeg_builder.chain(
|
||||||
|
ffmpeg_builder.capture_video(),
|
||||||
|
ffmpeg_builder.set_media_resolution(stream_resolution),
|
||||||
|
ffmpeg_builder.set_conditional_fps(stream_fps)
|
||||||
|
)
|
||||||
|
|
||||||
if stream_mode == 'udp':
|
if stream_mode == 'udp':
|
||||||
commands.extend([ '-b:v', '2000k', '-f', 'mpegts', 'udp://localhost:27000?pkt_size=1316' ])
|
commands.extend(ffmpeg_builder.set_input('-'))
|
||||||
|
commands.extend(ffmpeg_builder.set_stream_mode('udp'))
|
||||||
|
commands.extend(ffmpeg_builder.set_output('udp://localhost:27000?pkt_size=1316'))
|
||||||
|
|
||||||
if stream_mode == 'v4l2':
|
if stream_mode == 'v4l2':
|
||||||
try:
|
device_directory_path = '/sys/devices/virtual/video4linux'
|
||||||
device_name = get_first(os.listdir('/sys/devices/virtual/video4linux'))
|
|
||||||
if device_name:
|
commands.extend(ffmpeg_builder.set_input('-'))
|
||||||
commands.extend([ '-f', 'v4l2', '/dev/' + device_name ])
|
commands.extend(ffmpeg_builder.set_stream_mode('v4l2'))
|
||||||
except FileNotFoundError:
|
if is_directory(device_directory_path):
|
||||||
|
device_names = os.listdir(device_directory_path)
|
||||||
|
|
||||||
|
for device_name in device_names:
|
||||||
|
device_path = '/dev/' + device_name
|
||||||
|
commands.extend(ffmpeg_builder.set_output(device_path))
|
||||||
|
|
||||||
|
else:
|
||||||
logger.error(wording.get('stream_not_loaded').format(stream_mode = stream_mode), __name__)
|
logger.error(wording.get('stream_not_loaded').format(stream_mode = stream_mode), __name__)
|
||||||
|
|
||||||
return open_ffmpeg(commands)
|
return open_ffmpeg(commands)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -2,9 +2,9 @@ from typing import Optional
|
|||||||
|
|
||||||
import gradio
|
import gradio
|
||||||
|
|
||||||
|
import facefusion.choices
|
||||||
from facefusion import wording
|
from facefusion import wording
|
||||||
from facefusion.common_helper import get_first
|
from facefusion.common_helper import get_first
|
||||||
from facefusion.uis import choices as uis_choices
|
|
||||||
from facefusion.uis.components.webcam import get_available_webcam_ids
|
from facefusion.uis.components.webcam import get_available_webcam_ids
|
||||||
from facefusion.uis.core import register_ui_component
|
from facefusion.uis.core import register_ui_component
|
||||||
|
|
||||||
@@ -28,13 +28,13 @@ def render() -> None:
|
|||||||
)
|
)
|
||||||
WEBCAM_MODE_RADIO = gradio.Radio(
|
WEBCAM_MODE_RADIO = gradio.Radio(
|
||||||
label = wording.get('uis.webcam_mode_radio'),
|
label = wording.get('uis.webcam_mode_radio'),
|
||||||
choices = uis_choices.webcam_modes,
|
choices = facefusion.choices.webcam_modes,
|
||||||
value = 'inline'
|
value = 'inline'
|
||||||
)
|
)
|
||||||
WEBCAM_RESOLUTION_DROPDOWN = gradio.Dropdown(
|
WEBCAM_RESOLUTION_DROPDOWN = gradio.Dropdown(
|
||||||
label = wording.get('uis.webcam_resolution_dropdown'),
|
label = wording.get('uis.webcam_resolution_dropdown'),
|
||||||
choices = uis_choices.webcam_resolutions,
|
choices = facefusion.choices.webcam_resolutions,
|
||||||
value = uis_choices.webcam_resolutions[0]
|
value = facefusion.choices.webcam_resolutions[0]
|
||||||
)
|
)
|
||||||
WEBCAM_FPS_SLIDER = gradio.Slider(
|
WEBCAM_FPS_SLIDER = gradio.Slider(
|
||||||
label = wording.get('uis.webcam_fps_slider'),
|
label = wording.get('uis.webcam_fps_slider'),
|
||||||
|
|||||||
@@ -81,6 +81,3 @@ ComponentName = Literal\
|
|||||||
|
|
||||||
JobManagerAction = Literal['job-create', 'job-submit', 'job-delete', 'job-add-step', 'job-remix-step', 'job-insert-step', 'job-remove-step']
|
JobManagerAction = Literal['job-create', 'job-submit', 'job-delete', 'job-add-step', 'job-remix-step', 'job-insert-step', 'job-remove-step']
|
||||||
JobRunnerAction = Literal['job-run', 'job-run-all', 'job-retry', 'job-retry-all']
|
JobRunnerAction = Literal['job-run', 'job-run-all', 'job-retry', 'job-retry-all']
|
||||||
|
|
||||||
WebcamMode = Literal['inline', 'udp', 'v4l2']
|
|
||||||
StreamMode = Literal['udp', 'v4l2']
|
|
||||||
|
|||||||
17
tests/test_curl_builder.py
Normal file
17
tests/test_curl_builder.py
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
from shutil import which
|
||||||
|
|
||||||
|
from facefusion import curl_builder, metadata
|
||||||
|
|
||||||
|
|
||||||
|
def test_run() -> None:
|
||||||
|
user_agent = metadata.get('name') + '/' + metadata.get('version')
|
||||||
|
|
||||||
|
assert curl_builder.run([]) == [ which('curl'), '--user-agent', user_agent, '--insecure', '--location', '--silent' ]
|
||||||
|
|
||||||
|
|
||||||
|
def test_chain() -> None:
|
||||||
|
commands = curl_builder.chain(
|
||||||
|
curl_builder.head(metadata.get('url'))
|
||||||
|
)
|
||||||
|
|
||||||
|
assert commands == [ '-I', metadata.get('url') ]
|
||||||
@@ -76,9 +76,9 @@ def test_concat_video() -> None:
|
|||||||
|
|
||||||
|
|
||||||
def test_read_audio_buffer() -> None:
|
def test_read_audio_buffer() -> None:
|
||||||
assert isinstance(read_audio_buffer(get_test_example_file('source.mp3'), 1, 1), bytes)
|
assert isinstance(read_audio_buffer(get_test_example_file('source.mp3'), 1, 16, 1), bytes)
|
||||||
assert isinstance(read_audio_buffer(get_test_example_file('source.wav'), 1, 1), bytes)
|
assert isinstance(read_audio_buffer(get_test_example_file('source.wav'), 1, 16, 1), bytes)
|
||||||
assert read_audio_buffer(get_test_example_file('invalid.mp3'), 1, 1) is None
|
assert read_audio_buffer(get_test_example_file('invalid.mp3'), 1, 16, 1) is None
|
||||||
|
|
||||||
|
|
||||||
def test_restore_audio() -> None:
|
def test_restore_audio() -> None:
|
||||||
|
|||||||
32
tests/test_ffmpeg_builder.py
Normal file
32
tests/test_ffmpeg_builder.py
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
from shutil import which
|
||||||
|
|
||||||
|
from facefusion import ffmpeg_builder
|
||||||
|
|
||||||
|
|
||||||
|
def test_run() -> None:
|
||||||
|
assert ffmpeg_builder.run([]) == [ which('ffmpeg'), '-loglevel', 'error' ]
|
||||||
|
|
||||||
|
|
||||||
|
def test_chain() -> None:
|
||||||
|
commands = ffmpeg_builder.chain(
|
||||||
|
ffmpeg_builder.set_progress()
|
||||||
|
)
|
||||||
|
|
||||||
|
assert commands == [ '-progress' ]
|
||||||
|
|
||||||
|
|
||||||
|
def test_stream_mode() -> None:
|
||||||
|
assert ffmpeg_builder.set_stream_mode('udp') == [ '-f', 'mpegts' ]
|
||||||
|
assert ffmpeg_builder.set_stream_mode('v4l2') == [ '-f', 'v4l2' ]
|
||||||
|
|
||||||
|
|
||||||
|
def test_select_frame_range() -> None:
|
||||||
|
assert ffmpeg_builder.select_frame_range(0, None, 30) == [ '-vf', 'trim=start_frame=0,fps=30' ]
|
||||||
|
assert ffmpeg_builder.select_frame_range(None, 100, 30) == [ '-vf', 'trim=end_frame=100,fps=30' ]
|
||||||
|
assert ffmpeg_builder.select_frame_range(0, 100, 30) == [ '-vf', 'trim=start_frame=0:end_frame=100,fps=30' ]
|
||||||
|
assert ffmpeg_builder.select_frame_range(None, None, 30) == [ '-vf', 'fps=30' ]
|
||||||
|
|
||||||
|
|
||||||
|
def test_audio_sample_size() -> None:
|
||||||
|
assert ffmpeg_builder.set_audio_sample_size(16) == [ '-f', 's16le', '-acodec', 'pcm_s16le' ]
|
||||||
|
assert ffmpeg_builder.set_audio_sample_size(32) == [ '-f', 's32le', '-acodec', 'pcm_s32le' ]
|
||||||
Reference in New Issue
Block a user