Skip to content

Commit

Permalink
Add bulk sensor sampling feature (#197)
Browse files Browse the repository at this point in the history
* Start adding bulk sensor sampling feature

Setting the sampling strategies on KATCP devices with large numbers
of sensors is slow.  E.g. a device with 10k sensors can take more than
a minute, doing it one sensor at time.  Setting the strategies in
bulk requires less roundtrips, making it quicker.

This feature is not enabled by default.

Still WIP.  Tests must be added.

JIRA: CB-2748

* Add bulk sampling flag to ProtocolFlags docstring

* Fix some linting issues

Linter is a bit stricter, 3 years later.

JIRA: MT-1862

* Add bulk sampling tests for ProtocolFlags

JIRA: MT-1862

* Fix the reply parameters for 'differential-rate' strategy.

* Add 'sensor-sampling' test for protocol 5.x

* Simplify 'sensor-sampling' test

Do not access private strategy variables to reduce test coupling.

* Update ?sensor-sampling Docstring.

In preparation for bulk sensor sampling implementation.

* Add new tests for DeviceServer5.1

Update bulk sensor handlind method for list of sensors separated by comma.

* Update bulk_sensor_sampling method to handle list of sensors

* adding unittests for bulk sensor sampling

* Fix indentation issues in test_server

JIRA: MT-1862

* Add tests to check for invalid strategy, invalid strategy and parameters

* Add new test to check bulk sensor sampling fails for invalid strategy and invalid parameters

Fix request naming typo

* Fix parameter list to mock request

* improve ?sensor-sampling error message for multi-sensor query

* improve ?sensor-sampling error message for multi-sensor query

* Handle python3 byte-strings

* Add test to check various parameters on sensors individually

* Use latest version of coverage

* address PR comments

* fix typo

* Fix typos in in-line comments and compare strategies properly in test

* lint diff

* Fix example informs for sensor-status and value

Status field was missing.

Co-authored-by: Mpho Mphego <[email protected]>
Co-authored-by: amakhaba <[email protected]>
Co-authored-by: Buntu <[email protected]>
Co-authored-by: lanceWilliams <[email protected]>
  • Loading branch information
5 people authored Oct 29, 2020
1 parent f3060e9 commit 5fa8a6a
Show file tree
Hide file tree
Showing 9 changed files with 429 additions and 44 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ target/
# install all needed dependencies.
#Pipfile.lock

# virtualenv
/venv*

# celery beat schedule file
celerybeat-schedule

Expand Down
16 changes: 12 additions & 4 deletions katcp/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ class ProtocolFlags(object):
* M - server supports multiple clients
* I - server supports message identifiers
* T - server provides request timeout hints via ?request-timeout-hint
* B - server supports bulk list of sensors to ?sensor-sampling
Parameters
----------
Expand Down Expand Up @@ -657,9 +658,8 @@ class ProtocolFlags(object):

MULTI_CLIENT = b'M'
MESSAGE_IDS = b'I'
# New proposal flag to indicate that a device supports ?request-timeout-hint
# See CB-2051
REQUEST_TIMEOUT_HINTS = b'T'
BULK_SET_SENSOR_SAMPLING = b'B'

STRATEGIES_V4 = frozenset([b'none', b'auto', b'period', b'event',
b'differential'])
Expand All @@ -672,6 +672,7 @@ class ProtocolFlags(object):
}

REQUEST_TIMEOUT_HINTS_MIN_VERSION = (5, 1)
BULK_SET_SENSOR_SAMPLING_MIN_VERSION = (5, 1)

def __init__(self, major, minor, flags):
# PY2 and Py3 compatibility.
Expand All @@ -682,15 +683,22 @@ def __init__(self, major, minor, flags):
self.multi_client = self.MULTI_CLIENT in self.flags
self.message_ids = self.MESSAGE_IDS in self.flags
self.request_timeout_hints = self.REQUEST_TIMEOUT_HINTS in self.flags
self.bulk_set_sensor_sampling = self.BULK_SET_SENSOR_SAMPLING in self.flags
if self.message_ids and self.major < MID_KATCP_MAJOR:
raise ValueError(
'MESSAGE_IDS is only supported in katcp v5 and newer')
version_supports_hints = ((self.major, self.minor) >=
self.REQUEST_TIMEOUT_HINTS_MIN_VERSION)
version_supports_hints = ((self.major, self.minor)
>= self.REQUEST_TIMEOUT_HINTS_MIN_VERSION)
if self.request_timeout_hints and not version_supports_hints:
raise ValueError(
'REQUEST_TIMEOUT_HINTS only suported in katcp v{}.{} and newer'
.format(*self.REQUEST_TIMEOUT_HINTS_MIN_VERSION))
version_supports_bulk = ((self.major, self.minor)
>= self.BULK_SET_SENSOR_SAMPLING_MIN_VERSION)
if self.bulk_set_sensor_sampling and not version_supports_bulk:
raise ValueError(
'BULK_SET_SENSOR_SAMPLING only suported in katcp v{}.{} and newer'
.format(*self.BULK_SET_SENSOR_SAMPLING_MIN_VERSION))

