import glob
import os
import time

import cv2
import numpy as np
import onnxruntime as ort
import pyzbar.pyzbar as pyzbar
import yaml

names = ['QRCode']

def letterbox(im, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):
    """Resize `im` to `new_shape` while keeping aspect ratio, padding the remainder with `color`."""
    shape = im.shape[:2]  # current shape [height, width]
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)

    # Scale ratio (new / old); only scale down when scaleup is False
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
    if not scaleup:
        r = min(r, 1.0)

    # Compute padding
    ratio = r, r  # width, height ratios
    new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
    dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding
    if auto:  # minimum rectangle: pad only up to the next multiple of stride
        dw, dh = np.mod(dw, stride), np.mod(dh, stride)
    elif scaleFill:  # stretch to fill, no padding
        dw, dh = 0.0, 0.0
        new_unpad = (new_shape[1], new_shape[0])
        ratio = new_shape[1] / shape[1], new_shape[0] / shape[0]

    dw /= 2  # split padding between the two sides
    dh /= 2

    if shape[::-1] != new_unpad:  # resize
        im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)
    return im, ratio, (dw, dh)
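
# Quick sanity check for letterbox (a hedged sketch, not part of the pipeline):
# a 720x1280 frame letterboxed to 640x640 is scaled by 0.5 and padded top/bottom
# with gray (114, 114, 114).
#   _frame = np.zeros((720, 1280, 3), dtype=np.uint8)
#   _out, _ratio, (_dw, _dh) = letterbox(_frame, (640, 640), auto=False)
#   _out.shape          # -> (640, 640, 3)
#   _ratio, _dw, _dh    # -> (0.5, 0.5), 0.0, 140.0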

def data_process_cv2(frame, input_shape):
    """
    Preprocess an input image for inference.
    :param frame: path to the image file
    :param input_shape: target (height, width) of the model input
    :return: (normalized NCHW float32 tensor, original image, letterboxed image)
    """
    im0 = cv2.imread(frame)
    if im0 is None:
        raise FileNotFoundError(f"Failed to read image: {frame}")
    img = letterbox(im0, input_shape, auto=False, stride=32)[0]
    org_data = img.copy()
    img = np.ascontiguousarray(img[:, :, ::-1].transpose(2, 0, 1))  # BGR->RGB, HWC->CHW
    img = np.asarray(img, dtype=np.float32)
    img = np.expand_dims(img, 0)  # add batch dimension
    img /= 255.0  # 0-255 -> 0.0-1.0
    return img, im0, org_data

def non_max_suppression(prediction,
                        conf_thres=0.25,
                        iou_thres=0.45,
                        classes=None,
                        agnostic=False,
                        multi_label=False,
                        labels=(),
                        max_det=300,
                        nm=0):
    """
    Perform Non-Maximum Suppression (NMS) on the boxes to filter out overlapping boxes.

    Parameters:
        prediction (ndarray): Predictions from the model.
        conf_thres (float): Confidence threshold to filter boxes.
        iou_thres (float): Intersection over Union (IoU) threshold for NMS.
        classes (list): Filter boxes by classes.
        agnostic (bool): If True, perform class-agnostic NMS.
        multi_label (bool): If True, perform multi-label NMS.
        labels (list): Labels for auto-labelling.
        max_det (int): Maximum number of detections.
        nm (int): Number of masks.

    Returns:
        list: A list of filtered boxes, one ndarray per image.
    """
    bs = prediction.shape[0]  # batch size
    nc = prediction.shape[2] - nm - 5  # number of classes
    xc = prediction[..., 4] > conf_thres  # candidates

    max_wh = 7680  # maximum box width and height
    max_nms = 30000  # maximum number of boxes fed into nms()
    time_limit = 0.5 + 0.05 * bs  # seconds to quit after

    multi_label &= nc > 1  # multiple labels per box only make sense for nc > 1

    t = time.time()
    mi = 5 + nc  # mask start index
    output = [np.zeros((0, 6 + nm))] * bs
    for xi, x in enumerate(prediction):  # image index, image inference
        x = x[xc[xi]]  # keep boxes above the confidence threshold

        # Concatenate apriori labels if autolabelling
        if labels and len(labels[xi]):
            lb = labels[xi]
            v = np.zeros((len(lb), nc + nm + 5))
            v[:, :4] = lb[:, 1:5]  # box
            v[:, 4] = 1.0  # conf
            v[np.arange(len(lb)), lb[:, 0].astype(int) + 5] = 1.0  # cls
            x = np.concatenate((x, v), 0)

        if not x.shape[0]:  # no boxes left for this image
            continue

        x[:, 5:] *= x[:, 4:5]  # conf = obj_conf * cls_conf

        box = xywh2xyxy(x[:, :4])  # (center x, center y, w, h) -> (x1, y1, x2, y2)
        mask = x[:, mi:]

        if multi_label:
            i, j = np.nonzero(x[:, 5:mi] > conf_thres)
            x = np.concatenate((box[i], x[i, 5 + j][:, None], j[:, None].astype(float), mask[i]), 1)
        else:  # best class only
            conf = np.max(x[:, 5:mi], 1).reshape(-1, 1)
            j = np.argmax(x[:, 5:mi], 1).reshape(-1, 1)
            x = np.concatenate((box, conf, j.astype(float), mask), 1)[conf[:, 0] > conf_thres]

        # Filter by class
        if classes is not None:
            x = x[(x[:, 5:6] == np.array(classes)[:, None]).any(1)]

        n = x.shape[0]  # number of boxes
        if not n:
            continue
        sorted_indices = np.argsort(x[:, 4])[::-1]  # sort by confidence
        x = x[sorted_indices][:max_nms]

        # Batched NMS: offset boxes by class so different classes never suppress each other
        c = x[:, 5:6] * (0 if agnostic else max_wh)
        boxes, scores = x[:, :4] + c, x[:, 4]
        i = nms(boxes, scores, iou_thres)
        i = i[:max_det]  # limit detections

        output[xi] = x[i]

        if (time.time() - t) > time_limit:
            break  # time limit exceeded
    return output

def nms(boxes, scores, iou_threshold):
    """
    Perform Non-Maximum Suppression (NMS) on the given boxes with scores using numpy.

    Parameters:
        boxes (ndarray): The bounding boxes, shaped (N, 4).
        scores (ndarray): The confidence scores for each box, shaped (N,).
        iou_threshold (float): The IoU threshold for suppressing overlapping boxes.

    Returns:
        ndarray: The indices of the selected boxes after NMS.
    """
    if len(boxes) == 0:
        return np.array([], dtype=int)

    # Visit boxes in order of descending confidence
    indices = np.argsort(scores)[::-1]

    selected_indices = []
    while len(indices) > 0:
        # The highest-scoring remaining box is always kept
        current_index = indices[0]
        selected_indices.append(current_index)

        # Suppress every remaining box that overlaps it beyond the threshold
        current_box = boxes[current_index]
        other_boxes = boxes[indices[1:]]
        iou = calculate_iou(current_box, other_boxes)
        indices = indices[1:][iou <= iou_threshold]

    return np.array(selected_indices)
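
# Hedged sanity check for nms (not executed): two heavily overlapping boxes
# collapse to the higher-scoring one, while a disjoint box survives.
#   _boxes = np.array([[0., 0., 10., 10.], [1., 1., 11., 11.], [50., 50., 60., 60.]])
#   _scores = np.array([0.9, 0.8, 0.7])
#   nms(_boxes, _scores, 0.45)  # -> array([0, 2]); IoU(box0, box1) ~= 0.68 > 0.45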

def calculate_iou(box, boxes):
    """
    Calculate the Intersection over Union (IoU) between a given box and a set of boxes.

    Parameters:
        box (ndarray): The coordinates of the first box, shaped (4,).
        boxes (ndarray): The coordinates of the other boxes, shaped (N, 4).

    Returns:
        ndarray: The IoU between the given box and each box in the set, shaped (N,).
    """
    # Corners of the intersection rectangles
    x1 = np.maximum(box[0], boxes[:, 0])
    y1 = np.maximum(box[1], boxes[:, 1])
    x2 = np.minimum(box[2], boxes[:, 2])
    y2 = np.minimum(box[3], boxes[:, 3])

    # Clamp to zero so non-overlapping boxes contribute no area
    intersection_area = np.maximum(x2 - x1, 0) * np.maximum(y2 - y1, 0)

    box_area = (box[2] - box[0]) * (box[3] - box[1])
    boxes_area = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])

    # IoU = intersection / union
    iou = intersection_area / (box_area + boxes_area - intersection_area)

    return iou
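
# Hedged check: identical boxes give IoU 1.0, disjoint boxes give 0.0.
#   calculate_iou(np.array([0., 0., 10., 10.]),
#                 np.array([[0., 0., 10., 10.], [20., 20., 30., 30.]]))
#   # -> array([1., 0.])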

def xywh2xyxy(x):
    """
    Convert bounding boxes from (center_x, center_y, width, height) to (x1, y1, x2, y2) format.

    Parameters:
        x (ndarray): Bounding boxes in (center_x, center_y, width, height) format, shaped (N, 4).

    Returns:
        ndarray: Bounding boxes in (x1, y1, x2, y2) format, shaped (N, 4).
    """
    y = x.copy()
    y[:, 0] = x[:, 0] - x[:, 2] / 2  # top-left x
    y[:, 1] = x[:, 1] - x[:, 3] / 2  # top-left y
    y[:, 2] = x[:, 0] + x[:, 2] / 2  # bottom-right x
    y[:, 3] = x[:, 1] + x[:, 3] / 2  # bottom-right y
    return y
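
# Hedged example: a 20x10 box centered at (50, 40).
#   xywh2xyxy(np.array([[50., 40., 20., 10.]]))  # -> array([[40., 35., 60., 45.]])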

def xyxy2xywh(x):
    """Convert boxes from (x1, y1, x2, y2) to (center_x, center_y, width, height) format."""
    y = np.copy(x)
    y[:, 0] = (x[:, 0] + x[:, 2]) / 2  # center x
    y[:, 1] = (x[:, 1] + x[:, 3]) / 2  # center y
    y[:, 2] = x[:, 2] - x[:, 0]  # width
    y[:, 3] = x[:, 3] - x[:, 1]  # height
    return y

def post_process_yolo(det, im, im0, gn, save_path, img_name):
    """Rescale detections to the original image and return integer xyxy boxes.

    `gn`, `save_path` and `img_name` are kept for signature compatibility with the
    original drawing/saving code path; they are unused here.
    """
    detections = []
    if len(det):
        # Map boxes from the letterboxed input back to the original image size
        det[:, :4] = scale_boxes(im.shape[:2], det[:, :4], im0.shape).round()
        for *xyxy, conf, cls in reversed(det):
            int_coords = [int(coord.item()) for coord in xyxy]
            detections.append(int_coords)
    return detections

def scale_boxes(img1_shape, boxes, img0_shape, ratio_pad=None):
    """Rescale boxes (xyxy) from img1_shape back to img0_shape."""
    if ratio_pad is None:  # calculate from shapes
        gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1])
        pad = (img1_shape[1] - img0_shape[1] * gain) / 2, (img1_shape[0] - img0_shape[0] * gain) / 2
    else:
        gain = ratio_pad[0][0]
        pad = ratio_pad[1]

    boxes[..., [0, 2]] -= pad[0]  # x padding
    boxes[..., [1, 3]] -= pad[1]  # y padding
    boxes[..., :4] /= gain
    clip_boxes(boxes, img0_shape)
    return boxes
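
# Hedged example: a box detected on a 640x640 letterboxed input maps back to a
# 720x1280 original. gain = 0.5 and pad = (0, 140), so (320, 320, 420, 420) on
# the model input becomes (640, 360, 840, 560) on the original image.
#   scale_boxes((640, 640), np.array([[320., 320., 420., 420.]]), (720, 1280))
#   # -> array([[640., 360., 840., 560.]])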

def clip_boxes(boxes, shape):
    """Clip boxes (xyxy) in place to the image bounds given by shape (height, width)."""
    boxes[..., [0, 2]] = boxes[..., [0, 2]].clip(0, shape[1])  # x1, x2
    boxes[..., [1, 3]] = boxes[..., [1, 3]].clip(0, shape[0])  # y1, y2

def yaml_load(file='coco128.yaml'):
    """Load a YAML file, ignoring decode errors."""
    with open(file, errors='ignore') as f:
        return yaml.safe_load(f)

class Colors:

    def __init__(self):
        """
        Initializes the Colors class with a palette derived from the Ultralytics color scheme, converting hex codes to RGB.

        Colors derived from `hex = matplotlib.colors.TABLEAU_COLORS.values()`.
        """
        hexs = (
            "FF3838", "FF9D97", "FF701F", "FFB21D", "CFD231", "48F90A", "92CC17",
            "3DDB86", "1A9334", "00D4BB", "2C99A8", "00C2FF", "344593", "6473FF",
            "0018EC", "8438FF", "520085", "CB38FF", "FF95C8", "FF37C7",
        )
        self.palette = [self.hex2rgb(f"#{c}") for c in hexs]
        self.n = len(self.palette)

    def __call__(self, i, bgr=False):
        """Returns color from palette by index `i`, in BGR format if `bgr=True`, else RGB; `i` is an integer index."""
        c = self.palette[int(i) % self.n]
        return (c[2], c[1], c[0]) if bgr else c

    @staticmethod
    def hex2rgb(h):
        """Converts hex color codes to RGB values (i.e. default PIL order)."""
        return tuple(int(h[1 + i: 1 + i + 2], 16) for i in (0, 2, 4))

def plot_one_box(x, im, color=None, label=None, line_thickness=3, steps=2, orig_shape=None):
    """Draw one bounding box (and optional label) on image `im` in place."""
    assert im.data.contiguous, 'Image not contiguous. Apply np.ascontiguousarray(im) to plot_one_box() input image.'
    tl = line_thickness or round(0.002 * (im.shape[0] + im.shape[1]) / 2) + 1  # line thickness
    c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))
    cv2.rectangle(im, c1, c2, color, thickness=max(tl // 3, 1), lineType=cv2.LINE_AA)
    if label:
        if len(label.split(':')) > 1:
            tf = max(tl - 1, 1)  # font thickness
            t_size = cv2.getTextSize(label, 0, fontScale=tl / 6, thickness=tf)[0]
            c2 = c1[0] + t_size[0], c1[1] - t_size[1] - 3
            cv2.rectangle(im, c1, c2, color, -1, cv2.LINE_AA)  # filled label background
            cv2.putText(im, label, (c1[0], c1[1] - 2), 0, tl / 6, [225, 255, 255],
                        thickness=max(tf // 2, 1), lineType=cv2.LINE_AA)
    return im

def model_load(model):
    """Create an ONNX Runtime CPU session and return it with its output names."""
    providers = ['CPUExecutionProvider']
    session = ort.InferenceSession(model, providers=providers)
    output_names = [x.name for x in session.get_outputs()]
    return session, output_names

def make_anchors(feats, strides, grid_cell_offset=0.5):
    """Generate anchor points and per-anchor strides from the feature map shapes."""
    anchor_points, stride_tensor = [], []
    assert feats is not None
    dtype = feats[0].dtype
    for i, stride in enumerate(strides):
        _, _, h, w = feats[i].shape
        sx = np.arange(w, dtype=dtype) + grid_cell_offset  # shift x to cell centers
        sy = np.arange(h, dtype=dtype) + grid_cell_offset  # shift y to cell centers
        sy, sx = np.meshgrid(sy, sx, indexing='ij')
        anchor_points.append(np.stack((sx, sy), axis=-1).reshape(-1, 2))
        stride_tensor.append(np.full((h * w, 1), stride, dtype=dtype))
    return np.concatenate(anchor_points), np.concatenate(stride_tensor)

def dist2bbox(distance, anchor_points, xywh=True, dim=-1):
    """Transform distance (ltrb) to box (xywh or xyxy)."""
    lt, rb = np.split(distance, 2, axis=dim)
    x1y1 = anchor_points - lt
    x2y2 = anchor_points + rb
    if xywh:
        c_xy = (x1y1 + x2y2) / 2
        wh = x2y2 - x1y1
        return np.concatenate((c_xy, wh), axis=dim)
    return np.concatenate((x1y1, x2y2), axis=dim)
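
# Hedged example: an anchor at (10, 10) with ltrb distances (2, 2, 4, 4)
# decodes to the corner box (8, 8, 14, 14).
#   dist2bbox(np.array([[2., 2., 4., 4.]]), np.array([[10., 10.]]), xywh=False)
#   # -> array([[ 8.,  8., 14., 14.]])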

class DFL:
    """
    NumPy implementation of the Distribution Focal Loss (DFL) integral module.

    Original paper: Generalized Focal Loss (IEEE TPAMI 2023).
    """

    def __init__(self, c1=16):
        """Initialize with the given number of distribution channels."""
        self.c1 = c1
        # Fixed "conv" weights 0..c1-1 used to take the expectation over the bins
        self.weights = np.arange(c1, dtype=np.float32).reshape(1, c1, 1, 1)

    def __call__(self, x):
        """
        Forward pass.

        Parameters:
            x: input tensor shaped (batch, channels, anchors)

        Returns:
            Decoded tensor shaped (batch, 4, anchors).
        """
        b, c, a = x.shape

        # Split the channel dim into 4 box sides x c1 distribution bins
        x_reshaped = x.reshape(b, 4, self.c1, a)
        x_transposed = np.transpose(x_reshaped, (0, 2, 1, 3))
        # Softmax over the bins (shifted by the max for numerical stability)
        x_exp = np.exp(x_transposed - np.max(x_transposed, axis=1, keepdims=True))
        x_softmax = x_exp / np.sum(x_exp, axis=1, keepdims=True)

        # Expectation over bin indices: sum_i i * p_i
        conv_result = np.sum(self.weights * x_softmax, axis=1)

        return conv_result.reshape(b, 4, a)
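
# Hedged sketch of what DFL computes (assuming reg_max = 4 for brevity): a logit
# spike at bin 2 on each of the four sides decodes to a distance of ~2.0.
#   _dfl = DFL(c1=4)
#   _x = np.full((1, 16, 1), -10.0, dtype=np.float32)  # 4 sides x 4 bins
#   _x[0, [2, 6, 10, 14], 0] = 10.0  # bin 2 of every side
#   _dfl(_x)  # -> approx array([[[2.], [2.], [2.], [2.]]])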

class YOLOV8Detector:
    def __init__(self, model_path, imgsz=[640, 640]):
        self.model_path = model_path
        self.session, self.output_names = model_load(self.model_path)
        self.imgsz = imgsz
        self.stride = [8., 16., 32.]  # strides of the three output feature maps
        self.reg_max = 16  # DFL distribution bins per box side
        self.nc = 1  # number of classes (QRCode only)
        self.no = self.nc + self.reg_max * 4  # outputs per anchor
        self.dfl = DFL(self.reg_max)

    def detect_objects(self, image, save_path):
        im, im0, org_data = data_process_cv2(image, self.imgsz)
        img_name = os.path.basename(image).split('.')[0]
        infer_start_time = time.time()
        x = self.session.run(self.output_names, {self.session.get_inputs()[0].name: im})
        infer_end_time = time.time()
        print(f"infer time: {infer_end_time - infer_start_time:.4f}s")
        x = [np.transpose(x[i], (0, 3, 1, 2)) for i in range(3)]  # NHWC -> NCHW

        # Decode the raw head outputs: anchor grid, DFL box distances, class scores
        anchors, strides = (np.transpose(t, (1, 0)) for t in make_anchors(x, self.stride, 0.5))
        x_cat = np.concatenate([xi.reshape(1, self.no, -1) for xi in x], axis=2)
        box = x_cat[:, :self.reg_max * 4, :]
        cls = x_cat[:, self.reg_max * 4:, :]
        dbox = dist2bbox(self.dfl(box), np.expand_dims(anchors, axis=0), xywh=True, dim=1) * strides
        y = np.concatenate((dbox, 1 / (1 + np.exp(-cls))), axis=1)  # sigmoid on class logits
        pred = y.transpose([0, 2, 1])

        # Insert an objectness column (max class score) so NMS sees YOLOv5-style rows
        pred_class = pred[..., 4:]
        pred_conf = np.max(pred_class, axis=-1)
        pred = np.insert(pred, 4, pred_conf, axis=-1)

        pred = non_max_suppression(pred, conf_thres=0.25, iou_thres=0.45, max_det=1000)
        gn = np.array(org_data.shape)[[1, 0, 1, 0]].astype(np.float32)  # normalization gain whwh
        res = post_process_yolo(pred[0], org_data, im0, gn, save_path, img_name)
        return res, im0

class QRCodeDecoder:
    def crop_qr_regions(self, image, regions):
        """
        Crop QR code regions from the image according to the detected bounding boxes.
        """
        cropped_images = []
        for idx, region in enumerate(regions):
            x1, y1, x2, y2 = region

            # Expand the box by a 15 px margin, clamped to the image bounds so the
            # slice indices never go negative
            h, w = image.shape[:2]
            x1 = max(x1 - 15, 0)
            y1 = max(y1 - 15, 0)
            x2 = min(x2 + 15, w)
            y2 = min(y2 + 15, h)

            cropped = image[y1:y2, x1:x2]
            if cropped.size > 0:
                cropped_images.append({
                    'image': cropped,
                    'bbox': region,
                })

        return cropped_images

    def decode_qrcode_pyzbar(self, cropped_image):
        """
        Decode QR codes with pyzbar.
        """
        try:
            # pyzbar works on grayscale images
            if len(cropped_image.shape) == 3:
                gray = cv2.cvtColor(cropped_image, cv2.COLOR_BGR2GRAY)
            else:
                gray = cropped_image

            decoded_objects = pyzbar.decode(gray)
            results = []
            for obj in decoded_objects:
                try:
                    data = obj.data.decode('utf-8')
                    results.append({
                        'data': data,
                        'type': obj.type,
                        'points': obj.polygon
                    })
                except UnicodeDecodeError:
                    continue

            return results
        except Exception as e:
            print(f"decode error: {e}")
            return []

if __name__ == '__main__':
    detector = YOLOV8Detector(model_path='./yolov8n.onnx', imgsz=[640, 640])
    decoder = QRCodeDecoder()
    img_path = './images'
    det_path = './v8_det_res'
    crop_path = './v8_crop_res'
    os.makedirs(det_path, exist_ok=True)
    os.makedirs(crop_path, exist_ok=True)
    imgs = glob.glob(f"{img_path}/*.jpg")
    total = len(imgs)
    success = 0
    fail = 0
    start_time = time.time()
    for idx, img in enumerate(imgs):
        pic_name = os.path.basename(img).split('.')[0]
        loop_start_time = time.time()

        # Detect QR code boxes, crop them, then try to decode each crop
        det_result, res_img = detector.detect_objects(img, det_path)
        cropped_images = decoder.crop_qr_regions(res_img, det_result)

        all_decoded_results = []
        for i, cropped_data in enumerate(cropped_images):
            decoded_results = decoder.decode_qrcode_pyzbar(cropped_data['image'])
            all_decoded_results.extend(decoded_results)

        if all_decoded_results:
            success += 1
            print("Decoding succeeded!")
        else:
            fail += 1
            print("Decoding failed!")
        loop_end_time = time.time()
        print(f"Image {img} processed in {loop_end_time - loop_start_time:.4f} s")

    end_time = time.time()
    total_time = end_time - start_time

    print(f"Total images tested: {total}")
    print(f"Decoded successfully: {success}")
    print(f"Failed to decode: {fail}")
    if total:  # guard against division by zero when no images are found
        print(f"Success rate: {success / total * 100:.2f}%")
        print(f"Average time per image: {total_time / total:.4f} s")
    print(f"Total processing time: {total_time:.4f} s")