Skip to content

Commit

Permalink
Merge pull request #6 from dovf/master
Browse files Browse the repository at this point in the history
SerialMonitor: allow searching for (and reacting to) arbitrary patterns on the serial output
  • Loading branch information
BinyaminSharet committed Apr 6, 2016
2 parents 585a1c7 + 7580b10 commit 6838c77
Showing 1 changed file with 34 additions and 14 deletions.
48 changes: 34 additions & 14 deletions katnip/monitors/serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,7 @@ def __init__(self, name, dev_name=None, baudrate=115200,
:param logger: logger for the monitor object
'''
super(SerialMonitor, self).__init__(name, logger)
self.success_pattern = None
self.success_pattern_str = None
self.failure_pattern = None
self.failure_pattern_str = None
self.pattern_cbs = []
self.dev_name = dev_name
self.baudrate = baudrate
self.serial = None
Expand All @@ -92,25 +89,47 @@ def teardown(self):
if self.fd is not None:
self.fd.close()

def set_success_pattern(self, success_pattern):
def add_pattern_callback(self, pattern, cb):
'''
Add a pattern to search for on the serial output, and the callback that
will be called when the pattern is found.
:type pattern: str
:param pattern: regular expression pattern to be searched for in the serial output
:type cb: callable
:param cb: the callback to be called when pattern is found; must accept 3 params:
(1) a SerialMonitor instance
(2) the matching line
(3) the re match object of the found match
'''
self.pattern_cbs.append((re.compile(pattern), cb))

def add_success_pattern(self, success_pattern):
'''
Set a pattern that declares the test successful if received
:type success_pattern: str
:param success_pattern: regular expression pattern of output that signifies success (e.g. no bug there)
'''
self.success_pattern = re.compile(success_pattern)
self.success_pattern_str = success_pattern
def success_cb(self, line, match):
self.report.success()
self.add_pattern_callback(success_pattern, success_cb)

def set_failure_pattern(self, failure_pattern):
def set_success_pattern(self, success_pattern):
return self.add_success_pattern(success_pattern)

def add_failure_pattern(self, failure_pattern):
'''
Set a pattern that declares the test as failed if received
:type failure_pattern: str
:param failure_pattern: regular expression pattern of output that signifies failure (e.g. potential bug there)
'''
self.failure_pattern = re.compile(failure_pattern)
self.failure_pattern_str = failure_pattern
def failure_cb(self, line, match):
self.report.failed('failure pattern [%s] matched line [%s]' % (match.re.pattern, line))
self.add_pattern_callback(failure_pattern, failure_cb)

def set_failure_pattern(self, failure_pattern):
return self.add_failure_pattern(failure_pattern)

def close_fd(self):
if self.fd is not None:
Expand Down Expand Up @@ -150,11 +169,12 @@ def _monitor_func(self):
'''
line = self.serial.readline()
if line:
if self.failure_pattern and self.failure_pattern.search(line):
self.report.failed('failure pattern [%s] matched line [%s]' % (self.failure_pattern_str, line))
if self.success_pattern and self.success_pattern.search(line):
self.report.success()
for pattern, cb in self.pattern_cbs:
match = pattern.search(line)
if match:
cb(self, line, match)
self.fdlock.acquire()
if self.fd is not None:
self.fd.write(line)
self.fdlock.release()

0 comments on commit 6838c77

Please sign in to comment.