| from pathlib import Path |
| import math |
|
|
| from rich.console import Console |
| from rich.table import Table |
| from rich.pretty import Pretty |
|
|
| import numpy as np |
|
|
| import pandas as pd |
|
|
| import cv2 |
|
|
| from sklearn.cluster import MeanShift |
|
|
| from skimage.transform import hough_circle, hough_circle_peaks |
|
|
|
|
| import torch |
| from torch.utils.data import Dataset, DataLoader |
| from torchvision import transforms |
| from torchvision.models.detection.faster_rcnn import FastRCNNPredictor |
|
|
| from torchvision.models.detection import ( |
| fasterrcnn_resnet50_fpn_v2, |
| FasterRCNN_ResNet50_FPN_V2_Weights, |
| ) |
|
|
| import pytorch_lightning as pl |
| from pytorch_lightning.callbacks import RichProgressBar |
| from pytorch_lightning import Trainer |
|
|
| import albumentations as A |
| from albumentations.pytorch.transforms import ToTensorV2 |
|
|
| import matplotlib.pyplot as plt |
|
|
| import com_const as cc |
| import com_image as ci |
|
|
# Pick the best device that is actually usable at runtime.
# ``is_built()`` only tells whether the installed binary was compiled with
# support for a backend; ``is_available()`` additionally checks that a matching
# device/driver is present, so tensors can really be moved there.
g_device = (
    "mps"
    if torch.backends.mps.is_available()
    else "cuda" if torch.cuda.is_available() else "cpu"
)
|
|
|
|
def load_tray_image(image_name):
    """Load a tray (plate) picture through ``ci.load_image``.

    The image is always looked up relative to ``cc.path_to_plates`` and is
    always requested with ``rgb=True``.

    Args:
        image_name: Value forwarded as ``file_name`` to ``ci.load_image``.

    Returns:
        Whatever ``ci.load_image`` returns (presumably an RGB numpy array --
        confirm against ``com_image``).
    """
    load_options = {"path_to_images": cc.path_to_plates, "rgb": True}
    return ci.load_image(file_name=image_name, **load_options)
|
|
|
|
def build_albumentations(
    image_size: int = 10,
    gamma=(60, 180),
    mean=(0.485, 0.456, 0.406),
    std=(0.229, 0.224, 0.225),
):
    """Build the catalogue of albumentations steps, grouped by kind.

    Args:
        image_size: Size factor; the resize target is
            ``image_size * 32 * 2`` high and ``image_size * 32 * 3`` wide.
        gamma: ``gamma_limit`` handed to ``A.RandomGamma``.
        mean: Per-channel mean used by ``A.Normalize``.
        std: Per-channel standard deviation used by ``A.Normalize``.

    Returns:
        dict mapping ``"resize"``, ``"train"``, ``"to_tensor"`` and
        ``"un_normalize"`` to lists of transforms.
    """
    side_unit = image_size * 32

    resize_steps = [A.Resize(height=side_unit * 2, width=side_unit * 3, p=1)]

    train_steps = [
        A.HorizontalFlip(p=0.3),
        A.RandomBrightnessContrast(
            brightness_limit=0.25,
            contrast_limit=0.25,
            p=0.5,
        ),
        A.RandomGamma(gamma_limit=gamma, p=0.5),
    ]

    tensor_steps = [A.Normalize(mean=mean, std=std, p=1), ToTensorV2()]

    # Inverse of the normalisation above: (x - (-m/s)) / (1/s) == x * s + m.
    inverse_mean = [-m / s for m, s in zip(mean, std)]
    inverse_std = [1.0 / s for s in std]
    un_normalize_steps = [
        A.Normalize(
            mean=inverse_mean,
            std=inverse_std,
            always_apply=True,
            max_pixel_value=1.0,
        ),
    ]

    return {
        "resize": resize_steps,
        "train": train_steps,
        "to_tensor": tensor_steps,
        "un_normalize": un_normalize_steps,
    }
|
|
|
|
def get_augmentations(
    image_size: int = 10,
    gamma=(60, 180),
    kinds=("resize", "to_tensor"),
    mean=(0.485, 0.456, 0.406),
    std=(0.229, 0.224, 0.225),
    inferrence: bool = False,
):
    """Compose an albumentations pipeline from named groups of transforms.

    Args:
        image_size: Size factor forwarded to ``build_albumentations``.
        gamma: Gamma limits forwarded to ``build_albumentations``.
        kinds: Ordered names of the transform groups to chain; each must be a
            key of the dict returned by ``build_albumentations``.  The default
            is an immutable tuple so it cannot be altered between calls.
        mean: Normalisation mean forwarded to ``build_albumentations``.
        std: Normalisation std forwarded to ``build_albumentations``.
        inferrence: When ``True`` the pipeline is built without bounding-box
            handling (image only).  The parameter name is kept as spelled for
            backward compatibility.

    Returns:
        ``A.Compose`` pipeline; unless ``inferrence`` is ``True`` it expects
        ``pascal_voc`` boxes plus a ``labels`` field.

    Raises:
        KeyError: If an entry of ``kinds`` is not a known group name.
    """
    catalogue = build_albumentations(
        image_size=image_size,
        gamma=gamma,
        mean=mean,
        std=std,
    )
    augs = [step for kind in kinds for step in catalogue[kind]]

    if inferrence is True:
        return A.Compose(augs)
    return A.Compose(
        augs,
        bbox_params={"format": "pascal_voc", "label_fields": ["labels"]},
    )
|
|
|
|
def safe_row_col(row, col):
    """Return ``(row, col)``, swapping them when ``col`` holds a string.

    The intent is that ``row`` ends up being the string label and ``col`` the
    integer index, whichever order the caller used.  Nothing is swapped when
    either value is ``None``.

    Args:
        row (int or str): Row candidate.
        col (int or str): Column candidate.

    Returns:
        tuple: ``(row, col)``, possibly swapped.
    """
    both_given = row is not None and col is not None
    needs_swap = both_given and isinstance(col, str)
    return (col, row) if needs_swap else (row, col)
|
|
|
|
def _update_axis(axis, image, title=None, fontsize=10, remove_axis=True):
    """Show ``image`` on ``axis`` and optionally set a title.

    Args:
        axis: Object exposing ``imshow`` and ``set_title`` (a matplotlib axis
            in this file's usage).
        image: Image handed to ``axis.imshow`` with ``origin="upper"``.
        title: Title text; no title is set when ``None``.
        fontsize: Font size used for the title.
        remove_axis: Accepted for API compatibility; not used by this function.
    """
    axis.imshow(image, origin="upper")
    if title is None:
        return
    axis.set_title(title, fontsize=fontsize)
|
|
|
|
def make_patches_grid(images, row_count, col_count=None, figsize=(20, 20)):
    """Display images on a ``row_count`` x ``col_count`` matplotlib grid.

    Args:
        images: Iterable of images, or of tuples whose first item is the image
            and second item its title.
        row_count: Number of grid rows.
        col_count: Number of grid columns; defaults to ``row_count``.
        figsize: Figure size forwarded to ``plt.subplots``.

    Images that cannot be drawn are skipped with a warning; their cell is left
    empty.  Extra images beyond the grid capacity are ignored.
    """
    import warnings

    if col_count is None:
        col_count = row_count

    # squeeze=False always yields a 2-D array of axes, so a 1x1 (or 1xN) grid
    # can be flattened exactly like larger ones.
    _, axes = plt.subplots(row_count, col_count, figsize=figsize, squeeze=False)

    for ax, item in zip(axes.reshape(-1), images):
        if isinstance(item, tuple):
            image, title = item[0], item[1]
        else:
            image, title = item, None
        try:
            _update_axis(axis=ax, image=image, remove_axis=True, title=title)
        except Exception as err:  # best effort: one bad image must not kill the grid
            warnings.warn(f"Could not draw image on grid: {err!r}")
        ax.set_axis_off()

    plt.tight_layout()
    plt.show()
