Merge pull request #8 from karl0ss/reworked

Reworked
Karl0ss 2024-07-23 16:26:54 +01:00 committed by GitHub
commit bfb2332649
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 126 additions and 68 deletions

View File

@@ -2,8 +2,6 @@ import argparse
 from faster_whisper import available_models
 from utils.constants import LANGUAGE_CODES
 from main import process
-from utils.convert import str2bool, str2timeinterval


 def main():
     """
@@ -12,15 +10,20 @@ def main():
     Parses command line arguments, processes the inputs using the specified options,
     and performs transcription or translation based on the specified task.
     """
+    # Create an ArgumentParser object with a specific formatter for default values
     parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
     )
+    # Add argument for selecting the Whisper model
     parser.add_argument(
         "--model",
         default="small",
         choices=available_models(),
         help="name of the Whisper model to use",
     )
+    # Add argument for specifying the device to use (CPU, CUDA, or auto-detect)
     parser.add_argument(
         "--device",
         type=str,
@@ -28,35 +31,24 @@ def main():
         choices=["cpu", "cuda", "auto"],
         help='Device to use for computation ("cpu", "cuda", "auto")',
     )
-    # parser.add_argument(
-    #     "--compute_type",
-    #     type=str,
-    #     default="default",
-    #     choices=[
-    #         "int8",
-    #         "int8_float32",
-    #         "int8_float16",
-    #         "int8_bfloat16",
-    #         "int16",
-    #         "float16",
-    #         "bfloat16",
-    #         "float32",
-    #     ],
-    #     help="Type to use for computation. \
-    #         See https://opennmt.net/CTranslate2/quantization.html.",
-    # )
+    # Add argument for processing a single file
     parser.add_argument(
         "--file",
         type=str,
         default=None,
         help="Process a single file"
     )
+    # Add argument for processing all videos in a folder
     parser.add_argument(
         "--folder",
         type=str,
         default=None,
         help="Process all videos in folder"
     )
+    # Add argument for specifying the task: transcribe or translate
     parser.add_argument(
         "--show",
         type=str,
@@ -64,6 +56,8 @@ def main():
         help="whether to perform X->X speech recognition ('transcribe') \
             or X->English translation ('translate')",
     )
+    # Add argument for setting the origin language of the video, with auto-detection as default
     parser.add_argument(
         "--language",
         type=str,
@@ -72,16 +66,20 @@ def main():
         help="What is the origin language of the video? \
             If unset, it is detected automatically.",
     )
+    # Add argument for selecting the backend: whisper or faster_whisper
     parser.add_argument(
         "--backend",
         type=str,
         default="whisper",
         choices=["whisper", "faster_whisper"],
     )
+    # Parse the command line arguments into a dictionary
     args = parser.parse_args().__dict__
+    # Call the process function with the parsed arguments
     process(args)


 if __name__ == "__main__":
     main()
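Note: the entry point above still funnels everything through process() as a plain dictionary built from the parsed Namespace. A minimal, self-contained sketch of that hand-off pattern (with a stub process() standing in for the real one imported from main) looks roughly like this:

import argparse


def process(args: dict) -> None:
    # Stub standing in for main.process; just echoes what the CLI would pass along.
    print(f"backend={args['backend']} file={args['file']} folder={args['folder']}")


def main() -> None:
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument("--file", type=str, default=None, help="Process a single file")
    parser.add_argument("--folder", type=str, default=None, help="Process all videos in folder")
    parser.add_argument(
        "--backend", type=str, default="whisper", choices=["whisper", "faster_whisper"]
    )
    # parse_args() returns a Namespace; __dict__ flattens it into the dict process() expects.
    args = parser.parse_args().__dict__
    process(args)


if __name__ == "__main__":
    main()

Using __dict__ keeps process() decoupled from argparse: it only ever sees a mapping of option names to values.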

View File

@@ -2,8 +2,9 @@ import os
 import warnings
 import tempfile
 import time
+from typing import List, Dict, Any
 from utils.files import filename, write_srt
-from utils.ffmpeg import get_audio, add_subtitles_to_mp4
+from utils.ffmpeg import get_audio, add_subtitles_to_mp4, check_for_subtitles
 from utils.bazarr import get_wanted_episodes, get_episode_details, sync_series
 from utils.sonarr import update_show_in_sonarr
 from utils.faster_whisper import WhisperAI as fasterWhisperAI
@@ -11,57 +12,101 @@ from utils.whisper import WhisperAI
 from utils.decorator import measure_time


-def folder_flow(folder, model_args, args, backend):
-    print(f"Processing {folder}")
-    files = os.listdir(folder)
-    for file in files:
-        print(f"processing {file}")
-        path = folder+file
-        try:
-            audios = get_audio([path], 0, None)
-            subtitles = get_subtitles(audios, tempfile.gettempdir(), model_args, args, backend)
-            add_subtitles_to_mp4(subtitles)
-            time.sleep(5)
-        except Exception as ex:
-            print(f"skipping file due to - {ex}")
-
-
-def file_flow(show, model_args, args, backend):
-    print(f"Processing {show}")
-    try:
-        audios = get_audio([show], 0, None)
-        subtitles = get_subtitles(audios, tempfile.gettempdir(), model_args, args, backend)
-        add_subtitles_to_mp4(subtitles)
-        time.sleep(5)
-    except Exception as ex:
-        print(f"skipping file due to - {ex}")
-
-
-def bazzar_flow(show, model_args, args, backend):
+def process_audio_and_subtitles(file_path: str, model_args: Dict[str, Any], args: Dict[str, Any], backend: str) -> None:
+    """Processes audio extraction and subtitle generation for a given file.
+
+    Args:
+        file_path (str): Path to the video file.
+        model_args (Dict[str, Any]): Model arguments for subtitle generation.
+        args (Dict[str, Any]): Additional arguments for subtitle generation.
+        backend (str): Backend to use ('whisper' or 'faster_whisper').
+
+    Returns:
+        None
+    """
+    try:
+        audios = get_audio([file_path], 0, None)
+        subtitles = get_subtitles(audios, tempfile.gettempdir(), model_args, args, backend)
+        add_subtitles_to_mp4(subtitles)
+        time.sleep(5)
+    except Exception as ex:
+        print(f"Skipping file {file_path} due to - {ex}")
+
+
+def folder_flow(folder: str, model_args: Dict[str, Any], args: Dict[str, Any], backend: str) -> None:
+    """Processes all files within a specified folder.
+
+    Args:
+        folder (str): Path to the folder containing video files.
+        model_args (Dict[str, Any]): Model arguments for subtitle generation.
+        args (Dict[str, Any]): Additional arguments for subtitle generation.
+        backend (str): Backend to use ('whisper' or 'faster_whisper').
+
+    Returns:
+        None
+    """
+    print(f"Processing folder {folder}")
+    files = os.listdir(folder)
+    for file in files:
+        path = os.path.join(folder, file)
+        print(f"Processing file {path}")
+        if not check_for_subtitles(path):
+            process_audio_and_subtitles(path, model_args, args, backend)
+
+
+def file_flow(file_path: str, model_args: Dict[str, Any], args: Dict[str, Any], backend: str) -> None:
+    """Processes a single specified file.
+
+    Args:
+        file_path (str): Path to the video file.
+        model_args (Dict[str, Any]): Model arguments for subtitle generation.
+        args (Dict[str, Any]): Additional arguments for subtitle generation.
+        backend (str): Backend to use ('whisper' or 'faster_whisper').
+
+    Returns:
+        None
+    """
+    print(f"Processing file {file_path}")
+    if not check_for_subtitles(file_path):
+        process_audio_and_subtitles(file_path, model_args, args, backend)
+
+
+def bazzar_flow(show: str, model_args: Dict[str, Any], args: Dict[str, Any], backend: str) -> None:
+    """Processes episodes needing subtitles from Bazarr API.
+
+    Args:
+        show (str): The show name.
+        model_args (Dict[str, Any]): Model arguments for subtitle generation.
+        args (Dict[str, Any]): Additional arguments for subtitle generation.
+        backend (str): Backend to use ('whisper' or 'faster_whisper').
+
+    Returns:
+        None
+    """
     list_of_episodes_needing_subtitles = get_wanted_episodes(show)
-    print(
-        f"Found {list_of_episodes_needing_subtitles['total']} episodes needing subtitles."
-    )
+    print(f"Found {list_of_episodes_needing_subtitles['total']} episodes needing subtitles.")
     for episode in list_of_episodes_needing_subtitles["data"]:
         print(f"Processing {episode['seriesTitle']} - {episode['episode_number']}")
         episode_data = get_episode_details(episode["sonarrEpisodeId"])
-        try:
-            audios = get_audio([episode_data["path"]], 0, None)
-            subtitles = get_subtitles(audios, tempfile.gettempdir(), model_args, args, backend)
-            add_subtitles_to_mp4(subtitles)
-            update_show_in_sonarr(episode["sonarrSeriesId"])
-            time.sleep(5)
-            sync_series()
-        except Exception as ex:
-            print(f"skipping file due to - {ex}")
+        process_audio_and_subtitles(episode_data["path"], model_args, args, backend)
+        update_show_in_sonarr(episode["sonarrSeriesId"])
+        sync_series()


 @measure_time
-def get_subtitles(
-    audio_paths: list, output_dir: str, model_args: dict, transcribe_args: dict, backend: str
-):
+def get_subtitles(audio_paths: List[str], output_dir: str, model_args: Dict[str, Any], transcribe_args: Dict[str, Any], backend: str) -> Dict[str, str]:
+    """Generates subtitles for given audio files using the specified model.
+
+    Args:
+        audio_paths (List[str]): List of paths to the audio files.
+        output_dir (str): Directory to save the generated subtitle files.
+        model_args (Dict[str, Any]): Model arguments for subtitle generation.
+        transcribe_args (Dict[str, Any]): Transcription arguments for subtitle generation.
+        backend (str): Backend to use ('whisper' or 'faster_whisper').
+
+    Returns:
+        Dict[str, str]: A dictionary mapping audio file paths to generated subtitle file paths.
+    """
     if backend == 'whisper':
         model = WhisperAI(model_args, transcribe_args)
     else:
@@ -82,8 +127,15 @@ def get_subtitles(
     return subtitles_path


-def process(args: dict):
+def process(args: Dict[str, Any]) -> None:
+    """Main entry point to determine which processing flow to use.
+
+    Args:
+        args (Dict[str, Any]): Dictionary of arguments including model, language, show, file, folder, and backend.
+
+    Returns:
+        None
+    """
     model_name: str = args.pop("model")
     language: str = args.pop("language")
     show: str = args.pop("show")
@@ -92,16 +144,12 @@ def process(args: dict):
     backend: str = args.pop("backend")

     if model_name.endswith(".en"):
-        warnings.warn(
-            f"{model_name} is an English-only model, forcing English detection."
-        )
+        warnings.warn(f"{model_name} is an English-only model, forcing English detection.")
         args["language"] = "en"
-    # if translate task used and language argument is set, then use it
     elif language != "auto":
         args["language"] = language

-    model_args = {}
-    model_args["device"] = args.pop("device")
+    model_args = {"device": args.pop("device")}

     if file:
         file_flow(file, model_args, args, backend)
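Note: the core of this rework is that the file, folder, and Bazarr flows all funnel into one shared helper and skip files that already carry subtitles. A stripped-down, self-contained sketch of that control flow (stand-in names and stubbed I/O, not the repo's real functions) is:

import os


def has_embedded_subtitles(path: str) -> bool:
    # Stand-in for utils.ffmpeg.check_for_subtitles; always False in this sketch.
    return False


def generate_subtitles_for(path: str) -> None:
    # Stand-in for process_audio_and_subtitles: extraction, transcription and muxing
    # live in one place, guarded by one broad try/except as in the reworked helper.
    try:
        print(f"would extract audio and generate subtitles for {path}")
    except Exception as ex:
        print(f"Skipping file {path} due to - {ex}")


def folder_flow(folder: str) -> None:
    for name in os.listdir(folder):
        path = os.path.join(folder, name)  # os.path.join replaces the old folder+file concatenation
        if not has_embedded_subtitles(path):
            generate_subtitles_for(path)


def file_flow(path: str) -> None:
    if not has_embedded_subtitles(path):
        generate_subtitles_for(path)


if __name__ == "__main__":
    file_flow("/path/to/episode.mkv")  # placeholder path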

View File

@@ -4,6 +4,18 @@ import ffmpeg
 from .files import filename


+def check_for_subtitles(video_path: str):
+    # Probe the video file to get information about its streams
+    probe = ffmpeg.probe(video_path)
+    # Check if there are any subtitle streams
+    for stream in probe['streams']:
+        if stream['codec_type'] == 'subtitle':
+            print("File has subtitles")
+            return True
+    return False
+
+
 def get_audio(paths: list, audio_channel_index: int, sample_interval: list):
     temp_dir = tempfile.gettempdir()
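Note: check_for_subtitles relies on ffmpeg-python's probe() wrapper around ffprobe, which returns a dict with a 'streams' list. A slightly more defensive variant of the same idea (a sketch, not part of this commit, using a placeholder path) that also tolerates files ffprobe cannot read might look like:

import ffmpeg  # ffmpeg-python


def has_subtitle_stream(video_path: str) -> bool:
    """Return True if ffprobe reports at least one subtitle stream."""
    try:
        probe = ffmpeg.probe(video_path)
    except ffmpeg.Error as ex:
        # ffprobe failed (missing or corrupt file); treat it as "no subtitles found".
        print(f"ffprobe failed for {video_path}: {ex.stderr.decode(errors='ignore')}")
        return False
    return any(stream.get("codec_type") == "subtitle" for stream in probe.get("streams", []))


if __name__ == "__main__":
    print(has_subtitle_stream("/path/to/video.mkv"))  # placeholder path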