Files
trivia-transcriber/transcribe.py
2025-06-04 15:40:54 +02:00

293 lines
14 KiB
Python

import os
import subprocess
import torch
from pyannote.audio import Pipeline
import whisper # Using openai-whisper
import whisperx # Still needed for assign_word_speakers
import pandas as pd
from dotenv import load_dotenv # For loading .env file
# --- Configuration ---
load_dotenv() # Load environment variables from .env file
# Input video, expected in the current working directory.
YOUR_MP4_FILE = "jeden-z-10-final.mp4"
# Token for downloading the gated pyannote diarization model from Hugging Face.
YOUR_HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN")
OPENAI_WHISPER_MODEL_SIZE = "large-v3" # Using the model that gave the best results
PROCESS_AUDIO_DURATION_SECONDS = None # Set to None for full audio, or e.g. 30 for testing
# Advanced Whisper transcription parameters for tuning
WHISPER_BEAM_SIZE = 5 # Default for non-English is 5. Larger can be more accurate but slower.
# Single low temperature for less randomness, potentially fewer hallucinations.
# Whisper default is a tuple: (0.0, 0.2, 0.4, 0.6, 0.8, 1.0)
WHISPER_TEMPERATURE = 0.0
WHISPER_PATIENCE = None # Default is 1.0. Higher values might help with repetition.
WHISPER_NO_SPEECH_THRESHOLD = 0.6 # Default. Lower might detect more faint speech.
# --- Paths ---
# Intermediate WAV and the final transcript are written to the working directory.
CURRENT_DIR = os.getcwd()
AUDIO_FILENAME = "extracted_audio.wav"
AUDIO_FILE_PATH = os.path.join(CURRENT_DIR, AUDIO_FILENAME)
MP4_FILE_PATH = os.path.join(CURRENT_DIR, YOUR_MP4_FILE)
OUTPUT_TRANSCRIPT_FILE = "transcript_with_speakers.txt"
# --- Device Selection ---
# Prefer Apple MPS, then CUDA, and fall back to CPU. Both pipelines
# currently share the same accelerator, but the two target variables are
# kept separate so either stage could be pinned to a different device.
if torch.backends.mps.is_available():
    print("MPS device is available.")
    _accelerator = "mps"
elif torch.cuda.is_available():
    print("CUDA device is available.")
    _accelerator = "cuda"
else:
    print("MPS and CUDA not available. Using CPU for all operations.")
    _accelerator = "cpu"
PYANNOTE_DEVICE_TARGET = _accelerator
WHISPER_DEVICE_TARGET = _accelerator
print(f"Target Pyannote Device: {PYANNOTE_DEVICE_TARGET}")
print(f"Target Whisper Device: {WHISPER_DEVICE_TARGET}")
# --- Helper Function to Format Time ---
def format_time(seconds):
    """Render a duration in seconds as HH:MM:SS.mmm (milliseconds truncated)."""
    whole = int(seconds)
    frac_ms = int((seconds - whole) * 1000)
    hrs, remainder = divmod(whole, 3600)
    mins, secs = divmod(remainder, 60)
    return f"{hrs:02}:{mins:02}:{secs:02}.{frac_ms:03}"
