-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
169 lines (129 loc) · 4.72 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import os
import time
from multiprocessing import Pool
import cv2
import numpy as np
from PIL import Image
from functools import wraps
from albumentations import Compose, LongestMaxSize, PadIfNeeded
from albumentations.pytorch import ToTensorV2
import config
def timeit(func):
    """
    Decorator that reports how long each call to the wrapped function took.

    The elapsed time is measured with ``time.perf_counter`` and printed to
    stdout once the call has returned. The wrapped function's return value is
    passed through unchanged. Nothing is printed when the call raises.
    :param func: callable to time
    :return: wrapper carrying the name and docstring of ``func``
    """
    @wraps(func)
    def timed(*call_args, **call_kwargs):
        started_at = time.perf_counter()
        outcome = func(*call_args, **call_kwargs)
        elapsed = time.perf_counter() - started_at
        print(f"Function {func.__name__} completed in {elapsed:.3f} seconds.")
        return outcome

    return timed
def get_frames(video_path, start=config.START, stop=config.STOP, step=config.STEP, output_type='CV'):
    """
    Samples frames from a video and returns them in OpenCV or PIL Image format.

    Frames are visited in order from the beginning of the video up to ``stop``.
    A frame is kept when its index is at least ``start`` and is a multiple of
    ``step``. The frame count reported by the container is used only as an
    upper bound: sampling ends early if the stream runs out of frames first.
    :param video_path: path to video
    :param start: index of the first frame eligible for sampling
    :param stop: index at which sampling ends (exclusive); if None all frames are checked
    :param step: sampling frequency; frames whose index is a multiple of step are kept
    :param output_type: 'CV' keeps frames as returned by OpenCV, 'PIL' converts them
        from BGR to RGB PIL Images; any other value yields an empty list
    :return: list of images
    """
    capture = cv2.VideoCapture(video_path)
    frames = []
    try:
        total_frames_in_video = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
        if stop is None:
            stop = total_frames_in_video
        stop = min(total_frames_in_video, stop)
        for i in range(stop):
            # grab() only advances the stream; the frame is decoded by
            # retrieve() below, and only for the frames that are kept.
            if not capture.grab():
                # The stream ended before the reported frame count.
                break
            if i < start or i % step != 0:
                continue
            ret, frame = capture.retrieve()
            if not ret:
                # Skip frames that could not be decoded rather than storing None.
                continue
            if output_type == 'PIL':
                frames.append(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))
            elif output_type == 'CV':
                frames.append(frame)
    finally:
        # Always free the underlying decoder / file handle.
        capture.release()
    return frames
def get_frames_with_cv(video_path, convert_bgr=False):
    """
    Reads every frame of a video with OpenCV.

    :param video_path: path to video
    :param convert_bgr: if True, each frame is converted from BGR to RGB
    :return: list of frames in reading order (empty if nothing could be read)
    """
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if convert_bgr:
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(frame)
    finally:
        # Release the capture even if a conversion fails part-way through.
        cap.release()
    return frames
