Vibe coded benchmark command

This commit is contained in:
henryruhs
2025-06-13 23:15:34 +02:00
parent 55e9df9c39
commit 589c317f19
10 changed files with 166 additions and 84 deletions

View File

@@ -36,6 +36,7 @@ commands:
run run the program
headless-run run the program in headless mode
batch-run run the program in batch mode
benchmark run performance benchmarks and exit
force-download force automate downloads and exit
job-list list jobs by status
job-create create a drafted job

View File

@@ -125,6 +125,9 @@ def apply_args(args : Args, apply_state_item : ApplyStateItem) -> None:
# download
apply_state_item('download_providers', args.get('download_providers'))
apply_state_item('download_scope', args.get('download_scope'))
# benchmark
apply_state_item('benchmark_runs', args.get('benchmark_runs'))
apply_state_item('benchmark_cycles', args.get('benchmark_cycles'))
# memory
apply_state_item('video_memory_strategy', args.get('video_memory_strategy'))
apply_state_item('system_memory_limit', args.get('system_memory_limit'))

115
facefusion/benchmarker.py Normal file
View File

@@ -0,0 +1,115 @@
import hashlib
import os
import statistics
import tempfile
from time import perf_counter
from typing import Any, Dict, List
from facefusion import state_manager
from facefusion.cli_helper import render_table
from facefusion.download import conditional_download, resolve_download_url
from facefusion.filesystem import get_file_extension, is_video
from facefusion.memory import limit_system_memory
from facefusion.vision import count_video_frame_total, detect_video_fps, detect_video_resolution, pack_resolution
BENCHMARKS : Dict[str, str] = {
'240p': '.assets/examples/target-240p.mp4',
'360p': '.assets/examples/target-360p.mp4',
'540p': '.assets/examples/target-540p.mp4',
'720p': '.assets/examples/target-720p.mp4',
'1080p': '.assets/examples/target-1080p.mp4',
'1440p': '.assets/examples/target-1440p.mp4',
'2160p': '.assets/examples/target-2160p.mp4'
}
def pre_check() -> bool:
conditional_download('.assets/examples',
[
resolve_download_url('examples-3.0.0', 'source.jpg'),
resolve_download_url('examples-3.0.0', 'source.mp3'),
resolve_download_url('examples-3.0.0', 'target-240p.mp4'),
resolve_download_url('examples-3.0.0', 'target-360p.mp4'),
resolve_download_url('examples-3.0.0', 'target-540p.mp4'),
resolve_download_url('examples-3.0.0', 'target-720p.mp4'),
resolve_download_url('examples-3.0.0', 'target-1080p.mp4'),
resolve_download_url('examples-3.0.0', 'target-1440p.mp4'),
resolve_download_url('examples-3.0.0', 'target-2160p.mp4')
])
return True
def suggest_output_path(target_path : str) -> str:
if is_video(target_path):
target_file_extension = get_file_extension(target_path)
return os.path.join(tempfile.gettempdir(), hashlib.sha1().hexdigest()[:8] + target_file_extension)
return ''
def pre_process() -> None:
system_memory_limit = state_manager.get_item('system_memory_limit')
if system_memory_limit and system_memory_limit > 0:
limit_system_memory(system_memory_limit)
def benchmark_target(benchmark_cycles : int) -> List[Any]:
from facefusion.core import conditional_process
process_times = []
video_frame_total = count_video_frame_total(state_manager.get_item('target_path'))
output_video_resolution = detect_video_resolution(state_manager.get_item('target_path'))
state_manager.set_item('output_video_resolution', pack_resolution(output_video_resolution))
state_manager.set_item('output_video_fps', detect_video_fps(state_manager.get_item('target_path')))
conditional_process()
for index in range(benchmark_cycles):
start_time = perf_counter()
conditional_process()
end_time = perf_counter()
process_times.append(end_time - start_time)
average_run = round(statistics.mean(process_times), 2)
fastest_run = round(min(process_times), 2)
slowest_run = round(max(process_times), 2)
relative_fps = round(video_frame_total * benchmark_cycles / sum(process_times), 2)
return [
state_manager.get_item('target_path'),
benchmark_cycles,
average_run,
fastest_run,
slowest_run,
relative_fps
]
def run() -> None:
benchmark_runs = state_manager.get_item('benchmark_runs')
benchmark_cycles = state_manager.get_item('benchmark_cycles')
state_manager.set_item('source_paths', [ '.assets/examples/source.jpg', '.assets/examples/source.mp3' ])
state_manager.set_item('face_landmarker_score', 0)
state_manager.set_item('temp_frame_format', 'bmp')
state_manager.set_item('output_audio_volume', 0)
state_manager.set_item('output_video_preset', 'ultrafast')
state_manager.set_item('video_memory_strategy', 'tolerant')
benchmark_results = []
target_paths = [ BENCHMARKS[benchmark_run] for benchmark_run in benchmark_runs if benchmark_run in BENCHMARKS ]
if target_paths:
pre_process()
for target_path in target_paths:
state_manager.set_item('target_path', target_path)
state_manager.set_item('output_path', suggest_output_path(state_manager.get_item('target_path')))
benchmark_results.append(benchmark_target(benchmark_cycles))
headers = [
'Target Path',
'Cycles',
'Average (s)',
'Fastest (s)',
'Slowest (s)',
'Relative FPS'
]
render_table(headers, benchmark_results)

