|
|
""" |
|
|
Enterprise Monitoring Service for Medical AI Platform |
|
|
Comprehensive monitoring, metrics tracking, and alerting system |
|
|
|
|
|
Features: |
|
|
- Real-time performance monitoring |
|
|
- Error rate tracking with automated alerts |
|
|
- Latency analysis across pipeline stages |
|
|
- Resource utilization monitoring |
|
|
- Model performance tracking |
|
|
- System health indicators |
|
|
|
|
|
Author: MiniMax Agent |
|
|
Date: 2025-10-29 |
|
|
Version: 1.0.0 |
|
|
""" |
|
|
|
|
|
import logging |
|
|
import time |
|
|
import hashlib |
|
|
import json |
|
|
import pickle |
|
|
from typing import Dict, List, Any, Optional, Tuple |
|
|
from datetime import datetime, timedelta |
|
|
from collections import defaultdict, deque |
|
|
from dataclasses import dataclass, asdict |
|
|
from enum import Enum |
|
|
import asyncio |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class SystemStatus(Enum):
    """System operational status levels.

    Assigned by MonitoringService.get_system_health() based on active
    alerts and the windowed error rate.
    """
    OPERATIONAL = "operational"  # no critical alerts, error rate within threshold
    DEGRADED = "degraded"        # error-rate threshold exceeded or many error-level alerts
    CRITICAL = "critical"        # at least one critical alert is active
    MAINTENANCE = "maintenance"  # never assigned automatically in this module
|
|
|
|
|
|
|
|
class AlertLevel(Enum):
    """Alert severity levels, in ascending order of severity.

    ERROR and CRITICAL counts feed the system-status decision in
    MonitoringService.get_system_health().
    """
    INFO = "info"          # informational, no action required
    WARNING = "warning"    # worth attention, not a failure
    ERROR = "error"        # a failure occurred
    CRITICAL = "critical"  # severe failure; marks the whole system CRITICAL
|
|
|
|
|
|
|
|
@dataclass
class PerformanceMetric:
    """A single time-stamped sample of a named metric.

    Instances are appended to MetricsCollector's per-metric deques and
    serialized via to_dict() for reporting.
    """
    metric_name: str      # e.g. "latency_pdf_processing", "request_latency_<endpoint>"
    value: float          # measured value, expressed in `unit`
    unit: str             # e.g. "count", "seconds", "milliseconds"
    timestamp: str        # ISO-8601 string produced by datetime.utcnow().isoformat()
    tags: Dict[str, str]  # free-form labels, e.g. {"stage": ..., "success": ...}

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict representation via dataclasses.asdict."""
        return asdict(self)
|
|
|
|
|
|
|
|
@dataclass
class Alert:
    """A single alert with lifecycle metadata.

    Created by AlertManager.create_alert(); `resolved`/`resolved_at` are
    filled in by AlertManager.resolve_alert().
    """
    alert_id: str
    level: AlertLevel
    message: str
    category: str
    timestamp: str
    details: Dict[str, Any]
    resolved: bool = False
    resolved_at: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict, flattening `level` to its string value."""
        serialized: Dict[str, Any] = {}
        for field_name in (
            "alert_id", "level", "message", "category",
            "timestamp", "details", "resolved", "resolved_at",
        ):
            attr = getattr(self, field_name)
            serialized[field_name] = attr.value if field_name == "level" else attr
        return serialized
