Visual Servoing Platform version 3.7.0
Loading...
Searching...
No Matches
XFeatPoseEstimator.py
35from dataclasses import dataclass
36from pathlib import Path
37from typing import List, Optional, Tuple
38import numpy as np
39import time
40
41import torch
42
43from visp.core import ImageRGBa, CameraParameters, Point, ImageRGBf, PoseVector
44from visp.core import Matrix, ColVector, Robust
45
46from visp.core import PixelMeterConversion, HomogeneousMatrix, MeterPixelConversion
47from visp.core import Display, MeterPixelConversion, Color
48from visp.vision import Pose
49from visp.rbt import RBFeatureTrackerInput
50from visp.rbt import RBVisualOdometryUtils, LevenbergMarquardtParameters
51
52from visp.python.vision.xfeat import XFeatBackend, XFeatRepresentation, XFeatStarRepresentation
53from visp.python.display_utils import get_display
54
56 """Pose estimation based on XFeat keypoints and a PnP approach.
57 This strategy first matches the current frame with viewpoints learned beforehand.
58
59 Viewpoints that have a large enough number of matched keypoints are retained for the second step of the method.
60 In the second step, the 3D points associated with a view have their reprojection error with respect to the currently observed keypoints minimized.
61 Minimization can be done in 2D or 3D space.
62 """
# NOTE(review): this extract is a documentation scrape — original source line
# numbers are fused at the start of each line and indentation is lost.
# Original line 69 is missing here: save()/match_and_score() read
# self.representation, which is presumably initialized on that missing line —
# verify against the original file.
64 def __init__(self, optim_params: LevenbergMarquardtParameters):
# Per-viewpoint state, populated by load() or by the recording code:
# pose: cMo of the recorded view (set from frame.renders.cMo in record()),
# view_id: integer folder name used for (de)serialization,
# points_3d: (N, 4) homogeneous 3D points, descriptors: XFeat descriptors.
65 self.pose = None
66 self.view_id = None
67 self.points_3d = None
68 self.descriptors = None
# Levenberg-Marquardt settings shared with optimize_2d()/optimize_3d().
70 self.optim_params = optim_params
71
72
def save(self, path: Path):
    """Persist this viewpoint under ``path/<view_id>``.

    Writes the 3D points, the recorded pose and every entry of the XFeat
    representation (tensors are moved to CPU first) as ``.npy`` files.

    Raises:
        RuntimeError: if no view id has been assigned yet.
    """
    if self.view_id is None:
        raise RuntimeError("Id is none and trying to save viewpoint data")

    save_folder = path / str(self.view_id)
    save_folder.mkdir(exist_ok=True)
    np.save(save_folder / 'points.npy', self.points_3d)
    print(f'Saving pose {self.pose}')
    np.save(save_folder / 'pose.npy', self.pose.numpy())
    # Dump every representation entry; torch tensors become numpy arrays.
    for key, value in self.representation.items():
        payload = value.cpu().numpy() if isinstance(value, torch.Tensor) else value
        np.save(save_folder / f'{key}.npy', payload)
87
def load(self, path: Path, view_id: int) -> bool:
    """Load a viewpoint previously written by :meth:`save` from ``path/<view_id>``.

    Returns:
        bool: True on success, False when the viewpoint folder does not exist.
    """
    self.view_id = view_id
    load_folder = path / str(self.view_id)
    if not load_folder.exists():
        return False

    self.points_3d = np.load(load_folder / 'points.npy')
    self.pose = HomogeneousMatrix(np.load(load_folder / 'pose.npy'))
    # Restore the XFeat representation entries as CUDA tensors.
    self.representation = {
        key: torch.tensor(np.load(load_folder / f'{key}.npy')).cuda()
        for key in ('keypoints', 'descriptors', 'scales')
    }
    self.representation = XFeatViewPointPoseEstimator.ViewPointMatcher.reformat_repr_if_needed(self.representation)
    return True
