Spaces:
Sleeping
Sleeping
Fix indentation in expert analysis display section of Streamlit app for improved layout
5235a31
unverified
| import streamlit as st | |
| import os | |
| import yt_dlp | |
| import subprocess | |
| import librosa | |
| import numpy as np | |
| import torch | |
| import sys | |
# Global flag for SpeechBrain availability
HAS_SPEECHBRAIN = False
EncoderClassifier = None

# EncoderClassifier has lived at different import paths across SpeechBrain
# releases. Try them from newest to oldest and keep the first one that resolves.
import importlib

for _speechbrain_module in (
    "speechbrain.inference.classifiers",  # new path (SpeechBrain 1.0+)
    "speechbrain.pretrained.interfaces",  # legacy path
    "speechbrain.pretrained",  # very old path
):
    try:
        EncoderClassifier = getattr(
            importlib.import_module(_speechbrain_module), "EncoderClassifier"
        )
    except (ImportError, AttributeError):
        continue
    HAS_SPEECHBRAIN = True
    break

if not HAS_SPEECHBRAIN:
    # Report on stderr instead of st.error(): this code runs before
    # st.set_page_config(), and Streamlit documents set_page_config as having
    # to be the first Streamlit command of the script. AccentDetector.__init__
    # still shows an in-app error when EncoderClassifier is None.
    print(
        "Unable to import SpeechBrain. Limited functionality available.",
        file=sys.stderr,
    )
# Handle potential compatibility issues with transformers.
# HAS_AUTO_PROCESSOR tells AccentDetector whether AutoProcessor can be used or
# whether it has to fall back to AutoFeatureExtractor.
try:
    from transformers import AutoProcessor, AutoModelForAudioClassification
    HAS_AUTO_PROCESSOR = True
except ImportError:
    HAS_AUTO_PROCESSOR = False
    try:
        from transformers import AutoModelForAudioClassification
    except ImportError:
        # transformers is missing entirely: keep the app importable. The accent
        # model load in AccentDetector.__init__ is wrapped in try/except and
        # will simply report that the accent model could not be loaded.
        AutoModelForAudioClassification = None
        print(
            "Unable to import transformers. Accent classification is unavailable.",
            file=sys.stderr,
        )
    else:
        # Reported on stderr rather than st.warning(): no Streamlit command
        # should run before st.set_page_config() further down the script.
        print(
            "Using a compatible but limited version of transformers. "
            "Some features may be limited.",
            file=sys.stderr,
        )
| from dotenv import load_dotenv | |
| import matplotlib.pyplot as plt | |
| import tempfile | |
| import time | |
| # Deployment instructions: | |
| # To deploy this app: | |
| # 1. Make sure Docker is installed | |
| # 2. Build the Docker image: docker build -t accent-detector . | |
| # 3. Run the container: docker run -p 8501:8501 --volume /tmp/accent-detector:/app/uploads accent-detector | |
| # For Windows: docker run -p 8501:8501 --volume C:\temp\accent-detector:/app/uploads accent-detector | |
| # 4. Access the app at http://localhost:8501 | |
| # | |
| # For cloud deployment: | |
| # - Streamlit Cloud: Connect your GitHub repository to Streamlit Cloud | |
| # - Hugging Face Spaces: Use the Docker deployment option with proper volume mounts | |
| # - Azure/AWS/GCP: Deploy the container using their container services with persistent storage | |
| # | |
| # Troubleshooting file uploads: | |
| # - Set maxUploadSize in .streamlit/config.toml | |
| # - Ensure write permissions on upload directories | |
| # - For 403 errors, check file size and format compatibility | |
# Load environment variables (if .env file exists).
# This is best-effort: a failure here must not stop the app, but it is
# reported on stderr instead of being swallowed silently.
try:
    load_dotenv()
except Exception as dotenv_error:
    print(f"Could not load .env file: {dotenv_error}", file=sys.stderr)

# Check for OpenAI API access - optional for enhanced explanations.
# have_openai is True only when the package imports AND a non-empty key is set;
# an empty OPENAI_API_KEY is treated the same as a missing one.
try:
    import openai
    openai.api_key = os.getenv("OPENAI_API_KEY")
    have_openai = bool(openai.api_key)
except (ImportError, AttributeError):
    have_openai = False