|
|
|
|
|
|
|
|
class MetricsCollector:
    """
    Collects and aggregates performance metrics.

    Keeps one bounded time-series deque per metric name plus simple
    counter and gauge maps, and computes windowed statistics on demand.
    """

    def __init__(self, retention_hours: int = 24):
        self.retention_hours = retention_hours
        # One bounded series per metric name; oldest samples fall off first.
        self.metrics: Dict[str, deque] = defaultdict(lambda: deque(maxlen=10000))
        self.counters: Dict[str, int] = defaultdict(int)
        self.gauges: Dict[str, float] = defaultdict(float)

        logger.info(f"Metrics Collector initialized (retention: {retention_hours}h)")

    def record_metric(
        self,
        metric_name: str,
        value: float,
        unit: str = "count",
        tags: Optional[Dict[str, str]] = None
    ):
        """Append one sample to the metric's series, then prune old samples."""
        sample = PerformanceMetric(
            metric_name=metric_name,
            value=value,
            unit=unit,
            timestamp=datetime.utcnow().isoformat(),
            tags=tags or {}
        )
        self.metrics[metric_name].append(sample)
        self._cleanup_old_metrics()

    def increment_counter(self, counter_name: str, value: int = 1):
        """Add `value` (default 1) to a named counter."""
        self.counters[counter_name] += value

    def set_gauge(self, gauge_name: str, value: float):
        """Overwrite a gauge with its latest observed value."""
        self.gauges[gauge_name] = value

    def get_metrics(
        self,
        metric_name: str,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None
    ) -> List[PerformanceMetric]:
        """Return samples for a metric, optionally bounded to [start_time, end_time]."""
        snapshot = list(self.metrics.get(metric_name, []))
        if start_time is None and end_time is None:
            return snapshot

        def _within(sample) -> bool:
            ts = datetime.fromisoformat(sample.timestamp)
            if start_time and ts < start_time:
                return False
            return not (end_time and ts > end_time)

        return [s for s in snapshot if _within(s)]

    def get_statistics(
        self,
        metric_name: str,
        window_minutes: int = 60
    ) -> Dict[str, float]:
        """Count/mean/min/max and p50/p95/p99 over the trailing window."""
        cutoff = datetime.utcnow() - timedelta(minutes=window_minutes)
        recent_values = sorted(
            m.value for m in self.metrics.get(metric_name, [])
            if datetime.fromisoformat(m.timestamp) > cutoff
        )

        if not recent_values:
            return {
                "count": 0,
                "mean": 0.0,
                "min": 0.0,
                "max": 0.0,
                "p50": 0.0,
                "p95": 0.0,
                "p99": 0.0
            }

        n = len(recent_values)
        # NOTE(review): int(n * q) is a nearest-rank percentile approximation;
        # the n > 1 guards keep single-sample windows from misbehaving.
        return {
            "count": n,
            "mean": sum(recent_values) / n,
            "min": recent_values[0],
            "max": recent_values[-1],
            "p50": recent_values[int(n * 0.50)],
            "p95": recent_values[int(n * 0.95)] if n > 1 else recent_values[0],
            "p99": recent_values[int(n * 0.99)] if n > 1 else recent_values[0]
        }

    def _cleanup_old_metrics(self):
        """Drop samples older than the retention horizon from every series."""
        horizon = datetime.utcnow() - timedelta(hours=self.retention_hours)
        for series in list(self.metrics.values()):
            while series and datetime.fromisoformat(series[0].timestamp) < horizon:
                series.popleft()

    def get_counter(self, counter_name: str, default: int = 0) -> int:
        """Read one counter without creating it as a side effect."""
        return self.counters.get(counter_name, default)

    def get_all_counters(self) -> Dict[str, int]:
        """Snapshot of all counters as a plain dict."""
        return dict(self.counters)

    def get_all_gauges(self) -> Dict[str, float]:
        """Snapshot of all gauges as a plain dict."""
        return dict(self.gauges)
|
|
|
|
|
|
|
|
class ErrorMonitor:
    """
    Monitors error rates and triggers alerts.

    Maintains bounded logs of error and success events and evaluates the
    error rate over a sliding time window against a fixed threshold.
    """

    def __init__(
        self,
        error_threshold: float = 0.05,
        window_minutes: int = 15
    ):
        self.error_threshold = error_threshold
        self.window_minutes = window_minutes
        self.errors: deque = deque(maxlen=10000)
        self.success_count: deque = deque(maxlen=10000)
        self.error_categories: Dict[str, int] = defaultdict(int)

        logger.info(f"Error Monitor initialized (threshold: {error_threshold*100}%, window: {window_minutes}m)")

    def _window_cutoff(self) -> datetime:
        # Start of the sliding evaluation window.
        return datetime.utcnow() - timedelta(minutes=self.window_minutes)

    @staticmethod
    def _since(records, cutoff: datetime) -> list:
        # Keep only records time-stamped strictly after `cutoff`.
        return [r for r in records if datetime.fromisoformat(r["timestamp"]) > cutoff]

    def record_error(
        self,
        error_type: str,
        error_message: str,
        stage: str,
        details: Optional[Dict[str, Any]] = None
    ):
        """Log one error event and bump its stage:type category counter."""
        self.errors.append({
            "error_type": error_type,
            "error_message": error_message,
            "stage": stage,
            "timestamp": datetime.utcnow().isoformat(),
            "details": details or {}
        })
        self.error_categories[f"{stage}:{error_type}"] += 1

        logger.warning(f"Error recorded: {stage} - {error_type}: {error_message}")

    def record_success(self, stage: str):
        """Log one successful operation for error-rate computation."""
        self.success_count.append({
            "stage": stage,
            "timestamp": datetime.utcnow().isoformat()
        })

    def get_error_rate(self, stage: Optional[str] = None) -> float:
        """Errors / (errors + successes) inside the window; 0.0 when idle."""
        cutoff = self._window_cutoff()
        errors = self._since(self.errors, cutoff)
        successes = self._since(self.success_count, cutoff)

        if stage:
            errors = [e for e in errors if e["stage"] == stage]
            successes = [s for s in successes if s["stage"] == stage]

        attempts = len(errors) + len(successes)
        return len(errors) / attempts if attempts else 0.0

    def check_threshold_exceeded(self, stage: Optional[str] = None) -> bool:
        """True when the windowed error rate is strictly above the threshold."""
        return self.get_error_rate(stage) > self.error_threshold

    def get_error_summary(self) -> Dict[str, Any]:
        """Aggregate recent errors by category and stage plus overall rate."""
        recent = self._since(self.errors, self._window_cutoff())

        by_category: Dict[str, int] = defaultdict(int)
        by_stage: Dict[str, int] = defaultdict(int)
        for event in recent:
            by_category[event["error_type"]] += 1
            by_stage[event["stage"]] += 1

        return {
            "total_errors": len(recent),
            "error_rate": self.get_error_rate(),
            "threshold_exceeded": self.check_threshold_exceeded(),
            "by_category": dict(by_category),
            "by_stage": dict(by_stage),
            "window_minutes": self.window_minutes
        }
