# medical-report-analyzer / file_detector.py
# snikhilesh's picture
# Deploy backend with monitoring infrastructure - Complete Medical AI Platform
# 13d5ab4 verified
"""
File Detection and Routing System - Phase 2
Multi-format medical file detection with confidence scoring and routing logic.
This module provides robust file type detection for medical documents including
PDFs, DICOM files, ECG signals, and archives with confidence-based routing.
Author: MiniMax Agent
Date: 2025-10-29
Version: 1.0.0
"""
import os
import mimetypes
import hashlib
from typing import Dict, List, Optional, Tuple, Any
from pathlib import Path
import magic
from dataclasses import dataclass
from enum import Enum
import logging
# Module-level logger named after this module; no handlers or levels are
# configured here.
logger = logging.getLogger(__name__)
class MedicalFileType(Enum):
    """Closed set of file categories the detector can route.

    Each member's value is the string key used for that category in the
    detector's pattern table.
    """

    # PDF reports
    PDF_CLINICAL = "pdf_clinical"
    PDF_RADIOLOGY = "pdf_radiology"
    PDF_LABORATORY = "pdf_laboratory"
    PDF_ECG_REPORT = "pdf_ecg_report"

    # DICOM imaging
    DICOM_CT = "dicom_ct"
    DICOM_MRI = "dicom_mri"
    DICOM_XRAY = "dicom_xray"
    DICOM_ULTRASOUND = "dicom_ultrasound"

    # ECG signal files
    ECG_XML = "ecg_xml"
    ECG_SCPE = "ecg_scpe"
    ECG_CSV = "ecg_csv"
    ECG_WFDB = "ecg_wfdb"

    # Archives
    ARCHIVE_ZIP = "archive_zip"
    ARCHIVE_TAR = "archive_tar"

    # Plain images
    IMAGE_TIFF = "image_tiff"
    IMAGE_JPEG = "image_jpeg"

    # Fallback when nothing matches
    UNKNOWN = "unknown"
@dataclass
class FileDetectionResult:
    """Outcome of one file-type detection, including its confidence score."""

    # Category chosen for the file.
    file_type: MedicalFileType
    # Score for the chosen category.
    confidence: float
    # Labels of the evidence that contributed to the result.
    detected_features: List[str]
    # MIME type reported for the file.
    mime_type: str
    # File size in bytes.
    file_size: int
    # Free-form extra details about the detection.
    metadata: Dict[str, Any]
    # Name of the extractor the file should be routed to.
    recommended_extractor: str
