Feat/halt on error (#862)

* Introduce halt-on-error

* Introduce halt-on-error

* Fix wording
This commit is contained in:
Henry Ruhs
2025-01-31 10:17:33 +01:00
committed by henryruhs
parent efc9652df4
commit 296eea8577
14 changed files with 99 additions and 54 deletions

View File

@@ -129,6 +129,7 @@ def apply_args(args : Args, apply_state_item : ApplyStateItem) -> None:
apply_state_item('system_memory_limit', args.get('system_memory_limit'))
# misc
apply_state_item('log_level', args.get('log_level'))
apply_state_item('halt_on_error', args.get('halt_on_error'))
# jobs
apply_state_item('job_id', args.get('job_id'))
apply_state_item('job_status', args.get('job_status'))

View File

@@ -183,7 +183,7 @@ def route_job_manager(args : Args) -> ErrorCode:
return 1
if state_manager.get_item('command') == 'job-submit-all':
if job_manager.submit_jobs():
if job_manager.submit_jobs(state_manager.get_item('halt_on_error')):
logger.info(wording.get('job_all_submitted'), __name__)
return 0
logger.error(wording.get('job_all_not_submitted'), __name__)
@@ -197,7 +197,7 @@ def route_job_manager(args : Args) -> ErrorCode:
return 1
if state_manager.get_item('command') == 'job-delete-all':
if job_manager.delete_jobs():
if job_manager.delete_jobs(state_manager.get_item('halt_on_error')):
logger.info(wording.get('job_all_deleted'), __name__)
return 0
logger.error(wording.get('job_all_not_deleted'), __name__)
@@ -250,7 +250,7 @@ def route_job_runner() -> ErrorCode:
if state_manager.get_item('command') == 'job-run-all':
logger.info(wording.get('running_jobs'), __name__)
if job_runner.run_jobs(process_step):
if job_runner.run_jobs(process_step, state_manager.get_item('halt_on_error')):
logger.info(wording.get('processing_jobs_succeed'), __name__)
return 0
logger.info(wording.get('processing_jobs_failed'), __name__)
@@ -266,7 +266,7 @@ def route_job_runner() -> ErrorCode:
if state_manager.get_item('command') == 'job-retry-all':
logger.info(wording.get('retrying_jobs'), __name__)
if job_runner.retry_jobs(process_step):
if job_runner.retry_jobs(process_step, state_manager.get_item('halt_on_error')):
logger.info(wording.get('processing_jobs_succeed'), __name__)
return 0
logger.info(wording.get('processing_jobs_failed'), __name__)

View File

@@ -48,14 +48,17 @@ def submit_job(job_id : str) -> bool:
return False
def submit_jobs() -> bool:
def submit_jobs(halt_on_error : bool) -> bool:
drafted_job_ids = find_job_ids('drafted')
has_error = False
if drafted_job_ids:
for job_id in drafted_job_ids:
if not submit_job(job_id):
return False
return True
has_error = True
if halt_on_error:
return False
return not has_error
return False
@@ -63,14 +66,17 @@ def delete_job(job_id : str) -> bool:
return delete_job_file(job_id)
def delete_jobs() -> bool:
def delete_jobs(halt_on_error : bool) -> bool:
job_ids = find_job_ids('drafted') + find_job_ids('queued') + find_job_ids('failed') + find_job_ids('completed')
has_error = False
if job_ids:
for job_id in job_ids:
if not delete_job(job_id):
return False
return True
has_error = True
if halt_on_error:
return False
return not has_error
return False

View File

@@ -16,14 +16,17 @@ def run_job(job_id : str, process_step : ProcessStep) -> bool:
return False
def run_jobs(process_step : ProcessStep) -> bool:
def run_jobs(process_step : ProcessStep, halt_on_error : bool) -> bool:
queued_job_ids = job_manager.find_job_ids('queued')
has_error = False
if queued_job_ids:
for job_id in queued_job_ids:
if not run_job(job_id, process_step):
return False
return True
has_error = True
if halt_on_error:
return False
return not has_error
return False
@@ -35,14 +38,17 @@ def retry_job(job_id : str, process_step : ProcessStep) -> bool:
return False
def retry_jobs(process_step : ProcessStep) -> bool:
def retry_jobs(process_step : ProcessStep, halt_on_error : bool) -> bool:
failed_job_ids = job_manager.find_job_ids('failed')
has_error = False
if failed_job_ids:
for job_id in failed_job_ids:
if not retry_job(job_id, process_step):
return False
return True
has_error = True
if halt_on_error:
return False
return not has_error
return False

View File

@@ -230,7 +230,7 @@ def create_memory_program() -> ArgumentParser:
return program
def create_misc_program() -> ArgumentParser:
def create_log_level_program() -> ArgumentParser:
program = ArgumentParser(add_help = False)
group_misc = program.add_argument_group('misc')
group_misc.add_argument('--log-level', help = wording.get('help.log_level'), default = config.get_str_value('misc.log_level', 'info'), choices = facefusion.choices.log_levels)
@@ -238,6 +238,14 @@ def create_misc_program() -> ArgumentParser:
return program
def create_halt_on_error_program() -> ArgumentParser:
program = ArgumentParser(add_help = False)
group_misc = program.add_argument_group('misc')
group_misc.add_argument('--halt-on-error', help = wording.get('help.halt_on_error'), action = 'store_true', default = config.get_bool_value('misc.halt_on_error'))
job_store.register_job_keys([ 'halt_on_error' ])
return program
def create_job_id_program() -> ArgumentParser:
program = ArgumentParser(add_help = False)
program.add_argument('job_id', help = wording.get('help.job_id'))
@@ -262,7 +270,7 @@ def collect_step_program() -> ArgumentParser:
def collect_job_program() -> ArgumentParser:
return ArgumentParser(parents= [ create_execution_program(), create_download_providers_program(), create_memory_program(), create_misc_program() ], add_help = False)
return ArgumentParser(parents= [ create_execution_program(), create_download_providers_program(), create_memory_program(), create_log_level_program() ], add_help = False)
def create_program() -> ArgumentParser:
@@ -274,23 +282,23 @@ def create_program() -> ArgumentParser:
sub_program.add_parser('run', help = wording.get('help.run'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_uis_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('headless-run', help = wording.get('help.headless_run'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('batch-run', help = wording.get('help.batch_run'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), create_source_pattern_program(), create_target_pattern_program(), create_output_pattern_program(), collect_step_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('force-download', help = wording.get('help.force_download'), parents = [ create_download_providers_program(), create_download_scope_program(), create_misc_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('force-download', help = wording.get('help.force_download'), parents = [ create_download_providers_program(), create_download_scope_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
# job manager
sub_program.add_parser('job-list', help = wording.get('help.job_list'), parents = [ create_job_status_program(), create_jobs_path_program(), create_misc_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-create', help = wording.get('help.job_create'), parents = [ create_job_id_program(), create_jobs_path_program(), create_misc_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-submit', help = wording.get('help.job_submit'), parents = [ create_job_id_program(), create_jobs_path_program(), create_misc_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-submit-all', help = wording.get('help.job_submit_all'), parents = [ create_jobs_path_program(), create_misc_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-delete', help = wording.get('help.job_delete'), parents = [ create_job_id_program(), create_jobs_path_program(), create_misc_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-delete-all', help = wording.get('help.job_delete_all'), parents = [ create_jobs_path_program(), create_misc_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-add-step', help = wording.get('help.job_add_step'), parents = [ create_job_id_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_misc_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-remix-step', help = wording.get('help.job_remix_step'), parents = [ create_job_id_program(), create_step_index_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_output_path_program(), collect_step_program(), create_misc_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-insert-step', help = wording.get('help.job_insert_step'), parents = [ create_job_id_program(), create_step_index_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_misc_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-remove-step', help = wording.get('help.job_remove_step'), parents = [ create_job_id_program(), create_step_index_program(), create_jobs_path_program(), create_misc_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-list', help = wording.get('help.job_list'), parents = [ create_job_status_program(), create_jobs_path_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-create', help = wording.get('help.job_create'), parents = [ create_job_id_program(), create_jobs_path_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-submit', help = wording.get('help.job_submit'), parents = [ create_job_id_program(), create_jobs_path_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-submit-all', help = wording.get('help.job_submit_all'), parents = [ create_jobs_path_program(), create_log_level_program(), create_halt_on_error_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-delete', help = wording.get('help.job_delete'), parents = [ create_job_id_program(), create_jobs_path_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-delete-all', help = wording.get('help.job_delete_all'), parents = [ create_jobs_path_program(), create_log_level_program(), create_halt_on_error_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-add-step', help = wording.get('help.job_add_step'), parents = [ create_job_id_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-remix-step', help = wording.get('help.job_remix_step'), parents = [ create_job_id_program(), create_step_index_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-insert-step', help = wording.get('help.job_insert_step'), parents = [ create_job_id_program(), create_step_index_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-remove-step', help = wording.get('help.job_remove_step'), parents = [ create_job_id_program(), create_step_index_program(), create_jobs_path_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
# job runner
sub_program.add_parser('job-run', help = wording.get('help.job_run'), parents = [ create_job_id_program(), create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-run-all', help = wording.get('help.job_run_all'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-run-all', help = wording.get('help.job_run_all'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), collect_job_program(), create_halt_on_error_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-retry', help = wording.get('help.job_retry'), parents = [ create_job_id_program(), create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-retry-all', help = wording.get('help.job_retry_all'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-retry-all', help = wording.get('help.job_retry_all'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), collect_job_program(), create_halt_on_error_program() ], formatter_class = create_help_formatter_large)
return ArgumentParser(parents = [ program ], formatter_class = create_help_formatter_small)

View File

@@ -283,6 +283,7 @@ StateKey = Literal\
'video_memory_strategy',
'system_memory_limit',
'log_level',
'halt_on_error',
'job_id',
'job_status',
'step_index'
@@ -347,6 +348,7 @@ State = TypedDict('State',
'video_memory_strategy' : VideoMemoryStrategy,
'system_memory_limit' : int,
'log_level' : LogLevel,
'halt_on_error' : bool,
'job_id' : str,
'job_status' : JobStatus,
'step_index' : int

View File

@@ -89,6 +89,7 @@ def apply(job_action : JobManagerAction, created_job_id : str, selected_job_id :
if is_directory(step_args.get('output_path')):
step_args['output_path'] = suggest_output_path(step_args.get('output_path'), state_manager.get_item('target_path'))
if job_action == 'job-create':
if created_job_id and job_manager.create_job(created_job_id):
updated_job_ids = job_manager.find_job_ids('drafted') or [ 'none' ]
@@ -97,6 +98,7 @@ def apply(job_action : JobManagerAction, created_job_id : str, selected_job_id :
return gradio.Dropdown(value = 'job-add-step'), gradio.Textbox(visible = False), gradio.Dropdown(value = created_job_id, choices = updated_job_ids, visible = True), gradio.Dropdown()
else:
logger.error(wording.get('job_not_created').format(job_id = created_job_id), __name__)
if job_action == 'job-submit':
if selected_job_id and job_manager.submit_job(selected_job_id):
updated_job_ids = job_manager.find_job_ids('drafted') or [ 'none' ]
@@ -105,6 +107,7 @@ def apply(job_action : JobManagerAction, created_job_id : str, selected_job_id :
return gradio.Dropdown(), gradio.Textbox(), gradio.Dropdown(value = get_last(updated_job_ids), choices = updated_job_ids, visible = True), gradio.Dropdown()
else:
logger.error(wording.get('job_not_submitted').format(job_id = selected_job_id), __name__)
if job_action == 'job-delete':
if selected_job_id and job_manager.delete_job(selected_job_id):
updated_job_ids = job_manager.find_job_ids('drafted') + job_manager.find_job_ids('queued') + job_manager.find_job_ids('failed') + job_manager.find_job_ids('completed') or [ 'none' ]
@@ -113,6 +116,7 @@ def apply(job_action : JobManagerAction, created_job_id : str, selected_job_id :
return gradio.Dropdown(), gradio.Textbox(), gradio.Dropdown(value = get_last(updated_job_ids), choices = updated_job_ids, visible = True), gradio.Dropdown()
else:
logger.error(wording.get('job_not_deleted').format(job_id = selected_job_id), __name__)
if job_action == 'job-add-step':
if selected_job_id and job_manager.add_step(selected_job_id, step_args):
state_manager.set_item('output_path', output_path)
@@ -121,6 +125,7 @@ def apply(job_action : JobManagerAction, created_job_id : str, selected_job_id :
else:
state_manager.set_item('output_path', output_path)
logger.error(wording.get('job_step_not_added').format(job_id = selected_job_id), __name__)
if job_action == 'job-remix-step':
if selected_job_id and job_manager.has_step(selected_job_id, selected_step_index) and job_manager.remix_step(selected_job_id, selected_step_index, step_args):
updated_step_choices = get_step_choices(selected_job_id) or [ 'none' ] #type:ignore[list-item]
@@ -131,6 +136,7 @@ def apply(job_action : JobManagerAction, created_job_id : str, selected_job_id :
else:
state_manager.set_item('output_path', output_path)
logger.error(wording.get('job_remix_step_not_added').format(job_id = selected_job_id, step_index = selected_step_index), __name__)
if job_action == 'job-insert-step':
if selected_job_id and job_manager.has_step(selected_job_id, selected_step_index) and job_manager.insert_step(selected_job_id, selected_step_index, step_args):
updated_step_choices = get_step_choices(selected_job_id) or [ 'none' ] #type:ignore[list-item]
@@ -141,6 +147,7 @@ def apply(job_action : JobManagerAction, created_job_id : str, selected_job_id :
else:
state_manager.set_item('output_path', output_path)
logger.error(wording.get('job_step_not_inserted').format(job_id = selected_job_id, step_index = selected_step_index), __name__)
if job_action == 'job-remove-step':
if selected_job_id and job_manager.has_step(selected_job_id, selected_step_index) and job_manager.remove_step(selected_job_id, selected_step_index):
updated_step_choices = get_step_choices(selected_job_id) or [ 'none' ] #type:ignore[list-item]
@@ -160,16 +167,19 @@ def get_step_choices(job_id : str) -> List[int]:
def update(job_action : JobManagerAction, selected_job_id : str) -> Tuple[gradio.Textbox, gradio.Dropdown, gradio.Dropdown]:
if job_action == 'job-create':
return gradio.Textbox(value = None, visible = True), gradio.Dropdown(visible = False), gradio.Dropdown(visible = False)
if job_action == 'job-delete':
updated_job_ids = job_manager.find_job_ids('drafted') + job_manager.find_job_ids('queued') + job_manager.find_job_ids('failed') + job_manager.find_job_ids('completed') or [ 'none' ]
updated_job_id = selected_job_id if selected_job_id in updated_job_ids else get_last(updated_job_ids)
return gradio.Textbox(visible = False), gradio.Dropdown(value = updated_job_id, choices = updated_job_ids, visible = True), gradio.Dropdown(visible = False)
if job_action in [ 'job-submit', 'job-add-step' ]:
updated_job_ids = job_manager.find_job_ids('drafted') or [ 'none' ]
updated_job_id = selected_job_id if selected_job_id in updated_job_ids else get_last(updated_job_ids)
return gradio.Textbox(visible = False), gradio.Dropdown(value = updated_job_id, choices = updated_job_ids, visible = True), gradio.Dropdown(visible = False)
if job_action in [ 'job-remix-step', 'job-insert-step', 'job-remove-step' ]:
updated_job_ids = job_manager.find_job_ids('drafted') or [ 'none' ]
updated_job_id = selected_job_id if selected_job_id in updated_job_ids else get_last(updated_job_ids)

View File

@@ -95,12 +95,15 @@ def run(job_action : JobRunnerAction, job_id : str) -> Tuple[gradio.Button, grad
updated_job_ids = job_manager.find_job_ids('queued') or [ 'none' ]
return gradio.Button(visible = True), gradio.Button(visible = False), gradio.Dropdown(value = get_last(updated_job_ids), choices = updated_job_ids)
if job_action == 'job-run-all':
logger.info(wording.get('running_jobs'), __name__)
if job_runner.run_jobs(process_step):
halt_on_error = False
if job_runner.run_jobs(process_step, halt_on_error):
logger.info(wording.get('processing_jobs_succeed'), __name__)
else:
logger.info(wording.get('processing_jobs_failed'), __name__)
if job_action == 'job-retry':
logger.info(wording.get('retrying_job').format(job_id = job_id), __name__)
if job_id and job_runner.retry_job(job_id, process_step):
@@ -110,9 +113,11 @@ def run(job_action : JobRunnerAction, job_id : str) -> Tuple[gradio.Button, grad
updated_job_ids = job_manager.find_job_ids('failed') or [ 'none' ]
return gradio.Button(visible = True), gradio.Button(visible = False), gradio.Dropdown(value = get_last(updated_job_ids), choices = updated_job_ids)
if job_action == 'job-retry-all':
logger.info(wording.get('retrying_jobs'), __name__)
if job_runner.retry_jobs(process_step):
halt_on_error = False
if job_runner.retry_jobs(process_step, halt_on_error):
logger.info(wording.get('processing_jobs_succeed'), __name__)
else:
logger.info(wording.get('processing_jobs_failed'), __name__)
@@ -129,6 +134,7 @@ def update_job_action(job_action : JobRunnerAction) -> gradio.Dropdown:
updated_job_ids = job_manager.find_job_ids('queued') or [ 'none' ]
return gradio.Dropdown(value = get_last(updated_job_ids), choices = updated_job_ids, visible = True)
if job_action == 'job-retry':
updated_job_ids = job_manager.find_job_ids('failed') or [ 'none' ]

View File

@@ -200,6 +200,7 @@ WORDING : Dict[str, Any] =\
'system_memory_limit': 'limit the available RAM that can be used while processing',
# misc
'log_level': 'adjust the message severity displayed in the terminal',
'halt_on_error': 'halt the program once an error occurred',
# run
'run': 'run the program',
'headless_run': 'run the program in headless mode',