import logging
from typing import Dict, Any, List, Optional

from textblob import TextBlob
from transformers import pipeline
import numpy as np

logger = logging.getLogger(__name__)


class SentimentAnalyzer:
    def __init__(self, use_ai: bool = True, model_registry: Optional[Any] = None):
        """
        Initialize the sentiment analyzer with both traditional and LLM-based approaches.

        Args:
            use_ai: Whether to use AI-powered analysis (True) or traditional TextBlob analysis (False)
            model_registry: Optional shared model registry for better performance
        """
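        # Usage sketch (illustrative, not part of the original source):
        #   SentimentAnalyzer(use_ai=False)                  -> TextBlob + pattern matching only
        #   SentimentAnalyzer(use_ai=True)                   -> loads dedicated transformer pipelines
        #   SentimentAnalyzer(use_ai=True, model_registry=r) -> reuses pipelines from a shared registry r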
        self.use_ai = use_ai
        self.llm_available = False
        self.model_registry = model_registry
        self.toxicity_available = False

        # Traditional manipulation patterns
        self.manipulative_patterns = [
            "experts say",
            "sources claim",
            "many believe",
            "some say",
            "everyone knows",
            "clearly",
            "obviously",
            "without doubt",
            "certainly"
        ]
        if use_ai:
            try:
                if model_registry and model_registry.is_available:
                    # Use shared models
                    self.sentiment_pipeline = model_registry.sentiment
                    self.zero_shot = model_registry.zero_shot
                    self.toxicity_pipeline = getattr(model_registry, 'toxicity', None)
                    self.toxicity_available = self.toxicity_pipeline is not None
                    self.llm_available = True
                    logger.info("Using shared model pipelines for sentiment analysis")
                    if self.toxicity_available:
                        logger.info("Toxicity analysis enabled")
                    else:
                        logger.info("Toxicity analysis not available")
                else:
                    # Initialize own pipelines
                    self.sentiment_pipeline = pipeline(
                        "text-classification",
                        model="SamLowe/roberta-base-go_emotions",
                        device=-1,
                        batch_size=16
                    )
                    self.zero_shot = pipeline(
                        "zero-shot-classification",
                        model="facebook/bart-large-mnli",
                        device=-1,
                        batch_size=8
                    )
                    try:
                        self.toxicity_pipeline = pipeline(
                            "text-classification",
                            model="unitary/toxic-bert",
                            device=-1,
                            batch_size=16
                        )
                        self.toxicity_available = True
                        logger.info("Toxicity analysis enabled")
                    except Exception as tox_error:
                        logger.warning(f"Toxicity pipeline initialization failed: {str(tox_error)}")
                        self.toxicity_available = False
                    self.llm_available = True
                    logger.info("Initialized dedicated model pipelines for sentiment analysis")
            except Exception as e:
                logger.warning(f"Failed to initialize LLM pipelines: {str(e)}")
                self.llm_available = False
        else:
            logger.info("Initializing sentiment analyzer in traditional mode")
    def _analyze_with_llm(self, text: str) -> Optional[Dict[str, Any]]:
        """Perform sentiment analysis using the LLM pipelines; returns None if analysis fails."""
        try:
            logger.info("\n" + "=" * 50)
            logger.info("SENTIMENT ANALYSIS STARTED")
            logger.info("=" * 50)

            # Clean the text of formatting markers
            logger.info("Cleaning and preparing text...")
            cleaned_text = text.replace('$!/$', '').replace('##', '').replace('#', '')
            cleaned_text = '\n'.join(line for line in cleaned_text.split('\n')
                                     if not line.startswith('[') and not line.startswith('More on'))
            logger.info(f"Text prepared - Length: {len(cleaned_text)} characters")

            # Split text into ~2000-character chunks (approximately 512 tokens each)
            chunks = [cleaned_text[i:i + 2000] for i in range(0, len(cleaned_text), 2000)]
            logger.info(f"Split text into {len(chunks)} chunks for processing")
            # Initialize aggregation variables
            sentiment_scores = []
            toxicity_scores = []
            manipulation_scores = []
            flagged_phrases = []

            manipulation_categories = [
                "emotional manipulation",
                "fear mongering",
                "propaganda",
                "factual reporting",
                "balanced perspective"
            ]

            # Process each chunk
            for i, chunk in enumerate(chunks, 1):
                logger.info(f"\n{'-' * 30}")
                logger.info(f"Processing chunk {i}/{len(chunks)}")
                logger.info(f"Chunk length: {len(chunk)} characters")
                try:
                    # Get emotion scores
                    logger.info("Analyzing emotions...")
                    emotions = self.sentiment_pipeline(chunk)
                    logger.debug(f"Raw emotion response: {emotions}")

                    # Handle different response formats
                    if isinstance(emotions, list):
                        for emotion in emotions:
                            if isinstance(emotion, dict) and 'label' in emotion and 'score' in emotion:
                                sentiment_scores.append(emotion)
                                logger.info(f"Detected emotion: {emotion['label']} (score: {emotion['score']:.3f})")
                    elif isinstance(emotions, dict) and 'label' in emotions and 'score' in emotions:
                        sentiment_scores.append(emotions)
                        logger.info(f"Detected emotion: {emotions['label']} (score: {emotions['score']:.3f})")

                    # Get toxicity scores if available
                    if self.toxicity_available:
                        logger.info("Analyzing toxicity...")
                        try:
                            toxicity = self.toxicity_pipeline(chunk)
                            if isinstance(toxicity, list):
                                toxicity_scores.extend(toxicity)
                            else:
                                toxicity_scores.append(toxicity)
                            logger.info(f"Toxicity analysis complete for chunk {i}")
                            logger.debug(f"Toxicity scores: {toxicity_scores[-1]}")
                        except Exception as tox_error:
                            logger.warning(f"Toxicity analysis failed for chunk {i}: {str(tox_error)}")

                    # Get manipulation scores
                    logger.info("Analyzing manipulation patterns...")
                    manipulation = self.zero_shot(
                        chunk,
                        manipulation_categories,
                        multi_label=True
                    )
                    if isinstance(manipulation, dict) and 'labels' in manipulation and 'scores' in manipulation:
                        chunk_scores = {
                            label: score
                            for label, score in zip(manipulation['labels'], manipulation['scores'])
                        }
                        manipulation_scores.append(chunk_scores)
                        logger.info("Manipulation scores for chunk:")
                        for label, score in chunk_scores.items():
                            logger.info(f"  - {label}: {score:.3f}")

                    # Analyze individual sentences for manipulation
                    logger.info("Analyzing individual sentences for manipulation...")
                    sentences = chunk.split('.')
                    for sentence in sentences:
                        if len(sentence.strip()) > 10:
                            sent_result = self.zero_shot(
                                sentence.strip(),
                                manipulation_categories,
                                multi_label=False
                            )
                            if (sent_result['labels'][0] in ["emotional manipulation", "fear mongering", "propaganda"]
                                    and sent_result['scores'][0] > 0.7):
                                logger.info(f"Found manipulative content (score: {sent_result['scores'][0]:.3f}): {sentence.strip()}")
                                flagged_phrases.append({
                                    'text': sentence.strip(),
                                    'type': sent_result['labels'][0],
                                    'score': sent_result['scores'][0]
                                })
                except Exception as chunk_error:
                    logger.error(f"Error processing chunk {i}: {str(chunk_error)}")
                    continue
            logger.info("\nAggregating final scores...")

            # Aggregate scores with error handling
            def aggregate_scores(scores_list, score_type: str):
                try:
                    if not scores_list:
                        logger.warning(f"No {score_type} scores to aggregate")
                        return {}
                    all_scores = {}
                    for scores in scores_list:
                        if isinstance(scores, dict):
                            if 'label' in scores and 'score' in scores:
                                label = scores['label']
                                score = scores['score']
                            else:
                                # Handle direct label-score mapping
                                for label, score in scores.items():
                                    if label not in all_scores:
                                        all_scores[label] = []
                                    if isinstance(score, (int, float)):
                                        all_scores[label].append(score)
                                continue
                        else:
                            logger.warning(f"Unexpected score format in {score_type}: {scores}")
                            continue
                        if isinstance(label, (str, bytes)):
                            if label not in all_scores:
                                all_scores[label] = []
                            if isinstance(score, (int, float)):
                                all_scores[label].append(score)
                    return {k: float(np.mean(v)) for k, v in all_scores.items() if v}
                except Exception as agg_error:
                    logger.error(f"Error aggregating {score_type} scores: {str(agg_error)}")
                    return {}
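            # For example (illustrative values): [{"label": "anger", "score": 0.8}, {"label": "anger", "score": 0.6}]
            # aggregates to {"anger": 0.7}; per-label scores are averaged across all processed chunks.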
            emotion_scores = aggregate_scores(sentiment_scores, "emotion")
            toxicity_scores = aggregate_scores(toxicity_scores, "toxicity") if self.toxicity_available else {}

            logger.info("\nFinal emotion scores:")
            for emotion, score in emotion_scores.items():
                logger.info(f"  - {emotion}: {score:.3f}")
            if toxicity_scores:
                logger.info("\nFinal toxicity scores:")
                for category, score in toxicity_scores.items():
                    logger.info(f"  - {category}: {score:.3f}")

            # Aggregate manipulation scores
            manipulation_agg = {
                category: float(np.mean([
                    scores.get(category, 0)
                    for scores in manipulation_scores
                ]))
                for category in manipulation_categories
                if manipulation_scores
            }
            logger.info("\nFinal manipulation scores:")
            for category, score in manipulation_agg.items():
                logger.info(f"  - {category}: {score:.3f}")
            # Calculate manipulation score based on multiple weighted factors
            manipulation_indicators = {
                'emotional manipulation': 0.4,
                'fear mongering': 0.3,
                'propaganda': 0.3
            }
            if self.toxicity_available:
                manipulation_indicators.update({
                    'toxic': 0.2,
                    'severe_toxic': 0.3,
                    'threat': 0.2
                })

            # Combine toxicity and manipulation scores
            combined_scores = {**toxicity_scores, **manipulation_agg}

            # Calculate manipulation score, capped at 100, with a pattern-based fallback
            if combined_scores:
                manipulation_score = min(100, sum(
                    combined_scores.get(k, 0) * weight
                    for k, weight in manipulation_indicators.items()
                ) * 100)
            else:
                # Fall back to traditional pattern matching if no model scores are available
                manipulation_score = min(100, len(self._detect_manipulative_phrases(text)) * 10)
            logger.info(f"\nFinal manipulation score: {manipulation_score:.1f}")
            # Determine overall sentiment
            positive_emotions = ['admiration', 'joy', 'amusement', 'approval']
            negative_emotions = ['disgust', 'anger', 'disappointment', 'fear']
            neutral_emotions = ['neutral', 'confusion', 'realization']

            pos_score = sum(emotion_scores.get(emotion, 0) for emotion in positive_emotions)
            neg_score = sum(emotion_scores.get(emotion, 0) for emotion in negative_emotions)
            neu_score = sum(emotion_scores.get(emotion, 0) for emotion in neutral_emotions)

            logger.info("\nSentiment component scores:")
            logger.info(f"  - Positive: {pos_score:.3f}")
            logger.info(f"  - Negative: {neg_score:.3f}")
            logger.info(f"  - Neutral: {neu_score:.3f}")

            # Determine sentiment based on the highest component score
            max_score = max(pos_score, neg_score, neu_score)
            if max_score == pos_score and pos_score > 0.3:
                sentiment = "Positive"
            elif max_score == neg_score and neg_score > 0.3:
                sentiment = "Negative"
            else:
                sentiment = "Neutral"
            logger.info(f"\nFinal sentiment determination: {sentiment}")

            # Sort flagged phrases by manipulation score and keep the top unique ones
            sorted_phrases = sorted(flagged_phrases, key=lambda x: x['score'], reverse=True)
            unique_phrases = []
            seen = set()
            for phrase in sorted_phrases:
                clean_text = phrase['text'].strip()
                if clean_text not in seen:
                    unique_phrases.append(clean_text)
                    seen.add(clean_text)
                if len(unique_phrases) >= 5:
                    break
            logger.info(f"\nFlagged {len(unique_phrases)} unique manipulative phrases")

            logger.info("\nSentiment analysis completed successfully")
            return {
                "sentiment": sentiment,
                "manipulation_score": round(manipulation_score, 1),
                "flagged_phrases": unique_phrases,
                "detailed_scores": {
                    "emotions": emotion_scores,
                    "manipulation": manipulation_agg,
                    "toxicity": toxicity_scores
                }
            }
        except Exception as e:
            logger.error(f"LLM analysis failed: {str(e)}", exc_info=True)
            return None
    def analyze(self, text: str) -> Dict[str, Any]:
        """
        Analyze sentiment using the LLM pipelines, with a fallback to traditional methods.

        Args:
            text: The text to analyze

        Returns:
            Dict containing the sentiment analysis results
        """
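        # Example return value (illustrative only, not taken from a real run):
        #   {"sentiment": "Negative", "manipulation_score": 30, "flagged_phrases": ["... experts say ..."]}
        # The LLM path additionally includes a "detailed_scores" mapping of per-label averages.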
        try:
            # Try LLM analysis if enabled and available
            if self.use_ai and self.llm_available:
                llm_result = self._analyze_with_llm(text)
                if llm_result:
                    return llm_result

            # Use traditional analysis
            logger.info("Using traditional sentiment analysis")
            blob = TextBlob(text)
            sentiment_score = blob.sentiment.polarity
            manipulative_phrases = self._detect_manipulative_phrases(text)
            manipulation_score = len(manipulative_phrases) * 10

            if sentiment_score > 0.2:
                sentiment = "Positive"
            elif sentiment_score < -0.2:
                sentiment = "Negative"
            else:
                sentiment = "Neutral"

            return {
                "sentiment": sentiment,
                "manipulation_score": min(manipulation_score, 100),
                "flagged_phrases": manipulative_phrases[:5]  # Limit to the top 5 phrases
            }
        except Exception as e:
            logger.error(f"Error in sentiment analysis: {str(e)}")
            return {
                "sentiment": "Error",
                "manipulation_score": 0,
                "flagged_phrases": []
            }
    def _detect_manipulative_phrases(self, text: str) -> List[str]:
        """Detect potentially manipulative phrases and return them with surrounding context."""
        found_phrases = []
        text_lower = text.lower()
        for pattern in self.manipulative_patterns:
            if pattern in text_lower:
                start = text_lower.find(pattern)
                context = text[max(0, start - 20):min(len(text), start + len(pattern) + 20)]
                found_phrases.append(context.strip())
        return found_phrases
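

if __name__ == "__main__":
    # Illustrative usage sketch, not part of the original module. It assumes textblob
    # is installed; with use_ai=False only the TextBlob/pattern path runs, so no
    # transformer models are downloaded.
    logging.basicConfig(level=logging.INFO)

    sample_text = (
        "Experts say the new policy will clearly fail. "
        "Everyone knows the economy is doomed without it."
    )

    analyzer = SentimentAnalyzer(use_ai=False)
    result = analyzer.analyze(sample_text)
    print(f"Sentiment: {result['sentiment']}")
    print(f"Manipulation score: {result['manipulation_score']}")
    for phrase in result["flagged_phrases"]:
        print(f"  flagged: {phrase}")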