Spaces:
Sleeping
Sleeping
| """ | |
| Analysis Synthesizer - Result Aggregation and Synthesis | |
| Combines outputs from multiple specialized models | |
| """ | |
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
# Module-level logger, named after the module per the stdlib convention.
logger = logging.getLogger(__name__)
class AnalysisSynthesizer:
    """
    Synthesizes results from multiple specialized models into
    a comprehensive medical document analysis.

    Implements:
    - Result aggregation
    - Conflict resolution
    - Confidence calibration
    - Clinical insights generation
    """

    def __init__(self):
        # Dispatch table of fusion strategies. NOTE(review): the strategy
        # implementations below are placeholders (`pass`, so they return
        # None) and this table is not yet consulted anywhere in this class.
        self.fusion_strategies = {
            "early": self._early_fusion,
            "late": self._late_fusion,
            "weighted": self._weighted_fusion,
        }
        logger.info("Analysis Synthesizer initialized")

    async def synthesize(
        self,
        classification: Dict[str, Any],
        specialized_results: List[Dict[str, Any]],
        pdf_content: Dict[str, Any],
    ) -> Dict[str, Any]:
        """
        Synthesize results from multiple models.

        Args:
            classification: Classifier output; expects "document_type" and
                "confidence" keys.
            specialized_results: One dict per model run; only entries whose
                "status" is "completed" contribute to the synthesis.
            pdf_content: Extracted PDF content; reads "page_count", "images"
                and "tables" when present.

        Returns:
            Comprehensive analysis with:
            - Aggregated findings
            - Key insights
            - Recommendations
            - Risk assessment
            - Confidence scores
            Falls back to a minimal analysis when no model succeeded or an
            unexpected error occurs during synthesis.
        """
        try:
            logger.info("Synthesizing %d model results", len(specialized_results))

            # Only models that finished successfully contribute.
            successful_results = [
                r for r in specialized_results
                if r.get("status") == "completed"
            ]
            if not successful_results:
                return self._generate_fallback_analysis(classification, pdf_content)

            aggregated_findings = self._aggregate_by_domain(successful_results)
            insights = self._generate_insights(
                aggregated_findings,
                classification,
                pdf_content,
            )
            overall_confidence = self._calculate_overall_confidence(successful_results)
            summary = self._generate_summary(
                classification,
                aggregated_findings,
                insights,
            )
            recommendations = self._generate_recommendations(
                aggregated_findings,
                classification,
            )

            # Compile final analysis.
            analysis = {
                "document_type": classification["document_type"],
                "classification_confidence": classification["confidence"],
                "overall_confidence": overall_confidence,
                "summary": summary,
                "aggregated_findings": aggregated_findings,
                "clinical_insights": insights,
                "recommendations": recommendations,
                "models_used": [
                    {
                        # .get guards against a malformed result dict so one
                        # bad entry cannot abort the whole synthesis.
                        "model": r.get("model_name", "unknown"),
                        "domain": r.get("domain", "general"),
                        "confidence": r.get("result", {}).get("confidence", 0.0),
                    }
                    for r in successful_results
                ],
                "quality_metrics": {
                    "models_executed": len(successful_results),
                    "models_failed": len(specialized_results) - len(successful_results),
                    "overall_confidence": overall_confidence,
                },
                "metadata": {
                    # Timezone-aware UTC; datetime.utcnow() is deprecated
                    # (Python 3.12) and produced a naive timestamp.
                    "synthesis_timestamp": datetime.now(timezone.utc).isoformat(),
                    "page_count": pdf_content.get("page_count", 0),
                    "has_images": len(pdf_content.get("images", [])) > 0,
                    "has_tables": len(pdf_content.get("tables", [])) > 0,
                },
            }
            logger.info("Synthesis completed successfully")
            return analysis
        except Exception as e:
            # logger.exception records the traceback, not just the message.
            logger.exception("Synthesis failed: %s", e)
            return self._generate_fallback_analysis(classification, pdf_content)

    def _aggregate_by_domain(
        self,
        results: List[Dict[str, Any]],
    ) -> Dict[str, Any]:
        """Aggregate results by medical domain.

        Returns a mapping of domain -> {"models", "findings",
        "confidence_scores", "average_confidence"}.
        """
        aggregated: Dict[str, Any] = {}
        for result in results:
            domain = result.get("domain", "general")
            bucket = aggregated.setdefault(domain, {
                "models": [],
                "findings": [],
                "confidence_scores": [],
            })
            # .get: a missing model name must not abort aggregation.
            bucket["models"].append(result.get("model_name", "unknown"))

            # Models report findings under several keys; collect all of them.
            result_data = result.get("result", {})
            if "findings" in result_data:
                bucket["findings"].append(result_data["findings"])
            if "key_findings" in result_data:
                bucket["findings"].extend(result_data["key_findings"])
            if "analysis" in result_data:
                bucket["findings"].append(result_data["analysis"])

            bucket["confidence_scores"].append(result_data.get("confidence", 0.0))

        # Calculate average confidence per domain.
        for bucket in aggregated.values():
            scores = bucket["confidence_scores"]
            bucket["average_confidence"] = sum(scores) / len(scores) if scores else 0.0
        return aggregated

    def _generate_insights(
        self,
        aggregated_findings: Dict[str, Any],
        classification: Dict[str, Any],
        pdf_content: Dict[str, Any],
    ) -> List[Dict[str, str]]:
        """Generate clinical insights from aggregated findings."""
        insights: List[Dict[str, str]] = []

        # Document structure insight.
        page_count = pdf_content.get("page_count", 0)
        if page_count > 0:
            insights.append({
                "category": "Document Structure",
                "insight": f"Document contains {page_count} pages with {'comprehensive' if page_count > 5 else 'standard'} documentation",
                "importance": "medium",
            })

        # Classification insight.
        doc_type = classification["document_type"]
        confidence = classification["confidence"]
        insights.append({
            "category": "Document Classification",
            "insight": f"Document identified as {doc_type.replace('_', ' ').title()} with {confidence*100:.0f}% confidence",
            "importance": "high",
        })

        # Domain-specific insights.
        for domain, data in aggregated_findings.items():
            avg_confidence = data.get("average_confidence", 0.0)
            model_count = len(data.get("models", []))
            insights.append({
                "category": domain.replace("_", " ").title(),
                "insight": f"Analysis completed by {model_count} specialized model(s) with {avg_confidence*100:.0f}% average confidence",
                "importance": "high" if avg_confidence > 0.8 else "medium",
            })

        # Data richness insights (these are lists, truthy when non-empty).
        images = pdf_content.get("images", [])
        tables = pdf_content.get("tables", [])
        if images:
            insights.append({
                "category": "Multimodal Content",
                "insight": f"Document contains {len(images)} image(s) for enhanced analysis",
                "importance": "medium",
            })
        if tables:
            insights.append({
                "category": "Structured Data",
                "insight": f"Document contains {len(tables)} table(s) with structured information",
                "importance": "medium",
            })
        return insights

    def _calculate_overall_confidence(self, results: List[Dict[str, Any]]) -> float:
        """Calculate weighted overall confidence score.

        "primary"-priority results are weighted 1.5x relative to all others.
        Returns 0.0 for an empty result list.
        """
        if not results:
            return 0.0

        confidences: List[float] = []
        weights: List[float] = []
        for result in results:
            confidence = result.get("result", {}).get("confidence", 0.0)
            priority = result.get("priority", "secondary")
            # Weight by priority.
            weights.append(1.5 if priority == "primary" else 1.0)
            confidences.append(confidence)

        weighted_sum = sum(c * w for c, w in zip(confidences, weights))
        total_weight = sum(weights)
        return weighted_sum / total_weight if total_weight > 0 else 0.0

    def _generate_summary(
        self,
        classification: Dict[str, Any],
        aggregated_findings: Dict[str, Any],
        insights: List[Dict[str, str]],
    ) -> str:
        """Generate executive summary of analysis."""
        doc_type = classification["document_type"].replace("_", " ").title()
        summary_parts = [
            f"Medical Document Analysis: {doc_type}",
            f"\nThis document has been processed through our comprehensive AI analysis pipeline using {len(aggregated_findings)} specialized medical AI domain(s).",
        ]

        # Per-domain summaries.
        for domain, data in aggregated_findings.items():
            domain_name = domain.replace("_", " ").title()
            model_count = len(data.get("models", []))
            avg_conf = data.get("average_confidence", 0.0)
            summary_parts.append(
                f"\n\n{domain_name}: Analyzed by {model_count} model(s) with {avg_conf*100:.0f}% confidence. "
                f"{'High confidence analysis completed.' if avg_conf > 0.8 else 'Analysis completed with moderate confidence.'}"
            )

        # High-priority insight count.
        high_importance = [i for i in insights if i.get("importance") == "high"]
        if high_importance:
            summary_parts.append(
                f"\n\nKey Findings: {len(high_importance)} high-priority insights identified for clinical review."
            )

        summary_parts.append(
            "\n\nThis analysis provides AI-assisted insights and should be reviewed by qualified healthcare professionals for clinical decision-making."
        )
        return "".join(summary_parts)

    def _generate_recommendations(
        self,
        aggregated_findings: Dict[str, Any],
        classification: Dict[str, Any],
    ) -> List[Dict[str, str]]:
        """Generate recommendations based on analysis."""
        recommendations: List[Dict[str, str]] = []

        # Document-type-specific clinical review recommendation.
        review_by_type = {
            "radiology": ("Radiologist review recommended for imaging findings confirmation", "high"),
            "pathology": ("Pathologist verification required for tissue analysis", "high"),
            "laboratory": ("Review laboratory values in context of patient history", "medium"),
            "cardiology": ("Cardiologist review recommended for cardiac findings", "high"),
        }
        doc_type = classification["document_type"]
        if doc_type in review_by_type:
            text, priority = review_by_type[doc_type]
            recommendations.append({
                "category": "Clinical Review",
                "recommendation": text,
                "priority": priority,
            })

        # General recommendations, always included.
        recommendations.append({
            "category": "Data Quality",
            "recommendation": "All AI-generated insights should be validated by qualified healthcare professionals",
            "priority": "high",
        })
        recommendations.append({
            "category": "Documentation",
            "recommendation": "Maintain this analysis report with patient medical records",
            "priority": "medium",
        })

        # Flag domains whose average confidence is below the 0.7 threshold.
        low_confidence_domains = [
            domain for domain, data in aggregated_findings.items()
            if data.get("average_confidence", 0.0) < 0.7
        ]
        if low_confidence_domains:
            recommendations.append({
                "category": "Analysis Quality",
                "recommendation": f"Lower confidence detected in {', '.join(low_confidence_domains)}. Consider manual review.",
                "priority": "medium",
            })
        return recommendations

    def _generate_fallback_analysis(
        self,
        classification: Dict[str, Any],
        pdf_content: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Generate fallback analysis when no models succeeded.

        Uses .get with defaults throughout: this runs on the error path, so
        it must never raise even on a malformed classification dict.
        """
        return {
            "document_type": classification.get("document_type", "unknown"),
            "classification_confidence": classification.get("confidence", 0.0),
            "overall_confidence": 0.0,
            "summary": "Analysis could not be completed. Document was classified but specialized model processing failed.",
            "aggregated_findings": {},
            "clinical_insights": [],
            "recommendations": [{
                "category": "Manual Review",
                "recommendation": "Manual review required - automated analysis unavailable",
                "priority": "high",
            }],
            "models_used": [],
            "quality_metrics": {
                "models_executed": 0,
                "models_failed": 0,
                "overall_confidence": 0.0,
            },
            "metadata": {
                "synthesis_timestamp": datetime.now(timezone.utc).isoformat(),
                "page_count": pdf_content.get("page_count", 0),
                "fallback": True,
            },
        }

    def _early_fusion(self, results: List[Dict]) -> Dict:
        """Early fusion strategy - combine features before analysis.

        TODO: unimplemented placeholder; currently returns None.
        """
        pass

    def _late_fusion(self, results: List[Dict]) -> Dict:
        """Late fusion strategy - combine predictions after analysis.

        TODO: unimplemented placeholder; currently returns None.
        """
        pass

    def _weighted_fusion(self, results: List[Dict]) -> Dict:
        """Weighted fusion strategy - weight by model confidence.

        TODO: unimplemented placeholder; currently returns None.
        """
        pass