From a585294f002059a6d3a6b871b60a292086e01d17 Mon Sep 17 00:00:00 2001
From: Roger Labbe
Date: Fri, 25 Dec 2015 11:49:34 -0800
Subject: [PATCH] Added dimensionality tests

Added more tests to KalmanFilter.test_matrix_dimensions which now allows
you to fully test the dimensionality of your matrices without having to
call predict/update first. This is for GitHub issue #27
---
 filterpy/kalman/kalman_filter.py | 516 ++++++++++++++++++++++++++++---
 filterpy/kalman/tests/test_kf.py | 313 ++++++++++++++++++-
 2 files changed, 777 insertions(+), 52 deletions(-)

diff --git a/filterpy/kalman/kalman_filter.py b/filterpy/kalman/kalman_filter.py
index 2a47d8b..3f9b559 100644
--- a/filterpy/kalman/kalman_filter.py
+++ b/filterpy/kalman/kalman_filter.py
@@ -15,11 +15,11 @@ for more information.
 """

-from __future__ import (absolute_import, division, print_function,
-                        unicode_literals)
+from __future__ import (absolute_import, division, unicode_literals)

 from filterpy.common import setter, setter_1d, setter_scalar, dot3
+import math
 import numpy as np
-from numpy import dot, zeros, eye, isscalar
+from numpy import dot, zeros, eye, isscalar, shape
 import scipy.linalg as linalg
 from scipy.stats import multivariate_normal
@@ -41,21 +41,22 @@ class KalmanFilter(object):

     **Attributes**

     x : numpy.array(dim_x, 1)
-        state estimate vector
+        State estimate vector

     P : numpy.array(dim_x, dim_x)
-        covariance estimate matrix
+        Covariance matrix

     R : numpy.array(dim_z, dim_z)
-        measurement noise matrix
+        Measurement noise matrix

     Q : numpy.array(dim_x, dim_x)
-        process noise matrix
+        Process noise matrix

     F : numpy.array()
         State Transition matrix

     H : numpy.array(dim_z, dim_x)
+        Measurement function


     You may read the following attributes.
@@ -71,17 +72,10 @@ class KalmanFilter(object):
     S : numpy.array
         System uncertainty projected to measurement space

-    likelihood : scalar
-        Likelihood of last measurement update.
-
     log_likelihood : scalar
         Log likelihood of last measurement update.
-
-    """
-
-    def __init__(self, dim_x, dim_z, dim_u=0):
         """ Create a Kalman filter. You are responsible for setting the various
         state variables to reasonable values; the defaults below will
@@ -142,7 +136,8 @@ def update(self, z, R=None, H=None):
         **Parameters**

         z : np.array
-            measurement for this update.
+            measurement for this update. z can be a scalar if dim_z is 1,
+            otherwise it must be a column vector.

         R : np.array, scalar, or None
             Optionally provide R to override the measurement noise for this
@@ -168,20 +163,27 @@ def update(self, z, R=None, H=None):
         P = self._P
         x = self._x

+        # handle special case: if z is in form [[z]] but x is not a column
+        # vector, dimensions will not match
+        if x.ndim == 1 and shape(z) == (1,1):
+            z = z[0]
+
+        if shape(z) == ():   # is it scalar, e.g. z=3 or z=np.array(3)
+            z = np.asarray([z])
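+        # (illustrative aside: with dim_z == 1 each of update(3.),
+        # update([3.]), and update(np.array([[3.]])) is accepted and
+        # normalized to a one element array by the two cases above)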
+
         # y = z - Hx
         # error (residual) between measurement and prediction
-        self._y = z - dot(H, x)
+        Hx = dot(H, x)
+
+        assert shape(Hx) == shape(z) or (shape(Hx) == (1,1) and shape(z) == (1,)), \
+               'shape of z should be {}, but it is {}'.format(
+               shape(Hx), shape(z))
+        self._y = z - Hx

         # S = HPH' + R
         # project system uncertainty into measurement space
         S = dot3(H, P, H.T) + R

-        mean = np.array(dot(H, x)).flatten()
-        flatz = np.array(z).flatten()
-
-        self.likelihood = multivariate_normal.pdf(flatz, mean, cov=S, allow_singular=True)
-        self.log_likelihood = multivariate_normal.logpdf(flatz, mean, cov=S, allow_singular=True)
-
         # K = PH'inv(S)
         # map system uncertainty into kalman gain
         K = dot3(P, H.T, linalg.inv(S))
@@ -197,6 +199,12 @@ def update(self, z, R=None, H=None):
         self._S = S
         self._K = K

+        # compute log likelihood
+        mean = np.array(dot(H, x)).flatten()
+        flatz = np.array(z).flatten()
+        self.log_likelihood = multivariate_normal.logpdf(
+            flatz, mean, cov=S, allow_singular=True)
+

     def update_correlated(self, z, R=None, H=None):
@@ -236,6 +244,14 @@ def update_correlated(self, z, R=None, H=None):
         P = self._P
         M = self.M

+        # handle special case: if z is in form [[z]] but x is not a column
+        # vector, dimensions will not match
+        if x.ndim == 1 and shape(z) == (1,1):
+            z = z[0]
+
+        if shape(z) == ():   # is it scalar, e.g. z=3 or z=np.array(3)
+            z = np.asarray([z])
+
         # y = z - Hx
         # error (residual) between measurement and prediction
         self._y = z - dot(H, x)
@@ -243,12 +259,6 @@ def update_correlated(self, z, R=None, H=None):
         # project system uncertainty into measurement space
         S = dot3(H, P, H.T) + dot(H, M) + dot(M.T, H.T) + R

-        mean = np.array(dot(H, x)).flatten()
-        flatz = np.array(z).flatten()
-
-        self.likelihood = multivariate_normal.pdf(flatz, mean, cov=S, allow_singular=True)
-        self.log_likelihood = multivariate_normal.logpdf(flatz, mean, cov=S, allow_singular=True)
-
         # K = PH'inv(S)
         # map system uncertainty into kalman gain
         K = dot(dot(P, H.T) + M, linalg.inv(S))
@@ -261,29 +271,110 @@ def update_correlated(self, z, R=None, H=None):
         self._S = S
         self._K = K

+        # compute log likelihood
+        mean = np.array(dot(H, x)).flatten()
+        flatz = np.array(z).flatten()
+
+        self.log_likelihood = multivariate_normal.logpdf(
+            flatz, mean, cov=S, allow_singular=True)
+

-    def test_matrix_dimensions(self):
+    def test_matrix_dimensions(self, z=None, H=None, R=None, F=None, Q=None):
         """ Performs a series of asserts to check that the size of everything
         is what it should be. This can help you debug problems in your design.

-        This is only a test; you do not need to use it while filtering.
-        However, to use you will want to perform at least one predict() and
-        one update() before calling; some bad designs will cause the shapes
-        of x and P to change in a silent and bad way. For example, if you
-        pass in a badly dimensioned z into update that can cause x to be
-        misshapen."""
+        If you pass in H, R, F, Q those will be used instead of this object's
+        values for those matrices.
+
+        Testing `z` (the measurement) is problematic. x is a vector, and can
+        be implemented as either a 1D array or as an nx1 column vector. Thus
+        Hx can be of different shapes. Then, if Hx is a single value, it can
+        be either a 1D array or a 2D vector. If either is true, z can
+        reasonably be a scalar (either 3 or np.array(3) is a scalar under
+        this definition), a 1D, 1 element array, or a 2D, 1 element array.
+        You are allowed to pass in any combination that works.
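+
+        **Example**::
+
+            # an illustrative sketch of typical usage; the matrices are
+            # placeholders for your own design
+            kf = KalmanFilter(dim_x=2, dim_z=1)
+            kf.H = np.array([[1., 0.]])
+            kf.test_matrix_dimensions()       # check the filter as configured
+            kf.test_matrix_dimensions(z=3.)   # also vet a candidate measurement
+            kf.test_matrix_dimensions(R=np.eye(1), H=np.array([[1., 0.]]))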
+        """
+
+        if H is None: H = self.H
+        if R is None: R = self.R
+        if F is None: F = self._F
+        if Q is None: Q = self._Q
+        x = self._x
+        P = self._P

-        assert self._x.shape == (self.dim_x, 1), \
-               "Shape of x must be ({},{}), but is {}".format(
-               self.dim_x, 1, self._x.shape)
+        assert x.ndim == 1 or x.ndim == 2, \
+               "x must have one or two dimensions, but has {}".format(
+               x.ndim)

-        assert self._P.shape == (self.dim_x, self.dim_x), \
+        if x.ndim == 1:
+            assert x.shape[0] == self.dim_x, \
+                   "Shape of x must be ({},), but is {}".format(
+                   self.dim_x, x.shape)
+        else:
+            assert x.shape == (self.dim_x, 1), \
+                   "Shape of x must be ({},{}), but is {}".format(
+                   self.dim_x, 1, x.shape)
+
+        assert P.shape == (self.dim_x, self.dim_x), \
                "Shape of P must be ({},{}), but is {}".format(
-               self.dim_x, self.dim_x, self._P.shape)
+               self.dim_x, self.dim_x, P.shape)

-        assert self._Q.shape == (self.dim_x, self.dim_x), \
-               "Shape of P must be ({},{}), but is {}".format(
-               self.dim_x, self.dim_x, self._P.shape)
+        assert Q.shape == (self.dim_x, self.dim_x), \
+               "Shape of Q must be ({},{}), but is {}".format(
+               self.dim_x, self.dim_x, Q.shape)
+
+        assert F.shape == (self.dim_x, self.dim_x), \
+               "Shape of F must be ({},{}), but is {}".format(
+               self.dim_x, self.dim_x, F.shape)
+
+
+        assert np.ndim(H) == 2, \
+               "Shape of H must be (dim_z, {}), but is {}".format(
+               P.shape[0], shape(H))
+
+        assert H.shape[1] == P.shape[0], \
+               "Shape of H must be (dim_z, {}), but is {}".format(
+               P.shape[0], H.shape)
+
+        # shape of R must be the same as HPH'
+        hph_shape = (H.shape[0], H.shape[0])
+        r_shape = shape(R)
+
+        if H.shape[0] == 1:
+            # R can be scalar, 1D, or 2D in this case
+            assert r_shape == () or r_shape == (1,) or r_shape == (1,1), \
+                   "R must be scalar or one element array, but is shaped {}".format(
+                   r_shape)
+        else:
+            assert r_shape == hph_shape, \
+                   "shape of R should be {} but it is {}".format(hph_shape, r_shape)
+
+
+        if z is not None:
+            z_shape = shape(z)
+        else:
+            z_shape = (self.dim_z, 1)
+
+        # H@x must have the shape of z
+        Hx = dot(H, x)
+
+        if z_shape == ():   # scalar or np.array(scalar)
+            assert Hx.ndim == 1 or shape(Hx) == (1,1), \
+                   "shape of z should be {}, not {}".format(shape(Hx), z_shape)
+
+        elif shape(Hx) == (1,):
+            assert z_shape[0] == 1, 'Shape of z must be {}'.format(shape(Hx))
+
+        else:
+            assert (z_shape == shape(Hx) or
+                    (len(z_shape) == 1 and shape(Hx) == (z_shape[0], 1))), \
+                   "shape of z should be {}, not {}".format(shape(Hx), z_shape)
+
+        if np.ndim(Hx) > 1 and shape(Hx) != (1,1):
+            assert shape(Hx) == z_shape, \
+                   'shape of z should be {}, but it is {}'.format(
+                   shape(Hx), z_shape)


     def predict(self, u=0, B=None, F=None, Q=None):
@@ -337,27 +428,32 @@ def batch_filter(self, zs, Fs=None, Qs=None, Hs=None, Rs=None, Bs=None, us=None,

         Fs : list-like, optional
             optional list of values to use for the state transition matrix;
             a value of None in any position will cause the filter
-            to use `self.F` for that time step.
+            to use `self.F` for that time step. If Fs is None then self.F is
+            used for all epochs.

         Qs : list-like, optional
             optional list of values to use for the process error
             covariance; a value of None in any position will cause the filter
-            to use `self.Q` for that time step.
+            to use `self.Q` for that time step. If Qs is None then self.Q is
+            used for all epochs.

         Hs : list-like, optional
             optional list of values to use for the measurement matrix;
             a value of None in any position will cause the filter
-            to use `self.H` for that time step.
+            to use `self.H` for that time step. If Hs is None then self.H is
+            used for all epochs.

         Rs : list-like, optional
             optional list of values to use for the measurement error
             covariance; a value of None in any position will cause the filter
-            to use `self.R` for that time step.
+            to use `self.R` for that time step. If Rs is None then self.R is
+            used for all epochs.

         Bs : list-like, optional
             optional list of values to use for the control transition matrix;
             a value of None in any position will cause the filter
-            to use `self.B` for that time step.
+            to use `self.B` for that time step. If Bs is None then self.B is
+            used for all epochs.

         us : list-like, optional
             optional list of values to use for the control input vector;
@@ -414,6 +510,14 @@ def batch_filter(self, zs, Fs=None, Qs=None, Hs=None, Rs=None, Bs=None, us=None,
         if us is None:
             us = [0] * n

+        if len(Fs) < n: Fs = [Fs]*n
+        if len(Qs) < n: Qs = [Qs]*n
+        if len(Hs) < n: Hs = [Hs]*n
+        if len(Rs) < n: Rs = [Rs]*n
+        if len(Bs) < n: Bs = [Bs]*n
+        if len(us) < n: us = [us]*n
+
+
         # mean estimates from Kalman Filter
         if self.x.ndim == 1:
             means = zeros((n, self.dim_x))
@@ -466,7 +570,7 @@ def rts_smoother(self, Xs, Ps, Fs=None, Qs=None):
            array of the covariances of the output of a kalman filter.

         Fs : list-like collection of numpy.array, optional
-            State transition matrix of the Kalman filter at each time step.
+            State transition matrix of the Kalman filter at each time step.
+            Optional, if not provided the filter's self.F will be used

         Qs : list-like collection of numpy.array, optional
@@ -580,6 +684,11 @@ def alpha(self):
         return self._alpha_sq**.5


+    @property
+    def likelihood(self):
+        return math.exp(self.log_likelihood)
+
+
     @alpha.setter
     def alpha(self, value):
         assert np.isscalar(value)
@@ -660,3 +769,314 @@ def y(self):
     def S(self):
         """ system uncertainty in measurement space """
         return self._S
+
+
+
+def kf_update(x, P, z, R, H):
+    """
+    Add a new measurement (z) to the Kalman filter. If z is None, nothing
+    is changed.
+
+    **Parameters**
+
+    x : numpy.array(dim_x, 1)
+        State estimate vector
+
+    P : numpy.array(dim_x, dim_x)
+        Covariance matrix
+
+    z : numpy.array
+        measurement for this update.
+
+    R : numpy.array(dim_z, dim_z)
+        Measurement noise matrix
+
+    H : numpy.array(dim_z, dim_x)
+        Measurement function
+
+    **Returns**
+
+    x : numpy.array
+        Posterior state estimate vector
+
+    P : numpy.array
+        Posterior covariance matrix
+
+    y : numpy.array or scalar
+        Residual. Difference between measurement and state in measurement space
+
+    K : numpy.array
+        Kalman gain
+
+    S : numpy.array
+        System uncertainty in measurement space
+
+    log_likelihood : float
+        log likelihood of the measurement
+    """
+
+    if z is None:
+        return x, P, None, None, None, None
+
+    # error (residual) between measurement and prediction
+    y = z - dot(H, x)
+
+    # project system uncertainty into measurement space
+    S = dot3(H, P, H.T) + R
+
+    # map system uncertainty into kalman gain
+    K = dot3(P, H.T, linalg.inv(S))
+
+    # predict new x with residual scaled by the kalman gain
+    x = x + dot(K, y)
+
+    # P = (I-KH)P(I-KH)' + KRK'
+    KH = dot(K, H)
+    I_KH = np.eye(KH.shape[0]) - KH
+    P = dot3(I_KH, P, I_KH.T) + dot3(K, R, K.T)
+
+    # compute log likelihood
+    mean = np.array(dot(H, x)).flatten()
+    flatz = np.array(z).flatten()
+    log_likelihood = multivariate_normal.logpdf(flatz, mean, cov=S, allow_singular=True)
+
+    return x, P, y, K, S, log_likelihood
+
+
+
+def kf_predict(x, P, F, Q, u=0, B=0, alpha=1.):
+    """ Predict next position using the Kalman filter state propagation
+    equations.
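+
+    In equation form (this mirrors the two lines of code below; alpha is
+    the fading memory scaling, with alpha=1. giving the standard Kalman
+    filter prediction):
+
+        x = Fx + Bu
+        P = alpha**2 * FPF' + Q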
+
+    **Parameters**
+
+    x : numpy.array
+        State estimate vector
+
+    P : numpy.array
+        Covariance matrix
+
+    F : numpy.array()
+        State Transition matrix
+
+    Q : numpy.array
+        Process noise matrix
+
+
+    u : numpy.array, default 0.
+        Control vector. If non-zero, it is multiplied by B
+        to create the control input into the system.
+
+    B : numpy.array, default 0.
+        Optional control transition matrix.
+
+    alpha : float, default=1.0
+        Fading memory setting. 1.0 gives the normal Kalman filter, and
+        values slightly larger than 1.0 (such as 1.02) give a fading
+        memory effect - previous measurements have less influence on the
+        filter's estimates. This formulation of the fading memory filter
+        (there are many) is due to Dan Simon.
+
+    **Returns**
+
+    x : numpy.array
+        Prior state estimate vector
+
+    P : numpy.array
+        Prior covariance matrix
+    """
+
+    x = dot(F, x) + dot(B, u)
+    P = (alpha * alpha) * dot3(F, P, F.T) + Q
+
+    return x, P
+
+
+
+def kf_batch_filter(x, P, zs, Fs, Qs, Hs, Rs, Bs=None, us=None, update_first=False):
+    """ Batch processes a sequence of measurements.
+
+    **Parameters**
+
+    zs : list-like
+        list of measurements at each time step. Missing measurements must be
+        represented by None.
+
+    Fs : list-like
+        list of values to use for the state transition matrix at each time
+        step; pass a single matrix to use it for every epoch.
+
+    Qs : list-like
+        list of values to use for the process error covariance at each time
+        step; pass a single matrix to use it for every epoch.
+
+    Hs : list-like
+        list of values to use for the measurement matrix at each time step;
+        pass a single matrix to use it for every epoch.
+
+    Rs : list-like
+        list of values to use for the measurement error covariance at each
+        time step; pass a single matrix to use it for every epoch.
+
+    Bs : list-like, optional
+        list of values to use for the control transition matrix at each
+        time step; defaults to 0 for every time step.
+
+    us : list-like, optional
+        list of values to use for the control input vector at each time
+        step; defaults to 0 for every time step.
+
+    update_first : bool, optional
+        controls whether the order of operations is update followed by
+        predict, or predict followed by update. Default is predict->update.
+
+
+    **Returns**
+
+    means: np.array((n,dim_x,1))
+        array of the state for each time step after the update. Each entry
+        is an np.array. In other words `means[k,:]` is the state at step
+        `k`.
+
+    covariance: np.array((n,dim_x,dim_x))
+        array of the covariances for each time step after the update.
+        In other words `covariance[k,:,:]` is the covariance at step `k`.
+
+    means_predictions: np.array((n,dim_x,1))
+        array of the state for each time step after the predictions. Each
+        entry is an np.array. In other words `means_predictions[k,:]` is the
+        state at step `k`.
+
+    covariance_predictions: np.array((n,dim_x,dim_x))
+        array of the covariances for each time step after the prediction.
+        In other words `covariance_predictions[k,:,:]` is the covariance at
+        step `k`.
+
+    **Example**::
+
+        # x, P, F, H, Q, R are assumed to be already defined for your design
+        zs = [t + random.randn()*4 for t in range(40)]
+        Fs = [F for t in range(40)]
+        Hs = [H for t in range(40)]
+
+        (mu, cov, means_p, covs_p) = kf_batch_filter(x, P, zs, Fs, [Q]*40,
+                                                     Hs, [R]*40)
+        (xs, Ps, Ks) = rts_smoother(mu, cov, Fs, [Q]*40)
+    """
+
+    n = np.size(zs, 0)
+    dim_x = x.shape[0]
+
+    # mean estimates from Kalman Filter
+    if x.ndim == 1:
+        means = zeros((n, dim_x))
+        means_p = zeros((n, dim_x))
+    else:
+        means = zeros((n, dim_x, 1))
+        means_p = zeros((n, dim_x, 1))
+
+    # state covariances from Kalman Filter
+    covariances = zeros((n, dim_x, dim_x))
+    covariances_p = zeros((n, dim_x, dim_x))
+
+    if us is None:
+        us = [0.]*n
+    if Bs is None:
+        Bs = [0.]*n
+
+    if len(Fs) < n: Fs = [Fs]*n
+    if len(Qs) < n: Qs = [Qs]*n
+    if len(Hs) < n: Hs = [Hs]*n
+    if len(Rs) < n: Rs = [Rs]*n
+    if len(Bs) < n: Bs = [Bs]*n
+    if len(us) < n: us = [us]*n
+
+
+    if update_first:
+        for i, (z, F, Q, H, R, B, u) in enumerate(zip(zs, Fs, Qs, Hs, Rs, Bs, us)):
+
+            x, P, _, _, _, _ = kf_update(x, P, z, R=R, H=H)
+            means[i,:] = x
+            covariances[i,:,:] = P
+
+            x, P = kf_predict(x, P, u=u, B=B, F=F, Q=Q)
+            means_p[i,:] = x
+            covariances_p[i,:,:] = P
+    else:
+        for i, (z, F, Q, H, R, B, u) in enumerate(zip(zs, Fs, Qs, Hs, Rs, Bs, us)):
+
+            x, P = kf_predict(x, P, u=u, B=B, F=F, Q=Q)
+            means_p[i,:] = x
+            covariances_p[i,:,:] = P
+
+            x, P, _, _, _, _ = kf_update(x, P, z, R=R, H=H)
+            means[i,:] = x
+            covariances[i,:,:] = P
+
+    return (means, covariances, means_p, covariances_p)
+
+
+
+def rts_smoother(Xs, Ps, Fs, Qs):
+    """ Runs the Rauch-Tung-Striebel Kalman smoother on a set of
+    means and covariances computed by a Kalman filter. The usual input
+    would come from the output of `KalmanFilter.batch_filter()`.
+
+    **Parameters**
+
+    Xs : numpy.array
+        array of the means (state variable x) of the output of a Kalman
+        filter.
+
+    Ps : numpy.array
+        array of the covariances of the output of a kalman filter.
+
+    Fs : list-like collection of numpy.array
+        State transition matrix of the Kalman filter at each time step.
+
+    Qs : list-like collection of numpy.array
+        Process noise of the Kalman filter at each time step.
+
+    **Returns**
+
+    'x' : numpy.ndarray
+        smoothed means
+
+    'P' : numpy.ndarray
+        smoothed state covariances
+
+    'K' : numpy.ndarray
+        smoother gain at each step
+
+
+    **Example**::
+
+        zs = [t + random.randn()*4 for t in range(40)]
+
+        (mu, cov, _, _) = kf.batch_filter(zs)
+        (x, P, K) = rts_smoother(mu, cov, [kf.F]*len(zs), [kf.Q]*len(zs))
+    """
+
+    assert len(Xs) == len(Ps)
+    n = Xs.shape[0]
+    dim_x = Xs.shape[1]
+
+    # smoother gain
+    K = zeros((n,dim_x,dim_x))
+    x, P = Xs.copy(), Ps.copy()
+
+    for k in range(n-2,-1,-1):
+        P_pred = dot3(Fs[k], P[k], Fs[k].T) + Qs[k]
+
+        K[k]  = dot3(P[k], Fs[k].T, linalg.inv(P_pred))
+        x[k] += dot(K[k], x[k+1] - dot(Fs[k], x[k]))
+        P[k] += dot3(K[k], P[k+1] - P_pred, K[k].T)
+
+    return (x, P, K)
diff --git a/filterpy/kalman/tests/test_kf.py b/filterpy/kalman/tests/test_kf.py
index db5e1bb..e7f3e06 100644
--- a/filterpy/kalman/tests/test_kf.py
+++ b/filterpy/kalman/tests/test_kf.py
@@ -21,9 +21,9 @@
 from numpy.random import randn
 import numpy as np
 import matplotlib.pyplot as plt
-from filterpy.kalman import KalmanFilter, FixedPointSmoother, GrewalFixedPointSmoother
+from filterpy.kalman import *
 from filterpy.common import Q_discrete_white_noise
-from scipy.linalg import block_diag
+from scipy.linalg import block_diag, norm

 DO_PLOT = False
@@ -42,6 +42,49 @@ def read(self):
             self.pos[1] + randn() * self.noise_std]


+def const_vel_filter(dt, x0=0, x_ndim=1, P_diag=(1., 1.), R_std=1., Q_var=0.0001):
+    """ helper, constructs a 1d constant velocity filter"""
+    f = KalmanFilter(dim_x=2, dim_z=1)
+
+    if x_ndim == 1:
+        f.x = np.array([x0, 0])
+    else:
+        f.x = np.array([[x0, 0]]).T
+
+    f.F = np.array([[1., dt],
+                    [0., 1.]])
+
+    f.H = np.array([[1., 0.]])
+    f.P = np.diag(P_diag)
+    f.R = np.eye(1) * (R_std**2)
+    f.Q = Q_discrete_white_noise(2, dt, Q_var)
+
+    return f
+
+
+
+def const_vel_filter_2d(dt, x_ndim=1, P_diag=(1., 1, 1, 1), R_std=1., Q_var=0.0001):
+    """ helper, constructs a 2d constant velocity filter"""
+
+    kf = KalmanFilter(dim_x=4, dim_z=2)
+
+    kf.x = np.array([[0., 0., 0., 0.]]).T
+    kf.P *= np.diag(P_diag)
+    kf.F = np.array([[1, dt, 0,  0],
+                     [0,  1, 0,  0],
+                     [0,  0, 1, dt],
+                     [0,  0, 0,  1]])
+
+    kf.H = np.array([[1., 0, 0, 0],
+                     [0., 0, 1, 0]])
+
+    kf.R *= np.eye(2) * (R_std**2)
+    q = Q_discrete_white_noise(dim=2, dt=dt, var=Q_var)
+    kf.Q = block_diag(q, q)
+
+    return kf
+
+
 def test_fixed_point():
     j = 8
     kf = FixedPointSmoother(4, 2, j)
@@ -197,7 +240,7 @@ def test_1d_vel():
     x = np.array([[0.], [0.]])
     F = np.array([[1., dt],
-                   [0., 1.]])
+                  [0., 1.]])

     H = np.array([[1.,0.]])
     P = np.eye(2)
@@ -310,6 +353,7 @@ def test_batch_filter():
     m,c,_,_ = f.batch_filter(zs,update_first=False)
     m,c,_,_ = f.batch_filter(zs,update_first=True)

+
 def test_univariate():
     f = KalmanFilter(dim_x=1, dim_z=1, dim_u=1)
     f.x = np.array([[0]])
@@ -326,9 +370,270 @@ def test_univariate():
         f.update(i)


+def test_procedure_form():
+
+    dt = 1.
+    std_z = 10.1
+
+    x = np.array([[0.], [0.]])
+    F = np.array([[1., dt], [0., 1.]])
+    H = np.array([[1., 0.]])
+    P = np.eye(2)
+    R = np.eye(1)*std_z**2
+    Q = Q_discrete_white_noise(2, dt, 5.1)
+
+    kf = KalmanFilter(2, 1)
+    kf.x = x.copy()
+    kf.F = F.copy()
+    kf.H = H.copy()
+    kf.P = P.copy()
+    kf.R = R.copy()
+    kf.Q = Q.copy()
+
+
+    measurements = []
+    results = []
+
+    xest = []
+    ks = []
+    pos = 0.
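+
+    # run the procedural and class forms side by side; after each
+    # predict and each update the two state estimates should agree
+    # to within floating point tolerance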
+    for t in range(2000):
+        z = pos + random.randn() * std_z
+        pos += 100
+
+        # perform kalman filtering
+        x, P = kf_predict(x, P, F, Q)
+        kf.predict()
+        assert norm(x - kf.x) < 1.e-12
+
+        x, P, _, _, _, _ = kf_update(x, P, z, R, H)
+        kf.update(z)
+        assert norm(x - kf.x) < 1.e-12
+
+        # save data
+        xest.append(x.copy())
+        measurements.append(z)
+
+    xest = np.asarray(xest)
+    measurements = np.asarray(measurements)
+
+    # plot data
+    if DO_PLOT:
+        plt.plot(xest[:, 0])
+        plt.plot(xest[:, 1])
+        plt.plot(measurements)
+
+
+
+def test_procedural_batch_filter():
+    f = KalmanFilter(dim_x=2, dim_z=1)
+
+    f.x = np.array([2., 0])
+
+    f.F = np.array([[1., 1.],
+                    [0., 1.]])
+
+    f.H = np.array([[1., 0.]])
+    f.P = np.eye(2)*1000.
+    f.R = np.eye(1)*5
+    f.Q = Q_discrete_white_noise(2, 1., 0.0001)
+
+    f.test_matrix_dimensions()
+
+    x = np.array([2., 0])
+
+    F = np.array([[1., 1.],
+                  [0., 1.]])
+
+    H = np.array([[1., 0.]])
+    P = np.eye(2)*1000.
+    R = np.eye(1)*5
+    Q = Q_discrete_white_noise(2, 1., 0.0001)
+
+    zs = [13., None, 1., 2.]*10
+    m, c, _, _ = f.batch_filter(zs, update_first=False)
+
+    n = len(zs)
+    # test both the list-of-matrices and single matrix forms
+    mp, cp, _, _ = kf_batch_filter(x, P, zs, F, Q, [H]*n, R)
+
+    for x1, x2 in zip(m, mp):
+        assert np.allclose(x1, x2)
+
+    for p1, p2 in zip(c, cp):
+        assert np.allclose(p1, p2)
+
+
+def proc_form():
+    """ This is for me to run against the class_form() function to see which,
+    if either, runs faster. They run within a few ms of each other on my
+    machine with Python 3.5.1"""
+
+    dt = 1.
+    std_z = 10.1
+
+    x = np.array([[0.], [0.]])
+    F = np.array([[1., dt], [0., 1.]])
+    H = np.array([[1., 0.]])
+    P = np.eye(2)
+    R = np.eye(1)*std_z**2
+    Q = Q_discrete_white_noise(2, dt, 5.1)
+
+    pos = 0.
+    for t in range(2000):
+        z = pos + random.randn() * std_z
+        pos += 100
+
+        # perform kalman filtering
+        x, P = kf_predict(x, P, F, Q)
+        x, P, _, _, _, _ = kf_update(x, P, z, R, H)
+
+
+def class_form():
+
+    dt = 1.
+    std_z = 10.1
+
+    f = const_vel_filter(dt, x0=2, R_std=std_z, Q_var=5.1)
+
+    pos = 0.
+    for t in range(2000):
+        z = pos + random.randn() * std_z
+        pos += 100
+
+        f.predict()
+        f.update(z)
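+
+
+def test_likelihood():
+    # an illustrative sketch, not part of the original test suite: the
+    # `likelihood` property added in this patch is defined as
+    # exp(log_likelihood), so the two must agree after an update
+    f = const_vel_filter(1.0, x0=2, R_std=1., Q_var=5.1)
+    f.predict()
+    f.update(3.)
+    assert abs(f.likelihood - np.exp(f.log_likelihood)) < 1.e-12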
+
+
+def test_z_dim():
+    f = const_vel_filter(1.0, x0=2, R_std=1., Q_var=5.1)
+    f.test_matrix_dimensions()
+
+    f.update(3.)
+    assert f.x.shape == (2,)
+
+    f.update([3])
+    assert f.x.shape == (2,)
+
+    f.update(np.array([[3]]))
+    assert f.x.shape == (2,)
+
+    try:
+        f.update(np.array([[[3]]]))
+        assert False, "filter should have asserted that [[[3]]] is not a valid form for z"
+    except:
+        pass
+
+    f = const_vel_filter_2d(1.0, R_std=1., Q_var=5.1)
+    try:
+        f.update(3)
+        assert False, "filter should have asserted that 3 is not a valid form for z"
+    except:
+        pass
+
+    try:
+        f.update([3])
+        assert False, "filter should have asserted that [3] is not a valid form for z"
+    except:
+        pass
+
+    try:
+        f.update([3, 3])
+        assert False, "filter should have asserted that [3, 3] is not a valid form for z"
+    except:
+        pass
+
+    try:
+        f.update([[3, 3]])
+        assert False, "filter should have asserted that [[3, 3]] is not a valid form for z"
+    except:
+        pass
+
+    f = const_vel_filter_2d(1.0, R_std=1., Q_var=5.1)
+    f.update([[3], [3]])
+    f.update(np.array([[3], [3]]))
+
+
+    # now make sure test_matrix_dimensions() is working
+
+    f.test_matrix_dimensions()
+    try:
+        f.H = 3
+        f.test_matrix_dimensions()
+        assert False, "test_matrix_dimensions should have asserted on shape of H"
+    except:
+        pass
+
+    f = const_vel_filter_2d(1.0, R_std=1., Q_var=5.1)
+    try:
+        f.R = 3
+        f.test_matrix_dimensions()
+        assert False, "test_matrix_dimensions should have asserted on shape of R"
+    except:
+        pass
+
+    try:
+        f.R = [3]
+        f.test_matrix_dimensions()
+        assert False, "test_matrix_dimensions should have asserted on shape of R"
+    except:
+        pass
+
+    try:
+        f.R = [3, 4.]
+        f.test_matrix_dimensions()
+        assert False, "test_matrix_dimensions should have asserted on shape of R"
+    except:
+        pass
+
+    f.R = np.diag([3, 4.])
+    f.test_matrix_dimensions()
+
+
+    f = const_vel_filter(1.0, x0=2, R_std=1., Q_var=5.1)
+
+    # test case where x is a 1D array
+    f.update([[3]])
+    f.test_matrix_dimensions(z=3.)
+    f.test_matrix_dimensions(z=[3.])
+
+    # test case where x is a 2D array
+    f.x = np.array([[0., 0.]]).T
+    f.update([[3]])
+    f.test_matrix_dimensions(z=3.)
+    f.test_matrix_dimensions(z=[3.])
+
+    try:
+        f.test_matrix_dimensions(z=[[3.]])
+        assert False, "test_matrix_dimensions should have asserted on shape of z"
+    except:
+        pass
+
+    f = const_vel_filter_2d(1.0, R_std=1., Q_var=5.1)
+
+    # test with a 1D value for x, then set x to a 2D vector and try again
+    for i in range(2):
+        try:
+            f.test_matrix_dimensions(z=3.)
+            assert False, "test_matrix_dimensions should have asserted on shape of z"
+        except:
+            pass
+
+        try:
+            f.test_matrix_dimensions(z=[3.])
+            assert False, "test_matrix_dimensions should have asserted on shape of z"
+        except:
+            pass
+
+        try:
+            f.test_matrix_dimensions(z=[3., 3.])
+            assert False, "test_matrix_dimensions should have asserted on shape of z"
+        except:
+            pass
+
+        f.test_matrix_dimensions(z=[[3.], [3.]])
+        f.x = np.array([[1,2,3,4.]]).T
+
+
 if __name__ == "__main__":

     DO_PLOT = True
-    test_1d_vel()
+    test_z_dim()

     #test_batch_filter()
     #test_univariate()