|
|
|
|
def print_boxes(
    image_name,
    boxes,
    highlight=(None, None),
    draw_first_line: bool = False,
    return_plot: bool = True,
):
    """Draw labelled bounding boxes on a tray image.

    Args:
        image_name: Image handed to ``load_tray_image``.
        boxes: DataFrame with columns ``x1, y1, x2, y2, cx, cy, row, col``.
        highlight: ``(row, col)`` (in either order) of the box to emphasise.
        draw_first_line: When ``True`` also draw the line returned by
            ``get_first_vert_line``, extended in both directions.
        return_plot: When ``True`` the image is shown with matplotlib and
            nothing is returned; otherwise the annotated image is returned.

    Returns:
        The annotated image when ``return_plot`` is not ``True``, else ``None``.
    """
    hl_row, hl_col = safe_row_col(*highlight)
    canvas = load_tray_image(image_name=image_name)

    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 3
    stroke = 8

    palette = {
        1: (255, 0, 0),
        2: (0, 0, 255),
        3: (255, 255, 0),
        4: (0, 255, 255),
    }
    fields = ["x1", "y1", "x2", "y2", "cx", "cy", "row", "col"]

    for x1, y1, x2, y2, _cx, _cy, box_row, box_col in boxes[fields].values:
        if hl_col == box_col and hl_row == box_row:
            color, thickness = (255, 0, 255), 20
        else:
            color, thickness = palette.get(box_col, (255, 255, 244)), 10

        left, top = int(x1), int(y1)
        canvas = cv2.rectangle(
            canvas, (left, top), (int(x2), int(y2)), color, thickness
        )

        # Label such as "A1": filled background first, then black text on top.
        label = str(box_row).upper() + str(int(box_col))
        (text_w, text_h), _ = cv2.getTextSize(label, font, font_scale, stroke)
        anchor_x, anchor_y = left, top - stroke
        canvas = cv2.rectangle(
            canvas,
            (anchor_x - stroke, anchor_y - text_h - stroke),
            (anchor_x + stroke + text_w, anchor_y + stroke),
            color,
            -1,
        )
        canvas = cv2.putText(
            canvas,
            label,
            (anchor_x + stroke, anchor_y),
            font,
            font_scale,
            (0, 0, 0),
            stroke,
        )

    if draw_first_line is True:
        line = get_first_vert_line(image_name=image_name)
        if line is not None:
            x1, y1, x2, y2 = line
            start = np.array([x1, y1])
            end = np.array([x2, y2])
            # Stretch the segment by 10x its length past each end point.
            beyond_end = (end - start) * 10 + start
            beyond_start = (start - end) * 10 + end
            cv2.line(
                canvas,
                [int(v) for v in beyond_end],
                [int(v) for v in beyond_start],
                (255, 0, 255),
                20,
                lineType=8,
            )

    if return_plot is not True:
        return canvas

    plt.figure(figsize=(10, 10))
    plt.imshow(canvas)
    plt.tight_layout()
    plt.axis("off")
    plt.show()
|
|
|
|
def crop_to_vert(image):
    """Return the top-left region of ``image`` used for vertical line search.

    Rows are kept up to ``image.shape[1] // 2`` and columns up to
    ``image.shape[0] // 3``.

    NOTE(review): the row limit is derived from the width and the column limit
    from the height; this may be intentional tuning or swapped axes -- confirm
    before changing.
    """
    row_stop = image.shape[1] // 2
    col_stop = image.shape[0] // 3
    return image[:row_stop, :col_stop]
|
|
|
|
def get_first_vert_line(image_name, min_angle=80, max_angle=100):
    """Find the left-most near-vertical line segment in a tray image.

    The first channel of the image is cropped with ``crop_to_vert``,
    normalised to 0..200, edge-detected (Canny), closed with ``ci.close`` and
    searched with a probabilistic Hough transform.

    Args:
        image_name: Image handed to ``load_tray_image``.
        min_angle: Lower bound (degrees) of the accepted absolute line angle.
        max_angle: Upper bound (degrees); the two bounds may be given in
            either order.

    Returns:
        ``(x1, y1, x2, y2)`` of the accepted segment with the smallest x, or
        ``None`` when no segment qualifies.
    """
    red, *_ = cv2.split(load_tray_image(image_name))

    red_crop = cv2.normalize(
        crop_to_vert(red),
        None,
        alpha=0,
        beta=200,
        norm_type=cv2.NORM_MINMAX,
    )

    edges = cv2.Canny(red_crop, 50, 200, None, 3)
    closed_edges = ci.close(edges, kernel_size=5, proc_times=5)
    segments = cv2.HoughLinesP(
        image=closed_edges,
        rho=1,
        theta=np.pi / 180,
        threshold=50,
        minLineLength=red_crop.shape[0] // 5,
        maxLineGap=20,
    )
    if segments is None:
        return None

    low = min(min_angle, max_angle)
    high = max(min_angle, max_angle)

    best_x = red_crop.shape[0]
    best_segment = None
    for segment in segments:
        x1, y1, x2, y2 = segment[0]
        angle = math.atan2(y2 - y1, x2 - x1) * 180 / math.pi * -1
        if low <= abs(angle) <= high and min(x1, x2) < best_x:
            best_x = min(x1, x2)
            best_segment = (x1, y1, x2, y2)

    return best_segment
|
|
|
|
def draw_first_line(image_name, dot_size=10, crop_canvas: bool = False):
    """Draw the segment found by ``get_first_vert_line`` on the tray image.

    Args:
        image_name: Image handed to ``load_tray_image``.
        dot_size: Radius of the circles marking the two end points.
        crop_canvas: When ``True`` the drawing is done on the
            ``crop_to_vert`` region instead of the full image.

    Returns:
        The canvas; it is returned without any drawing when no line was found.
    """
    # Build the canvas first so it exists on every return path (the previous
    # code returned it before it was assigned when no line was detected).
    canvas = load_tray_image(image_name)
    if crop_canvas is True:
        canvas = crop_to_vert(canvas)

    line = get_first_vert_line(image_name=image_name)
    if line is None:
        return canvas

    # Plain ints: OpenCV point arguments do not reliably accept numpy scalars.
    x1, y1, x2, y2 = (int(v) for v in line)
    cv2.circle(canvas, (x1, y1), dot_size, (255, 0, 0))
    cv2.circle(canvas, (x2, y2), dot_size, (0, 255, 0))
    cv2.line(canvas, (x1, y1), (x2, y2), (0, 0, 255), 10)
    return canvas
|
|
|
|
def get_bbox(image_name, bboxes, row, col):
    """Return the bounding-box record for one leaf disk.

    Args:
        image_name: File name (``str``) or ``Path``; only the file name part
            of a ``Path`` is compared with the ``file_name`` column.
        bboxes: Either a ``pd.Series`` (already a single record, returned
            unchanged) or a DataFrame with ``file_name``, ``row`` and ``col``
            columns.
        row: Row label of the wanted disk.
        col: Column index of the wanted disk (``row``/``col`` may be swapped,
            see ``safe_row_col``).

    Returns:
        pd.Series: The first matching record.

    Raises:
        IndexError: If the DataFrame holds no matching record.
    """
    if isinstance(bboxes, pd.Series):
        return bboxes

    row, col = safe_row_col(row, col)
    wanted_file = image_name.name if isinstance(image_name, Path) else image_name
    selection = (
        (bboxes.file_name == wanted_file) & (bboxes.row == row) & (bboxes.col == col)
    )
    return bboxes[selection].iloc[0]
