"""
Confidence Gating and Validation System - Phase 4
Implements composite confidence scoring, thresholds, and human review queue management.
This module builds on the preprocessing pipeline and model routing to provide intelligent
confidence-based gating, validation workflows, and review queue management for medical AI.
Author: MiniMax Agent
Date: 2025-10-29
Version: 1.0.0
"""
import asyncio
import hashlib
import json
import logging
import statistics
import time
import uuid
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Dict, Optional, Any
# Import existing components
from medical_schemas import ConfidenceScore
from specialized_model_router import SpecializedModelRouter, ModelInferenceResult
from preprocessing_pipeline import PreprocessingPipeline, ProcessingResult
logger = logging.getLogger(__name__)

class ReviewPriority(Enum):
    """Priority levels for human review (bands match _determine_review_priority)"""
    CRITICAL = "critical"  # <0.60 confidence - immediate manual review required
    HIGH = "high"          # 0.60-0.70 confidence - review recommended within 1 hour
    MEDIUM = "medium"      # 0.70-0.80 confidence - review recommended within 4 hours
    LOW = "low"            # 0.80-0.90 confidence - optional review for quality assurance
    NONE = "none"          # >=0.90 confidence - auto-approve, audit only

class ValidationDecision(Enum):
    """Final validation decisions"""
    AUTO_APPROVE = "auto_approve"              # >=0.85 confidence - automatically approved
    REVIEW_RECOMMENDED = "review_recommended"  # 0.60-0.85 confidence - human review recommended
    MANUAL_REQUIRED = "manual_required"        # <0.60 confidence - manual review required
    BLOCKED = "blocked"                        # Critical errors - processing blocked

@dataclass
class ReviewQueueItem:
    """Item in the human review queue"""
    item_id: str
    document_id: str
    priority: ReviewPriority
    confidence_score: ConfidenceScore
    processing_result: ProcessingResult
    model_inference: ModelInferenceResult
    review_decision: ValidationDecision
    created_timestamp: datetime
    review_deadline: datetime
    assigned_reviewer: Optional[str] = None
    review_notes: Optional[str] = None
    reviewer_decision: Optional[str] = None
    reviewed_timestamp: Optional[datetime] = None
    escalated: bool = False

@dataclass
class AuditLogEntry:
    """Audit log entry for compliance tracking"""
    log_id: str
    document_id: str
    event_type: str  # "confidence_gating", "manual_review", "auto_approval", "escalation"
    timestamp: datetime
    user_id: Optional[str]
    confidence_scores: Dict[str, float]
    decision: str
    reasoning: str
    metadata: Dict[str, Any]

class ConfidenceGatingSystem:
    """Main confidence gating and validation system"""

    def __init__(self,
                 preprocessing_pipeline: Optional[PreprocessingPipeline] = None,
                 model_router: Optional[SpecializedModelRouter] = None,
                 review_queue_path: str = "/tmp/review_queue",
                 audit_log_path: str = "/tmp/audit_logs"):
        """Initialize confidence gating system"""
        self.preprocessing_pipeline = preprocessing_pipeline or PreprocessingPipeline()
        self.model_router = model_router or SpecializedModelRouter()
        # Queue and logging setup
        self.review_queue_path = Path(review_queue_path)
        self.audit_log_path = Path(audit_log_path)
        self.review_queue_path.mkdir(parents=True, exist_ok=True)
        self.audit_log_path.mkdir(parents=True, exist_ok=True)
        # Review queue storage
        self.review_queue: Dict[str, ReviewQueueItem] = {}
        self.load_review_queue()
        # Confidence thresholds (overall confidence must meet or exceed these)
        self.confidence_thresholds = {
            "auto_approve": 0.85,
            "review_recommended": 0.60,
            "manual_required": 0.0
        }
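        # With the defaults above, the gating maps as follows (illustrative):
        #   overall 0.91 -> AUTO_APPROVE
        #   overall 0.72 -> REVIEW_RECOMMENDED
        #   overall 0.45 -> MANUAL_REQUIRED
        # BLOCKED is reached via confidence only if "manual_required" is raised
        # above 0.0; otherwise it comes from error paths (_create_error_response).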
        # Review deadlines by priority
        self.review_deadlines = {
            ReviewPriority.CRITICAL: timedelta(minutes=30),
            ReviewPriority.HIGH: timedelta(hours=1),
            ReviewPriority.MEDIUM: timedelta(hours=4),
            ReviewPriority.LOW: timedelta(hours=24),
            ReviewPriority.NONE: timedelta(days=7)  # Audit only
        }
        # Statistics tracking
        self.stats = {
            "total_processed": 0,
            "auto_approved": 0,
            "review_recommended": 0,
            "manual_required": 0,
            "blocked": 0,
            "average_confidence": 0.0,
            "processing_times": [],
            "reviewer_performance": {}
        }
        logger.info("Confidence Gating System initialized")

    async def process_document(self, file_path: Path, user_id: Optional[str] = None) -> Dict[str, Any]:
        """Main document processing with confidence gating"""
        start_time = time.time()
        document_id = self._generate_document_id(file_path)
        try:
            logger.info(f"Processing document {document_id}: {file_path.name}")
            # Stage 1: Preprocessing pipeline
            preprocessing_result = await self.preprocessing_pipeline.process_file(file_path)
            if not preprocessing_result:
                return self._create_error_response(document_id, "Preprocessing failed")
            # Stage 2: Model inference
            model_result = await self.model_router.route_and_infer(preprocessing_result)
            if not model_result:
                return self._create_error_response(document_id, "Model inference failed")
            # Stage 3: Composite confidence calculation
            composite_confidence = self._calculate_composite_confidence(
                preprocessing_result, model_result
            )
            # Stage 4: Confidence gating decision
            validation_decision = self._make_validation_decision(composite_confidence)
            # Stage 5: Handle based on decision
            if validation_decision == ValidationDecision.AUTO_APPROVE:
                response = await self._handle_auto_approval(
                    document_id, preprocessing_result, model_result, composite_confidence, user_id
                )
            elif validation_decision in [ValidationDecision.REVIEW_RECOMMENDED, ValidationDecision.MANUAL_REQUIRED]:
                response = await self._handle_review_required(
                    document_id, preprocessing_result, model_result, composite_confidence,
                    validation_decision, user_id
                )
            else:  # BLOCKED
                response = await self._handle_blocked(
                    document_id, preprocessing_result, model_result, composite_confidence, user_id
                )
            # Update statistics
            processing_time = time.time() - start_time
            self._update_statistics(validation_decision, composite_confidence, processing_time)
            return response
        except Exception as e:
            logger.error(f"Document processing error for {document_id}: {str(e)}")
            return self._create_error_response(document_id, f"Processing error: {str(e)}")

    def _calculate_composite_confidence(self,
                                        preprocessing_result: ProcessingResult,
                                        model_result: ModelInferenceResult) -> ConfidenceScore:
        """Calculate composite confidence from all pipeline stages"""
        # Extract individual confidence components
        extraction_confidence = preprocessing_result.validation_result.compliance_score
        model_confidence = model_result.confidence_score
        # Calculate data quality based on multiple factors
        data_quality_factors = []
        # Factor 1: File detection confidence
        if hasattr(preprocessing_result, 'file_detection'):
            data_quality_factors.append(preprocessing_result.file_detection.confidence)
        # Factor 2: Redaction-volume heuristic - documents that needed many PHI
        # redactions are treated as lower quality (normalized against 100 redactions)
        if hasattr(preprocessing_result, 'phi_result'):
            phi_completeness = 1.0 - (len(preprocessing_result.phi_result.redactions) / 100)
            data_quality_factors.append(max(0.0, min(1.0, phi_completeness)))
        # Factor 3: Processing errors (fewer errors = higher quality)
        processing_errors = len(model_result.errors) if model_result.errors else 0
        error_factor = max(0.0, 1.0 - (processing_errors * 0.1))  # Each error reduces quality by 10%
        data_quality_factors.append(error_factor)
        # Factor 4: Model processing time (reasonable time = higher quality)
        time_factor = 1.0
        if model_result.processing_time > 0:
            # Optimal processing time is 1-10 seconds
            if 1.0 <= model_result.processing_time <= 10.0:
                time_factor = 1.0
            elif model_result.processing_time < 1.0:
                time_factor = 0.8  # Too fast might indicate incomplete processing
            else:
                time_factor = max(0.5, 1.0 - ((model_result.processing_time - 10.0) / 50.0))
        data_quality_factors.append(time_factor)
        # Calculate average data quality
        data_quality = sum(data_quality_factors) / len(data_quality_factors) if data_quality_factors else 0.5
        data_quality = max(0.0, min(1.0, data_quality))  # Clamp to the 0-1 range
        # Create composite confidence score; overall_confidence is derived inside
        # ConfidenceScore (medical_schemas) from these three components
        composite_confidence = ConfidenceScore(
            extraction_confidence=extraction_confidence,
            model_confidence=model_confidence,
            data_quality=data_quality
        )
        logger.info(f"Composite confidence calculated: {composite_confidence.overall_confidence:.3f}")
        logger.info(f"  - Extraction: {extraction_confidence:.3f}")
        logger.info(f"  - Model: {model_confidence:.3f}")
        logger.info(f"  - Data Quality: {data_quality:.3f}")
        return composite_confidence
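
    # Worked example of the data-quality average in _calculate_composite_confidence
    # (illustrative numbers only):
    #   factors = [0.95 (file detection), 0.90 (10 redactions), 1.0 (no errors), 1.0 (3s runtime)]
    #   data_quality = (0.95 + 0.90 + 1.0 + 1.0) / 4 = 0.9625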

    def _make_validation_decision(self, confidence: ConfidenceScore) -> ValidationDecision:
        """Make validation decision based on confidence thresholds"""
        overall_confidence = confidence.overall_confidence
        if overall_confidence >= self.confidence_thresholds["auto_approve"]:
            return ValidationDecision.AUTO_APPROVE
        elif overall_confidence >= self.confidence_thresholds["review_recommended"]:
            return ValidationDecision.REVIEW_RECOMMENDED
        elif overall_confidence >= self.confidence_thresholds["manual_required"]:
            return ValidationDecision.MANUAL_REQUIRED
        else:
            # Unreachable while "manual_required" is 0.0; kept so the thresholds
            # can be tightened without touching this method
            return ValidationDecision.BLOCKED

    def _determine_review_priority(self, confidence: ConfidenceScore) -> ReviewPriority:
        """Determine review priority based on confidence score"""
        overall = confidence.overall_confidence
        if overall < 0.60:
            return ReviewPriority.CRITICAL
        elif overall < 0.70:
            return ReviewPriority.HIGH
        elif overall < 0.80:
            return ReviewPriority.MEDIUM
        elif overall < 0.90:
            return ReviewPriority.LOW
        else:
            return ReviewPriority.NONE
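
    # Example mapping (illustrative): overall 0.55 -> CRITICAL (30 min deadline),
    # 0.72 -> MEDIUM (4 h), 0.88 -> LOW (24 h). In practice this method is only
    # reached for documents below the 0.85 auto-approve threshold.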

    async def _handle_auto_approval(self, document_id: str, preprocessing_result: ProcessingResult,
                                    model_result: ModelInferenceResult, confidence: ConfidenceScore,
                                    user_id: Optional[str]) -> Dict[str, Any]:
        """Handle auto-approved documents"""
        # Log the auto-approval
        await self._log_audit_event(
            document_id=document_id,
            event_type="auto_approval",
            user_id=user_id,
            confidence_scores={
                "extraction": confidence.extraction_confidence,
                "model": confidence.model_confidence,
                "data_quality": confidence.data_quality,
                "overall": confidence.overall_confidence
            },
            decision="auto_approved",
            reasoning=f"Confidence score {confidence.overall_confidence:.3f} meets auto-approval threshold (>={self.confidence_thresholds['auto_approve']})"
        )
        return {
            "document_id": document_id,
            "status": "auto_approved",
            "confidence": confidence.overall_confidence,
            "decision": "auto_approve",
            "reasoning": "High confidence - automatically approved",
            "processing_result": {
                "extraction_data": preprocessing_result.extraction_result,
                "model_output": model_result.output_data,
                "confidence_breakdown": {
                    "extraction": confidence.extraction_confidence,
                    "model": confidence.model_confidence,
                    "data_quality": confidence.data_quality
                }
            },
            "requires_review": False,
            "review_queue_id": None
        }

    async def _handle_review_required(self, document_id: str, preprocessing_result: ProcessingResult,
                                      model_result: ModelInferenceResult, confidence: ConfidenceScore,
                                      decision: ValidationDecision, user_id: Optional[str]) -> Dict[str, Any]:
        """Handle documents requiring review"""
        # Determine review priority
        priority = self._determine_review_priority(confidence)
        # Calculate review deadline
        deadline = datetime.now() + self.review_deadlines[priority]
        # Create review queue item
        queue_item = ReviewQueueItem(
            item_id=self._generate_queue_id(),
            document_id=document_id,
            priority=priority,
            confidence_score=confidence,
            processing_result=preprocessing_result,
            model_inference=model_result,
            review_decision=decision,
            created_timestamp=datetime.now(),
            review_deadline=deadline
        )
        # Add to review queue
        self.review_queue[queue_item.item_id] = queue_item
        await self._save_review_queue()
        # Log the review requirement
        await self._log_audit_event(
            document_id=document_id,
            event_type="review_required",
            user_id=user_id,
            confidence_scores={
                "extraction": confidence.extraction_confidence,
                "model": confidence.model_confidence,
                "data_quality": confidence.data_quality,
                "overall": confidence.overall_confidence
            },
            decision=decision.value,
            reasoning=f"Confidence score {confidence.overall_confidence:.3f} requires review (threshold: {self.confidence_thresholds['review_recommended']}-{self.confidence_thresholds['auto_approve']})"
        )
        return {
            "document_id": document_id,
            "status": "review_required",
            "confidence": confidence.overall_confidence,
            "decision": decision.value,
            "reasoning": self._get_review_reasoning(confidence, decision),
            "review_queue_id": queue_item.item_id,
            "priority": priority.value,
            "review_deadline": deadline.isoformat(),
            "processing_result": {
                "extraction_data": preprocessing_result.extraction_result,
                "model_output": model_result.output_data,
                "confidence_breakdown": {
                    "extraction": confidence.extraction_confidence,
                    "model": confidence.model_confidence,
                    "data_quality": confidence.data_quality
                },
                "warnings": model_result.warnings
            },
            "requires_review": True
        }

    async def _handle_blocked(self, document_id: str, preprocessing_result: ProcessingResult,
                              model_result: ModelInferenceResult, confidence: ConfidenceScore,
                              user_id: Optional[str]) -> Dict[str, Any]:
        """Handle blocked documents"""
        # Log the blocking
        await self._log_audit_event(
            document_id=document_id,
            event_type="blocked",
            user_id=user_id,
            confidence_scores={
                "extraction": confidence.extraction_confidence,
                "model": confidence.model_confidence,
                "data_quality": confidence.data_quality,
                "overall": confidence.overall_confidence
            },
            decision="blocked",
            reasoning=f"Confidence score {confidence.overall_confidence:.3f} below acceptable threshold ({self.confidence_thresholds['manual_required']})"
        )
        return {
            "document_id": document_id,
            "status": "blocked",
            "confidence": confidence.overall_confidence,
            "decision": "blocked",
            "reasoning": "Confidence too low for processing - manual intervention required",
            "errors": model_result.errors,
            "warnings": model_result.warnings,
            "requires_review": True,
            "escalate_immediately": True
        }

    def _get_review_reasoning(self, confidence: ConfidenceScore, decision: ValidationDecision) -> str:
        """Generate human-readable reasoning for review requirement"""
        overall = confidence.overall_confidence
        reasons = []
        if confidence.extraction_confidence < 0.80:
            reasons.append(f"Low extraction confidence ({confidence.extraction_confidence:.3f})")
        if confidence.model_confidence < 0.80:
            reasons.append(f"Low model confidence ({confidence.model_confidence:.3f})")
        if confidence.data_quality < 0.80:
            reasons.append(f"Poor data quality ({confidence.data_quality:.3f})")
        if decision == ValidationDecision.REVIEW_RECOMMENDED:
            base_reason = f"Medium confidence ({overall:.3f}) - review recommended for quality assurance"
        else:
            base_reason = f"Low confidence ({overall:.3f}) - manual review required"
        if reasons:
            return f"{base_reason}. Issues: {', '.join(reasons)}"
        else:
            return base_reason

    def get_review_queue_status(self) -> Dict[str, Any]:
        """Get current review queue status"""
        now = datetime.now()
        # Categorize queue items
        by_priority = {priority: [] for priority in ReviewPriority}
        overdue = []
        pending_count = 0
        for item in self.review_queue.values():
            if not item.reviewed_timestamp:  # Still pending
                pending_count += 1
                by_priority[item.priority].append(item)
                if now > item.review_deadline:
                    overdue.append(item)
        return {
            "total_pending": pending_count,
            "by_priority": {
                priority.value: len(items) for priority, items in by_priority.items()
            },
            "overdue_count": len(overdue),
            "overdue_items": [
                {
                    "item_id": item.item_id,
                    "document_id": item.document_id,
                    "priority": item.priority.value,
                    "overdue_hours": (now - item.review_deadline).total_seconds() / 3600
                }
                for item in overdue
            ],
            "queue_health": "healthy" if len(overdue) == 0 else "degraded" if len(overdue) < 5 else "critical"
        }
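
    # Sample return value of get_review_queue_status (illustrative):
    # {
    #   "total_pending": 3,
    #   "by_priority": {"critical": 1, "high": 0, "medium": 2, "low": 0, "none": 0},
    #   "overdue_count": 1,
    #   "overdue_items": [{"item_id": "queue_...", "document_id": "doc_...",
    #                      "priority": "critical", "overdue_hours": 2.5}],
    #   "queue_health": "degraded"
    # }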

    async def _log_audit_event(self, document_id: str, event_type: str, user_id: Optional[str],
                               confidence_scores: Dict[str, float], decision: str, reasoning: str):
        """Log audit event for compliance"""
        log_entry = AuditLogEntry(
            log_id=self._generate_log_id(),
            document_id=document_id,
            event_type=event_type,
            timestamp=datetime.now(),
            user_id=user_id,
            confidence_scores=confidence_scores,
            decision=decision,
            reasoning=reasoning,
            metadata={}
        )
        # Append to a daily JSONL audit file (synchronous write; acceptable at
        # modest audit volumes)
        log_file = self.audit_log_path / f"audit_{datetime.now().strftime('%Y%m%d')}.jsonl"
        with open(log_file, 'a') as f:
            f.write(json.dumps(asdict(log_entry), default=str) + '\n')
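
    # Each audit entry is one JSON object per line, e.g. (values illustrative):
    # {"log_id": "log_...", "document_id": "doc_...", "event_type": "auto_approval",
    #  "timestamp": "2025-10-29 12:00:00.000000", "user_id": null,
    #  "confidence_scores": {"extraction": 0.92, "model": 0.90, "data_quality": 0.95, "overall": 0.92},
    #  "decision": "auto_approved", "reasoning": "...", "metadata": {}}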

    def _generate_document_id(self, file_path: Path) -> str:
        """Generate unique document ID from the file path and current time"""
        # Note: hashes the path string, not the file contents
        path_hash = hashlib.sha256(str(file_path).encode()).hexdigest()[:8]
        timestamp = int(time.time())
        return f"doc_{timestamp}_{path_hash}"

    def _generate_queue_id(self) -> str:
        """Generate unique review queue ID"""
        # uuid suffix guards against same-millisecond collisions
        return f"queue_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"

    def _generate_log_id(self) -> str:
        """Generate unique log ID"""
        return f"log_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"

    def _create_error_response(self, document_id: str, error_message: str) -> Dict[str, Any]:
        """Create standardized error response"""
        return {
            "document_id": document_id,
            "status": "error",
            "confidence": 0.0,
            "decision": "blocked",
            "reasoning": error_message,
            "requires_review": True,
            "escalate_immediately": True,
            "error": error_message
        }

    def load_review_queue(self):
        """Load review queue from persistent storage"""
        queue_file = self.review_queue_path / "review_queue.json"
        if queue_file.exists():
            try:
                with open(queue_file, 'r') as f:
                    queue_data = json.load(f)
                # Convert back to ReviewQueueItem objects
                for item_id, item_data in queue_data.items():
                    # Restore datetimes and enums from their serialized forms
                    item_data['created_timestamp'] = datetime.fromisoformat(item_data['created_timestamp'])
                    item_data['review_deadline'] = datetime.fromisoformat(item_data['review_deadline'])
                    if item_data.get('reviewed_timestamp'):
                        item_data['reviewed_timestamp'] = datetime.fromisoformat(item_data['reviewed_timestamp'])
                    item_data['priority'] = ReviewPriority(item_data['priority'])
                    item_data['review_decision'] = ValidationDecision(item_data['review_decision'])
                    # Nested results (confidence_score, processing_result, model_inference)
                    # remain plain dicts after a reload; only queue metadata is fully restored
                    self.review_queue[item_id] = ReviewQueueItem(**item_data)
                logger.info(f"Loaded {len(self.review_queue)} items from review queue")
            except Exception as e:
                logger.error(f"Failed to load review queue: {e}")

    async def _save_review_queue(self):
        """Save review queue to persistent storage"""
        queue_file = self.review_queue_path / "review_queue.json"
        try:
            # Convert to a JSON-serializable form; store enums by value so that
            # load_review_queue can round-trip them
            queue_data = {}
            for item_id, item in self.review_queue.items():
                if isinstance(item, ReviewQueueItem):
                    data = asdict(item)
                    data['priority'] = item.priority.value
                    data['review_decision'] = item.review_decision.value
                    queue_data[item_id] = data
                else:
                    queue_data[item_id] = item
            with open(queue_file, 'w') as f:
                json.dump(queue_data, f, indent=2, default=str)
        except Exception as e:
            logger.error(f"Failed to save review queue: {e}")

    def _update_statistics(self, decision: ValidationDecision, confidence: ConfidenceScore, processing_time: float):
        """Update system statistics"""
        self.stats["total_processed"] += 1
        if decision == ValidationDecision.AUTO_APPROVE:
            self.stats["auto_approved"] += 1
        elif decision == ValidationDecision.REVIEW_RECOMMENDED:
            self.stats["review_recommended"] += 1
        elif decision == ValidationDecision.MANUAL_REQUIRED:
            self.stats["manual_required"] += 1
        elif decision == ValidationDecision.BLOCKED:
            self.stats["blocked"] += 1
        # Update running average confidence incrementally
        total_confidence = self.stats["average_confidence"] * (self.stats["total_processed"] - 1)
        self.stats["average_confidence"] = (total_confidence + confidence.overall_confidence) / self.stats["total_processed"]
        # Track processing times (keep the last 1000)
        self.stats["processing_times"].append(processing_time)
        if len(self.stats["processing_times"]) > 1000:
            self.stats["processing_times"] = self.stats["processing_times"][-1000:]

    def get_system_statistics(self) -> Dict[str, Any]:
        """Get comprehensive system statistics"""
        if self.stats["total_processed"] == 0:
            return {"total_processed": 0, "status": "no_data"}
        return {
            "total_processed": self.stats["total_processed"],
            "distribution": {
                "auto_approved": {
                    "count": self.stats["auto_approved"],
                    "percentage": (self.stats["auto_approved"] / self.stats["total_processed"]) * 100
                },
                "review_recommended": {
                    "count": self.stats["review_recommended"],
                    "percentage": (self.stats["review_recommended"] / self.stats["total_processed"]) * 100
                },
                "manual_required": {
                    "count": self.stats["manual_required"],
                    "percentage": (self.stats["manual_required"] / self.stats["total_processed"]) * 100
                },
                "blocked": {
                    "count": self.stats["blocked"],
                    "percentage": (self.stats["blocked"] / self.stats["total_processed"]) * 100
                }
            },
            "confidence_metrics": {
                "average_confidence": self.stats["average_confidence"],
                "success_rate": ((self.stats["auto_approved"] + self.stats["review_recommended"]) / self.stats["total_processed"]) * 100
            },
            "performance_metrics": {
                "average_processing_time": sum(self.stats["processing_times"]) / len(self.stats["processing_times"]) if self.stats["processing_times"] else 0,
                "median_processing_time": statistics.median(self.stats["processing_times"]) if self.stats["processing_times"] else 0
            },
            "system_health": "healthy" if self.stats["blocked"] / self.stats["total_processed"] < 0.1 else "degraded"
        }

# Export main classes
__all__ = [
    "ConfidenceGatingSystem",
    "ReviewQueueItem",
    "AuditLogEntry",
    "ValidationDecision",
    "ReviewPriority"
]
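
# Minimal usage sketch (assumes PreprocessingPipeline and SpecializedModelRouter
# can be constructed with defaults, as __init__ above already does, and that a
# sample document exists at the given path - adjust for your environment):
if __name__ == "__main__":
    async def _demo():
        system = ConfidenceGatingSystem()
        # Run one document through the full gated pipeline
        result = await system.process_document(Path("sample_report.pdf"))
        print(json.dumps({k: v for k, v in result.items() if k != "processing_result"},
                         indent=2, default=str))
        # Inspect the resulting review queue state
        print(json.dumps(system.get_review_queue_status(), indent=2, default=str))

    asyncio.run(_demo())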