class MedicalFileDetector:
    """Score a file against known medical-format patterns and pick the best.

    Every pattern collects evidence from the file extension, byte signatures
    found in the leading content sample, keywords found in that sample and,
    for PDF and DICOM patterns, the file size.  The highest-scoring pattern
    wins; on a tie the pattern declared first in the table is kept.
    """

    # Evidence weights added to a pattern's score.
    _EXTENSION_WEIGHT = 0.3
    _MAGIC_WEIGHT = 0.4
    _KEYWORD_WEIGHT = 0.1
    _SIZE_WEIGHT = 0.1
    # Number of leading bytes read from disk when no sample is supplied.
    _SAMPLE_BYTES = 8192

    def __init__(self):
        """Build the pattern table and create the libmagic handle."""
        self.known_patterns = self._init_detection_patterns()
        # MIME-mode libmagic handle; none of the methods in this class
        # consult it (the reported MIME type comes from ``mimetypes``).
        self.magic = magic.Magic(mime=True)

    def _init_detection_patterns(self) -> Dict[str, Dict]:
        """Return the pattern table keyed by ``MedicalFileType`` value.

        Each entry holds:
            extensions: lower-case suffixes, including the leading dot.
            magic_bytes: list of signatures, each a list of byte fragments.
            keywords: lower-case phrases searched for in the decoded sample.
            extractor: name of the extractor recommended for that type.
        """
        return {
            # PDF Patterns
            "pdf_clinical": {
                "extensions": [".pdf"],
                "magic_bytes": [[b"%PDF"]],
                "keywords": ["clinical", "progress note", "consultation", "assessment", "plan"],
                "extractor": "pdf_text_extractor",
            },
            "pdf_radiology": {
                "extensions": [".pdf"],
                "magic_bytes": [[b"%PDF"]],
                "keywords": ["radiology", "ct scan", "mri", "x-ray", "imaging", "findings", "impression"],
                "extractor": "pdf_radiology_extractor",
            },
            "pdf_laboratory": {
                "extensions": [".pdf"],
                "magic_bytes": [[b"%PDF"]],
                "keywords": ["laboratory", "lab results", "blood work", "test results", "reference range"],
                "extractor": "pdf_laboratory_extractor",
            },
            "pdf_ecg_report": {
                "extensions": [".pdf"],
                "magic_bytes": [[b"%PDF"]],
                "keywords": ["ecg", "ekg", "electrocardiogram", "rhythm", "heart rate", "st segment"],
                "extractor": "pdf_ecg_extractor",
            },
            # DICOM Patterns
            "dicom_ct": {
                "extensions": [".dcm", ".dicom"],
                "magic_bytes": [[b"DICM"]],
                "keywords": ["computed tomography", "ct", "slice"],
                "extractor": "dicom_processor",
            },
            "dicom_mri": {
                "extensions": [".dcm", ".dicom"],
                "magic_bytes": [[b"DICM"]],
                "keywords": ["magnetic resonance", "mri", "t1", "t2", "flair"],
                "extractor": "dicom_processor",
            },
            "dicom_xray": {
                "extensions": [".dcm", ".dicom"],
                "magic_bytes": [[b"DICM"]],
                "keywords": ["x-ray", "radiograph", "chest", "abdomen", "bone"],
                "extractor": "dicom_processor",
            },
            "dicom_ultrasound": {
                "extensions": [".dcm", ".dicom"],
                "magic_bytes": [[b"DICM"]],
                "keywords": ["ultrasound", "sonogram", "echocardiogram"],
                "extractor": "dicom_processor",
            },
            # ECG File Patterns
            "ecg_xml": {
                "extensions": [".xml", ".ecg"],
                "magic_bytes": [[b"<?xml"], [b"<ECG"], [b"<electrocardiogram"]],
                "keywords": ["ecg", "lead", "signal", "waveform"],
                "extractor": "ecg_xml_processor",
            },
            "ecg_scpe": {
                "extensions": [".scp", ".scpe"],
                "magic_bytes": [[b"SCP-ECG"]],
                "keywords": ["scp-ecg", "electrocardiogram"],
                "extractor": "ecg_scp_processor",
            },
            "ecg_csv": {
                "extensions": [".csv"],
                "magic_bytes": [],
                "keywords": ["time", "lead", "voltage", "millivolts", "ecg"],
                "extractor": "ecg_csv_processor",
            },
            # Archive Patterns
            "archive_zip": {
                "extensions": [".zip"],
                "magic_bytes": [[b"PK"]],
                "keywords": [],
                "extractor": "archive_processor",
            },
            "archive_tar": {
                "extensions": [".tar", ".gz", ".tgz"],
                "magic_bytes": [[b"ustar"], [b"\x1f\x8b"]],
                "keywords": [],
                "extractor": "archive_processor",
            },
            # Image Patterns
            "image_tiff": {
                "extensions": [".tiff", ".tif"],
                "magic_bytes": [[b"II*\x00"], [b"MM\x00*"]],
                "keywords": [],
                "extractor": "image_processor",
            },
            "image_jpeg": {
                "extensions": [".jpg", ".jpeg"],
                "magic_bytes": [[b"\xff\xd8\xff"]],
                "keywords": [],
                "extractor": "image_processor",
            },
        }

    @staticmethod
    def _signature_present(signature: Any, content: bytes) -> bool:
        """Return True when *signature* occurs in *content*.

        The pattern table stores each signature as a list of byte fragments
        (for example ``[b"%PDF"]``); a bare ``bytes`` value is accepted as
        well.  Testing the list itself with ``in`` against ``bytes`` raises
        ``TypeError``, so the fragments are checked one by one and all of
        them must be present.  An empty signature never matches.
        """
        if isinstance(signature, (bytes, bytearray)):
            return signature in content
        fragments = list(signature)
        return bool(fragments) and all(fragment in content for fragment in fragments)

    def _score_pattern(
        self,
        pattern_name: str,
        pattern_config: Dict[str, Any],
        file_ext: str,
        file_size: int,
        content_sample: bytes,
        content_text: str,
    ) -> Tuple[float, List[str]]:
        """Score one pattern and list the evidence that contributed.

        Args:
            pattern_name: Key of the pattern in the table.
            pattern_config: The pattern's table entry.
            file_ext: Lower-cased file suffix, including the dot.
            file_size: File size in bytes.
            content_sample: Raw leading bytes of the file.
            content_text: Lower-cased text decoded from ``content_sample``.

        Returns:
            ``(score, features)``; the score is ``0.0`` with no features when
            nothing about the file matches the pattern.
        """
        score = 0.0
        features: List[str] = []

        if file_ext in pattern_config.get("extensions", []):
            score += self._EXTENSION_WEIGHT
            features.append(f"extension_{file_ext}")

        # One matching signature is enough; further matches add nothing.
        # NOTE: signatures are searched anywhere in the sample, not at a
        # fixed offset, so short ones such as b"PK" can match by chance.
        if any(
            self._signature_present(signature, content_sample)
            for signature in pattern_config.get("magic_bytes", [])
        ):
            score += self._MAGIC_WEIGHT
            features.append("magic_bytes")

        for keyword in pattern_config.get("keywords", []):
            if keyword.lower() in content_text:
                score += self._KEYWORD_WEIGHT
                features.append(f"keyword_{keyword}")

        # Size only reinforces other evidence.  Applied on its own it would
        # label any mid-sized file as a PDF and any file over 1 MB as DICOM.
        if score > 0:
            if pattern_name.startswith("dicom") and file_size > 1024 * 1024:
                score += self._SIZE_WEIGHT
                features.append("size_dicom")
            if pattern_name.startswith("pdf") and 1024 < file_size < 50 * 1024 * 1024:
                score += self._SIZE_WEIGHT
                features.append("size_pdf")

        return score, features

    def detect_file_type(self, file_path: str, content_sample: Optional[bytes] = None) -> FileDetectionResult:
        """Detect the medical file type of *file_path* with a confidence score.

        Args:
            file_path: Path to the file; its size is always read from disk.
            content_sample: Optional leading bytes of the file.  When omitted,
                up to the first 8 KB are read from ``file_path``.

        Returns:
            FileDetectionResult for the best-scoring pattern, or an UNKNOWN
            result with confidence 0.1 when no pattern scores above zero.
            Any exception raised during detection is logged and reported as
            an UNKNOWN result with confidence 0.0 and the ``error_handler``
            extractor; it is not propagated.
        """
        try:
            file_size = os.path.getsize(file_path)
            file_ext = Path(file_path).suffix.lower()
            mime_type = mimetypes.guess_type(file_path)[0] or "application/octet-stream"

            if content_sample is None:
                with open(file_path, "rb") as f:
                    content_sample = f.read(min(self._SAMPLE_BYTES, file_size))

            # Decode once for all patterns; undecodable bytes are dropped,
            # so this cannot raise for binary content.
            content_text = content_sample.decode("utf-8", errors="ignore").lower()

            pattern_scores = []
            for pattern_name, pattern_config in self.known_patterns.items():
                score, features = self._score_pattern(
                    pattern_name, pattern_config, file_ext, file_size, content_sample, content_text
                )
                if score > 0:
                    pattern_scores.append((pattern_name, score, features))

            if pattern_scores:
                # max() keeps the first of equal scores, i.e. table order.
                best_pattern, best_score, best_features = max(pattern_scores, key=lambda x: x[1])
                file_type = MedicalFileType(best_pattern)
                confidence = min(best_score, 1.0)  # Cap at 1.0
                detected_features = best_features
                recommended_extractor = self.known_patterns[best_pattern]["extractor"]
            else:
                file_type = MedicalFileType.UNKNOWN
                confidence = 0.1
                detected_features = ["no_pattern_match"]
                recommended_extractor = "generic_extractor"

            # Extreme sizes reduce confidence in the result.
            if file_size < 100:
                confidence *= 0.5
                detected_features.append("very_small_file")
            elif file_size > 100 * 1024 * 1024:
                confidence *= 0.8
                detected_features.append("large_file")

            metadata = {
                "file_extension": file_ext,
                "detection_method": "multi_modal",
                "content_length": len(content_sample),
            }
            logger.info(
                "File detection: %s -> %s (confidence: %.2f)",
                file_path, file_type.value, confidence,
            )
            return FileDetectionResult(
                file_type=file_type,
                confidence=confidence,
                detected_features=detected_features,
                mime_type=mime_type,
                file_size=file_size,
                metadata=metadata,
                recommended_extractor=recommended_extractor,
            )
        except Exception as e:
            # Detection is best-effort: report the failure as a result
            # instead of raising to the caller.
            logger.error("File detection error for %s: %s", file_path, e)
            return FileDetectionResult(
                file_type=MedicalFileType.UNKNOWN,
                confidence=0.0,
                detected_features=["detection_error"],
                mime_type="application/octet-stream",
                file_size=0,
                metadata={"error": str(e)},
                recommended_extractor="error_handler",
            )

    def batch_detect(self, file_paths: List[str]) -> List[FileDetectionResult]:
        """Detect file types for several files.

        Paths that do not exist are logged and skipped, so the returned list
        can be shorter than ``file_paths`` and is not index-aligned with it.
        """
        results = []
        for file_path in file_paths:
            if os.path.exists(file_path):
                results.append(self.detect_file_type(file_path))
            else:
                logger.warning("File not found: %s", file_path)
        return results

    def get_routing_info(self, detection_result: FileDetectionResult) -> Dict[str, Any]:
        """Return routing hints derived from a detection result.

        The dictionary contains:
            extractor: the result's recommended extractor.
            priority: "high" above 0.8 confidence, "medium" above 0.5,
                otherwise "low".
            requires_ocr: True for the four PDF types.
            supports_batch: True for DICOM CT/MRI, ECG CSV and ZIP archives.
            phi_risk: "high" for clinical, radiology and laboratory PDFs,
                "medium" for everything else (including PDF ECG reports).
        """
        file_type = detection_result.file_type
        confidence = detection_result.confidence

        if confidence > 0.8:
            priority = "high"
        elif confidence > 0.5:
            priority = "medium"
        else:
            priority = "low"

        high_phi_types = [
            MedicalFileType.PDF_CLINICAL,
            MedicalFileType.PDF_RADIOLOGY,
            MedicalFileType.PDF_LABORATORY,
        ]
        ocr_types = high_phi_types + [MedicalFileType.PDF_ECG_REPORT]
        batch_types = [
            MedicalFileType.DICOM_CT,
            MedicalFileType.DICOM_MRI,
            MedicalFileType.ECG_CSV,
            MedicalFileType.ARCHIVE_ZIP,
        ]

        return {
            "extractor": detection_result.recommended_extractor,
            "priority": priority,
            "requires_ocr": file_type in ocr_types,
            "supports_batch": file_type in batch_types,
            "phi_risk": "high" if file_type in high_phi_types else "medium",
        }
def calculate_file_hash(file_path: str) -> str:
    """Return the hex SHA-256 digest of a file's contents, for deduplication.

    The file is read in 4096-byte chunks.  If opening or reading fails, the
    error is logged and an empty string is returned.
    """
    digest = hashlib.sha256()
    try:
        with open(file_path, "rb") as stream:
            while True:
                block = stream.read(4096)
                if not block:
                    break
                digest.update(block)
    except Exception as e:
        logger.error(f"Hash calculation error for {file_path}: {str(e)}")
        return ""
    return digest.hexdigest()
# Public API: the names exported by `from <module> import *`.
__all__ = [
    "MedicalFileDetector",
    "MedicalFileType",
    "FileDetectionResult",
    "calculate_file_hash"
]