def strategy_allowed(self, strategy):
return strategy in self.STRATEGIES_ALLOWED_BY_MAJOR_VERSION[self.major]
Expand Down
4 changes: 4 additions & 0 deletions katcp/sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,10 @@ def __init__(self, inform_callback, sensor, *params, **kwargs):
raise ValueError("The 'differential-rate' strategy takes three parameters.")
super(SampleDifferentialRate, self).__init__(inform_callback, sensor,
*params[1:], **kwargs)
# Fix up the parameters so we do not lose the params that are not
# passed to SampleEventRate
self._params = params

difference = params[0]
if sensor.stype == 'integer':
difference = int(difference)
Expand Down
156 changes: 139 additions & 17 deletions katcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2164,13 +2164,13 @@ def request_sensor_value(self, req, msg):
::
?sensor-value
#sensor-value 1244631611.415231 1 psu.voltage 4.5
#sensor-value 1244631611.415200 1 cpu.status off
#sensor-value 1244631611.415231 1 psu.voltage nominal 4.5
#sensor-value 1244631611.415200 1 cpu.status nominal off
...
!sensor-value ok 5
?sensor-value cpu.power.on
#sensor-value 1244631611.415231 1 cpu.power.on 0
#sensor-value 1244631611.415231 1 cpu.power.on nominal 0
!sensor-value ok 1
"""
Expand All @@ -2189,19 +2189,28 @@ def request_sensor_value(self, req, msg):
return req.make_reply("ok", str(len(sensors)))

def request_sensor_sampling(self, req, msg):
"""Configure or query the way a sensor is sampled.
r"""Configure or query the way a sensor is sampled.
Sampled values are reported asynchronously using the #sensor-status
message.
Parameters
----------
name : str
Name of the sensor whose sampling strategy to query or configure.
strategy : {'none', 'auto', 'event', 'differential', \
names : str
One or more names of sensors whose sampling strategy will be queried
or configured. If specifying multiple sensors, these must be provided as a
comma-separated list.
A query can only be done on a single sensor. However, configuration
can be done on many sensors with a single request, as long as they
all use the same strategy.
Note: prior to KATCP v5.1 only a single sensor could be configured.
Multiple sensors are only allowed if the device server sets the protocol
version to KATCP v5.1 or higher and enables the BULK_SET_SENSOR_SAMPLING
flag in its PROTOCOL_INFO class attribute.
strategy : {'none', 'auto', 'event', 'differential', 'differential-rate',
'period', 'event-rate'}, optional
Type of strategy to use to report the sensor value. The
differential strategy type may only be used with integer or float
differential strategy types may only be used with integer or float
sensors. If this parameter is supplied, it sets the new strategy.
params : list of str, optional
Additional strategy parameters (dependent on the strategy type).
Expand All @@ -2217,35 +2226,75 @@ def request_sensor_sampling(self, req, msg):
given. If the event occurs more than once within the minimum period,
only one update will occur. Whether or not the event occurs, the
sensor value will be updated at least once per maximum period.
The differential-rate strategy is not supported in this release.
For the differential-rate strategy there are 3 parameters. The first is the
same as the differential strategy parameter. The second and third are the
minimum and maximum periods, respectively, as with the event-rate strategy.
Informs
-------
timestamp : float
Timestamp of the sensor reading in seconds since the Unix
epoch, or milliseconds for katcp versions <= 4.
count : {1}
Number of sensors described in this #sensor-status inform. Will
always be one. It exists to keep this inform compatible with
#sensor-value.
name : str
Name of the sensor whose value is being reported.
value : object
Value of the named sensor. Type depends on the type of the sensor.
Returns
-------
success : {'ok', 'fail'}
Whether the sensor-sampling request succeeded.
name : str
Name of the sensor queried or configured.
strategy : {'none', 'auto', 'event', 'differential', 'period'}
Name of the new or current sampling strategy for the sensor.
names : str
Name(s) of the sensor queried or configured. If multiple sensors, this will
be a comma-separated list.
strategy : {'none', 'auto', 'event', 'differential', 'differential-rate',
'period', 'event-rate'}.
Name of the new or current sampling strategy for the sensor(s).
params : list of str
Additional strategy parameters (see description under Parameters).
Examples
--------
::
?sensor-sampling cpu.power.on
!sensor-sampling ok cpu.power.on none
?sensor-sampling cpu.power.on period 500
!sensor-sampling ok cpu.power.on period 500
?sensor-sampling cpu.power.on period 0.5
#sensor-status 1244631611.415231 1 cpu.power.on nominal 1
!sensor-sampling ok cpu.power.on period 0.5
if BULK_SET_SENSOR_SAMPLING is enabled then:
?sensor-sampling cpu.power.on,fan.speed
!sensor-sampling fail Cannot\_query\_multiple\_sensors
?sensor-sampling cpu.power.on,fan.speed period 0.5
#sensor-status 1244631611.415231 1 cpu.power.on nominal 1
#sensor-status 1244631611.415200 1 fan.speed nominal 10.0
!sensor-sampling ok cpu.power.on,fan.speed period 0.5
"""
if self._is_bulk_sampling_required(msg.arguments):
handler = self._handle_bulk_set_sensor_sampling
else:
handler = self._handle_sensor_sampling
f = Future()
self.ioloop.add_callback(lambda: chain_future(
self._handle_sensor_sampling(req, msg), f))
handler(req, msg), f))
return f