|
|
|
|
|
|
|
|
class LatencyTracker:
    """
    Tracks latency across pipeline stages.

    Stage timings accumulate in an in-flight trace dict until
    complete_trace() moves the trace into the bounded history that
    backs get_stage_statistics().
    """

    def __init__(self):
        self.active_traces: Dict[str, Dict[str, float]] = {}
        self.completed_traces: deque = deque(maxlen=1000)

        logger.info("Latency Tracker initialized")

    def start_trace(self, trace_id: str, stage: str):
        """Record the wall-clock start of `stage` within the given trace."""
        self.active_traces.setdefault(trace_id, {})[f"{stage}_start"] = time.time()

    def end_trace(self, trace_id: str, stage: str) -> float:
        """Record the end of `stage`; return its duration (0.0 if unmatched)."""
        trace = self.active_traces.get(trace_id)
        if trace is None:
            logger.warning(f"Trace {trace_id} not found")
            return 0.0

        started = trace.get(f"{stage}_start")
        if started is None:
            logger.warning(f"Start time for {stage} not found in trace {trace_id}")
            return 0.0

        elapsed = time.time() - started
        trace[f"{stage}_duration"] = elapsed
        return elapsed

    def complete_trace(self, trace_id: str) -> Dict[str, float]:
        """Finalize a trace, archive it, and return its per-stage durations."""
        if trace_id not in self.active_traces:
            return {}

        raw = self.active_traces.pop(trace_id)

        # Keep only the finished "<stage>_duration" entries, keyed by stage.
        stage_durations = {
            name.replace("_duration", ""): seconds
            for name, seconds in raw.items()
            if name.endswith("_duration")
        }

        self.completed_traces.append({
            "trace_id": trace_id,
            "timestamp": datetime.utcnow().isoformat(),
            "total_duration": sum(stage_durations.values()),
            "stages": stage_durations
        })

        return stage_durations

    def get_stage_statistics(
        self,
        stage: str,
        window_minutes: int = 60
    ) -> Dict[str, float]:
        """Latency statistics (count/mean/min/max/p50/p95/p99) for one stage."""
        cutoff = datetime.utcnow() - timedelta(minutes=window_minutes)
        samples = sorted(
            trace["stages"][stage]
            for trace in self.completed_traces
            if datetime.fromisoformat(trace["timestamp"]) >= cutoff
            and stage in trace["stages"]
        )

        if not samples:
            return {
                "count": 0,
                "mean": 0.0,
                "min": 0.0,
                "max": 0.0,
                "p50": 0.0,
                "p95": 0.0,
                "p99": 0.0
            }

        n = len(samples)
        return {
            "count": n,
            "mean": sum(samples) / n,
            "min": samples[0],
            "max": samples[-1],
            "p50": samples[int(n * 0.50)],
            "p95": samples[int(n * 0.95)] if n > 1 else samples[0],
            "p99": samples[int(n * 0.99)] if n > 1 else samples[0]
        }
