145 lines
4.2 KiB
Python
145 lines
4.2 KiB
Python
from functools import lru_cache
|
|
from typing import List
|
|
|
|
import numpy
|
|
from tqdm import tqdm
|
|
|
|
from facefusion import inference_manager, state_manager, wording
|
|
from facefusion.download import conditional_download_hashes, conditional_download_sources, resolve_download_url
|
|
from facefusion.filesystem import resolve_relative_path
|
|
from facefusion.thread_helper import conditional_thread_semaphore
|
|
from facefusion.types import Detection, DownloadScope, Fps, InferencePool, ModelOptions, ModelSet, Score, VisionFrame
|
|
from facefusion.vision import detect_video_fps, fit_frame, read_image, read_video_frame
|
|
|
|
# Running count of frames handed to analyse_stream; that function increments it
# on every call and only analyses a frame when the count is a multiple of the
# integer frame rate.
STREAM_COUNTER = 0
|
|
|
|
|
|
@lru_cache(maxsize = None)
def create_static_model_set(download_scope : DownloadScope) -> ModelSet:
    """
    Build the static registry describing the models used by this module.

    The result is memoised per ``download_scope`` value; the argument is not
    otherwise consulted while building the registry.

    Returns a mapping with a single ``'yolo_nsfw'`` entry that carries the
    hash file location, the model file location and the model input size.
    """
    hash_set = {
        'content_analyser': {
            'url': resolve_download_url('models-3.2.0', 'yolo_11m_nsfw.hash'),
            'path': resolve_relative_path('../.assets/models/yolo_11m_nsfw.hash'),
        },
    }
    source_set = {
        'content_analyser': {
            'url': resolve_download_url('models-3.2.0', 'yolo_11m_nsfw.onnx'),
            'path': resolve_relative_path('../.assets/models/yolo_11m_nsfw.onnx'),
        },
    }
    yolo_nsfw_options = {
        'hashes': hash_set,
        'sources': source_set,
        'size': (640, 640),
    }
    return { 'yolo_nsfw': yolo_nsfw_options }
|
|
|
|
|
|
def get_inference_pool() -> InferencePool:
    """
    Return the inference pool for the 'yolo_nsfw' model.

    The pool is requested from the inference manager under this module's name,
    using the 'sources' entry of the model options.
    """
    model_options = get_model_options()
    return inference_manager.get_inference_pool(__name__, [ 'yolo_nsfw' ], model_options.get('sources'))
|
|
|
|
|
|
def clear_inference_pool() -> None:
    """Ask the inference manager to clear this module's 'yolo_nsfw' inference pool."""
    inference_manager.clear_inference_pool(__name__, [ 'yolo_nsfw' ])
|
|
|
|
|
|
def get_model_options() -> ModelOptions:
    """Return the 'yolo_nsfw' entry of the static model set built for the 'full' scope."""
    model_set = create_static_model_set('full')
    return model_set.get('yolo_nsfw')
|
|
|
|
|
|
def pre_check() -> bool:
    """
    Run the conditional download of the model hashes and then of the model sources.

    The sources step is only attempted when the hashes step reports success.
    """
    model_options = get_model_options()
    hashes_ready = conditional_download_hashes(model_options.get('hashes'))
    return hashes_ready and conditional_download_sources(model_options.get('sources'))
|
|
|
|
|
|
def analyse_stream(vision_frame : VisionFrame, video_fps : Fps) -> bool:
    """
    Analyse a frame of a live stream, sampling one frame per ``int(video_fps)`` calls.

    Every call increments the module-level STREAM_COUNTER. The frame is only
    passed to analyse_frame when the counter is a multiple of the sampling
    interval; all other calls return False without analysing anything.

    :param vision_frame: frame to analyse when this call is a sampling point
    :param video_fps: frame rate of the stream, used as the sampling interval
    :return: result of analyse_frame on sampled calls, otherwise False
    """
    global STREAM_COUNTER

    STREAM_COUNTER = STREAM_COUNTER + 1
    # int() truncates rates below 1 fps to 0, which would make the modulo raise
    # ZeroDivisionError; fall back to analysing every frame in that case
    # (and when no usable rate was supplied at all).
    frame_interval = max(int(video_fps), 1) if video_fps else 1
    if STREAM_COUNTER % frame_interval == 0:
        return analyse_frame(vision_frame)
    return False
|
|
|
|
|
|
def analyse_frame(vision_frame : VisionFrame) -> bool:
    """Return True when detect_nsfw reports at least one score for the frame."""
    return bool(detect_nsfw(vision_frame))