|
|
|
|
def get_hough_leaf_disc_circle(
    image_name,
    bboxes,
    row=-1,
    col=-1,
    padding: int = 10,
    allow_move: bool = False,
):
    """Locate the leaf disk inside its bounding-box crop with a Hough transform.

    The last channel of the crop returned by ``get_leaf_disk_wbb`` is
    normalised to 0..200, edge-detected (Canny 100/200) and searched for a
    circle whose radius is close to half of the box ``max_size``.

    Args:
        image_name: Image name forwarded to the helpers.
        bboxes: Bounding boxes (DataFrame or single-record Series).
        row: Row label of the disk.
        col: Column index of the disk.
        padding: Forwarded to ``get_leaf_disk_wbb``.
        allow_move: When ``True`` the centre is shifted so the circle stays
            inside the crop.

    Returns:
        dict: ``cx``, ``cy`` (centre in crop coordinates) and ``r`` (radius)
        of the strongest circle.

    Raises:
        ValueError: If the crop could not be produced or no circle was found.
    """
    padded_leaf_disk = get_leaf_disk_wbb(
        image_name=image_name,
        bboxes=bboxes,
        row=row,
        col=col,
        padding=padding,
    )
    if padded_leaf_disk is None:
        raise ValueError(f"Unable to extract leaf disk crop for {image_name}")
    *_, last_channel = cv2.split(padded_leaf_disk)

    min_t, max_t = 100, 200
    edges = cv2.Canny(
        cv2.normalize(
            last_channel,
            None,
            alpha=0,
            beta=200,
            norm_type=cv2.NORM_MINMAX,
        ),
        min_t,
        max_t,
        None,
        3,
    )

    bbox = get_bbox(image_name=image_name, bboxes=bboxes, row=row, col=col)
    half_size = bbox.max_size // 2
    hough_radii = np.arange(half_size - 10, half_size + 10, 10)
    hough_res = hough_circle(edges, hough_radii)

    # Keep only the single most prominent circle.
    _, centers_x, centers_y, radii = hough_circle_peaks(
        hough_res,
        hough_radii,
        min_xdistance=10,
        min_ydistance=10,
        total_num_peaks=1,
    )
    if len(radii) == 0:
        raise ValueError(f"No circle found in leaf disk crop for {image_name}")

    cx = centers_x[0]
    cy = centers_y[0]
    r = radii[0]

    if allow_move is True:
        h, w = padded_leaf_disk.shape[:2]
        # Slide the centre by exactly the overflow so the circle touches the
        # border it was crossing instead of being thrown to the opposite side.
        if cx - r < 0:
            cx = r
        if cx + r > w:
            cx = w - r
        if cy - r < 0:
            cy = r
        if cy + r > h:
            cy = h - r

    # Return the selected radius, not the whole array of candidate radii.
    return dict(cx=cx, cy=cy, r=r)
|
|
|
|
def get_hough_leaf_disk_patch(
    image_name,
    bboxes,
    patch_size=-1,
    row=-1,
    col=-1,
    padding: int = 10,
    radius_crop=0,
    disc=None,
    allow_move: bool = False,
    image_folder=None,
):
    """Extract a square patch from a leaf disk.

    Two modes:

    * ``patch_size > 0``: crop a ``patch_size`` square centred on the
      bounding-box centre, directly from the tray image.
    * otherwise: crop the square inscribed in the Hough circle (radius reduced
      by ``radius_crop``) from the bounding-box crop.

    Args:
        image_name: Image name.
        bboxes: Bounding boxes (DataFrame or single-record Series).
        patch_size: Side of the fixed-size patch; ``<= 0`` selects Hough mode.
        row: Row label of the disk.
        col: Column index of the disk.
        padding: Forwarded to the crop / circle helpers (Hough mode).
        radius_crop: Amount subtracted from the circle radius (Hough mode).
        disc: Pre-computed circle dict (``cx``, ``cy``, ``r``); computed with
            ``get_hough_leaf_disc_circle`` when ``None``.
        allow_move: Forwarded to ``get_hough_leaf_disc_circle``.
        image_folder: Optional folder holding the image (fixed-size mode).

    Returns:
        The patch, or ``None`` in fixed-size mode when the bounding box cannot
        be resolved.
    """
    if patch_size > 0:
        try:
            bbox = get_bbox(image_name, bboxes, row, col)
            cx = int(bbox.cx)
            cy = int(bbox.cy)
        except (IndexError, AttributeError, KeyError, TypeError, ValueError):
            return None
        half = patch_size // 2

        # ``load_tray_image`` only takes the image name, so the folder is
        # folded into the name.  Assumes ``ci.load_image`` accepts a full path,
        # as the ``image_path`` callers in this file do -- TODO confirm.
        if image_folder is None:
            source = image_name
        else:
            source = Path(image_folder).joinpath(Path(image_name).name)

        return A.crop(
            load_tray_image(source),
            cx - half,
            cy - half,
            cx + half,
            cy + half,
        )

    if disc is None:
        disc = get_hough_leaf_disc_circle(
            image_name=image_name,
            bboxes=bboxes,
            row=row,
            col=col,
            padding=padding,
            allow_move=allow_move,
        )

    # Half-side of the square inscribed in the (reduced) circle.
    r = int((disc["r"] - radius_crop) / math.sqrt(2))
    cx = int(disc["cx"])
    cy = int(disc["cy"])

    left = cx - r
    top = cy - r
    right = cx + r
    bottom = cy + r

    return get_leaf_disk_wbb(
        image_name=image_name,
        bboxes=bboxes,
        row=row,
        col=col,
        padding=padding,
    )[top:bottom, left:right]
|
|
|
|
def get_hough_segment_disk(
    image_name,
    bboxes,
    row=-1,
    col=-1,
    padding: int = 10,
    radius_crop=0,
    disc=None,
    allow_move: bool = False,
):
    """Return the leaf disk with everything outside the Hough circle blacked out.

    Args:
        image_name: Image name.
        bboxes: Bounding boxes (DataFrame or single-record Series).
        row: Row label of the disk.
        col: Column index of the disk.
        padding: Forwarded to the crop / circle helpers.
        radius_crop: Amount subtracted from the circle radius.
        disc: Pre-computed circle dict (``cx``, ``cy``, ``r``); computed with
            ``get_hough_leaf_disc_circle`` when ``None``.
        allow_move: Forwarded to ``get_hough_leaf_disc_circle``.

    Returns:
        The masked crop, cut to the square bounding the circle.
    """
    if disc is None:
        disc = get_hough_leaf_disc_circle(
            image_name=image_name,
            bboxes=bboxes,
            row=row,
            col=col,
            padding=padding,
            allow_move=allow_move,
        )

    padded_leaf_disk = get_leaf_disk_wbb(
        image_name=image_name,
        bboxes=bboxes,
        row=row,
        col=col,
        padding=padding,
    )
    r = int(disc["r"] - radius_crop)
    cx = int(disc["cx"])
    cy = int(disc["cy"])

    # A negative start index would wrap around to the other side of the array,
    # so the window is clamped at the crop origin.
    left = max(cx - r, 0)
    top = max(cy - r, 0)
    right = cx + r
    bottom = cy + r

    mask = cv2.circle(
        np.zeros_like(padded_leaf_disk[:, :, 0]), (cx, cy), r, 255, -1
    )
    masked = cv2.bitwise_and(padded_leaf_disk, padded_leaf_disk, mask=mask)
    return masked[top:bottom, left:right]