# --- Main Processing ---
def _extract_audio():
    """Extract mono 16 kHz PCM WAV audio from the source MP4 with ffmpeg.

    Returns:
        bool: True on success; False if the MP4 is missing, ffmpeg fails,
        or ffmpeg is not installed.
    """
    print(f"Extracting audio from {YOUR_MP4_FILE}...")
    if not os.path.exists(MP4_FILE_PATH):
        print(f"ERROR: MP4 file not found at {MP4_FILE_PATH}")
        return False
    ffmpeg_command = [
        "ffmpeg", "-i", MP4_FILE_PATH,
        "-vn", "-acodec", "pcm_s16le",  # audio only, 16-bit PCM
        "-ar", "16000", "-ac", "1",     # 16 kHz mono, as whisper/pyannote expect
    ]
    # BUGFIX: output options must precede the output filename. Previously
    # "-ss 0 -t N" was appended AFTER the output path and "-y", so ffmpeg
    # treated them as trailing options and silently ignored the duration
    # limit — the full audio was always processed.
    if PROCESS_AUDIO_DURATION_SECONDS is not None:
        ffmpeg_command.extend(["-ss", "0", "-t", str(PROCESS_AUDIO_DURATION_SECONDS)])
    ffmpeg_command.extend([AUDIO_FILE_PATH, "-y"])
    try:
        subprocess.run(ffmpeg_command, check=True, capture_output=True)
        duration_msg = f"for the first {PROCESS_AUDIO_DURATION_SECONDS} seconds" if PROCESS_AUDIO_DURATION_SECONDS else "for the full duration"
        print(f"Audio extracted {duration_msg} to {AUDIO_FILE_PATH}")
        return True
    except subprocess.CalledProcessError as e:
        print("ERROR: ffmpeg audio extraction failed.")
        print("FFmpeg stdout:", e.stdout.decode() if e.stdout else "N/A")
        print("FFmpeg stderr:", e.stderr.decode() if e.stderr else "N/A")
        return False
    except FileNotFoundError:
        print("ERROR: ffmpeg command not found. Make sure FFmpeg is installed and in your PATH.")
        return False


def _diarize_on(device):
    """Load the pyannote diarization pipeline and run it on *device*."""
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token=YOUR_HUGGINGFACE_TOKEN
    )
    pipeline.to(torch.device(device))
    # num_speakers=None lets pyannote estimate the speaker count itself.
    return pipeline(AUDIO_FILE_PATH, num_speakers=None)


def _run_diarization():
    """Run speaker diarization, falling back from MPS to CPU on failure.

    Returns:
        pyannote Annotation on success, or None if diarization failed.
    """
    device = PYANNOTE_DEVICE_TARGET
    print(f"Performing speaker diarization using device: {device}...")
    try:
        annotation = _diarize_on(device)
        print("Diarization complete.")
        return annotation
    except Exception as e:
        print(f"ERROR: Speaker diarization on {device} failed: {e}")
        if device == "mps":
            # Some pyannote ops are not implemented on MPS; retry on CPU.
            print("Attempting diarization on CPU as a fallback...")
            try:
                annotation = _diarize_on("cpu")
                print("Diarization on CPU fallback complete.")
                return annotation
            except Exception as e_cpu:
                print(f"ERROR: Speaker diarization on CPU fallback also failed: {e_cpu}")
                import traceback
                print(traceback.format_exc())
                return None
        import traceback
        print(traceback.format_exc())
        return None


def _diarization_to_df(annotation):
    """Flatten a pyannote Annotation into a start/end/speaker DataFrame."""
    rows = [
        {'start': segment.start, 'end': segment.end, 'speaker': speaker_label}
        for segment, _, speaker_label in annotation.itertracks(yield_label=True)
    ]
    if not rows:
        print("Warning: No speaker segments found by pyannote.audio.")
        return pd.DataFrame(columns=['start', 'end', 'speaker'])
    return pd.DataFrame(rows)