|
|
|
|
|
|
@lru_cache(maxsize = None)
def analyse_image(image_path : str) -> bool:
    """
    Read the image at ``image_path`` and run analyse_frame on it.

    Results are memoised per path, so a given path is only read and analysed once.
    """
    return analyse_frame(read_image(image_path))
|
|
|
|
|
|
@lru_cache(maxsize = None)
def analyse_video(video_path : str, trim_frame_start : int, trim_frame_end : int) -> bool:
    """
    Sample frames of a video and report whether too many of them are flagged.

    Frames in ``range(trim_frame_start, trim_frame_end)`` whose number is a
    multiple of the integer frame rate are read and passed to analyse_frame.
    The percentage of flagged frames among the sampled ones is shown on the
    progress bar. Results are memoised per argument combination.

    :param video_path: path of the video to analyse
    :param trim_frame_start: first frame number of the range (inclusive)
    :param trim_frame_end: end frame number of the range (exclusive)
    :return: True when more than 10 percent of the sampled frames are flagged
    """
    video_fps = detect_video_fps(video_path)
    frame_range = range(trim_frame_start, trim_frame_end)
    # int() truncates rates below 1 fps to 0, which would make the modulo below
    # raise ZeroDivisionError; fall back to sampling every frame in that case
    # (and when no usable rate was detected at all).
    frame_interval = max(int(video_fps), 1) if video_fps else 1
    rate = 0.0
    total = 0
    counter = 0

    with tqdm(total = len(frame_range), desc = wording.get('analysing'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:

        for frame_number in frame_range:
            if frame_number % frame_interval == 0:
                vision_frame = read_video_frame(video_path, frame_number)
                total += 1
                if analyse_frame(vision_frame):
                    counter += 1
            # counter > 0 implies total > 0, so the division is always safe
            if counter > 0 and total > 0:
                rate = counter / total * 100
            progress.set_postfix(rate = rate)
            # one tick per frame in the range, matching total = len(frame_range)
            progress.update()

    return rate > 10.0
|
|
|
|
|
|
def detect_nsfw(vision_frame : VisionFrame) -> List[Score]:
    """
    Run the model on a frame and return the scores that exceed the threshold.

    The frame is fitted to the model's configured size, prepared with
    prepare_detect_frame and passed through forward. For each row of the
    transposed output, the maximum over columns 4 and onward is taken as that
    row's score; scores above 0.2 are returned.

    :param vision_frame: frame to analyse
    :return: list of scores above 0.2, empty when there are none
    """
    model_size = get_model_options().get('size')
    temp_vision_frame = fit_frame(vision_frame, model_size)
    detect_vision_frame = prepare_detect_frame(temp_vision_frame)
    detection = forward(detect_vision_frame)
    # NOTE(review): presumably rows are candidate detections after the transpose,
    # with box values in columns 0-3 and class scores from column 4 - confirm
    # against the model's output layout.
    detection = numpy.squeeze(detection).T
    nsfw_scores_raw = numpy.amax(detection[:, 4:], axis = 1)
    # Select with a boolean mask. Testing numpy.any() on the array of kept
    # indices is wrong: a lone hit at index 0 evaluates as False and is dropped.
    keep_mask = nsfw_scores_raw > 0.2
    return nsfw_scores_raw[keep_mask].ravel().tolist()
|
|
|
|
|
|
def forward(vision_frame : VisionFrame) -> Detection:
    """
    Run the 'content_analyser' entry of the inference pool on a prepared frame.

    The frame is fed under the input name 'input' and the run is performed
    while holding the conditional thread semaphore.

    :param vision_frame: prepared frame to feed to the model
    :return: whatever the analyser's run call returns
    """
    session = get_inference_pool().get('content_analyser')
    model_inputs = { 'input': vision_frame }

    with conditional_thread_semaphore():
        return session.run(None, model_inputs)
|
|
|
|
|
|
def prepare_detect_frame(temp_vision_frame : VisionFrame) -> VisionFrame:
    """
    Convert a frame into the float32 batch layout fed to the model.

    Values are divided by 255.0, axes are reordered from (0, 1, 2) to
    (2, 0, 1), a leading axis of length one is added and the result is cast
    to float32.

    :param temp_vision_frame: three-dimensional frame array
    :return: four-dimensional float32 array with a leading axis of length one
    """
    scaled_frame = temp_vision_frame / 255.0
    reordered_frame = numpy.transpose(scaled_frame, (2, 0, 1))
    batched_frame = reordered_frame[numpy.newaxis, ...]
    return batched_frame.astype(numpy.float32)
|