|
|
|
|
def draw_hough_bb_to_patch_process(
    image_name,
    bboxes,
    row=-1,
    col=-1,
    padding: int = 10,
    radius_crop=0,
    disc=None,
    allow_move: bool = False,
):
    """Draw the Hough circle, its bounding square and the patch square.

    Drawn on the bounding-box crop, in this order: the inscribed (patch)
    square in green, then in magenta the square bounding the circle, a filled
    centre dot and the circle itself.

    Args:
        image_name: Image name.
        bboxes: Bounding boxes (DataFrame or single-record Series).
        row: Row label of the disk.
        col: Column index of the disk.
        padding: Forwarded to the crop / circle helpers.
        radius_crop: Amount subtracted from the circle radius.
        disc: Pre-computed circle dict; computed when ``None``.
        allow_move: Forwarded to ``get_hough_leaf_disc_circle``.

    Returns:
        The annotated crop.
    """
    if disc is None:
        disc = get_hough_leaf_disc_circle(
            image_name=image_name,
            bboxes=bboxes,
            row=row,
            col=col,
            padding=padding,
            allow_move=allow_move,
        )

    padded_leaf_disk = get_leaf_disk_wbb(
        image_name=image_name,
        bboxes=bboxes,
        row=row,
        col=col,
        padding=padding,
    )
    r = int(disc["r"] - radius_crop)
    rc = int((disc["r"] - radius_crop) / math.sqrt(2))
    cx = int(disc["cx"])
    cy = int(disc["cy"])
    left = cx - r
    top = cy - r
    right = cx + r
    bottom = cy + r

    magenta = (255, 0, 155)

    canvas = cv2.rectangle(
        padded_leaf_disk,
        (cx - rc, cy - rc),
        (cx + rc, cy + rc),
        (0, 255, 0),
        5,
    )
    canvas = cv2.rectangle(canvas, (left, top), (right, bottom), magenta, 5)
    canvas = cv2.circle(canvas, (cx, cy), 10, magenta, -1)
    return cv2.circle(canvas, (cx, cy), r, magenta, 5)
|
|
|
|
def get_leaf_disk_wbb(
    image_name,
    bboxes,
    row=-1,
    col=-1,
    image_path: Path = None,
    padding: int = 0,
):
    """Crop a leaf disk from the tray image using its bounding box.

    Args:
        image_name: Image name, used to look up the box and, unless
            ``image_path`` is given, to load the image.
        bboxes: Bounding boxes (DataFrame or single-record Series).
        row: Row label of the disk.
        col: Column index of the disk.
        image_path: Optional value handed to ``load_tray_image`` instead of
            ``image_name``.
        padding: Extra pixels added on every side of the box.  Defaults to 0
            (plain bounding box).  Several callers in this file pass this
            keyword, which the previous signature rejected.

    Returns:
        The cropped image, or ``None`` when the box or the image cannot be
        obtained (best effort: callers treat ``None`` as a failed extraction).
    """
    try:
        bbox = get_bbox(image_name, bboxes, row, col)
        image = load_tray_image(image_name if image_path is None else image_path)
        # Clamp at 0: a negative start index would wrap around the array.
        top = max(int(bbox.y1) - padding, 0)
        left = max(int(bbox.x1) - padding, 0)
        bottom = int(bbox.y2) + padding
        right = int(bbox.x2) + padding
        return image[top:bottom, left:right]
    except Exception:
        return None
|
|
|
|
def get_fast_leaf_disc_circle(
    image_name, bboxes, row=-1, col=-1, percent_radius: float = 1.0
):
    """Approximate the leaf disk circle straight from its bounding box.

    Args:
        image_name: Image name used to look up the box.
        bboxes: Bounding boxes (DataFrame or single-record Series).
        row: Row label of the disk.
        col: Column index of the disk.
        percent_radius: Factor applied to half of the box ``max_size``.

    Returns:
        tuple: ``(cx, cy, radius)`` as integers, in the coordinates of the
        box record.
    """
    bbox = get_bbox(image_name=image_name, bboxes=bboxes, row=row, col=col)
    center_x = int(bbox.cx)
    center_y = int(bbox.cy)
    radius = int((bbox.max_size / 2) * percent_radius)
    return center_x, center_y, radius
|
|
|
|
def get_fast_segment_disk(
    image_name,
    bboxes,
    row=-1,
    col=-1,
    percent_radius: float = 1.0,
    image_path: Path = None,
):
    """Return the leaf disk with everything outside its circle blacked out.

    The circle comes from ``get_fast_leaf_disc_circle``; the result is cut to
    the square bounding that circle.

    Args:
        image_name: Image name.
        bboxes: Bounding boxes (DataFrame or single-record Series).
        row: Row label of the disk.
        col: Column index of the disk.
        percent_radius: Radius factor forwarded to the circle helper.
        image_path: Optional value handed to ``load_tray_image`` instead of
            ``image_name``.

    Returns:
        The masked square crop.
    """
    cx, cy, r = get_fast_leaf_disc_circle(
        image_name=image_name,
        bboxes=bboxes,
        row=row,
        col=col,
        percent_radius=percent_radius,
    )
    src_image = load_tray_image(image_name if image_path is None else image_path)

    rows = slice(cy - r, cy + r)
    cols = slice(cx - r, cx + r)

    disk_mask = cv2.circle(np.zeros_like(src_image[:, :, 0]), (cx, cy), r, 255, -1)
    masked = cv2.bitwise_and(src_image, src_image, mask=disk_mask)
    return masked[rows, cols]
|
|
|
|
def get_fast_leaf_disk_patch(
    image_name,
    bboxes,
    row=-1,
    col=-1,
    percent_radius: float = 1.0,
    image_path: Path = None,
):
    """Crop the square inscribed in the leaf disk circle.

    Args:
        image_name: Image name.
        bboxes: Bounding boxes (DataFrame or single-record Series).
        row: Row label of the disk.
        col: Column index of the disk.
        percent_radius: Radius factor forwarded to the circle helper.
        image_path: Optional value handed to ``load_tray_image`` instead of
            ``image_name``.

    Returns:
        The square patch cut from the tray image.
    """
    cx, cy, radius = get_fast_leaf_disc_circle(
        image_name=image_name,
        bboxes=bboxes,
        row=row,
        col=col,
        percent_radius=percent_radius,
    )
    # Half-side of the square inscribed in the circle.
    half_side = int(radius / math.sqrt(2))
    rows = slice(cy - half_side, cy + half_side)
    cols = slice(cx - half_side, cx + half_side)

    source = image_name if image_path is None else image_path
    return load_tray_image(source)[rows, cols]