# Lookup table: accent code -> human-readable English accent name.
ENGLISH_ACCENTS = dict(
    [
        ("en-us", "American English"),
        ("en-gb", "British English"),
        ("en-au", "Australian English"),
        ("en-ca", "Canadian English"),
        ("en-ie", "Irish English"),
        ("en-scotland", "Scottish English"),
        ("en-in", "Indian English"),
        ("en-za", "South African English"),
        ("en-ng", "Nigerian English"),
        ("en-caribbean", "Caribbean English"),
    ]
)
def download_video(url, video_path="video.mp4", cookies_file=None):
    """Download the media behind ``url`` and store it at ``video_path``.

    The download runs inside a private temporary directory. The resulting
    media file is then copied to ``video_path`` (resolved against the current
    working directory when relative) *before* the temporary directory is
    removed, so a ``True`` return value means the file really exists at the
    path the caller passed in.

    Args:
        url: Public video/audio URL. YouTube URLs are retried through a list
            of alternative frontends.
        video_path: Destination path for the downloaded media.
        cookies_file: Optional path to a cookies.txt file passed to yt-dlp.

    Returns:
        True when a media file was downloaded and copied to ``video_path``,
        False otherwise (errors are reported through Streamlit).
    """
    import shutil
    import tempfile

    # Determine if this is a YouTube URL
    lowered_url = url.lower()
    is_youtube = "youtube" in lowered_url or "youtu.be" in lowered_url

    # Where the caller expects the file to end up.
    final_path = os.path.abspath(video_path)

    # mkdtemp gives a unique, owner-only directory; no predictable name and no
    # world-writable permissions are needed.
    temp_dir = tempfile.mkdtemp(prefix="video_download_")
    download_path = os.path.join(temp_dir, os.path.basename(final_path))

    ydl_opts = {
        "outtmpl": download_path,
        "quiet": False,
        "verbose": True,  # More detailed output for debugging
        "format": "bestaudio/best",  # Prefer audio formats since we only need audio
        "postprocessors": [{
            "key": "FFmpegExtractAudio",
            "preferredcodec": "wav",
        }] if is_youtube else [],  # Extract audio directly for YouTube
        "noplaylist": True,
        "extractor_retries": 5,
        "socket_timeout": 45,
        "retry_sleep_functions": {
            "http": lambda n: 5 * (n + 1),  # 5, 10, 15, 20, 25 seconds
        },
        # NOTE(review): this disables HTTPS certificate validation for every
        # download. Kept as in the original; confirm it is really required.
        "nocheckcertificate": True,
        "ignoreerrors": False,  # Don't ignore errors (we want to handle them)
    }

    # Add cookies if provided
    if cookies_file and os.path.exists(cookies_file):
        ydl_opts["cookiefile"] = cookies_file
        st.info("Using provided cookies file for authentication")
        # Cookies carry credentials: keep them readable by the owner only.
        try:
            os.chmod(cookies_file, 0o600)
        except Exception as e:
            st.warning(f"Could not set permissions on cookies file: {str(e)}. Continuing anyway.")

    # Cache locations are redirected into the temp dir for the duration of the
    # download only; the previous values are restored in the finally block so
    # the rest of the process does not keep pointing at a deleted directory.
    env_overrides = {
        "HOME": temp_dir,
        "XDG_CACHE_HOME": os.path.join(temp_dir, ".cache"),  # For Linux
        "APPDATA": temp_dir,  # For Windows
    }
    saved_env = {key: os.environ.get(key) for key in env_overrides}

    try:
        os.environ.update(env_overrides)

        if is_youtube:
            st.info("Attempting to download from YouTube. This might take longer...")

            # List of alternative YouTube frontends to try
            youtube_alternatives = [
                (url, "Standard YouTube"),
                (url.replace("youtube.com", "yewtu.be"), "Invidious (yewtu.be)"),
                (url.replace("youtube.com", "piped.video"), "Piped"),
                (url.replace("youtube.com", "inv.riverside.rocks"), "Invidious (riverside)"),
            ]

            # If youtu.be is used, create proper alternatives
            if "youtu.be" in lowered_url:
                video_id = url.split("/")[-1].split("?")[0]
                youtube_alternatives = [
                    (url, "Standard YouTube"),
                    (f"https://yewtu.be/watch?v={video_id}", "Invidious (yewtu.be)"),
                    (f"https://piped.video/watch?v={video_id}", "Piped"),
                    (f"https://inv.riverside.rocks/watch?v={video_id}", "Invidious (riverside)"),
                ]

            success = False
            for alt_url, alt_name in youtube_alternatives:
                # An alternative identical to the original URL would only
                # repeat the standard attempt.
                if alt_url == url and alt_name != "Standard YouTube":
                    continue

                st.info(f"Trying {alt_name}... Please wait.")
                try:
                    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                        ydl.download([alt_url])
                except Exception as download_error:
                    error_msg = str(download_error)
                    st.warning(f"{alt_name} download attempt failed: {error_msg}")
                    # Permission problems will not be solved by another
                    # frontend, so stop and let the outer handler report it.
                    if "permission" in error_msg.lower() or "access" in error_msg.lower():
                        st.error("Permission error detected. Stopping download attempts.")
                        raise
                else:
                    st.success(f"Successfully downloaded using {alt_name}")
                    success = True
                    break

            # If all attempts failed
            if not success:
                st.error("All YouTube download methods failed.")
                return False
        else:
            # For non-YouTube URLs
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])

        # Locate what was produced. Post-processing may change the extension,
        # so fall back to the first media file found in the temp directory.
        produced_file = download_path if os.path.exists(download_path) else None
        if produced_file is None:
            media_files = [
                os.path.join(root, name)
                for root, _, names in os.walk(temp_dir)
                for name in names
                if name.endswith(('.mp4', '.mp3', '.wav', '.m4a'))
            ]
            if media_files:
                produced_file = media_files[0]

        if produced_file is None:
            st.error(f"Video downloaded but file not found: {download_path}")
            return False

        try:
            # Copy instead of move to avoid cross-device link issues
            os.makedirs(os.path.dirname(final_path), exist_ok=True)
            shutil.copy(produced_file, final_path)
        except OSError as copy_error:
            st.error(f"Error copying downloaded file: {str(copy_error)}")
            return False
        return True

    except Exception as e:
        error_msg = str(e)
        lowered_error = error_msg.lower()
        st.error(f"Download error: {error_msg}")

        # Provide specific guidance based on error type
        if is_youtube and ("bot" in lowered_error or "sign in" in lowered_error or "403" in error_msg):
            st.warning("⚠️ YouTube requires authentication. Please try one of these solutions:")
            st.markdown(
                "\n"
                "1. **Upload a cookies.txt file** using the file uploader above\n"
                "2. **Try a different video source** like Loom, Vimeo or direct MP3/WAV files\n"
                "3. **Use the Audio Upload tab** instead of YouTube URLs\n"
            )
        elif "not find" in lowered_error and "cookies" in lowered_error:
            st.warning("Browser cookies could not be accessed. Please upload a cookies.txt file.")
        elif "network" in lowered_error or "timeout" in lowered_error:
            st.warning("Network error. Please check your internet connection and try again.")
        elif "permission" in lowered_error:
            st.warning("Permission error. The application doesn't have access to create or write files in the temporary directory.")
            st.info("Try running the Docker container with the proper volume mounts: `docker run -p 8501:8501 --volume /tmp/accent-detector:/app/uploads accent-detector`")
        elif "not found" in lowered_error and "ffmpeg" in lowered_error:
            st.error("FFmpeg is not installed or not found in PATH.")
            st.info("If running locally, please install FFmpeg. If using Docker, the container may be misconfigured.")
        return False

    finally:
        # Put the environment back exactly as it was.
        for key, previous in saved_env.items():
            if previous is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = previous

        # The result has already been copied out, so the temp dir can go.
        try:
            if os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)
        except Exception as cleanup_error:
            st.warning(f"Could not clean up temporary directory: {str(cleanup_error)}")