View File

@@ -59,6 +59,16 @@ def route(args : Args) -> None:
error_code = force_download()
return hard_exit(error_code)
if state_manager.get_item('command') == 'benchmark':
import facefusion.benchmarker as benchmarker
if not common_pre_check() or not processors_pre_check():
return hard_exit(2)
if not benchmarker.pre_check():
return hard_exit(2)
benchmarker.run()
return
if state_manager.get_item('command') in [ 'job-list', 'job-create', 'job-submit', 'job-submit-all', 'job-delete', 'job-delete-all', 'job-add-step', 'job-remix-step', 'job-insert-step', 'job-remove-step' ]:
if not job_manager.init_jobs(state_manager.get_item('jobs_path')):
hard_exit(1)

View File

@@ -214,6 +214,16 @@ def create_download_providers_program() -> ArgumentParser:
return program
def create_benchmark_program() -> ArgumentParser:
from facefusion.benchmarker import BENCHMARKS
program = ArgumentParser(add_help = False)
group_benchmark = program.add_argument_group('benchmark')
group_benchmark.add_argument('--benchmark-runs', help = wording.get('help.benchmark_runs'), default = [ '240p' ], choices = list(BENCHMARKS.keys()), nargs = '+')
group_benchmark.add_argument('--benchmark-cycles', help = wording.get('help.benchmark_cycles'), type = int, default = 5, choices = range(1, 11))
job_store.register_job_keys([ 'benchmark_runs', 'benchmark_cycles' ])
return program
def create_download_scope_program() -> ArgumentParser:
program = ArgumentParser(add_help = False)
group_download = program.add_argument_group('download')
@@ -283,6 +293,7 @@ def create_program() -> ArgumentParser:
sub_program.add_parser('run', help = wording.get('help.run'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_uis_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('headless-run', help = wording.get('help.headless_run'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('batch-run', help = wording.get('help.batch_run'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), create_source_pattern_program(), create_target_pattern_program(), create_output_pattern_program(), collect_step_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('benchmark', help = wording.get('help.benchmark'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), create_benchmark_program(), collect_step_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('force-download', help = wording.get('help.force_download'), parents = [ create_download_providers_program(), create_download_scope_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
# job manager
sub_program.add_parser('job-list', help = wording.get('help.job_list'), parents = [ create_job_status_program(), create_jobs_path_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)

View File

@@ -283,6 +283,8 @@ StateKey = Literal\
'execution_thread_count',
'execution_queue_count',
'download_providers',
'benchmark_runs',
'benchmark_cycles',
'download_scope',
'video_memory_strategy',
'system_memory_limit',
@@ -349,6 +351,8 @@ State = TypedDict('State',
'execution_thread_count' : int,
'execution_queue_count' : int,
'download_providers' : List[DownloadProvider],
'benchmark_runs' : List[str],
'benchmark_cycles' : int,
'download_scope' : DownloadScope,
'video_memory_strategy' : VideoMemoryStrategy,
'system_memory_limit' : int,

View File

@@ -1,31 +1,13 @@
import hashlib
import os
import statistics
import tempfile
from time import perf_counter
from typing import Any, Dict, Generator, List, Optional
from typing import Any, Generator, List, Optional
import gradio
from facefusion import state_manager, wording
from facefusion.core import conditional_process
from facefusion.filesystem import get_file_extension, is_video
from facefusion.memory import limit_system_memory
from facefusion.benchmarker import BENCHMARKS, benchmark_target, pre_process, suggest_output_path
from facefusion.uis.core import get_ui_component
from facefusion.vision import count_video_frame_total, detect_video_fps, detect_video_resolution, pack_resolution
BENCHMARK_BENCHMARKS_DATAFRAME : Optional[gradio.Dataframe] = None
BENCHMARK_START_BUTTON : Optional[gradio.Button] = None
BENCHMARKS : Dict[str, str] =\
{
'240p': '.assets/examples/target-240p.mp4',
'360p': '.assets/examples/target-360p.mp4',
'540p': '.assets/examples/target-540p.mp4',
'720p': '.assets/examples/target-720p.mp4',
'1080p': '.assets/examples/target-1080p.mp4',
'1440p': '.assets/examples/target-1440p.mp4',
'2160p': '.assets/examples/target-2160p.mp4'
}
def render() -> None:
@@ -68,19 +50,13 @@ def listen() -> None:
BENCHMARK_START_BUTTON.click(start, inputs = [ benchmark_runs_checkbox_group, benchmark_cycles_slider ], outputs = BENCHMARK_BENCHMARKS_DATAFRAME)
def suggest_output_path(target_path : str) -> Optional[str]:
if is_video(target_path):
target_file_extension = get_file_extension(target_path)
return os.path.join(tempfile.gettempdir(), hashlib.sha1().hexdigest()[:8] + target_file_extension)
return None
def start(benchmark_runs : List[str], benchmark_cycles : int) -> Generator[List[Any], None, None]:
state_manager.init_item('source_paths', [ '.assets/examples/source.jpg', '.assets/examples/source.mp3' ])
state_manager.init_item('face_landmarker_score', 0)
state_manager.init_item('temp_frame_format', 'bmp')
state_manager.init_item('output_audio_volume', 0)
state_manager.init_item('output_video_preset', 'ultrafast')
state_manager.set_item('source_paths', [ '.assets/examples/source.jpg', '.assets/examples/source.mp3' ])
state_manager.set_item('face_landmarker_score', 0)
state_manager.set_item('temp_frame_format', 'bmp')
state_manager.set_item('output_audio_volume', 0)
state_manager.set_item('output_video_preset', 'ultrafast')
state_manager.set_item('video_memory_strategy', 'tolerant')
state_manager.sync_item('execution_providers')
state_manager.sync_item('execution_thread_count')
state_manager.sync_item('execution_queue_count')
@@ -91,42 +67,9 @@ def start(benchmark_runs : List[str], benchmark_cycles : int) -> Generator[List[
if target_paths:
pre_process()
for target_path in target_paths:
state_manager.init_item('target_path', target_path)
state_manager.init_item('output_path', suggest_output_path(state_manager.get_item('target_path')))
benchmark_results.append(benchmark(benchmark_cycles))
state_manager.set_item('target_path', target_path)
state_manager.set_item('output_path', suggest_output_path(state_manager.get_item('target_path')))
benchmark_results.append(benchmark_target(benchmark_cycles))
yield benchmark_results
def pre_process() -> None:
system_memory_limit = state_manager.get_item('system_memory_limit')
if system_memory_limit and system_memory_limit > 0:
limit_system_memory(system_memory_limit)
def benchmark(benchmark_cycles : int) -> List[Any]:
process_times = []
video_frame_total = count_video_frame_total(state_manager.get_item('target_path'))
output_video_resolution = detect_video_resolution(state_manager.get_item('target_path'))
state_manager.init_item('output_video_resolution', pack_resolution(output_video_resolution))
state_manager.init_item('output_video_fps', detect_video_fps(state_manager.get_item('target_path')))
conditional_process()
for index in range(benchmark_cycles):
start_time = perf_counter()
conditional_process()
end_time = perf_counter()
process_times.append(end_time - start_time)
average_run = round(statistics.mean(process_times), 2)
fastest_run = round(min(process_times), 2)
slowest_run = round(max(process_times), 2)
relative_fps = round(video_frame_total * benchmark_cycles / sum(process_times), 2)
return\
[
state_manager.get_item('target_path'),
benchmark_cycles,
average_run,
fastest_run,
slowest_run,
relative_fps
]

View File

@@ -3,7 +3,7 @@ from typing import Optional
import gradio
from facefusion import wording
from facefusion.uis.components.benchmark import BENCHMARKS
from facefusion.benchmarker import BENCHMARKS
from facefusion.uis.core import register_ui_component
BENCHMARK_RUNS_CHECKBOX_GROUP : Optional[gradio.CheckboxGroup] = None

View File

@@ -1,24 +1,15 @@
import gradio
from facefusion import state_manager
from facefusion.download import conditional_download, resolve_download_url
from facefusion.uis.components import about, age_modifier_options, benchmark, benchmark_options, deep_swapper_options, download, execution, execution_queue_count, execution_thread_count, expression_restorer_options, face_debugger_options, face_editor_options, face_enhancer_options, face_swapper_options, frame_colorizer_options, frame_enhancer_options, lip_syncer_options, memory, processors
from facefusion.benchmarker import pre_check as benchmarker_pre_check
from facefusion.uis.components import (about, age_modifier_options, benchmark, benchmark_options, deep_swapper_options,
download, execution, execution_queue_count, execution_thread_count, expression_restorer_options,
face_debugger_options, face_editor_options, face_enhancer_options, face_swapper_options,
frame_colorizer_options, frame_enhancer_options, lip_syncer_options, memory, processors)
def pre_check() -> bool:
conditional_download('.assets/examples',
[
resolve_download_url('examples-3.0.0', 'source.jpg'),
resolve_download_url('examples-3.0.0', 'source.mp3'),
resolve_download_url('examples-3.0.0', 'target-240p.mp4'),
resolve_download_url('examples-3.0.0', 'target-360p.mp4'),
resolve_download_url('examples-3.0.0', 'target-540p.mp4'),
resolve_download_url('examples-3.0.0', 'target-720p.mp4'),
resolve_download_url('examples-3.0.0', 'target-1080p.mp4'),
resolve_download_url('examples-3.0.0', 'target-1440p.mp4'),
resolve_download_url('examples-3.0.0', 'target-2160p.mp4')
])
return True
return benchmarker_pre_check()
def render() -> gradio.Blocks:

View File

@@ -207,7 +207,11 @@ WORDING : Dict[str, Any] =\
'run': 'run the program',
'headless_run': 'run the program in headless mode',
'batch_run': 'run the program in batch mode',
'benchmark': 'run performance benchmarks and exit',
'force_download': 'force automate downloads and exit',
# benchmark
'benchmark_runs': 'choose the resolution for the benchmark runs (choices: {choices}, ...)',
'benchmark_cycles': 'specify the number of benchmark cycles',
# jobs
'job_id': 'specify the job id',
'job_status': 'specify the job status',