|
|
|
|
def draw_fast_bb_to_patch_process(
    image_name,
    bboxes,
    row=-1,
    col=-1,
    percent_radius: float = 1.0,
    image_path: Path = None,
    add_center: bool = True,
):
    """Draw the fast circle and its inscribed patch square on a leaf disk.

    Args:
        image_name: Image name.
        bboxes: Bounding boxes (DataFrame or single-record Series).
        row: Row label of the disk.
        col: Column index of the disk.
        percent_radius: Radius factor forwarded to the circle helper.
        image_path: Optional value handed to ``load_tray_image`` instead of
            ``image_name``.
        add_center: When ``True`` a filled dot marks the circle centre.

    Returns:
        The annotated image cropped to the bounding box.
    """
    cx, cy, r = get_fast_leaf_disc_circle(
        image_name=image_name,
        bboxes=bboxes,
        row=row,
        col=col,
        percent_radius=percent_radius,
    )
    bbox = get_bbox(image_name=image_name, bboxes=bboxes, row=row, col=col)
    canvas = load_tray_image(image_name if image_path is None else image_path)
    half_side = int(r / math.sqrt(2))
    center = (cx, cy)
    magenta = (255, 0, 155)

    cv2.circle(canvas, center, r, color=magenta, thickness=5)
    if add_center is True:
        cv2.circle(canvas, center, 10, color=magenta, thickness=-1)
    cv2.rectangle(
        canvas,
        (cx - half_side, cy - half_side),
        (cx + half_side, cy + half_side),
        (0, 255, 0),
        5,
    )

    y_range = slice(int(bbox.y1), int(bbox.y2))
    x_range = slice(int(bbox.x1), int(bbox.x2))
    return canvas[y_range, x_range]
|
|
|
|
class LeafDiskDetectorDataset(Dataset):
    """Dataset yielding one tray image and its leaf-disk boxes per item.

    Items are indexed by the unique values of the ``plate_name`` column of
    the annotation table.
    """

    def __init__(
        self,
        csv,
        transform=None,
        yxyx: bool = False,
        return_id: bool = False,
        bboxes: bool = True,
    ):
        """Store a copy of the annotations and the item options.

        Args:
            csv: DataFrame with a ``plate_name`` column and, when boxes are
                available, ``x1, y1, x2, y2`` columns.
            transform: Optional indexable pipeline whose first step exposes
                ``width`` and ``height`` (the resize step).
            yxyx: When ``True`` transformed boxes are returned as
                ``y1, x1, y2, x2``.
            return_id: When ``True`` items also carry the image id tensor.
            bboxes: Selects the target key holding the boxes: ``"bboxes"``
                when ``True``, ``"boxes"`` otherwise.
        """
        self.boxes = csv.copy()
        self.images = list(self.boxes.plate_name.unique())
        self.transforms = transform
        if transform is None:
            self.width, self.height = 0, 0
        else:
            self.width, self.height = transform[0].width, transform[0].height
        self.yxyx = yxyx
        self.return_id = return_id
        self.bboxes = bboxes

    def __len__(self):
        """Number of distinct plates."""
        return len(self.images)

    def load_boxes(self, idx):
        """Return ``(count, boxes)`` for the plate at position ``idx``.

        ``boxes`` is an ``(count, 4)`` array of ``x1, y1, x2, y2`` values, or
        an empty list when there is none.  Rows containing NaN are dropped.

        NOTE(review): the availability guard looks for a column literally
        named ``"x"`` while the values read are ``x1..y2`` -- confirm against
        the annotation schema.
        """
        if "x" not in self.boxes.columns:
            return 0, []
        plate_rows = self.boxes[self.boxes.plate_name == self.images[idx]].dropna()
        count = plate_rows.shape[0]
        if count > 0:
            return count, plate_rows[["x1", "y1", "x2", "y2"]].values
        return 0, []

    def load_tray_image(self, idx):
        """Load the image of the plate at position ``idx``."""
        return load_tray_image(self.images[idx])

    def get_by_sample_name(self, plate_name):
        """Return the dataset item of the plate called ``plate_name``."""
        position = self.images.index(plate_name)
        return self[position]

    def get_image_by_name(self, plate_name):
        """Load the raw image of the plate called ``plate_name``."""
        return load_tray_image(plate_name)

    def draw_image_with_boxes(self, plate_name):
        """Return the item image with its boxes drawn as red rectangles."""
        image, labels, *_ = self[self.images.index(plate_name)]
        # Column order of x1, y1, x2, y2 inside each box.
        ix1, iy1, ix2, iy2 = (1, 0, 3, 2) if self.yxyx is True else (0, 1, 2, 3)
        for box in labels[self.get_boxes_key()]:
            image = cv2.rectangle(
                image,
                (int(box[ix1]), int(box[iy1])),
                (int(box[ix2]), int(box[iy2])),
                (255, 0, 0),
                2,
            )
        return image

    def get_boxes_key(self):
        """Name of the target entry that holds the boxes."""
        if self.bboxes is True:
            return "bboxes"
        return "boxes"

    def __getitem__(self, index):
        """Build ``(image, target)`` or ``(image, target, image_id)``.

        ``target`` holds the boxes (under ``get_boxes_key()``), ``labels``
        (all ones), ``image_id``, ``area``, ``iscrowd``, ``img_size`` and
        ``img_scale``.
        """
        num_box, raw_boxes = self.load_boxes(index)
        img = self.load_tray_image(index)
        boxes_key = self.get_boxes_key()

        if num_box > 0:
            boxes = torch.as_tensor(raw_boxes, dtype=torch.float32)
        else:
            # No annotation: keep a well-formed empty (0, 4) tensor.
            boxes = torch.zeros((0, 4), dtype=torch.float32)

        image_id = torch.tensor([index])
        labels = torch.ones((num_box,), dtype=torch.int64)
        box_heights = boxes[:, 3] - boxes[:, 1]
        box_widths = boxes[:, 2] - boxes[:, 0]
        target = {
            boxes_key: boxes,
            "labels": labels,
            "image_id": image_id,
            "area": torch.as_tensor(box_heights * box_widths, dtype=torch.float32),
            "iscrowd": torch.zeros((num_box,), dtype=torch.int64),
            "img_size": torch.tensor([self.height, self.width]),
            "img_scale": torch.tensor([1.0]),
        }

        if self.transforms is None:
            img = transforms.ToTensor()(img)
        else:
            sample = self.transforms(
                image=img,
                bboxes=target[boxes_key],
                labels=labels,
            )
            img = sample["image"]
            if num_box > 0:
                new_boxes = np.array(sample["bboxes"])
                if self.yxyx is True:
                    # x1, y1, x2, y2 -> y1, x1, y2, x2
                    new_boxes[:, [0, 1, 2, 3]] = new_boxes[:, [1, 0, 3, 2]]
                target[boxes_key] = torch.as_tensor(new_boxes, dtype=torch.float32)
            else:
                target[boxes_key] = torch.zeros((0, 4), dtype=torch.float32)

        if self.return_id is True:
            return img, target, image_id
        return img, target
|
|
|
|
def collate_fn(batch):
    """Collate ``(image, target)`` items into a batch for the detector.

    Images are stacked into one float tensor; targets are kept as a tuple of
    per-image dicts, untouched.

    Args:
        batch: Sequence of ``(image_tensor, target_dict)`` pairs; all images
            must share the same shape.

    Returns:
        tuple: ``(images, targets)``.
    """
    images, targets = tuple(zip(*batch))
    images = torch.stack(images).float()
    # The former per-target float copies of "boxes"/"labels" were never
    # returned and raised KeyError for targets keyed with "bboxes".
    return images, targets
