Skip to content

Commit

Permalink
preapared for new version
Browse files Browse the repository at this point in the history
  • Loading branch information
JacobTh98 committed Nov 14, 2024
1 parent 82a5305 commit c5f3ab6
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 897 deletions.
49 changes: 0 additions & 49 deletions examples/custom_measurement.py

This file was deleted.

20 changes: 0 additions & 20 deletions examples/prep_npz_data_n_el_16.py

This file was deleted.

121 changes: 81 additions & 40 deletions sciopy/EIT_16_32_64_128.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
print("Could not import module: serial")


from com_util import (
from .com_util import (
clTbt_dp,
clTbt_sp,
del_hex_in_list,
Expand All @@ -28,7 +28,7 @@
"0x92": "Data holdup: Measurement data could not be sent via the master interface",
}

from sciopy_dataclasses import EitMeasurementSetup
from .sciopy_dataclasses import EitMeasurementSetup


class EIT_16_32_64_128:
Expand Down Expand Up @@ -218,73 +218,73 @@ def SoftwareReset(self):

def update_BurstCount(self, burst_count):
self.print_msg = True
self.ssms.burst_count = burst_count
self.setup.burst_count = burst_count
self.write_command_string(
bytearray([0xB0, 0x03, 0x02, 0x00, self.ssms.burst_count, 0xB0])
bytearray([0xB0, 0x03, 0x02, 0x00, self.setup.burst_count, 0xB0])
)
self.print_msg = False

def update_FrameRate(self, framerate):
self.print_msg = True
self.ssms.framerate = framerate
self.setup.framerate = framerate
self.write_command_string(
bytearray(
list(
np.concatenate([[176, 5, 3], clTbt_sp(self.ssms.framerate), [176]])
np.concatenate([[176, 5, 3], clTbt_sp(self.setup.framerate), [176]])
)
)
)
self.print_msg = False

def SetMeasurementSetup(self, ssms: EitMeasurementSetup):
def SetMeasurementSetup(self, setup: EitMeasurementSetup):
"""
set_measurement_config sets the ScioSpec device configuration depending on the EitMeasurementSetup configuration dataclass.
"""

self.ssms = ssms
self.setup = setup
self.print_msg = False
self.ResetMeasurementSetup()

# set burst count | for 3: ["B0 03 02 00 03 B0"]
self.write_command_string(
bytearray([0xB0, 0x03, 0x02, 0x00, ssms.burst_count, 0xB0])
bytearray([0xB0, 0x03, 0x02, 0x00, setup.burst_count, 0xB0])
)

# set excitation alternating current amplitude double precision
# A_min = 100nA
# A_max = 10mA
if ssms.amplitude > 0.01:
if setup.amplitude > 0.01:
print(
f"Amplitude {ssms.amplitude}A is out of available range.\nSet amplitude to 10mA."
f"Amplitude {setup.amplitude}A is out of available range.\nSet amplitude to 10mA."
)
ssms.amplitude = 0.01
setup.amplitude = 0.01
self.write_command_string(
bytearray(
list(np.concatenate([[176, 9, 5], clTbt_dp(ssms.amplitude), [176]]))
list(np.concatenate([[176, 9, 5], clTbt_dp(setup.amplitude), [176]]))
)
)
# ADC range settings: [+/-1, +/-5, +/-10]
# ADC range = +/-1 : B0 02 0D 01 B0
# ADC range = +/-5 : B0 02 0D 02 B0
# ADC range = +/-10 : B0 02 0D 03 B0
if ssms.adc_range == 1:
if setup.adc_range == 1:
self.write_command_string(bytearray([0xB0, 0x02, 0x0D, 0x01, 0xB0]))
elif ssms.adc_range == 5:
elif setup.adc_range == 5:
self.write_command_string(bytearray([0xB0, 0x02, 0x0D, 0x02, 0xB0]))
elif ssms.adc_range == 10:
elif setup.adc_range == 10:
self.write_command_string(bytearray([0xB0, 0x02, 0x0D, 0x03, 0xB0]))
# Gain settings:
# Gain = 1 : B0 03 09 01 00 B0
# Gain = 10 : B0 03 09 01 01 B0
# Gain = 100 : B0 03 09 01 02 B0
# Gain = 1_000 : B0 03 09 01 03 B0
if ssms.gain == 1:
if setup.gain == 1:
self.write_command_string(bytearray([0xB0, 0x03, 0x09, 0x01, 0x00, 0xB0]))
elif ssms.gain == 10:
elif setup.gain == 10:
self.write_command_string(bytearray([0xB0, 0x03, 0x09, 0x01, 0x01, 0xB0]))
elif ssms.gain == 100:
elif setup.gain == 100:
self.write_command_string(bytearray([0xB0, 0x03, 0x09, 0x01, 0x02, 0xB0]))
elif ssms.gain == 1_000:
elif setup.gain == 1_000:
self.write_command_string(bytearray([0xB0, 0x03, 0x09, 0x01, 0x03, 0xB0]))

# Single ended mode:
Expand All @@ -296,13 +296,13 @@ def SetMeasurementSetup(self, ssms: EitMeasurementSetup):
# Set framerate:
self.write_command_string(
bytearray(
list(np.concatenate([[176, 5, 3], clTbt_sp(ssms.framerate), [176]]))
list(np.concatenate([[176, 5, 3], clTbt_sp(setup.framerate), [176]]))
)
)
# Set frequencies:
# [CT] 0C 04 [fmin] [fmax] [fcount] [ftype] [CT]
f_min = clTbt_sp(ssms.exc_freq)
f_max = clTbt_sp(ssms.exc_freq)
f_min = clTbt_sp(setup.exc_freq)
f_max = clTbt_sp(setup.exc_freq)
f_count = [0, 1]
f_type = [0] # linear/log
# bytearray
Expand All @@ -315,8 +315,11 @@ def SetMeasurementSetup(self, ssms: EitMeasurementSetup):
)