|
|
|
|
|
|
|
|
@dataclass
class CacheEntry:
    """One cached value with access/expiry bookkeeping for CacheService."""
    key: str
    value: Any
    created_at: float   # epoch seconds at insertion
    accessed_at: float  # epoch seconds of the most recent read
    access_count: int
    size_bytes: int
    ttl: Optional[int] = None  # seconds to live; None means never expires

    def is_expired(self) -> bool:
        """True when a TTL is set and the entry's age exceeds it."""
        if self.ttl is None:
            return False
        age_seconds = time.time() - self.created_at
        return age_seconds > self.ttl

    def to_dict(self) -> Dict[str, Any]:
        """Metadata view with ISO timestamps; the cached value is omitted."""
        def as_iso(ts: float) -> str:
            return datetime.fromtimestamp(ts).isoformat()

        return {
            "key": self.key,
            "created_at": as_iso(self.created_at),
            "accessed_at": as_iso(self.accessed_at),
            "access_count": self.access_count,
            "size_bytes": self.size_bytes,
            "ttl": self.ttl,
            "expired": self.is_expired()
        }
|
|
|
|
|
|
|
|
class CacheService:
    """
    SHA256-based caching service for deduplication and performance optimization

    Features:
    - SHA256 fingerprinting for input deduplication
    - LRU eviction policy
    - TTL support for automatic expiration
    - Cache hit/miss tracking
    - Memory usage monitoring
    - Performance metrics
    """

    def __init__(
        self,
        max_entries: int = 10000,
        max_memory_mb: int = 512,
        default_ttl: Optional[int] = 3600
    ):
        """
        Args:
            max_entries: Hard cap on the number of cached entries
            max_memory_mb: Approximate memory budget (estimated entry sizes)
            default_ttl: TTL applied when set() is called without one
        """
        self.max_entries = max_entries
        self.max_memory_mb = max_memory_mb
        self.default_ttl = default_ttl

        self.cache: Dict[str, CacheEntry] = {}
        # LRU bookkeeping: keys from least- to most-recently-used.
        self.access_order: deque = deque()

        # Hit/miss/eviction and timing statistics.
        self.hits = 0
        self.misses = 0
        self.evictions = 0
        self.total_retrieval_time = 0.0
        self.retrieval_count = 0

        logger.info(f"Cache Service initialized (max_entries: {max_entries}, max_memory: {max_memory_mb}MB)")

    def _compute_fingerprint(self, data: Any) -> str:
        """
        Compute SHA256 fingerprint for any data

        Args:
            data: Any serializable data (dict, str, bytes, etc.)

        Returns:
            SHA256 hash as hex string
        """
        if isinstance(data, bytes):
            data_bytes = data
        elif isinstance(data, str):
            data_bytes = data.encode('utf-8')
        elif isinstance(data, (dict, list)):
            # sort_keys makes the fingerprint independent of dict insertion order.
            json_str = json.dumps(data, sort_keys=True)
            data_bytes = json_str.encode('utf-8')
        else:
            # Fallback for arbitrary objects. pickle here is serialize-only
            # (dumps); no untrusted pickle.loads occurs in this class.
            data_bytes = pickle.dumps(data)

        return hashlib.sha256(data_bytes).hexdigest()

    def _estimate_size(self, obj: Any) -> int:
        """Estimate object size in bytes (pickle length, with rough fallbacks)."""
        try:
            return len(pickle.dumps(obj))
        except Exception:
            # Unpicklable objects: use crude per-type heuristics.
            if isinstance(obj, (str, bytes)):
                return len(obj)
            elif isinstance(obj, dict):
                return sum(len(str(k)) + len(str(v)) for k, v in obj.items())
            elif isinstance(obj, list):
                return sum(len(str(item)) for item in obj)
            else:
                return 1024

    def _get_memory_usage_mb(self) -> float:
        """Sum of estimated entry sizes, in MB."""
        total_bytes = sum(entry.size_bytes for entry in self.cache.values())
        return total_bytes / (1024 * 1024)

    def _drop_key(self, key: str):
        """Delete a cache entry and scrub its position from the LRU order."""
        del self.cache[key]
        if key in self.access_order:
            self.access_order.remove(key)

    def _evict_lru(self):
        """Evict the least recently used entry (skipping stale order keys)."""
        if not self.access_order:
            return

        # access_order may contain keys already removed from the cache;
        # pop until one that is still present is found.
        while self.access_order:
            lru_key = self.access_order.popleft()
            if lru_key in self.cache:
                del self.cache[lru_key]
                self.evictions += 1
                logger.debug(f"Evicted LRU cache entry: {lru_key[:16]}...")
                break

    def _cleanup_expired(self):
        """Remove expired entries, including their LRU bookkeeping."""
        expired_keys = [
            key for key, entry in self.cache.items()
            if entry.is_expired()
        ]
        for key in expired_keys:
            # BUGFIX: previously only the cache dict was cleaned here, leaving
            # stale keys behind in access_order; scrub both.
            self._drop_key(key)
            logger.debug(f"Removed expired cache entry: {key[:16]}...")

    def _ensure_capacity(self, new_entry_size: int):
        """Evict entries until both the count and memory budgets have room."""
        while len(self.cache) >= self.max_entries:
            self._evict_lru()

        while self._get_memory_usage_mb() + (new_entry_size / 1024 / 1024) > self.max_memory_mb:
            if len(self.cache) == 0:
                break
            self._evict_lru()

    def _finish_lookup(self, hit: bool, start_time: float) -> None:
        """Fold one lookup into the hit/miss and timing statistics."""
        if hit:
            self.hits += 1
        else:
            self.misses += 1
        self.total_retrieval_time += time.time() - start_time
        self.retrieval_count += 1
        return None

    def get(self, key: str) -> Optional[Any]:
        """
        Retrieve value from cache by key

        Args:
            key: Cache key (typically SHA256 fingerprint)

        Returns:
            Cached value if found and not expired, None otherwise
            (NOTE: a cached value of None is indistinguishable from a miss)
        """
        start_time = time.time()

        # Opportunistic cleanup of expired entries every 100 lookups.
        if self.retrieval_count % 100 == 0:
            self._cleanup_expired()

        entry = self.cache.get(key)
        if entry is None:
            return self._finish_lookup(hit=False, start_time=start_time)

        if entry.is_expired():
            # Lazily expire on access; counts as a miss.
            self._drop_key(key)
            return self._finish_lookup(hit=False, start_time=start_time)

        # Refresh access metadata and move the key to the MRU end.
        entry.accessed_at = time.time()
        entry.access_count += 1
        if key in self.access_order:
            self.access_order.remove(key)
        self.access_order.append(key)

        self._finish_lookup(hit=True, start_time=start_time)
        logger.debug(f"Cache hit: {key[:16]}... (access_count: {entry.access_count})")
        return entry.value

    def set(self, key: str, value: Any, ttl: Optional[int] = None):
        """
        Store value in cache with key

        Args:
            key: Cache key (typically SHA256 fingerprint)
            value: Value to cache
            ttl: Time to live in seconds (None for default, 0 for no expiration)
        """
        size_bytes = self._estimate_size(value)

        # Resolve TTL: None -> service default, 0 -> never expire.
        if ttl is None:
            ttl = self.default_ttl
        elif ttl == 0:
            ttl = None

        # BUGFIX: re-setting an existing key previously appended a duplicate
        # to access_order, corrupting LRU ordering and growing the deque
        # unboundedly; drop the old entry (and its order slot) first.
        if key in self.cache:
            self._drop_key(key)

        self._ensure_capacity(size_bytes)

        current_time = time.time()
        self.cache[key] = CacheEntry(
            key=key,
            value=value,
            created_at=current_time,
            accessed_at=current_time,
            access_count=0,
            size_bytes=size_bytes,
            ttl=ttl
        )
        self.access_order.append(key)

        logger.debug(f"Cached entry: {key[:16]}... (size: {size_bytes} bytes, ttl: {ttl}s)")

    def get_or_compute(
        self,
        data: Any,
        compute_fn: callable,
        ttl: Optional[int] = None
    ) -> Tuple[Any, bool]:
        """
        Get cached value or compute and cache it

        Args:
            data: Input data to fingerprint
            compute_fn: Function to compute value if not cached
            ttl: Time to live for cached result

        Returns:
            Tuple of (result, was_cached)
        """
        fingerprint = self._compute_fingerprint(data)

        # NOTE: because get() signals a miss with None, a computed result of
        # None is recomputed on every call.
        cached_value = self.get(fingerprint)
        if cached_value is not None:
            return cached_value, True

        result = compute_fn()
        self.set(fingerprint, result, ttl)
        return result, False

    def invalidate(self, key: str) -> bool:
        """
        Invalidate (remove) a cache entry

        Args:
            key: Cache key to invalidate

        Returns:
            True if entry was removed, False if not found
        """
        if key not in self.cache:
            return False
        self._drop_key(key)
        logger.debug(f"Invalidated cache entry: {key[:16]}...")
        return True

    def invalidate_by_fingerprint(self, data: Any) -> bool:
        """
        Invalidate cache entry by computing fingerprint of data

        Args:
            data: Data to fingerprint and invalidate

        Returns:
            True if entry was removed, False if not found
        """
        fingerprint = self._compute_fingerprint(data)
        return self.invalidate(fingerprint)

    def clear(self):
        """Clear all cache entries (statistics are intentionally retained)."""
        self.cache.clear()
        self.access_order.clear()
        logger.info("Cache cleared")

    def get_statistics(self) -> Dict[str, Any]:
        """Hit/miss/eviction counts, hit rate, memory use, and avg lookup time."""
        total_requests = self.hits + self.misses
        hit_rate = self.hits / total_requests if total_requests > 0 else 0.0
        avg_retrieval_time = (
            self.total_retrieval_time / self.retrieval_count
            if self.retrieval_count > 0 else 0.0
        )

        return {
            "total_entries": len(self.cache),
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": hit_rate,
            "evictions": self.evictions,
            "memory_usage_mb": self._get_memory_usage_mb(),
            "max_memory_mb": self.max_memory_mb,
            "avg_retrieval_time_ms": avg_retrieval_time * 1000,
            "cache_efficiency": hit_rate * 100
        }

    def get_entry_info(self, key: str) -> Optional[Dict[str, Any]]:
        """Get metadata for a specific cache entry, or None if absent."""
        if key not in self.cache:
            return None
        return self.cache[key].to_dict()

    def list_entries(self, limit: int = 100) -> List[Dict[str, Any]]:
        """List up to `limit` entries' metadata, most recently accessed first."""
        entries = sorted(
            self.cache.values(),
            key=lambda e: e.accessed_at,
            reverse=True
        )[:limit]
        return [entry.to_dict() for entry in entries]
