-
Notifications
You must be signed in to change notification settings - Fork 0
/
data.py
129 lines (97 loc) · 4.16 KB
/
data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# -*- coding: utf-8 -*-
"""
Created on Thu Sep 30 11:17:06 2021
@author: chris
"""
# from pyspectrumdaq.m2i4931 import Card
import numpy as np
from PyQt5 import QtCore
import time
from scipy import signal
from pyspectrumdaq import Card
class DataCollector(QtCore.QRunnable):
def __init__(self):
super(DataCollector, self).__init__()
def run(self):
Data.acquire()
class Data(object):
time_trace=np.array([])
avg_time_trace=[]
rFFT = []
iFFT = []
channel = 1
time = 50*1e-3
averages = 1
samplerate = 30*1e6
adc=[]
error=0
freq = []
ns = int(time*samplerate)
# It is highly recommended to use context management ("with" statement) to make sure
# proper closing of the card
# a now contains the data as a float64 NumPy array, shaped as [n_samples, n_channels]
def start_acquire():
try:
Data.adc = Card()
Data.ns = int(Data.time*Data.samplerate)
Data.adc.set_acquisition(channels=[Data.channel],
terminations=["50"],
fullranges=[2],
pretrig_ratio=0,
nsamples=Data.ns,
samplerate=Data.samplerate)
Data.adc.set_trigger(mode="ext")
# print("2")
time_trace = Data.adc.acquire()
Data.ns = int(time_trace.shape[0])
Data.freq = np.fft.fftfreq(Data.ns, d=1/Data.samplerate)[:int(Data.ns/2)]
# print(2)
except:
if Data.error==0:
print("There is no connection to the Daq card")
Data.error = 1
def acquire():
try:
data = Data.adc.acquire()
if Data.time_trace.shape[0]==0:
Data.time_trace = np.zeros([Data.averages+1,data.shape[0]])
# print("2")
Data.time_trace[0,:] = data[:,0]
Data.counter = 1
else:
if data.shape[0] == Data.time_trace.shape[1]:
Data.time_trace[Data.counter,:] = data[:,0]
# print("test1")
if Data.counter < Data.averages and Data.averages+1 == Data.time_trace.shape[0]:
Data.counter += 1
elif Data.averages+1 != Data.time_trace.shape[0]:
if Data.averages+1 > Data.time_trace.shape[0]:
empty = np.zeros([Data.averages+1,data.shape[0]])
empty[:Data.time_trace.shape[0],:]=Data.time_trace
Data.time_trace = empty
else:
empty = np.zeros([Data.averages+1,data.shape[0]])
empty[:,:]=Data.time_trace[:empty.shape[0],:]
Data.time_trace = empty
if Data.counter > Data.averages:
Data.counter = Data.averages
else:
Data.time_trace[:-1,:] = Data.time_trace[1:,:]
Data.counter = Data.averages
else:
Data.time_trace=np.array([])
Data.total_averages = Data.counter
Data.avg_time_trace = np.average(Data.time_trace[:Data.counter,:],axis=0)
# print(Data.avg_time_trace)
Data.FFT()
except Exception as e:
if Data.error < 2:
print("Daq card not being loaded")
Data.time_trace=np.array([])
QtCore.QThread.sleep(1)
def FFT():
FFT = np.fft.fft(Data.avg_time_trace)/Data.ns
Data.PSD = np.abs(FFT[:int(Data.ns/2)])**2
Data.abs = np.abs(FFT[:int(Data.ns/2)])
Data.angle = np.angle(FFT[:int(Data.ns/2)])
Data.imag = np.abs(np.real(FFT[:int(Data.ns/2)]))