# Set injection config
el_inj = np.arange(1, ssms.n_el + 1)
el_gnd = np.roll(el_inj, -(ssms.inj_skip + 1))
assert setup.n_el == self.n_el, print(
"Number of electrodes in setup configuration must match Eit_16_32_64_128() initialization."
)
el_inj = np.arange(1, setup.n_el + 1)
el_gnd = np.roll(el_inj, -(setup.inj_skip + 1))
for v_el, g_el in zip(el_inj, el_gnd):
self.write_command_string(bytearray([0xB0, 0x03, 0x06, v_el, g_el, 0xB0]))

Expand Down Expand Up @@ -357,26 +360,64 @@ def GetMeasurementSetup(self, setup_of: str):

print("TBD (to be checked)")
self.print_msg = True
self.write_command_string(bytearray([0xB1, 0x01, hex(setup_of), 0xB1]))

self.write_command_string(bytearray([0xB1, 0x01, setup_of, 0xB1]))
print("TBD: Translation")
self.print_msg = False

def StartStopMeasurement(self):
print("Start measurement.")
self.write_command_string(bytearray([0xB4, 0x01, 0x01, 0xB4]))
self.ret_hex_int = "hex"
data = self.SystemMessageCallback()
self.ret_hex_int = None
print("Stop measurement.")
self.write_command_string(bytearray([0xB4, 0x01, 0x00, 0xB4]))
def StartStopMeasurement(self, return_as="pot_mat"):
if self.serial_protocol == "HS":
self.device.write_data(bytearray([0xB4, 0x01, 0x01, 0xB4]))
self.ret_hex_int = "hex"
self.print_msg = False

# data truncation and processing
data = del_hex_in_list(data)
data = reshape_full_message_in_bursts(data, self.ssms)
data = split_bursts_in_frames(data, self.ssms)
data = self.SystemMessageCallback_usb_hs()

self.device.write_data(bytearray([0xB4, 0x01, 0x00, 0xB4]))
self.ret_hex_int = None
self.SystemMessageCallback()

elif self.serial_protocol == "FS":
self.device.write(bytearray([0xB4, 0x01, 0x01, 0xB4]))
self.ret_hex_int = "hex"
self.print_msg = False

data = self.SystemMessageCallback_usb_fs()

self.device.write(bytearray([0xB4, 0x01, 0x00, 0xB4]))
self.ret_hex_int = None
self.SystemMessageCallback()

data = del_hex_in_list(data)
data = reshape_full_message_in_bursts(data, self.setup)
data = split_bursts_in_frames(data, self.setup.burst_count, self.channel_group)
self.data = data
return data

if return_as == "hex":
return data
elif return_as == "pot_mat":
return self.get_data_as_matrix()

def get_data_as_matrix(self):
pot_matrix = np.empty(
(self.setup.burst_count, self.n_el, self.n_el), dtype=complex
)

for b_c, burst in enumerate(self.data):
row = -1
for frame in burst:
curr_grp = frame.channel_group
if curr_grp == 1:
row += 1
el_signs = list()
for ch in range(16):
el_signs.append(frame.__dict__[f"ch_{ch+1}"])

el_signs = np.array(el_signs)
start_idx = (curr_grp - 1) * 16
stop_idx = curr_grp * 16
pot_matrix[b_c, row, start_idx:stop_idx] = el_signs
self.data = pot_matrix
return pot_matrix

def SetOutputConfiguration(self):
print("TBD")
Expand Down
17 changes: 5 additions & 12 deletions sciopy/com_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import serial
except ImportError:
print("Could not import module: serial")
from sciopy_dataclasses import EitMeasurementSetup, SingleFrame

from .sciopy_dataclasses import EitMeasurementSetup, SingleFrame

import numpy as np
import struct
Expand Down Expand Up @@ -242,19 +243,11 @@ def parse_single_frame(lst_ele: np.ndarray) -> SingleFrame:


def split_bursts_in_frames(
split_list: np.ndarray, ssms: EitMeasurementSetup
split_list: np.ndarray, burst_count: int, channel_group: list
) -> np.ndarray:
"""
Takes the splitted list from `reshape_full_message_in_bursts()` and parses the single frames.
Parameters
----------
split_list : np.ndarray
splitted input list
ssms : EitMeasurementSetup
dataclass object configuration file
Returns
-------
np.ndarray
Expand All @@ -264,12 +257,12 @@ def split_bursts_in_frames(
frame = [] # Channel group depending frame
burst_frame = [] # single burst count frame with channel depending frame
subframe_length = split_list.shape[1] // msg_len
for bursts in range(ssms.burst_count): # Iterate over bursts
for bursts in range(burst_count): # Iterate over bursts
tmp_split_list = np.reshape(split_list[bursts], (subframe_length, msg_len))
for subframe in range(subframe_length):
parsed_sgl_frame = parse_single_frame(tmp_split_list[subframe])
# Select the right channel group data
if parsed_sgl_frame.channel_group in ssms.channel_group:
if parsed_sgl_frame.channel_group in channel_group:
frame.append(parsed_sgl_frame)
burst_frame.append(frame)
frame = [] # Reset channel depending single burst frame
Expand Down
Loading

0 comments on commit c5f3ab6

Please sign in to comment.