Visual Servoing Platform version 3.7.0
Loading...
Searching...
No Matches
ukf-nonlinear-example.py
35
36""" @example ukf-nonlinear-example.py
37 Example of a simple non-linear use-case of the Unscented Kalman Filter (UKF).
38
39 The system we are interested in is an aircraft flying in the sky and
40 observed by a radar station.
41
42 We consider the plan perpendicular to the ground and passing by both the radar
43 station and the aircraft. The x-axis corresponds to the position on the ground
44 and the y-axis to the altitude.
45
46 The state vector of the UKF is:
47 \f[
48 \begin{array}{lcl}
49 \textbf{x}[0] &=& x \\
50 \textbf{x}[1] &=& \dot{x} \\
51 \textbf{x}[1] &=& y \\
52 \textbf{x}[2] &=& \dot{y}
53 \end{array}
54 \f]
55
56 The measurement \f$ \textbf{z} \f$ corresponds to the distance and angle between the ground and the aircraft
57 observed by the radar station. Be \f$ p_x \f$ and \f$ p_y \f$ the position of the radar station
58 along the x and y axis, the measurement vector can be written as:
59 \f[
60 \begin{array}{lcl}
61 \textbf{z}[0] &=& \sqrt{(p_x^i - x)^2 + (p_y^i - y)^2} \\
62 \textbf{z}[1] &=& \tan^{-1}{\frac{y - p_y}{x - p_x}}
63 \end{array}
64 \f]
65
66 Some noise is added to the measurement vector to simulate a sensor which is
67 not perfect.
68"""
69
70from visp.core import ColVector, Matrix, UnscentedKalman, UKSigmaDrawerMerwe, Math
71import numpy as np
72from typing import List
73
74# For the Graphical User Interface
75try:
76 from visp.gui import Plot
77 has_gui = True
78except:
79 has_gui = False
80import numpy as np
81
82def normalize_angle(angle: float) -> float:
83 angle_0_to_2pi = Math.modulo(angle, 2. * np.pi)
84 angle_MinPi_Pi = angle_0_to_2pi
85 if angle_MinPi_Pi > np.pi:
86 # Substract 2 PI to be in interval [-Pi; Pi]
87 angle_MinPi_Pi = angle_MinPi_Pi - 2. * np.pi
88 return angle_MinPi_Pi
89
90def measurement_mean(measurements: List[ColVector], wm: List[float]) -> ColVector:
91 """
92 Compute the weighted mean of measurement vectors.
93
94 :param measurements: The measurement vectors, such as measurements[i][0] = range and
95 measurements[i][1] = elevation_angle.
96 :param wm: The associated weights.
97
98 :return vpColVector: The weighted mean of the measurement vectors.
99 """
100 wm_np = np.array(wm)
101 ranges = np.array([meas[0] for meas in measurements])
102 elev_angles = np.array([meas[1] for meas in measurements])
103 meanRange = (wm_np * ranges).sum()
104 sum_cos = (wm_np * np.cos(elev_angles)).sum()
105 sum_sin = (wm_np * np.sin(elev_angles)).sum()
106 mean_angle = np.arctan2(sum_sin, sum_cos)
107
108 return ColVector([meanRange, mean_angle])
109
110def measurementResidual(meas: ColVector, to_subtract: ColVector) -> ColVector:
111 """
112 Compute the subtraction between two vectors expressed in the measurement space,
113 such as v[0] = range ; v[1] = elevation_angle
114
115 :param meas: Measurement to which we must subtract something.
116 :param toSubtract: The something we must subtract.
117
118 :return vpColVector: \b meas - \b toSubtract .
119 """
120 res_temp = meas - to_subtract
121 return ColVector([res_temp[0], normalize_angle(res_temp[1])])
122
123def state_add_vectors(a, b) -> ColVector:
124 """
125 Method that permits to add two state vectors.
126
127 :param a: The first state vector to which another state vector must be added.
128 :param b: The other state vector that must be added to a.
129
130 :return ColVector: The sum a + b.
131 """
132 return a + b
133
134def state_residual_vectors(a, b) -> ColVector:
135 """
136 Method that permits to subtract a state vector to another.
137
138 :param a: The first state vector to which another state vector must be subtracted.
139 :param b: The other state vector that must be subtracted to a.
140
141 :return ColVector: The subtraction a - b.
142 """
143 return a - b
144
145def fx(x: ColVector, dt: float) -> ColVector:
146 """
147 Process function that projects in time the internal state of the UKF.
148
149 :param x: The internal state of the UKF.
150 :param dt: The sampling time: how far in the future are we projecting x.
151
152 :return ColVector: The updated internal state, projected in time, also known as the prior.
153 """
154 return ColVector([
155 x[0] + dt * x[1],
156 x[1],
157 x[2] + dt * x[3],
158 x[3]
159 ])
160
162 """
163 Class that permits to convert the position of the aircraft into
164 range and elevation angle measurements.
165 """
166 def __init__(self, x: float, y: float, range_std: float, elev_angle_std: float):
167 """
168 Construct a new vpRadarStation
169
170 :param x: The position on the ground of the radar.
171 :param y: The altitude of the radar.
172 :param range_std: The standard deviation of the range measurements.
173 :param elev_angle_std: The standard deviation of the elevation angle measurements.
174 """
175 self._x = x
176 self._y = y
177 self._stdevRange = range_std
178 self._stdevElevAngle = elev_angle_std
179
180 def state_to_measurement(self, x: ColVector) -> ColVector:
181 """
182 Measurement function that expresses the internal state of the UKF in the measurement space.
183
184 :param x: The internal state of the UKF.
185
186 :return ColVector: The internal state, expressed in the measurement space.
187 """
188 dx = x[0] - self._x
189 dy = x[2] - self._y
190 range = np.sqrt(dx * dx + dy * dy)
191 elev_angle = np.arctan2(dy, dx)
192 return ColVector([range, elev_angle])
193
194 def measure_gt(self, pos: ColVector) -> ColVector:
195 """
196 Perfect measurement of the range and elevation angle that
197 correspond to pos.
198
199 :param pos: The actual position of the aircraft (pos[0]: projection of the position
200 on the ground, pos[1]: altitude).
201
202 :return ColVector: [0] the range [1] the elevation angle.
203 """
204 dx = pos[0] - self._x
205 dy = pos[1] - self._y
206 range = np.sqrt(dx * dx + dy * dy)
207 elev_angle = np.arctan2(dy, dx)
208 return ColVector([range, elev_angle])
209
210 def measure_with_noise(self, pos: ColVector) -> ColVector:
211 """
212 Noisy measurement of the range and elevation angle that
213 correspond to pos.
214
215 :param pos: The actual position of the aircraft (pos[0]: projection of the position
216 on the ground, pos[1]: altitude).
217 :return vpColVector: [0] the range [1] the elevation angle.
218 """
219 measurements_GT = self.measure_gt(pos)
220 measurements_noisy = ColVector([measurements_GT[0] + np.random.normal(0., self._stdevRange), measurements_GT[1] + np.random.normal(0., self._stdevElevAngle)])
221 return measurements_noisy
222
224 """
225 Class to simulate a flying aircraft.
226 """
227
228 def __init__(self, X0: ColVector, vel: ColVector, vel_std: float):
229 """
230 Construct a new vpACSimulator object.
231
232 :param X0: Initial position of the aircraft.
233 :param vel: Velocity of the aircraft.
234 :param vel_std: Standard deviation of the variation of the velocity.
235 """
236 self._pos = X0 # Position of the simulated aircraft
237 self._vel = vel # Velocity of the simulated aircraft
238 np.random.seed(4224)
239 self._stdevVel = vel_std # Standard deviation of the random generator for slight variations of the velocity of the aircraft
240
241
242 def update(self, dt: float) -> ColVector:
243 """
244 Compute the new position of the aircraft after dt seconds have passed
245 since the last update.
246
247 :param dt: Period since the last update.
248 :return ColVector: The new position of the aircraft.
249 """
250 dx_temp = self._vel * dt
251 dx = ColVector([dx_temp[0] + np.random.normal(0., self._stdevVel) * dt, dx_temp[1] + np.random.normal(0., self._stdevVel) * dt])
252 self._pos += dx
253 return self._pos
254
255def generate_Q_matrix(dt: float) -> Matrix:
256 """
257 Method that generates the process covariance matrix for a process for which the
258 state vector can be written as (x, dx/dt)^T
259
260 :param dt: The sampling period.
261
262 :return Matrix: The corresponding process covariance matrix.
263 """
264 return Matrix(
265 [[dt**3/3, dt**2/2, 0, 0],
266 [dt**2/2, dt, 0, 0],
267 [0, 0, dt**3/3, dt**2/2],
268 [0, 0, dt**2/2, dt]])
269
270def generate_P0_matrix() -> Matrix:
271 """
272 Method that generates the intial guess of the state covariance matrix.
273
274 @return Matrix The corresponding state covariance matrix.
275 """
276 return Matrix(
277 [[300*300, 0, 0, 0],
278 [0, 30*30, 0, 0],
279 [0, 0, 150*150, 0],
280 [0, 0, 0, 30*30]])
281
282if __name__ == '__main__':
283 dt = 3. # The sampling period
284 gt_X_init = -500. # Ground truth initial position along the X-axis, in meters
285 gt_Y_init = 1000. # Ground truth initial position along the Y-axis, in meters
286 gt_vX_init = 100. # The velocity along the x-axis
287 gt_vY_init = 5. # The velocity along the y-axis
288 proc_var = 0.1 # The variance of the process function
289 sigma_range = 5 # Standard deviation of the range measurement: 5m
290 sigma_elev_angle = Math.rad(0.5) # Standard deviation of the elevation angle measurent: 0.5deg
291 stdev_aircraft_velocity = 0.2; # Standard deviation of the velocity of the simulated aircraft,
292 # to make it deviate a bit from the constant velocity model
293
294 # The object that draws the sigma points used by the UKF
295 drawer = UKSigmaDrawerMerwe(n=4, alpha=0.3, beta=2, kappa=-1, resFunc=state_residual_vectors, addFunc=state_add_vectors)
296
297 # The object that performs radar measurements
298 radar = vpRadarStation(0., 0., sigma_range, sigma_elev_angle)
299
300 P0 = generate_P0_matrix() # The initial estimate of the state covariance matrix
301 R = Matrix([[sigma_range * sigma_range, 0], [0, sigma_elev_angle * sigma_elev_angle]]) # The measurement covariance matrix
302 Q = generate_Q_matrix(dt) * proc_var # The process covariance matrix
303 ukf = UnscentedKalman(Q, R, drawer, fx, radar.state_to_measurement) # The Unscented Kalman Filter instance
304
305 # Initializing the state vector and state covariance matrix estimates
306 ukf.init(ColVector([0.9 * gt_X_init, 0.9 * gt_vX_init, 0.9 * gt_Y_init, 0.9 * gt_vY_init]), P0)
307 ukf.setMeasurementMeanFunction(measurement_mean)
308 ukf.setMeasurementResidualFunction(measurementResidual)
309
310 # Initializing the Graphical User Interface if the needed libraries are available
311 if has_gui:
312 num_plots = 4
313 plot = Plot(num_plots)
314 plot_titles = [
315 'Position along X-axis', 'Velocity along X-axis',
316 'Position along Y-axis', 'Velocity along Y-axis'
317 ]
318 plot_y_units = [
319 'Position (m)', 'Velocity (m/s)',
320 'Position (m)', 'Velocity (m/s)'
321 ]
322 plot_legends = ['GT', 'Filtered']
323
324 # Initializing the subplots
325 for plot_index in range(num_plots):
326 plot.initGraph(plot_index, len(plot_legends))
327 plot.setTitle(plot_index, plot_titles[plot_index])
328 plot.setUnitY(plot_index, plot_y_units[plot_index])
329 plot.setUnitX(plot_index, 'Time (s)')
330 for legend_index in range(len(plot_legends)):
331 plot.setLegend(plot_index, legend_index, plot_legends[legend_index])
332
333 ac_pos = ColVector([gt_X_init, gt_Y_init]) # Ground truth position
334 ac_vel = ColVector([gt_vX_init, gt_vY_init]) # Ground truth velocity
335 ac = vpACSimulator(ac_pos, ac_vel, stdev_aircraft_velocity)
336 gt_X_prev = ColVector([ac_pos[0], ac_pos[1]]) # Previous ground truth position
337 for i in range(500):
338 # Creating noisy measurements
339 gt_X = ac.update(dt)
340 gt_V = (gt_X - gt_X_prev) / dt
341 z = radar.measure_with_noise(gt_X)
342
343 # Filtering using the UKF
344 ukf.filter(z, dt)
345
346 # Getting the filtered state vector
347 Xest = ukf.getXest()
348
349 # Update the GUI if available
350 if has_gui:
351 plot.plot(0, 0, i, gt_X[0])
352 plot.plot(0, 1, i, Xest[0])
353
354 plot.plot(1, 0, i, gt_V[0])
355 plot.plot(1, 1, i, Xest[1])
356
357 plot.plot(2, 0, i, gt_X[1])
358 plot.plot(2, 1, i, Xest[2])
359
360 plot.plot(3, 0, i, gt_V[1])
361 plot.plot(3, 1, i, Xest[3])
362
363 # Updating last measurement for future computation of the noisy velocity
364 gt_X_prev = ColVector([gt_X[0], gt_X[1]])
365
366 print('Finished')
367 input('Press enter to quit')
__init__(self, ColVector X0, ColVector vel, float vel_std)
__init__(self, float x, float y, float range_std, float elev_angle_std)
ColVector measure_gt(self, ColVector pos)
ColVector measure_with_noise(self, ColVector pos)
ColVector state_to_measurement(self, ColVector x)
ColVector measurement_mean(List[ColVector] measurements, List[float] wm)
float normalize_angle(float angle)
ColVector state_residual_vectors(a, b)
ColVector fx(ColVector x, float dt)
Matrix generate_Q_matrix(float dt)
ColVector measurementResidual(ColVector meas, ColVector to_subtract)