Skip to content

Commit

Permalink
Add tests, vend queue
Browse files Browse the repository at this point in the history
Adds unittests for the vending functions, makes vends get queued into a
workthread, making them nonblocking

Resolves #2
  • Loading branch information
abrandemuehl committed Mar 23, 2017
1 parent d5ce3c2 commit badc4c2
Show file tree
Hide file tree
Showing 6 changed files with 537 additions and 39 deletions.
6 changes: 3 additions & 3 deletions machine_controller/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@
from vend import Merch

app = Flask(__name__)
merch = Merch()
merch = Merch.Instance()


@app.route('/vend', methods=['POST'])
def hello_world():
def vend():
if 'item' not in request.args:
abort(400)
item = request.args['item']
item = request.args.getlist('item')
merch.vend(item[0], item[1])
return json.dumps({'success': True}), 200, {'ContentType': 'application/json'}

Expand Down
161 changes: 161 additions & 0 deletions machine_controller/mockgpio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import copy



class MockGPIO:
# From https://github.com/TCAllen07/raspi-device-mocks
# Map format is <BCM-#>: <BOARD-#>
bcm_board_map = { 2: 3,
3: 5, 4: 7, 14: 8, 15: 10, 17: 11,
18: 12, 27: 13, 22: 15, 23: 16, 24: 18,
10: 19, 9: 21, 25: 22, 11: 23, 8: 24,
7: 26, 5: 29, 6: 31, 12: 32, 13: 33,
19: 35, 16: 36, 26: 37, 20: 38, 21: 40}

# Map format is <BOARD-#>: <BCM-#>
gpio_board_map = { 3: 2,
5: 3, 7: 4, 8: 14, 10: 15, 11: 17,
12: 18, 13: 27, 15: 22, 16: 23, 18: 24,
19: 10, 21: 9, 22: 25, 23: 11, 24: 8,
26: 7, 29: 5, 31: 6, 32: 12, 33: 13,
35: 19, 36: 16, 37: 26, 38: 20, 40: 21}



LOW = 0
HIGH = 1

BCM = 11
BOARD = 10

OUT = 0
IN = 1

PUD_OFF = 20
PUD_DOWN = 21
PUD_UP = 22

# Indexed by board pin number
gpio_direction = {k: 1 for k in bcm_board_map.values()}
gpio_values = {}

def __init__(self):
self.mode = -1

self.setmode_run = False
self.setup_run = False

self.states = []



def setmode(self, mode):
if mode not in (self.BCM, self.BOARD):
raise ValueError("An invalid mode was passed to setmode()")
self.mode = mode
self.setmode_run = True

def getmode(self):
return self.mode


def __pin_validate(self, pin):
if self.mode == self.BCM:
if pin not in self.bcm_board_map.keys():
raise ValueError('Pin is invalid')
elif self.mode == self.BOARD:
if pin not in self.gpio_board_map.keys():
raise ValueError('Pin is invalid')
else:
raise ValueError('Setup has not been called yet')




def output(self, pins, value):
if not hasattr(pins, '__iter__'):
pins = [pins, ]
for pin in pins:
self.__pin_validate(pin)

if value not in (self.HIGH, self.LOW):
raise ValueError('An invalid value was passed to output()')

if not self.setmode_run:
raise RuntimeError('output() was called before setmode()')
if not self.setup_run:
raise RuntimeError('output() was called before setup()')

for pin in pins:
self.gpio_values[pin] = value
self.states.append(copy.deepcopy(self.gpio_values))


def input(self, pins):
if not hasattr(pins, '__iter__'):
pins = [pins, ]
for pin in pins:
self.__pin_validate(pin)

if not self.setmode_run:
raise RuntimeError('input() was called before setmode()')
if not self.setup_run:
raise RuntimeError('input() was called before setup()')

def gpio_function(self, pin):
self.__pin_validate(pin)
if not self.setmode_run:
raise RuntimeError('gpio_function() was called before setmode()')
if self.mode == self.BCM:
return self.gpio_direction[self.bcm_board_map[pin]]
else:
return self.gpio_direction[pin]

def cleanup(self):
self.setup_run = False
self.setmode_run = False
self.mode = -1

for pin in self.gpio_direction:
self.gpio_direction[pin] = self.IN

def setup(self, pins, direction, pull_up_down=None, initial=None):
if not hasattr(pins, '__iter__'):
pins = [pins, ]

for pin in pins:
self.__pin_validate(pin)

