-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathopenBCIStream.py
67 lines (54 loc) · 2.01 KB
/
openBCIStream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# Based on:
# https://github.com/pareeknikhil/biofeedback/blob/master/OpenBCI Device Data Stream/openBCIStream.py
import time
import brainflow
import numpy as np
import pandas as pd
from brainflow.board_shim import BoardIds, BoardShim, BrainFlowInputParams
from brainflow.data_filter import AggOperations, DataFilter, FilterTypes
def scale_magnitude_array(x, ymin, ymax):
    """Linearly rescale *x* so that ``ymin`` maps to 0 and ``ymax`` maps to 1.

    Args:
        x: Scalar or array-like of values; converted with ``np.array``.
        ymin: Value that should map to 0.
        ymax: Value that should map to 1.

    Returns:
        NumPy result of ``(x - ymin) / (ymax - ymin)``. Inputs outside
        ``[ymin, ymax]`` are not clipped, so they land outside ``[0, 1]``.
    """
    values = np.array(x)
    span = ymax - ymin
    offset_from_min = values - ymin
    return offset_from_min / span
def board_2_df(data):
    """Build a labelled DataFrame from a 2-D array of board samples.

    Columns 1-8 of *data* become ``"EEG ch0"`` .. ``"EEG ch7"`` and column 22
    becomes ``"TIME"``. The resulting frame is printed to stdout and returned.

    Args:
        data: 2-D array indexed as ``data[row, column]`` with at least 23
            columns (presumably one row per sample -- confirm with caller).

    Returns:
        ``pandas.DataFrame`` with the nine selected columns, in the order
        listed above.
    """
    # Normalisation via scale_magnitude_array(data, -396, 433) is currently
    # disabled. A tkinter slideshow integration could also hook in here.
    layout = (
        ("EEG ch0", 1),
        ("EEG ch1", 2),
        ("EEG ch2", 3),
        ("EEG ch3", 4),
        ("EEG ch4", 5),
        ("EEG ch5", 6),
        ("EEG ch6", 7),
        ("EEG ch7", 8),
        ("TIME", 22),
    )
    labels, source_columns = zip(*layout)
    selected = data[:, list(source_columns)]
    frame = pd.DataFrame(selected, columns=list(labels))
    print(frame)
    return frame
class CytonBoard(object):
    """Thin wrapper around a BrainFlow ``BoardShim`` set up for the Cyton board.

    Attributes:
        params: ``BrainFlowInputParams`` carrying the serial port.
        board: ``BoardShim`` created with ``BoardIds.CYTON_BOARD``.
    """

    # File that every poll() appends its raw samples to. Written as a raw
    # string so the backslashes are literal path separators instead of invalid
    # escape sequences ("\D", "\c"); the runtime value is unchanged.
    # NOTE(review): Windows-style relative path; presumably the "Data"
    # directory must already exist -- confirm. Override this attribute on an
    # instance or subclass to record somewhere else.
    DATA_FILE = r'.\Data\cyton_data_new.txt'

    def __init__(self, serial_port):
        """Create the board handle for the device on *serial_port*.

        Args:
            serial_port: Serial port name stored on the BrainFlow input params.
        """
        self.params = BrainFlowInputParams()
        self.params.serial_port = serial_port
        self.board = BoardShim(BoardIds.CYTON_BOARD.value, self.params)

    def start_stream(self):
        """Prepare the board session and start streaming.

        Raises:
            Whatever ``prepare_session`` or ``start_stream`` raises. If the
            stream fails to start, the freshly prepared session is released
            before the error propagates so it is not left dangling.
        """
        self.board.prepare_session()
        try:
            self.board.start_stream()
        except Exception:
            # Do not leave a prepared-but-unused session behind.
            self.board.release_session()
            raise

    def stop_stream(self):
        """Stop streaming and release the board session.

        The session is released even when stopping the stream raises.
        """
        try:
            self.board.stop_stream()
        finally:
            self.board.release_session()

    def poll(self, sample_num):
        """Wait for at least *sample_num* samples, record them, and return them.

        Blocks (sleeping 20 ms between checks) until the board reports at
        least *sample_num* buffered samples; there is no timeout. The fetched
        data is appended to ``DATA_FILE`` and then converted to a DataFrame.

        Args:
            sample_num: Minimum number of buffered samples to wait for.

        Returns:
            The DataFrame produced by ``board_2_df`` for the fetched data.
        """
        while self.board.get_board_data_count() < sample_num:
            time.sleep(0.02)
        board_data = self.board.get_board_data()
        # 'a' appends to the recording; 'w' would overwrite it.
        # Possible follow-ups: avoid name clashes with a numeric suffix, or
        # derive the file name from the current date-time.
        DataFilter.write_file(board_data, self.DATA_FILE, 'a')
        # board_2_df selects channels by column, so hand it the transpose.
        return board_2_df(np.transpose(board_data))

    def sampling_frequency(self):
        """Return the sampling rate BrainFlow reports for the Cyton board."""
        return self.board.get_sampling_rate(BoardIds.CYTON_BOARD.value)