102
103
@staticmethod
def reformat_repr_if_needed(r):
    """Normalize an XFeat representation dict so each entry is a batched torch tensor.

    The ``def`` header was missing from this extract (scraping artifact) and is
    restored here; the name and single argument are grounded by the call site
    ``reformat_repr_if_needed(self.representation)`` and the body's use of ``r``.

    Args:
        r: dict with 'keypoints', 'descriptors' and 'scales' entries, each a
            torch tensor or an array-like convertible to one.

    Returns:
        dict: same keys, with tensors reshaped to carry a leading batch
        dimension when missing (keypoints/descriptors rank 3, scales rank 2).
        Non-tensor inputs are converted with torch.tensor(...).cuda() —
        requires a CUDA device in that case.
    """
    new_repr = {}
    for k, expected_rank in [('keypoints', 3), ('descriptors', 3), ('scales', 2)]:
        v = r[k]
        new_v = v if isinstance(v, torch.Tensor) else torch.tensor(v).cuda()
        # Prepend a batch dimension when the stored entry is unbatched.
        if len(v.shape) != expected_rank:
            new_v = new_v.reshape(-1, *new_v.size())
        new_repr[k] = new_v
    return new_repr
118
# Match this stored viewpoint against the current frame and return the fraction
# of this view's stored keypoints that were matched:
# len(idx_stored_matches) / len(points_3d).
# Side effects: caches the matched 3D points (matched_p3d), the matched pixel
# observations (matched_obs, (u, v) order) and their normalized meter
# coordinates (matched_obs_x/y) on self for the optimize_* methods.
119 def match_and_score(self, cam, backend: XFeatBackend, current_repr):
120 self.idx_curr_matches, self.idx_stored_matches = backend.match(current_repr['descriptors'], self.representation['descriptors'])
# Keypoints may carry a leading batch dimension (rank 3); drop it if present.
121 kps_curr = current_repr['keypoints'].cpu().numpy()
122 if len(kps_curr.shape) == 3:
123 kps_curr = kps_curr[0]
124 self.idx_stored_matches = self.idx_stored_matches.cpu().numpy()
125 # self.idx_curr_matches = self.idx_curr_matches.cpu().numpy()
126 # self.idx_stored_matches = self.idx_stored_matches.cpu().numpy()
127 self.matched_p3d = self.points_3d[self.idx_stored_matches].copy()
128 self.matched_obs = kps_curr[self.idx_curr_matches.cpu().numpy()]
# Convert matched pixel coordinates (u, v) to normalized image coordinates (x, y).
129 self.matched_obs_x, self.matched_obs_y = PixelMeterConversion.convertPoints(cam, self.matched_obs[:, 0], self.matched_obs[:, 1])
130
131 return len(self.idx_stored_matches) / len(self.points_3d)
132
# Refine this viewpoint's pose by aligning the stored 3D points with the
# observed keypoints back-projected through the current depth map, then score
# the result with a Tukey-weighted mean residual.
# Returns (refined cMo, success flag: self.error <= threshold).
133 def optimize_3d(self, depth, threshold):
# Start the optimization from the recorded viewpoint pose.
134 cMo = HomogeneousMatrix(self.pose)
135 oMc = self.pose.inverse()
# Depth at each matched observation; astype(int32) truncates the pixel
# coordinates (record() rounds with np.rint instead — presumably intentional,
# but verify).
136 Zs = depth.numpy()[(self.matched_obs[:, 1].astype(np.int32)), self.matched_obs[:, 0].astype(np.int32)][..., None]
137
# Stored points moved from the recorded camera frame to the object frame.
# NOTE(review): unlike optimize_2d, oX is not renormalized by its homogeneous
# coordinate here — relies on matched_p3d having w == 1 (record() sets it so).
138 oX = (oMc.numpy() @ self.matched_p3d.T).T
139 oX_3 = np.ascontiguousarray(oX[..., :3])
# Back-project the observations to 3D camera points: (x*Z, y*Z, Z).
140 observations = np.concatenate((self.matched_obs_x[..., None] * Zs, self.matched_obs_y[..., None] * Zs, Zs), axis=-1)
141 observations = Matrix.view(observations)
142
# 3D Levenberg-Marquardt alignment; cMo is passed in and read back refined.
143 RBVisualOdometryUtils.levenbergMarquardtKeypoints3D(Matrix.view(oX_3), observations, self.optim_params, cMo)
144
# Re-express the model points with the refined pose and compute residuals.
145 cX = (cMo.numpy() @ oX.T).T
146
147 robust = Robust()
148 distances = np.linalg.norm(observations.numpy() - cX[..., :3], axis=-1)
149
# Tukey M-estimator weights damp outlier matches in the final score.
150 weights = ColVector(len(distances))
151 robust.MEstimator(Robust.TUKEY, ColVector.view(distances), weights)
152 res = distances * weights.numpy()
153
154 self.error = np.mean(res)
155 return cMo, self.error <= threshold
156
# Refine this viewpoint's pose by minimizing the 2D reprojection error of the
# stored 3D points against the matched observations (normalized coordinates).
# Returns (refined cMo, success flag: median pixel error <= threshold).
157 def optimize_2d(self, cam, threshold):
# Start the optimization from the recorded viewpoint pose.
158 cMo = HomogeneousMatrix(self.pose)
159 oMc = self.pose.inverse()
# Stored points moved to the object frame and renormalized (w -> 1).
160 oX = (oMc.numpy() @ self.matched_p3d.T).T
161 oX /= oX[:, 3, None]
162 oX_3 = np.ascontiguousarray(oX[..., :3])
163
# Observations in normalized (meter) coordinates; cMo is refined by the call.
164 observations = np.concatenate((self.matched_obs_x[..., None], self.matched_obs_y[..., None]), axis=-1)
165 RBVisualOdometryUtils.levenbergMarquardtKeypoints2D(Matrix.view(oX_3), Matrix.view(observations), self.optim_params, cMo)
166
# Reproject with the refined pose: homogeneous then perspective division.
167 cX = (cMo.numpy() @ oX.T).T
168 cX /= cX[:, 3, None]
169 new_xy = cX[:, :2] / cX[:, 2, None]
170
171 print(f'Used {len(oX)} points for optimization')
172
173
# Compare reprojections and observations in pixel space.
174 new_us, new_vs = MeterPixelConversion.convertPoints(cam, new_xy[:, 0], new_xy[:, 1])
175
176 new_pix = np.concatenate((new_us[..., None], new_vs[..., None]), axis=-1)
177 distances = np.linalg.norm(self.matched_obs - new_pix, axis=-1)
178
179 # weights = ColVector(len(distances))
180 # robust.MEstimator(Robust.TUKEY, ColVector.view(distances), weights)
181
# Median keeps the score robust to outlier matches.
182 self.error = np.median(distances)
183 print(f'Final error for view {self.view_id}: {self.error}')
184 return cMo, self.error <= threshold
185
def optimize(self, use_3d, threshold, cam: CameraParameters = None, depth=None) -> Tuple[HomogeneousMatrix, bool]:
    """Dispatch pose refinement to the 3D or 2D minimization routine.

    Args:
        use_3d: choose the 3D alignment (needs ``depth``) over the 2D
            reprojection minimization (needs ``cam``).
        threshold: maximum residual error for the pose to be accepted.
        cam (CameraParameters): intrinsics, required when ``use_3d`` is False.
        depth: depth map, required when ``use_3d`` is True.

    Returns:
        Tuple[HomogeneousMatrix, bool]: refined pose and acceptance flag.
    """
    if not use_3d:
        assert cam is not None
        return self.optimize_2d(cam, threshold)
    assert depth is not None
    return self.optimize_3d(depth, threshold)
