# yolo-centering-task-afma6.py
# Example from the Visual Servoing Platform (ViSP), version 3.7.0:
# YOLOv8-based centering task on an Afma6 robot with a RealSense camera.
import visp
from visp.core import ColVector, Color, PixelMeterConversion, Display, Matrix
from visp.core import CameraParameters, HomogeneousMatrix, PoseVector, ImagePoint
from visp.core import ImageRGBa, ImageUInt16
from visp.core import UnscentedKalman, UKSigmaDrawerMerwe

from visp.visual_features import BasicFeature, FeaturePoint
from visp.vs import Servo
from visp.gui import ColorBlindFriendlyPalette
from visp.python.display_utils import get_display
from visp.robot import RobotAfma6, Robot

from typing import List, Optional, Tuple

try:
    from ultralytics import YOLO
except ImportError:
    print('This example requires YoloV8: pip install ultralytics')

import time
import argparse
from functools import partial

import numpy as np
import matplotlib.pyplot as plt
from matplotlib import animation
import pyrealsense2 as rs

# Do not require a LaTeX installation for the plot labels.
plt.rcParams['text.usetex'] = False
30
31
def fx(x: ColVector, dt: float) -> ColVector:
    """
    @brief Process function that projects in time the internal state of the UKF.

    The state layout is (x, vx, ax, y, vy, ay): a constant-acceleration motion
    model applied independently on each image axis.

    @param x The internal state of the UKF.
    @param dt The sampling time: how far in the future are we projecting x.

    @return ColVector The updated internal state, projected in time, also known as the prior.
    """
    pos_x, vel_x, acc_x = x[0], x[1], x[2]
    pos_y, vel_y, acc_y = x[3], x[4], x[5]
    # NOTE(review): the usual 1/2 factor on the acceleration term is absent
    # here — kept exactly as in the original model.
    return ColVector([
        pos_x + vel_x * dt + acc_x * dt ** 2,
        vel_x + acc_x * dt,
        acc_x,
        pos_y + vel_y * dt + acc_y * dt ** 2,
        vel_y + acc_y * dt,
        acc_y,
    ])
49
50
def hx(x: ColVector) -> ColVector:
    """
    @brief Measurement function that expresses the internal state of the UKF in the measurement space.

    Only the two position components (indices 0 and 3) are observed; the
    velocity and acceleration components are hidden.

    @param x The internal state of the UKF.

    @return ColVector The internal state, expressed in the measurement space.
    """
    return ColVector([x[0], x[3]])
63
64def add_state_vectors(a, b) -> ColVector:
65 """
66 @brief Method that permits to add two state vectors.
67
68 @param a The first state vector to which another state vector must be added.
69 @param b The other state vector that must be added to a.
70
71 @return ColVector The sum a + b.
72 """
73 return a + b
74
75def residual_state_vectors(a, b) -> ColVector:
76 """
77 @brief Method that permits to subtract a state vector to another.
78
79 @param a The first state vector to which another state vector must be subtracted.
80 @param b The other state vector that must be subtracted to a.
81
82 @return ColVector The subtraction a - b.
83 """
84 return a - b
85
def generate_Q_matrix(dt: float, sigma: float) -> Matrix:
    """
    @brief Generate the process covariance matrix for a constant-acceleration
    model whose state vector is (x, dx/dt, d^2x/dt^2, y, dy/dt, d^2y/dt^2)^T.

    @param dt The sampling period.
    @param sigma Process-noise intensity scaling the whole matrix.

    @return Matrix The corresponding process covariance matrix.
    """
    # One 3x3 covariance block per axis; the two axes are assumed uncorrelated,
    # hence the zero off-diagonal blocks.
    axis_block = np.array([
        [dt ** 4 / 4, dt ** 3 / 3, dt ** 2 / 2],
        [dt ** 3 / 3, dt ** 2 / 2, dt],
        [dt ** 2 / 2, dt, 1],
    ])
    Q = np.zeros((6, 6))
    Q[:3, :3] = axis_block
    Q[3:, 3:] = axis_block
    return Matrix(Q * sigma)
