""" File Detection and Routing System - Phase 2 Multi-format medical file detection with confidence scoring and routing logic. This module provides robust file type detection for medical documents including PDFs, DICOM files, ECG signals, and archives with confidence-based routing. Author: MiniMax Agent Date: 2025-10-29 Version: 1.0.0 """ import os import mimetypes import hashlib from typing import Dict, List, Optional, Tuple, Any from pathlib import Path import magic from dataclasses import dataclass from enum import Enum import logging # Configure logging logger = logging.getLogger(__name__) class MedicalFileType(Enum): """Enumerated medical file types for routing""" PDF_CLINICAL = "pdf_clinical" PDF_RADIOLOGY = "pdf_radiology" PDF_LABORATORY = "pdf_laboratory" PDF_ECG_REPORT = "pdf_ecg_report" DICOM_CT = "dicom_ct" DICOM_MRI = "dicom_mri" DICOM_XRAY = "dicom_xray" DICOM_ULTRASOUND = "dicom_ultrasound" ECG_XML = "ecg_xml" ECG_SCPE = "ecg_scpe" ECG_CSV = "ecg_csv" ECG_WFDB = "ecg_wfdb" ARCHIVE_ZIP = "archive_zip" ARCHIVE_TAR = "archive_tar" IMAGE_TIFF = "image_tiff" IMAGE_JPEG = "image_jpeg" UNKNOWN = "unknown" @dataclass class FileDetectionResult: """Result of file type detection with confidence scoring""" file_type: MedicalFileType confidence: float detected_features: List[str] mime_type: str file_size: int metadata: Dict[str, Any] recommended_extractor: str class MedicalFileDetector: """Medical file type detection with multi-modal analysis""" def __init__(self): self.known_patterns = self._init_detection_patterns() self.magic = magic.Magic(mime=True) def _init_detection_patterns(self) -> Dict[str, Dict]: """Initialize detection patterns for various medical file types""" return { # PDF Patterns "pdf_clinical": { "extensions": [".pdf"], "magic_bytes": [[b"%PDF"]], "keywords": ["clinical", "progress note", "consultation", "assessment", "plan"], "extractor": "pdf_text_extractor" }, "pdf_radiology": { "extensions": [".pdf"], "magic_bytes": [[b"%PDF"]], "keywords": ["radiology", "ct scan", "mri", "x-ray", "imaging", "findings", "impression"], "extractor": "pdf_radiology_extractor" }, "pdf_laboratory": { "extensions": [".pdf"], "magic_bytes": [[b"%PDF"]], "keywords": ["laboratory", "lab results", "blood work", "test results", "reference range"], "extractor": "pdf_laboratory_extractor" }, "pdf_ecg_report": { "extensions": [".pdf"], "magic_bytes": [[b"%PDF"]], "keywords": ["ecg", "ekg", "electrocardiogram", "rhythm", "heart rate", "st segment"], "extractor": "pdf_ecg_extractor" }, # DICOM Patterns "dicom_ct": { "extensions": [".dcm", ".dicom"], "magic_bytes": [[b"DICM"]], "keywords": ["computed tomography", "ct", "slice"], "extractor": "dicom_processor" }, "dicom_mri": { "extensions": [".dcm", ".dicom"], "magic_bytes": [[b"DICM"]], "keywords": ["magnetic resonance", "mri", "t1", "t2", "flair"], "extractor": "dicom_processor" }, "dicom_xray": { "extensions": [".dcm", ".dicom"], "magic_bytes": [[b"DICM"]], "keywords": ["x-ray", "radiograph", "chest", "abdomen", "bone"], "extractor": "dicom_processor" }, "dicom_ultrasound": { "extensions": [".dcm", ".dicom"], "magic_bytes": [[b"DICM"]], "keywords": ["ultrasound", "sonogram", "echocardiogram"], "extractor": "dicom_processor" }, # ECG File Patterns "ecg_xml": { "extensions": [".xml", ".ecg"], "magic_bytes": [[b" FileDetectionResult: """ Detect medical file type with confidence scoring Args: file_path: Path to the file content_sample: Optional sample of file content for detection Returns: FileDetectionResult with detected type and confidence """ try: # Get basic file info file_size = os.path.getsize(file_path) file_ext = Path(file_path).suffix.lower() detected_features = [] # Try mime type detection mime_type = mimetypes.guess_type(file_path)[0] or "application/octet-stream" # Get file content sample if not provided if content_sample is None: with open(file_path, 'rb') as f: content_sample = f.read(min(8192, file_size)) # Read first 8KB # Analyze against known patterns pattern_scores = [] for pattern_name, pattern_config in self.known_patterns.items(): score = 0.0 features = [] # Check file extension if file_ext in pattern_config.get("extensions", []): score += 0.3 features.append(f"extension_{file_ext}") # Check magic bytes for magic_bytes in pattern_config.get("magic_bytes", []): if magic_bytes in content_sample: score += 0.4 features.append("magic_bytes") break # Check content keywords try: content_text = content_sample.decode('utf-8', errors='ignore').lower() for keyword in pattern_config.get("keywords", []): if keyword.lower() in content_text: score += 0.1 features.append(f"keyword_{keyword}") except: pass # Non-text content # Additional scoring based on file characteristics if pattern_name.startswith("dicom") and file_size > 1024*1024: # DICOM files are typically >1MB score += 0.1 features.append("size_dicom") if pattern_name.startswith("pdf") and 1024 < file_size < 50*1024*1024: # Reasonable PDF size score += 0.1 features.append("size_pdf") if score > 0: pattern_scores.append((pattern_name, score, features)) # Select best match if pattern_scores: best_pattern, best_score, best_features = max(pattern_scores, key=lambda x: x[1]) file_type = MedicalFileType(best_pattern) confidence = min(best_score, 1.0) # Cap at 1.0 detected_features = best_features recommended_extractor = self.known_patterns[best_pattern]["extractor"] else: # Fallback to unknown file_type = MedicalFileType.UNKNOWN confidence = 0.1 detected_features = ["no_pattern_match"] recommended_extractor = "generic_extractor" # Adjust confidence based on file size if file_size < 100: # Very small files confidence *= 0.5 detected_features.append("very_small_file") elif file_size > 100*1024*1024: # Very large files confidence *= 0.8 detected_features.append("large_file") metadata = { "file_extension": file_ext, "detection_method": "multi_modal", "content_length": len(content_sample) } logger.info(f"File detection: {file_path} -> {file_type.value} (confidence: {confidence:.2f})") return FileDetectionResult( file_type=file_type, confidence=confidence, detected_features=detected_features, mime_type=mime_type, file_size=file_size, metadata=metadata, recommended_extractor=recommended_extractor ) except Exception as e: logger.error(f"File detection error for {file_path}: {str(e)}") return FileDetectionResult( file_type=MedicalFileType.UNKNOWN, confidence=0.0, detected_features=["detection_error"], mime_type="application/octet-stream", file_size=0, metadata={"error": str(e)}, recommended_extractor="error_handler" ) def batch_detect(self, file_paths: List[str]) -> List[FileDetectionResult]: """Detect file types for multiple files""" results = [] for file_path in file_paths: if os.path.exists(file_path): result = self.detect_file_type(file_path) results.append(result) else: logger.warning(f"File not found: {file_path}") return results def get_routing_info(self, detection_result: FileDetectionResult) -> Dict[str, Any]: """Get routing information for detected file type""" return { "extractor": detection_result.recommended_extractor, "priority": "high" if detection_result.confidence > 0.8 else "medium" if detection_result.confidence > 0.5 else "low", "requires_ocr": detection_result.file_type in [MedicalFileType.PDF_CLINICAL, MedicalFileType.PDF_RADIOLOGY, MedicalFileType.PDF_LABORATORY, MedicalFileType.PDF_ECG_REPORT], "supports_batch": detection_result.file_type in [MedicalFileType.DICOM_CT, MedicalFileType.DICOM_MRI, MedicalFileType.ECG_CSV, MedicalFileType.ARCHIVE_ZIP], "phi_risk": "high" if detection_result.file_type in [MedicalFileType.PDF_CLINICAL, MedicalFileType.PDF_RADIOLOGY, MedicalFileType.PDF_LABORATORY] else "medium" } def calculate_file_hash(file_path: str) -> str: """Calculate SHA256 hash for file deduplication""" hash_sha256 = hashlib.sha256() try: with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_sha256.update(chunk) return hash_sha256.hexdigest() except Exception as e: logger.error(f"Hash calculation error for {file_path}: {str(e)}") return "" # Export main classes and functions __all__ = [ "MedicalFileDetector", "MedicalFileType", "FileDetectionResult", "calculate_file_hash" ]