def _transcribe():
    """Transcribe the extracted audio with openai-whisper.

    Falls back to CPU if MPS raises NotImplementedError.

    Returns:
        dict: the whisper transcription result, or None on failure.
    """
    device = WHISPER_DEVICE_TARGET
    print(f"Attempting OpenAI-Whisper model '{OPENAI_WHISPER_MODEL_SIZE}' on device: {device}...")
    transcribe_options = {
        "language": "pl",
        "fp16": (device != "cpu"),  # fp16 only on accelerators
        "word_timestamps": True,
        "verbose": False,
        "beam_size": WHISPER_BEAM_SIZE,
        "patience": WHISPER_PATIENCE,
        "no_speech_threshold": WHISPER_NO_SPEECH_THRESHOLD,
        "temperature": WHISPER_TEMPERATURE
    }
    print(f"DEBUG: Whisper transcribe options: {transcribe_options}")
    try:
        model = whisper.load_model(OPENAI_WHISPER_MODEL_SIZE, device=device)
        result = model.transcribe(AUDIO_FILE_PATH, **transcribe_options)
        print(f"OpenAI-Whisper transcription on {device} complete. Detected language (should be pl): {result.get('language', 'N/A')}")
        return result
    except NotImplementedError as nie:
        print(f"ERROR: OpenAI-Whisper on {device} failed with NotImplementedError: {nie}")
        if device != "mps":
            import traceback
            print(traceback.format_exc())
            return None
        print("Falling back to CPU for OpenAI-Whisper due to MPS NotImplementedError.")
        transcribe_options["fp16"] = False  # CPU does not use fp16
        print(f"DEBUG: Whisper transcribe options (CPU fallback): {transcribe_options}")
        try:
            model = whisper.load_model(OPENAI_WHISPER_MODEL_SIZE, device="cpu")
            result = model.transcribe(AUDIO_FILE_PATH, **transcribe_options)
            print(f"OpenAI-Whisper transcription on CPU fallback complete. Detected language (should be pl): {result.get('language', 'N/A')}")
            return result
        except Exception as e_cpu:
            print(f"ERROR: OpenAI-Whisper on CPU fallback also failed: {e_cpu}")
            import traceback
            print(traceback.format_exc())
            return None
    except Exception as e:
        print(f"ERROR: OpenAI-Whisper transcription on {device} failed with other error: {e}")
        import traceback
        print(traceback.format_exc())
        return None


def _format_transcript(transcription_result):
    """Convert openai-whisper output into the shape whisperx expects.

    whisperx.assign_word_speakers needs {"segments": [{start, end, text,
    "words": [{word, start, end, score}]}]}. A segment without word-level
    timestamps is represented as one pseudo-word spanning the segment.
    """
    formatted = {"segments": []}
    if "segments" not in transcription_result or not transcription_result["segments"]:
        print("Warning: OpenAI-Whisper produced no segments. Cannot assign speakers.")
        return formatted
    for seg in transcription_result["segments"]:
        segment_data = {
            "start": float(seg["start"]),
            "end": float(seg["end"]),
            "text": seg["text"]
        }
        words_data = []
        if seg.get("words"):
            for word_info in seg["words"]:
                word_start = word_info.get("start")
                word_end = word_info.get("end")
                if word_start is not None and word_end is not None:
                    words_data.append({
                        "word": word_info["word"],
                        "start": float(word_start),
                        "end": float(word_end),
                        # whisper names this "probability"; whisperx expects "score".
                        "score": float(word_info.get("probability", 0.0))
                    })
                else:
                    print(f"DEBUG: Skipping word with missing start/end: {word_info}")
        else:
            # No word timings: treat the whole segment text as a single word.
            words_data.append({
                "word": seg["text"],
                "start": float(seg["start"]),
                "end": float(seg["end"]),
                "score": 0.0
            })
        segment_data["words"] = words_data
        formatted["segments"].append(segment_data)
    return formatted


def _assign_speakers(diarize_df, formatted_transcript):
    """Attach diarization speaker labels to the formatted transcript.

    Returns a {"segments": [...]} dict; segments carry a 'speaker' key,
    or placeholder labels when diarization data is missing or assignment fails.
    """
    result = {"segments": []}
    if not formatted_transcript["segments"]:
        print("Skipping speaker assignment as there are no transcribed segments.")
        return result
    if diarize_df.empty:
        print("Warning: No diarization segments, but transcription exists. Output will not have speaker labels.")
        for seg in formatted_transcript["segments"]:
            new_seg = seg.copy()
            new_seg['speaker'] = '[NO_DIARIZATION_DATA]'
            if 'words' in new_seg:
                for word_data in new_seg['words']:
                    word_data['speaker'] = '[NO_DIARIZATION_DATA]'
            result["segments"].append(new_seg)
        return result
    try:
        result = whisperx.assign_word_speakers(diarize_df, formatted_transcript)
        print("Speaker assignment complete.")
        return result
    except Exception as e:
        print(f"ERROR: whisperx.assign_word_speakers failed: {e}")
        import traceback
        print(traceback.format_exc())
        result = {"segments": []}
        for seg_idx, seg_data in enumerate(formatted_transcript["segments"]):
            new_seg = seg_data.copy()
            new_seg['speaker'] = f'[SPEAKER_ASSIGNMENT_FAILED_{seg_idx}]'
            result["segments"].append(new_seg)
        print("Fell back to transcript with generic error speaker labels due to assignment error.")
        return result


