# yolov5_dnn.py: YOLOv5 ONNX inference with OpenCV's DNN module
import cv2
import numpy as np
import time
import os
from numpy import array


class Colors:
    # Ultralytics color palette https://ultralytics.com/
    def __init__(self):
        # hex = matplotlib.colors.TABLEAU_COLORS.values()
        hex = ('FF3838', 'FF9D97', 'FF701F', 'FFB21D', 'CFD231', '48F90A', '92CC17', '3DDB86', '1A9334', '00D4BB',
               '2C99A8', '00C2FF', '344593', '6473FF', '0018EC', '8438FF', '520085', 'CB38FF', 'FF95C8', 'FF37C7')
        self.palette = [self.hex2rgb('#' + c) for c in hex]
        self.n = len(self.palette)

    def __call__(self, i, bgr=False):
        c = self.palette[int(i) % self.n]
        return (c[2], c[1], c[0]) if bgr else c

    @staticmethod
    def hex2rgb(h):  # rgb order (PIL)
        return tuple(int(h[1 + i:1 + i + 2], 16) for i in (0, 2, 4))


colors = Colors()
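# Illustrative usage of the palette above: colors(2, bgr=True) returns the third
# entry 'FF701F' as a BGR tuple, (31, 112, 255), ready for cv2.rectangle below.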


class yolov5():
    def __init__(self, onnx_path, confThreshold=0.25, nmsThreshold=0.45):
        self.classes = ["joint", "embed", "arrange"]  # ['joint', 'err1', 'err2']
        self.colors = [np.random.randint(0, 255, size=3).tolist() for _ in range(len(self.classes))]
        num_classes = len(self.classes)
        # Anchor/stride bookkeeping kept for reference; the exported ONNX model
        # already decodes its outputs, so none of this is used at inference time.
        self.anchors = [[10, 13, 16, 30, 33, 23], [30, 61, 62, 45, 59, 119], [116, 90, 156, 198, 373, 326]]
        self.nl = len(self.anchors)
        self.na = len(self.anchors[0]) // 2
        self.no = num_classes + 5
        self.stride = np.array([8., 16., 32.])
        self.inpWidth = 640
        self.inpHeight = 640
        self.net = cv2.dnn.readNetFromONNX(onnx_path)
        self.confThreshold = confThreshold
        self.nmsThreshold = nmsThreshold

    def _make_grid(self, nx=20, ny=20):
        xv, yv = np.meshgrid(np.arange(nx), np.arange(ny))
        return np.stack((xv, yv), 2).reshape((-1, 2)).astype(np.float32)
    def letterbox(self, im, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):
        # Resize and pad image while meeting stride-multiple constraints
        shape = im.shape[:2]  # current shape [height, width]
        if isinstance(new_shape, int):
            new_shape = (new_shape, new_shape)
        # Scale ratio (new / old)
        r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
        if not scaleup:  # only scale down, do not scale up (for better val mAP)
            r = min(r, 1.0)
        # Compute padding
        ratio = r, r  # width, height ratios
        new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
        dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding
        if auto:  # minimum rectangle
            dw, dh = np.mod(dw, stride), np.mod(dh, stride)  # wh padding
        elif scaleFill:  # stretch
            dw, dh = 0.0, 0.0
            new_unpad = (new_shape[1], new_shape[0])
            ratio = new_shape[1] / shape[1], new_shape[0] / shape[0]  # width, height ratios
        dw /= 2  # divide padding into 2 sides
        dh /= 2
        if shape[::-1] != new_unpad:  # resize
            im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)
        top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
        left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
        im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)  # add border
        return im, ratio, (dw, dh)
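    # Illustrative example: a 720x1280 (HxW) frame letterboxed to 640x640 with
    # auto=False gives r = 0.5 and new_unpad = (640, 360), so dh = 280 before
    # halving, i.e. 140 px of grey padding on the top and bottom; detect() below
    # inverts exactly this scale-then-pad mapping when it draws boxes.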
    def box_area(self, boxes: array):
        return (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])

    def box_iou(self, box1: array, box2: array):
        """
        :param box1: [N, 4]
        :param box2: [M, 4]
        :return: [N, M]
        """
        area1 = self.box_area(box1)  # N
        area2 = self.box_area(box2)  # M
        # Broadcasting: dimensions are compared from the last axis backwards and
        # must either match or be 1, so [N, 1, 2] against [M, 2] yields [N, M, 2].
        lt = np.maximum(box1[:, np.newaxis, :2], box2[:, :2])
        rb = np.minimum(box1[:, np.newaxis, 2:], box2[:, 2:])
        wh = rb - lt
        wh = np.maximum(0, wh)  # [N, M, 2]
        inter = wh[:, :, 0] * wh[:, :, 1]
        iou = inter / (area1[:, np.newaxis] + area2 - inter)
        return iou  # NxM
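    # Illustrative check: box_iou([[0, 0, 10, 10]], [[5, 5, 15, 15]]) overlaps in
    # a 5x5 patch, so IoU = 25 / (100 + 100 - 25) ≈ 0.143.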
    def numpy_nms(self, boxes: array, scores: array, iou_threshold: float):
        idxs = scores.argsort()  # indices sorted by score, ascending: the best box is last [N]
        keep = []
        while idxs.size > 0:
            max_score_index = idxs[-1]
            max_score_box = boxes[max_score_index][None, :]
            keep.append(max_score_index)
            if idxs.size == 1:
                break
            idxs = idxs[:-1]  # drop the best box, then compare the remaining boxes against it
            other_boxes = boxes[idxs]  # [?, 4]
            ious = self.box_iou(max_score_box, other_boxes)  # best box vs the rest, 1xM
            idxs = idxs[ious[0] <= iou_threshold]
        keep = np.array(keep)
        return keep
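    # Illustrative run: for boxes [0, 0, 10, 10] (score 0.9) and [1, 1, 11, 11]
    # (score 0.8) the IoU is 81/119 ≈ 0.68, above the default 0.45 threshold,
    # so numpy_nms keeps only index 0.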
    def xywh2xyxy(self, x):
        # Convert nx4 boxes from [x, y, w, h] to [x1, y1, x2, y2] where xy1=top-left, xy2=bottom-right
        # y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x)
        y = np.copy(x)
        y[:, 0] = x[:, 0] - x[:, 2] / 2  # top left x
        y[:, 1] = x[:, 1] - x[:, 3] / 2  # top left y
        y[:, 2] = x[:, 0] + x[:, 2] / 2  # bottom right x
        y[:, 3] = x[:, 1] + x[:, 3] / 2  # bottom right y
        return y
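    # Illustrative check: a centre-format box [10, 10, 4, 6] becomes the corner
    # format [8, 7, 12, 13].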
    def non_max_suppression(self, prediction, conf_thres=0.25, agnostic=False):
        # prediction has shape (1, 25200, 5 + num_classes): a batch of 1, with
        # 25200 = 80*80*3 + 40*40*3 + 20*20*3 rows of x, y, w, h, obj_conf + class scores
        xc = prediction[..., 4] > conf_thres  # candidates whose objectness exceeds conf_thres
        # Settings
        min_wh, max_wh = 2, 4096  # (pixels) minimum and maximum box width and height
        max_nms = 30000  # maximum number of boxes fed into NMS
        output = [np.zeros((0, 6))] * prediction.shape[0]
        for xi, x in enumerate(prediction):  # image index, image inference
            # Apply constraints
            x = x[xc[xi]]  # keep only candidates above the confidence threshold
            if not x.shape[0]:
                continue
            # Compute conf
            x[:, 5:] *= x[:, 4:5]  # conf = obj_conf * cls_conf
            # Box (center x, center y, width, height) to (x1, y1, x2, y2)
            box = self.xywh2xyxy(x[:, :4])
            # Detections matrix nx6 (xyxy, conf, cls)
            conf = np.max(x[:, 5:], axis=1)  # best class confidence per box
            j = np.argmax(x[:, 5:], axis=1)  # index of that class
            # numpy equivalent of torch.cat((box, conf, j.float()), 1)[conf.view(-1) > conf_thres]
            mask = conf.reshape(-1) > conf_thres
            conf = conf.reshape(-1, 1)
            j = j.reshape(-1, 1)
            x = np.concatenate((box, conf, j), axis=1)[mask]
            # Check shape
            n = x.shape[0]  # number of boxes
            if not n:  # no boxes
                continue
            elif n > max_nms:  # excess boxes
                x = x[x[:, 4].argsort()[::-1][:max_nms]]  # sort by confidence, descending
            # Batched NMS: offsetting boxes by class keeps different classes from suppressing each other
            c = x[:, 5:6] * (0 if agnostic else max_wh)  # classes
            boxes, scores = x[:, :4] + c, x[:, 4]  # boxes (offset by class), scores
            i = self.numpy_nms(boxes, scores, self.nmsThreshold)
            output[xi] = x[i]
        return output
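    # Each entry of the returned list is an (n, 6) array of
    # [x1, y1, x2, y2, confidence, class_index] rows in letterboxed-image pixels.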
    def detect(self, srcimg):
        im, ratio, wh = self.letterbox(srcimg, (self.inpHeight, self.inpWidth), auto=False)
        # Sets the input to the network
        blob = cv2.dnn.blobFromImage(im, 1 / 255.0, swapRB=True, crop=False)
        self.net.setInput(blob)
        outs = self.net.forward(self.net.getUnconnectedOutLayersNames())[0]
        # NMS
        pred = self.non_max_suppression(outs, self.confThreshold, agnostic=False)
        # Draw boxes, mapping letterboxed coordinates back onto the original image
        for i in pred[0]:
            left = int((i[0] - wh[0]) / ratio[0])
            top = int((i[1] - wh[1]) / ratio[1])
            right = int((i[2] - wh[0]) / ratio[0])
            bottom = int((i[3] - wh[1]) / ratio[1])
            conf = i[4]
            classId = int(i[5])
            cv2.rectangle(srcimg, (left, top), (right, bottom), colors(classId, True), 2, lineType=cv2.LINE_AA)
            label = '%s:%.2f' % (self.classes[classId], conf)
            # Display the label at the top of the bounding box
            labelSize, baseLine = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
            top = max(top, labelSize[1])
            cv2.putText(srcimg, label, (left - 20, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), thickness=1, lineType=cv2.LINE_AA)
        return srcimg
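
# Illustrative single-image usage (file names are placeholders):
#   model = yolov5('best.onnx')
#   img = cv2.imread('sample.jpg')
#   img = model.detect(img)  # boxes and labels are drawn on img in place
#   cv2.imwrite('result.jpg', img)
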
def mult_test(onnx_path, img_dir, save_root_path, video=False):
    model = yolov5(onnx_path)
    if video:
        cap = cv2.VideoCapture(0)
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        fps = cap.get(cv2.CAP_PROP_FPS)  # source frame rate; some webcams report 0
        if fps <= 0:
            fps = 30.0
        size = (frame_width, frame_height)  # VideoWriter expects (width, height)
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # mp4v matches the .mp4 container
        out = cv2.VideoWriter('zi.mp4', fourcc, fps, size)
        while cap.isOpened():
            ok, frame = cap.read()
            if not ok:
                break
            frame = model.detect(frame)
            out.write(frame)
            cv2.imshow('result', frame)
            c = cv2.waitKey(1) & 0xFF
            if c == 27 or c == ord('q'):
                break
        cap.release()
        out.release()
        cv2.destroyAllWindows()
    else:
        if not os.path.exists(save_root_path):
            os.mkdir(save_root_path)
        for root, dirs, files in os.walk(img_dir):
            for file in files:
                image_path = os.path.join(root, file)
                save_path = os.path.join(save_root_path, file)
                if file.endswith(('.mp4', '.avi')):
                    cap = cv2.VideoCapture(image_path)
                    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                    fps = cap.get(cv2.CAP_PROP_FPS)
                    size = (frame_width, frame_height)
                    fourcc = cv2.VideoWriter_fourcc(*'XVID')
                    out = cv2.VideoWriter(save_path, fourcc, fps, size)
                    while cap.isOpened():
                        ok, frame = cap.read()
                        if not ok:
                            break
                        frame = model.detect(frame)
                        out.write(frame)
                    cap.release()
                    out.release()
                    print(" finish: ", file)
                elif file.endswith(('.jpg', '.png')):
                    srcimg = cv2.imread(image_path)
                    srcimg = model.detect(srcimg)
                    cv2.imwrite(save_path, srcimg)
                    print(" finish: ", file)

if __name__ == "__main__":
    onnx_path = r'best20231031_4.onnx'
    input_path = r'input/'
    save_path = r'./output'
    # video=True switches to live webcam capture instead of the input directory
    mult_test(onnx_path, input_path, save_path, video=False)