103
104
105
def read_data(I_rgba: ImageRGBa, I_depth: ImageUInt16, pipe: rs.pipeline, align: rs.align) -> None:
    """Grab one frame pair from the RealSense pipeline and copy it into the ViSP images.

    The depth frame is aligned to the color stream before copying. The RGB color
    frame gets an opaque alpha channel appended so it matches the RGBa layout
    expected by I_rgba. Both images are written in place through their numpy views.
    """
    aligned = align.process(pipe.wait_for_frames())
    color_np = np.asanyarray(aligned.get_color_frame().as_frame().get_data())
    # Append a fully-opaque alpha channel (RGB -> RGBA).
    alpha = np.ones_like(color_np[..., 0:1], dtype=np.uint8) * 255
    np.copyto(I_rgba.numpy(), np.concatenate((color_np, alpha), axis=-1))
    depth_np = np.asanyarray(aligned.get_depth_frame().as_frame().get_data())
    np.copyto(I_depth.numpy(), depth_np)
116
def cam_from_rs_profile(profile) -> Tuple[CameraParameters, int, int]:
    """Build ViSP camera parameters, plus image height and width, from a RealSense stream profile."""
    # Downcast to a video stream profile to access the pinhole intrinsics.
    intrinsics = profile.as_video_stream_profile().get_intrinsics()
    params = CameraParameters(intrinsics.fx, intrinsics.fy, intrinsics.ppx, intrinsics.ppy)
    return params, intrinsics.height, intrinsics.width
120
class VSPlot(object):
    """Records per-iteration servoing data (image, velocity, error, pose) and
    renders it as an animation ('exp.mp4') plus a final figure ('exp.pdf')."""

    def __init__(self):
        self.v = []      # logged camera velocity components
        self.error = []  # logged visual feature errors
        self.r = []      # logged pose components
        self.I = []      # logged display images

    def on_iter(self, Idisp: ImageRGBa, v: ColVector, error: ColVector, cTw: HomogeneousMatrix) -> None:
        """Store a copy of the current display image and the logged signals.

        Only components 3 and 4 of the velocity and pose vectors are kept
        (the x/y rotational degrees of freedom used by the centering task).
        """
        self.I.append(Idisp.numpy().copy())
        self.v.append(v.numpy()[3:5].flatten())
        self.error.append(error.numpy().flatten())
        self.r.append(PoseVector(cTw).numpy()[3:5].flatten())

    def generate_anim(self):
        """Render the recorded data as an MP4 animation and a PDF snapshot."""
        # Drop the first logged sample of each signal.
        self.error = np.asarray(self.error)[1:]
        self.v = np.asarray(self.v)[1:]
        self.r = np.asarray(self.r)[1:]

        aspect = self.I[0].shape[0] / self.I[0].shape[1]
        fig, grid = plt.subplots(2, 2, figsize=(15, 15 * aspect))
        axes = [grid[0][0], grid[0][1], grid[1][0], grid[1][1]]
        titles = ['I', 'Feature error', 'Velocity', 'Pose']
        legends = [
            None,
            [r"$x$", r"$y$"],
            [r"$\mathbf{\upsilon}_x$", r"$\mathbf{\upsilon}_y$"],
            [r"$\theta\mathbf{u}_x$", r"$\theta\mathbf{u}_y$"],
        ]
        series = [None, self.error, self.v, self.r]
        artists = []
        for ax, title, legend, values in zip(axes, titles, legends, series):
            ax.set_title(title)
            if values is None:
                # Image panel: show the first recorded frame.
                artists.append(ax.imshow(self.I[0]))
                ax.set_axis_off()
            else:
                ax.set_xlabel('Iteration')
                ax.grid()
                ax.set_xlim(0, len(self.v))
                lo, hi = np.min(values), np.max(values)
                pad = (hi - lo) * 0.05
                ax.set_ylim(lo - pad, hi + pad)
                artists.append(ax.plot(values))
                ax.legend(legend)
        plt.tight_layout()

        def animate(i):
            # Reveal the signals up to iteration i and show frame i.
            print(f'Processing frame {i+1}/{len(self.I)}')
            xs = range(i)
            artists[0].set_data(self.I[i])
            for j in range(2):
                artists[1][j].set_data(xs, self.error[:i, j])
                artists[2][j].set_data(xs, self.v[:i, j])
                artists[3][j].set_data(xs, self.r[:i, j])
            return artists

        anim = animation.FuncAnimation(fig, animate, frames=len(self.v), blit=False, repeat=False)
        anim.save('exp.mp4', writer=animation.FFMpegWriter(fps=30))
        plt.savefig('exp.pdf')
        plt.close()