def _is_bulk_sampling_required(self, msg_args):
bulk_sampling_enabled = self.PROTOCOL_INFO.bulk_set_sensor_sampling
multiple_sensors_provided = b',' in msg_args[0] if msg_args else False
strategy_provided = len(msg_args) > 1
return (
bulk_sampling_enabled and (multiple_sensors_provided or strategy_provided)
)

@gen.coroutine
def _handle_sensor_sampling(self, req, msg):
if not msg.arguments:
Expand Down Expand Up @@ -2318,6 +2367,79 @@ def inform_callback(sensor, reading):
yield gen.moment
raise gen.Return(req.make_reply("ok", name, strategy, *params))

@gen.coroutine
def _handle_bulk_set_sensor_sampling(self, req, msg):
if not msg.arguments:
raise FailReply("No sensor name given.")

# The client connection that is not specific to this request context
client = req.client_connection

def inform_callback(sensor, reading):
"""Inform callback for sensor strategy."""
timestamp, status, value = reading
cb_msg = format_inform_v5(sensor, timestamp, status, value)
client.inform(cb_msg)

names_arg = ensure_native_str(msg.arguments[0])
names = names_arg.split(',')

if len(msg.arguments) <= 1:
raise FailReply("Cannot query multiple sensors.")

strategy = msg.arguments[1]
params = msg.arguments[2:]

invalid_names = []
for name in names:
if name not in self._sensors:
invalid_names.append(name)
if invalid_names:
raise FailReply("Unknown sensor names: %r." % invalid_names)

requested_strategies = {}
try:
for name in names:
sensor = self._sensors[name]
if self.PROTOCOL_INFO.strategy_allowed(strategy):
requested_strategy = SampleStrategy.get_strategy(
strategy, inform_callback, sensor, *params, ioloop=self.ioloop)
requested_strategies[name] = requested_strategy
except Exception as e:
raise FailReply("Invalid strategy requested: %s." % e)
if not requested_strategies:
raise FailReply(
"Invalid request: names %r, strategy %r, params %r." % (
names,
ensure_native_str(strategy),
[ensure_native_str(p) for p in params]
)
)

for name in names:
sensor = self._sensors[name]

# Remove and cancel old strategy
old_strategy = self._strategies[client].pop(sensor, None)
if old_strategy:
old_strategy.cancel()

# Replace with new strategy
new_strategy = requested_strategies[name]
if not isinstance(new_strategy, SampleNone):
self._strategies[client][sensor] = new_strategy
new_strategy.start()

# Let the ioloop run so that the #sensor-status informs are sent before
# the reply. Not strictly neccesary, but a number of tests depend on
# this behaviour, less effort to fix it here :-/
yield gen.moment

strategy, params = requested_strategy.get_sampling_formatted()
reply_args = [",".join(names)] + [strategy] + params
raise gen.Return(req.make_reply("ok", *reply_args))