|
|
|
|
|
|
|
|
class AlertManager:
    """
    Manages alerts and notifications

    Handles the alert lifecycle (create/resolve), keeps a bounded history,
    and dispatches each new alert to registered sync or async handlers.
    """

    def __init__(self):
        self.active_alerts: Dict[str, Alert] = {}
        self.alert_history: deque = deque(maxlen=1000)
        self.alert_handlers: List[callable] = []

        logger.info("Alert Manager initialized")

    def create_alert(
        self,
        level: AlertLevel,
        message: str,
        category: str,
        details: Optional[Dict[str, Any]] = None
    ) -> Alert:
        """
        Create, register, and dispatch a new alert.

        Args:
            level: Severity of the alert
            message: Human-readable description
            category: Grouping key (e.g. "error_rate")
            details: Arbitrary structured context

        Returns:
            The created Alert (also stored in active_alerts and history)
        """
        # Short, effectively-unique id derived from category/message/time.
        alert_id = hashlib.sha256(
            f"{category}:{message}:{datetime.utcnow().isoformat()}".encode()
        ).hexdigest()[:16]

        alert = Alert(
            alert_id=alert_id,
            level=level,
            message=message,
            category=category,
            timestamp=datetime.utcnow().isoformat(),
            details=details or {}
        )

        self.active_alerts[alert_id] = alert
        self.alert_history.append(alert)

        # BUGFIX: asyncio.create_task() raises RuntimeError when no event loop
        # is running, which made create_alert crash for synchronous callers.
        # Schedule on the running loop when there is one; otherwise run the
        # handlers to completion right here.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            asyncio.run(self._trigger_handlers(alert))
        else:
            asyncio.create_task(self._trigger_handlers(alert))

        logger.warning(f"Alert created: [{level.value}] {category} - {message}")

        return alert

    def resolve_alert(self, alert_id: str):
        """Mark an active alert resolved and drop it from the active set."""
        if alert_id in self.active_alerts:
            alert = self.active_alerts.pop(alert_id)
            alert.resolved = True
            alert.resolved_at = datetime.utcnow().isoformat()

            logger.info(f"Alert resolved: {alert_id}")

    def add_handler(self, handler: callable):
        """Register a sync or async callable invoked with each new Alert."""
        self.alert_handlers.append(handler)

    async def _trigger_handlers(self, alert: Alert):
        """Invoke every registered handler; failures are logged, not raised."""
        for handler in self.alert_handlers:
            try:
                if asyncio.iscoroutinefunction(handler):
                    await handler(alert)
                else:
                    handler(alert)
            except Exception as e:
                logger.error(f"Alert handler failed: {str(e)}")

    def get_active_alerts(
        self,
        level: Optional[AlertLevel] = None,
        category: Optional[str] = None
    ) -> List[Alert]:
        """Active alerts, optionally filtered by level and/or category."""
        alerts = list(self.active_alerts.values())

        if level:
            alerts = [a for a in alerts if a.level == level]

        if category:
            alerts = [a for a in alerts if a.category == category]

        return alerts

    def get_alert_summary(self) -> Dict[str, Any]:
        """Counts of active alerts grouped by level and by category."""
        active = list(self.active_alerts.values())

        by_level = defaultdict(int)
        by_category = defaultdict(int)

        for alert in active:
            by_level[alert.level.value] += 1
            by_category[alert.category] += 1

        return {
            "total_active": len(active),
            "by_level": dict(by_level),
            "by_category": dict(by_category),
            "critical_count": by_level[AlertLevel.CRITICAL.value],
            "error_count": by_level[AlertLevel.ERROR.value]
        }
