Skip to content

Commit

Permalink
Merge pull request google#90 from google/aiy_common
Browse files Browse the repository at this point in the history
Add AIY common drivers.
  • Loading branch information
enetor authored Jun 29, 2017
2 parents 6c574ff + 7fcc47e commit 5245e12
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 90 deletions.
Empty file added src/aiy/__init__.py
Empty file.
227 changes: 227 additions & 0 deletions src/aiy/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This library provides common drivers for the AIY projects."""

import itertools
import os
import threading
import time

import RPi.GPIO as GPIO


class Button(object):
"""Detect edges on the given GPIO channel."""

def __init__(self,
channel,
polarity=GPIO.FALLING,
pull_up_down=GPIO.PUD_UP,
debounce_time=0.15):
"""A simple GPIO-based button driver.
This driver supports a simple GPIO-based button. It works by detecting
edges on the given GPIO channel. Debouncing is automatic.
Args:
channel: the GPIO pin number to use (BCM mode)
polarity: the GPIO polarity to detect; either GPIO.FALLING or
GPIO.RISING.
pull_up_down: whether the port should be pulled up or down; defaults to
GPIO.PUD_UP.
debounce_time: the time used in debouncing the button in seconds.
"""

if polarity not in [GPIO.FALLING, GPIO.RISING]:
raise ValueError(
'polarity must be one of: GPIO.FALLING or GPIO.RISING')

self.channel = int(channel)
self.polarity = polarity
self.expected_value = polarity == GPIO.RISING
self.debounce_time = debounce_time

GPIO.setmode(GPIO.BCM)
GPIO.setup(channel, GPIO.IN, pull_up_down=pull_up_down)

self.callback = None

def wait_for_press(self):
"""Waits for the button to be pressed.
This method blocks until the button is pressed.
"""
GPIO.add_event_detect(self.channel, self.polarity)
while True:
if GPIO.event_detected(self.channel) and self._debounce():
GPIO.remove_event_detect(self.channel)
return
else:
time.sleep(0.1)

def on_press(self, callback):
"""Calls the callback whenever the button is pressed.
Args:
callback: a function to call whenever the button is pressed. It should
take a single channel number.
Example:
def MyButtonPressHandler(channel):
print "button pressed: channel = %d" % channel
my_button.on_press(MyButtonPressHandler)
"""
GPIO.remove_event_detect(self.channel)
self.callback = callback
GPIO.add_event_detect(
self.channel, self.polarity, callback=self._debounce_and_callback)

def _debounce_and_callback(self, _):
if self._debounce():
self.callback()

def _debounce(self):
"""Debounces the GPIO signal.
Check that the input holds the expected value for the debounce
period, to avoid false trigger on short pulses.
"""
start = time.time()
while time.time() < start + self.debounce_time:
if GPIO.input(self.channel) != self.expected_value:
return False
time.sleep(0.01)
return True


class LED:
"""Starts a background thread to show patterns with the LED.
Simple usage:
my_led = LED(channel = 25)
my_led.start()
my_led.set_state(LED_BEACON)
my_led.stop()
"""

LED_OFF = 0
LED_ON = 1
LED_BLINK = 2
LED_BLINK_3 = 3
LED_BEACON = 4
LED_BEACON_DARK = 5
LED_DECAY = 6
LED_PULSE_SLOW = 7
LED_PULSE_QUICK = 8

def __init__(self, channel):
self.animator = threading.Thread(target=self._animate)
self.channel = channel
self.iterator = None
self.running = False
self.state = None
self.sleep = 0

GPIO.setmode(GPIO.BCM)
GPIO.setup(channel, GPIO.OUT)
self.pwm = GPIO.PWM(channel, 100)

self.lock = threading.Lock()

def start(self):
"""Starts the LED driver."""
with self.lock:
if not self.running:
self.running = True
self.pwm.start(0) # off by default
self.animator.start()

def stop(self):
"""Stops the LED driver and sets the LED to off."""
with self.lock:
if self.running:
self.running = False
self.animator.join()
self.pwm.stop()

def set_state(self, state):
"""Sets the LED driver's new state.
Note the LED driver must be started for this to have any effect.
"""
with self.lock:
self.state = state

def _animate(self):
while True:
state = None
running = False
with self.lock:
state = self.state
self.state = None
running = self.running
if not running:
return
if state:
if not self._parse_state(state):
raise ValueError('unsupported state: %d' % state)
if self.iterator:
self.pwm.ChangeDutyCycle(next(self.iterator))
time.sleep(self.sleep)
else:
# We can also wait for a state change here with a Condition.
time.sleep(1)

def _parse_state(self, state):
self.iterator = None
self.sleep = 0.0
if state == self.LED_OFF:
self.pwm.ChangeDutyCycle(0)
return True
if state == self.LED_ON:
self.pwm.ChangeDutyCycle(100)
return True
if state == self.LED_BLINK:
self.iterator = itertools.cycle([0, 100])
self.sleep = 0.5
return True
if state == self.LED_BLINK_3:
self.iterator = itertools.cycle([0, 100] * 3 + [0, 0])
self.sleep = 0.25
return True
if state == self.LED_BEACON:
self.iterator = itertools.cycle(
itertools.chain([30] * 100, [100] * 8, range(100, 30, -5)))
self.sleep = 0.05
return True
if state == self.LED_BEACON_DARK:
self.iterator = itertools.cycle(
itertools.chain([0] * 100, range(0, 30, 3), range(30, 0, -3)))
self.sleep = 0.05
return True
if state == self.LED_DECAY:
self.iterator = itertools.cycle(range(100, 0, -2))
self.sleep = 0.05
return True
if state == self.LED_PULSE_SLOW:
self.iterator = itertools.cycle(
itertools.chain(range(0, 100, 2), range(100, 0, -2)))
self.sleep = 0.1
return True
if state == self.LED_PULSE_QUICK:
self.iterator = itertools.cycle(
itertools.chain(range(0, 100, 5), range(100, 0, -5)))
self.sleep = 0.05
return True
return False
103 changes: 13 additions & 90 deletions src/led.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Signal states on a LED"""

