diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 6840aebb..da0c5a3a 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,3 +1,3 @@ [bumpversion] tag_name = rc/v{new_version} -current_version = 1.2.17 +current_version = 1.2.21 diff --git a/.github/workflows/build-and-publish-PyPI.yml b/.github/workflows/build-and-publish-PyPI.yml index f639fcf3..68a2accf 100644 --- a/.github/workflows/build-and-publish-PyPI.yml +++ b/.github/workflows/build-and-publish-PyPI.yml @@ -27,7 +27,7 @@ jobs: run: | python -m build - name: Publish Package to PyPI - uses: pypa/gh-action-pypi-publish@v1.5.0 + uses: pypa/gh-action-pypi-publish@v1.9.0 with: user: __token__ password: ${{ secrets.PYPI_API_TOKEN }} diff --git a/.github/workflows/build-and-publish-TestPyPI.yml b/.github/workflows/build-and-publish-TestPyPI.yml index f732ea4f..e919244d 100644 --- a/.github/workflows/build-and-publish-TestPyPI.yml +++ b/.github/workflows/build-and-publish-TestPyPI.yml @@ -46,7 +46,7 @@ jobs: run: | python -m build - name: Publish Distribution to TestPyPI - uses: pypa/gh-action-pypi-publish@master + uses: pypa/gh-action-pypi-publish@v1.9.0 with: password: ${{ secrets.TEST_PYPI_API_TOKEN }} repository_url: https://test.pypi.org/legacy/ @@ -69,26 +69,28 @@ jobs: git config user.email ${{ env.commit_email }} git cherry-pick ${{ env.bump_commit }} git push origin dev - Hardware_Test: - needs: publish - name: Hardware Tests - runs-on: [self-hosted, linux, ARM64, hw-test] - strategy: - fail-fast: false - matrix: - python-version: ["3.9"] - steps: - - name: Checkout - uses: actions/checkout@v3 - - name: Install Dependencies - run: | - python -m venv venv_hardware_test - source venv_hardware_test/bin/activate - python -m pip install --upgrade pip - python -m pip install pytest - if [ -f requirements_hw_test.txt ]; then pip install -r requirements_hw_test.txt; fi - - name: Test with pytest - run: | - source venv_hardware_test/bin/activate - python -m pytest ./tests/hardware_tests + + # Hardware_Test: + # needs: publish + # name: Hardware Tests + # runs-on: [self-hosted, linux, ARM64, hw-test] + # strategy: + # fail-fast: false + # matrix: + # python-version: ["3.9"] + + # steps: + # - name: Checkout + # uses: actions/checkout@v3 + # - name: Install Dependencies + # run: | + # python -m venv venv_hardware_test + # source venv_hardware_test/bin/activate + # python -m pip install --upgrade pip + # python -m pip install pytest + # if [ -f requirements_hw_test.txt ]; then pip install -r requirements_hw_test.txt; fi + # - name: Test with pytest + # run: | + # source venv_hardware_test/bin/activate + # python -m pytest ./tests/hardware_tests diff --git a/.pylintrc b/.pylintrc index a0a40c3c..3109e2f5 100644 --- a/.pylintrc +++ b/.pylintrc @@ -5,7 +5,10 @@ disable=logging-fstring-interpolation, too-many-arguments, fixme, too-few-public-methods, - duplicate-code + duplicate-code, + no-else-return, + no-else-raise, + logging-fstring-interpolation [LOGGING] diff --git a/README.md b/README.md index a9d9c40b..cd7cc464 100644 --- a/README.md +++ b/README.md @@ -107,12 +107,27 @@ password: ``` Make sure to include the `pypi-` prefix for your token value. +# Performance + +The following benchmarks were measured on the Rasberry Pi 4, with all edgepi daemons disabled. They're also the slowest average of 3 runs. + +The **Performance** column represents how long it takes to call one function, while the **Max Read Frequency** column represents how many times that function could be called every second. + +| Feature | Performance | Max Read Frequency | Function | Example | Description | +| -- | -- | -- | -- | -- | -- | +| Single DIN | 0.85ms per 8 DIN | 1171 Hz | `digital_input_state(...)` | [examples/single_din.py](https://github.com/EdgePi-Cloud/edgepi-python-sdk/tree/main/examples/single_din.py) | | +| Batched DIN | 0.52ms per 8 DIN | 1936 Hz | `digital_input_state_batch(...)` | [examples/batched_din.py](https://github.com/EdgePi-Cloud/edgepi-python-sdk/tree/main/examples/batched_din.py) | | +| Single ADC | 97.3 ms per 8 ADC | 10.3 Hz | `set_config(...)`