|
|
|
|
|
|
|
|
class MonitoringService:
    """
    Central monitoring service coordinating all monitoring components

    Provides a unified interface for system monitoring and health checks:
    metrics collection, error-rate tracking, latency tracing, alerting,
    and a SHA256-fingerprint result cache.
    """

    def __init__(
        self,
        error_threshold: float = 0.05,
        window_minutes: int = 15
    ):
        """
        Args:
            error_threshold: Error-rate fraction treated as a breach
            window_minutes: Sliding window used for error-rate evaluation
        """
        self.metrics_collector = MetricsCollector()
        self.error_monitor = ErrorMonitor(error_threshold, window_minutes)
        self.latency_tracker = LatencyTracker()
        self.alert_manager = AlertManager()
        self.cache_service = CacheService(
            max_entries=10000,
            max_memory_mb=512,
            default_ttl=3600
        )

        self.system_status = SystemStatus.OPERATIONAL
        self.start_time = datetime.utcnow()

        # NOTE(review): _setup_automatic_checks() is not invoked here; call it
        # from async startup code once an event loop is running.

        logger.info("Monitoring Service initialized")

    def _setup_automatic_checks(self):
        """Schedule the background error-rate check (needs a running loop)."""
        async def check_error_rate():
            """Periodically check error rate and create alerts"""
            while True:
                try:
                    error_summary = self.error_monitor.get_error_summary()

                    if error_summary["threshold_exceeded"]:
                        self.alert_manager.create_alert(
                            level=AlertLevel.ERROR,
                            message=f"Error rate ({error_summary['error_rate']*100:.1f}%) exceeds threshold",
                            category="error_rate",
                            details=error_summary
                        )

                    await asyncio.sleep(60)
                except Exception as e:
                    logger.error(f"Error rate check failed: {str(e)}")
                    await asyncio.sleep(60)

        # BUGFIX: create_task() raises RuntimeError when no event loop is
        # running; fail soft with a log line instead of crashing sync callers.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            logger.error("Cannot schedule automatic checks: no running event loop")
        else:
            asyncio.create_task(check_error_rate())

    def record_processing_stage(
        self,
        trace_id: str,
        stage: str,
        success: bool,
        duration: Optional[float] = None,
        error_details: Optional[Dict[str, Any]] = None
    ):
        """
        Record completion of a processing stage.

        Args:
            trace_id: Pipeline trace identifier (not consumed here; latency
                tracing runs separately through latency_tracker)
            stage: Pipeline stage name, e.g. "pdf_processing"
            success: Whether the stage completed without error
            duration: Optional stage duration in seconds
            error_details: Optional {"error_type": ..., "message": ...} context
        """
        # Success/failure feeds the windowed error rate.
        if success:
            self.error_monitor.record_success(stage)
        else:
            error_type = error_details.get("error_type", "unknown") if error_details else "unknown"
            error_message = error_details.get("message", "No details") if error_details else "No details"
            self.error_monitor.record_error(error_type, error_message, stage, error_details)

        # Optional latency sample for this stage.
        if duration is not None:
            self.metrics_collector.record_metric(
                f"latency_{stage}",
                duration,
                unit="seconds",
                tags={"stage": stage, "success": str(success)}
            )

        # Per-stage throughput counters.
        self.metrics_collector.increment_counter(f"stage_{stage}_total")
        if success:
            self.metrics_collector.increment_counter(f"stage_{stage}_success")
        else:
            self.metrics_collector.increment_counter(f"stage_{stage}_error")

    def get_system_health(self) -> Dict[str, Any]:
        """Comprehensive health snapshot; also updates self.system_status."""
        error_summary = self.error_monitor.get_error_summary()
        alert_summary = self.alert_manager.get_alert_summary()

        # Status policy: any critical alert -> CRITICAL; error-rate breach or
        # more than five error-level alerts -> DEGRADED; else OPERATIONAL.
        if alert_summary["critical_count"] > 0:
            status = SystemStatus.CRITICAL
        elif error_summary["threshold_exceeded"] or alert_summary["error_count"] > 5:
            status = SystemStatus.DEGRADED
        else:
            status = SystemStatus.OPERATIONAL

        self.system_status = status

        uptime = (datetime.utcnow() - self.start_time).total_seconds()

        return {
            "status": status.value,
            "uptime_seconds": uptime,
            "timestamp": datetime.utcnow().isoformat(),
            "error_rate": error_summary["error_rate"],
            "error_threshold": self.error_monitor.error_threshold,
            "active_alerts": alert_summary["total_active"],
            "critical_alerts": alert_summary["critical_count"],
            "total_requests": self.metrics_collector.get_counter("total_requests", 0),
            "counters": self.metrics_collector.get_all_counters(),
            "gauges": self.metrics_collector.get_all_gauges()
        }

    def get_performance_dashboard(self) -> Dict[str, Any]:
        """Get performance metrics for dashboard display."""
        # Known pipeline stages surfaced on the dashboard.
        stages = ["pdf_processing", "classification", "model_routing", "synthesis"]

        stage_stats = {}
        for stage in stages:
            stage_stats[stage] = self.latency_tracker.get_stage_statistics(stage)

        return {
            "system_health": self.get_system_health(),
            "error_summary": self.error_monitor.get_error_summary(),
            "latency_by_stage": stage_stats,
            "active_alerts": [a.to_dict() for a in self.alert_manager.get_active_alerts()],
            "timestamp": datetime.utcnow().isoformat()
        }

    def start_monitoring(self):
        """Start monitoring services (placeholder for initialization)."""
        logger.info("Monitoring services started")
        self.system_status = SystemStatus.OPERATIONAL

    def track_request(self, endpoint: str, latency_ms: float, status_code: int):
        """Track one incoming request: latency sample plus outcome counters."""
        self.metrics_collector.record_metric(
            f"request_latency_{endpoint}",
            latency_ms,
            unit="milliseconds",
            tags={"endpoint": endpoint, "status_code": str(status_code)}
        )

        self.metrics_collector.increment_counter("total_requests")
        self.metrics_collector.increment_counter(f"requests_{endpoint}")

        # Classify the outcome by HTTP status class.
        if status_code >= 500:
            self.metrics_collector.increment_counter("server_errors")
        elif status_code >= 400:
            self.metrics_collector.increment_counter("client_errors")
        else:
            self.metrics_collector.increment_counter("successful_requests")

    def track_error(self, endpoint: str, error_type: str, error_message: str):
        """Track an error occurrence against an endpoint."""
        # BUGFIX: this previously passed keyword arguments `message=` and
        # `component=`, which do not exist on ErrorMonitor.record_error
        # (signature: error_type, error_message, stage, details) and raised
        # TypeError on every call.
        self.error_monitor.record_error(
            error_type=error_type,
            error_message=error_message,
            stage=endpoint,
            details={"endpoint": endpoint}
        )

        self.metrics_collector.increment_counter("total_errors")
        self.metrics_collector.increment_counter(f"errors_{error_type}")

    def get_cache_statistics(self) -> Dict[str, Any]:
        """Get cache performance statistics from the real cache service."""
        return self.cache_service.get_statistics()

    def cache_result(self, data: Any, result: Any, ttl: Optional[int] = None):
        """
        Cache a computation result with SHA256 fingerprint

        Args:
            data: Input data to fingerprint
            result: Result to cache
            ttl: Time to live in seconds
        """
        fingerprint = self.cache_service._compute_fingerprint(data)
        self.cache_service.set(fingerprint, result, ttl)
        logger.debug(f"Cached result for fingerprint: {fingerprint[:16]}...")

    def get_cached_result(self, data: Any) -> Optional[Any]:
        """
        Retrieve cached result by computing fingerprint

        Args:
            data: Input data to fingerprint

        Returns:
            Cached result if found, None otherwise
        """
        fingerprint = self.cache_service._compute_fingerprint(data)
        return self.cache_service.get(fingerprint)

    def get_or_compute_cached(
        self,
        data: Any,
        compute_fn: callable,
        ttl: Optional[int] = None
    ) -> Tuple[Any, bool]:
        """
        Get cached result or compute and cache it

        Args:
            data: Input data to fingerprint
            compute_fn: Function to compute result if not cached
            ttl: Time to live for cached result

        Returns:
            Tuple of (result, was_cached)
        """
        return self.cache_service.get_or_compute(data, compute_fn, ttl)

    def get_recent_alerts(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Most recent active alerts (newest first), serialized to dicts."""
        alerts = self.alert_manager.get_active_alerts()
        recent = sorted(alerts, key=lambda a: a.timestamp, reverse=True)[:limit]
        return [a.to_dict() for a in recent]
|
|
|
|
|
|
|
|
|
|
|
# Module-level singleton; created lazily by get_monitoring_service().
_monitoring_service = None


def get_monitoring_service() -> MonitoringService:
    """Get singleton monitoring service instance"""
    # Lazy initialization: the first caller constructs the shared instance.
    # NOTE(review): not guarded against concurrent first calls; confirm
    # single-threaded startup if thread safety matters here.
    global _monitoring_service
    if _monitoring_service is None:
        _monitoring_service = MonitoringService()
    return _monitoring_service
|
|
|