def _save_transcript(result_with_speakers, diarize_df):
    """Write the speaker-labelled transcript (or a fallback report) to disk."""
    print(f"Saving transcript to {OUTPUT_TRANSCRIPT_FILE}...")
    with open(OUTPUT_TRANSCRIPT_FILE, "w", encoding="utf-8") as f:
        if not result_with_speakers or not result_with_speakers.get("segments"):
            f.write("No transcription results to save or speaker assignment failed to produce segments.\n")
            if not diarize_df.empty:
                # Still record raw diarization turns so the run is not a total loss.
                f.write("\nDiarization Segments (if any, without transcription matching):\n")
                for _, row in diarize_df.iterrows():
                    f.write(f"Speaker: {row['speaker']} ({format_time(row['start'])} - {format_time(row['end'])})\n")
        else:
            for segment_info in result_with_speakers["segments"]:
                speaker_label = segment_info.get('speaker', '[SPEAKER_NOT_ASSIGNED]')
                text = segment_info.get("text", "").strip()
                start_time = float(segment_info.get("start", 0.0))
                end_time = float(segment_info.get("end", 0.0))
                if not text:
                    continue  # drop empty/whitespace-only segments
                f.write(f"{speaker_label} ({format_time(start_time)} - {format_time(end_time)}): {text}\n\n")


def main():
    """End-to-end pipeline: extract audio, diarize, transcribe, merge, save."""
    if not YOUR_HUGGINGFACE_TOKEN:
        print("ERROR: HUGGINGFACE_TOKEN not found. Please set it in your .env file.")
        return
    # --- 1. Extract Audio from MP4 ---
    if not _extract_audio():
        return
    # --- 2. Speaker Diarization (pyannote.audio) ---
    annotation = _run_diarization()
    if annotation is None:
        return
    diarize_df = _diarization_to_df(annotation)
    print(f"DEBUG: Diarization DataFrame head:\n{diarize_df.head()}")
    # --- 3. Transcription (OpenAI-Whisper) ---
    transcription_result = _transcribe()
    if transcription_result is None:
        print("ERROR: Transcription result is None after attempts. Cannot proceed.")
        return
    # --- 4. Format OpenAI-Whisper output for whisperx.assign_word_speakers ---
    formatted_transcript = _format_transcript(transcription_result)
    print(f"DEBUG: Total segments formatted for speaker assignment: {len(formatted_transcript['segments'])}")
    if formatted_transcript['segments']:
        print(f"DEBUG: First formatted segment: {formatted_transcript['segments'][0]}")
    # --- 5. Assign Speaker Labels (using whisperx.assign_word_speakers) ---
    print("Assigning speaker labels using whisperx.assign_word_speakers...")
    result_with_speakers = _assign_speakers(diarize_df, formatted_transcript)
    # --- 6. Format and Save Output ---
    _save_transcript(result_with_speakers, diarize_df)
    print("--- Process Complete! ---")
    print(f"Find your transcript at: {OUTPUT_TRANSCRIPT_FILE}")
    print("\nNote: Speakers are labeled generically (e.g., SPEAKER_00, SPEAKER_01, or error labels).")
    print("You'll need to listen to identify and map these to actual person names.")


if __name__ == "__main__":
    main()