def extract_audio(video_path="video.mp4", audio_path="audio.wav"):
    """Extract a 16 kHz mono 16-bit PCM WAV track from ``video_path`` using ffmpeg.

    Args:
        video_path: Input media file.
        audio_path: Output WAV file; overwritten if it already exists.

    Returns:
        True when ``audio_path`` exists after ffmpeg finished.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status
            (the error is also reported through Streamlit before re-raising).
    """
    command = [
        'ffmpeg',
        # -y matches the ffmpeg calls in process_uploaded_audio: without it
        # ffmpeg asks for confirmation when the output already exists, which
        # cannot be answered from a captured subprocess.
        '-y',
        '-i', video_path,
        '-vn',
        '-acodec', 'pcm_s16le',
        '-ar', '16000',
        '-ac', '1',
        audio_path,
    ]
    try:
        subprocess.run(command, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        st.error(f"Error extracting audio: {e}")
        # errors='replace' keeps the report from failing on non-UTF-8 output.
        stderr_text = e.stderr.decode('utf-8', errors='replace') if e.stderr else ""
        st.error(f"ffmpeg output: {stderr_text}")
        raise
    return os.path.exists(audio_path)
class AccentDetector:
    """Language identification plus English-accent classification for one audio file.

    Two models are loaded in ``__init__``; each load is independent and a
    failure only disables that part (``have_lang_id`` / ``have_accent_model``).
    """

    def __init__(self):
        # Initialize language identification model
        self.have_lang_id = False
        try:
            if EncoderClassifier is not None:
                self.lang_id = EncoderClassifier.from_hparams(
                    source="speechbrain/lang-id-commonlanguage_ecapa",
                    savedir="tmp_model"
                )
                self.have_lang_id = True
            else:
                st.error("SpeechBrain not available. Language identification disabled.")
        except Exception as e:
            st.error(f"Error loading language ID model: {str(e)}")

        # Initialize the accent classifier
        self.have_accent_model = False
        try:
            # NOTE(review): this is a SpeechBrain checkpoint id being loaded
            # through the transformers Auto* classes - confirm it is loadable
            # that way; if not, the except below disables accent classification.
            self.model_name = "speechbrain/lang-id-voxlingua107-ecapa"
            # Handle case where AutoProcessor is not available
            if HAS_AUTO_PROCESSOR:
                self.processor = AutoProcessor.from_pretrained(self.model_name)
            else:
                # Fall back to using feature_extractor
                from transformers import AutoFeatureExtractor
                self.processor = AutoFeatureExtractor.from_pretrained(self.model_name)
            self.model = AutoModelForAudioClassification.from_pretrained(self.model_name)
            self.have_accent_model = True
        except Exception as e:
            st.warning(f"Could not load accent model: {str(e)}")
            self.have_accent_model = False

    @staticmethod
    def _fallback_explanation(accent, confidence, is_english, language):
        """Return the plain-text explanation used when OpenAI is unavailable or fails."""
        if is_english:
            return f"The speaker has a {accent} accent with {confidence*100:.1f}% confidence. The speech was identified as English."
        return f"The speech was identified as {language}, not English. English confidence is low."

    def is_english(self, audio_path, threshold=0.7):
        """Determine whether the speech is English.

        Args:
            audio_path: Audio file passed to the language-ID model.
            threshold: Accepted for backward compatibility; not applied by the
                current implementation.

        Returns:
            Tuple ``(is_english, language_label, score)`` where ``score`` is a
            probability in [0, 1]. When the model is unavailable or fails,
            English is assumed (score 1.0 and 0.5 respectively).
        """
        if not getattr(self, 'have_lang_id', False):
            # If language ID model is not available, assume English
            st.warning("Language identification is not available. Assuming English speech.")
            return True, "en", 1.0

        try:
            import math

            _out_prob, score, _index, lang = self.lang_id.classify_file(audio_path)

            # classify_file returns the text labels as a list (one entry per
            # item in the batch); take the single label instead of calling
            # str methods on the list.
            if isinstance(lang, (list, tuple)):
                lang = lang[0] if lang else ""
            lang = str(lang)

            score = float(score)
            # SpeechBrain documents the score as a log-posterior, which is
            # negative; convert it so callers get a probability they can
            # scale to a percentage.
            if score < 0.0:
                score = math.exp(score)
            score = min(score, 1.0)

            # Check if language is English (slightly fuzzy match)
            label = lang.lower()
            is_english = "eng" in label or "en-" in label or label == "en"
            return is_english, lang, score
        except Exception as e:
            st.warning(f"Error identifying language: {str(e)}. Assuming English speech.")
            return True, "en", 0.5

    def classify_accent(self, audio_path):
        """Classify the specific English accent.

        Returns:
            Tuple ``(accent_name, confidence)`` with confidence in [0, 1];
            ``("Unknown English Accent", 0.0)`` when the accent model is not
            loaded or classification fails.
        """
        if not self.have_accent_model:
            return "Unknown English Accent", 0.0

        try:
            # Load and preprocess audio
            audio, sr = librosa.load(audio_path, sr=16000)
            inputs = self.processor(audio, sampling_rate=sr, return_tensors="pt")

            # Get predictions
            with torch.no_grad():
                outputs = self.model(**inputs)

            # Get probabilities
            probs = outputs.logits.softmax(dim=-1)[0]
            prediction_id = probs.argmax().item()
            confidence = probs[prediction_id].item()

            # Get predicted label
            accent_code = self.model.config.id2label[prediction_id]

            # Map to English accent if possible
            if accent_code.startswith('en-'):
                accent = ENGLISH_ACCENTS.get(accent_code, f"English ({accent_code})")
            else:
                # If it's not an English accent code, use our pre-classification
                is_english, _, _ = self.is_english(audio_path)
                if is_english:
                    accent = "General English"
                else:
                    accent = f"Non-English ({accent_code})"
                confidence *= 0.7  # Reduce confidence for non-specific matches
            return accent, confidence
        except Exception as e:
            st.error(f"Error in accent classification: {str(e)}")
            return "Unknown English Accent", 0.0

    def generate_explanation(self, audio_path, accent, confidence, is_english, language):
        """Generate an explanation of the detection results, via OpenAI when available.

        The ``is_english`` and ``language`` values supplied by the caller are
        used as-is; the language-ID model is not run a second time.
        ``audio_path`` is kept in the signature for compatibility.
        """
        if not have_openai:
            return self._fallback_explanation(accent, confidence, is_english, language)

        try:
            import openai

            prompt = (
                "\n"
                "Audio analysis detected a speaker with the following characteristics:\n"
                f"- Primary accent/language: {accent}\n"
                f"- Confidence score: {confidence*100:.1f}%\n"
                f"- Detected language category: {language}\n"
                f"- Is English: {is_english}\n"
                "Based on this information, provide a 2-3 sentence summary about the speaker's accent.\n"
                "Focus on how clear their English is and any notable accent characteristics.\n"
                "This is for hiring purposes to evaluate English speaking abilities.\n"
            )
            response = openai.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": "You are an accent analysis specialist providing factual assessments."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=150
            )
            return response.choices[0].message.content.strip()
        except Exception as e:
            st.error(f"Error generating explanation: {str(e)}")
            return self._fallback_explanation(accent, confidence, is_english, language)

    def analyze_audio(self, audio_path):
        """Run the complete analysis pipeline for one audio file.

        Returns:
            Dict with keys ``is_english``, ``accent``, ``accent_confidence``
            (percentage), ``english_confidence`` (percentage),
            ``language_detected``, ``explanation`` and ``audio_viz`` (a
            matplotlib figure, or None when plotting failed).
        """
        # Check if it's English
        is_english, lang, lang_score = self.is_english(audio_path)

        # Classify accent if it's English
        if is_english:
            accent, accent_confidence = self.classify_accent(audio_path)
            english_confidence = lang_score * 100  # Scale to percentage
        else:
            accent = f"Non-English ({lang})"
            accent_confidence = lang_score
            english_confidence = max(0, min(30, lang_score * 50))  # Cap at 30% if non-English

        # Generate explanation
        explanation = self.generate_explanation(audio_path, accent, accent_confidence, is_english, lang)

        # Create visualization of the audio waveform
        try:
            y, _sr = librosa.load(audio_path, sr=None)
            fig, ax = plt.subplots(figsize=(10, 2))
            ax.plot(y)
            ax.set_xlabel('Sample')
            ax.set_ylabel('Amplitude')
            ax.set_title('Audio Waveform')
            fig.tight_layout()
            audio_viz = fig

            # Make sure the figure can actually be rendered. An in-memory
            # buffer is used so the check does not depend on re-opening a
            # temporary file by name.
            try:
                import io
                with io.BytesIO() as png_buffer:
                    fig.savefig(png_buffer, format='png')
            except Exception as viz_save_error:
                st.warning(f"Could not save visualization: {str(viz_save_error)}. Using simpler visualization.")
                plt.close(fig)  # the detailed figure is being discarded
                # Create a simple alternative visualization, downsampled for performance
                step = max(1, len(y) // 1000)
                y_downsampled = y[::step]
                fig2, ax2 = plt.subplots(figsize=(8, 2))
                ax2.plot(np.arange(len(y_downsampled)), y_downsampled)
                ax2.set_title("Audio Waveform (simplified)")
                audio_viz = fig2
        except Exception as e:
            st.warning(f"Could not generate audio visualization: {str(e)}")
            audio_viz = None

        return {
            "is_english": is_english,
            "accent": accent,
            "accent_confidence": accent_confidence * 100,  # Scale to percentage
            "english_confidence": english_confidence,
            "language_detected": lang,
            "explanation": explanation,
            "audio_viz": audio_viz
        }
def process_uploaded_audio(file_input):
    """Convert an uploaded (or already saved) media file to WAV and analyze it.

    Args:
        file_input: Either a StreamlitUploadedFile object or a string path to a file

    Returns:
        The result dict produced by ``AccentDetector.analyze_audio``.

    Raises:
        Exception: Any failure is reported through Streamlit, intermediate
            files are removed, and the original exception is re-raised.
    """
    audio_path = None
    temp_input_path = None
    # Files written by this function (uploads) are removed once analysis is
    # done; a path handed in by the caller is only removed where the original
    # flow already did so (after a successful conversion, or on error).
    owns_input = not isinstance(file_input, str)

    try:
        # Create a unique filename based on timestamp
        timestamp = str(int(time.time()))

        # Create a deterministic uploads directory
        uploads_dir = os.path.join(os.getcwd(), "uploads")
        os.makedirs(uploads_dir, exist_ok=True)

        # Try Streamlit's own upload path first if available
        streamlit_uploads_path = os.environ.get('STREAMLIT_UPLOADS_PATH')
        if streamlit_uploads_path and os.path.isdir(streamlit_uploads_path):
            uploads_dir = streamlit_uploads_path
            st.info(f"Using Streamlit's upload directory: {uploads_dir}")

        # NOTE(review): 0o777 makes the uploads directory world-writable.
        # Kept because the deployment notes rely on a mounted volume; confirm
        # whether narrower permissions would do.
        try:
            os.chmod(uploads_dir, 0o777)
        except Exception as chmod_error:
            st.warning(f"Could not set permissions on uploads directory: {str(chmod_error)}. Continuing anyway.")

        # Log upload dir info for debugging
        st.info(f"Upload directory: {uploads_dir} (exists: {os.path.exists(uploads_dir)}, writable: {os.access(uploads_dir, os.W_OK)})")

        # Handle different input types
        if not owns_input:
            # If it's already a file path
            temp_input_path = file_input
            file_extension = os.path.splitext(temp_input_path)[1].lower()
            st.info(f"Processing from saved file: {os.path.basename(temp_input_path)}")
        else:
            # If it's a StreamlitUploadedFile
            file_extension = os.path.splitext(file_input.name)[1].lower()

            # Write the uploaded file to disk with proper extension in the uploads directory.
            # Anything outside [alnum . _ - space] is replaced to keep the name path-safe.
            safe_filename = ''.join(c if c.isalnum() or c in '._- ' else '_' for c in file_input.name)
            temp_input_path = os.path.join(uploads_dir, f"uploaded_{timestamp}_{safe_filename}")
            st.info(f"Saving uploaded file to: {temp_input_path}")

            try:
                # Write in chunks to handle large files better
                chunk_size = 1024 * 1024  # 1MB chunks
                buffer = file_input.getbuffer()
                with open(temp_input_path, "wb") as f:
                    for offset in range(0, len(buffer), chunk_size):
                        f.write(buffer[offset:offset + chunk_size])

                # Verify file was written properly
                if os.path.exists(temp_input_path):
                    file_size = os.path.getsize(temp_input_path)
                    st.success(f"File saved successfully: {file_size} bytes")
                else:
                    st.error("Failed to save file - file doesn't exist after writing")
            except Exception as write_error:
                st.error(f"Error writing uploaded file: {str(write_error)}")
                # Try alternative temp directory as fallback
                try:
                    import tempfile
                    temp_dir = tempfile.gettempdir()
                    temp_input_path = os.path.join(temp_dir, f"uploaded_{timestamp}_{safe_filename}")
                    st.warning(f"Trying alternative location: {temp_input_path}")
                    with open(temp_input_path, "wb") as f:
                        f.write(file_input.getbuffer())
                except Exception as alt_write_error:
                    st.error(f"Alternative write also failed: {str(alt_write_error)}")
                    raise

        # For MP4 files, extract the audio using ffmpeg
        if file_extension == ".mp4":
            st.info("Extracting audio from video file...")
            audio_path = os.path.join(uploads_dir, f"extracted_audio_{timestamp}.wav")
            try:
                # -y overwrites the output file if it exists
                subprocess.run(
                    ['ffmpeg', '-y', '-i', temp_input_path, '-vn', '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1', audio_path],
                    check=True,
                    capture_output=True
                )
                st.success(f"Audio extracted successfully to {audio_path}")
                # Remove the original video file if extraction was successful
                if os.path.exists(audio_path) and os.path.getsize(audio_path) > 0:
                    os.remove(temp_input_path)
            except subprocess.CalledProcessError as e:
                st.error(f"Error extracting audio: {e}")
                if e.stderr:
                    st.error(f"FFmpeg output: {e.stderr.decode('utf-8', errors='replace')}")
                raise
        elif file_extension in [".mp3", ".m4a", ".ogg", ".flac"]:
            # Convert to WAV for better compatibility
            audio_path = os.path.join(uploads_dir, f"converted_audio_{timestamp}.wav")
            st.info(f"Converting {file_extension} to WAV format for analysis...")
            try:
                # Use a verbose ffmpeg command with more options for compatibility
                subprocess.run(
                    [
                        'ffmpeg', '-y', '-i', temp_input_path,
                        '-ar', '16000', '-ac', '1', '-c:a', 'pcm_s16le',
                        # Add error handling flags
                        '-err_detect', 'ignore_err',
                        # Add buffers for better handling
                        '-analyzeduration', '10000000', '-probesize', '10000000',
                        audio_path
                    ],
                    check=True,
                    capture_output=True
                )

                # Verify the file was created successfully
                if os.path.exists(audio_path) and os.path.getsize(audio_path) > 0:
                    st.success(f"Audio converted successfully: {os.path.getsize(audio_path)} bytes")
                    # If conversion was successful, remove the original file to save space
                    os.remove(temp_input_path)
                else:
                    st.warning("Conversion produced an empty file. Trying fallback conversion method...")
                    # Try alternative conversion method - simpler command
                    fallback_cmd = ['ffmpeg', '-y', '-i', temp_input_path, audio_path]
                    subprocess.run(fallback_cmd, check=True, capture_output=True)
                    if not os.path.exists(audio_path) or os.path.getsize(audio_path) == 0:
                        st.warning("Fallback conversion also failed. Using original file.")
                        audio_path = temp_input_path
            except subprocess.CalledProcessError as e:
                st.warning(f"Conversion warning: {e}")
                if e.stderr:
                    st.warning(f"FFmpeg error: {e.stderr.decode('utf-8', errors='replace')}")
                st.info("Using original file instead.")
                audio_path = temp_input_path
        else:
            # For already WAV files, use them directly
            audio_path = temp_input_path
            st.info(f"Using WAV file directly: {audio_path}")

        detector = AccentDetector()
        results = detector.analyze_audio(audio_path)

        # Clean up the converted/extracted audio ...
        if audio_path and audio_path != temp_input_path and os.path.exists(audio_path):
            os.remove(audio_path)
        # ... and the saved upload itself, which would otherwise stay in the
        # uploads directory whenever it was analyzed directly.
        if owns_input and temp_input_path and os.path.exists(temp_input_path):
            os.remove(temp_input_path)
        return results

    except Exception as e:
        error_msg = str(e)
        st.error(f"Error processing audio: {error_msg}")

        # Add detailed debugging info
        import traceback
        st.error(f"Error details: {traceback.format_exc()}")

        # Show file info if available, then remove the leftovers
        if temp_input_path and os.path.exists(temp_input_path):
            st.info(f"Input file exists: {temp_input_path}, size: {os.path.getsize(temp_input_path)} bytes")
            os.remove(temp_input_path)
        elif temp_input_path:
            st.warning(f"Input file does not exist: {temp_input_path}")

        if audio_path and os.path.exists(audio_path):
            st.info(f"Audio file exists: {audio_path}, size: {os.path.getsize(audio_path)} bytes")
            os.remove(audio_path)
        elif audio_path:
            st.warning(f"Audio file does not exist: {audio_path}")

        # Check for common error types
        lowered_error = error_msg.lower()
        if "ffmpeg" in lowered_error:
            st.warning("FFmpeg error detected. The audio conversion failed.")
            st.info("Try a different audio format or check if FFmpeg is installed correctly.")
        elif "permission" in lowered_error:
            st.warning("Permission error detected.")
            st.info("Check that the uploads directory is writable.")
        elif "no such file" in lowered_error:
            st.warning("File not found error detected.")
            st.info("The file may have been moved, deleted, or not saved correctly.")
        raise
# --- Streamlit App ---
# Page chrome first, then the introductory copy, then the two input tabs.
PAGE_ICON = "🎤"
st.set_page_config(page_title="🎤 English Accent Detector", page_icon=PAGE_ICON, layout="wide")
st.title("🎤 English Accent Detection Tool")

intro_text = """
This application analyzes a speaker's English accent from video URLs or audio uploads,
providing detailed insights for hiring evaluation purposes.
"""
st.markdown(intro_text)

# Tips box shown above the feature summary
tips_text = """
💡 **Tips for best results:**
- Use **Loom** or **Vimeo** videos (more reliable than YouTube)
- For YouTube videos, you may need to provide cookies
- Audio clips of 15-30 seconds work best
- Clear speech with minimal background noise is ideal
"""
with st.container():
    st.info(tips_text)

summary_text = """
This app analyzes a speaker's English accent from a video or audio source.
It provides:
- Classification of the accent (British, American, etc.)
- Confidence score for English proficiency
- Explanation of accent characteristics
"""
st.markdown(summary_text)

# One tab per input method
tab_labels = ["Video URL", "Upload Audio"]
tab1, tab2 = st.tabs(tab_labels)
# "Video URL" tab: download a video, extract its audio track and analyze it.
with tab1:
    st.markdown("### 🎬 Analyze video from URL")
    url = st.text_input("Enter a public video URL",
                        placeholder="https://www.loom.com/..., https://vimeo.com/..., or direct MP4 link")

    # Add alternative invidious frontend option for YouTube
    # NOTE(review): the checkbox value is not consumed by the code visible
    # here; download_video always tries the alternative frontends.
    use_alternative = st.checkbox("Try alternative YouTube source (for authentication issues)",
                                  value=True,
                                  help="Uses an alternative frontend (Invidious) that may bypass YouTube restrictions")

    # Recommend alternative sources
    st.caption("⚠️ **Note**: YouTube videos often require authentication. For best results, use Loom, Vimeo or direct video links.")

    # Add file uploader for cookies.txt
    cookies_file = None
    uploaded_cookies = st.file_uploader("Upload cookies.txt file for YouTube (if needed)",
                                        type="txt",
                                        help="Only needed for YouTube videos that require authentication")
    if uploaded_cookies is not None:
        # The cookies are only written to disk when an analysis actually
        # starts (below), so script reruns do not pile up credential files.
        st.success("Cookies file uploaded successfully!")

    with st.expander("Having trouble with YouTube videos?"):
        st.markdown("""
### YouTube Authentication Issues
YouTube's anti-bot measures often block automated video downloads. To solve this:
#### Option 1: Use Alternative Video Sources (Recommended)
These typically work without authentication issues:
- [Loom](https://www.loom.com/) - Great for screen recordings
- [Vimeo](https://vimeo.com/) - High-quality video hosting
- [Streamable](https://streamable.com/) - Simple video sharing
- Any direct MP4 link
#### Option 2: Upload Cookies for YouTube
1. Install a browser extension like [Get cookies.txt](https://chrome.google.com/webstore/detail/get-cookiestxt-locally/cclelndahbckbenkjhflpdbgdldlbecc)
2. Login to YouTube in your browser
3. Use the extension to export cookies to a .txt file
4. Upload the cookies.txt file using the uploader above
#### Option 3: Use Audio Upload Instead
The 'Upload Audio' tab allows direct analysis of audio files without URL issues.
""")

    if st.button("Analyze Video"):
        if not url:
            st.warning("Please enter a valid URL")
        else:
            import shutil

            # Everything this run writes (video, audio, cookies) lives in one
            # private working directory that is removed in the finally block,
            # whether the analysis succeeds, fails or raises.
            work_dir = tempfile.mkdtemp(prefix="accent_detector_")
            video_path = os.path.join(work_dir, "video.mp4")
            audio_path = os.path.join(work_dir, "audio.wav")
            try:
                if uploaded_cookies is not None:
                    cookies_file = os.path.join(work_dir, "cookies.txt")
                    with open(cookies_file, "wb") as f:
                        f.write(uploaded_cookies.getbuffer())

                # Create a placeholder for status updates
                status = st.empty()

                # Download and process the video
                status.text("Downloading video...")
                download_success = download_video(url, video_path, cookies_file)
                if not download_success:
                    st.error("Failed to download video")
                else:
                    status.text("Extracting audio...")
                    extract_success = extract_audio(video_path, audio_path)
                    if not extract_success:
                        st.error("Failed to extract audio")
                    else:
                        status.text("Analyzing accent... (this may take a moment)")
                        detector = AccentDetector()
                        results = detector.analyze_audio(audio_path)

                        # Display results
                        st.success("✅ Analysis Complete!")

                        # Create columns for results
                        col1, col2 = st.columns([2, 1])
                        with col1:
                            st.subheader("Accent Analysis Results")
                            st.markdown(f"**Detected Accent:** {results['accent']}")
                            st.markdown(f"**English Proficiency:** {results['english_confidence']:.1f}%")
                            st.markdown(f"**Accent Confidence:** {results['accent_confidence']:.1f}%")
                            # Show explanation in a box
                            st.markdown("### Expert Analysis")
                            st.info(results['explanation'])
                        with col2:
                            if results['audio_viz'] is not None:
                                try:
                                    st.pyplot(results['audio_viz'])
                                except Exception as viz_error:
                                    st.warning(f"Could not display visualization: {str(viz_error)}")
                                    st.info("Audio analysis was successful even though visualization failed.")
                            # Show audio playback (must happen before the
                            # working directory is removed)
                            st.audio(audio_path)
            except Exception as e:
                st.error(f"Error during analysis: {str(e)}")
            finally:
                # Clean up all temporary files of this run
                shutil.rmtree(work_dir, ignore_errors=True)
# "Upload Audio" tab: preview the uploaded file, stage it on disk, run
# process_uploaded_audio() on the staged copy and render the results.
with tab2:
    st.markdown("### 🎵 Upload Audio File")
    st.caption("**Recommended option!** Direct audio upload is more reliable than video URLs.")

    # Add some information about file size limits
    st.info("📝 **File Requirements**: \n"
            "• Maximum file size: 200MB \n"
            "• Supported formats: WAV, MP3, M4A, OGG, FLAC, MP4 \n"
            "• Recommended length: 15-60 seconds of clear speech")

    uploaded_file = st.file_uploader("Upload an audio file",
                                     type=["wav", "mp3", "m4a", "ogg", "flac", "mp4"],
                                     help="Support for WAV, MP3, M4A, OGG, FLAC and MP4 formats",
                                     accept_multiple_files=False)

    if uploaded_file is not None:
        # Show a preview of the audio
        st.markdown("#### Audio Preview:")
        try:
            st.audio(uploaded_file)
            st.markdown("#### Ready for Analysis")
            col1, col2 = st.columns([1, 3])
            with col1:
                analyze_button = st.button("Analyze Audio", type="primary", use_container_width=True)
            with col2:
                st.caption("Tip: 15-30 seconds of clear speech works best for accent detection")
        except Exception as preview_error:
            st.warning(f"Could not preview audio: {str(preview_error)}")
            # If preview fails, still allow analysis
            analyze_button = st.button("Analyze Audio (Preview Failed)", type="primary")
            st.caption("Proceeding with analysis might still work even if preview failed")

        if analyze_button:
            # Set once the upload has been staged on disk; read by the finally clause
            temp_file_path = None
            with st.spinner("Analyzing audio... (this may take 15-30 seconds)"):
                try:
                    # Check file size before processing. getbuffer() exposes the
                    # data without copying it, unlike getvalue().
                    file_size_mb = uploaded_file.getbuffer().nbytes / (1024 * 1024)
                    if file_size_mb > 190:  # Stay below the 200MB limit with some buffer
                        st.error(f"File size ({file_size_mb:.1f}MB) is too large. Maximum allowed is 190MB.")
                        st.info("Tip: Try trimming your audio to just the speech segment for better results.")
                    else:
                        # Create a progress bar to show processing stages
                        progress_bar = st.progress(0)

                        # The file name comes from the client; keep only its final
                        # component so it cannot point outside the uploads directory.
                        safe_name = os.path.basename(uploaded_file.name)

                        # Check the file type and inform user about processing steps
                        file_extension = os.path.splitext(safe_name)[1].lower()
                        if file_extension == '.mp4':
                            st.info("Processing video file - extracting audio track...")
                        elif file_extension in ['.mp3', '.m4a', '.ogg', '.flac']:
                            st.info(f"Processing {file_extension} audio file...")

                        progress_bar.progress(25, text="Saving file...")

                        # First save the file to a known location to bypass 403 errors.
                        # Create an uploads directory if it doesn't exist.
                        uploads_dir = os.path.join(os.getcwd(), "uploads")
                        os.makedirs(uploads_dir, exist_ok=True)

                        # Save the file first to avoid streaming it multiple times
                        temp_file_path = os.path.join(uploads_dir, f"temp_{int(time.time())}_{safe_name}")
                        with open(temp_file_path, "wb") as f:
                            f.write(uploaded_file.getbuffer())

                        progress_bar.progress(50, text="Analyzing audio...")

                        # Process using the saved file path directly
                        results = process_uploaded_audio(temp_file_path)

                        progress_bar.progress(100, text="Analysis complete!")

                        # Display results
                        st.success("✅ Analysis Complete!")

                        # Wide column for the textual summary, narrow one for the plot
                        col1, col2 = st.columns([2, 1])
                        with col1:
                            st.subheader("Accent Analysis Results")
                            st.markdown(f"**Detected Accent:** {results['accent']}")
                            st.markdown(f"**English Proficiency:** {results['english_confidence']:.1f}%")
                            st.markdown(f"**Accent Confidence:** {results['accent_confidence']:.1f}%")
                            # Show explanation in a box
                            st.markdown("### Expert Analysis")
                            st.info(results['explanation'])
                        with col2:
                            if results['audio_viz']:
                                try:
                                    st.pyplot(results['audio_viz'])
                                except Exception:
                                    # A plotting failure must not hide the analysis itself
                                    st.warning("Could not display visualization due to torchvision issue.")
                                    st.info("Audio analysis was successful even though visualization failed.")
                except subprocess.CalledProcessError as e:
                    # stderr is bytes or str depending on how the subprocess was run
                    stderr_text = e.stderr
                    if isinstance(stderr_text, bytes):
                        stderr_text = stderr_text.decode('utf-8', errors='replace')
                    st.error("Error processing audio file")
                    st.error(f"FFmpeg error: {stderr_text if stderr_text else str(e)}")
                    st.info("Troubleshooting tips:\n"
                            "• Try a different audio file format (WAV or MP3 recommended)\n"
                            "• Make sure the file is not corrupted\n"
                            "• Try a shorter audio clip")
                except PermissionError as e:
                    st.error(f"Permission error: {str(e)}")
                    st.info("The app doesn't have permission to access or create temporary files. "
                            "This could be due to Docker container permissions. "
                            "Contact the administrator or try using a different file.")
                except OSError as e:
                    st.error(f"System error: {str(e)}")
                    st.info("Check that the file isn't corrupted and try with a smaller audio clip.")
                except Exception as e:
                    error_msg = str(e)
                    st.error(f"Error during analysis: {error_msg}")
                    if "403" in error_msg:
                        st.warning("Received a 403 Forbidden error. This may be due to: \n"
                                   "• File size exceeding limits\n"
                                   "• Temporary file permission issues\n"
                                   "• Network restrictions")
                        st.info("Try a smaller audio file (less than 50MB) or a different format.")
                    elif "timeout" in error_msg.lower():
                        st.warning("The request timed out. Try a shorter audio clip or check your internet connection.")
                    elif "memory" in error_msg.lower():
                        st.warning("Out of memory error. Try a shorter audio clip.")
                    else:
                        st.info("If the problem persists, try a different audio file format such as MP3 or WAV.")
                finally:
                    # Best-effort removal of the staged copy so the uploads directory
                    # does not grow without bound.
                    # NOTE(review): process_uploaded_audio may already delete the
                    # file; the existence check makes this a no-op in that case.
                    if temp_file_path and os.path.exists(temp_file_path):
                        try:
                            os.remove(temp_file_path)
                        except OSError as cleanup_error:
                            st.warning(f"Couldn't clean up temporary files: {str(cleanup_error)}")
# Page footer: a horizontal rule followed by a one-line credit
footer_credit = "Deployed using Streamlit • Built with SpeechBrain and Transformers"
st.markdown("---")
st.markdown(footer_credit)

# Collapsible overview of the analysis pipeline
how_it_works_text = """
This app uses a multi-stage process to analyze a speaker's accent:
1. **Audio Extraction**: The audio track is extracted from the input video or directly processed from uploaded audio.
2. **Language Identification**: First, we determine if the speech is English using SpeechBrain's language identification model.
3. **Accent Classification**: For English speech, we analyze the specific accent using a transformer-based model trained on diverse accent data.
4. **English Proficiency Score**: A confidence score is calculated based on both language identification and accent clarity.
5. **Analysis Summary**: An explanation is generated describing accent characteristics relevant for hiring evaluations.
"""
with st.expander("ℹ️ How It Works"):
    st.markdown(how_it_works_text)
# Add debug function for troubleshooting HTTP errors
def debug_http_errors():
    """Render diagnostics for troubleshooting HTTP 400 upload errors.

    Writes to the Streamlit page, in order: a list of common causes, selected
    environment details, whether the uploads directory can be created and
    written to, and whether the ``ffmpeg`` executable runs successfully.
    A failing probe is reported with ``st.error`` rather than propagated.

    Returns:
        None.
    """
    st.warning("⚠️ HTTP 400 Error Debugging Mode")
    st.markdown("""
    ### Common HTTP 400 Error Causes:
    1. **File size exceeds limits** (current limit: 150MB)
    2. **File format incompatibility**
    3. **Network interruption** during upload
    4. **Server-side timeout** during processing
    5. **Permissions issues** in container
    """)

    # Show environment info
    st.subheader("Environment Information")
    env_info = {
        "STREAMLIT_UPLOADS_PATH": os.environ.get("STREAMLIT_UPLOADS_PATH", "Not set"),
        "STREAMLIT_SERVER_MAX_UPLOAD_SIZE": os.environ.get("STREAMLIT_SERVER_MAX_UPLOAD_SIZE", "Not set"),
        "Current directory": os.getcwd(),
        "Python version": sys.version,
    }
    for key, value in env_info.items():
        st.code(f"{key}: {value}")

    # Check if uploads directory is writable. Creating the directory is part of
    # the probe, so a failure there is reported instead of aborting the function.
    uploads_dir = os.environ.get("STREAMLIT_UPLOADS_PATH", os.path.join(os.getcwd(), "uploads"))
    try:
        os.makedirs(uploads_dir, exist_ok=True)
        # A uniquely named temporary file cannot overwrite an existing file and
        # is deleted automatically when the with-block exits.
        with tempfile.NamedTemporaryFile(mode="w", dir=uploads_dir) as probe:
            probe.write("Test write permission")
            probe.flush()
        st.success(f"✓ Upload directory is writable: {uploads_dir}")
    except Exception as e:
        st.error(f"✗ Cannot write to upload directory: {str(e)}")

    # Test ffmpeg: a missing executable raises, a broken one exits non-zero
    try:
        result = subprocess.run(["ffmpeg", "-version"], capture_output=True, text=True)
        if result.returncode == 0:
            st.success("✓ FFmpeg is available")
        else:
            st.error(f"✗ FFmpeg error: exited with code {result.returncode}")
    except Exception as e:
        st.error(f"✗ FFmpeg error: {str(e)}")
# Troubleshooting panel: optional HTTP 400 diagnostics plus a toggle for an
# alternative upload method. debug_mode defaults to off until the checkbox
# inside the panel reports its state.
debug_mode = False
troubleshooting_panel = st.expander("🔧 Troubleshooting Tools")
with troubleshooting_panel:
    debug_mode = st.checkbox("Enable Debug Mode for HTTP 400 Errors")
    if debug_mode:
        debug_http_errors()

    # Let the user opt in to a different upload method
    alt_upload = st.checkbox("Use alternative upload method (for HTTP 400 errors)")
    if alt_upload:
        st.info("Using alternative upload method that may bypass some HTTP 400 errors")