diff --git a/avrdude.py b/avrdude.py index 537c76b..681c307 100644 --- a/avrdude.py +++ b/avrdude.py @@ -14,6 +14,8 @@ def __init__(self): self.baudrate = "" self.configFile = "" self.autoEraseFlash = True + self.verify = True + self.verbose = 0 def upload(self, target, timeout = 15): #assemble argument array @@ -26,8 +28,12 @@ def upload(self, target, timeout = 15): cmd.append("-b" + self.baudrate) if self.configFile: cmd.append("-C" + self.configFile) - if self.autoEraseFlash: + if self.autoEraseFlash is False: cmd.append("-D") + if self.verify is False: + cmd.append("-V") + for i in range(self.verbose): + cmd.append("-v") if target.bootloader: cmd.append("-Uflash:w:" + target.bootloader + ":i") if target.extFuse: diff --git a/configuration.py b/configuration.py new file mode 100644 index 0000000..1afff50 --- /dev/null +++ b/configuration.py @@ -0,0 +1,60 @@ +# Configuration file for Rambo-Uploader + +# Path to avrdude +avrdude_path="C:/avrdude-5.11-w32-libusb/avrdude.exe" +# Type of ICSP programmer for Atmega32u2. +m32u2_icsp_programmer="jtag2isp" +# Type of ICSP programmer for Atmega2560 +m2560_icsp_programmer="jtag2isp" +# Type of programming through serial +serial_programmer="wiring" +# Port (USB serial number) of ICSP Programmer connected to Atmega32u2 +m32u2_icsp_port="usb:402671" +# Port (USB serial number) of ICSP Programmer connected to Atmega2560 +m2560_icsp_port="usb:402903" +# Path to Atmega32u2 bootloader +m32u2_bootloader_path="C:/RAMBo/bootloaders/RAMBo-usbserial-DFU-combined-32u2.HEX" +# Path to Atmega2560 bootloader +m2560_bootloader_path="C:/RAMbo/bootloaders/stk500boot_v2_mega2560.hex" +# Serial port for Test Jig Controller. Set to None to use serial number +#controller_port="COM24" +controller_port=None +# Serial number for Test Jig controller +controller_snr="6403635343035130E0E0" +# Serial port for Device Under Test. Set to None to auto-detect +#target_port="COM25" +target_port=None +# List of RAMBo board serial numbers to ignore if auto-detecting target. Useful if you have printers connected to the same PC. +ignore_rambo_snr=("64033353730351A0D1C0", ) +# Program the Atmega32u2 and Atmega2560 through ICSP (required if the fuses are not yet set or bootloader not flashed yet) +icsp_program=True +# Verify flash after ICSP programming +icsp_verify=True +# Delay before testing after we power on the power supply. Some power supply require a bit of time before they provide power. +powering_delay=1 + +# Path to the test firmware +test_firmware_path="C:/RAMBo/bootloaders/test_firmware.hex" +# Path to the final retail firmware +vendor_firmware_path="C:/RAMBo/bootloaders/vendor_firmware.hex" + +# Database settings +database_type="log" + +if database_type == "postgres": + from postgresdb import PostgresDatabase + import os + + # Open our file outside of git repo which has database location, password, etc + dbfile = open( os.path.split(os.path.realpath(__file__))[0]+'/postgres_info.txt', 'r') + postgresInfo = dbfile.read() + dbfile.close() + database = PostgresDatabase(postgresInfo) +elif database_type == "log": + from logdb import LogDatabase + + database = LogDatabase("results.log") +else: + from nodb import NoDatabase + + database = NoDatabase() diff --git a/logdb.py b/logdb.py new file mode 100644 index 0000000..c192fa1 --- /dev/null +++ b/logdb.py @@ -0,0 +1,29 @@ +import datetime + +class LogDatabase(): + def __init__(self, path): + self.path = path + self.fd = None + + def open(self): + if self.fd is None: + self.fd = open(self.path, 'a') + + def isOpen(self): + return self.fd is not None + + def close(self): + if self.fd: + self.fd.close() + self.fd = None + + def post(self, serial, results, version, details): + was_open = self.isOpen() + self.open() + self.fd.write("******************* %s ********************\n" %datetime.datetime.now()) + self.fd.write("Serial number : %s\n" % (serial)) + self.fd.write("Version : %s\n" % (version)) + self.fd.write("%s\n%s\n\n" % (results, details)) + self.fd.flush() + if not was_open: + self.close() diff --git a/nodb.py b/nodb.py new file mode 100644 index 0000000..7fdbe5c --- /dev/null +++ b/nodb.py @@ -0,0 +1,15 @@ +class NoDatabase(): + def __init__(self): + self.opened = False + + def open(self): + self.opened = True + + def isOpen(self): + return self.opened + + def close(self): + self.opened = False + + def post(self, serial, results, version, details): + pass diff --git a/postgresdb.py b/postgresdb.py new file mode 100644 index 0000000..3da9ea8 --- /dev/null +++ b/postgresdb.py @@ -0,0 +1,29 @@ +import psycopg2 + +class PostgresDatabase(): + def __init__(self, info): + self.info = info + self.storage = None + + def open(self): + if not self.isOpen(): + self.storage = psycopg2.connect(Self.info) + if self.storage is None: + raise Exception("Could not open database") + + def isOpen(self): + return self.storage is not None + + def close(self): + if self.storage: + self.storage.close() + self.storage = None + + def post(self, serial, results, version, details): + was_open = self.isOpen() + self.open() + cursor = self.storage.cursor() + cursor.execute("""INSERT INTO testdata(serial, timestamp, testresults, testversion, testdetails) VALUES (%s, %s, %s, %s, %s)""", (serial, 'now', results, version, details)) + self.storage.commit() + if not was_open: + self.close() diff --git a/test_process.py b/test_process.py index 18a19ae..802121e 100755 --- a/test_process.py +++ b/test_process.py @@ -18,22 +18,23 @@ from avrdude import * from atmega import * from testinterface import * -import psycopg2 +import serial.tools.list_ports +import configuration print "RAMBo Test Server" directory = os.path.split(os.path.realpath(__file__))[0] -version = subprocess.check_output(['git', '--git-dir='+directory+'/.git', +try: + version = subprocess.check_output(['git', '--git-dir='+directory+'/.git', 'rev-parse', 'HEAD']) +except: + print "Could not get git version" + version = "unknown" version = version.strip() print "Git version - " + str(version) print "Connecting to database..." -# Open our file outside of git repo which has database location, password, etc -dbfile = open(directory+'/postgres_info.txt', 'r') -postgresInfo = dbfile.read() -dbfile.close() try: - testStorage = psycopg2.connect(postgresInfo) + testStorage = configuration.database.open() except: print "Could not connect!" sys.exit(0) @@ -42,25 +43,72 @@ monitorPin = 44 #PL5 on test controller triggerPin = 3 #bed on target board powerPin = 3 #bed on test controller -homingRate = 5000 -clampingRate = 4000 -clampingLength = 18550 monitorFrequency = 1000 stepperTestRPS = 3 #rotations per second for the stepper test -controllerPort = "/dev/serial/by-id/usb-UltiMachine__ultimachine.com__RAMBo_64033353730351918201-if00" -targetPort = "/dev/ttyACM1" -testFirmwarePath = "/home/ultimachine/workspace/Test_Jig_Firmware/target_test_firmware.hex" -vendorFirmwarePath = "/home/ultimachine/workspace/johnnyr/Marlinth2.hex" testing = True state = "start" serialNumber = "" -vrefPins = [8, 6, 5, 4, 3] #x, y, z, e0, e1 on controller +vrefPins = [8, 6, 5, 4, 3] #x, y, z, e0, e1 on controller [Analog-EXT-8, Analog-EXT-6, Analog-EXT-5, Analog-EXT-4, Analog-EXT-3] supplyPins = [7, 2, 0] #extruder rail, bed rail, 5v rail on controller -mosfetOutPins = [9, 8, 7, 6, 3, 2] #On target -mosfetInPins = [44, 32, 45, 31, 46, 30] #On controller [PL5,PC5,PL4,PC6,PL3,PC7] -endstopOutPins = [83, 82, 81, 80, 79, 78] -endstopInPins = [12, 11, 10, 24, 23, 30] -thermistorPins = [0, 1, 2, 7] +mosfetOutPins = [3, 2, 6, 7, 8, 9] #On target [Bed, Fan2, Fan1, Heat1, Fan0, Heat0] +mosfetInPins = [44, 32, 45, 31, 46, 30] #On controller [MX1-5, MX1-4, MX2-5, MX2-4, MX3-5, MX3-4] +endstopOutPins = [83, 82, 81, 80, 79, 78] # on controller [EXT2-10, EXT2-12, EXT2-14, EXT2-16, EXT2-18, EXT2-20 ] +endstopInPins = [12, 11, 10, 24, 23, 30] # on target [xmin, ymin, zmin, xmax, ymax, zmax] +thermistorPins = [0, 1, 2, 7]; # on target [T0, T1, T2, T3] + +def find_rambo_port(serial_number = None): + ports = list(serial.tools.list_ports.comports()) + for port in ports: + if "RAMBo" in port[1]: + print "Found RAMBo board", port + if serial_number is None: + return port[0] + elif serial_number in port[2]: + print "Found port with correct serial : ", port + return port[0] + else: + print "Ignoring non-RAMBo board", port + +def find_target_port(): + ports = list(serial.tools.list_ports.comports()) + rambos = [] + for port in ports: + if "RAMBo" in port[1]: + ignore = False + if port[0] == configuration.controller_port or port[2].endswith("SNR=%s" % configuration.controller_snr): + ignore = True + for snr in configuration.ignore_rambo_snr: + if port[2].endswith("SNR=%s" % snr): + print "Ignoring this board ", port + ignore = True + + if ignore is False: + rambos.append(port[0]) + + print "Found these boards : ", rambos + if len(rambos) != 1: + return None + return rambos[0] + +def find_serial_number(from_port): + ports = list(serial.tools.list_ports.comports()) + for port in ports: + if port[0] == from_port: + snr = port[2].find("SNR=") + if snr >= 0: + return port[2][snr+4:] + break + return None + +controllerPort = configuration.controller_port +targetPort = configuration.target_port +print list(serial.tools.list_ports.comports()) + +if controllerPort is None: + controllerPort = find_rambo_port(configuration.controller_snr) +if controllerPort is None: + print "Can't find controller board." + sys.exit(0) #Setup test interfaces controller = TestInterface() @@ -72,17 +120,17 @@ #Setup target test firmware object to pass to AVRDUDE. testFirmware = Atmega() testFirmware.name = "atmega2560" -testFirmware.bootloader = testFirmwarePath +testFirmware.bootloader = configuration.test_firmware_path #Setup target vendor firmware object to pass to AVRDUDE. vendorFirmware = Atmega() vendorFirmware.name = "atmega2560" -vendorFirmware.bootloader = vendorFirmwarePath +vendorFirmware.bootloader = configuration.vendor_firmware_path #Setup up avrdude config for upload to an Arduino. avrdude = Avrdude() -avrdude.path = "/usr/bin/avrdude" -avrdude.programmer = "stk500v2" +avrdude.path = configuration.avrdude_path +avrdude.programmer = configuration.serial_programmer avrdude.port = targetPort avrdude.baudrate = "115200" avrdude.autoEraseFlash = True @@ -95,7 +143,6 @@ def signal_handler(signal, frame): print "Shutting down test server..." controller.pinLow(powerPin) - controller.home(homingRate, wait = False) controller.close() target.close() sys.exit(0) @@ -106,53 +153,84 @@ def signal_handler(signal, frame): while(testing): if state == "start": - print "Enter serial number : " - serialNumber = raw_input() - print "Press button to begin test" - controller.waitForStart() #Blocks until button pressed - state = "clamping" + controller.pinLow(powerPin) + print "Press Enter to start test " + raw_input() + if configuration.icsp_program: + state = "uploading" + else: + state = "program for test" print "Test started at " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - elif state == "clamping": - print "Clamping test jig..." - controller.home(rate = homingRate, wait = False) - controller.runSteppers(frequency = clampingRate, steps = clampingLength, - direction = controller.UP, wait = False) - state = "program for test" elif state == "uploading": print "Uploading Bootloader and setting fuses..." - avr32u2 = subprocess.Popen(['/usr/bin/avrdude', '-v', '-v', '-c', u'avrispmkII', '-P', u'usb:0200158420', u'-patmega32u2', u'-Uflash:w:/home/ultimachine/workspace/RAMBo/bootloaders/RAMBo-usbserial-DFU-combined-32u2.HEX:i', u'-Uefuse:w:0xF4:m', u'-Uhfuse:w:0xD9:m', u'-Ulfuse:w:0xEF:m', u'-Ulock:w:0x0F:m']) - avr2560 = subprocess.Popen(['/usr/bin/avrdude', '-v', '-v', '-c', u'avrispmkII', '-P', u'usb:0200158597', u'-pm2560', u'-Uflash:w:/home/ultimachine/workspace/RAMBo/bootloaders/stk500boot_v2_mega2560.hex:i', u'-Uefuse:w:0xFD:m', u'-Uhfuse:w:0xD0:m', u'-Ulfuse:w:0xFF:m', u'-Ulock:w:0x0F:m']) + m32u2_image = Atmega() + m32u2_image.name = "m32u2" + m32u2_image.bootloader = configuration.m32u2_bootloader_path + m32u2_image.lockBits = "0x0F" + m32u2_image.extFuse = "0xF4" + m32u2_image.highFuse = "0xD9" + m32u2_image.lowFuse = "0xEF" - elif state == "program for test": - print "Programming target with test firmware..." - if avrdude.upload(testFirmware, timeout = 10): - state = "connecting target" + m2560_image = Atmega() + m2560_image.name = "m2560" + m2560_image.bootloader = configuration.m2560_bootloader_path + m2560_image.lockBits = "0x0F" + m2560_image.extFuse = "0xFD" + m2560_image.highFuse = "0xD0" + m2560_image.lowFuse = "0xFF" + + icsp = Avrdude() + icsp.path = configuration.avrdude_path + icsp.verbose = 2 + icsp.verify = configuration.icsp_verify + + icsp.programmer = configuration.m32u2_icsp_programmer + icsp.port = configuration.m32u2_icsp_port + if icsp.upload(m32u2_image, timeout=60): + icsp.programmer = configuration.m2560_icsp_programmer + icsp.port = configuration.m2560_icsp_port + if icsp.upload(m2560_image, timeout=120): + state = "program for test" + time.sleep(5) + else: + print "Upload failed." + state = "board fail" else: print "Upload failed." state = "board fail" + elif state == "program for test": + print "Programming target with test firmware..." + state = "connecting target" + targetPort = configuration.target_port + if targetPort is None: + targetPort = find_target_port() + if targetPort is None: + print "Can't find target board." + state = "board fail" + else: + avrdude.port = targetPort + if avrdude.upload(testFirmware, timeout = 10): + state = "connecting target" + else: + print "Upload failed." + state = "board fail" + elif state == "connecting target": print "Attempting connect..." if target.open(port = targetPort): - state = "wait for homing" + state = "powering" else: print "Connect failed." state = "board fail" - elif state == "wait for homing": - print "Waiting for homing to complete..." - if controller.waitForFinish(commands = 2, timeout = 10, clear = True): - state = "powering" - else: - print "Homing failed." - state = "board fail" - elif state == "powering": print "Powering Board..." if controller.pinHigh(powerPin): state = "supply test" + time.sleep(configuration.powering_delay); else: print "Powering failed." state = "board fail" @@ -321,6 +399,7 @@ def signal_handler(signal, frame): print "Programming target with vendor firmware..." if avrdude.upload(vendorFirmware, timeout = 20): state = "processing" + time.sleep(1) else: print "Upload failed!" state = "board fail" @@ -352,13 +431,9 @@ def signal_handler(signal, frame): elif state == "finished": print "Writing results to database..." - testStorage = psycopg2.connect(postgresInfo) - cursor = testStorage.cursor() - cursor.execute("""INSERT INTO testdata(serial, timestamp, testresults, testversion, testdetails) VALUES (%s, %s, %s, %s, %s)""", (serialNumber, 'now', testProcessor.errors, version, str(testProcessor.resultsDictionary()))) - testStorage.commit() + configuration.database.post(find_serial_number(targetPort), testProcessor.errors, version, str(testProcessor.resultsDictionary())) testProcessor.restart() print "Preparing Test Jig for next board..." controller.pinLow(powerPin) - controller.home(homingRate, wait = True) state = "start" diff --git a/testinterface.py b/testinterface.py index d037d81..5c945fc 100644 --- a/testinterface.py +++ b/testinterface.py @@ -17,7 +17,7 @@ def __init__(self): self._groupn = lambda lst, sz: [lst[i:i+sz] for i in range(0, len(lst), sz)] def open(self, port): - if not os.path.exists(port): + if not port.startswith("COM") and not os.path.exists(port): print "Serial port not detected!" return False self.serial.port = port diff --git a/testprocessor.py b/testprocessor.py index 10ceb96..85d0a07 100644 --- a/testprocessor.py +++ b/testprocessor.py @@ -142,7 +142,7 @@ def verifyAllTests(self): if self.supplys: #Just realized this is spelled wrong print "Supply voltage values..." self._analogToVoltage(readings = self.supplys) - print self.supplyVoltages + print str(self.supplys) + " -> " + str(self.supplyVoltages) passed &= self.testSupplys() if self.vrefs: @@ -236,5 +236,7 @@ def _wasTimedOut(self, vals): def _analogToVoltage(self, readings = [], voltage = 5, bits = 10, dividerFactor = 0.091): #divider factor is R2/(R1+R2) + #R1 = 47K Ohm + #R2 = 4700 Ohm for val in readings: self.supplyVoltages += [(val/pow(2, bits))*(voltage/dividerFactor)]