Skip to content

Commit

Permalink
Merge pull request rlabbe#23 from Censio/09-09-2015-brad-update-kf-ba…
Browse files Browse the repository at this point in the history
…tch-mode

update batch_mode to handle non constant matrices
  • Loading branch information
rlabbe committed Sep 26, 2015
2 parents 69884c5 + fd1afd5 commit 6419758
Showing 1 changed file with 95 additions and 23 deletions.
118 changes: 95 additions & 23 deletions filterpy/kalman/kalman_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ def update(self, z, R=None, H=None):
Optionally provide R to override the measurement noise for this
one call, otherwise self.R will be used.
H : np.array, or None
H : np.array, or None
Optionally provide H to override the measurement function for this
one call, otherwise self.H will be used.
one call, otherwise self.H will be used.
"""

Expand Down Expand Up @@ -286,7 +286,7 @@ def test_matrix_dimensions(self):
self.dim_x, self.dim_x, self._P.shape)


def predict(self, u=0):
def predict(self, u=0, B=None, F=None, Q=None):
""" Predict next position using the Kalman filter state propagation
equations.
Expand All @@ -295,16 +295,37 @@ def predict(self, u=0):
u : np.array
Optional control vector. If non-zero, it is multiplied by B
to create the control input into the system.
B : np.array(dim_x, dim_z), or None
Optional state transition matrix; a value of None in
any position will cause the filter to use `self.B`.
F : np.array(dim_x, dim_x), or None
Optional state transition matrix; a value of None in
any position will cause the filter to use `self.F`.
Q : np.array(dim_x, dim_x), scalar, or None
Optional process noise matrix; a value of None in
any position will cause the filter to use `self.Q`.
"""

if B is None:
B = self._B
if F is None:
F = self._F
if Q is None:
Q = self._Q
elif isscalar(Q):
Q = eye(self.dim_x) * Q

# x = Fx + Bu
self._x = dot(self._F, self.x) + dot(self._B, u)
self._x = dot(F, self.x) + dot(B, u)

# P = FPF' + Q
self._P = self._alpha_sq * dot3(self._F, self._P, self._F.T) + self._Q
self._P = self._alpha_sq * dot3(F, self._P, F.T) + Q


def batch_filter(self, zs, Rs=None, update_first=False):
def batch_filter(self, zs, Fs=None, Qs=None, Hs=None, Rs=None, Bs=None, us=None, update_first=False):
""" Batch processes a sequences of measurements.
**Parameters**
Expand All @@ -313,18 +334,42 @@ def batch_filter(self, zs, Rs=None, update_first=False):
list of measurements at each time step `self.dt` Missing
measurements must be represented by 'None'.
Fs : list-like, optional
optional list of values to use for the state transition matrix matrix;
a value of None in any position will cause the filter
to use `self.F` for that time step.
Qs : list-like, optional
optional list of values to use for the process error
covariance; a value of None in any position will cause the filter
to use `self.Q` for that time step.
Hs : list-like, optional
optional list of values to use for the measurement matrix;
a value of None in any position will cause the filter
to use `self.H` for that time step.
Rs : list-like, optional
optional list of values to use for the measurement error
covariance; a value of None in any position will cause the filter
to use `self.R` for that time step.
Bs : list-like, optional
optional list of values to use for the control transition matrix;
a value of None in any position will cause the filter
to use `self.B` for that time step.
us : list-like, optional
optional list of values to use for the control input vector;
a value of None in any position will cause the filter to use
0 for that time step.
update_first : bool, optional,
controls whether the order of operations is update followed by
predict, or predict followed by update. Default is predict->update.
**Returns**
means: np.array((n,dim_x,1))
array of the state for each time step after the update. Each entry
is an np.array. In other words `means[k,:]` is the state at step
Expand All @@ -342,11 +387,32 @@ def batch_filter(self, zs, Rs=None, update_first=False):
covariance_predictions: np.array((n,dim_x,dim_x))
array of the covariances for each time step after the prediction.
In other words `covariance[k,:,:]` is the covariance at step `k`.
**Example**
zs = [t + random.randn()*4 for t in range (40)]
Fs = [kf.F for t in range (40)]
Hs = [kf.H for t in range (40)]
(mu, cov, _, _) = kf.batch_filter(zs, Rs=R_list, Fs=Fs, Hs=Hs, Qs=None,
Bs=None, us=None, update_first=False)
(xs, Ps, Ks) = kf.rts_smoother(mu, cov, Fs=Fs, Qs=None)
"""

n = np.size(zs,0)
if Fs is None:
Fs = [self.F] * n
if Qs is None:
Qs = [self.Q] * n
if Hs is None:
Hs = [self.H] * n
if Rs is None:
Rs = [None]*n
Rs = [self.R] * n
if Bs is None:
Bs = [self.B] * n
if us is None:
us = [0] * n

# mean estimates from Kalman Filter
if self.x.ndim == 1:
Expand All @@ -361,29 +427,31 @@ def batch_filter(self, zs, Rs=None, update_first=False):
covariances_p = zeros((n, self.dim_x, self.dim_x))

if update_first:
for i, (z, r) in enumerate(zip(zs, Rs)):
self.update(z, r)
for i, (z, F, Q, H, R, B, u) in enumerate(zip(zs, Fs, Qs, Hs, Rs, Bs, us)):

self.update(z, R=R, H=H)
means[i,:] = self._x
covariances[i,:,:] = self._P

self.predict()
self.predict(u=u, B=B, F=F, Q=Q)
means_p[i,:] = self._x
covariances_p[i,:,:] = self._P
else:
for i, (z, r) in enumerate(zip(zs, Rs)):
self.predict()
for i, (z, F, Q, H, R, B, u) in enumerate(zip(zs, Fs, Qs, Hs, Rs, Bs, us)):

self.predict(u=u, B=B, F=F, Q=Q)
means_p[i,:] = self._x
covariances_p[i,:,:] = self._P

self.update(z, r)
self.update(z, R=R, H=H)
means[i,:] = self._x
covariances[i,:,:] = self._P

return (means, covariances, means_p, covariances_p)



def rts_smoother(self, Xs, Ps, Qs=None):
def rts_smoother(self, Xs, Ps, Fs=None, Qs=None):
""" Runs the Rauch-Tung-Striebal Kalman smoother on a set of
means and covariances computed by a Kalman filter. The usual input
would come from the output of `KalmanFilter.batch_filter()`.
Expand All @@ -397,11 +465,14 @@ def rts_smoother(self, Xs, Ps, Qs=None):
Ps : numpy.array
array of the covariances of the output of a kalman filter.
Q : list-like collection of numpy.array, optional
Fs : list-like collection of numpy.array, optional
Update Matrix of the Kalman filter at each time step. Optional,
if not provided the filter's self.F will be used
Qs : list-like collection of numpy.array, optional
Process noise of the Kalman filter at each time step. Optional,
if not provided the filter's self.Q will be used
**Returns**
'x' : numpy.ndarray
Expand All @@ -419,7 +490,7 @@ def rts_smoother(self, Xs, Ps, Qs=None):
zs = [t + random.randn()*4 for t in range (40)]
(mu, cov, _, _) = kalman.batch_filter(zs)
(x, P, K) = rts_smoother(mu, cov, fk.F, fk.Q)
(x, P, K) = rts_smoother(mu, cov, kf.F, kf.Q)
"""

Expand All @@ -428,8 +499,9 @@ def rts_smoother(self, Xs, Ps, Qs=None):
n = shape[0]
dim_x = shape[1]

F = self._F
if not Qs:
if Fs is None:
Fs = [self.F] * n
if Qs is None:
Qs = [self.Q] * n

# smoother gain
Expand All @@ -438,10 +510,10 @@ def rts_smoother(self, Xs, Ps, Qs=None):
x, P = Xs.copy(), Ps.copy()

for k in range(n-2,-1,-1):
P_pred = dot3(F, P[k], F.T) + Qs[k]
P_pred = dot3(Fs[k], P[k], Fs[k].T) + Qs[k]

K[k] = dot3(P[k], F.T, linalg.inv(P_pred))
x[k] += dot (K[k], x[k+1] - dot(F, x[k]))
K[k] = dot3(P[k], Fs[k].T, linalg.inv(P_pred))
x[k] += dot (K[k], x[k+1] - dot(Fs[k], x[k]))
P[k] += dot3 (K[k], P[k+1] - P_pred, K[k].T)

return (x, P, K)
Expand Down

0 comments on commit 6419758

Please sign in to comment.