`single_sample()` | [examples/single_adc.py](https://github.com/EdgePi-Cloud/edgepi-python-sdk/tree/main/examples/single_adc.py) | Reads from ADC1 only | +| Batched ADC | 6.49 ms per 8 ADC | 154 Hz | `read_samples_adc1_batch(...)` | [examples/batched_adc.py](https://github.com/EdgePi-Cloud/edgepi-python-sdk/tree/main/examples/batched_adc.py) | Reads from ADC1 only | +| Batched ADC/Diff | 5.467 ms per 4 ADC, 2 Diff | 183 Hz | `read_samples_adc1_batch(...)` | [examples/batched_adc_diff.py](https://github.com/EdgePi-Cloud/edgepi-python-sdk/tree/main/examples/batched_adc_diff.py) | Differential ADC inputs each use two pins. Reads from ADC1 only | +| Thermocouple (TC) | 100.2ms | 9.98 hz | `read_temperatures()` | [examples/single_tc.py](https://github.com/EdgePi-Cloud/edgepi-python-sdk/tree/main/examples/single_tc.py) | Limited by [hardware](https://www.analog.com/media/en/technical-documentation/data-sheets/MAX31856.pdf) (see conversion mode). 100ms is needed for accurate (19 bit) readings | + # Bug Reports / Feature Requests Use [GitHub Issues Page](https://github.com/EdgePi-Cloud/edgepi-python-sdk/issues) to report any issues or feature requests. # Get involved Follow [@edgepi_cloud on Twitter](https://twitter.com/edgepi_cloud/). -Read and subscribe to the [EdgePi blog](https://www.edgepi.com/blog). +See the [EdgePi wiki](https://wiki.edgepi.com/) for more information on how to get started with your EdgePi. If you have a specific question, please check out our [discussion forums](https://www.edgepi.com/forums). # License diff --git a/examples/batched_adc.py b/examples/batched_adc.py new file mode 100644 index 00000000..1d19055d --- /dev/null +++ b/examples/batched_adc.py @@ -0,0 +1,38 @@ +"""Example reading from ADC using the batched function""" + +import time + +from edgepi.adc.edgepi_adc import EdgePiADC +from edgepi.adc.adc_constants import AnalogIn, ADC1DataRate + +ITER = 50 + +def run_test(): + """ + This test performs 400 Analog input reads, batched 8 reads at a time. + """ + + edgepi_adc = EdgePiADC(enable_cache=False) + + start = time.time() + result_list = [] + adc_choices = [ + AnalogIn.AIN1, AnalogIn.AIN2, AnalogIn.AIN3, AnalogIn.AIN4, + AnalogIn.AIN5, AnalogIn.AIN6, AnalogIn.AIN7, AnalogIn.AIN8, + ] + + for _ in range(ITER): + tmp = edgepi_adc.read_samples_adc1_batch( + data_rate=ADC1DataRate.SPS_38400, + analog_in_list=adc_choices, + ) + result_list += [tmp] + + elapsed = time.time() - start + + print(result_list[24]) + print(f"Time elapsed {elapsed/ITER:.6f} s") + print(f"Frequency {ITER/elapsed:.4f} hz") + +if __name__ == "__main__": + run_test() diff --git a/examples/batched_adc_diff.py b/examples/batched_adc_diff.py new file mode 100644 index 00000000..038f2129 --- /dev/null +++ b/examples/batched_adc_diff.py @@ -0,0 +1,44 @@ +"""Example reading from from individual ADC pins""" + +import time + +from edgepi.adc.edgepi_adc import EdgePiADC +from edgepi.adc.adc_constants import AnalogIn, ADC1DataRate, DiffMode + +ITER = 50 + +def run_test(): + """ + This test performs 300 analog input reads, with 50 for each of 4 analog pins, and + 50 for each of 2 differential analog pairs. + """ + + edgepi_adc = EdgePiADC(enable_cache=False) + + start = time.time() + result_list = [] + adc_choices = [ + AnalogIn.AIN1, AnalogIn.AIN2, + AnalogIn.AIN5, AnalogIn.AIN6, + ] + differential_pairs = [ + DiffMode.DIFF_2, + DiffMode.DIFF_4, + ] + + for _ in range(ITER): + tmp = edgepi_adc.read_samples_adc1_batch( + ADC1DataRate.SPS_38400, + adc_choices, + differential_pairs, + ) + result_list += [tmp] + + elapsed = time.time() - start + + print(result_list[24]) + print(f"Time elapsed {elapsed/ITER:.6f} s") + print(f"Frequency {ITER/elapsed:.4f} hz") + +if __name__ == "__main__": + run_test() diff --git a/examples/batched_din.py b/examples/batched_din.py new file mode 100644 index 00000000..61b55edc --- /dev/null +++ b/examples/batched_din.py @@ -0,0 +1,34 @@ +"""Example reading from DIN using the batched function""" + +import time + +from edgepi.digital_input.digital_input_constants import DinPins +from edgepi.digital_input.edgepi_digital_input import EdgePiDigitalInput + +ITER = 250 + +def run_test(): + """ + This test performs 2000 Digital input reads, batched 8 reads at a time. + """ + digital_input = EdgePiDigitalInput() + + state_list = [] + choices = [ + DinPins.DIN1, DinPins.DIN2, DinPins.DIN3, DinPins.DIN4, + DinPins.DIN5, DinPins.DIN6, DinPins.DIN7, DinPins.DIN8, + ] + + start = time.time() + for _ in range(ITER): + pin_states = digital_input.digital_input_state_batch(choices) + state_list += [pin_states] + + elapsed = time.time() - start + + print(f"DIN Pins: {state_list[217]}") + print(f"Time elapsed {elapsed/ITER:.6f} s") + print(f"Frequency {ITER/elapsed:.4f} hz") + +if __name__ == "__main__": + run_test() diff --git a/examples/single_adc.py b/examples/single_adc.py new file mode 100644 index 00000000..dfc8e1c1 --- /dev/null +++ b/examples/single_adc.py @@ -0,0 +1,44 @@ +"""Example reading from individual ADC pins""" + +import time + +from edgepi.adc.edgepi_adc import EdgePiADC +from edgepi.adc.adc_constants import AnalogIn, ADC1DataRate, ConvMode + +ITER = 50 + +def run_test(): + """ + This test performs 400 analog input reads, with a total of 50 per ADC pin. + """ + edgepi_adc = EdgePiADC(enable_cache=False) + + start = time.time() + result_list = [] + adc_choices = [ + AnalogIn.AIN1, AnalogIn.AIN2, AnalogIn.AIN3, AnalogIn.AIN4, + AnalogIn.AIN5, AnalogIn.AIN6, AnalogIn.AIN7, AnalogIn.AIN8, + ] + + for _ in range(ITER): + tmp_list = [] + for ain in adc_choices: + edgepi_adc.set_config( + adc_1_analog_in=ain, + conversion_mode=ConvMode.PULSE, + adc_1_data_rate=ADC1DataRate.SPS_38400 + ) + + voltage = edgepi_adc.single_sample() + tmp_list += [voltage] + + result_list += [tmp_list] + + elapsed = time.time() - start + + print(result_list[24]) + print(f"Time elapsed {elapsed/ITER:.6f} s") + print(f"Frequency {ITER/elapsed:.4f} hz") + +if __name__ == "__main__": + run_test() diff --git a/examples/single_din.py b/examples/single_din.py new file mode 100644 index 00000000..84b5c549 --- /dev/null +++ b/examples/single_din.py @@ -0,0 +1,35 @@ +"""Example reading from individual DIN pins""" + +import time + +from edgepi.digital_input.digital_input_constants import DinPins +from edgepi.digital_input.edgepi_digital_input import EdgePiDigitalInput + +ITER = 250 + +def run_test(): + """ + This test performs 2000 Digital input reads, with a total of 250 per DIN pin. + """ + digital_input = EdgePiDigitalInput() + + state_list = [] + choices = [ + DinPins.DIN1, DinPins.DIN2, DinPins.DIN3, DinPins.DIN4, + DinPins.DIN5, DinPins.DIN6, DinPins.DIN7, DinPins.DIN8, + ] + + start = time.time() + for _ in range(ITER): + for din in choices: + pin_state = digital_input.digital_input_state(din) + state_list += [pin_state] + + elapsed = time.time() - start + + print(f"DIN Pins: {state_list[-9:-1]}") + print(f"Time elapsed {elapsed/ITER:.6f} s") + print(f"Frequency {ITER/elapsed:.4f} hz") + +if __name__ == "__main__": + run_test() diff --git a/examples/single_tc.py b/examples/single_tc.py new file mode 100644 index 00000000..5f4a9bed --- /dev/null +++ b/examples/single_tc.py @@ -0,0 +1,41 @@ +"""Example reading from the thermocouple (tc)""" + +import time + +from edgepi.tc.edgepi_tc import EdgePiTC +from edgepi.tc.tc_constants import ConvMode + +ITER = 25 + +def run_test(): + """ + This test + """ + edgepi_tc = EdgePiTC() + edgepi_tc.set_config(conversion_mode=ConvMode.AUTO) + + cj_temperatures = [] + lin_temperatures = [] + + start = time.time() + for _ in range(ITER): + iter_start = time.time() + + # make a single temperature measurement + cold_junction, linearized = edgepi_tc.read_temperatures() + cj_temperatures += [cold_junction] + lin_temperatures += [linearized] + + # It doesn't make sense to read thermocoupler values faster than 10hz as they + # won't be updated. You can try it here if you'd like! + sleep_time = 0.1 - (time.time() - iter_start) + time.sleep(0.0 if sleep_time < 0.0 else sleep_time) + + elapsed = time.time() - start + + print(f"TC Reads: {lin_temperatures}") + print(f"Time elapsed {elapsed/ITER:.6f} s") + print(f"Frequency {ITER/elapsed:.4f} hz") + +if __name__ == "__main__": + run_test() diff --git a/requirements_build.txt b/requirements_build.txt index 09b37f03..994e6fde 100644 --- a/requirements_build.txt +++ b/requirements_build.txt @@ -1,27 +1 @@ -bleach==5.0.0 build==0.7.0 -certifi==2021.10.8 -charset-normalizer==2.0.12 -colorama==0.4.4 -commonmark==0.9.1 -docutils==0.18.1 -idna==3.3 -importlib-metadata==4.11.3 -keyring==23.5.0 -packaging==21.3 -pep517==0.12.0 -pkginfo==1.8.2 -Pygments==2.12.0 -pyparsing==3.0.9 -pywin32-ctypes==0.2.0 -readme-renderer==35.0 -requests==2.27.1 -requests-toolbelt==0.9.1 -rfc3986==2.0.0 -rich==12.4.1 -six==1.16.0 -tomli==2.0.1 -twine==4.0.0 -urllib3==1.26.9 -webencodings==0.5.1 -zipp==3.8.0 diff --git a/setup.py b/setup.py index 5378d70f..46719b7e 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setuptools.setup( name="edgepi-python-sdk", - version="1.2.17", + version="1.2.21", author="S.Park", author_email="spark@osensa.com", description="EdgePi Python SDK package", diff --git a/src/edgepi/adc/adc_commands.py b/src/edgepi/adc/adc_commands.py index 44016bc2..8062bae7 100644 --- a/src/edgepi/adc/adc_commands.py +++ b/src/edgepi/adc/adc_commands.py @@ -1,10 +1,18 @@ """ Utility module for ADC commands """ - import logging +from typing import Optional -from edgepi.adc.adc_constants import ADCComs, ADCNum +from edgepi.adc.adc_constants import ( + ADCComs, + ADCChannel as CH, + ADCNum, + ADCReg, + ADCReadInfo, + ADC_VOLTAGE_READ_LEN, +) +from edgepi.adc.adc_multiplexers import generate_mux_opcode _logger = logging.getLogger(__name__) @@ -30,23 +38,79 @@ def write_register_command(self, address, values): command = [ADCComs.COM_WREG.value + address, len(values) - 1] return command + values - def start_adc(self, adc_num: ADCNum): - """Command to start ADC""" + @staticmethod + def unsafe_write_register_command(address: int, values: list[int]): + """ + Trigger ADC register write - unsafe removes all arguments validation. + + Please ensure that values contains only bytes, and that address is a + valid address. + """ + command = [ADCComs.COM_WREG.value + address, len(values) - 1] + return command + values + + @staticmethod + def start_adc_command(adc_num: ADCReadInfo): + """Command to start ADC conversions""" _logger.debug("Command to send is %s", ([adc_num.start_cmd])) return [adc_num.start_cmd] + @staticmethod + def read_adc_command(adc_num: ADCReadInfo, num_bytes: int): + """ + Returns the command to read from the ADC, after waiting the required time for + conversions to take effect + """ + # Since this is full duplex SPI, we have to write something every time we expect + # to read something as well, so we add the 6 0xff bytes so that periphery will read + # the 6 results bytes we care about. + return [adc_num.read_cmd] + [255] * num_bytes - def stop_adc(self, adc_num: ADCNum): + @staticmethod + def stop_adc_command(adc_num: ADCReadInfo): """Command to stop ADC""" _logger.debug("Command to send is %s", ([adc_num.stop_cmd])) return [adc_num.stop_cmd] - - def reset_adc(self): + @staticmethod + def reset_adc_command(): """Command to reset ADC""" _logger.debug("Command to send is %s", ([ADCComs.COM_RESET.value])) return [ADCComs.COM_RESET.value] + @staticmethod + def read_command_tuple( + mode2_register_value: Optional[int], + conversion_delay: float, + mux_p: CH, + mux_n: CH, + ) -> tuple[list, float, list]: + """ + Returns a tuple containing two spi commands, separated by a delay. For use + with spi_apply_adc_commands + """ + inpmux_register_value = generate_mux_opcode(ADCReg.REG_INPMUX, mux_p, mux_n).op_code + if mode2_register_value is not None: + # update only data rate & input multiplexing (luckily they are right beside eachother) + start_addr = ADCReg.REG_MODE2.value + register_list = [mode2_register_value, inpmux_register_value] + else: + # Only write the register value 0x06 (INPMUX), which stores info + # about how the multiplexer should read from the ADC channels (input pins). + start_addr = ADCReg.REG_INPMUX.value + register_list = [inpmux_register_value] + + write_reg_cmd = ADCCommands.unsafe_write_register_command( + start_addr, register_list + ) + + # write config registers, start the adc's conversions, wait, then read registers + return ( + write_reg_cmd + ADCCommands.start_adc_command(ADCNum.ADC_1.value), + conversion_delay, + ADCCommands.read_adc_command(ADCNum.ADC_1.value, ADC_VOLTAGE_READ_LEN), + ) + @staticmethod def check_for_int(target_list): """Checks if a list contains only integer values""" diff --git a/src/edgepi/adc/adc_constants.py b/src/edgepi/adc/adc_constants.py index 3e3cf502..73e8758b 100644 --- a/src/edgepi/adc/adc_constants.py +++ b/src/edgepi/adc/adc_constants.py @@ -8,7 +8,7 @@ ADC_NUM_REGS = 27 # number of ADC1263 registers -ADC_VOLTAGE_READ_LEN = 6 # number of bytes per voltage read +ADC_VOLTAGE_READ_LEN = 6 # number of bytes per voltage read ADC1_NUM_DATA_BYTES = 4 # Number of data bytes for ADC 1 ADC2_NUM_DATA_BYTES = 3 # Number of data bytes for ADC 2 @@ -164,6 +164,8 @@ class ConvMode(Enum): class ADC1DataRate(Enum): """ADS1263 data rates for ADC1""" + # NOTE: the op_code values (0x0, 0x1, ...) refer to the value that's read from + # the ADC after applying the mask. SPS_2P5 = OpCode(0x0, ADCReg.REG_MODE2.value, BitMask.LOW_NIBBLE.value) SPS_5 = OpCode(0x1, ADCReg.REG_MODE2.value, BitMask.LOW_NIBBLE.value) SPS_10 = OpCode(0x2, ADCReg.REG_MODE2.value, BitMask.LOW_NIBBLE.value) diff --git a/src/edgepi/adc/adc_multiplexers.py b/src/edgepi/adc/adc_multiplexers.py index 9f66f2d8..6c5c3925 100644 --- a/src/edgepi/adc/adc_multiplexers.py +++ b/src/edgepi/adc/adc_multiplexers.py @@ -2,15 +2,11 @@ import logging -from bitstring import pack from edgepi.reg_helper.reg_helper import OpCode, BitMask -from edgepi.adc.adc_constants import ADCChannel as CH, AllowedChannels +from edgepi.adc.adc_constants import ADCChannel as CH, AllowedChannels, ADCReg _logger = logging.getLogger(__name__) -MUXS_PER_ADC = 2 -NUM_CHANNELS = 11 - class ChannelNotAvailableError(ValueError): """ @@ -18,57 +14,18 @@ class ChannelNotAvailableError(ValueError): due to RTD_EN status. """ - -def _format_mux_values(mux_p: CH, mux_n: CH): - # all use cases will always update both mux_n and mux_p - mask = BitMask.BYTE - mux_p_val = mux_p.value - mux_n_val = mux_n.value - - return mux_p_val, mux_n_val, mask - - -def generate_mux_opcodes(mux_updates: dict): +def generate_mux_opcode(addx:ADCReg, mux_p:CH, mux_n:CH) -> OpCode: """ - Generates list of OpCodes for updating input multiplexer mapping. - Updates both positive and negative multiplexers. - - Args: - `mux_updates` (dict): values for updating multiplexer mapping. - This should be formatted as {ADCReg: (ADCChannel, ADChannel)}. - - `mux_values` (dict): current multiplexer mapping. - This should be formatted as {ADCReg: (int, int)}. - - Note: both of the above must be dictionaries formatted as: - - mux_reg_addx (ADCReg): (mux_p_val, mux_n_val) + Generates OpCode for updating input multiplexer mapping. - Returns: - `list`: OpCodes for updated multiplexer mapping + We know that mux_p_val can never be larger than 15 (are a byte), + because mux_p and mux_n are ADCChannel. """ - _logger.debug(f"generate_mux_opcodes: mux updates = {mux_updates}") - - mux_opcodes = [] - # generate OpCodes for mux updates - for addx, byte in mux_updates.items(): - mux_p = byte[0] - mux_n = byte[1] - - # not updating mux's for this adc_num (no args passed) - if mux_p is None or mux_n is None: - continue - - mux_p_val, mux_n_val, mask = _format_mux_values(mux_p, mux_n) - - adc_x_ch_bits = pack("uint:4, uint:4", mux_p_val, mux_n_val).uint - - mux_opcodes.append(OpCode(adc_x_ch_bits, addx.value, mask.value)) - - _logger.debug(f"mux opcodes = {mux_opcodes}") - - return mux_opcodes - + return OpCode( + op_code=(mux_p.value << 4) + mux_n.value, + reg_address=addx.value, + op_mask=BitMask.BYTE.value + ) def validate_channels_allowed(channels: list, rtd_enabled: bool): """ @@ -85,8 +42,12 @@ def validate_channels_allowed(channels: list, rtd_enabled: bool): if rtd_enabled else AllowedChannels.RTD_OFF.value ) - for ch in channels: - if ch not in allowed_channels: - raise ChannelNotAvailableError( - f"Channel 'AIN{ch.value}' is currently not available. Disable RTD in order to use." - ) + if any((chan not in allowed_channels) for chan in channels): + unavaliable_channels = ",".join( + f"AIN{chan.value}" for chan in channels + if chan not in allowed_channels + ) + raise ChannelNotAvailableError( + f"The following channels are currently not avaliable: {unavaliable_channels}. " + "Disable RTD in order to use." + ) diff --git a/src/edgepi/adc/adc_state.py b/src/edgepi/adc/adc_state.py index d7453798..f1503b3f 100644 --- a/src/edgepi/adc/adc_state.py +++ b/src/edgepi/adc/adc_state.py @@ -24,7 +24,7 @@ class ADCReadFields: class ADCState: """ADC state intended for reading by users""" -# pylint: disable=too-many-instance-attributes + # pylint: disable=too-many-instance-attributes def __init__(self, reg_map: dict): self.__reg_map = reg_map self.adc_1: ADCReadFields = ADCReadFields( @@ -64,11 +64,9 @@ def __query_state(self, adc_property: ADCProperties) -> PropertyValue: # name of current value of this adc_property adc_property_value = adc_property.value.values[adc_property_bits] _logger.debug( - ( - f"query_state: query_property='{adc_property}'," - " adc_property_bits={hex(adc_property_bits)}," - f" adc_property_value='{adc_property_value}'" - ) + f"query_state: query_property='{adc_property}'," + f" adc_property_bits={hex(adc_property_bits)}," + f" adc_property_value='{adc_property_value}'" ) return adc_property_value diff --git a/src/edgepi/adc/adc_status.py b/src/edgepi/adc/adc_status.py index 392635cd..10d28f69 100644 --- a/src/edgepi/adc/adc_status.py +++ b/src/edgepi/adc/adc_status.py @@ -5,7 +5,6 @@ import bitstring - @unique class ADCStatusBit(Enum): """ @@ -87,7 +86,6 @@ def __repr__(self) -> str: ADCStatusBit.RESET: (ADCStatusMsg.RESET_FALSE, ADCStatusMsg.RESET_TRUE), } - def get_adc_status(status_code: int) -> dict: """Generates a dictionary of ADC Status objects diff --git a/src/edgepi/adc/adc_voltage.py b/src/edgepi/adc/adc_voltage.py index 82225ab3..3f55b999 100644 --- a/src/edgepi/adc/adc_voltage.py +++ b/src/edgepi/adc/adc_voltage.py @@ -3,10 +3,9 @@ import logging -from bitstring import BitArray from edgepi.adc.adc_constants import ADCReadInfo, ADCNum, ADC1_NUM_DATA_BYTES, ADC2_NUM_DATA_BYTES from edgepi.calibration.calibration_constants import CalibParam -from edgepi.utilities.utilities import bitstring_from_list +from edgepi.utilities.utilities import bitstring_from_list, combine_to_uint32 # TODO: retrieve these values from EEPROM once added @@ -21,11 +20,12 @@ _logger = logging.getLogger(__name__) -def _is_negative_voltage(code: BitArray): +def _is_negative_voltage(code: list[int]): """ Determines if voltage code is negative value """ - return code[0] == 1 + # check if first bit of the first integer is a 1 + return (code[0] & 0x80) != 0 def _code_to_input_voltage(code: int, v_ref: float, num_bits: int): @@ -36,13 +36,12 @@ def _code_to_input_voltage(code: int, v_ref: float, num_bits: int): Args: `code` (int): uint value of ADC voltage read bytes - `v_ref` (float): ADC reference voltage in Volts - `num_bits` (int): number of bits in ADC voltage read (24 or 32) """ + voltage_range = v_ref / 2 ** (num_bits - 1) - _logger.debug(f" _code_to_input_voltage: code {code}") + _logger.debug(f"_code_to_input_voltage: code {code}") return float(code) * voltage_range @@ -55,65 +54,55 @@ def _adc_voltage_to_input_voltage(v_in: float, gain: float, offset: float): return v_in * step_up_ratio * gain + offset -def code_to_voltage(code: list[int], adc_info: ADCReadInfo, calibs: CalibParam) -> float: +def code_to_voltage( + code: list[int], + adc_info: ADCReadInfo, + calibs: CalibParam, + single_ended: bool, +) -> float: """ Converts ADC voltage read digital code to output voltage (voltage measured at terminal block) Args: `code` (list[int]): code bytes retrieved from ADC voltage read - `adc_info` (ADCReadInfo): data about this adc's voltage reading configuration - `calibs` (CalibParam): voltage reading gain and offset calibration values + `single_ended` (bool): whether the mode should be single ended or not single ended + (differential) Returns: `float`: voltage value (V) corresponding to `code` """ - code_bits = bitstring_from_list(code[:adc_info.num_data_bytes]) - num_bits = adc_info.num_data_bytes * 8 - code_val = code_bits.uint - - if _is_negative_voltage(code_bits): - code_val = code_val - 2**num_bits - - v_in = _code_to_input_voltage(code_val, REFERENCE_VOLTAGE, num_bits) - - v_out = _adc_voltage_to_input_voltage(v_in, calibs.gain, calibs.offset) - - return v_out - -def code_to_voltage_single_ended(code: list[int], adc_info: ADCReadInfo, calibs: CalibParam): - """ - Converts ADC voltage read digital code to output voltage (voltage measured at terminal block) - - Args: - `code` (list[int]): code bytes retrieved from ADC voltage read - - `adc_info` (ADCReadInfo): data about this adc's voltage reading configuration - - `calibs` (CalibParam): voltage reading gain and offset calibration values - Returns: - `float`: voltage value (V) corresponding to `code` - """ - code_bits = bitstring_from_list(code[:adc_info.num_data_bytes]) num_bits = adc_info.num_data_bytes * 8 - code_val = code_bits.uint - - if _is_negative_voltage(code_bits) and adc_info.num_data_bytes == ADC1_NUM_DATA_BYTES: - code_val = code_val - ADC1_UPPER_LIMIT - elif _is_negative_voltage(code_bits) and adc_info.num_data_bytes == ADC2_NUM_DATA_BYTES: - code_val = code_val - ADC2_UPPER_LIMIT - elif adc_info.num_data_bytes == ADC1_NUM_DATA_BYTES: - code_val = code_val + ADC1_UPPER_LIMIT + if adc_info.num_data_bytes == ADC1_NUM_DATA_BYTES: + code_val = combine_to_uint32(code[0], code[1], code[2], code[3]) elif adc_info.num_data_bytes == ADC2_NUM_DATA_BYTES: - code_val = code_val + ADC2_UPPER_LIMIT + code_val = combine_to_uint32(0, code[0], code[1], code[2]) + else: + raise ValueError( + f"code has unexpected number of bytes {adc_info.num_data_bytes}, " + "expected 4 for ADC1 or 3 for ADC2" + ) + + if single_ended: + if _is_negative_voltage(code) and adc_info.num_data_bytes == ADC1_NUM_DATA_BYTES: + code_val -= ADC1_UPPER_LIMIT + elif _is_negative_voltage(code) and adc_info.num_data_bytes == ADC2_NUM_DATA_BYTES: + code_val -= ADC2_UPPER_LIMIT + elif adc_info.num_data_bytes == ADC1_NUM_DATA_BYTES: + code_val += ADC1_UPPER_LIMIT + elif adc_info.num_data_bytes == ADC2_NUM_DATA_BYTES: + code_val += ADC2_UPPER_LIMIT + else: + if _is_negative_voltage(code): + code_val -= 2**num_bits v_in = _code_to_input_voltage(code_val, REFERENCE_VOLTAGE, num_bits) v_out = _adc_voltage_to_input_voltage(v_in, calibs.gain, calibs.offset) - return v_out + def code_to_temperature( code: list[int], ref_resistance: float, @@ -122,18 +111,16 @@ def code_to_temperature( rtd_calib_gain: float, rtd_calib_offset: float, adc_num: ADCNum - ) -> float: +) -> float: """ Converts ADC voltage read digital code to temperature. Intended for use in RTD sampling. Args: `code` (list[int]): code bytes retrieved from ADC voltage read - - `ref_resistance` (float): EdgePi-specific RTD reference resistance (Ohms) - - `rtd_sensor_resistance` (float): RTD material-dependent resistance value (Ohms) - - `rtd_sensor_resistance_variation` (float): RTD model-dependent resistance variation (Ohms/°C) + `ref_resistance` (float): EdgePi-specific RTD reference resistance (Ohms) + `rtd_sensor_resistance` (float): RTD material-dependent resistance value (Ohms) + `rtd_sensor_resistance_variation` (float): RTD model-dependent resistance variation + (Ohms/°C) Returns: `float`: temperature value (°C) corresponding to `code` diff --git a/src/edgepi/adc/edgepi_adc.py b/src/edgepi/adc/edgepi_adc.py index e0eea0cf..cdd5e537 100644 --- a/src/edgepi/adc/edgepi_adc.py +++ b/src/edgepi/adc/edgepi_adc.py @@ -1,11 +1,13 @@ +# pylint: disable=too-many-lines """ User interface for EdgePi ADC """ from enum import Enum from operator import attrgetter import logging import time +from typing import Optional -from edgepi.adc.adc_query_lang import PropertyValue +from edgepi.adc.adc_query_lang import ADCProperties from edgepi.calibration.calibration_constants import CalibParam from edgepi.peripherals.spi import SpiDevice as SPI from edgepi.adc.adc_commands import ADCCommands @@ -38,7 +40,6 @@ from edgepi.adc.adc_voltage import ( code_to_voltage, code_to_temperature, - code_to_voltage_single_ended ) from edgepi.utilities.crc_8_atm import check_crc from edgepi.gpio.edgepi_gpio import EdgePiGPIO @@ -46,12 +47,13 @@ from edgepi.utilities.utilities import filter_dict, filter_dict_list_key_val from edgepi.reg_helper.reg_helper import OpCode, apply_opcodes from edgepi.adc.adc_multiplexers import ( - generate_mux_opcodes, + generate_mux_opcode, validate_channels_allowed, ) from edgepi.adc.adc_conv_time import expected_initial_time_delay, expected_continuous_time_delay from edgepi.adc.adc_status import get_adc_status from edgepi.eeprom.edgepi_eeprom import EdgePiEEPROM +from edgepi.eeprom.protobuf_assets.eeprom_data_classes.eeprom_adc_module import AdcCalibParamKeys from edgepi.adc.adc_state import ADCState from edgepi.adc.adc_exceptions import ( ADCRegisterUpdateError, @@ -62,6 +64,7 @@ CalibKeyMissingError ) + _logger = logging.getLogger(__name__) @@ -102,7 +105,7 @@ def __init__( enable_cache: bool = False, rtd_sensor_resistance: float = None, rtd_sensor_resistance_variation: float = None - ): + ): """ Args: `enable_cache` (bool): set to True to enable state-caching @@ -120,12 +123,15 @@ def __init__( # Load eeprom data and generate dictionary of calibration dataclass eeprom = EdgePiEEPROM() eeprom_data = eeprom.read_edgepi_data() - self.adc_calib_params = {ADCNum.ADC_1:eeprom_data.adc1_calib_params.extract_ch_dict(), - ADCNum.ADC_2:eeprom_data.adc2_calib_params.extract_ch_dict(),} + self.adc_calib_params = { + ADCNum.ADC_1: eeprom_data.adc1_calib_params.extract_ch_dict(), + ADCNum.ADC_2: eeprom_data.adc2_calib_params.extract_ch_dict(), + } self.rtd_calib = eeprom_data.rtd_calib_params self.adc_ops = ADCCommands() self.gpio = EdgePiGPIO() + # ADC always needs to be in CRC check mode. This also updates the internal __state. # If this call to __config is removed, replace with a call to get_register_map to # initialize __state. @@ -234,8 +240,7 @@ def __write_register(self, start_addx: ADCReg, data: list[int]): code = self.adc_ops.write_register_command(start_addx.value, data) _logger.debug(f"__write_register: sending {code}") with self.spi_open(): - out = self.transfer(code) - return out + return self.transfer(code) def __set_rtd_pin(self, enable: bool = False): """ @@ -249,7 +254,7 @@ def __set_rtd_pin(self, enable: bool = False): self.gpio.set_pin_state(RTDPins.RTD_EN.value) if enable else \ self.gpio.clear_pin_state(RTDPins.RTD_EN.value) -# TODO: To be deleted + # TODO: To be deleted def set_adc_reference(self, reference_config: ADCReferenceSwitching = None): """ Setting ADC referene terminal state. pin 18 and 23 labeled IN GND on the enclosure. It can @@ -275,13 +280,13 @@ def stop_conversions(self, adc_num: ADCNum): """ Halt voltage read conversions when ADC is set to perform continuous conversions """ - stop_cmd = self.adc_ops.stop_adc(adc_num=adc_num.value) + stop_cmd = ADCCommands.stop_adc_command(adc_num=adc_num.value) with self.spi_open(): self.transfer(stop_cmd) def __send_start_command(self, adc_num: ADCNum): """Triggers ADC conversion(s)""" - start_cmd = self.adc_ops.start_adc(adc_num=adc_num.value) + start_cmd = ADCCommands.start_adc_command(adc_num=adc_num.value) with self.spi_open(): self.transfer(start_cmd) @@ -304,12 +309,10 @@ def start_conversions(self, adc_num: ADCNum): adc_num, data_rate.value.op_code, filter_mode.value.op_code ) _logger.debug( - ( - f"\nComputed time delay = {conv_delay} (ms) with the following config opcodes:\n" - f"adc_num={adc_num}, conv_mode={hex(conv_mode.value.op_code)}, " - f"data_rate={hex(data_rate.value.op_code)}, " - f"filter_mode={hex(filter_mode.value.op_code)}\n" - ) + f"\nComputed time delay = {conv_delay} (ms) with the following config opcodes:\n" + f"adc_num={adc_num}, conv_mode={hex(conv_mode.value.op_code)}, " + f"data_rate={hex(data_rate.value.op_code)}, " + f"filter_mode={hex(filter_mode.value.op_code)}\n" ) self.__send_start_command(adc_num) # apply delay for first conversion @@ -324,10 +327,10 @@ def clear_reset_bit(self): """ self.__config(reset_clear=ADCPower.RESET_CLEAR) - def __read_data(self, adc: ADCNum, data_size: int): + def __read_data(self, adc_num: ADCNum, data_size: int): """Sends command to ADC to get new voltage conversion data""" with self.spi_open(): - return self.transfer([adc.value.read_cmd] + [255] * data_size) + return self.transfer(ADCCommands.read_adc_command(adc_num.value, data_size)) def __voltage_read(self, adc_num: ADCNum): """ @@ -350,26 +353,26 @@ def __voltage_read(self, adc_num: ADCNum): return status_code, voltage_code, check_code @staticmethod - def __get_diff_id(mux_p: PropertyValue, mux_n: PropertyValue) -> int: + def __get_diff_id(mux_p: CH, mux_n: CH) -> int: """ Get differential pair id number for retrieving differential pair calibration values """ - # values are the keys from adc_calib_params - diff_ids = { - DiffMode.DIFF_1.value: 8, - DiffMode.DIFF_2.value: 9, - DiffMode.DIFF_3.value: 10, - DiffMode.DIFF_4.value: 11, - } - diff_pair = DifferentialPair(mux_p.code, mux_n.code) - diff_id = diff_ids.get(diff_pair) - if diff_id is None: + # return values are the keys from adc_calib_params + diff_pair = DifferentialPair(mux_p, mux_n) + if diff_pair == DiffMode.DIFF_1.value: + return AdcCalibParamKeys.ADC_DIFF_1 + elif diff_pair == DiffMode.DIFF_2.value: + return AdcCalibParamKeys.ADC_DIFF_2 + elif diff_pair == DiffMode.DIFF_3.value: + return AdcCalibParamKeys.ADC_DIFF_3 + elif diff_pair == DiffMode.DIFF_4.value: + return AdcCalibParamKeys.ADC_DIFF_4 + else: raise InvalidDifferentialPairError( - f"Cannot retrieve calibration values for invalid differential pair {diff_pair}" - ) - return diff_id + f"Cannot retrieve calibration values for invalid differential pair {diff_pair}" + ) - def __get_calibration_values(self, adc_calibs: dict, adc_num: ADCNum) -> CalibParam: + def __get_calibration_params(self, adc_num: ADCNum) -> CalibParam: """ Retrieve voltage reading calibration values based on currently configured input multiplexer channels @@ -385,21 +388,30 @@ def __get_calibration_values(self, adc_calibs: dict, adc_num: ADCNum) -> CalibPa mux_p = attrgetter(f"adc_{adc_num.value.id_num}.mux_p")(state) mux_n = attrgetter(f"adc_{adc_num.value.id_num}.mux_n")(state) + return self.__get_calibration_params_mux(adc_num, mux_p.code, mux_n.code) + + def __get_calibration_params_mux(self, adc_num: ADCNum, mux_p: CH, mux_n: CH) -> CalibParam: + """ + Get calibration parameters using the multiplexing channels and without reading from state + """ # assert neither mux is set to float mode - if CH.FLOAT in (mux_p.code, mux_n.code): + if CH.FLOAT in (mux_p, mux_n): raise ValueError("Cannot retrieve calibration values for channel in float mode") - calib_key = mux_p.value if mux_n.code == CH.AINCOM else self.__get_diff_id(mux_p, mux_n) - calibs = adc_calibs[calib_key] + adc_mux = ADCProperties.ADC1_MUXP if adc_num == ADCNum.ADC_1 else ADCProperties.ADC2_MUXP + calib_key = ( + adc_mux.value.values[mux_p.value << 4].value + if mux_n == CH.AINCOM + else EdgePiADC.__get_diff_id(mux_p, mux_n) + ) + calibs = self.adc_calib_params[adc_num][calib_key] if calibs is None: _logger.error("Failed to find ADC calibration values") raise CalibKeyMissingError( - ( - "Failed to retrieve calibration values from eeprom dictionary: " - f"dict is missing key = {calib_key}" - f"\neeprom_calibs = \n{adc_calibs}" - ) + "Failed to retrieve calibration values from eeprom dictionary: " + f"dict is missing key = {calib_key}" + f"\neeprom_calibs = \n{self.adc_calib_params[adc_num]}" ) return calibs @@ -452,13 +464,10 @@ def read_voltage(self, adc_num: ADCNum): status = get_adc_status(status_code) _logger.debug(f" read_voltage: Logging STATUS byte:\n{status}") - calibs = self.__get_calibration_values(self.adc_calib_params[adc_num], adc_num) + calibs = self.__get_calibration_params(adc_num) _logger.debug(f" read_voltage: gain {calibs.gain}, offset {calibs.offset}") - # convert from code to voltage - - return code_to_voltage_single_ended(voltage_code, adc_num.value, calibs) if single_ended\ - else code_to_voltage(voltage_code, adc_num.value, calibs) + return code_to_voltage(voltage_code, adc_num.value, calibs, single_ended) def read_rtd_temperature(self): """ @@ -503,10 +512,8 @@ def single_sample(self): `float`: input voltage (V) read from ADC1 """ state = self.get_state() - single_ended = False # Check whether the ADC is either in single-ended or differential - if state.adc_1.mux_n.code == CH.AINCOM: - single_ended = True + single_ended = state.adc_1.mux_n.code == CH.AINCOM self.__enforce_pulse_mode(state) @@ -520,13 +527,12 @@ def single_sample(self): status = get_adc_status(status_code) _logger.debug(f"single_sample: Logging STATUS byte:\n{status}") - calibs = self.__get_calibration_values(self.adc_calib_params[ADCNum.ADC_1], ADCNum.ADC_1) + calibs = self.__get_calibration_params(ADCNum.ADC_1) # convert from code to voltage _logger.debug(f" read_voltage: code {voltage_code}") _logger.debug(f" read_voltage: gain {calibs.gain}, offset {calibs.offset}") - return code_to_voltage_single_ended(voltage_code, ADCNum.ADC_1.value, calibs) if \ - single_ended else code_to_voltage(voltage_code, ADCNum.ADC_1.value, calibs) + return code_to_voltage(voltage_code, ADCNum.ADC_1.value, calibs, single_ended) def single_sample_rtd(self): """ @@ -565,7 +571,7 @@ def reset(self): application of custom power-on configurations required by EdgePi. """ with self.spi_open(): - self.transfer(self.adc_ops.reset_adc()) + self.transfer(ADCCommands.reset_adc_command()) self.__reapply_config() def __is_data_ready(self, adc_num: ADCNum): @@ -573,12 +579,15 @@ def __is_data_ready(self, adc_num: ADCNum): # required for integration testing in test_conversion_times.py """Utility for testing conversion times, returns True if ADC indicates new voltage data""" with self.spi_open(): - read_data = self.transfer([adc_num.value.read_cmd] + [255] * 6) + read_data = self.transfer( + ADCCommands.read_adc_command(adc_num.value, ADC_VOLTAGE_READ_LEN) + ) if adc_num is ADCNum.ADC_1: - ready = (read_data[1] & 0b01000000) == 0b01000000 - if adc_num is ADCNum.ADC_2: - ready = (read_data[1] & 0b10000000) == 0b10000000 - return ready + return (read_data[1] & 0b01000000) == 0b01000000 + elif adc_num is ADCNum.ADC_2: + return (read_data[1] & 0b10000000) == 0b10000000 + else: + raise ValueError(f"Unexpected parameter adc_num of {adc_num}") def __read_registers_to_map(self): """ @@ -593,8 +602,6 @@ def __read_registers_to_map(self): # build dict with (register addx : register_value) pairs. reg_dict = {addx + i: reg_values[i] for i in range(ADC_NUM_REGS)} - # _logger.debug(f"__read_registers_to_map: {reg_dict}") - return reg_dict def __is_rtd_on(self): @@ -613,7 +620,7 @@ def __is_rtd_on(self): def __get_rtd_state(self): """ - Get RTD state this includes the corrent mode (on/off) and the adc type being used + Get RTD state this includes the current mode (on/off) and the adc type being used (adc1 or adc2). Returns: rtd_mode (dictionary): RTD mode dictionary {"name of config": op_codes} @@ -628,14 +635,14 @@ def __get_rtd_state(self): # Case 3: user sets any differential mode -> mux_n pass in as arg def __get_channel_assign_opcodes( self, - adc_1_mux_p: CH = None, - adc_2_mux_p: CH = None, - adc_1_mux_n: CH = CH.AINCOM, - adc_2_mux_n: CH = CH.AINCOM, + adc_1_mux_p: Optional[CH] = None, + adc_2_mux_p: Optional[CH] = None, + adc_1_mux_n: Optional[CH] = CH.AINCOM, + adc_2_mux_n: Optional[CH] = CH.AINCOM, override_rtd_validation: bool = False ): """ - Generates OpCodes for assigning positive and negative multiplexers + Generates list of OpCodes for updating positive and negative multiplexer mapping of either ADC1 or ADC2 to an ADC input channel. This is needed to allow users to assign multiplexers to both ADCs using the same input channel number enums. @@ -650,32 +657,31 @@ def __get_channel_assign_opcodes( `list`: if not empty, contains OpCode(s) for updating multiplexer channel assignment for ADC1, ADC2, or both. """ - # only update mux_n if mux_p is updated - if adc_1_mux_p is None: - adc_1_mux_n = None - - if adc_2_mux_p is None: - adc_2_mux_n = None - - # no multiplexer config to update - args = filter_dict_list_key_val(locals(), ["self", "override_rtd_validation"], [None]) - if not args: - return [] - # allowed channels depend on RTD_EN status if not override_rtd_validation: - channels = list(args.values()) + if adc_1_mux_p is None: + adc_1_mux_n = None + if adc_2_mux_p is None: + adc_2_mux_n = None + channels = [adc_1_mux_p, adc_1_mux_n, adc_2_mux_p, adc_2_mux_n] + channels = [ch for ch in channels if ch is not None] + rtd_enabled = self.__is_rtd_on() validate_channels_allowed(channels, rtd_enabled) - adc_mux_updates = { - ADCReg.REG_INPMUX: (adc_1_mux_p, adc_1_mux_n), - ADCReg.REG_ADC2MUX: (adc_2_mux_p, adc_2_mux_n), - } + opcode_list = [] - opcodes = generate_mux_opcodes(adc_mux_updates) + if adc_1_mux_p is not None and adc_1_mux_n is not None: + # ADCReg.REG_INPMUX controls multiplexing for ad1, + opcode = generate_mux_opcode(ADCReg.REG_INPMUX, adc_1_mux_p, adc_1_mux_n) + opcode_list.append(opcode) - return opcodes + if adc_2_mux_p is not None and adc_2_mux_n is not None: + # while ADCReg.REG_ADC2MUX controls multiplexing for adc2 + opcode = generate_mux_opcode(ADCReg.REG_ADC2MUX, adc_2_mux_p, adc_2_mux_n) + opcode_list.append(opcode) + + return opcode_list def select_differential(self, adc: ADCNum, diff_mode: DiffMode): """ @@ -704,7 +710,7 @@ def select_differential(self, adc: ADCNum, diff_mode: DiffMode): diff_update = mux_properties[adc] self.__config(**diff_update) - def __validate_no_rtd_conflict(self, updates: dict): + def __validate_no_rtd_conflict(self, updates: dict): """ Checks no RTD related properties are being updated if RTD mode is enabled @@ -741,7 +747,7 @@ def __check_adc_pins(self): adc_2_muxs = [state.adc_2.mux_n.code.value, state.adc_2.mux_p.code.value] return adc_1_muxs, adc_2_muxs - def __get_rtd_on_update_config(self, muxs: list, adc_num: ADCNum): + def __get_rtd_on_update_config(self, muxs: list, adc_num: ADCNum) -> dict: """ generate a dictionary of config parameters to enable RTD using specified ADC Args: @@ -752,14 +758,16 @@ def __get_rtd_on_update_config(self, muxs: list, adc_num: ADCNum): {name_of_config : op_codes} """ if any(mux not in [x.value for x in AllowedChannels.RTD_ON.value] for mux in muxs): - updates = RTDModes.RTD_ON.value |\ - (ADC2RtdConfig.OFF.value if adc_num == ADCNum.ADC_1 else ADC1RtdConfig.OFF.value) |\ - (ADC1RtdConfig.ON.value if adc_num ==ADCNum.ADC_1 else ADC2RtdConfig.ON.value) + return ( + RTDModes.RTD_ON.value + | (ADC2RtdConfig.OFF.value if adc_num == ADCNum.ADC_1 else ADC1RtdConfig.OFF.value) + | (ADC1RtdConfig.ON.value if adc_num == ADCNum.ADC_1 else ADC2RtdConfig.ON.value) + ) else: - updates = RTDModes.RTD_ON.value |\ - (ADC1RtdConfig.ON.value if adc_num ==ADCNum.ADC_1 else ADC2RtdConfig.ON.value) - return updates - + return ( + RTDModes.RTD_ON.value + | (ADC1RtdConfig.ON.value if adc_num == ADCNum.ADC_1 else ADC2RtdConfig.ON.value) + ) def __get_rtd_off_update_config(self, adc_num: ADCNum): """ @@ -769,8 +777,11 @@ def __get_rtd_off_update_config(self, adc_num: ADCNum): Return: updates (dictionary): configuration dictionary for disabling RTD """ - updates = RTDModes.RTD_OFF.value |\ - (ADC1RtdConfig.OFF.value if adc_num == ADCNum.ADC_1 else ADC2RtdConfig.OFF.value) + updates = RTDModes.RTD_OFF.value | ( + ADC1RtdConfig.OFF.value + if adc_num == ADCNum.ADC_1 + else ADC2RtdConfig.OFF.value + ) return updates def set_rtd(self, set_rtd: bool, adc_num: ADCNum = ADCNum.ADC_2): @@ -789,11 +800,11 @@ def set_rtd(self, set_rtd: bool, adc_num: ADCNum = ADCNum.ADC_2): # default setting. And get RTD_ON configuration values updates = self.__get_rtd_on_update_config( mux_2 if adc_num == ADCNum.ADC_1 else mux_1, adc_num) - # enable RTD pin to re-route internal circuit - self.__set_rtd_pin(set_rtd) else: updates = self.__get_rtd_off_update_config(adc_num) - self.__set_rtd_pin(set_rtd) + + # enable RTD pin to re-route internal circuit + self.__set_rtd_pin(set_rtd) self.__config(**updates, override_rtd_validation=True) @@ -980,6 +991,102 @@ def set_config( [None]) self.__config(**args) + # pylint: disable=too-many-branches + # pylint: disable=too-many-statements + def read_samples_adc1_batch( + self, + data_rate: ADC1DataRate, + analog_in_list: Optional[list[AnalogIn]] = None, + differential_pairs: Optional[list[DiffMode]] = None, + ) -> list: + """ + This function gets a data rate (to set ADC1 to), and a list of analog in and + differential pairs to read from. The differential pairs & analog_in elements may overlap, + but usecases are be uncommon. + + Will reset any prior config made to the ADC. Does not work with RTD mode, and may override + configs if RTD mode is active. + + This function only supports ADC 1, and changes the conversion mode to PULSE automatically. + """ + analog_in_list = [] if analog_in_list is None else analog_in_list + differential_pairs = [] if differential_pairs is None else differential_pairs + + if analog_in_list == [] and differential_pairs == []: + raise ValueError("Both analog_in_list and differential_pairs cannot be empty") + + channel_list = [ + self.__analog_in_to_adc_in_map.get(analog_in) + for analog_in in analog_in_list + ] + if any(ch is None for ch in channel_list): + raise ValueError(f"Invalid analog_in_list={analog_in_list}") + + # get current register values by doing an SPI read + register_values = self.__get_register_map() + if len(register_values.values()) < 1: + raise ValueError("Number of reg_values must be at least 1") + + # determine the conversion delay using the table from the datasheet + # (best case is sleep of 0.207ms) + filter_mode_op_code = ( + (~ADCProperties.FILTER_MODE.value.mask) + & register_values[ADCProperties.FILTER_MODE.value.addx] + ) + conversion_delay = expected_initial_time_delay( + ADCNum.ADC_1, data_rate.value.op_code, filter_mode_op_code + ) / 1000 + + # this is the register value of MODE2 that contains the new data_rate + mode2_register_value = ( + register_values[ADCReg.REG_MODE2.value] + & data_rate.value.op_mask + ) | data_rate.value.op_code + + mux_pairs = ( + [(channel, CH.AINCOM) for channel in channel_list] + + [(diff_mode.value.mux_p, diff_mode.value.mux_n) for diff_mode in differential_pairs] + ) + + data_list = self.spi_apply_adc_commands([ + # get instructions we need to send to perform a read of each pin + ADCCommands.read_command_tuple( + # the first command tuple should write the mode2 register to contain the data rate + mode2_register_value if i == 0 else None, + conversion_delay, + mux_p, mux_n + ) for i, (mux_p, mux_n) in enumerate(mux_pairs) + ]) + + # update with final ADC state we wrote (for state caching) + EdgePiADC.__state[ADCReg.REG_MODE2.value] = mode2_register_value + mux_p, mux_n = mux_pairs[-1] + EdgePiADC.__state[ADCReg.REG_INPMUX.value] = generate_mux_opcode( + ADCReg.REG_INPMUX, mux_p, mux_n + ).op_code + + voltage_list = [] + for i, read_data in enumerate(data_list): + if (len(read_data) - 1) != ADC_VOLTAGE_READ_LEN: + raise VoltageReadError( + f"Voltage read failed: incorrect number of bytes ({len(read_data)}) retrieved" + ) + + voltage_code = read_data[2 : (2 + ADCNum.ADC_1.value.num_data_bytes)] + check_code = read_data[6] + check_crc(voltage_code, check_code) + + mux_p, mux_n = mux_pairs[i] + + # convert from voltage_code to voltage value (float) + calibs = self.__get_calibration_params_mux(ADCNum.ADC_1, mux_p, mux_n) + single_ended = i < len(channel_list) + voltage_list += [code_to_voltage( + voltage_code, ADCNum.ADC_1.value, calibs, single_ended + )] + + return voltage_list + def get_state(self, override_cache: bool = False) -> ADCState: """ Read the current hardware state of configurable ADC properties diff --git a/src/edgepi/digital_input/edgepi_digital_input.py b/src/edgepi/digital_input/edgepi_digital_input.py index 9dd666fc..215f2107 100644 --- a/src/edgepi/digital_input/edgepi_digital_input.py +++ b/src/edgepi/digital_input/edgepi_digital_input.py @@ -1,5 +1,7 @@ """Digital Input Module""" +from typing import Optional + from edgepi.digital_input.digital_input_constants import DinPins from edgepi.gpio.edgepi_gpio import EdgePiGPIO @@ -12,7 +14,7 @@ def __init__(self): # To limit access to input functionality, using composition rather than inheritance self.gpio = EdgePiGPIO() - def digital_input_state(self, pin_name: DinPins = None): + def digital_input_state(self, pin_name: Optional[DinPins] = None): """ Read selected GPIO pin Args: @@ -20,6 +22,22 @@ def digital_input_state(self, pin_name: DinPins = None): Return: state (bool): corresponding pin state """ - if pin_name is None or pin_name.value not in [pins.value for pins in DinPins]: - raise InvalidPinName(f'Invalid pin name passed: {pin_name}') - return self.gpio.read_pin_state(pin_name.value) + if not isinstance(pin_name, DinPins): + raise InvalidPinName(f'Invalid pin={pin_name}') + + return self.gpio.read_din_state(pin_name) + + def digital_input_state_batch(self, pin_list: list[DinPins]) -> list: + """ + Read multiple GPIO pins as digital inputs + """ + if not pin_list: + raise ValueError(f'Unexpected pin_list={pin_list}') + + if any(not isinstance(pin_name, DinPins) for pin_name in pin_list): + raise InvalidPinName( + 'Got invalid pin names ' + f'{pin_name for pin_name in pin_list if not isinstance(pin_name, DinPins)}' + ) + + return self.gpio.batch_read_din_state(pin_list) diff --git a/src/edgepi/eeprom/edgepi_eeprom.py b/src/edgepi/eeprom/edgepi_eeprom.py index 472f6ca9..78291110 100644 --- a/src/edgepi/eeprom/edgepi_eeprom.py +++ b/src/edgepi/eeprom/edgepi_eeprom.py @@ -129,7 +129,6 @@ def __write_edgepi_reserved_memory(self, pb_serial_list: bytes): for page in pages: self.__page_write_register(mem_offset, page) mem_offset = mem_offset+len(page) - time.sleep(PAGE_WRITE_CYCLE_TIME) def __read_edgepi_reserved_memory(self): ''' @@ -159,7 +158,7 @@ def __read_edgepi_reserved_memory(self): time.sleep(0.01) return bytes(buff[2:buff_and_len]) - def read_edgepi_data(self): + def read_edgepi_data(self) -> EepromDataClass: """ Read Edgepi reserved memory space and populate dataclass Args: @@ -207,6 +206,7 @@ def __sequential_read(self, mem_addr: int = None, length: int = None): def __page_write_register(self, mem_addr: int = None, data: list = None): ''' Write operation writes a page of data to the specified address + Args: mem_addr: starting memory address to read from data: data to write to the location @@ -220,6 +220,9 @@ def __page_write_register(self, mem_addr: int = None, data: list = None): f"{len(msg[0].data)}") self.transfer(EEPROMInfo.DEV_ADDR.value, msg) + # must sleep of 10ms (PAGE_WRITE_CYCLE_TIME) after every page write + time.sleep(PAGE_WRITE_CYCLE_TIME) + def __parameter_sanity_check(self, mem_addr: int = None, length: int = None, user_space: bool = True): @@ -322,7 +325,6 @@ def write_user_space(self, data: bytes): for page in pages: self.__page_write_register(mem_offset, page) mem_offset = mem_offset+len(page) - time.sleep(PAGE_WRITE_CYCLE_TIME) # TODO why not separate it into a class def init_memory(self): @@ -390,7 +392,6 @@ def reset_user_space(self): for _ in range(tatal_page): self.__page_write_register(mem_offset, reset_vals) mem_offset = mem_offset+page_size - time.sleep(PAGE_WRITE_CYCLE_TIME) def reset_edgepi_memory(self, bin_hash: str = None, bin_bytes: bytes = None): """ diff --git a/src/edgepi/eeprom/edgepi_eeprom_data.py b/src/edgepi/eeprom/edgepi_eeprom_data.py index 8db844e3..acabba29 100644 --- a/src/edgepi/eeprom/edgepi_eeprom_data.py +++ b/src/edgepi/eeprom/edgepi_eeprom_data.py @@ -17,7 +17,7 @@ @dataclass class EepromDataClass: - """EEPROM Dataclass""" + """Stores data read from the EdgePi's non-volatile EEPROM""" # pylint: disable=too-many-instance-attributes dac_calib_params: DACModule = None adc1_calib_params: ADCModule = None diff --git a/src/edgepi/eeprom/protobuf_assets/eeprom_data_classes/eeprom_adc_module.py b/src/edgepi/eeprom/protobuf_assets/eeprom_data_classes/eeprom_adc_module.py index 1d875ec3..47a79ec0 100644 --- a/src/edgepi/eeprom/protobuf_assets/eeprom_data_classes/eeprom_adc_module.py +++ b/src/edgepi/eeprom/protobuf_assets/eeprom_data_classes/eeprom_adc_module.py @@ -3,6 +3,23 @@ from edgepi.calibration.calibration_constants import CalibParam from edgepi.eeprom.protobuf_assets.generated_pb2 import adc_module_pb2 +class AdcCalibParamKeys: + """Keys for the ADC Callibration Parameters dictionary""" + ADC_CH_1 = 0 + ADC_CH_2 = 1 + ADC_CH_3 = 2 + ADC_CH_4 = 3 + + ADC_CH_5 = 4 + ADC_CH_6 = 5 + ADC_CH_7 = 6 + ADC_CH_8 = 7 + + ADC_DIFF_1 = 8 + ADC_DIFF_2 = 9 + ADC_DIFF_3 = 10 + ADC_DIFF_4 = 11 + @dataclass class ADCModule: """ADC Module Dataclass""" @@ -104,17 +121,19 @@ def extract_adc_calib_params(adc_pb: adc_module_pb2): def extract_ch_dict(self): """create channel to calibration param dictionary""" ch_dict = { - 0:self.adc_ch_1, - 1:self.adc_ch_2, - 2:self.adc_ch_3, - 3:self.adc_ch_4, - 4:self.adc_ch_5, - 5:self.adc_ch_6, - 6:self.adc_ch_7, - 7:self.adc_ch_8, - 8:self.adc_diff_1, - 9:self.adc_diff_2, - 10:self.adc_diff_3, - 11:self.adc_diff_4, + AdcCalibParamKeys.ADC_CH_1: self.adc_ch_1, + AdcCalibParamKeys.ADC_CH_2: self.adc_ch_2, + AdcCalibParamKeys.ADC_CH_3: self.adc_ch_3, + AdcCalibParamKeys.ADC_CH_4: self.adc_ch_4, + + AdcCalibParamKeys.ADC_CH_5: self.adc_ch_5, + AdcCalibParamKeys.ADC_CH_6: self.adc_ch_6, + AdcCalibParamKeys.ADC_CH_7: self.adc_ch_7, + AdcCalibParamKeys.ADC_CH_8: self.adc_ch_8, + + AdcCalibParamKeys.ADC_DIFF_1: self.adc_diff_1, + AdcCalibParamKeys.ADC_DIFF_2: self.adc_diff_2, + AdcCalibParamKeys.ADC_DIFF_3: self.adc_diff_3, + AdcCalibParamKeys.ADC_DIFF_4: self.adc_diff_4, } return ch_dict diff --git a/src/edgepi/gpio/edgepi_gpio_chip.py b/src/edgepi/gpio/edgepi_gpio_chip.py index 03b33f12..1aa27e46 100644 --- a/src/edgepi/gpio/edgepi_gpio_chip.py +++ b/src/edgepi/gpio/edgepi_gpio_chip.py @@ -3,6 +3,8 @@ ''' import logging +from typing import Optional + from edgepi.peripherals.gpio import GpioDevice from edgepi.gpio.gpio_constants import GpioDevPaths from edgepi.gpio.gpio_configs import DINPins, generate_gpiochip_pin_info @@ -15,21 +17,26 @@ class EdgePiGPIOChip(GpioDevice): each module that requires GPIO manipulation. """ # dictionary mapping pin name to CPU gpio pin number - __pin_name_dict = {DINPins.DIN1.value : 26, - DINPins.DIN2.value : 6, - DINPins.DIN3.value : 11, - DINPins.DIN4.value : 9, - DINPins.DIN5.value : 22, - DINPins.DIN6.value : 27, - DINPins.DIN7.value : 10, - DINPins.DIN8.value : 7 - } + __pin_name_dict = { + DINPins.DIN1.value : 26, + DINPins.DIN2.value : 6, + DINPins.DIN3.value : 11, + DINPins.DIN4.value : 9, + DINPins.DIN5.value : 22, + DINPins.DIN6.value : 27, + DINPins.DIN7.value : 10, + DINPins.DIN8.value : 7 + } + __din_pin_dir = "in" + __din_pin_bias = "pull_down" def __init__(self): super().__init__(GpioDevPaths.GPIO_CIHP_DEV_PATH.value) self.gpiochip_pins_dict = generate_gpiochip_pin_info() - def read_gpio_pin_state(self, pin_name: str = None): + # TODO: remove default None & make it non-optional, or handle None case. + # (former perferred) + def read_gpio_pin_state(self, pin_name: Optional[str] = None) -> bool: """ Read current state of GPIO pins. If the GPIO object is instantiated, it will be a unique object until close() method is called. So every time the state is read, it will instantiate @@ -45,6 +52,32 @@ def read_gpio_pin_state(self, pin_name: str = None): state = self.read_state() return state + def read_din_state(self, pin_name: DINPins) -> bool: + """ + A faster alternative to `read_gpio_pin_state` for reading only DIN pins. + + It is slightly faster than `read_gpio_pin_state` as it combines opening & + reading into a single operation. + """ + return self.open_read_state( + pin_num = self.__pin_name_dict[pin_name.value], + pin_dir = self.__din_pin_dir, + pin_bias = self.__din_pin_bias, + ) + + def batch_read_din_state(self, pin_list: list[DINPins]) -> list[bool]: + """ + This function efficiently reads from many DIN pins in a row by only opening & + closing the GPIO port only once. + """ + return self.open_read_state_batch( + pin_num_list = [ + self.__pin_name_dict[pin_name.value] for pin_name in pin_list + ], + pin_dir = self.__din_pin_dir, + pin_bias = self.__din_pin_bias, + ) + def write_gpio_pin_state(self, pin_name: str = None, state: bool = None): """ write pin state diff --git a/src/edgepi/gpio/gpio_configs.py b/src/edgepi/gpio/gpio_configs.py index 9fa24c94..3ced8d91 100644 --- a/src/edgepi/gpio/gpio_configs.py +++ b/src/edgepi/gpio/gpio_configs.py @@ -433,6 +433,9 @@ def generate_pin_info(config: Union[GpioExpanderConfig, GpioChipConfig] = None): pin_dict = _generate_DOUT_expander_pins() elif config.name == GpioConfigs.PWM.value.name: pin_dict = _generate_PWM_expander_pins() + else: + raise ValueError(f"generate_pin_info received unknown config name {config.name}") + return pin_dict def generate_expander_pin_info(): diff --git a/src/edgepi/peripherals/gpio.py b/src/edgepi/peripherals/gpio.py index a3338f9b..7f1826c4 100644 --- a/src/edgepi/peripherals/gpio.py +++ b/src/edgepi/peripherals/gpio.py @@ -6,7 +6,6 @@ from contextlib import contextmanager from periphery import GPIO - class GpioDevice: """Class for representing a GPIO device""" lock_gpio=threading.Lock() @@ -35,7 +34,7 @@ def open_gpio(self, pin_num: int = None, pin_dir: str = None, pin_bias: str = No finally: GpioDevice.lock_gpio.release() - def read_state(self): + def read_state(self) -> bool: """ Read the GPIO pin state Args: @@ -45,6 +44,73 @@ def read_state(self): """ return self.gpio.read() + def open_read_state(self, pin_num:int, pin_dir:str, pin_bias:str) -> bool: + """ + To minimize issues with the lock, we open & read in a single function call + """ + try: + # pylint: disable=consider-using-with + GpioDevice.lock_gpio.acquire() + gpio = GPIO(self.gpio_fd, pin_num, pin_dir, bias=pin_bias) + result = gpio.read() + + finally: + try: + gpio.close() + except Exception as exc: + raise OSError(f"Failed to close {self.gpio_fd}") from exc + finally: + GpioDevice.lock_gpio.release() + + return result + + def open_read_state_batch( + self, + pin_num_list: list[int], + pin_dir: str, + pin_bias: str, + ) -> list[bool]: + """ + Batch several gpio reads into a single lock / unlock. We can also take advantage of + the fact we're accessing the gpio only from different pins, and so we can open & close + just a single time. + """ + results = [] + try: + # pylint: disable=consider-using-with + GpioDevice.lock_gpio.acquire() + gpio = None + try: + # Performance Notes: + + # GPIO(), gpio._reopen(), gpio.read(), and gpio.close() all take roughly the same + # amount of time (read() is the shortest, but luckily required). When we batch the + # reads, the main performance improvement comes from the fact that we don't call + # GPIO() (posix.open()) and gpio.close() 8 times. Instead, they're only called once + + for pin_num in pin_num_list: + if gpio is None: + gpio = GPIO(self.gpio_fd, pin_num, pin_dir, bias=pin_bias) + else: + # NOTE: we'll need to be careful when we update periphery, since we depend + # on private functionality + gpio._line = pin_num # pylint: disable=protected-access + # pylint: disable=protected-access + gpio._reopen( + pin_dir, edge="none", bias=pin_bias, drive="default", inverted=False + ) + results.append(gpio.read()) + finally: + try: + gpio.close() + except Exception as exc: + raise OSError(f"Failed to close {self.gpio_fd}") from exc + + finally: + GpioDevice.lock_gpio.release() + + return results + def write_state(self, state: bool = None): """ Write state to GPIO pin diff --git a/src/edgepi/peripherals/spi.py b/src/edgepi/peripherals/spi.py index 63684cc3..d6b2229a 100644 --- a/src/edgepi/peripherals/spi.py +++ b/src/edgepi/peripherals/spi.py @@ -5,9 +5,11 @@ SpiDevice """ #pylint:disable=too-many-instance-attributes +from contextlib import contextmanager import logging import threading -from contextlib import contextmanager +import time + from periphery import SPI @@ -70,3 +72,40 @@ def transfer(self, data: list) -> list: """Conduct an SPI data transfer""" out = self.spi.transfer(data) return out + + def spi_apply_adc_commands(self, command_tup_list): + """ + This function applies a list of SPI commands for use in the ADC module, + such as sending & reading data. + + Each "command tuple" in the list contains a command, a delay (often required!), + then another command. + + See the `unsafe_write_register_command` for creating commands. + """ + result_list = [] + + try: + SpiDevice.lock_spi[self.dev_id].acquire() + self.spi = SPI( + self.devpath, + self.mode, + self.max_speed, + self.bit_order, + self.bits_per_word, + self.extra_flags, + ) + for data1, delay, data2 in command_tup_list: + self.spi.transfer(data1) + time.sleep(delay) + result_list += [self.spi.transfer(data2)] + + finally: + try: + self.spi.close() + except Exception as exc: + raise OSError(f"Failed to close {self.devpath}") from exc + finally: + SpiDevice.lock_spi[self.dev_id].release() + + return result_list diff --git a/src/edgepi/reg_helper/reg_helper.py b/src/edgepi/reg_helper/reg_helper.py index 5017be27..f0097380 100644 --- a/src/edgepi/reg_helper/reg_helper.py +++ b/src/edgepi/reg_helper/reg_helper.py @@ -13,13 +13,11 @@ apply_opcode(OpCode, int) """ - from copy import deepcopy from dataclasses import dataclass from enum import Enum import logging - _logger = logging.getLogger(__name__) @@ -98,6 +96,7 @@ def apply_opcodes(register_values: dict, opcodes: list): Raises: ValueError: if either register_values or opcodes is empty """ + if len(register_values) < 1 or len(opcodes) < 1: _logger.error( "empty values received for 'register_values' or 'opcodes' args, opcodes not applied" @@ -116,6 +115,9 @@ def apply_opcodes(register_values: dict, opcodes: list): register_entry["value"] = _apply_opcode(register_entry["value"], opcode) register_entry["is_changed"] = True + # There's no reason for us to suspect the other values would change? + # I'm not sure why this was done. The deepcopy which depends on this + # is quite slow (takes 1ms!) __validate_register_updates(original_regs, register_values) return register_values diff --git a/src/edgepi/utilities/utilities.py b/src/edgepi/utilities/utilities.py index 3f1b032f..c6891a46 100644 --- a/src/edgepi/utilities/utilities.py +++ b/src/edgepi/utilities/utilities.py @@ -5,9 +5,7 @@ bitstring_from_list(list) """ - -from bitstring import BitArray, pack - +from bitstring import BitArray def filter_dict(dictionary: dict, entry_key="", entry_val="") -> dict: """use for filtering an entry from a dictionary by key or value @@ -28,6 +26,7 @@ def filter_dict(dictionary: dict, entry_key="", entry_val="") -> dict: } return filtered_args + def filter_dict_list_key_val(dictionary: dict, entry_key: list, entry_val:list) -> dict: """use for filtering an entry from a dictionary by key or value @@ -43,14 +42,13 @@ def filter_dict_list_key_val(dictionary: dict, entry_key: list, entry_val:list) key or value matches either the entry_key or entry_val, respectively. """ filtered_args = { - key: value for (key, value) in dictionary.items() \ + key: value for (key, value) in dictionary.items() if key not in entry_key and value not in entry_val } return filtered_args - -def bitstring_from_list(data: list) -> BitArray: +def bitstring_from_list(data: list[int]) -> BitArray: """ Builds a bitstring from a list of uint byte values @@ -60,8 +58,10 @@ def bitstring_from_list(data: list) -> BitArray: Returns: BitArray: bitstring of bytes ordered from data[0], data[1], ..., data[n-1] """ - code = BitArray() - for value in data: - next_byte = pack("uint:8", value) - code.append(next_byte) - return code + # bytes() will raise a ValueError if any items are not in the range [0, 255] + return BitArray(bytes(data)) + + +def combine_to_uint32(a: int, b: int, c: int, d: int) -> int: + """simply packs 4 bytes into a uint32 (BE)""" + return (a << 24) + (b << 16) + (c << 8) + d diff --git a/src/test_edgepi/integration_tests/conftest.py b/src/test_edgepi/integration_tests/conftest.py new file mode 100644 index 00000000..0254d3a9 --- /dev/null +++ b/src/test_edgepi/integration_tests/conftest.py @@ -0,0 +1,27 @@ +"""Main test config file for integration tests""" + +import base64 +import hashlib +import platform + +import pytest + +from edgepi.eeprom.edgepi_eeprom import EdgePiEEPROM +from edgepi.eeprom.eeprom_constants import DEFAULT_EEPROM_BIN_B64 + +TEST_DEVICE_NAME = "edgepi-intg2" + +@pytest.fixture(scope="session", autouse=True) +def eeprom_reset(): + """Automatically restart the eeprom after each test""" + edgepi_eeprom = EdgePiEEPROM() + + if platform.node() == TEST_DEVICE_NAME: + print('loading default eeprom image ...') + default_bin = base64.b64decode(DEFAULT_EEPROM_BIN_B64) + hash_res = hashlib.md5(default_bin) + print('reseting eeprom ...') + edgepi_eeprom.reset_edgepi_memory(hash_res.hexdigest(), default_bin) + print('done!') + else: + print("dont reset eeprom") diff --git a/src/test_edgepi/integration_tests/test_adc/test_adc.py b/src/test_edgepi/integration_tests/test_adc/test_adc.py index cd60a194..04b174d2 100644 --- a/src/test_edgepi/integration_tests/test_adc/test_adc.py +++ b/src/test_edgepi/integration_tests/test_adc/test_adc.py @@ -1,5 +1,7 @@ """ Integration tests for EdgePi ADC module """ +import time +import statistics import logging import pytest @@ -19,9 +21,13 @@ DiffMode, RTDModes, ADC1PGA, + AnalogIn, ) from edgepi.adc.edgepi_adc import EdgePiADC +from edgepi.dac.edgepi_dac import EdgePiDAC +from edgepi.dac.dac_constants import DACChannel + _logger = logging.getLogger(__name__) # pylint: disable=protected-access @@ -791,7 +797,6 @@ def test_voltage_individual(ch, adc): _logger.info(f"test_voltage_individual: voltage = {out}") assert out != 0 - @pytest.mark.parametrize( "adc_num, ch", [ @@ -834,7 +839,6 @@ def test_voltage_continuous(adc_num, ch, adc): finally: adc.stop_conversions(adc_num) - @pytest.mark.parametrize('adc_num, diff, mux_reg, mux_reg_val', [ (ADCNum.ADC_1, DiffMode.DIFF_1, ADCReg.REG_INPMUX, 0x01), @@ -865,3 +869,69 @@ def test_set_rtd(enable, rtd_mode, adc_num, expected, adc): adc.set_rtd(set_rtd=enable, adc_num=adc_num) assert adc.get_state().rtd_mode == rtd_mode assert adc.get_state().rtd_adc == expected + +@pytest.mark.parametrize( + "out_list, channel_list, voltage", + [ + ([DACChannel.AOUT2], [AnalogIn.AIN2], 3.0), + ([DACChannel.AOUT3], [AnalogIn.AIN3], 2.0), + ([DACChannel.AOUT2, DACChannel.AOUT3], [AnalogIn.AIN2, AnalogIn.AIN3], 1.0), + ], +) +def test_adc_batch_voltage(out_list, channel_list, voltage, adc): + edgepi_dac = EdgePiDAC() + edgepi_dac.set_dac_gain(False) + + # write voltage values + for out_channel in out_list: + edgepi_dac.write_voltage(out_channel, voltage) + + # sample + result_list = [] + for i in range(250): + result_list += adc.read_samples_adc1_batch( + data_rate=ADC1DataRate.SPS_38400, + analog_in_list=channel_list, + ) + + for i, _ in enumerate(channel_list): + avg = statistics.mean(result_list[i::len(channel_list)]) + assert (voltage - 0.15) <= avg <= (voltage + 0.15) + + edgepi_dac.reset() + adc.reset() + +AVG_SPEED_TARGET_MS = 10 +NUM_ITER = 500 + +@pytest.mark.parametrize( + "channel_list", + [ + ([ + AnalogIn.AIN2, AnalogIn.AIN3, + ]), + ([ + AnalogIn.AIN1, AnalogIn.AIN2, AnalogIn.AIN3, AnalogIn.AIN4, + AnalogIn.AIN5, AnalogIn.AIN6, AnalogIn.AIN7, AnalogIn.AIN8, + ]), + ], +) +def test_adc_batch_speed(channel_list, adc): + ''' + This is a speed test, making sure that all ADC inputs can be read at a rate of 100hz (10ms) + ''' + start = time.time() + + # do samples + result_list = [] + for _ in range(NUM_ITER): + result_list += adc.read_samples_adc1_batch( + data_rate=ADC1DataRate.SPS_38400, + analog_in_list=channel_list, + ) + + avg = 1000.0 * ((time.time() - start) / NUM_ITER) + assert avg <= AVG_SPEED_TARGET_MS + + # reset adc registers to pre-test values + adc.reset() diff --git a/src/test_edgepi/integration_tests/test_digital_in/test_digital_in.py b/src/test_edgepi/integration_tests/test_digital_in/test_digital_in.py index 3521bef9..d49aa298 100644 --- a/src/test_edgepi/integration_tests/test_digital_in/test_digital_in.py +++ b/src/test_edgepi/integration_tests/test_digital_in/test_digital_in.py @@ -1,6 +1,8 @@ '''Integration tests for edgepi_digital_input.py module''' +import time import pytest + from edgepi.digital_input.edgepi_digital_input import EdgePiDigitalInput from edgepi.digital_input.digital_input_constants import DinPins @@ -15,6 +17,27 @@ (DinPins.DIN8), ]) def test_input_state(pin_name): - din=EdgePiDigitalInput() + din = EdgePiDigitalInput() pin_state = din.digital_input_state(pin_name) assert pin_state is False + +@pytest.mark.parametrize("pin_names", [ + ([ + DinPins.DIN1, DinPins.DIN2, DinPins.DIN3, DinPins.DIN4, + DinPins.DIN5, DinPins.DIN6, DinPins.DIN7, DinPins.DIN8, + ]), +]) +def test_input_state_speed(pin_names): + + din = EdgePiDigitalInput() + start = time.time() + + pin_states = [] + for _ in range(500): + tmp = din.digital_input_state_batch(pin_names) + pin_states += tmp + + target_ms = 1 + avg = 1000.0 * (time.time() - start) / 500 + #print(avg) + assert avg < target_ms diff --git a/src/test_edgepi/integration_tests/test_eeprom/test_eeprom.py b/src/test_edgepi/integration_tests/test_eeprom/test_eeprom.py index 3cbdef47..87441cf7 100644 --- a/src/test_edgepi/integration_tests/test_eeprom/test_eeprom.py +++ b/src/test_edgepi/integration_tests/test_eeprom/test_eeprom.py @@ -1,4 +1,4 @@ -'''integration test for access eeprom''' +'''integration test for access eeprom. These will only run on the dedicated testing host.''' # pylint: disable=no-name-in-module # pylint: disable=wrong-import-position @@ -10,6 +10,7 @@ import string import random import base64 +import platform import time import logging @@ -21,6 +22,8 @@ from edgepi.eeprom.edgepi_eeprom import EdgePiEEPROM, PermissionDenied from edgepi.eeprom.edgepi_eeprom_data import EepromDataClass +from ..conftest import TEST_DEVICE_NAME + @pytest.fixture(name="eeprom") def fixture_test_eeprom(): eeprom = EdgePiEEPROM() @@ -33,22 +36,31 @@ def fixture_test_eeprom(): ]) # pylint: disable=protected-access def test__page_write_register(data, address, eeprom): - # reset user space to make sure init vals are set to 255 - eeprom.reset_user_space() - addrx = EdgePiMemoryInfo.USER_SPACE_START_BYTE.value + address - with eeprom.i2c_open(): - initial_data = eeprom._EdgePiEEPROM__sequential_read(addrx,len(data)) - eeprom._EdgePiEEPROM__page_write_register(addrx, data) - time.sleep(0.5) - new_data = eeprom._EdgePiEEPROM__sequential_read(addrx,len(data)) - time.sleep(0.5) - eeprom._EdgePiEEPROM__page_write_register(addrx, [255]*len(data)) - _logger.info(f"data to write = {data}") - _logger.info(f"initial data = {initial_data}") - _logger.info(f"new data = {new_data}") - for indx, init_data in enumerate(initial_data): - assert init_data != new_data[indx] - assert new_data[indx] == data[indx] + if platform.node() != TEST_DEVICE_NAME: + pytest.skip("won't run dangerous test on user device") + + original_data = eeprom.read_edgepi_data() + try: + # reset user space to make sure init vals are set to 255 + eeprom.reset_user_space() + addrx = EdgePiMemoryInfo.USER_SPACE_START_BYTE.value + address + with eeprom.i2c_open(): + initial_data = eeprom._EdgePiEEPROM__sequential_read(addrx,len(data)) + eeprom._EdgePiEEPROM__page_write_register(addrx, data) + time.sleep(0.5) + new_data = eeprom._EdgePiEEPROM__sequential_read(addrx,len(data)) + time.sleep(0.5) + eeprom._EdgePiEEPROM__page_write_register(addrx, [255]*len(data)) + _logger.info(f"data to write = {data}") + _logger.info(f"initial data = {initial_data}") + _logger.info(f"new data = {new_data}") + for indx, init_data in enumerate(initial_data): + assert init_data != new_data[indx] + assert new_data[indx] == data[indx] + + finally: + # Write the original data back + eeprom.write_edgepi_data(original_data) DUMMY_KEY = '-----BEGIN RSA PRIVATE KEY-----\nMIIEpQIBAAKCAQEAnwu+S/OI3Hl0BCNQASv0HU5Jc4KUT2X4/tLyk\ Qcd6pE\nv7fji6ZoW/dl8dKwwdi/cfSS/J5Iv+5FwQU4KGNBbhVAnmJeLd+PMUT4bQTf9rVF\nHsDoIPoQLDH7jmBu8ai7jQ0hY\ @@ -70,44 +82,44 @@ def test__page_write_register(data, address, eeprom): TmZ\n-----END RSA PRIVATE KEY-----\n' def test_write_edgepi_data(eeprom): + if platform.node() != TEST_DEVICE_NAME: + pytest.skip("won't run dangerous test on user device") original_data = eeprom.read_edgepi_data() + try: + for _ in range(10): + # generate random strings + str_len = 100 + res = ''.join( + random.choices(string.ascii_uppercase + string.digits, k=str_len) + ) + # Modified data to write to memory + modified_data = eeprom.read_edgepi_data() + modified_data.config_key.certificate = DUMMY_KEY + res + modified_data.config_key.private_key = DUMMY_KEY + res + modified_data.data_key.certificate = DUMMY_KEY + res + modified_data.data_key.private_key = DUMMY_KEY + res + # Write modified data + eeprom.write_edgepi_data(modified_data) + # Read back the changed data + modified_data = eeprom.read_edgepi_data() + assert modified_data.dac_calib_params == original_data.dac_calib_params + assert modified_data.adc1_calib_params == original_data.adc1_calib_params + assert modified_data.adc2_calib_params == original_data.adc2_calib_params + assert modified_data.rtd_calib_params == original_data.rtd_calib_params + assert modified_data.tc_calib_params == original_data.tc_calib_params + assert modified_data.config_key.certificate == DUMMY_KEY + res + assert modified_data.config_key.private_key == DUMMY_KEY + res + assert modified_data.config_key.certificate == DUMMY_KEY + res + assert modified_data.data_key.private_key == DUMMY_KEY + res + assert modified_data.serial == original_data.serial + assert modified_data.model == original_data.model + assert modified_data.cm_part_number == original_data.cm_part_number + assert modified_data.tb_part_number == original_data.tb_part_number + assert modified_data.cm4_part_number == original_data.cm4_part_number - for _ in range(10): - # initializing size of string - str_len = 100 - # using random.choices() - # generating random strings - res = ''.join(random.choices(string.ascii_uppercase + - string.digits, k=str_len)) - - # Modified data to write to memory - modified_data = eeprom.read_edgepi_data() - modified_data.config_key.certificate = DUMMY_KEY + res - modified_data.config_key.private_key = DUMMY_KEY + res - modified_data.data_key.certificate = DUMMY_KEY + res - modified_data.data_key.private_key = DUMMY_KEY + res - # Write modified data - eeprom.write_edgepi_data(modified_data) - # Read back the changed data - modified_data = eeprom.read_edgepi_data() - - assert modified_data.dac_calib_params == original_data.dac_calib_params - assert modified_data.adc1_calib_params == original_data.adc1_calib_params - assert modified_data.adc2_calib_params == original_data.adc2_calib_params - assert modified_data.rtd_calib_params == original_data.rtd_calib_params - assert modified_data.tc_calib_params == original_data.tc_calib_params - assert modified_data.config_key.certificate == DUMMY_KEY + res - assert modified_data.config_key.private_key == DUMMY_KEY + res - assert modified_data.config_key.certificate == DUMMY_KEY + res - assert modified_data.data_key.private_key == DUMMY_KEY + res - assert modified_data.serial == original_data.serial - assert modified_data.model == original_data.model - assert modified_data.cm_part_number == original_data.cm_part_number - assert modified_data.tb_part_number == original_data.tb_part_number - assert modified_data.cm4_part_number == original_data.cm4_part_number - - # Write the original data back - eeprom.write_edgepi_data(original_data) + finally: + # Write the original data back + eeprom.write_edgepi_data(original_data) @pytest.mark.parametrize("bin_hash, error", [ @@ -116,23 +128,30 @@ def test_write_edgepi_data(eeprom): ("6b68b8e2dd2a6bec300ef91572270723", does_not_raise()) ]) def test_reset_edgepi_memory(bin_hash, error, eeprom): + if platform.node() != TEST_DEVICE_NAME: + pytest.skip("won't run dangerous test on user device") + original_data = eeprom.read_edgepi_data() - with error: - eeprom.reset_edgepi_memory(bin_hash, base64.b64decode(DEFAULT_EEPROM_BIN_B64)) - written_data = eeprom.read_edgepi_data() - default_data = eeprom.eeprom_pb.ParseFromString(base64.b64decode(DEFAULT_EEPROM_BIN_B64)) - default_data = EepromDataClass.extract_eeprom_data(eeprom.eeprom_pb) - assert written_data.dac_calib_params == default_data.dac_calib_params - assert written_data.adc1_calib_params == default_data.adc1_calib_params - assert written_data.adc2_calib_params == default_data.adc2_calib_params - assert written_data.rtd_calib_params == default_data.rtd_calib_params - assert written_data.tc_calib_params == default_data.tc_calib_params - assert written_data.config_key == default_data.config_key - assert written_data.data_key == default_data.data_key - assert written_data.serial == default_data.serial - assert written_data.model == default_data.model - assert written_data.cm_part_number == default_data.cm_part_number - assert written_data.tb_part_number == default_data.tb_part_number - assert written_data.cm4_part_number == default_data.cm4_part_number - # Reset to origianl Data + try: + with error: + eeprom.reset_edgepi_memory(bin_hash, base64.b64decode(DEFAULT_EEPROM_BIN_B64)) + written_data = eeprom.read_edgepi_data() + default_data = eeprom.eeprom_pb.ParseFromString( + base64.b64decode(DEFAULT_EEPROM_BIN_B64) + ) + default_data = EepromDataClass.extract_eeprom_data(eeprom.eeprom_pb) + assert written_data.dac_calib_params == default_data.dac_calib_params + assert written_data.adc1_calib_params == default_data.adc1_calib_params + assert written_data.adc2_calib_params == default_data.adc2_calib_params + assert written_data.rtd_calib_params == default_data.rtd_calib_params + assert written_data.tc_calib_params == default_data.tc_calib_params + assert written_data.config_key == default_data.config_key + assert written_data.data_key == default_data.data_key + assert written_data.serial == default_data.serial + assert written_data.model == default_data.model + assert written_data.cm_part_number == default_data.cm_part_number + assert written_data.tb_part_number == default_data.tb_part_number + assert written_data.cm4_part_number == default_data.cm4_part_number + finally: + # Reset to original data eeprom.write_edgepi_data(original_data) diff --git a/src/test_edgepi/unit_tests/test_adc/test_adc_multiplexers.py b/src/test_edgepi/unit_tests/test_adc/test_adc_multiplexers.py index 3bd0901b..aa8e7618 100644 --- a/src/test_edgepi/unit_tests/test_adc/test_adc_multiplexers.py +++ b/src/test_edgepi/unit_tests/test_adc/test_adc_multiplexers.py @@ -7,106 +7,43 @@ from edgepi.adc.adc_constants import ADCChannel as CH, ADCReg from edgepi.reg_helper.reg_helper import BitMask, OpCode from edgepi.adc.adc_multiplexers import ( - generate_mux_opcodes, + generate_mux_opcode, ChannelNotAvailableError, validate_channels_allowed, ) @pytest.mark.parametrize( - "mux_updates, expected", + "addx, adc1_mux, adc2_mux, expected", [ ( - { - ADCReg.REG_INPMUX: (None, None), - ADCReg.REG_ADC2MUX: (None, None), - }, - [], + ADCReg.REG_INPMUX, + CH.AIN1, + CH.AINCOM, + OpCode(0x1A, ADCReg.REG_INPMUX.value, BitMask.BYTE.value), ), ( - { - ADCReg.REG_INPMUX: (None, CH.AIN2), - ADCReg.REG_ADC2MUX: (None, None), - }, - [], + ADCReg.REG_ADC2MUX, + CH.AIN5, + CH.AIN6, + OpCode(0x56, ADCReg.REG_ADC2MUX.value, BitMask.BYTE.value), ), ( - { - ADCReg.REG_INPMUX: (CH.AIN1, CH.AINCOM), - ADCReg.REG_ADC2MUX: (None, None), - }, - [OpCode(0x1A, ADCReg.REG_INPMUX.value, BitMask.BYTE.value)], + ADCReg.REG_INPMUX, + CH.AIN1, + CH.AIN2, + OpCode(0x12, ADCReg.REG_INPMUX.value, BitMask.BYTE.value), ), ( - { - ADCReg.REG_INPMUX: (CH.AIN7, None), - ADCReg.REG_ADC2MUX: (None, None), - }, - [], - ), - ( - { - ADCReg.REG_INPMUX: (None, CH.AIN5), - ADCReg.REG_ADC2MUX: (None, None), - }, - [], - ), - ( - { - ADCReg.REG_INPMUX: (None, None), - ADCReg.REG_ADC2MUX: (CH.AIN5, None), - }, - [], - ), - ( - { - ADCReg.REG_INPMUX: (None, None), - ADCReg.REG_ADC2MUX: (None, CH.AIN6), - }, - [], - ), - ( - { - ADCReg.REG_INPMUX: (None, None), - ADCReg.REG_ADC2MUX: (CH.AIN5, CH.AIN6), - }, - [OpCode(0x56, ADCReg.REG_ADC2MUX.value, BitMask.BYTE.value)], - ), - ( - { - ADCReg.REG_INPMUX: (CH.AIN1, None), - ADCReg.REG_ADC2MUX: (None, None), - }, - [], - ), - ( - { - ADCReg.REG_INPMUX: (CH.AIN5, None), - ADCReg.REG_ADC2MUX: (None, CH.AIN6), - }, - [], - ), - ( - { - ADCReg.REG_INPMUX: (None, CH.AIN5), - ADCReg.REG_ADC2MUX: (CH.AIN6, None), - }, - [], - ), - ( - { - ADCReg.REG_INPMUX: (CH.AIN1, CH.AIN2), - ADCReg.REG_ADC2MUX: (CH.AIN3, CH.AIN4), - }, - [ - OpCode(0x12, ADCReg.REG_INPMUX.value, BitMask.BYTE.value), - OpCode(0x34, ADCReg.REG_ADC2MUX.value, BitMask.BYTE.value), - ], + ADCReg.REG_ADC2MUX, + CH.AIN3, + CH.AIN4, + OpCode(0x34, ADCReg.REG_ADC2MUX.value, BitMask.BYTE.value), ), ], ) -def test_generate_mux_opcodes(mux_updates, expected): - assert generate_mux_opcodes(mux_updates) == expected +def test_generate_mux_opcode(addx, adc1_mux, adc2_mux, expected): + assert generate_mux_opcode(addx, adc1_mux, adc2_mux) == expected @pytest.mark.parametrize( diff --git a/src/test_edgepi/unit_tests/test_adc/test_adc_voltage.py b/src/test_edgepi/unit_tests/test_adc/test_adc_voltage.py index baf3f40a..b3ef8a0e 100644 --- a/src/test_edgepi/unit_tests/test_adc/test_adc_voltage.py +++ b/src/test_edgepi/unit_tests/test_adc/test_adc_voltage.py @@ -3,14 +3,12 @@ import pytest from edgepi.calibration.calibration_constants import CalibParam -from edgepi.utilities.utilities import bitstring_from_list from edgepi.adc.adc_constants import ADCNum from edgepi.adc.adc_voltage import ( _code_to_input_voltage, _is_negative_voltage, _adc_voltage_to_input_voltage, code_to_voltage, - code_to_voltage_single_ended, code_to_temperature, ) @@ -26,8 +24,7 @@ ([0x7F,0xFF,0xFF,0xFF], False), ]) def test_is_negative_voltage(code, result): - code_bits = bitstring_from_list(code) - assert _is_negative_voltage(code_bits) ==result + assert _is_negative_voltage(code) == result @pytest.mark.parametrize( @@ -98,7 +95,10 @@ def test__adc_voltage_to_input_voltage(voltage, gain, offset, result): ], ) def test_code_to_voltage(code, adc_num, calibs, result): - assert pytest.approx(code_to_voltage(code, adc_num, calibs),0.0001) == result + assert pytest.approx( + code_to_voltage(code, adc_num, calibs, single_ended=False), + 0.0001, + ) == result @pytest.mark.parametrize( "code, adc_num, calibs, result", @@ -117,7 +117,7 @@ def test_code_to_voltage(code, adc_num, calibs, result): ], ) def test_code_to_voltage_single_ended(code, adc_num, calibs, result): - assert pytest.approx(code_to_voltage_single_ended(code, adc_num, calibs),0.0001) == result + assert pytest.approx(code_to_voltage(code, adc_num, calibs, single_ended=True),0.0001) == result @pytest.mark.parametrize( "code, ref_resistance, temp_offset, rtd_conv_constant,rtd_gain,rtd_offset,adc_num,expected", diff --git a/src/test_edgepi/unit_tests/test_adc/test_edgepi_adc.py b/src/test_edgepi/unit_tests/test_adc/test_edgepi_adc.py index 7cf2f1be..70d7878b 100644 --- a/src/test_edgepi/unit_tests/test_adc/test_edgepi_adc.py +++ b/src/test_edgepi/unit_tests/test_adc/test_edgepi_adc.py @@ -4,6 +4,7 @@ import sys from copy import deepcopy from unittest import mock +from unittest.mock import ANY from contextlib import nullcontext as does_not_raise sys.modules["periphery"] = mock.MagicMock() @@ -831,7 +832,7 @@ def test__get_rtd_off_update_config(adc_num, result, adc): ([0, 3], ADCNum.ADC_2, ADC2_RTD_ON_2), ]) def test__get_rtd_on_update_config(mux_list, adc_num, result, adc): - update= adc._EdgePiADC__get_rtd_on_update_config(mux_list, adc_num) + update = adc._EdgePiADC__get_rtd_on_update_config(mux_list, adc_num) assert update == result @pytest.mark.parametrize( @@ -1003,7 +1004,7 @@ def test_get_calibration_values(mocker, reg_updates, adc_num, expected, err, adc mocker.patch("edgepi.adc.edgepi_adc.EdgePiADC.get_state", return_value=mock_state) with err: - out = adc._EdgePiADC__get_calibration_values(adc.adc_calib_params[adc_num], adc_num) + out = adc._EdgePiADC__get_calibration_params(adc_num) assert out == expected @@ -1019,7 +1020,7 @@ def test_adc_voltage_read_conv_mode_validation(mocker, adc_to_read, validate, ad ) mocker.patch("edgepi.adc.edgepi_adc.EdgePiADC._EdgePiADC__continuous_time_delay") mocker.patch("edgepi.adc.edgepi_adc.EdgePiADC._EdgePiADC__voltage_read", return_value=[0,0,0]) - mocker.patch("edgepi.adc.edgepi_adc.EdgePiADC._EdgePiADC__get_calibration_values") + mocker.patch("edgepi.adc.edgepi_adc.EdgePiADC._EdgePiADC__get_calibration_params") mocker.patch("edgepi.adc.edgepi_adc.code_to_voltage") mocker.patch("edgepi.adc.edgepi_adc.get_adc_status") adc.read_voltage(adc_to_read) @@ -1052,22 +1053,21 @@ def test_adc_voltage_read_mode(mocker, adc_to_read, ch, adc): mocker.patch("edgepi.adc.edgepi_adc.EdgePiADC._EdgePiADC__check_adc_1_conv_mode") mocker.patch("edgepi.adc.edgepi_adc.EdgePiADC._EdgePiADC__continuous_time_delay") mocker.patch("edgepi.adc.edgepi_adc.EdgePiADC._EdgePiADC__voltage_read", return_value=[0,0,0]) - mocker.patch("edgepi.adc.edgepi_adc.EdgePiADC._EdgePiADC__get_calibration_values") - differential = mocker.patch("edgepi.adc.edgepi_adc.code_to_voltage") - single = mocker.patch("edgepi.adc.edgepi_adc.code_to_voltage_single_ended") + mocker.patch("edgepi.adc.edgepi_adc.EdgePiADC._EdgePiADC__get_calibration_params") + _code_to_voltage = mocker.patch("edgepi.adc.edgepi_adc.code_to_voltage") mocker.patch("edgepi.adc.edgepi_adc.get_adc_status") adc.read_voltage(adc_to_read) if ch == CH.AINCOM: - single.assert_called_once() + _code_to_voltage.assert_called_once_with(ANY, ANY, ANY, True) else: - differential.assert_called_once() + _code_to_voltage.assert_called_once_with(ANY, ANY, ANY, False) @pytest.mark.parametrize("adc_to_read, ch", [ (ADCNum.ADC_1, CH.AINCOM), #single-ended (ADCNum.ADC_1, CH.AIN1), #differential (ADCNum.ADC_1, CH.AIN2), #differential - (ADCNum.ADC_2, CH.AINCOM), #differential + (ADCNum.ADC_2, CH.AINCOM), #single-ended (ADCNum.ADC_2, CH.AIN1),#differential (ADCNum.ADC_2, CH.AIN2),#differential ] @@ -1078,24 +1078,23 @@ def test_adc_single_sample_mode(mocker, adc_to_read, ch, adc): # ADC2 channel shouldn't return differential as well since single sampel only cares about # ADC1 if adc_to_read == ADCNum.ADC_1: - adc_default_vals[6] = 0x10 + ch.value + adc_default_vals[ADCReg.REG_INPMUX.value] = 0x10 + ch.value else: - adc_default_vals[22] = 0x10 + ch.value + adc_default_vals[ADCReg.REG_ADC2MUX.value] = 0x10 + ch.value mocker.patch( "edgepi.adc.edgepi_adc.EdgePiADC._EdgePiADC__read_register", return_value=deepcopy(adc_default_vals) ) mocker.patch("edgepi.adc.edgepi_adc.EdgePiADC.start_conversions") mocker.patch("edgepi.adc.edgepi_adc.EdgePiADC._EdgePiADC__voltage_read", return_value=[0,0,0]) - mocker.patch("edgepi.adc.edgepi_adc.EdgePiADC._EdgePiADC__get_calibration_values") - differential = mocker.patch("edgepi.adc.edgepi_adc.code_to_voltage") - single = mocker.patch("edgepi.adc.edgepi_adc.code_to_voltage_single_ended") + mocker.patch("edgepi.adc.edgepi_adc.EdgePiADC._EdgePiADC__get_calibration_params") + _code_to_voltage = mocker.patch("edgepi.adc.edgepi_adc.code_to_voltage") mocker.patch("edgepi.adc.edgepi_adc.get_adc_status") adc.single_sample() if ch == CH.AINCOM and adc_to_read == ADCNum.ADC_1: - single.assert_called_once() + _code_to_voltage.assert_called_once_with(ANY, ANY, ANY, True) else: - differential.assert_called_once() + _code_to_voltage.assert_called_once_with(ANY, ANY, ANY, False) @pytest.mark.parametrize("adc_num, mock_val, expected", [ @@ -1127,6 +1126,91 @@ def test__is_rtd_on(mocker, mock_value, result): mocker.patch("edgepi.adc.edgepi_adc.EdgePiGPIO.get_pin_direction", return_value=mock_value[1]) assert adc._EdgePiADC__is_rtd_on() == result +@pytest.mark.parametrize("ain_list, diff_list, gt_command_tup_list, data_list, gt_result_voltages", + [ + ( + [ + AnalogIn.AIN1, + AnalogIn.AIN2, + AnalogIn.AIN5, + AnalogIn.AIN6, + AnalogIn.AIN7, + AnalogIn.AIN8, + ], + [ + DiffMode.DIFF_2, + ], + [ + ([69, 1, 15, 10, 9], 0.000207, [18, 255, 255, 255, 255, 255, 255]), + ([70, 0, 26, 9], 0.000207, [18, 255, 255, 255, 255, 255, 255]), + ([70, 0, 74, 9], 0.000207, [18, 255, 255, 255, 255, 255, 255]), + ([70, 0, 90, 9], 0.000207, [18, 255, 255, 255, 255, 255, 255]), + ([70, 0, 106, 9], 0.000207, [18, 255, 255, 255, 255, 255, 255]), + ([70, 0, 122, 9], 0.000207, [18, 255, 255, 255, 255, 255, 255]), + ([70, 0, 35, 9], 0.000207, [18, 255, 255, 255, 255, 255, 255]), + ], + [ # list of data returned from the spi port + [232, 233, 129, 25, 121, 83, 30], + [224, 225, 146, 108, 19, 147, 221], + [232, 233, 129, 17, 161, 110, 238], + [232, 233, 129, 21, 238, 222, 196], + [232, 233, 129, 24, 34, 197, 5], + [232, 233, 129, 1, 128, 85, 86], + [232, 233, 21, 46, 25, 48, 60] + ], + [ + 0.1036727201741353, + 1.7370293866017112, + 0.10078385509158633, + 0.10236855203897985, + 0.10317986845300124, + 0.09484310180204501, + 1.9970719971300246, + ] + ), + ] +) +def test_batch_read_samples_adc1( + mocker, + + ain_list: list, + diff_list: list, + gt_command_tup_list: list, + data_list: list[list[int]], + gt_result_voltages: list[int], + + adc, +): + mocker.patch( + # NOTE: the name _EdgePiADC__get_register_map is due to python name mangling, caused + # by the __ before get_register_map + "edgepi.adc.edgepi_adc.EdgePiADC._EdgePiADC__get_register_map", + return_value = { + 0: 35, 1: 17, 2: 6, 3: 64, 4: 128, 5: 15, 6: 35, 7: 0, + 8: 0, 9: 0, 10: 0, 11: 0, 12: 64, 13: 187, 14: 0, 15: 0, + 16: 0, 17: 0, 18: 0, 19: 0, 20: 0, 21: 0, 22: 1, 23: 0, + 24: 0, 25: 0, 26: 64 + } + ) + + spi_apply_commands = mocker.patch( + "edgepi.adc.edgepi_adc.EdgePiADC.spi_apply_adc_commands", + return_value = data_list, + ) + # check value of command_tup_list sent to spi_apply_adc_commands by overriding the mock + def check_commands(command_tup_list): + assert command_tup_list == gt_command_tup_list + return mock.DEFAULT + spi_apply_commands.side_effect = check_commands + + result_voltages = adc.read_samples_adc1_batch( + data_rate=ADC1DataRate.SPS_38400, + analog_in_list=ain_list, + differential_pairs=diff_list, + ) + + assert result_voltages == gt_result_voltages + @pytest.mark.parametrize("param, error", [ ([AnalogIn.AIN1, diff --git a/src/test_edgepi/unit_tests/test_din/test_edgepi_din.py b/src/test_edgepi/unit_tests/test_din/test_edgepi_din.py index 36ff7506..7574c5cb 100644 --- a/src/test_edgepi/unit_tests/test_din/test_edgepi_din.py +++ b/src/test_edgepi/unit_tests/test_din/test_edgepi_din.py @@ -38,9 +38,44 @@ def fixture_test_dac(mocker): (GpioPins.DOUT2, False, False, pytest.raises(InvalidPinName)), (None, False, False, pytest.raises(InvalidPinName))]) def test_edgepi_digital_input_state(mocker, pin_name, mock_value, result, error, din): - mocker.patch("edgepi.gpio.edgepi_gpio.EdgePiGPIOChip.read_gpio_pin_state", + mocker.patch("edgepi.gpio.edgepi_gpio.EdgePiGPIOChip.read_din_state", return_value = mock_value) with error: state = din.digital_input_state(pin_name) assert state == result - \ No newline at end of file + +@pytest.mark.parametrize( + "pin_names, mock_values, error", + [ + ([DinPins.DIN1], [True], does_not_raise()), + ([DinPins.DIN2], [True], does_not_raise()), + ([DinPins.DIN3, DinPins.DIN4, DinPins.DIN5], [True, False, True], does_not_raise()), + ( + [DinPins.DIN1, DinPins.DIN6, DinPins.DIN7, DinPins.DIN8], + [True, True, True, True], + does_not_raise(), + ), + ( + [ + DinPins.DIN1, DinPins.DIN2, DinPins.DIN3, DinPins.DIN4, + DinPins.DIN5, DinPins.DIN6, DinPins.DIN7, DinPins.DIN8, + ], + [True, True, True, True, True, False, True, False], + does_not_raise(), + ), + ([DinPins.DIN2, DinPins.DIN2, DinPins.DIN2], [False, False, False], does_not_raise()), + ([], [], pytest.raises(ValueError)), + (None, [], pytest.raises(ValueError)), + ([GpioPins.DOUT2], [], pytest.raises(InvalidPinName)), + ([DinPins.DIN2, GpioPins.DOUT2], [False], pytest.raises(InvalidPinName)), + ([DinPins.DIN2, None, DinPins.DIN3], [False, False], pytest.raises(InvalidPinName)), + ] +) +def test_edgepi_digital_input_state_batch(mocker, pin_names, mock_values, error, din): + mocker.patch( + "edgepi.gpio.edgepi_gpio.EdgePiGPIOChip.batch_read_din_state", + return_value = mock_values + ) + with error: + state_list = din.digital_input_state_batch(pin_names) + assert state_list == mock_values