-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathradar_processing.py
228 lines (186 loc) · 9.22 KB
/
radar_processing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
import cv2
import math
import numpy as np
import os
from processing.utils import filter_and_project_pcd_to_image, motion_compensate_pcd, create_image_from_point_cloud, enlarge_points_in_image
def extract_timestamps(front_radar_polar_image, i):
"""
Extract the timestamps from the radar data.
Args:
front_radar_polar_image (np.array): The raw azimuth range radar data.
i (int): The index.
Returns:
np.array: The timestamps in microseconds.
"""
timestamps_s = front_radar_polar_image[:4, i].copy().transpose().reshape(-1).view(np.uint32).astype(np.float64)
timestamps_ns = front_radar_polar_image[4:8, i].copy().transpose().reshape(-1).view(np.uint32).astype(
np.float64)
timestamps_us = (timestamps_s * 1000000.) + (timestamps_ns / 1e9 * 1000000.) # microseconds
return timestamps_us
def calculate_coordinates(distance, azimuths, valid_mask, i):
"""
Calculate the x and y coordinates.
Args:
distance (np.array): The distance data.
azimuths (np.array): The azimuth data.
valid_mask (np.array): The valid mask.
i (int): The index.
Returns:
np.array: The x and y coordinates.
"""
x = distance[valid_mask] * math.cos(azimuths[i])
y = distance[valid_mask] * math.sin(azimuths[i])
return x, y
def append_to_point_cloud(xyzi, point_count, num_points, x, y, fft_data, valid_mask, timestamps_us):
"""
Append the radar data to the point cloud.
Args:
xyzi (np.array): The point cloud data.
point_count (int): The current point count.
num_points (int): The number of points.
x (np.array): The x coordinates.
y (np.array): The y coordinates.
fft_data (np.array): The fft data.
valid_mask (np.array): The valid mask.
timestamps_us (np.array): The timestamps in microseconds.
Returns:
np.array: The updated point cloud.
"""
xyzi[point_count:point_count + num_points, 0] = x
xyzi[point_count:point_count + num_points, 1] = y
xyzi[point_count:point_count + num_points, 3] = fft_data[valid_mask]
xyzi[point_count:point_count + num_points, 4] = np.repeat(timestamps_us * 1.e-6, num_points)
return xyzi
def radar_to_point_cloud(front_radar_polar_image, intensity_threshold=0, image_fov_only=False, step=400,
ground_level=1.62, max_distance=330.0768, resolution=0.0438):
"""
Convert radar data to a point cloud.
This funktion sets the height channel to the ground level (at 1.62m).
Args:
front_radar_polar_image (np.array): The raw azimuth range radar data.
intensity_threshold (int): The intensity threshold for the radar data (default is 0)
step (int): The step size for the radar data (default is 400)
image_fov_only (bool): Whether to load points in the image Field of View only (default is False)
ground_level (float): The ground level (default is 1.62) used for the height channel of the point cloud.
max_distance (float): The maximum distance of points to keep (default is 330.0768)
resolution (float): The resolution of the radar (default is 0.0438)
Returns:
np.array: The radar data as a point cloud.
"""
assert front_radar_polar_image.shape[1] == step
if image_fov_only:
min_azimuths = 145
max_azimuths = 255
else:
min_azimuths = 0
max_azimuths = step - 1
azimuths = -np.radians(np.linspace(-180 + (360 / step), 180, step).astype(np.float32))
range_in_bins = int(max_distance//resolution)
front_radar_polar_image = front_radar_polar_image[:range_in_bins + 11]
end_distance = np.round(range_in_bins * resolution, decimals=4)
distance = np.linspace(resolution, end_distance, range_in_bins)
front_radar_polar_image = front_radar_polar_image[:, min_azimuths:max_azimuths]
azimuths = azimuths[min_azimuths:max_azimuths]
timestamps_s = front_radar_polar_image[:4].transpose().reshape(-1).view(np.uint32).astype(np.float64)
timestamps_ns = front_radar_polar_image[4:8].transpose().reshape(-1).view(np.uint32).astype(np.float64)
timestamps_us = (timestamps_s * 1000000.) + (timestamps_ns / 1e9 * 1000000.) # microseconds
valid_flag = front_radar_polar_image[10].view(np.uint8).astype(bool)
fft_data = front_radar_polar_image[11:]
valid_mask = fft_data >= intensity_threshold
valid_mask = valid_mask * valid_flag
x = np.outer(distance, np.cos(azimuths))
y = np.outer(distance, np.sin(azimuths))
num_points = np.sum(valid_mask, axis=0)
xyzi = np.zeros((num_points.sum(), 5), dtype=np.float64)
# process the array column-wise (azimuth by azimuth)
xyzi[:, 0] = x.T[valid_mask.T]
xyzi[:, 1] = y.T[valid_mask.T]
xyzi[:, 2] -= ground_level # Set height to ground level
xyzi[:, 3] = fft_data.T[valid_mask.T]
xyzi[:, 4] = np.repeat(timestamps_us * 1.e-6, num_points)
return xyzi
def load_raw_radar_data(radar_path):
"""
Load radar data from the specified path.
Args:
radar_path (str): The path to the radar data.
Returns:
np.array: The raw azimuth range radar data.
"""
radar_range_azimuth = cv2.imread(radar_path)[:, :, 0]
return radar_range_azimuth
def load_radar_as_pcd(radar_path, intensity_threshold=0, image_fov_only=False):
"""
Load radar data as a point cloud.
Args:
radar_path (str): The path to the radar data.
intensity_threshold (int): The intensity threshold for the radar data.
image_fov_only (bool): Whether to load points in the image Field of View only.
Returns:
np.array: The radar data as a point cloud.
"""
radar_range_azimuth = load_raw_radar_data(radar_path)
pcd_points = radar_to_point_cloud(radar_range_azimuth,
intensity_threshold=intensity_threshold,
image_fov_only=image_fov_only)
return pcd_points
def load_points_in_image_radar(radar_path,
calib_data,
scene_meta_dict=None,
motion_compensation=False,
muses_root=None,
target_shape=(1920, 1080),
intensity_threshold=0):
"""
This method loads radar data and projects it to an RGB frame.
Args:
radar_path (str): The path to the radar data.
calib_data (dict): The calibration data.
scene_meta_dict (dict): The metadata dictionary for the current data entry.
motion_compensation (bool): Enable motion compensation.
muses_root (str): The root directory where the muses dataset is located.
target_shape (tuple): The target output shape in pixels.
intensity_threshold (int): The intensity threshold for the radar data.
Returns:
None
"""
assert os.path.exists(radar_path), f"Radar data {radar_path} does not exist"
pcd_points = load_radar_as_pcd(radar_path, intensity_threshold, image_fov_only=True)
if motion_compensation:
assert scene_meta_dict is not None, "Meta_dict must be provided for motion compensation"
assert muses_root is not None, "MUSES root directory must be provided for motion compensation"
radar2gnss = np.array(calib_data["extrinsics"]["radar2gnss"])
pcd_points = motion_compensate_pcd(muses_root, scene_meta_dict, pcd_points, radar2gnss, ts_channel_num=4)
K_rgb = calib_data["intrinsics"]["rgb"]["K"]
radar2rgb = calib_data["extrinsics"]["radar2rgb"]
uv_img_cords_filtered, pcd_points_filtered = filter_and_project_pcd_to_image(pcd_points, radar2rgb, K_rgb,
target_shape, max_distance=150)
return uv_img_cords_filtered, pcd_points_filtered
def load_radar_projection(radar_path, calib_data, scene_meta_dict=None, motion_compensation=False,
muses_root=None, target_shape=(1920, 1080), enlarge_radar_points=False):
"""
This method loads the radar data and projects it to an RGB frame.
Optionally, the radar points can be dilated via the enlarge_radar_points flag.
Args:
radar_path (str): The path to the radar data.
calib_data (dict): The calibration data.
scene_meta_dict (dict): The metadata dictionary for the current data entry.
motion_compensation (bool): Enable motion compensation.
muses_root (str): The root directory where the muses dataset is located.
target_shape (tuple): The target output shape in pixels.
enlarge_radar_points (bool): Whether to enlarge the radar points in the image.
Returns:
np.array: The image with the projected radar points.
"""
uv_img_cords_filtered, pcd_points_filtered = load_points_in_image_radar(
radar_path,
calib_data,
scene_meta_dict=scene_meta_dict,
motion_compensation=motion_compensation,
muses_root=muses_root,
target_shape=target_shape)
image = create_image_from_point_cloud(uv_img_cords_filtered, pcd_points_filtered, target_shape,
height_channel=False)
if enlarge_radar_points:
image = enlarge_points_in_image(image, kernel_shape=(9, 9))
return image