"""Transcribe an MP4 recording into a speaker-labeled text transcript.

Pipeline:
  1. Extract mono 16 kHz WAV audio from the MP4 with ffmpeg.
  2. Run speaker diarization with pyannote.audio (MPS/CUDA preferred,
     CPU fallback on MPS failure).
  3. Transcribe with OpenAI Whisper (word timestamps enabled, Polish).
  4. Merge diarization + transcription via whisperx.assign_word_speakers.
  5. Write "SPEAKER (start - end): text" lines to a transcript file.

Requires a HUGGINGFACE_TOKEN in a .env file (for the pyannote pipeline).
"""

import os
import subprocess
import traceback

import pandas as pd
import torch
import whisper  # openai-whisper
import whisperx  # still needed for assign_word_speakers
from dotenv import load_dotenv  # for loading the .env file
from pyannote.audio import Pipeline

# --- Configuration ---
load_dotenv()  # Load environment variables (HUGGINGFACE_TOKEN) from .env

YOUR_MP4_FILE = "jeden-z-10-final.mp4"
YOUR_HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN")
OPENAI_WHISPER_MODEL_SIZE = "large-v3"  # Model that gave the best results
PROCESS_AUDIO_DURATION_SECONDS = None  # None = full audio; e.g. 30 for testing

# Advanced Whisper transcription parameters for tuning.
WHISPER_BEAM_SIZE = 5  # Whisper default for non-English; larger can be more accurate but slower
# Single low temperature for less randomness, potentially fewer hallucinations.
# Whisper's default is the tuple (0.0, 0.2, 0.4, 0.6, 0.8, 1.0).
WHISPER_TEMPERATURE = 0.0
WHISPER_PATIENCE = None  # Default is 1.0; higher values might help with repetition
WHISPER_NO_SPEECH_THRESHOLD = 0.6  # Default; lower might detect more faint speech

# --- Paths ---
CURRENT_DIR = os.getcwd()
AUDIO_FILENAME = "extracted_audio.wav"
AUDIO_FILE_PATH = os.path.join(CURRENT_DIR, AUDIO_FILENAME)
MP4_FILE_PATH = os.path.join(CURRENT_DIR, YOUR_MP4_FILE)
OUTPUT_TRANSCRIPT_FILE = "transcript_with_speakers.txt"

# --- Device Selection ---
# Prefer MPS (Apple Silicon), then CUDA, else CPU, for both models.
PYANNOTE_DEVICE_TARGET = "cpu"
WHISPER_DEVICE_TARGET = "cpu"
if torch.backends.mps.is_available():
    print("MPS device is available.")
    PYANNOTE_DEVICE_TARGET = "mps"
    WHISPER_DEVICE_TARGET = "mps"
elif torch.cuda.is_available():
    print("CUDA device is available.")
    PYANNOTE_DEVICE_TARGET = "cuda"
    WHISPER_DEVICE_TARGET = "cuda"
else:
    print("MPS and CUDA not available. Using CPU for all operations.")
print(f"Target Pyannote Device: {PYANNOTE_DEVICE_TARGET}")
print(f"Target Whisper Device: {WHISPER_DEVICE_TARGET}")