'''Signal states on a LED'''

import itertools
import logging
import os
import threading
import time

import aiy.common
import RPi.GPIO as GPIO

logger = logging.getLogger('led')
Expand All @@ -31,83 +29,6 @@
]


class LED:

"""Starts a background thread to show patterns with the LED."""

def __init__(self, channel):
self.animator = threading.Thread(target=self._animate)
self.channel = channel
self.iterator = None
self.running = False
self.state = None
self.sleep = 0

GPIO.setup(channel, GPIO.OUT)
self.pwm = GPIO.PWM(channel, 100)

def start(self):
self.pwm.start(0) # off by default
self.running = True
self.animator.start()

def stop(self):
self.running = False
self.animator.join()
self.pwm.stop()
GPIO.output(self.channel, GPIO.LOW)

def set_state(self, state):
self.state = state

def _animate(self):
# TODO(ensonic): refactor or add justification
# pylint: disable=too-many-branches
while self.running:
if self.state:
if self.state == 'on':
self.iterator = None
self.sleep = 0.0
self.pwm.ChangeDutyCycle(100)
elif self.state == 'off':
self.iterator = None
self.sleep = 0.0
self.pwm.ChangeDutyCycle(0)
elif self.state == 'blink':
self.iterator = itertools.cycle([0, 100])
self.sleep = 0.5
elif self.state == 'blink-3':
self.iterator = itertools.cycle([0, 100] * 3 + [0, 0])
self.sleep = 0.25
elif self.state == 'beacon':
self.iterator = itertools.cycle(
itertools.chain([30] * 100, [100] * 8, range(100, 30, -5)))
self.sleep = 0.05
elif self.state == 'beacon-dark':
self.iterator = itertools.cycle(
itertools.chain([0] * 100, range(0, 30, 3), range(30, 0, -3)))
self.sleep = 0.05
elif self.state == 'decay':
self.iterator = itertools.cycle(range(100, 0, -2))
self.sleep = 0.05
elif self.state == 'pulse-slow':
self.iterator = itertools.cycle(
itertools.chain(range(0, 100, 2), range(100, 0, -2)))
self.sleep = 0.1
elif self.state == 'pulse-quick':
self.iterator = itertools.cycle(
itertools.chain(range(0, 100, 5), range(100, 0, -5)))
self.sleep = 0.05
else:
logger.warning("unsupported state: %s", self.state)
self.state = None
if self.iterator:
self.pwm.ChangeDutyCycle(next(self.iterator))
time.sleep(self.sleep)
else:
time.sleep(1)


def main():
logging.basicConfig(
level=logging.INFO,
Expand All @@ -117,25 +38,26 @@ def main():
import configargparse
parser = configargparse.ArgParser(
default_config_files=CONFIG_FILES,
description="Status LED daemon")
description="Status LED daemon"
)
parser.add_argument('-G', '--gpio-pin', default=25, type=int,
help='GPIO pin for the LED (default: 25)')
args = parser.parse_args()

led = None
state_map = {
"starting": "pulse-quick",
"ready": "beacon-dark",
"listening": "on",
"thinking": "pulse-quick",
"stopping": "pulse-quick",
"power-off": "off",
"error": "blink-3",
"starting": aiy.common.LED.LED_PULSE_QUICK,
"ready": aiy.common.LED.LED_BEACON_DARK,
"listening": aiy.common.LED.LED_ON,
"thinking": aiy.common.LED.LED_PULSE_QUICK,
"stopping": aiy.common.LED.LED_PULSE_QUICK,
"power-off": aiy.common.LED.LED_OFF,
"error": aiy.common.LED.LED_BLINK_3,
}
try:
GPIO.setmode(GPIO.BCM)

led = LED(args.gpio_pin)
led = aiy.common.LED(args.gpio_pin)
led.start()
while True:
try:
Expand All @@ -156,5 +78,6 @@ def main():
led.stop()
GPIO.cleanup()


if __name__ == '__main__':
main()

0 comments on commit 5245e12

Please sign in to comment.