-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrj45db25_test.py
executable file
·117 lines (95 loc) · 3.17 KB
/
rj45db25_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#!/usr/bin/python3
import time
import serial
import signal
import os
import sys
import subprocess
from termcolor import colored
import psycopg2
import datetime
testjig = "RJ45-DB25"
print("RJ45-DB25 Test Server")
directory = os.path.split(os.path.realpath(__file__))[0]
version = subprocess.check_output(['git', '--git-dir='+directory+'/.git', 'rev-parse', 'HEAD']).strip().decode()
print("Git version - " + str(version))
print("Connecting to database...")
# Open our file outside of git repo which has database location, password, etc
dbfile = open(directory+'/postgres_info.txt', 'r')
postgresInfo = dbfile.read()
dbfile.close()
while True:
try:
testStorage = psycopg2.connect(postgresInfo)
break
except Exception as e:
print(e)
print("Could not connect! Type [Enter] to try again-> ")
input()
continue
#sys.exit(0)
cursor = testStorage.cursor()
def signal_handler(signal, frame):
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
ser = serial.Serial()
ser.port = '/dev/ttyACM0'
ser.baudrate = 115200
ser.timeout = 0
ser.setDTR = False
try: ser.open()
except Exception as e: print(e)
ser.flushOutput()
ser.flushInput()
for count in range(5):
#print colored("time: " + str(count * 0.5),'red')
time.sleep(0.5)
output = ser.read(500).strip().decode('utf-8')
#print colored("output_length: " + str(len(output)),'red')
if len(output):
print(colored(output,'cyan'),end='')
while True:
serialNumber = ""
print("Enter RJ45-DB25 serial number : ")
serialNumber = input()
try:
sNum = int(serialNumber)
lowerLimit = 45000001
upperLimit = 45990000
if( sNum not in range(lowerLimit,upperLimit) ):
print(colored("Invalid Entry. (Use "+str(lowerLimit)+"-"+str(upperLimit)+").",'magenta'))
continue
except:
print("Invalid Entry. (Use "+str(lowerLimit)+"-"+str(upperLimit)+").")
continue
print("Press [Enter] to start test")
input()
fo = open("RJ45_DB25_log.txt", "a")
fo.write(serialNumber + '\n')
print("starting test...")
ser.write(b'F\n')
#time.sleep(0.2)
#print(colored(ser.read(500).decode('utf-8'),'cyan'))
full_output = ""
test_result = "Failed"
# test takes approx. 7 seconds. update output every 0.5 seconds.
for count in range(4*2):
#print colored("time: " + str(count * 0.5),'red')
time.sleep(0.5)
output = ser.read(500).strip().decode('utf-8')
#print colored("output_length: " + str(len(output)),'red')
if len(output):
print(colored(output,'cyan'),end='')
full_output = full_output + output
if("Passed" in full_output):
test_result = "Passed"
ser.flushOutput()
ser.flushInput()
#ser.close()
print("Test Result: " + test_result)
print("Writing results to database...")
testStorage = psycopg2.connect(postgresInfo)
cursor = testStorage.cursor()
cursor.execute("""INSERT INTO testdata(serial, timestamp, testresults, testversion, testjig) VALUES (%s, %s, %s, %s, %s)""", (serialNumber, 'now', test_result, version, testjig, ))
testStorage.commit()
cursor.close()