# --- Helper Function to Format Time ---
def format_time(seconds):
    """Format a duration in seconds as HH:MM:SS.mmm."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    millis = int((seconds - int(seconds)) * 1000)
    return f"{hours:02}:{minutes:02}:{secs:02}.{millis:03}"


def _extract_audio():
    """Extract mono 16 kHz PCM WAV audio from the MP4 via ffmpeg.

    Returns True on success, False on any failure (missing file, ffmpeg
    error, ffmpeg not installed).
    """
    print(f"Extracting audio from {YOUR_MP4_FILE}...")
    if not os.path.exists(MP4_FILE_PATH):
        print(f"ERROR: MP4 file not found at {MP4_FILE_PATH}")
        return False
    ffmpeg_command = [
        "ffmpeg", "-i", MP4_FILE_PATH,
        "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
    ]
    # BUG FIX: ffmpeg options apply to the file that FOLLOWS them, so the
    # duration limit must be inserted BEFORE the output path. The original
    # appended "-ss 0 -t N" after the output path and "-y", where ffmpeg
    # treats them as trailing options and ignores/rejects them, so the
    # duration limit never took effect.
    if PROCESS_AUDIO_DURATION_SECONDS is not None:
        ffmpeg_command.extend(["-ss", "0", "-t", str(PROCESS_AUDIO_DURATION_SECONDS)])
    ffmpeg_command.extend([AUDIO_FILE_PATH, "-y"])
    try:
        subprocess.run(ffmpeg_command, check=True, capture_output=True)
        duration_msg = (
            f"for the first {PROCESS_AUDIO_DURATION_SECONDS} seconds"
            if PROCESS_AUDIO_DURATION_SECONDS
            else "for the full duration"
        )
        print(f"Audio extracted {duration_msg} to {AUDIO_FILE_PATH}")
        return True
    except subprocess.CalledProcessError as e:
        print("ERROR: ffmpeg audio extraction failed.")
        print("FFmpeg stdout:", e.stdout.decode() if e.stdout else "N/A")
        print("FFmpeg stderr:", e.stderr.decode() if e.stderr else "N/A")
        return False
    except FileNotFoundError:
        print("ERROR: ffmpeg command not found. Make sure FFmpeg is installed and in your PATH.")
        return False


def _run_diarization(device_name):
    """Load the pyannote pipeline on *device_name* and diarize the audio."""
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token=YOUR_HUGGINGFACE_TOKEN,
    )
    pipeline.to(torch.device(device_name))
    # num_speakers=None lets pyannote estimate the speaker count itself.
    return pipeline(AUDIO_FILE_PATH, num_speakers=None)


def _diarize():
    """Run speaker diarization on the extracted audio.

    Returns a DataFrame with columns ['start', 'end', 'speaker'] (possibly
    empty), or None if diarization failed on every attempted device.
    """
    device = PYANNOTE_DEVICE_TARGET
    print(f"Performing speaker diarization using device: {device}...")
    annotation = None
    try:
        annotation = _run_diarization(device)
        print("Diarization complete.")
    except Exception as e:
        print(f"ERROR: Speaker diarization on {device} failed: {e}")
        if device == "mps":
            # pyannote occasionally hits unimplemented MPS ops; retry on CPU.
            print("Attempting diarization on CPU as a fallback...")
            try:
                annotation = _run_diarization("cpu")
                print("Diarization on CPU fallback complete.")
            except Exception as e_cpu:
                print(f"ERROR: Speaker diarization on CPU fallback also failed: {e_cpu}")
                print(traceback.format_exc())
                return None
        else:
            print(traceback.format_exc())
            return None

    segments = [
        {"start": turn.start, "end": turn.end, "speaker": speaker_label}
        for turn, _, speaker_label in annotation.itertracks(yield_label=True)
    ]
    if not segments:
        print("Warning: No speaker segments found by pyannote.audio.")
        diarize_df = pd.DataFrame(columns=["start", "end", "speaker"])
    else:
        diarize_df = pd.DataFrame(segments)
    print(f"DEBUG: Diarization DataFrame head:\n{diarize_df.head()}")
    return diarize_df


def _transcribe():
    """Transcribe the extracted audio with OpenAI Whisper.

    Returns the raw whisper result dict (with 'segments' containing word
    timestamps), or None if transcription failed on every attempted device.
    """
    device = WHISPER_DEVICE_TARGET
    print(f"Attempting OpenAI-Whisper model '{OPENAI_WHISPER_MODEL_SIZE}' on device: {device}...")
    result = None
    transcribe_options = {
        "language": "pl",
        "fp16": (device != "cpu"),  # fp16 only makes sense off-CPU
        "word_timestamps": True,  # needed for word-level speaker assignment
        "verbose": False,
        "beam_size": WHISPER_BEAM_SIZE,
        "patience": WHISPER_PATIENCE,
        "no_speech_threshold": WHISPER_NO_SPEECH_THRESHOLD,
        "temperature": WHISPER_TEMPERATURE,
    }
    print(f"DEBUG: Whisper transcribe options: {transcribe_options}")
    try:
        model = whisper.load_model(OPENAI_WHISPER_MODEL_SIZE, device=device)
        result = model.transcribe(AUDIO_FILE_PATH, **transcribe_options)
        print(f"OpenAI-Whisper transcription on {device} complete. Detected language (should be pl): {result.get('language', 'N/A')}")
    except NotImplementedError as nie:
        # Some Whisper ops are not implemented on the MPS backend; retry on CPU.
        print(f"ERROR: OpenAI-Whisper on {device} failed with NotImplementedError: {nie}")
        if device == "mps":
            print("Falling back to CPU for OpenAI-Whisper due to MPS NotImplementedError.")
            device = "cpu"
            transcribe_options["fp16"] = False  # CPU does not use fp16
            print(f"DEBUG: Whisper transcribe options (CPU fallback): {transcribe_options}")
            try:
                model = whisper.load_model(OPENAI_WHISPER_MODEL_SIZE, device=device)
                result = model.transcribe(AUDIO_FILE_PATH, **transcribe_options)
                print(f"OpenAI-Whisper transcription on CPU fallback complete. Detected language (should be pl): {result.get('language', 'N/A')}")
            except Exception as e_cpu:
                print(f"ERROR: OpenAI-Whisper on CPU fallback also failed: {e_cpu}")
                print(traceback.format_exc())
                return None
        else:
            print(traceback.format_exc())
            return None
    except Exception as e:
        print(f"ERROR: OpenAI-Whisper transcription on {device} failed with other error: {e}")
        print(traceback.format_exc())
        return None

    if result is None:
        print("ERROR: Transcription result is None after attempts. Cannot proceed.")
        return None
    return result


def _format_for_speaker_assignment(transcription_result):
    """Convert an openai-whisper result dict into the shape whisperx expects.

    whisperx.assign_word_speakers wants {"segments": [{start, end, text,
    words: [{word, start, end, score}]}]}. Words missing timestamps are
    skipped; a segment with no word data gets one pseudo-word spanning the
    whole segment so assignment still has something to match against.
    """
    formatted = {"segments": []}
    if not transcription_result.get("segments"):
        print("Warning: OpenAI-Whisper produced no segments. Cannot assign speakers.")
        return formatted

    for seg in transcription_result["segments"]:
        segment_data = {
            "start": float(seg["start"]),
            "end": float(seg["end"]),
            "text": seg["text"],
        }
        words_data = []
        if seg.get("words"):
            for word_info in seg["words"]:
                word_start = word_info.get("start")
                word_end = word_info.get("end")
                if word_start is not None and word_end is not None:
                    words_data.append({
                        "word": word_info["word"],
                        "start": float(word_start),
                        "end": float(word_end),
                        # whisper reports "probability"; whisperx calls it "score"
                        "score": float(word_info.get("probability", 0.0)),
                    })
                else:
                    print(f"DEBUG: Skipping word with missing start/end: {word_info}")
        else:
            # No word timestamps: one pseudo-word covering the segment.
            words_data.append({
                "word": seg["text"],
                "start": float(seg["start"]),
                "end": float(seg["end"]),
                "score": 0.0,
            })
        segment_data["words"] = words_data
        formatted["segments"].append(segment_data)

    print(f"DEBUG: Total segments formatted for speaker assignment: {len(formatted['segments'])}")
    if formatted["segments"]:
        print(f"DEBUG: First formatted segment: {formatted['segments'][0]}")
    return formatted


def _assign_speakers(diarize_df, formatted):
    """Attach speaker labels to transcript segments via whisperx.

    Degrades gracefully: placeholder labels when diarization data is missing,
    generic error labels when assign_word_speakers itself raises.
    """
    print("Assigning speaker labels using whisperx.assign_word_speakers...")
    result = {"segments": []}
    if not formatted["segments"]:
        print("Skipping speaker assignment as there are no transcribed segments.")
    elif diarize_df.empty:
        print("Warning: No diarization segments, but transcription exists. Output will not have speaker labels.")
        for seg in formatted["segments"]:
            new_seg = seg.copy()
            new_seg["speaker"] = "[NO_DIARIZATION_DATA]"
            if "words" in new_seg:
                for word_data in new_seg["words"]:
                    word_data["speaker"] = "[NO_DIARIZATION_DATA]"
            result["segments"].append(new_seg)
    else:
        try:
            result = whisperx.assign_word_speakers(diarize_df, formatted)
            print("Speaker assignment complete.")
        except Exception as e:
            print(f"ERROR: whisperx.assign_word_speakers failed: {e}")
            print(traceback.format_exc())
            for seg_idx, seg_data in enumerate(formatted["segments"]):
                new_seg = seg_data.copy()
                new_seg["speaker"] = f"[SPEAKER_ASSIGNMENT_FAILED_{seg_idx}]"
                result["segments"].append(new_seg)
            print("Fell back to transcript with generic error speaker labels due to assignment error.")
    return result


def _save_transcript(result_with_speakers, diarize_df):
    """Write the labeled transcript (or a diarization-only fallback) to disk."""
    print(f"Saving transcript to {OUTPUT_TRANSCRIPT_FILE}...")
    with open(OUTPUT_TRANSCRIPT_FILE, "w", encoding="utf-8") as f:
        if not result_with_speakers or not result_with_speakers.get("segments"):
            f.write("No transcription results to save or speaker assignment failed to produce segments.\n")
            if not diarize_df.empty:
                # Still useful output: who spoke when, without any text.
                f.write("\nDiarization Segments (if any, without transcription matching):\n")
                for _, row in diarize_df.iterrows():
                    f.write(f"Speaker: {row['speaker']} ({format_time(row['start'])} - {format_time(row['end'])})\n")
        else:
            for segment_info in result_with_speakers["segments"]:
                speaker_label = segment_info.get("speaker", "[SPEAKER_NOT_ASSIGNED]")
                text = segment_info.get("text", "").strip()
                start_time = float(segment_info.get("start", 0.0))
                end_time = float(segment_info.get("end", 0.0))
                if not text:
                    continue  # skip empty/whitespace-only segments
                f.write(f"{speaker_label} ({format_time(start_time)} - {format_time(end_time)}): {text}\n\n")


# --- Main Processing ---
def main():
    """Run the full extract -> diarize -> transcribe -> label -> save pipeline."""
    if not YOUR_HUGGINGFACE_TOKEN:
        print("ERROR: HUGGINGFACE_TOKEN not found. Please set it in your .env file.")
        return

    # --- 1. Extract Audio from MP4 ---
    if not _extract_audio():
        return

    # --- 2. Speaker Diarization (pyannote.audio) ---
    diarize_df = _diarize()
    if diarize_df is None:
        return

    # --- 3. Transcription (OpenAI-Whisper) ---
    transcription_result = _transcribe()
    if transcription_result is None:
        return

    # --- 4. Format OpenAI-Whisper output for whisperx.assign_word_speakers ---
    formatted = _format_for_speaker_assignment(transcription_result)

    # --- 5. Assign Speaker Labels (using whisperx.assign_word_speakers) ---
    result_with_speakers = _assign_speakers(diarize_df, formatted)

    # --- 6. Format and Save Output ---
    _save_transcript(result_with_speakers, diarize_df)

    print("--- Process Complete! ---")
    print(f"Find your transcript at: {OUTPUT_TRANSCRIPT_FILE}")
    print("\nNote: Speakers are labeled generically (e.g., SPEAKER_00, SPEAKER_01, or error labels).")
    print("You'll need to listen to identify and map these to actual person names.")


if __name__ == "__main__":
    main()