|
|
|
|
def find_best_lr(model, default_root_dir=cc.path_to_chk_detector):
    """Run the Lightning learning-rate finder and return the tuned rate.

    Args:
        model: Module to tune; must expose ``learning_rate``.
        default_root_dir: Root directory given to the ``Trainer``.

    Returns:
        ``model.learning_rate`` as found after ``trainer.tune``.
    """
    trainer_options = {
        "default_root_dir": default_root_dir,
        "auto_lr_find": True,
        "accelerator": "gpu",
        "callbacks": [RichProgressBar()],
    }
    tuner = Trainer(**trainer_options)

    # Tuning runs the learning-rate search enabled by ``auto_lr_find``.
    tuner.tune(model)

    return model.learning_rate
|
|
|
|
class LeafDiskDetector(pl.LightningModule):
    """Faster R-CNN leaf-disk detector wrapped as a Lightning module.

    Besides training, the module turns raw detections into an indexed plate
    layout: columns numbered from 1 and rows labelled ``a``, ``b``, ``c``
    (see ``index_plate``).
    """

    def __init__(
        self,
        batch_size: int,
        learning_rate: float,
        max_epochs: int,
        image_factor: int,
        train_data: pd.DataFrame,
        val_data: pd.DataFrame,
        test_data: pd.DataFrame,
        augmentations_kinds: list = ["resize", "train", "to_tensor"],
        augmentations_params: dict = {"gamma": (60, 180)},
        num_workers: int = 0,
        accumulate_grad_batches: int = 3,
        selected_device: str = g_device,
        optimizer: str = "adam",
        scheduler: str = None,
        scheduler_params: dict = {},
    ):
        """Store hyper-parameters, build augmentations and the network.

        Args:
            batch_size: Batch size of every dataloader.
            learning_rate: Optimizer learning rate.
            max_epochs: Stored for reference (not used inside this class).
            image_factor: Size factor forwarded to ``get_augmentations``.
            train_data: Annotation table used for training.
            val_data: Annotation table used for validation.
            test_data: Annotation table used for testing.
            augmentations_kinds: Transform groups used for training.
            augmentations_params: Extra keyword arguments for
                ``get_augmentations``.
            num_workers: Dataloader worker count.
            accumulate_grad_batches: Stored for reference.
            selected_device: Stored for reference.
            optimizer: ``"adam"`` or ``"sgd"``.
            scheduler: ``"cycliclr"``, ``"steplr"``, ``"plateau"`` or ``None``.
            scheduler_params: Extra scheduler settings (keyword arguments of
                ``StepLR``; optional ``"mode"`` for ``CyclicLR``).
        """
        super().__init__()

        self.model_name = "ldd"

        # Training hyper-parameters
        self.batch_size = batch_size
        self.selected_device = selected_device
        self.learning_rate = learning_rate
        self.num_workers = num_workers
        self.max_epochs = max_epochs
        self.accumulate_grad_batches = accumulate_grad_batches

        # Data
        self.train_data = train_data
        self.val_data = val_data
        self.test_data = test_data

        # Optimizer; containers are copied so the shared default objects of
        # the signature can never be mutated through an instance.
        self.optimizer = optimizer
        self.scheduler = scheduler
        self.scheduler_params = dict(scheduler_params)

        # Augmentations
        self.image_factor = image_factor
        self.augmentations_kinds = list(augmentations_kinds)
        self.augmentations_params = dict(augmentations_params)

        self.train_augmentations = get_augmentations(
            image_size=self.image_factor,
            kinds=self.augmentations_kinds,
            **self.augmentations_params,
        )

        self.val_augmentations = get_augmentations(
            image_size=self.image_factor,
            kinds=["resize", "to_tensor"],
            **self.augmentations_params,
        )

        # Network: ``weights`` needs an enum member, not the enum class itself.
        self.encoder = fasterrcnn_resnet50_fpn_v2(
            weights=FasterRCNN_ResNet50_FPN_V2_Weights.DEFAULT
        )
        num_classes = 2
        # Swap the box predictor head for one sized for our classes.
        in_features = self.encoder.roi_heads.box_predictor.cls_score.in_features
        self.encoder.roi_heads.box_predictor = FastRCNNPredictor(
            in_features, num_classes
        )

        self.save_hyperparameters()

    def hr_desc(self):
        """Print a human-readable table of the module settings."""
        table = Table(title=f"{self.model_name} params & values")
        table.add_column("Param", justify="right", style="bold", no_wrap=True)
        table.add_column("Value")

        def add_pairs(table_, attributes: list) -> None:
            # Best effort: attributes missing on the instance are skipped.
            for name in attributes:
                try:
                    table_.add_row(name, Pretty(getattr(self, name)))
                except AttributeError:
                    continue

        add_pairs(
            table,
            ["model_name", "batch_size", "num_workers", "accumulate_grad_batches"],
        )
        table.add_row("image_width", Pretty(self.train_augmentations[0].width))
        table.add_row("image_height", Pretty(self.train_augmentations[0].height))
        add_pairs(
            table,
            ["image_factor", "augmentations_kinds", "augmentations_params"],
        )
        add_pairs(
            table,
            ["learning_rate", "optimizer", "scheduler", "scheduler_params"],
        )

        for name, df in zip(
            ["train", "val", "test"],
            [self.train_data, self.val_data, self.test_data],
        ):
            table.add_row(
                name,
                Pretty(
                    f"shape: {str(df.shape)}, images: {len(df.plate_name.unique())}"
                ),
            )

        Console().print(table)

    def configure_optimizers(self):
        """Build the optimizer and optional scheduler selected at init.

        Returns:
            The optimizer alone when no scheduler is configured, otherwise
            ``([optimizer], [scheduler])``.
        """
        if self.optimizer == "adam":
            optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        elif self.optimizer == "sgd":
            optimizer = torch.optim.SGD(self.parameters(), lr=self.learning_rate)
        else:
            optimizer = None

        if self.scheduler == "cycliclr":
            scheduler = torch.optim.lr_scheduler.CyclicLR(
                optimizer,
                base_lr=self.learning_rate,
                max_lr=0.01,
                step_size_up=100,
                # ``self.scheduler_mode`` was never defined; the mode now comes
                # from ``scheduler_params`` with CyclicLR's own default.
                mode=self.scheduler_params.get("mode", "triangular"),
            )
        elif self.scheduler == "steplr":
            scheduler = torch.optim.lr_scheduler.StepLR(
                optimizer, **self.scheduler_params
            )
        elif self.scheduler == "plateau":
            scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
                optimizer,
                mode="min",
                factor=0.2,
                patience=10,
                min_lr=1e-6,
            )
            scheduler = {"scheduler": scheduler, "monitor": "val_loss"}
        else:
            scheduler = None

        if scheduler is None:
            return optimizer
        return [optimizer], [scheduler]

    def _make_loader(self, data, augmentations, shuffle: bool = False):
        """Build a ``DataLoader`` over ``data`` with the shared settings."""
        return DataLoader(
            LeafDiskDetectorDataset(
                csv=data,
                transform=augmentations,
                bboxes=False,
            ),
            batch_size=self.batch_size,
            shuffle=shuffle,
            num_workers=self.num_workers,
            collate_fn=collate_fn,
            pin_memory=True,
        )

    def train_dataloader(self):
        """Shuffled loader over the training table."""
        return self._make_loader(
            self.train_data, self.train_augmentations, shuffle=True
        )

    def val_dataloader(self):
        """Loader over the validation table (previously read ``train_data``)."""
        return self._make_loader(self.val_data, self.val_augmentations)

    def test_dataloader(self):
        """Loader over the test table (previously read ``train_data``)."""
        return self._make_loader(self.test_data, self.val_augmentations)

    def forward(self, x):
        """Run the detector on a batch of images."""
        return self.encoder(x)

    def step_(self, batch, batch_index):
        """Return the summed detection losses for one batch.

        The module is switched to train mode because the encoder is called
        with targets to obtain its loss dict (torchvision detection models
        return losses only in training mode).
        """
        x, y = batch
        self.train()
        loss_dict = self.encoder(x, y)
        return sum(loss for loss in loss_dict.values())

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss."""
        loss = self.step_(batch=batch, batch_index=batch_idx)
        self.log(
            "train_loss", loss, on_step=True, prog_bar=True, batch_size=self.batch_size
        )
        return loss

    def validation_step(self, batch, batch_idx):
        """Compute and log the validation loss.

        The value is logged as ``val_loss`` only; it is no longer also written
        under ``train_loss``.
        """
        loss = self.step_(batch=batch, batch_index=batch_idx)
        self.log(
            "val_loss",
            loss,
            on_epoch=True,
            on_step=False,
            prog_bar=True,
            batch_size=self.batch_size,
        )
        return loss

    def test_step(self, batch, batch_idx):
        """Compute and log the test loss."""
        # ``step_`` has no ``batch_size`` parameter; the batch size belongs to
        # the logging call.
        loss = self.step_(batch=batch, batch_index=batch_idx)
        self.log("test_loss", loss, batch_size=self.batch_size)
        return loss

    def prepare_bboxes(
        self,
        image_name,
        score_threshold=0.90,
        ar_threshold=1.5,
        size_threshold=0.30,
    ):
        """Detect leaf disks on an image and return filtered boxes.

        Args:
            image_name: Image handed to ``load_tray_image``.
            score_threshold: Detections must score strictly above this value.
            ar_threshold: Upper bound for both width/height and height/width.
            size_threshold: Boxes must be larger than this fraction of the
                biggest remaining box area.

        Returns:
            pd.DataFrame: One row per box with ``file_name, x1, y1, x2, y2,
            width, height, cx, cy, area, ar, max_size`` in original image
            coordinates.
        """
        augs = get_augmentations(
            image_size=self.image_factor,
            kinds=["resize", "to_tensor"],
            inferrence=True,
            **self.augmentations_params,
        )
        image = load_tray_image(image_name=image_name)

        self.to(g_device)
        self.eval()
        # Inference only: no autograd graph is needed.
        with torch.no_grad():
            predictions = self(augs(image=image)["image"].to(g_device).unsqueeze(0))

        boxes = predictions[0]["boxes"].detach().to("cpu").numpy()
        scores = predictions[0]["scores"].detach().to("cpu").numpy()

        filtered_predictions = [
            [box[i] for i in range(4)]
            for box, score in zip(boxes, scores)
            if score > score_threshold
        ]

        restore_size = A.Compose(
            [A.Resize(width=image.shape[1], height=image.shape[0])],
            bbox_params={"format": "pascal_voc", "label_fields": ["labels"]},
        )

        sample = {
            "image": image,
            "bboxes": filtered_predictions,
            "labels": [1 for _ in range(len(filtered_predictions))],
        }
        sample = restore_size(**sample)

        resized_predictions = sample["bboxes"]

        from siuba import _, filter, mutate

        # Scale the boxes from network input size back to the original image.
        boxes = (
            pd.DataFrame(data=resized_predictions, columns=["x1", "y1", "x2", "y2"])
            >> mutate(
                x1=_.x1 * image.shape[1] / augs[0].width,
                y1=_.y1 * image.shape[0] / augs[0].height,
                x2=_.x2 * image.shape[1] / augs[0].width,
                y2=_.y2 * image.shape[0] / augs[0].height,
            )
            >> mutate(width=_.x2 - _.x1, height=_.y2 - _.y1)
            >> mutate(cx=(_.x1 + _.x2) / 2, cy=(_.y1 + _.y2) / 2)
            >> mutate(area=_.width * _.height)
            >> mutate(ar=_.width / _.height)
        )
        boxes.insert(
            0,
            "file_name",
            image_name.name if isinstance(image_name, Path) else image_name,
        )
        boxes["max_size"] = boxes[["width", "height"]].max(axis=1)

        ar_boxes = (
            boxes
            >> filter(_.width / _.height < ar_threshold)
            >> filter(_.height / _.width < ar_threshold)
        )

        return ar_boxes[ar_boxes.area > ar_boxes.area.max() * size_threshold]

    @staticmethod
    def init_cols(bboxes):
        """Cluster boxes into columns (by ``cx``) and rows (by ``cy``).

        Mean-shift clustering (bandwidth 100) groups the centres; columns are
        then renumbered from 0 in increasing mean ``cx`` and rows labelled
        ``a``, ``b``, ``c`` in increasing mean ``cy``.

        Returns:
            pd.DataFrame: Copy of ``bboxes`` with ``col``, ``mean_cx``,
            ``row`` and ``mean_cy`` columns, sorted by ``cx``.
        """
        bboxes = bboxes.copy()

        # Columns
        X = np.reshape(bboxes.cx.to_list(), (-1, 1))
        ms = MeanShift(bandwidth=100, bin_seeding=True)
        ms.fit(X)
        bboxes["col"] = ms.predict(X)

        bboxes = bboxes.sort_values("cx")
        bboxes["mean_cx"] = (
            bboxes.groupby("col").transform("mean", numeric_only=True).cx
        )
        bboxes = bboxes.sort_values("mean_cx")
        for i, val in enumerate(bboxes.mean_cx.unique()):
            bboxes.loc[bboxes["mean_cx"] == val, "col"] = i

        # Rows
        bboxes = bboxes.sort_values("cy")
        X = np.reshape(bboxes.cy.to_list(), (-1, 1))
        ms = MeanShift(bandwidth=100, bin_seeding=True)
        ms.fit(X)
        bboxes["row"] = ms.predict(X)

        bboxes = bboxes.sort_values("cy")
        bboxes["mean_cy"] = (
            bboxes.groupby("row").transform("mean", numeric_only=True).cy
        )
        bboxes = bboxes.sort_values("mean_cy")
        for i, val in zip(["a", "b", "c"], bboxes.mean_cy.unique()):
            bboxes.loc[bboxes["mean_cy"] == val, "row"] = i

        bboxes = bboxes.sort_values("cx")

        return bboxes

    @staticmethod
    def finalize_indexing(bboxes, image_name):
        """Shift column indexes to account for missing columns, then 1-base them.

        When fewer than 4 columns were detected, the horizontal gaps (before
        the first column and between consecutive columns) are measured in
        units of ``1.1 * max box size``; every full unit pushes the following
        columns one index further.

        Returns:
            pd.DataFrame: Copy of ``bboxes`` with final 1-based ``col``
            values, sorted by ``row`` then ``col``.
        """
        bboxes = bboxes.copy()
        bboxes = bboxes.sort_values("cx")
        labels_unique = bboxes.col.unique()
        # Explicit copy: the labels are modified in place below.
        labels = bboxes.col.to_numpy(copy=True)
        if len(labels_unique) < 4:
            inc_labels = [[i, 0] for i in range(len(labels_unique))]
            max_width = bboxes.max_size.max()

            # First column: measure the gap from the left-most vertical line,
            # or from half a box before the first box when no line is found.
            left_most_line = get_first_vert_line(image_name=image_name)
            if left_most_line is not None:
                # The line is (x1, y1, x2, y2): its left-most abscissa is
                # min(x1, x2).  The previous code compared x1 with y1.
                left_most_point = bboxes.x1.min() - min(
                    left_most_line[0], left_most_line[2]
                )
            else:
                left_most_point = bboxes.x1.min() - (max_width / 2)
            i = 1
            while left_most_point > i * 1.1 * max_width:
                inc_labels[0][1] += 1
                i += 1

            # Remaining columns: gap from the previous column's right edge.
            prev_min_min = bboxes[bboxes.col == 0].x2.max()

            for label in labels_unique[1:]:
                current_label_contours = bboxes[bboxes.col == label]
                max_width = current_label_contours.max_size.max()
                min_left = current_label_contours.x1.min()
                i = 1
                while min_left - prev_min_min > i * 1.1 * max_width:
                    inc_labels[label][1] += 1
                    i += 1
                prev_min_min = min_left + max_width

            # Apply from the right so earlier shifts do not cascade twice.
            for pos, inc in reversed(inc_labels):
                labels[labels >= pos] += inc

            bboxes["col"] = labels

        bboxes["col"] += 1

        return bboxes.sort_values(["row", "col"])

    def index_plate(
        self,
        image_name,
        score_threshold=0.90,
        ar_threshold=1.5,
        size_threshold=0.50,
    ):
        """Detect the leaf disks of a plate and assign row/column indexes.

        Args:
            image_name: Image to process.
            score_threshold: See ``prepare_bboxes``.
            ar_threshold: See ``prepare_bboxes``.
            size_threshold: See ``prepare_bboxes``.

        Returns:
            pd.DataFrame: Indexed boxes; the (empty) detection table is
            returned unchanged when nothing was detected.
        """
        bboxes = self.prepare_bboxes(
            image_name=image_name,
            score_threshold=score_threshold,
            ar_threshold=ar_threshold,
            size_threshold=size_threshold,
        )
        if bboxes.shape[0] == 0:
            return bboxes

        bboxes = self.init_cols(bboxes=bboxes)
        bboxes = self.finalize_indexing(bboxes=bboxes, image_name=image_name)

        return bboxes
|
|
|
|
def test_augmentations(
    df,
    image_size,
    kinds: list = ["resize", "train"],
    row_count=2,
    col_count=4,
    **aug_params,
):
    """Show one randomly chosen plate next to several augmented versions.

    The first grid cell holds the plate after the ``"resize"`` step only;
    every other cell holds a fresh draw of the ``kinds`` pipeline.

    Args:
        df: Annotation table with a ``plate_name`` column.
        image_size: Size factor forwarded to ``get_augmentations``.
        kinds: Transform groups of the augmented pipeline.
        row_count: Grid rows.
        col_count: Grid columns.
        **aug_params: Extra keyword arguments for ``get_augmentations``.
    """
    baseline_pipeline = get_augmentations(
        image_size=image_size, kinds=["resize"], **aug_params
    )
    augmented_pipeline = get_augmentations(
        image_size=image_size, kinds=kinds, **aug_params
    )
    src_dataset = LeafDiskDetectorDataset(csv=df, transform=baseline_pipeline)
    test_dataset = LeafDiskDetectorDataset(csv=df, transform=augmented_pipeline)

    image_name = df.sample(n=1).iloc[0].plate_name

    images = [(src_dataset.draw_image_with_boxes(plate_name=image_name), "Source")]
    for _ in range(row_count * col_count - 1):
        augmented = test_dataset.draw_image_with_boxes(plate_name=image_name)
        images.append((augmented, "Augmented"))

    make_patches_grid(
        images=images,
        row_count=row_count,
        col_count=col_count,
        figsize=(col_count * 4, row_count * 3),
    )
|
|
|
|
def get_file_path_from_row(row, path_to_patches: Path):
    """Return the path of ``row.file_name`` inside ``path_to_patches``."""
    file_name = row.file_name
    return path_to_patches / file_name
|
|
|
|
def get_fast_images(
    row, path_to_patches, percent_radius: float = 1.0, add_process_image: bool = False
):
    """Build every image variant of one leaf disk, best effort.

    Args:
        row: Bounding-box record (``pd.Series``) with at least ``file_name``.
        path_to_patches: Folder holding the source image.
        percent_radius: Radius factor forwarded to the fast helpers.
        add_process_image: When ``True`` also build the ``"process_image"``
            debug drawing.

    Returns:
        dict: Maps ``"leaf_disc_box"``, ``"segmented_leaf_disc"``,
        ``"leaf_disc_patch"`` (and optionally ``"process_image"``) to images.
        A variant whose construction raised is simply absent.
    """

    def fast_kwargs():
        # Evaluated inside each attempt so a malformed row only fails the
        # variants, never the whole call.
        return dict(
            image_name=row.file_name,
            bboxes=row,
            percent_radius=percent_radius,
            image_path=get_file_path_from_row(row, path_to_patches),
        )

    builders = [
        (
            "leaf_disc_box",
            lambda: get_leaf_disk_wbb(
                row.file_name,
                row,
                image_path=get_file_path_from_row(row, path_to_patches),
            ),
        ),
        ("segmented_leaf_disc", lambda: get_fast_segment_disk(**fast_kwargs())),
        ("leaf_disc_patch", lambda: get_fast_leaf_disk_patch(**fast_kwargs())),
    ]
    if add_process_image is True:
        builders.append(
            ("process_image", lambda: draw_fast_bb_to_patch_process(**fast_kwargs()))
        )

    images = {}
    for key, build in builders:
        try:
            images[key] = build()
        except Exception:
            # Deliberate best effort; ``Exception`` (not a bare except) so
            # KeyboardInterrupt / SystemExit still propagate.
            continue

    return images
|
|
|
|
def save_images(row: pd.Series, images_data: dict, errors: dict, paths: dict):
    """Write the images of one leaf disk to their destination folders.

    Files are named ``<image stem>_<row>_<col>.png``; existing files are not
    overwritten.  Images are converted from RGB to BGR before writing.

    Args:
        row: Bounding-box record with ``file_name``, ``row`` and ``col``.
        images_data: Maps an image kind to an image (or ``None`` on failure).
        errors: Optional dict collecting, per kind, the file names whose image
            is missing; ignored when ``None``.
        paths: Maps an image kind to its destination folder (``Path``); kinds
            without a folder are skipped.
    """
    fn = f"{Path(row.file_name).stem}_{row.row}_{int(row.col)}.png"
    for kind, image in images_data.items():
        if kind not in paths:
            continue

        # An empty crop cannot be colour-converted or written, so it is
        # reported like a missing image instead of crashing.
        if image is None or getattr(image, "size", 1) == 0:
            if errors is not None:
                # setdefault: a kind never seen before must not raise KeyError.
                errors.setdefault(kind, []).append(row.file_name)
            continue

        path_to_image = paths[kind].joinpath(fn)
        if not path_to_image.is_file():
            cv2.imwrite(str(path_to_image), cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
|
|
|
|
def handle_bbox(
    row: pd.Series,
    paths: dict,
    errors: dict = None,
    percent_radius: float = 1.0,
    add_process_image: bool = False,
):
    """Extract and save every image variant of one bounding box.

    Args:
        row: Bounding-box record.
        paths: Destination folders per image kind; ``paths["plates"]`` is the
            folder holding the source images.
        errors: Optional dict collecting failures (see ``save_images``).
        percent_radius: Radius factor forwarded to ``get_fast_images``.
        add_process_image: Forwarded to ``get_fast_images``.
    """
    extracted = get_fast_images(
        row=row,
        percent_radius=percent_radius,
        add_process_image=add_process_image,
        path_to_patches=paths["plates"],
    )
    save_images(row=row, images_data=extracted, errors=errors, paths=paths)
|
|