def save_video(frames, input_path, fourcc=cv2.VideoWriter_fourcc(*'MP4V'), fps=30, output_size=None,
               output_path='data/faces'):
    """
    Writes a sequence of RGB frames to a video file named after ``input_path``.

    The file is created inside ``output_path`` (the directory is created when
    missing) and takes the base name of ``input_path``. Each frame is converted
    from RGB to BGR and resized to the output size before it is written.
    :param frames: sequence of RGB frames (numpy arrays or PIL Images)
    :param input_path: path of the source video; only its base name is used
    :param fourcc: codec code passed to ``cv2.VideoWriter``
    :param fps: frame rate of the written video
    :param output_size: (width, height) of the output; if None the size of the
        first frame is used
    :param output_path: directory in which the video is saved
    :return: None
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(output_path, exist_ok=True)
    video_name = os.path.basename(input_path)
    output_path = os.path.join(output_path, video_name)

    if output_size is not None:
        w, h = output_size
    else:
        # np.asarray lets the first frame be a PIL Image as well as an ndarray,
        # matching what the write loop below accepts.
        h, w = np.asarray(frames[0]).shape[:2]

    video_writer = cv2.VideoWriter(output_path, fourcc, fps, (w, h))
    print(f"Total detected frames: {len(frames)}")
    try:
        for frame in frames:
            bgr_frame = cv2.cvtColor(np.array(frame), cv2.COLOR_RGB2BGR)
            video_writer.write(cv2.resize(bgr_frame, (w, h)))
    finally:
        # Release the writer even if a frame fails to convert.
        video_writer.release()
    print(f"{video_name} saved to {output_path}")
def display(images, names=None):
    """
    Shows images one at a time in OpenCV windows.

    After each image is shown the function blocks until a key is pressed.
    Pressing 'q' closes all windows and terminates the program with exit
    status -1; any other key moves on to the next image. All windows are
    closed once every image has been shown.
    :param images: images to show
    :param names: (Optional) window titles, paired with ``images`` in order;
        defaults to 'image-0', 'image-1', ...
    :return: None
    """
    if names is None:
        names = [f'image-{i}' for i in range(len(images))]
    for image, name in zip(images, names):
        cv2.imshow(name, image)
        if cv2.waitKey(0) == ord('q'):
            cv2.destroyAllWindows()
            # Raise SystemExit directly: the exit() helper is injected by the
            # site module for interactive use and is not always available.
            raise SystemExit(-1)
    cv2.destroyAllWindows()
def extract_box(image, coordinates, output_path=None):
    """
    Crops a box out of an image and optionally saves the crop.
    :param image: input image; must provide a ``crop`` method
    :param coordinates: coordinates of the box, passed to ``image.crop`` as is
    :param output_path: (Optional) path to save the cropped image to
    :return: the cropped image (returned whether or not it was saved)
    """
    region = image.crop(coordinates)
    if output_path is None:
        return region
    region.save(output_path)
    return region
@timeit
def run_in_parallel(func, args, processes=11) -> list:
    """
    Runs given function for each of the items in the argument list in parallel.
    Gathers all results and returns them in the order of ``args``.
    :param func: Function to run in parallel. Should return a value
    :param args: List of items to pass function. An item that is a list is
        unpacked into positional arguments; any other item is passed as the
        single argument
    :param processes: Number of worker processes in the pool, default 11
    :return: List, Results of all runs
    """
    with Pool(processes=processes) as pool:
        pending = [
            pool.apply_async(func, item if isinstance(item, list) else (item,))
            for item in args
        ]
        # Collect inside the ``with`` block: leaving it shuts the pool down,
        # so results must be fetched while the workers are still alive.
        return [job.get() for job in pending]
def transform(size):
    """
    Builds the albumentations pipeline used to bring images to a common size.

    The pipeline chains three steps in order: ``LongestMaxSize(size)``,
    ``PadIfNeeded`` with ``size`` as minimum height and width and a constant
    border, and ``ToTensorV2``.
    :param size: target side length handed to the resize and padding steps
    :return: composed transform; call it as ``pipeline(image=img)``
    """
    resize_step = LongestMaxSize(size)
    pad_step = PadIfNeeded(min_height=size, min_width=size, border_mode=cv2.BORDER_CONSTANT)
    tensor_step = ToTensorV2()
    return Compose([resize_step, pad_step, tensor_step])
def get_unified_img(p, size=224):
    """
    Loads an image from disk and runs it through the ``transform`` pipeline.
    :param p: path to the image file
    :param size: target size handed to ``transform``, default 224
    :return: the 'image' entry of the transform output
    :raises FileNotFoundError: if the image cannot be read from ``p``
    """
    img = cv2.imread(p)
    if img is None:
        # cv2.imread reports a missing or unreadable file by returning None;
        # fail here with the path instead of an opaque error in cvtColor.
        raise FileNotFoundError(f"Could not read image: {p}")
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    return transform(size)(image=img)['image']
def compute_eer(fpr, tpr, thresholds):
    """
    Returns equal error rate (EER) and the corresponding threshold.

    The EER is taken at the point where the false positive rate and the false
    negative rate (``1 - tpr``) are closest, and is reported as the mean of
    the two rates at that point. Both returned values are rounded to 4 decimals.
    :param fpr: false positive rates, one per threshold (array or sequence)
    :param tpr: true positive rates, one per threshold (array or sequence)
    :param thresholds: thresholds matching ``fpr`` / ``tpr`` (array or sequence)
    :return: tuple (eer, threshold)
    """
    # Accept plain sequences as well as arrays; ndarray input is used unchanged.
    fpr = np.asarray(fpr)
    fnr = 1 - np.asarray(tpr)
    thresholds = np.asarray(thresholds)

    min_index = np.argmin(np.abs(fpr - fnr))
    eer = np.mean((fpr[min_index], fnr[min_index]))
    return round(eer, 4), round(thresholds[min_index], 4)