"""Transcribe a Polish MP4 recording and label who said what.

Pipeline: FFmpeg audio extraction -> pyannote.audio speaker diarization ->
OpenAI-Whisper transcription -> whisperx word/speaker alignment ->
plain-text transcript with speaker tags.
"""
import os
import subprocess
import traceback

import pandas as pd
import torch
import whisper  # Using openai-whisper
import whisperx  # Still needed for assign_word_speakers
from dotenv import load_dotenv  # For loading .env file
from pyannote.audio import Pipeline
# --- Configuration ---

load_dotenv()  # Load environment variables from .env file

# Input video to transcribe; expected in the current working directory.
YOUR_MP4_FILE = "jeden-z-10-final.mp4"  # Replace with your MP4 file name

# Required to download the gated pyannote diarization model from Hugging Face.
YOUR_HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN")  # Ensure this is set in your .env file

# For OpenAI-Whisper: "tiny", "base", "small", "medium", "large", "large-v2", "large-v3"
# Larger models are more accurate but slower; "large-v3" favors accuracy for Polish audio.
OPENAI_WHISPER_MODEL_SIZE = "large-v3"

# For FFmpeg audio extraction - set to None to process the full audio track.
PROCESS_AUDIO_DURATION_SECONDS = None

# --- Paths ---
# All artifacts live next to the script, in the current working directory.
CURRENT_DIR = os.getcwd()
AUDIO_FILENAME = "extracted_audio.wav"
AUDIO_FILE_PATH = os.path.join(CURRENT_DIR, AUDIO_FILENAME)
MP4_FILE_PATH = os.path.join(CURRENT_DIR, YOUR_MP4_FILE)
OUTPUT_TRANSCRIPT_FILE = "transcript_with_speakers.txt"
# --- Device Selection ---
|
|
PYANNOTE_DEVICE_TARGET = "cpu" # Default
|
|
WHISPER_DEVICE_TARGET = "cpu" # Default
|
|
|
|
if torch.backends.mps.is_available():
|
|
print("MPS device is available.")
|
|
PYANNOTE_DEVICE_TARGET = "mps"
|
|
WHISPER_DEVICE_TARGET = "mps"
|
|
elif torch.cuda.is_available():
|
|
print("CUDA device is available.")
|
|
PYANNOTE_DEVICE_TARGET = "cuda"
|
|
WHISPER_DEVICE_TARGET = "cuda"
|
|
else:
|
|
print("MPS and CUDA not available. Using CPU for all operations.")
|
|
|
|
print(f"Target Pyannote Device: {PYANNOTE_DEVICE_TARGET}")
|
|
print(f"Target Whisper Device: {WHISPER_DEVICE_TARGET}")
|
|
|
|
|
|
# --- Helper Function to Format Time ---
|
|
def format_time(seconds):
|
|
hours = int(seconds // 3600)
|
|
minutes = int((seconds % 3600) // 60)
|
|
secs = int(seconds % 60)
|
|
millis = int((seconds - int(seconds)) * 1000)
|
|
return f"{hours:02}:{minutes:02}:{secs:02}.{millis:03}"
|
|
|
|
# --- Main Processing ---
def main():
    """Run the end-to-end pipeline.

    Steps: extract 16 kHz mono WAV audio from the MP4 with FFmpeg, diarize
    speakers with pyannote, transcribe with OpenAI-Whisper (forced Polish),
    merge both with whisperx.assign_word_speakers, and write a readable
    transcript to OUTPUT_TRANSCRIPT_FILE. Prints progress/errors and returns
    early (None) on any unrecoverable failure.
    """
    if not YOUR_HUGGINGFACE_TOKEN:
        print("ERROR: HUGGINGFACE_TOKEN not found. Please set it in your .env file.")
        return

    # --- 1. Extract Audio from MP4 ---
    print(f"Extracting audio from {YOUR_MP4_FILE}...")
    if not os.path.exists(MP4_FILE_PATH):
        print(f"ERROR: MP4 file not found at {MP4_FILE_PATH}")
        return

    # 16 kHz mono PCM WAV is the input format both pyannote and Whisper expect.
    ffmpeg_command = [
        "ffmpeg", "-i", MP4_FILE_PATH,
        "-vn", "-acodec", "pcm_s16le",
        "-ar", "16000", "-ac", "1",
    ]
    # BUGFIX: FFmpeg options apply to the file that FOLLOWS them. These were
    # previously appended after AUDIO_FILE_PATH and "-y", so FFmpeg ignored
    # the duration limit as trailing options. They must precede the output.
    if PROCESS_AUDIO_DURATION_SECONDS is not None:
        ffmpeg_command.extend(["-ss", "0", "-t", str(PROCESS_AUDIO_DURATION_SECONDS)])
    ffmpeg_command.extend([AUDIO_FILE_PATH, "-y"])

    try:
        subprocess.run(ffmpeg_command, check=True, capture_output=True)
        duration_msg = f"for the first {PROCESS_AUDIO_DURATION_SECONDS} seconds" if PROCESS_AUDIO_DURATION_SECONDS else "for the full duration"
        print(f"Audio extracted {duration_msg} to {AUDIO_FILE_PATH}")
    except subprocess.CalledProcessError as e:
        print("ERROR: ffmpeg audio extraction failed.")
        print("FFmpeg stdout:", e.stdout.decode() if e.stdout else "N/A")
        print("FFmpeg stderr:", e.stderr.decode() if e.stderr else "N/A")
        return
    except FileNotFoundError:
        print("ERROR: ffmpeg command not found. Make sure FFmpeg is installed and in your PATH.")
        return

    # --- 2. Speaker Diarization (pyannote.audio) ---
    actual_pyannote_device = PYANNOTE_DEVICE_TARGET
    print(f"Performing speaker diarization using device: {actual_pyannote_device}...")
    try:
        diarization_pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1",
            use_auth_token=YOUR_HUGGINGFACE_TOKEN
        )
        diarization_pipeline.to(torch.device(actual_pyannote_device))
        # num_speakers=None lets pyannote estimate the speaker count itself.
        diarization_result_annotation = diarization_pipeline(AUDIO_FILE_PATH, num_speakers=None)
        print("Diarization complete.")
    except Exception as e:
        print(f"ERROR: Speaker diarization on {actual_pyannote_device} failed: {e}")
        if actual_pyannote_device == "mps":  # Fallback for Pyannote if MPS fails
            print("Attempting diarization on CPU as a fallback...")
            actual_pyannote_device = "cpu"
            try:
                # Re-initialize pipeline for CPU, as the original might be stuck on MPS
                diarization_pipeline = Pipeline.from_pretrained(
                    "pyannote/speaker-diarization-3.1",
                    use_auth_token=YOUR_HUGGINGFACE_TOKEN
                )
                diarization_pipeline.to(torch.device(actual_pyannote_device))
                diarization_result_annotation = diarization_pipeline(AUDIO_FILE_PATH, num_speakers=None)
                print("Diarization on CPU fallback complete.")
            except Exception as e_cpu:
                print(f"ERROR: Speaker diarization on CPU fallback also failed: {e_cpu}")
                print(traceback.format_exc())
                return
        else:  # If it wasn't MPS or CPU fallback already failed
            print(traceback.format_exc())
            return

    # Flatten pyannote's annotation into rows whisperx can consume.
    diarization_segments_for_df = []
    for segment, _, speaker_label in diarization_result_annotation.itertracks(yield_label=True):
        diarization_segments_for_df.append({
            'start': segment.start,
            'end': segment.end,
            'speaker': speaker_label
        })

    if not diarization_segments_for_df:
        print("Warning: No speaker segments found by pyannote.audio.")
        diarize_df = pd.DataFrame(columns=['start', 'end', 'speaker'])
    else:
        diarize_df = pd.DataFrame(diarization_segments_for_df)
        print(f"DEBUG: Diarization DataFrame head:\n{diarize_df.head()}")

    # --- 3. Transcription (OpenAI-Whisper) ---
    actual_whisper_device = WHISPER_DEVICE_TARGET
    print(f"Attempting OpenAI-Whisper model '{OPENAI_WHISPER_MODEL_SIZE}' on device: {actual_whisper_device}...")

    transcription_result_openai = None
    model = None

    try:
        # fp16 is only supported on accelerators; force fp32 on CPU.
        use_fp16_whisper = (actual_whisper_device != "cpu")
        model = whisper.load_model(OPENAI_WHISPER_MODEL_SIZE, device=actual_whisper_device)
        # Explicitly set language to Polish
        transcription_result_openai = model.transcribe(AUDIO_FILE_PATH, language="pl", fp16=use_fp16_whisper, word_timestamps=True, verbose=False)
        print(f"OpenAI-Whisper transcription on {actual_whisper_device} complete. Detected language (should be pl): {transcription_result_openai.get('language', 'N/A')}")
    except NotImplementedError as nie:
        # Some Whisper ops are not implemented for the MPS backend.
        print(f"ERROR: OpenAI-Whisper on {actual_whisper_device} failed with NotImplementedError: {nie}")
        if actual_whisper_device == "mps":
            print("Falling back to CPU for OpenAI-Whisper due to MPS NotImplementedError.")
            actual_whisper_device = "cpu"
            use_fp16_whisper = False
            try:
                model = whisper.load_model(OPENAI_WHISPER_MODEL_SIZE, device=actual_whisper_device)
                # Explicitly set language to Polish for CPU fallback as well
                transcription_result_openai = model.transcribe(AUDIO_FILE_PATH, language="pl", fp16=use_fp16_whisper, word_timestamps=True, verbose=False)
                print(f"OpenAI-Whisper transcription on CPU fallback complete. Detected language (should be pl): {transcription_result_openai.get('language', 'N/A')}")
            except Exception as e_cpu:
                print(f"ERROR: OpenAI-Whisper on CPU fallback also failed: {e_cpu}")
                print(traceback.format_exc())
                return
        else:
            print(traceback.format_exc())
            return
    except Exception as e:
        print(f"ERROR: OpenAI-Whisper transcription on {actual_whisper_device} failed with other error: {e}")
        print(traceback.format_exc())
        return

    if transcription_result_openai is None:
        print("ERROR: Transcription result is None after attempts. Cannot proceed.")
        return

    # --- 4. Format OpenAI-Whisper output for whisperx.assign_word_speakers ---
    # whisperx expects {"segments": [{start, end, text, words: [...]}, ...]}
    # with float timestamps and a per-word "score".
    formatted_transcript_for_speaker_assignment = {"segments": []}
    if "segments" not in transcription_result_openai or not transcription_result_openai["segments"]:
        print("Warning: OpenAI-Whisper produced no segments. Cannot assign speakers.")
    else:
        for seg_openai in transcription_result_openai["segments"]:
            segment_data = {
                "start": float(seg_openai["start"]),
                "end": float(seg_openai["end"]),
                "text": seg_openai["text"]
            }
            words_data = []
            if "words" in seg_openai and seg_openai["words"]:
                for word_info in seg_openai["words"]:
                    word_start = word_info.get("start")
                    word_end = word_info.get("end")
                    if word_start is not None and word_end is not None:
                        words_data.append({
                            "word": word_info["word"],
                            "start": float(word_start),
                            "end": float(word_end),
                            "score": float(word_info.get("probability", 0.0))
                        })
                    else:
                        print(f"DEBUG: Skipping word with missing start/end: {word_info}")
            else:
                # No word timestamps: treat the whole segment as one "word"
                # so whisperx still has something to align speakers against.
                words_data.append({
                    "word": seg_openai["text"],
                    "start": float(seg_openai["start"]),
                    "end": float(seg_openai["end"]),
                    "score": 0.0
                })
            segment_data["words"] = words_data
            formatted_transcript_for_speaker_assignment["segments"].append(segment_data)

    print(f"DEBUG: Total segments formatted for speaker assignment: {len(formatted_transcript_for_speaker_assignment['segments'])}")
    if formatted_transcript_for_speaker_assignment['segments']:
        print(f"DEBUG: First formatted segment: {formatted_transcript_for_speaker_assignment['segments'][0]}")

    # --- 5. Assign Speaker Labels (using whisperx.assign_word_speakers) ---
    print("Assigning speaker labels using whisperx.assign_word_speakers...")
    result_with_speakers = {"segments": []}  # Initialize
    if not formatted_transcript_for_speaker_assignment["segments"]:
        print("Skipping speaker assignment as there are no transcribed segments.")
    elif diarize_df.empty and formatted_transcript_for_speaker_assignment["segments"]:
        print("Warning: No diarization segments, but transcription exists. Output will not have speaker labels.")
        for seg in formatted_transcript_for_speaker_assignment["segments"]:
            new_seg = seg.copy()
            new_seg['speaker'] = '[NO_DIARIZATION_DATA]'  # Indicate no diarization data was available
            if 'words' in new_seg:
                for word_data in new_seg['words']:
                    word_data['speaker'] = '[NO_DIARIZATION_DATA]'
            result_with_speakers["segments"].append(new_seg)
    else:
        try:
            result_with_speakers = whisperx.assign_word_speakers(
                diarize_df,
                formatted_transcript_for_speaker_assignment
            )
            print("Speaker assignment complete.")
        except Exception as e:
            print(f"ERROR: whisperx.assign_word_speakers failed: {e}")
            print(traceback.format_exc())
            # Fallback: use transcript but mark speakers as unassigned by whisperx
            for seg_idx, seg_data in enumerate(formatted_transcript_for_speaker_assignment["segments"]):
                new_seg = seg_data.copy()
                new_seg['speaker'] = f'[SPEAKER_ASSIGNMENT_FAILED_{seg_idx}]'
                result_with_speakers["segments"].append(new_seg)
            print("Fell back to transcript with generic error speaker labels due to assignment error.")

    # --- 6. Format and Save Output ---
    print(f"Saving transcript to {OUTPUT_TRANSCRIPT_FILE}...")
    with open(OUTPUT_TRANSCRIPT_FILE, "w", encoding="utf-8") as f:
        if not result_with_speakers or "segments" not in result_with_speakers or not result_with_speakers["segments"]:
            f.write("No transcription results to save or speaker assignment failed to produce segments.\n")
            if not diarize_df.empty:
                # Still dump raw diarization so the run is not a total loss.
                f.write("\nDiarization Segments (if any, without transcription matching):\n")
                for _, row in diarize_df.iterrows():
                    f.write(f"Speaker: {row['speaker']} ({format_time(row['start'])} - {format_time(row['end'])})\n")
        else:
            # Iterate through each segment from Whisper/OpenAI, now with speaker labels from whisperx
            for segment_info in result_with_speakers["segments"]:
                speaker_label = segment_info.get('speaker', '[SPEAKER_NOT_ASSIGNED]')  # Default if speaker key is missing after assignment
                text = segment_info.get("text", "").strip()
                start_time = float(segment_info.get("start", 0.0))
                end_time = float(segment_info.get("end", 0.0))

                if not text:  # Skip segments with no actual text
                    continue

                f.write(f"{speaker_label} ({format_time(start_time)} - {format_time(end_time)}): {text}\n\n")

    print("--- Process Complete! ---")
    print(f"Find your transcript at: {OUTPUT_TRANSCRIPT_FILE}")
    print("\nNote: Speakers are labeled generically (e.g., SPEAKER_00, SPEAKER_01, or error labels).")
    print("You'll need to listen to identify and map these to actual person names.")
if __name__ == "__main__":
    # Run the pipeline only when executed as a script, not on import.
    main()