-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathfilters.py
135 lines (113 loc) · 3.84 KB
/
filters.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import random
class BaseFilter(object):
    """
    Keep a bounded history of the added values and compute statistics
    (average, spread, crude derivative) from it.

    Running totals of the values and of their squares are maintained
    alongside the history so the statistics never re-scan the stored values.
    """

    def __init__(self, size):
        """
        :param size: maximum number of values kept in the history.
        """
        super(BaseFilter, self).__init__()
        self.size = size
        self.hist = []   # retained values, oldest first
        self.sum = 0     # sum of the values currently in ``hist``
        self.sq_sum = 0  # sum of the squares of the values in ``hist``

    def empty(self):
        """
        Drop the whole history and reset the running totals.
        """
        self.hist = []
        self.sum = 0
        self.sq_sum = 0

    def around(self, origin, distance):
        """
        Tell whether the latest value lies within *distance* of *origin*
        (bounds included).
        """
        return abs(self.value - origin) <= distance

    @property
    def avg(self):
        """
        Mean of the retained values, or 0 when the history is empty.
        """
        if not self.hist:
            return 0
        return self.sum * 1.0 / len(self.hist)

    @property
    def std(self):
        """
        Spread of the retained values, or 0 when the history is empty.

        Despite its name this is the population variance, i.e. the *squared*
        standard deviation (mean of the squares minus the square of the
        mean). NoiseFilter compares it against a squared distance, so no
        square root is taken here.
        """
        count = len(self.hist)
        if not count:
            return 0
        mean = self.sum * 1.0 / count
        variance = self.sq_sum * 1.0 / count - mean ** 2
        # Floating-point cancellation can push a near-zero variance slightly
        # below zero; a spread is never negative.
        return max(variance, 0.0)

    @property
    def derivative(self):
        """
        Difference between the newest and the oldest retained value divided
        by a fixed 10, or 0 when the history is empty.

        NOTE(review): the divisor is hard-coded and does not depend on
        ``size`` -- confirm which time base it is meant to represent.
        """
        if not self.hist:
            return 0
        return (self.value - self.hist[0]) * 1.0 / 10

    @property
    def value(self):
        """
        Most recently stored value, or 0 when the history is empty.
        """
        if not self.hist:
            return 0
        return self.hist[-1]

    def add_value(self, value):
        """
        Append *value* to the history and update the running totals.

        When the history grows past ``size`` the oldest value is evicted and
        its contribution is removed from both totals.
        """
        self.sum += value
        self.sq_sum += value ** 2
        self.hist.append(value)
        if len(self.hist) > self.size:
            oldest = self.hist.pop(0)
            self.sum -= oldest
            self.sq_sum -= oldest ** 2
class MixedFilter(object):
    """
    Chain several filter objects behind the interface of a single filter.

    A value pushed into the chain goes to the first filter; what that filter
    reports as its current value is pushed into the second one, and so on
    down to the last filter. Every read accessor reports the state of the
    last filter of the chain.
    """

    def __init__(self, filters):
        """
        :param filters: non-empty sequence of filter objects, in the order
            the values must traverse them.
        """
        # The read accessors index the last stage, so a chain needs at least
        # one of them.
        assert len(filters) > 0
        self.filters = filters

    def empty(self):
        """
        Reset every stage of the chain.
        """
        for stage in self.filters:
            stage.empty()

    def around(self, origin, distance):
        """
        Forward the proximity test to the last stage.
        """
        last_stage = self.filters[-1]
        return last_stage.around(origin, distance)

    @property
    def avg(self):
        """
        ``avg`` of the last stage.
        """
        last_stage = self.filters[-1]
        return last_stage.avg

    @property
    def std(self):
        """
        ``std`` of the last stage.
        """
        last_stage = self.filters[-1]
        return last_stage.std

    @property
    def derivative(self):
        """
        ``derivative`` of the last stage.
        """
        last_stage = self.filters[-1]
        return last_stage.derivative

    @property
    def value(self):
        """
        ``value`` of the last stage.
        """
        last_stage = self.filters[-1]
        return last_stage.value

    def add_value(self, value):
        """
        Push *value* through the chain, feeding each stage with the current
        value of the stage before it.
        """
        current = value
        for stage in self.filters:
            stage.add_value(current)
            current = stage.value
class LowpassFilter(BaseFilter):
    """
    Filter with a two-value history that smooths each new value against the
    previously stored one.
    """

    def __init__(self, alpha):
        """
        :param alpha: weight given to the previously stored value; the
            incoming value gets ``1 - alpha``.
        """
        BaseFilter.__init__(self, 2)
        self.alpha = alpha

    def add_value(self, value):
        """
        Store *value* after a simple lowpass step.

        While fewer than two values are held, the value is stored as is.
        Afterwards the stored value is a weighted average of the last stored
        value (weight *alpha*) and the new one (weight *1 - alpha*).
        """
        history_is_full = len(self.hist) >= 2
        if history_is_full:
            previous = self.hist[-1]
            value = previous * self.alpha + value * (1.0 - self.alpha)
        BaseFilter.add_value(self, value)
class NoiseFilter(BaseFilter):
    """
    Filter that replaces values lying too far from the running average by
    that average.
    """

    def __init__(self, deviation_scale, deviation_offset, size):
        """
        :param deviation_scale: multiplier applied to the rejection
            threshold.
        :param deviation_offset: constant added to ``self.std`` before it is
            scaled, so the threshold stays above zero when the history shows
            no spread.
        :param size: maximum number of values kept in the history.
        """
        BaseFilter.__init__(self, size)
        self.deviation_scale = deviation_scale
        self.deviation_offset = deviation_offset

    def add_value(self, value):
        """
        Add the new value to the history after applying a noise filter to
        it.

        The squared distance between the new value and the current average
        is compared with ``deviation_scale * (std + deviation_offset)``. If
        the distance is *greater* than that threshold, the value is treated
        as noise and the current average is stored instead. Otherwise the
        new value is stored unchanged.

        The first value after construction or ``empty()`` is always stored
        unchanged.
        """
        if not self.hist:
            # With no history, ``avg`` is only a placeholder 0. Testing
            # against it would replace a legitimate first sample by 0 and
            # could keep the filter stuck there.
            BaseFilter.add_value(self, value)
            return

        average = self.avg
        threshold = self.deviation_scale * (self.std + self.deviation_offset)
        if (value - average) ** 2 > threshold:
            BaseFilter.add_value(self, average)
        else:
            BaseFilter.add_value(self, value)