if direction not in (self.IN, self.OUT):
raise ValueError('An invalid direction was passed to setup()')
if (pull_up_down is not None and
pull_up_down not in (self.PUD_OFF, self.PUD_DOWN, self.PUD_UP)):
raise ValueError('Invalid Pull Up Down setting passed to setup()')
self.setup_run = True
if self.mode == self.BCM:
self.gpio_direction[self.bcm_board_map[pin]] = direction
else:
self.gpio_direction[pin] = direction


# Placeholders
def add_event_callback(self, *args):
pass

def add_event_detect(self, *args):
pass

def setwarnings(self, *args):
pass

def wait_for_edge(self, *args):
pass

def event_detected(self, *args):
pass

def remove_event_detect(self, *args):
pass


GPIO = MockGPIO()
38 changes: 38 additions & 0 deletions machine_controller/singleton.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# From http://stackoverflow.com/questions/31875/is-there-a-simple-elegant-way-to-define-singletons
class Singleton:
"""
A non-thread-safe helper class to ease implementing singletons.
This should be used as a decorator -- not a metaclass -- to the
class that should be a singleton.
The decorated class can define one `__init__` function that
takes only the `self` argument. Also, the decorated class cannot be
inherited from. Other than that, there are no restrictions that apply
to the decorated class.
To get the singleton instance, use the `Instance` method. Trying
to use `__call__` will result in a `TypeError` being raised.
"""

def __init__(self, decorated):
self._decorated = decorated

def Instance(self):
"""
Returns the singleton instance. Upon its first call, it creates a
new instance of the decorated class and calls its `__init__` method.
On all subsequent calls, the already created instance is returned.
"""
try:
return self._instance
except AttributeError:
self._instance = self._decorated()
return self._instance

def __call__(self):
raise TypeError('Singletons must be accessed through `Instance()`.')

def __instancecheck__(self, inst):
return isinstance(inst, self._decorated)
137 changes: 137 additions & 0 deletions machine_controller/task_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# University of Illinois/NCSA Open Source License
#
# Copyright (c) 2017 ACM@UIUC
# All rights reserved.
#
# Developed by: SIGBot
# ACM@UIUC
# https://acm.illinois.edu
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the 'Software'), to deal
# with the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimers.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimers in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the names of the SIGBot, ACM@UIUC, nor the names of its
# contributors may be used to endorse or promote products derived from
# this Software without specific prior written permission.
#
# THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS WITH
# THE SOFTWARE.


from threading import Thread, Condition, Lock


class TaskQueue(Thread):
'''Task Queue that runs in a separate "thread"'''
class Promise():
'''An object that can be waited on'''
def __init__(self):
self.condition = Condition()
self.wait_done = False

def wait(self):
'''Wait for the work to be done'''
with self.condition:
while not self.wait_done:
self.condition.wait()

def notify(self):
'''Wake waiters'''
with self.condition:
self.wait_done = True
self.condition.notifyAll()

class Work():
'''Represents a piece of work to be done'''
def __init__(self, func):
self.func = func
self.promise = TaskQueue.Promise()

def __call__(self):
self.run()

def run(self):
self.func()
self.promise.notify()

def __init__(self):
super(TaskQueue, self).__init__()
self.work_queue = []
# Condition variable to protect the work queue
# In the threading library, this acts as both a lock and a condition
# variable
self.work_condition = Condition()

self.shutdown_lock = Lock()
self.shutdown_ = False
def __del__(self):
self.shutdown()

def run(self):
'''Start doing work in a separate thread'''

self.shutdown_lock.acquire()
while not self.shutdown_:
self.shutdown_lock.release()

work = None
# Make sure to handle the work queue with the lock
with self.work_condition:
while len(self.work_queue) == 0:
self.shutdown_lock.acquire()
if self.shutdown_:
self.shutdown_lock.release()
return
self.shutdown_lock.release()
# Wait for values to be available
self.work_condition.wait()

# I just recently found out that this is an atomic operation...
work = self.work_queue.pop(0)

if work:
# Do the work. Arguments should be bound to the function object
work()

# Reacquire the lock before we check its value in the loop
self.shutdown_lock.acquire()
self.shutdown_lock.release()

def add_work(self, func):
'''Add work to the queue
Arguments:
work -- a function to be called by the work queue. If the function to
be called has arguments, use partial application
(`from functools import partial`)
'''
with self.work_condition:
work = TaskQueue.Work(func)
self.work_queue.append(work)

# We're notifying all waiters, but there should only be one
self.work_condition.notifyAll()
return work.promise

def shutdown(self):
'''Shut down the work queue'''
with self.shutdown_lock:
self.shutdown_ = True
with self.work_condition:
self.work_condition.notifyAll()
Loading

0 comments on commit badc4c2

Please sign in to comment.