@request()
@return_reply()
def request_sensor_sampling_clear(self, req):
Expand Down
61 changes: 45 additions & 16 deletions katcp/test/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,38 +322,67 @@ def test_parse_version(self):
PF = katcp.ProtocolFlags
self.assertEqual(PF.parse_version(b"foo"), PF(None, None, set()))
self.assertEqual(PF.parse_version(b"1.0"), PF(1, 0, set()))
self.assertEqual(PF.parse_version(b"5.0-MI"),
PF(5, 0, set([PF.MULTI_CLIENT, PF.MESSAGE_IDS])))
self.assertEqual(
PF.parse_version(b"5.0-MI"),
PF(5, 0, {PF.MULTI_CLIENT, PF.MESSAGE_IDS})
)
# check an unknown flag
self.assertEqual(PF.parse_version(b"5.1-MIU"),
PF(5, 1, set([PF.MULTI_CLIENT, PF.MESSAGE_IDS, b"U"])))
self.assertEqual(
PF.parse_version(b"5.1-MIU"),
PF(5, 1, {PF.MULTI_CLIENT, PF.MESSAGE_IDS, b"U"})
)
# Check request timeout hint flag
self.assertEqual(PF.parse_version(b"5.1-MTI"),
PF(5, 1, set([PF.MULTI_CLIENT, PF.MESSAGE_IDS,
PF.REQUEST_TIMEOUT_HINTS])))
self.assertEqual(
PF.parse_version(b"5.1-MTI"),
PF(5, 1, {PF.MULTI_CLIENT, PF.MESSAGE_IDS, PF.REQUEST_TIMEOUT_HINTS})
)
# Check bulk sensor sampling flag
self.assertEqual(
PF.parse_version(b"5.1-MIB"),
PF(5, 1, {PF.MULTI_CLIENT, PF.MESSAGE_IDS, PF.BULK_SET_SENSOR_SAMPLING})
)

def test_str(self):
PF = katcp.ProtocolFlags
self.assertEqual(str(PF(1, 0, set())), "1.0")
self.assertEqual(str(PF(5, 0, set([PF.MULTI_CLIENT, PF.MESSAGE_IDS]))),
"5.0-IM")
self.assertEqual(str(PF(5, 0, set([PF.MULTI_CLIENT, PF.MESSAGE_IDS,
b"U"]))),
"5.0-IMU")
self.assertEqual(str(PF(5, 1, set([PF.MULTI_CLIENT, PF.MESSAGE_IDS,
PF.REQUEST_TIMEOUT_HINTS]))),
"5.1-IMT")
self.assertEqual(
str(PF(5, 0, {PF.MULTI_CLIENT, PF.MESSAGE_IDS})),
"5.0-IM"
)
self.assertEqual(
str(PF(5, 0, {PF.MULTI_CLIENT, PF.MESSAGE_IDS, b"U"})),
"5.0-IMU"
)
self.assertEqual(
str(PF(5, 1, {PF.MULTI_CLIENT, PF.MESSAGE_IDS, PF.REQUEST_TIMEOUT_HINTS})),
"5.1-IMT"
)
self.assertEqual(
str(PF(5, 1, {PF.MULTI_CLIENT, PF.MESSAGE_IDS, PF.BULK_SET_SENSOR_SAMPLING})),
"5.1-BIM"
)

def test_incompatible_options(self):
PF = katcp.ProtocolFlags
# Katcp v4 and below don't support message ids
with self.assertRaises(ValueError):
PF(4, 0, [PF.MESSAGE_IDS])

# Katcp v5 and below don't support (proposed) timeout hint flag
# Katcp v5.0 and below don't support timeout hint flag
with self.assertRaises(ValueError):
PF(5, 0, [PF.REQUEST_TIMEOUT_HINTS])

# Katcp v5.0 and below don't support bulk sensor sampling flag
with self.assertRaises(ValueError):
PF(5, 0, [PF.BULK_SET_SENSOR_SAMPLING])

def test_supports(self):
PF = katcp.ProtocolFlags
DUT = PF(5, 0, {b"A", b"Z"})
self.assertTrue(DUT.supports(b"A"))
self.assertTrue(DUT.supports(b"Z"))
self.assertFalse(DUT.supports(b"X"))


class TestSensor(unittest.TestCase):

Expand Down
5 changes: 2 additions & 3 deletions katcp/test/test_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,16 +349,15 @@ def test_event(self):

@tornado.testing.gen_test(timeout=200)
def test_differential_rate(self):
delta = 2
shortest = 1.5
longest = 4.5
delta = 2
t, status, value = self.sensor.read()
# TODO change to be differentialrate test

DUT = sampling.SampleDifferentialRate(
self.inform, self.sensor, delta, shortest, longest)
self.assertEqual(
DUT.get_sampling_formatted(), (b'differential-rate', [b'1.5', b'4.5']))
DUT.get_sampling_formatted(), (b'differential-rate', [b'2', b'1.5', b'4.5']))
self.assertEqual(len(self.calls), 0)
DUT.start()
yield self.wake_ioloop()
Expand Down
Loading

0 comments on commit 5fa8a6a

Please sign in to comment.