Update app.py
Browse files
app.py
CHANGED
|
@@ -21,7 +21,8 @@ required_packages = {
|
|
| 21 |
"numpy": None,
|
| 22 |
"gradio": None,
|
| 23 |
"mediapipe": None,
|
| 24 |
-
"tensorflow": None
|
|
|
|
| 25 |
}
|
| 26 |
|
| 27 |
installed_packages = {pkg.key for pkg in pkg_resources.working_set}
|
|
@@ -38,9 +39,9 @@ import tensorflow as tf
|
|
| 38 |
from tensorflow.keras.preprocessing.image import img_to_array
|
| 39 |
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
|
| 40 |
import time
|
| 41 |
-
import os
|
| 42 |
from pathlib import Path
|
| 43 |
import tempfile
|
|
|
|
| 44 |
|
| 45 |
# Set TensorFlow to use memory growth to avoid consuming all GPU memory
|
| 46 |
physical_devices = tf.config.list_physical_devices('GPU')
|
|
@@ -58,22 +59,58 @@ mp_drawing = mp.solutions.drawing_utils
|
|
| 58 |
# Global variable for model
|
| 59 |
mask_model = None
|
| 60 |
|
| 61 |
-
def
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 62 |
"""Load the mask detection model once and cache it"""
|
| 63 |
global mask_model
|
| 64 |
if mask_model is None:
|
| 65 |
try:
|
| 66 |
-
#
|
| 67 |
-
|
| 68 |
-
|
| 69 |
-
|
| 70 |
-
mask_model =
|
| 71 |
-
print("Loaded
|
| 72 |
return True
|
| 73 |
-
|
| 74 |
-
|
| 75 |
-
|
| 76 |
-
return True
|
| 77 |
except Exception as e:
|
| 78 |
print(f"Error loading model: {e}")
|
| 79 |
return False
|
|
@@ -126,23 +163,8 @@ def predict_mask(face_img):
|
|
| 126 |
face_array = np.expand_dims(face_array, axis=0)
|
| 127 |
face_array = preprocess_input(face_array)
|
| 128 |
|
| 129 |
-
#
|
| 130 |
-
|
| 131 |
-
# Get input and output tensors
|
| 132 |
-
input_details = mask_model.get_input_details()
|
| 133 |
-
output_details = mask_model.get_output_details()
|
| 134 |
-
|
| 135 |
-
# Set input tensor
|
| 136 |
-
mask_model.set_tensor(input_details[0]['index'], face_array.astype(np.float32))
|
| 137 |
-
|
| 138 |
-
# Run inference
|
| 139 |
-
mask_model.invoke()
|
| 140 |
-
|
| 141 |
-
# Get output
|
| 142 |
-
preds = mask_model.get_tensor(output_details[0]['index'])
|
| 143 |
-
else:
|
| 144 |
-
# Use standard TF model
|
| 145 |
-
preds = mask_model.predict(face_array, verbose=0)
|
| 146 |
|
| 147 |
mask_prob = float(preds[0][0])
|
| 148 |
return mask_prob > 0.5, mask_prob
|
|
@@ -223,7 +245,7 @@ def analyze_frame(frame, face_detector, min_detection_confidence=0.5, blur_thres
|
|
| 223 |
def process_video(video_path, progress=gr.Progress(), min_detection_confidence=0.5, blur_threshold=100):
|
| 224 |
"""Process video file and return the path to the processed video"""
|
| 225 |
if not load_mask_model():
|
| 226 |
-
return None, "Error: Could not load the mask detection model."
|
| 227 |
|
| 228 |
# Create a temporary file for the output
|
| 229 |
with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_file:
|
|
@@ -283,7 +305,7 @@ def process_video(video_path, progress=gr.Progress(), min_detection_confidence=0
|
|
| 283 |
def process_webcam_frame(frame, min_detection_confidence, blur_threshold):
|
| 284 |
"""Process a single webcam frame"""
|
| 285 |
if not load_mask_model():
|
| 286 |
-
return
|
| 287 |
|
| 288 |
# Initialize face detector for each frame in webcam mode
|
| 289 |
# This is less efficient but necessary for the Gradio webcam interface
|
|
@@ -365,5 +387,14 @@ with gr.Blocks(title="Enhanced Face Analysis System") as demo:
|
|
| 365 |
- Higher blur threshold means more tolerance for blurry video
|
| 366 |
""")
|
| 367 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 368 |
if __name__ == "__main__":
|
|
|
|
| 369 |
demo.launch()
|
|
|
|
| 21 |
"numpy": None,
|
| 22 |
"gradio": None,
|
| 23 |
"mediapipe": None,
|
| 24 |
+
"tensorflow": None,
|
| 25 |
+
"gitpython": None # For git operations
|
| 26 |
}
|
| 27 |
|
| 28 |
installed_packages = {pkg.key for pkg in pkg_resources.working_set}
|
|
|
|
| 39 |
from tensorflow.keras.preprocessing.image import img_to_array
|
| 40 |
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
|
| 41 |
import time
|
|
|
|
| 42 |
from pathlib import Path
|
| 43 |
import tempfile
|
| 44 |
+
import git
|
| 45 |
|
| 46 |
# Set TensorFlow to use memory growth to avoid consuming all GPU memory
|
| 47 |
physical_devices = tf.config.list_physical_devices('GPU')
|
|
|
|
| 59 |
# Global variable for model
|
| 60 |
mask_model = None
|
| 61 |
|
| 62 |
+
def download_model_repo():
|
| 63 |
+
"""Download the face mask detection model from GitHub"""
|
| 64 |
+
repo_url = "https://github.com/misbah4064/face_mask_detection.git"
|
| 65 |
+
repo_dir = "face_mask_detection"
|
| 66 |
+
model_path = os.path.join(repo_dir, "mask_recog.h5")
|
| 67 |
+
|
| 68 |
+
# Check if model already exists
|
| 69 |
+
if os.path.exists(model_path):
|
| 70 |
+
print(f"Model already exists at {model_path}")
|
| 71 |
+
return model_path
|
| 72 |
+
|
| 73 |
+
# Check if repository directory exists
|
| 74 |
+
if os.path.exists(repo_dir):
|
| 75 |
+
print(f"Repository directory already exists at {repo_dir}")
|
| 76 |
+
else:
|
| 77 |
+
print(f"Cloning repository from {repo_url}...")
|
| 78 |
+
try:
|
| 79 |
+
git.Repo.clone_from(repo_url, repo_dir)
|
| 80 |
+
print("Repository cloned successfully")
|
| 81 |
+
except Exception as e:
|
| 82 |
+
print(f"Error cloning repository: {e}")
|
| 83 |
+
# Try alternative method with subprocess
|
| 84 |
+
try:
|
| 85 |
+
subprocess.check_call(["git", "clone", repo_url])
|
| 86 |
+
print("Repository cloned with subprocess")
|
| 87 |
+
except Exception as sub_e:
|
| 88 |
+
print(f"Error with subprocess clone: {sub_e}")
|
| 89 |
+
return None
|
| 90 |
+
|
| 91 |
+
# Verify model file exists
|
| 92 |
+
if os.path.exists(model_path):
|
| 93 |
+
print(f"Model file found at {model_path}")
|
| 94 |
+
return model_path
|
| 95 |
+
else:
|
| 96 |
+
print(f"Model file not found at {model_path}")
|
| 97 |
+
return None
|
| 98 |
+
|
| 99 |
+
def load_mask_model():
|
| 100 |
"""Load the mask detection model once and cache it"""
|
| 101 |
global mask_model
|
| 102 |
if mask_model is None:
|
| 103 |
try:
|
| 104 |
+
# First try to download/access the model from GitHub
|
| 105 |
+
model_path = download_model_repo()
|
| 106 |
+
if model_path and os.path.exists(model_path):
|
| 107 |
+
# Use standard TF model
|
| 108 |
+
mask_model = tf.keras.models.load_model(model_path)
|
| 109 |
+
print(f"Loaded {model_path} successfully")
|
| 110 |
return True
|
| 111 |
+
else:
|
| 112 |
+
print("Failed to download or find the model")
|
| 113 |
+
return False
|
|
|
|
| 114 |
except Exception as e:
|
| 115 |
print(f"Error loading model: {e}")
|
| 116 |
return False
|
|
|
|
| 163 |
face_array = np.expand_dims(face_array, axis=0)
|
| 164 |
face_array = preprocess_input(face_array)
|
| 165 |
|
| 166 |
+
# Use standard TF model
|
| 167 |
+
preds = mask_model.predict(face_array, verbose=0)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 168 |
|
| 169 |
mask_prob = float(preds[0][0])
|
| 170 |
return mask_prob > 0.5, mask_prob
|
|
|
|
| 245 |
def process_video(video_path, progress=gr.Progress(), min_detection_confidence=0.5, blur_threshold=100):
|
| 246 |
"""Process video file and return the path to the processed video"""
|
| 247 |
if not load_mask_model():
|
| 248 |
+
return None, "Error: Could not load the mask detection model. Please check the console for details."
|
| 249 |
|
| 250 |
# Create a temporary file for the output
|
| 251 |
with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_file:
|
|
|
|
| 305 |
def process_webcam_frame(frame, min_detection_confidence, blur_threshold):
|
| 306 |
"""Process a single webcam frame"""
|
| 307 |
if not load_mask_model():
|
| 308 |
+
return frame # Return original frame if model couldn't be loaded
|
| 309 |
|
| 310 |
# Initialize face detector for each frame in webcam mode
|
| 311 |
# This is less efficient but necessary for the Gradio webcam interface
|
|
|
|
| 387 |
- Higher blur threshold means more tolerance for blurry video
|
| 388 |
""")
|
| 389 |
|
| 390 |
+
# Ensure the model is downloaded when the app starts
|
| 391 |
+
def initialize_app():
|
| 392 |
+
print("Initializing app and downloading model...")
|
| 393 |
+
if load_mask_model():
|
| 394 |
+
print("Model loaded successfully!")
|
| 395 |
+
else:
|
| 396 |
+
print("Failed to load model, some features may not work.")
|
| 397 |
+
|
| 398 |
if __name__ == "__main__":
|
| 399 |
+
initialize_app()
|
| 400 |
demo.launch()
|