193
def display_record(self, image: ImageRGBa, cTo, cam: CameraParameters):
    """Debug display of a recorded viewpoint.

    Reprojects the stored 3D points into the current image (green dots) and
    draws both the stored pose frame (yellow) and ``cTo``. Blocks until a key
    is pressed.
    """
    canvas = ImageRGBa(image)
    display = get_display()
    display.init(canvas)

    Display.display(canvas)

    # Transform stored (recorded-camera-frame) points into the current camera frame.
    cTr = cTo * self.pose.inverse()
    points_cam = (cTr.numpy() @ self.points_3d.T).T
    xs = points_cam[:, 0] / points_cam[:, 2]
    ys = points_cam[:, 1] / points_cam[:, 2]
    us, vs = MeterPixelConversion.convertPoints(cam, xs, ys)
    # displayPoint is called (v, u) first, consistent with the other display code here.
    for u, v in zip(us, vs):
        Display.displayPoint(canvas, int(np.rint(v)), int(np.rint(u)), Color.green, 2)

    Display.displayFrame(canvas, self.pose, cam, 0.05, Color.yellow)
    Display.displayFrame(canvas, cTo, cam, 0.05)

    Display.flush(canvas)
    Display.getKeyboardEvent(canvas, True)
214
# Debug display comparing reprojections: observed keypoints (green), stored-pose
# reprojections (blue) and refined-pose reprojections (red, linked by a line to
# their observation), plus both pose frames. Blocks until a key is pressed.
215 def display_result(self, image, cMo, cam):
216 oMc = self.pose.inverse()
# Matched stored points expressed in the object frame (homogeneous).
217 oX = (oMc.numpy() @ self.matched_p3d.T).T
# Build vpPoint-style points carrying both the 3D position and the observation.
218 points = []
219 for i in range(len(self.matched_p3d)):
220 p = Point()
221
222 p.setWorldCoordinates(oX[i, 0], oX[i, 1], oX[i, 2])
223 p.set_x(self.matched_obs_x[i])
224 p.set_y(self.matched_obs_y[i])
225 points.append(p)
226
227 IG = ImageRGBa(image)
228 d = get_display()
229 d.init(IG)
230
231 Display.display(IG)
232
233 for i,p in enumerate(points):
234 # if i not in self.ransac_inliers_indices:
235 # continue
# Reprojection with the stored (recorded) viewpoint pose -> blue.
236 p.changeFrame(self.pose)
237 p.project()
238 u1, v1 = MeterPixelConversion.convertPoint(cam, p.get_x(), p.get_y())
239 Display.displayPoint(IG, int(np.rint(v1)), int(np.rint(u1)), Color.blue, 2)
# The actually observed keypoint -> green ((v, u) argument order, as elsewhere here).
240 Display.displayPoint(IG, int(np.rint(self.matched_obs[i, 1])), int(np.rint(self.matched_obs[i, 0])), Color.green, 2)
241
# Reprojection with the refined pose cMo -> red, with a line to its observation.
242 p.changeFrame(cMo)
243 p.project()
244 u2, v2 = MeterPixelConversion.convertPoint(cam, p.get_x(), p.get_y())
245 Display.displayPoint(IG, int(np.rint(v2)), int(np.rint(u2)), Color.red, 2)
246 Display.displayLine(IG, int(np.rint(v2)), int(np.rint(u2)), int(np.rint(self.matched_obs[i, 1])), int(np.rint(self.matched_obs[i, 0])), Color.red, 1, segment=True)
247
# Stored viewpoint frame in yellow; refined pose frame with default colors.
248 Display.displayFrame(IG, self.pose, cam, 0.05, Color.yellow)
249 Display.displayFrame(IG, cMo, cam, 0.05)
250
251 Display.flush(IG)
252 Display.getKeyboardEvent(IG, True)
253
254
# Per-observation residual of the given pose.
# NOTE(review): self.pose_estimator is never assigned anywhere in this extract
# (likely stale code from an earlier PnP-based implementation) — calling this
# would raise AttributeError. The `cam` parameter is also unused. Verify before use.
255 def compute_residual(self, cMo, cam):
256 return self.pose_estimator.computeResidual(cMo) / len(self.matched_obs)
257
258
259 def __init__(self, save_folder: Path, use_3d: bool, use_dense: bool, top_k: int, min_similarity: float, min_match_ratio: float, geometric_threshold: float):
"""Initialize the pose estimator and load any viewpoint database found on disk.

Args:
    save_folder (Path): folder the viewpoint database is loaded from / saved to.
    use_3d (bool): if True, refine poses with the 3D routine (requires depth);
        otherwise use the 2D reprojection-error routine.
    use_dense (bool): forwarded to the XFeat backend (`use_dense` attribute).
    top_k (int): passed to the XFeat backend (number of keypoints requested —
        TODO confirm against XFeatBackend).
    min_similarity (float): passed to the XFeat backend as a matching threshold.
    min_match_ratio (float): minimum matched/stored keypoint ratio for a
        viewpoint to be kept as a pose candidate.
    geometric_threshold (float): maximum residual error for a refined pose to
        be accepted.
"""
271 self.xfeat_backend = XFeatBackend(top_k, min_similarity)
272 self.xfeat_backend.use_dense = use_dense
273 self.save_folder = save_folder
274 self.use_3d = use_3d
# NOTE(review): original line 275 is missing from this scrape; self.views is
# appended to below, so it is presumably initialized there (e.g. `self.views = []`)
# — verify against the original file.
276 self.min_match_ratio = min_match_ratio
277 self.geometric_threshold = geometric_threshold
278
# Levenberg-Marquardt configuration shared by every viewpoint optimizer.
279 self.optim_params = LevenbergMarquardtParameters()
280 self.optim_params.gain = 0.5
281 self.optim_params.maxNumIters = 1000
282 self.optim_params.muInit = 0.0
283 self.optim_params.muIterFactor = 0.0
284 self.optim_params.minImprovementFactor = 0.0
285
286
# Load viewpoints 0, 1, 2, ... from disk until a folder is missing.
287 if self.save_folder.exists():
288 print('Loading viewpoints matching db...')
289 view_id = 0
290 while True:
# NOTE(review): original line 291 is missing from this scrape; `v` is presumably
# constructed there (a ViewPointMatcher instance) — verify against the original file.
292 loaded = v.load(self.save_folder, view_id)
293 if not loaded:
294 break
295 self.views.append(v)
296 view_id += 1
297 print(f'Loading database finished: found {view_id} viewpoints!')
298
299
300
301 def record(self, frame: RBFeatureTrackerInput, cTo: HomogeneousMatrix):
"""Record a new viewpoint from the current frame and its rendered depth.

Detects XFeat keypoints on the color image, back-projects them to 3D with
the rendered depth map (keypoints with no depth are discarded), and appends
the resulting viewpoint (pose, 3D points, keypoints/descriptors/scales) to
self.views.

Args:
    frame (RBFeatureTrackerInput): tracker input; IRGB, cam and
        renders (depth, cMo) are read here.
    cTo (HomogeneousMatrix): unused in the visible code — the stored pose is
        taken from frame.renders.cMo instead; verify intent.
"""
308 # bb = frame.renders.boundingBox
309 # from visp.core import ImageTools
310 # bbcrop = ImageRGBa()
311 # ImageTools.crop(frame.IRGB, bb, bbcrop)
312 representation = self.xfeat_backend.detect(frame.IRGB)
313 kps, descriptors = representation.keypoints, representation.descriptors
314 # kps[:, 0] += bb.getLeft()
315 # kps[:, 1] += bb.getTop()
316
# Dense (XFeat*) detections carry per-keypoint scales; default to ones otherwise.
317 if isinstance(representation, XFeatStarRepresentation):
318 scales = representation.scales
319 else:
320 scales = np.ones_like(kps[..., 0])
321
322 # kps, descriptors = self.xfeat_backend.get_keypoints(frame.IRGB)
323 us, vs = kps[:, 0], kps[:, 1] # Keypoints in current image with cTo intrinsics
324 xs,ys = PixelMeterConversion.convertPoints(frame.cam, us, vs)
325
326
# Sample rendered depth at each keypoint and drop keypoints with no depth (Z <= 0).
327 Z = frame.renders.depth.numpy()[np.rint(vs).astype(np.int32), np.rint(us).astype(np.int32)]
328 depth_ok = Z > 0
329 xs, ys, Z, kps, descriptors, scales = [array[depth_ok] for array in [xs, ys, Z, kps, descriptors, scales]]
330
# Back-project to homogeneous 3D camera-frame points: (x*Z, y*Z, Z, 1).
331 points_3d = np.empty((len(xs), 4))
332 points_3d[:, 0] = xs * Z
333 points_3d[:, 1] = ys * Z
334 points_3d[:, 2] = Z
335 points_3d[:, 3] = 1.0
336
337
338 assert len(points_3d) == len(descriptors) == len(kps) == len(scales)
# NOTE(review): original line 339 is missing from this scrape; `viewpoint` is
# presumably constructed there (a ViewPointMatcher instance) — verify against
# the original file.
340 viewpoint.view_id = len(self.views)
341 viewpoint.pose = HomogeneousMatrix(frame.renders.cMo)
342 print(f'Recorded view {viewpoint.view_id} with {len(descriptors)} keypoints and pose {PoseVector(viewpoint.pose).t()}')
343 viewpoint.points_3d = points_3d
344 viewpoint.descriptors = descriptors
345 viewpoint.representation = {
346 'keypoints': kps,
347 'descriptors': descriptors,
348 'scales': scales
349 }
350
351
352 self.views.append(viewpoint)
353
def estimate_pose(self, image: ImageRGBa, depth, cam: CameraParameters) -> Tuple[bool, HomogeneousMatrix]:
    """Estimate the object pose in `image` from the learned viewpoint database.

    The current frame is matched against every stored viewpoint; viewpoints
    whose match ratio exceeds `self.min_match_ratio` are refined (2D or 3D
    minimization depending on `self.use_3d`), and among the refinements that
    pass the geometric threshold, the pose with the lowest residual error is
    returned.

    Args:
        image (ImageRGBa): current color frame.
        depth: depth map; only used when `self.use_3d` is True.
        cam (CameraParameters): intrinsics of the current frame.

    Returns:
        Tuple[bool, HomogeneousMatrix]: (success flag, estimated cMo). The
        pose is the identity when estimation failed.
    """
    representation = self.xfeat_backend.detect(image)
    kps, descriptors = representation.keypoints, representation.descriptors
    # Dense (XFeat*) detections carry per-keypoint scales; default to ones otherwise.
    if isinstance(representation, XFeatStarRepresentation):
        scales = representation.scales
    else:
        scales = np.ones_like(kps[..., 0])
    current_representation = {
        'keypoints': kps,
        'descriptors': descriptors,
        'scales': scales
    }
    current_representation = XFeatViewPointPoseEstimator.ViewPointMatcher.reformat_repr_if_needed(current_representation)

    # Step 1: keep only viewpoints that match the current frame well enough.
    scores = [view.match_and_score(cam, self.xfeat_backend, current_representation) for view in self.views]
    candidates = [(view, score) for view, score in zip(self.views, scores) if score > self.min_match_ratio]
    if len(candidates) == 0:
        return False, HomogeneousMatrix()

    # Step 2: refine each candidate pose; keep the geometrically consistent ones.
    poses = []
    errors = []
    for view, _score in candidates:
        cMo, pose_ok = view.optimize(self.use_3d, self.geometric_threshold, cam, depth)
        if pose_ok:
            poses.append(cMo)
            errors.append(view.error)

    if len(poses) == 0:
        return False, HomogeneousMatrix()

    # poses (and thus errors) is non-empty here, so success is guaranteed.
    # (The original recomputed `pose_ok = len(errors) > 0`, which was always
    # True at this point; the dead `final_views` accumulator was also removed.)
    return True, poses[np.argmin(errors)]
404
def save(self):
    """Persist every recorded viewpoint under ``self.save_folder``.

    Each viewpoint writes its own ``<view_id>/`` subfolder via
    ``ViewPointMatcher.save``.
    """
    # parents=True so saving also works when intermediate directories of
    # save_folder do not exist yet (the original mkdir raised in that case).
    self.save_folder.mkdir(parents=True, exist_ok=True)
    for view in self.views:
        view.save(self.save_folder)
Tuple[HomogeneousMatrix, bool] optimize(self, use_3d, threshold, CameraParameters cam=None, depth=None)
Tuple[bool, HomogeneousMatrix] estimate_pose(self, ImageRGBa image, depth, CameraParameters cam)
record(self, RBFeatureTrackerInput frame, HomogeneousMatrix cTo)
__init__(self, Path save_folder, bool use_3d, bool use_dense, int top_k, float min_similarity, float min_match_ratio, float geometric_threshold)