"""
Load Testing Script for Medical AI Platform Monitoring Infrastructure

Tests system performance, monitoring accuracy, and error handling under stress.

Requirements:
- Tests monitoring middleware performance impact
- Validates cache effectiveness under load
- Verifies error rate tracking accuracy
- Confirms alert system responsiveness
- Measures latency tracking precision
"""

import asyncio
import statistics
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional

import aiohttp


@dataclass
class LoadTestResult:
    """Result from a single request."""

    success: bool
    latency_ms: float
    status_code: int
    endpoint: str
    timestamp: float
    error_message: Optional[str] = None


class MonitoringLoadTester:
    """Load tester for monitoring infrastructure."""

    def __init__(self, base_url: str = "http://localhost:7860"):
        self.base_url = base_url
        self.results: List[LoadTestResult] = []

    async def make_request(
        self,
        session: aiohttp.ClientSession,
        endpoint: str,
        method: str = "GET",
        data: Optional[Dict] = None,
    ) -> LoadTestResult:
        """Make a single HTTP request and measure performance."""
        start_time = time.time()
        url = f"{self.base_url}{endpoint}"
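
        # session.request() collapses the original GET/POST branches into one
        # code path; the previous version silently returned None for any
        # other method.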
        try:
            async with session.request(method, url, json=data) as response:
                # Read the full body so latency covers the complete response,
                # not just the headers.
                await response.text()
                latency_ms = (time.time() - start_time) * 1000
                return LoadTestResult(
                    success=response.status == 200,
                    latency_ms=latency_ms,
                    status_code=response.status,
                    endpoint=endpoint,
                    timestamp=time.time(),
                )
        except Exception as e:
            latency_ms = (time.time() - start_time) * 1000
            return LoadTestResult(
                success=False,
                latency_ms=latency_ms,
                status_code=0,
                endpoint=endpoint,
                timestamp=time.time(),
                error_message=str(e),
            )

    async def run_concurrent_requests(
        self,
        endpoint: str,
        num_requests: int,
        concurrent_workers: int = 10,
    ):
        """Run multiple concurrent requests to an endpoint."""
        print(f"\n{'='*60}")
        print(f"Testing: {endpoint}")
        print(f"Requests: {num_requests}, Concurrent Workers: {concurrent_workers}")
        print(f"{'='*60}")

        async with aiohttp.ClientSession() as session:
            tasks = []
            for i in range(num_requests):
                tasks.append(self.make_request(session, endpoint))
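
                # Flush in batches: once `concurrent_workers` coroutines are
                # queued (or on the final iteration), run them concurrently,
                # then pause briefly so batches don't pile up on the server.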
                if len(tasks) >= concurrent_workers or i == num_requests - 1:
                    results = await asyncio.gather(*tasks)
                    self.results.extend(results)
                    tasks = []
                    await asyncio.sleep(0.1)

        self.analyze_endpoint_results(endpoint)

    def analyze_endpoint_results(self, endpoint: str):
        """Analyze results for a specific endpoint."""
        endpoint_results = [r for r in self.results if r.endpoint == endpoint]

        if not endpoint_results:
            print(f"No results for {endpoint}")
            return

        successes = [r for r in endpoint_results if r.success]
        failures = [r for r in endpoint_results if not r.success]
        latencies = [r.latency_ms for r in successes]

        print(f"\n📊 Results for {endpoint}:")
        print(f"  Total Requests: {len(endpoint_results)}")
        print(f"  ✓ Successful: {len(successes)} ({len(successes)/len(endpoint_results)*100:.1f}%)")
        print(f"  ✗ Failed: {len(failures)} ({len(failures)/len(endpoint_results)*100:.1f}%)")

        if latencies:
            std_dev = statistics.stdev(latencies) if len(latencies) > 1 else 0.0
            print(f"\n⏱ Latency Statistics:")
            print(f"  Mean: {statistics.mean(latencies):.2f} ms")
            print(f"  Median: {statistics.median(latencies):.2f} ms")
            print(f"  Min: {min(latencies):.2f} ms")
            print(f"  Max: {max(latencies):.2f} ms")
            print(f"  Std Dev: {std_dev:.2f} ms")
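
            # Nearest-rank percentiles over the sorted sample; with n >= 10,
            # int(n * 0.95) is always < n, so the index stays in bounds.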
            if len(latencies) >= 10:
                sorted_latencies = sorted(latencies)
                p95_index = int(len(sorted_latencies) * 0.95)
                p99_index = int(len(sorted_latencies) * 0.99)
                print(f"  P95: {sorted_latencies[p95_index]:.2f} ms")
                print(f"  P99: {sorted_latencies[p99_index]:.2f} ms")

        if failures:
            print(f"\n⚠ Sample Errors:")
            for failure in failures[:3]:
                print(f"  Status: {failure.status_code}, Error: {failure.error_message}")

    async def test_health_endpoint(self, num_requests: int = 100):
        """Test the health check endpoint."""
        await self.run_concurrent_requests("/health", num_requests, concurrent_workers=20)

    async def test_dashboard_endpoint(self, num_requests: int = 50):
        """Test the dashboard endpoint (more intensive)."""
        await self.run_concurrent_requests("/health/dashboard", num_requests, concurrent_workers=10)

    async def test_admin_endpoints(self):
        """Test admin endpoints."""
        await self.run_concurrent_requests("/admin/cache/statistics", num_requests=30, concurrent_workers=5)
        await self.run_concurrent_requests("/admin/metrics", num_requests=30, concurrent_workers=5)

    async def verify_monitoring_accuracy(self):
        """Verify that the monitoring system accurately tracks requests."""
        print(f"\n{'='*60}")
        print("VERIFYING MONITORING ACCURACY")
        print(f"{'='*60}")
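
        # The dashboard is assumed to return JSON shaped like
        # {"system": {"total_requests": <int>}, ...}; adjust the key path
        # below if the monitoring API exposes a different schema.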
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.base_url}/health/dashboard") as response:
                initial_data = await response.json()
                initial_requests = initial_data['system']['total_requests']
                print(f"Initial request count: {initial_requests}")

        print("\nMaking 50 test requests...")
        await self.run_concurrent_requests("/health", num_requests=50, concurrent_workers=10)

        # Give the monitoring pipeline a moment to record the final requests.
        await asyncio.sleep(2)

        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.base_url}/health/dashboard") as response:
                final_data = await response.json()
                final_requests = final_data['system']['total_requests']
                print(f"Final request count: {final_requests}")

        actual_increase = final_requests - initial_requests
        expected_increase = 50

        print(f"\n📈 Monitoring Accuracy:")
        print(f"  Expected increase: {expected_increase}")
        print(f"  Actual increase: {actual_increase}")
        print(f"  Accuracy: {(actual_increase/expected_increase*100):.1f}%")

        # Allow a 5% shortfall for requests still in flight; the actual
        # increase may also overshoot slightly, since the dashboard reads
        # themselves count as requests.
        if actual_increase >= expected_increase * 0.95:
            print("  ✓ Monitoring is accurately tracking requests")
        else:
            print("  ⚠ Monitoring may have tracking issues")

    async def test_cache_effectiveness(self):
        """Test cache effectiveness under repeated requests."""
        print(f"\n{'='*60}")
        print("TESTING CACHE EFFECTIVENESS")
        print(f"{'='*60}")
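
        # The "cache" block of the dashboard JSON is assumed to expose
        # "hits", "misses", and "hit_rate" counters; repeated identical
        # requests to the same endpoint should drive the hit count up.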
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.base_url}/health/dashboard") as response:
                initial_data = await response.json()
                initial_hits = initial_data['cache']['hits']
                initial_misses = initial_data['cache']['misses']
                initial_hit_rate = initial_data['cache']['hit_rate']

        print("Initial cache state:")
        print(f"  Hits: {initial_hits}")
        print(f"  Misses: {initial_misses}")
        print(f"  Hit Rate: {(initial_hit_rate * 100):.1f}%")

        print("\nMaking 100 requests to test caching...")
        await self.run_concurrent_requests("/health/dashboard", num_requests=100, concurrent_workers=10)

        # Give the cache statistics a moment to update.
        await asyncio.sleep(2)

        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.base_url}/health/dashboard") as response:
                final_data = await response.json()
                final_hits = final_data['cache']['hits']
                final_misses = final_data['cache']['misses']
                final_hit_rate = final_data['cache']['hit_rate']

        print("\nFinal cache state:")
        print(f"  Hits: {final_hits}")
        print(f"  Misses: {final_misses}")
        print(f"  Hit Rate: {(final_hit_rate * 100):.1f}%")

        print(f"\n📊 Cache Performance:")
        print(f"  Hit increase: {final_hits - initial_hits}")
        print(f"  Miss increase: {final_misses - initial_misses}")
        print(f"  Current hit rate: {(final_hit_rate * 100):.1f}%")

    async def stress_test(self, duration_seconds: int = 30):
        """Run a sustained load test."""
        print(f"\n{'='*60}")
        print(f"STRESS TEST - {duration_seconds} seconds")
        print(f"{'='*60}")

        start_time = time.time()
        request_count = 0
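
        # Pace the load at 10 requests every ~0.5 s (at most ~20 req/s);
        # shorten the sleep or raise the batch size for a harsher test.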
        async with aiohttp.ClientSession() as session:
            while time.time() - start_time < duration_seconds:
                tasks = [self.make_request(session, "/health") for _ in range(10)]
                results = await asyncio.gather(*tasks)
                self.results.extend(results)
                request_count += len(tasks)
                await asyncio.sleep(0.5)

        total_time = time.time() - start_time
        requests_per_second = request_count / total_time

        print(f"\n⚡ Stress Test Results:")
        print(f"  Duration: {total_time:.2f} seconds")
        print(f"  Total Requests: {request_count}")
        print(f"  Requests/Second: {requests_per_second:.2f}")

        # Only the requests made during this stress run are considered here.
        recent_results = self.results[-request_count:]
        successes = [r for r in recent_results if r.success]
        print(f"  Success Rate: {len(successes)/len(recent_results)*100:.1f}%")

    def generate_report(self):
        """Generate a comprehensive test report."""
        print(f"\n{'='*60}")
        print("COMPREHENSIVE LOAD TEST REPORT")
        print(f"{'='*60}")
        print(f"Generated: {datetime.now().isoformat()}")

        if not self.results:
            print("No test results available")
            return

        total_requests = len(self.results)
        successes = [r for r in self.results if r.success]
        failures = [r for r in self.results if not r.success]

        print(f"\n📊 Overall Statistics:")
        print(f"  Total Requests: {total_requests}")
        print(f"  ✓ Successful: {len(successes)} ({len(successes)/total_requests*100:.1f}%)")
        print(f"  ✗ Failed: {len(failures)} ({len(failures)/total_requests*100:.1f}%)")

        all_latencies = [r.latency_ms for r in successes]
        if all_latencies:
            print(f"\n⏱ Global Latency Statistics:")
            print(f"  Mean: {statistics.mean(all_latencies):.2f} ms")
            print(f"  Median: {statistics.median(all_latencies):.2f} ms")
            print(f"  Min: {min(all_latencies):.2f} ms")
            print(f"  Max: {max(all_latencies):.2f} ms")

        endpoints = {r.endpoint for r in self.results}
        print(f"\n📍 Breakdown by Endpoint:")
        for endpoint in sorted(endpoints):
            endpoint_results = [r for r in self.results if r.endpoint == endpoint]
            endpoint_successes = [r for r in endpoint_results if r.success]
            print(f"  {endpoint}:")
            print(f"    Requests: {len(endpoint_results)}")
            print(f"    Success Rate: {len(endpoint_successes)/len(endpoint_results)*100:.1f}%")
            if endpoint_successes:
                latencies = [r.latency_ms for r in endpoint_successes]
                print(f"    Avg Latency: {statistics.mean(latencies):.2f} ms")

        print("\n✅ Load testing complete!")


async def run_comprehensive_load_test(base_url: str = "http://localhost:7860"):
    """Run the comprehensive load testing suite."""
    tester = MonitoringLoadTester(base_url)

    print(f"{'='*60}")
    print("MEDICAL AI PLATFORM - MONITORING LOAD TEST")
    print(f"{'='*60}")
    print(f"Target: {base_url}")
    print(f"Started: {datetime.now().isoformat()}")

    try:
        # Baseline endpoint tests.
        await tester.test_health_endpoint(num_requests=100)
        await tester.test_dashboard_endpoint(num_requests=50)

        # Monitoring accuracy and cache checks.
        await tester.verify_monitoring_accuracy()
        await tester.test_cache_effectiveness()

        # test_admin_endpoints() is available but not run by default; enable
        # it for deployments that expose the /admin routes.
        # await tester.test_admin_endpoints()

        # Sustained load.
        await tester.stress_test(duration_seconds=30)

        tester.generate_report()

        print(f"\n{'='*60}")
        print("ALL TESTS COMPLETED SUCCESSFULLY")
        print(f"{'='*60}")

    except Exception as e:
        print(f"\n❌ Test failed with error: {str(e)}")
        raise
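

# To run a single test interactively instead of the full suite, something
# along these lines should work (assuming the server is reachable):
#
#     tester = MonitoringLoadTester("http://localhost:7860")
#     asyncio.run(tester.test_health_endpoint(num_requests=20))
#     tester.generate_report()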


if __name__ == "__main__":
    import sys

    # Usage: python load_test.py [base_url]
    base_url = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:7860"

    print(f"Starting load tests against: {base_url}")
    print("Ensure the server is running before continuing...\n")

    asyncio.run(run_comprehensive_load_test(base_url))