From 0b84c1abcbc7ed5ac06ee4e6839fe22fdb4470ce Mon Sep 17 00:00:00 2001 From: Brad Cordova Date: Wed, 9 Sep 2015 13:52:46 -0400 Subject: [PATCH 1/2] update batch_mode to handle non constant matrices --- filterpy/kalman/kalman_filter.py | 68 ++++++++++++++++++++------------ 1 file changed, 43 insertions(+), 25 deletions(-) diff --git a/filterpy/kalman/kalman_filter.py b/filterpy/kalman/kalman_filter.py index 55a399f..dfca04f 100644 --- a/filterpy/kalman/kalman_filter.py +++ b/filterpy/kalman/kalman_filter.py @@ -304,49 +304,57 @@ def predict(self, u=0): self._P = self._alpha_sq * dot3(self._F, self._P, self._F.T) + self._Q - def batch_filter(self, zs, Rs=None, update_first=False): + def batch_filter(self, zs, Rs=None, Fs=None, Hs=None, Qs=None, update_first=False): """ Batch processes a sequences of measurements. - **Parameters** - zs : list-like list of measurements at each time step `self.dt` Missing measurements must be represented by 'None'. - Rs : list-like, optional optional list of values to use for the measurement error covariance; a value of None in any position will cause the filter to use `self.R` for that time step. - + Fs : list-like, optional + optional list of values to use for the update matrix; + a value of None in any position will cause the filter + to use `self.R` for that time step. + Hs : list-like, optional + optional list of values to use for the measurement matrix; + a value of None in any position will cause the filter + to use `self.R` for that time step. + Qs : list-like, optional + optional list of values to use for the process error + covariance; a value of None in any position will cause the filter + to use `self.R` for that time step. update_first : bool, optional, controls whether the order of operations is update followed by predict, or predict followed by update. Default is predict->update. - **Returns** - - means: np.array((n,dim_x,1)) array of the state for each time step after the update. Each entry is an np.array. In other words `means[k,:]` is the state at step `k`. - covariance: np.array((n,dim_x,dim_x)) array of the covariances for each time step after the update. In other words `covariance[k,:,:]` is the covariance at step `k`. - means_predictions: np.array((n,dim_x,1)) array of the state for each time step after the predictions. Each entry is an np.array. In other words `means[k,:]` is the state at step `k`. - covariance_predictions: np.array((n,dim_x,dim_x)) array of the covariances for each time step after the prediction. In other words `covariance[k,:,:]` is the covariance at step `k`. """ n = np.size(zs,0) - if Rs is None: - Rs = [None]*n + if np.all(Rs == None): + Rs = [self.R] * n + if np.all(Fs == None): + Fs = [self.F] * n + if np.all(Hs == None): + Hs = [self.H] * n + if np.all(Qs == None): + Qs = [self.Q] * n # mean estimates from Kalman Filter if self.x.ndim == 1: @@ -361,8 +369,11 @@ def batch_filter(self, zs, Rs=None, update_first=False): covariances_p = zeros((n, self.dim_x, self.dim_x)) if update_first: - for i, (z, r) in enumerate(zip(zs, Rs)): - self.update(z, r) + for i, (z, R, F, H, Q) in enumerate(zip(zs, Rs, Fs, Hs, Qs)): + self.F = F + self.Q = Q + + self.update(z, R, H) means[i,:] = self._x covariances[i,:,:] = self._P @@ -370,12 +381,15 @@ def batch_filter(self, zs, Rs=None, update_first=False): means_p[i,:] = self._x covariances_p[i,:,:] = self._P else: - for i, (z, r) in enumerate(zip(zs, Rs)): + for i, (z, R, F, H, Q) in enumerate(zip(zs, Rs, Fs, Hs, Qs)): + self.F = F + self.Q = Q + self.predict() means_p[i,:] = self._x covariances_p[i,:,:] = self._P - self.update(z, r) + self.update(z, R, H) means[i,:] = self._x covariances[i,:,:] = self._P @@ -383,7 +397,7 @@ def batch_filter(self, zs, Rs=None, update_first=False): - def rts_smoother(self, Xs, Ps, Qs=None): + def rts_smoother(self, Xs, Ps, Fs=None, Qs=None): """ Runs the Rauch-Tung-Striebal Kalman smoother on a set of means and covariances computed by a Kalman filter. The usual input would come from the output of `KalmanFilter.batch_filter()`. @@ -397,11 +411,14 @@ def rts_smoother(self, Xs, Ps, Qs=None): Ps : numpy.array array of the covariances of the output of a kalman filter. - Q : list-like collection of numpy.array, optional + Fs : list-like collection of numpy.array, optional + Update Matrix of the Kalman filter at each time step. Optional, + if not provided the filter's self.F will be used + + Qs : list-like collection of numpy.array, optional Process noise of the Kalman filter at each time step. Optional, if not provided the filter's self.Q will be used - **Returns** 'x' : numpy.ndarray @@ -428,8 +445,9 @@ def rts_smoother(self, Xs, Ps, Qs=None): n = shape[0] dim_x = shape[1] - F = self._F - if not Qs: + if np.all(Fs == None): + Fs = [self.F] * n + if np.all(Qs == None): Qs = [self.Q] * n # smoother gain @@ -438,10 +456,10 @@ def rts_smoother(self, Xs, Ps, Qs=None): x, P = Xs.copy(), Ps.copy() for k in range(n-2,-1,-1): - P_pred = dot3(F, P[k], F.T) + Qs[k] + P_pred = dot3(Fs[k], P[k], Fs[k].T) + Qs[k] - K[k] = dot3(P[k], F.T, linalg.inv(P_pred)) - x[k] += dot (K[k], x[k+1] - dot(F, x[k])) + K[k] = dot3(P[k], Fs[k].T, linalg.inv(P_pred)) + x[k] += dot (K[k], x[k+1] - dot(Fs[k], x[k])) P[k] += dot3 (K[k], P[k+1] - P_pred, K[k].T) return (x, P, K) From fd1afd55f3903f0857f84aecdbd29fce6d232110 Mon Sep 17 00:00:00 2001 From: Brad Cordova Date: Thu, 10 Sep 2015 12:16:31 -0400 Subject: [PATCH 2/2] update predict and batch_filter to be more general --- filterpy/kalman/kalman_filter.py | 122 ++++++++++++++++++++++--------- 1 file changed, 88 insertions(+), 34 deletions(-) diff --git a/filterpy/kalman/kalman_filter.py b/filterpy/kalman/kalman_filter.py index dfca04f..08fd426 100644 --- a/filterpy/kalman/kalman_filter.py +++ b/filterpy/kalman/kalman_filter.py @@ -148,9 +148,9 @@ def update(self, z, R=None, H=None): Optionally provide R to override the measurement noise for this one call, otherwise self.R will be used. - H : np.array, or None + H : np.array, or None Optionally provide H to override the measurement function for this - one call, otherwise self.H will be used. + one call, otherwise self.H will be used. """ @@ -286,7 +286,7 @@ def test_matrix_dimensions(self): self.dim_x, self.dim_x, self._P.shape) - def predict(self, u=0): + def predict(self, u=0, B=None, F=None, Q=None): """ Predict next position using the Kalman filter state propagation equations. @@ -295,66 +295,124 @@ def predict(self, u=0): u : np.array Optional control vector. If non-zero, it is multiplied by B to create the control input into the system. + + B : np.array(dim_x, dim_z), or None + Optional state transition matrix; a value of None in + any position will cause the filter to use `self.B`. + + F : np.array(dim_x, dim_x), or None + Optional state transition matrix; a value of None in + any position will cause the filter to use `self.F`. + + Q : np.array(dim_x, dim_x), scalar, or None + Optional process noise matrix; a value of None in + any position will cause the filter to use `self.Q`. """ + if B is None: + B = self._B + if F is None: + F = self._F + if Q is None: + Q = self._Q + elif isscalar(Q): + Q = eye(self.dim_x) * Q + # x = Fx + Bu - self._x = dot(self._F, self.x) + dot(self._B, u) + self._x = dot(F, self.x) + dot(B, u) # P = FPF' + Q - self._P = self._alpha_sq * dot3(self._F, self._P, self._F.T) + self._Q + self._P = self._alpha_sq * dot3(F, self._P, F.T) + Q - def batch_filter(self, zs, Rs=None, Fs=None, Hs=None, Qs=None, update_first=False): + def batch_filter(self, zs, Fs=None, Qs=None, Hs=None, Rs=None, Bs=None, us=None, update_first=False): """ Batch processes a sequences of measurements. + **Parameters** + zs : list-like list of measurements at each time step `self.dt` Missing measurements must be represented by 'None'. - Rs : list-like, optional - optional list of values to use for the measurement error - covariance; a value of None in any position will cause the filter - to use `self.R` for that time step. + Fs : list-like, optional - optional list of values to use for the update matrix; + optional list of values to use for the state transition matrix matrix; a value of None in any position will cause the filter - to use `self.R` for that time step. + to use `self.F` for that time step. + + Qs : list-like, optional + optional list of values to use for the process error + covariance; a value of None in any position will cause the filter + to use `self.Q` for that time step. + Hs : list-like, optional optional list of values to use for the measurement matrix; a value of None in any position will cause the filter - to use `self.R` for that time step. - Qs : list-like, optional - optional list of values to use for the process error + to use `self.H` for that time step. + + Rs : list-like, optional + optional list of values to use for the measurement error covariance; a value of None in any position will cause the filter to use `self.R` for that time step. + + Bs : list-like, optional + optional list of values to use for the control transition matrix; + a value of None in any position will cause the filter + to use `self.B` for that time step. + + us : list-like, optional + optional list of values to use for the control input vector; + a value of None in any position will cause the filter to use + 0 for that time step. + update_first : bool, optional, controls whether the order of operations is update followed by predict, or predict followed by update. Default is predict->update. + **Returns** + means: np.array((n,dim_x,1)) array of the state for each time step after the update. Each entry is an np.array. In other words `means[k,:]` is the state at step `k`. + covariance: np.array((n,dim_x,dim_x)) array of the covariances for each time step after the update. In other words `covariance[k,:,:]` is the covariance at step `k`. + means_predictions: np.array((n,dim_x,1)) array of the state for each time step after the predictions. Each entry is an np.array. In other words `means[k,:]` is the state at step `k`. + covariance_predictions: np.array((n,dim_x,dim_x)) array of the covariances for each time step after the prediction. In other words `covariance[k,:,:]` is the covariance at step `k`. + + **Example** + + zs = [t + random.randn()*4 for t in range (40)] + Fs = [kf.F for t in range (40)] + Hs = [kf.H for t in range (40)] + + (mu, cov, _, _) = kf.batch_filter(zs, Rs=R_list, Fs=Fs, Hs=Hs, Qs=None, + Bs=None, us=None, update_first=False) + (xs, Ps, Ks) = kf.rts_smoother(mu, cov, Fs=Fs, Qs=None) + """ n = np.size(zs,0) - if np.all(Rs == None): - Rs = [self.R] * n - if np.all(Fs == None): + if Fs is None: Fs = [self.F] * n - if np.all(Hs == None): - Hs = [self.H] * n - if np.all(Qs == None): + if Qs is None: Qs = [self.Q] * n + if Hs is None: + Hs = [self.H] * n + if Rs is None: + Rs = [self.R] * n + if Bs is None: + Bs = [self.B] * n + if us is None: + us = [0] * n # mean estimates from Kalman Filter if self.x.ndim == 1: @@ -369,27 +427,23 @@ def batch_filter(self, zs, Rs=None, Fs=None, Hs=None, Qs=None, update_first=Fals covariances_p = zeros((n, self.dim_x, self.dim_x)) if update_first: - for i, (z, R, F, H, Q) in enumerate(zip(zs, Rs, Fs, Hs, Qs)): - self.F = F - self.Q = Q + for i, (z, F, Q, H, R, B, u) in enumerate(zip(zs, Fs, Qs, Hs, Rs, Bs, us)): - self.update(z, R, H) + self.update(z, R=R, H=H) means[i,:] = self._x covariances[i,:,:] = self._P - self.predict() + self.predict(u=u, B=B, F=F, Q=Q) means_p[i,:] = self._x covariances_p[i,:,:] = self._P else: - for i, (z, R, F, H, Q) in enumerate(zip(zs, Rs, Fs, Hs, Qs)): - self.F = F - self.Q = Q + for i, (z, F, Q, H, R, B, u) in enumerate(zip(zs, Fs, Qs, Hs, Rs, Bs, us)): - self.predict() + self.predict(u=u, B=B, F=F, Q=Q) means_p[i,:] = self._x covariances_p[i,:,:] = self._P - self.update(z, R, H) + self.update(z, R=R, H=H) means[i,:] = self._x covariances[i,:,:] = self._P @@ -436,7 +490,7 @@ def rts_smoother(self, Xs, Ps, Fs=None, Qs=None): zs = [t + random.randn()*4 for t in range (40)] (mu, cov, _, _) = kalman.batch_filter(zs) - (x, P, K) = rts_smoother(mu, cov, fk.F, fk.Q) + (x, P, K) = rts_smoother(mu, cov, kf.F, kf.Q) """ @@ -445,9 +499,9 @@ def rts_smoother(self, Xs, Ps, Fs=None, Qs=None): n = shape[0] dim_x = shape[1] - if np.all(Fs == None): + if Fs is None: Fs = [self.F] * n - if np.all(Qs == None): + if Qs is None: Qs = [self.Q] * n # smoother gain