181
if __name__ == '__main__':
    parser = argparse.ArgumentParser('Centering task using a YOLO network')
    parser.add_argument('--class-id', type=int, help='COCO class id of the object to track (e.g, 2 for a car)')
    args = parser.parse_args()

    detection_model = YOLO('yolov8n.pt')

    plotter = VSPlot()

    # ---------------- Initialization ----------------
    print('Initializing Realsense camera...')
    pipe = rs.pipeline()
    config = rs.config()
    fps = 30
    config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, fps)
    config.enable_stream(rs.stream.color, 640, 480, rs.format.rgb8, fps)
    cfg = pipe.start(config)

    # Camera intrinsics and depth scale come from the active stream profile.
    cam, h, w = cam_from_rs_profile(cfg.get_stream(rs.stream.color))
    depth_scale = cfg.get_device().first_depth_sensor().get_depth_scale()
    print(f'Depth scale is {depth_scale}')

    I = ImageRGBa(h, w)
    I_depth = ImageUInt16(h, w)
    Idisp = ImageRGBa(h, w)

    # Align depth stream with color stream
    align = rs.align(rs.stream.color)
    get_images = partial(read_data, pipe=pipe, align=align)

    print('Initializing Afma6...')
    robot = RobotAfma6()
    robot.setPositioningVelocity(5.0)
    print(robot.getPosition(Robot.ControlFrameType.REFERENCE_FRAME))

    print('Moving Afma6 to starting pose...')
    r = PoseVector(0.06706274856, 0.3844766362, -0.04551332622, 0.3111005431, 0.3031078532, 0.01708581392)
    cTw = HomogeneousMatrix(r)
    robot.setPosition(Robot.ControlFrameType.REFERENCE_FRAME, r)

    print('Warming up camera...')
    for _ in range(100):
        get_images(I, I_depth)

    # ---------------- Unscented Kalman filter setup ----------------
    # State: (x, vx, ax, y, vy, ay); measurements: (x, y) in meters.
    drawer = UKSigmaDrawerMerwe(6, alpha=0.0001, beta=2, kappa=-3, resFunc=residual_state_vectors, addFunc=add_state_vectors)
    pixel_noise = 1
    # Convert the pixel noise to meters through the focal lengths.
    noise_x, noise_y = [pixel_noise / f for f in [cam.get_px(), cam.get_py()]]
    noise_vel = 1e-8
    noise_accel = 1e-12
    measure_covariance = Matrix([
        [noise_x ** 2, 0.0],
        [0.0, noise_y ** 2]
    ])
    # Initial state covariance P0, passed to kalman.init at first detection.
    process_covariance = Matrix([
        [noise_x ** 2, 0, 0, 0, 0, 0],
        [0, noise_vel, 0, 0, 0, 0],
        [0, 0, noise_accel, 0, 0, 0],
        [0, 0, 0, noise_y ** 2, 0, 0],
        [0, 0, 0, 0, noise_vel, 0],
        [0, 0, 0, 0, 0, noise_accel],
    ])
    kalman = UnscentedKalman(generate_Q_matrix(1 / fps, sigma=1e-8), measure_covariance, drawer, fx, hx)

    # ---------------- Centering task definition ----------------
    # Desired feature: the image center, at the depth currently observed there.
    xd, yd = PixelMeterConversion.convertPoint(cam, w / 2.0, h / 2.0)
    get_images(I, I_depth)
    Zd = I_depth[h // 2, w // 2] * depth_scale
    print(f'Desired depth is {Zd}')
    sd = FeaturePoint()
    sd.buildFrom(xd, yd, Zd)

    s = FeaturePoint()
    s.buildFrom(0.0, 0.0, Zd)

    task = Servo()
    task.addFeature(s, sd)
    task.setLambda(0.4)
    # Only the rotational x/y degrees of freedom are controlled.
    task.setCameraDoF(ColVector([0, 0, 0, 1, 1, 0]))
    task.setServo(Servo.ServoType.EYEINHAND_CAMERA)
    # Note: 'Iteraction' is ViSP's historical spelling of this enum.
    task.setInteractionMatrixType(Servo.ServoIteractionMatrixType.CURRENT)
    target_class = args.class_id

    v = ColVector(6, 0.0)  # camera velocity command

    d = get_display()
    d.init(I)
    Display.display(I)
    Display.flush(I)
    # Warm-up inference (first YOLO call is slow); input is BGR via the
    # reversed channel slice of the RGBa image.
    _ = detection_model(np.array(I.numpy()[..., 2::-1]))
    error_norm = 1e10
    last_detection_time = -1.0
    first_iter = True
    # ---------------- Servoing loop ----------------
    while error_norm > 5e-7:
        start = time.time()
        # Data acquisition
        get_images(I, I_depth)

        # Run detection and keep the highest-confidence box of the target class.
        results = detection_model(np.array(I.numpy()[..., 2::-1]))[0]
        boxes = results.boxes
        max_conf = 0.0
        bb = None
        for i in range(len(boxes.conf)):
            if boxes.cls[i] == target_class and boxes.conf[i] > max_conf:
                max_conf = boxes.conf[i]
                bb = boxes.xywh[i].cpu().numpy()

        if bb is not None:
            # FIX: the original unpacked into 'u, v', clobbering the velocity
            # vector 'v'; use explicit pixel-coordinate names instead.
            u_px, v_px = bb[0], bb[1]
            x, y = PixelMeterConversion.convertPoint(cam, u_px, v_px)
            if first_iter:
                initial_state = ColVector([x, 0, 0, y, 0, 0])
                kalman.init(initial_state, process_covariance)
                first_iter = False
            kalman.filter(ColVector([x, y]), (1 / fps))
            kalman_state = kalman.getXest()
            last_detection_time = time.time()
            s.buildFrom(kalman_state[0], kalman_state[3], Zd)
            v = task.computeControlLaw()
        else:
            if last_detection_time < 0.0:
                raise RuntimeError('No detection at first iteration')
            # No detection this frame: servo on the UKF prediction instead.
            kalman.predict(time.time() - last_detection_time)
            kalman_pred = kalman.getXpred()
            s.buildFrom(kalman_pred[0], kalman_pred[3], Zd)
            # FIX: the original discarded this result, so a stale velocity kept
            # being sent; use the control law computed from the prediction.
            v = task.computeControlLaw()

        error: ColVector = task.getError()
        error_norm = error.sumSquare()

        # Display and logging
        Display.display(I)
        current_color = ColorBlindFriendlyPalette(ColorBlindFriendlyPalette.SkyBlue).to_vpColor()
        if bb is not None:
            Display.displayRectangle(I, i=int(bb[1] - bb[3] / 2), j=int(bb[0] - bb[2] / 2), width=bb[2], height=bb[3],
                                     color=current_color, fill=False, thickness=2)
        sd.display(cam, I, ColorBlindFriendlyPalette(ColorBlindFriendlyPalette.Yellow).to_vpColor(), thickness=3)
        s.display(cam, I, current_color, thickness=3)
        Display.flush(I)
        Display.getImage(I, Idisp)
        robot.getPosition(Robot.ControlFrameType.REFERENCE_FRAME, r)
        cTw.buildFrom(r)
        plotter.on_iter(Idisp, v, error, cTw)

        # Move robot
        robot.setRobotState(Robot.RobotStateType.STATE_VELOCITY_CONTROL)
        robot.setVelocity(Robot.ControlFrameType.CAMERA_FRAME, v)
        print(f'Iter took {time.time() - start}s')

    robot.stopMotion()

    plotter.generate_anim()
# Documentation-index residue (signatures duplicated by the doc generator):
# - VSPlot.on_iter(Idisp: ImageRGBa, v: ColVector, error: ColVector, cTw: HomogeneousMatrix) -> None
# - read_data(I_rgba: ImageRGBa, I_depth: ImageUInt16, pipe: rs.pipeline, align: rs.align) -> None
# - generate_Q_matrix(dt: float, sigma: float) -> Matrix
# - cam_from_rs_profile(profile) -> Tuple[CameraParameters, int, int]
# - fx(x: ColVector, dt: float) -> ColVector