"""
Load Testing Script for Medical AI Platform Monitoring Infrastructure
Tests system performance, monitoring accuracy, and error handling under stress
Requirements:
- Tests monitoring middleware performance impact
- Validates cache effectiveness under load
- Verifies error rate tracking accuracy
- Confirms alert system responsiveness
- Measures latency tracking precision
"""
import asyncio
import statistics
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional

import aiohttp
@dataclass
class LoadTestResult:
"""Result from a single request"""
    success: bool
    latency_ms: float
    status_code: int  # 0 indicates a transport-level failure (no HTTP response)
    endpoint: str
    timestamp: float
    error_message: Optional[str] = None
class MonitoringLoadTester:
"""Load tester for monitoring infrastructure"""
def __init__(self, base_url: str = "http://localhost:7860"):
self.base_url = base_url
self.results: List[LoadTestResult] = []
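        # Results accumulate across every test run; analyze_endpoint_results()
        # and generate_report() both read from this shared list, so per-endpoint
        # statistics are cumulative rather than per-test.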
    async def make_request(
        self,
        session: aiohttp.ClientSession,
        endpoint: str,
        method: str = "GET",
        data: Optional[Dict] = None
    ) -> LoadTestResult:
        """Make a single HTTP request and measure its latency."""
        # perf_counter is monotonic, so latency is unaffected by system clock
        # adjustments; time.time() is kept for the wall-clock timestamp.
        start_time = time.perf_counter()
        url = f"{self.base_url}{endpoint}"
        try:
            # session.request() handles GET, POST, and any other verb through a
            # single code path, avoiding duplicated per-verb branches (and the
            # silent None return for unsupported methods).
            async with session.request(method, url, json=data) as response:
                await response.text()
                latency_ms = (time.perf_counter() - start_time) * 1000
                return LoadTestResult(
                    success=response.status == 200,
                    latency_ms=latency_ms,
                    status_code=response.status,
                    endpoint=endpoint,
                    timestamp=time.time()
                )
        except Exception as e:
            latency_ms = (time.perf_counter() - start_time) * 1000
            return LoadTestResult(
                success=False,
                latency_ms=latency_ms,
                status_code=0,
                endpoint=endpoint,
                timestamp=time.time(),
                error_message=str(e)
            )
async def run_concurrent_requests(
self,
endpoint: str,
num_requests: int,
concurrent_workers: int = 10
):
"""Run multiple concurrent requests to an endpoint"""
print(f"\n{'='*60}")
print(f"Testing: {endpoint}")
print(f"Requests: {num_requests}, Concurrent Workers: {concurrent_workers}")
print(f"{'='*60}")
async with aiohttp.ClientSession() as session:
tasks = []
for i in range(num_requests):
task = self.make_request(session, endpoint)
tasks.append(task)
# Limit concurrency
if len(tasks) >= concurrent_workers or i == num_requests - 1:
results = await asyncio.gather(*tasks)
self.results.extend(results)
tasks = []
# Small delay to avoid overwhelming the server
await asyncio.sleep(0.1)
# Analyze results for this endpoint
self.analyze_endpoint_results(endpoint)
def analyze_endpoint_results(self, endpoint: str):
"""Analyze results for a specific endpoint"""
endpoint_results = [r for r in self.results if r.endpoint == endpoint]
if not endpoint_results:
print(f"No results for {endpoint}")
return
successes = [r for r in endpoint_results if r.success]
failures = [r for r in endpoint_results if not r.success]
latencies = [r.latency_ms for r in successes]
print(f"\n📊 Results for {endpoint}:")
print(f" Total Requests: {len(endpoint_results)}")
print(f" ✓ Successful: {len(successes)} ({len(successes)/len(endpoint_results)*100:.1f}%)")
print(f" ✗ Failed: {len(failures)} ({len(failures)/len(endpoint_results)*100:.1f}%)")
if latencies:
print(f"\n⏱ Latency Statistics:")
print(f" Mean: {statistics.mean(latencies):.2f} ms")
print(f" Median: {statistics.median(latencies):.2f} ms")
print(f" Min: {min(latencies):.2f} ms")
print(f" Max: {max(latencies):.2f} ms")
print(f" Std Dev: {statistics.stdev(latencies) if len(latencies) > 1 else 0:.2f} ms")
if len(latencies) >= 10:
sorted_latencies = sorted(latencies)
p95_index = int(len(sorted_latencies) * 0.95)
p99_index = int(len(sorted_latencies) * 0.99)
print(f" P95: {sorted_latencies[p95_index]:.2f} ms")
print(f" P99: {sorted_latencies[p99_index]:.2f} ms")
if failures:
print(f"\n⚠ Sample Errors:")
for failure in failures[:3]:
print(f" Status: {failure.status_code}, Error: {failure.error_message}")
async def test_health_endpoint(self, num_requests: int = 100):
"""Test health check endpoint"""
await self.run_concurrent_requests("/health", num_requests, concurrent_workers=20)
async def test_dashboard_endpoint(self, num_requests: int = 50):
"""Test dashboard endpoint (more intensive)"""
await self.run_concurrent_requests("/health/dashboard", num_requests, concurrent_workers=10)
async def test_admin_endpoints(self):
"""Test admin endpoints"""
# Test cache statistics
await self.run_concurrent_requests("/admin/cache/statistics", num_requests=30, concurrent_workers=5)
# Test metrics
await self.run_concurrent_requests("/admin/metrics", num_requests=30, concurrent_workers=5)
async def verify_monitoring_accuracy(self):
"""Verify that monitoring system accurately tracks requests"""
print(f"\n{'='*60}")
print("VERIFYING MONITORING ACCURACY")
print(f"{'='*60}")
# Get initial dashboard state
async with aiohttp.ClientSession() as session:
async with session.get(f"{self.base_url}/health/dashboard") as response:
initial_data = await response.json()
initial_requests = initial_data['system']['total_requests']
print(f"Initial request count: {initial_requests}")
# Make exactly 50 requests
print(f"\nMaking 50 test requests...")
await self.run_concurrent_requests("/health", num_requests=50, concurrent_workers=10)
# Wait for monitoring to update
await asyncio.sleep(2)
# Check final dashboard state
async with aiohttp.ClientSession() as session:
async with session.get(f"{self.base_url}/health/dashboard") as response:
final_data = await response.json()
final_requests = final_data['system']['total_requests']
print(f"Final request count: {final_requests}")
actual_increase = final_requests - initial_requests
expected_increase = 50
print(f"\n📈 Monitoring Accuracy:")
print(f" Expected increase: {expected_increase}")
print(f" Actual increase: {actual_increase}")
print(f" Accuracy: {(actual_increase/expected_increase*100):.1f}%")
if actual_increase >= expected_increase * 0.95:
print(f" ✓ Monitoring is accurately tracking requests")
else:
print(f" ⚠ Monitoring may have tracking issues")
async def test_cache_effectiveness(self):
"""Test cache effectiveness under repeated requests"""
print(f"\n{'='*60}")
print("TESTING CACHE EFFECTIVENESS")
print(f"{'='*60}")
# Get initial cache stats
async with aiohttp.ClientSession() as session:
async with session.get(f"{self.base_url}/health/dashboard") as response:
initial_data = await response.json()
initial_hits = initial_data['cache']['hits']
initial_misses = initial_data['cache']['misses']
initial_hit_rate = initial_data['cache']['hit_rate']
print(f"Initial cache state:")
print(f" Hits: {initial_hits}")
print(f" Misses: {initial_misses}")
print(f" Hit Rate: {(initial_hit_rate * 100):.1f}%")
# Make repeated requests to same endpoint (should benefit from caching)
print(f"\nMaking 100 requests to test caching...")
await self.run_concurrent_requests("/health/dashboard", num_requests=100, concurrent_workers=10)
# Wait for cache to update
await asyncio.sleep(2)
# Check final cache stats
async with aiohttp.ClientSession() as session:
async with session.get(f"{self.base_url}/health/dashboard") as response:
final_data = await response.json()
final_hits = final_data['cache']['hits']
final_misses = final_data['cache']['misses']
final_hit_rate = final_data['cache']['hit_rate']
print(f"\nFinal cache state:")
print(f" Hits: {final_hits}")
print(f" Misses: {final_misses}")
print(f" Hit Rate: {(final_hit_rate * 100):.1f}%")
print(f"\n📊 Cache Performance:")
print(f" Hit increase: {final_hits - initial_hits}")
print(f" Miss increase: {final_misses - initial_misses}")
print(f" Current hit rate: {(final_hit_rate * 100):.1f}%")
async def stress_test(self, duration_seconds: int = 30):
"""Run sustained load test"""
print(f"\n{'='*60}")
print(f"STRESS TEST - {duration_seconds} seconds")
print(f"{'='*60}")
start_time = time.time()
request_count = 0
async with aiohttp.ClientSession() as session:
while time.time() - start_time < duration_seconds:
tasks = []
for _ in range(10): # 10 concurrent requests per batch
task = self.make_request(session, "/health")
tasks.append(task)
results = await asyncio.gather(*tasks)
self.results.extend(results)
request_count += len(tasks)
await asyncio.sleep(0.5) # 0.5s between batches
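        # total_time includes the 0.5 s pauses between batches, so the
        # requests/second figure reflects offered load, not the server's peak
        # throughput.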
total_time = time.time() - start_time
requests_per_second = request_count / total_time
print(f"\n⚡ Stress Test Results:")
print(f" Duration: {total_time:.2f} seconds")
print(f" Total Requests: {request_count}")
print(f" Requests/Second: {requests_per_second:.2f}")
# Analyze stress test results
recent_results = self.results[-request_count:]
successes = [r for r in recent_results if r.success]
print(f" Success Rate: {len(successes)/len(recent_results)*100:.1f}%")
def generate_report(self):
"""Generate comprehensive test report"""
print(f"\n{'='*60}")
print("COMPREHENSIVE LOAD TEST REPORT")
print(f"{'='*60}")
print(f"Generated: {datetime.now().isoformat()}")
if not self.results:
print("No test results available")
return
total_requests = len(self.results)
successes = [r for r in self.results if r.success]
failures = [r for r in self.results if not r.success]
print(f"\n📊 Overall Statistics:")
print(f" Total Requests: {total_requests}")
print(f" ✓ Successful: {len(successes)} ({len(successes)/total_requests*100:.1f}%)")
print(f" ✗ Failed: {len(failures)} ({len(failures)/total_requests*100:.1f}%)")
all_latencies = [r.latency_ms for r in successes]
if all_latencies:
print(f"\n⏱ Global Latency Statistics:")
print(f" Mean: {statistics.mean(all_latencies):.2f} ms")
print(f" Median: {statistics.median(all_latencies):.2f} ms")
print(f" Min: {min(all_latencies):.2f} ms")
print(f" Max: {max(all_latencies):.2f} ms")
# Breakdown by endpoint
endpoints = set(r.endpoint for r in self.results)
print(f"\n📍 Breakdown by Endpoint:")
for endpoint in sorted(endpoints):
endpoint_results = [r for r in self.results if r.endpoint == endpoint]
endpoint_successes = [r for r in endpoint_results if r.success]
print(f" {endpoint}:")
print(f" Requests: {len(endpoint_results)}")
print(f" Success Rate: {len(endpoint_successes)/len(endpoint_results)*100:.1f}%")
if endpoint_successes:
latencies = [r.latency_ms for r in endpoint_successes]
print(f" Avg Latency: {statistics.mean(latencies):.2f} ms")
print(f"\n✅ Load testing complete!")
async def run_comprehensive_load_test(base_url: str = "http://localhost:7860"):
"""Run comprehensive load testing suite"""
tester = MonitoringLoadTester(base_url)
print(f"{'='*60}")
print("MEDICAL AI PLATFORM - MONITORING LOAD TEST")
print(f"{'='*60}")
print(f"Target: {base_url}")
print(f"Started: {datetime.now().isoformat()}")
try:
# Test 1: Health endpoint load
await tester.test_health_endpoint(num_requests=100)
# Test 2: Dashboard endpoint load
await tester.test_dashboard_endpoint(num_requests=50)
        # Test 3: Admin endpoints (disabled by default; uncomment if the
        # admin endpoints do not require authentication)
        # await tester.test_admin_endpoints()
# Test 4: Monitoring accuracy
await tester.verify_monitoring_accuracy()
# Test 5: Cache effectiveness
await tester.test_cache_effectiveness()
# Test 6: Stress test
await tester.stress_test(duration_seconds=30)
# Generate final report
tester.generate_report()
print(f"\n{'='*60}")
print("ALL TESTS COMPLETED SUCCESSFULLY")
print(f"{'='*60}")
except Exception as e:
print(f"\n❌ Test failed with error: {str(e)}")
raise
if __name__ == "__main__":
import sys
# Get base URL from command line or use default
base_url = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:7860"
print(f"Starting load tests against: {base_url}")
print(f"Ensure the server is running before continuing...\n")
# Run the tests
asyncio.run(run_comprehensive_load_test(base_url))
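
# Example invocations (the second URL is a placeholder for your deployment):
#   python load_test_monitoring.py
#   python load_test_monitoring.py https://your-deployment.example.com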