forked from mxcube/HardwareObjects
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMDCameraMockup.py
101 lines (90 loc) · 3.34 KB
/
MDCameraMockup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
"""Class for cameras connected to framegrabbers run by Taco Device Servers
"""
from HardwareRepository import BaseHardwareObjects
import logging
import os, time, datetime
from PIL import Image
import numpy as np
from threading import Event, Thread
import base64
import gevent
MAX_TRIES = 3
SLOW_INTERVAL = 1000
class MDCameraMockup(BaseHardwareObjects.Device):
def __init__(self,name):
BaseHardwareObjects.Device.__init__(self,name)
def _init(self):
self.udiffVER_Ok = False
self.badimg = 0
self.pollInterval = 500
self.connected = False
self.image_name = self.getProperty("image_name")
cwd = os.getcwd()
path = os.path.join(cwd, "./test/HardwareObjectsMockup.xml/")
self.image = os.path.join(path, self.image_name)
self.setIsReady(True)
def init(self):
logging.getLogger("HWR").info( "initializing camera object")
#self.pollingTimer = qt.QTimer()
#self.pollingTimer.connect(self.pollingTimer, qt.SIGNAL("timeout()"), self.poll)
if self.getProperty("interval"):
self.pollInterval = self.getProperty("interval")
self.stopper = False#self.pollingTimer(self.pollInterval, self.poll)
thread = Thread(target=self.poll)
thread.daemon = True
thread.start()
def udiffVersionChanged(self, value):
if value == "MD2_2":
print "start polling MD camera with poll interval=",self.pollInterval
#self.pollingTimer.start(self.pollInterval)
#self.startPolling()
else:
print "stop polling the camera. This microdiff version does not support a camera"
#self.pollingTimer.stop()
self.stopper=True
def connectToDevice(self):
self.connected = True
return self.connected
#@timer.setInterval(self.pollInterval)
def poll(self):
logging.getLogger("HWR").info( "going to poll images")
while not self.stopper:
#time.sleep(float(self.pollInterval)/1000)
time.sleep(1)
#print "polling", datetime.datetime.now().strftime("%H:%M:%S.%f")
try:
img = open( self.image, 'rb').read()
#img = base64.b64encode(img)
self.emit("imageReceived", img, 659, 493)
#logging.getLogger("HWR").info( "polling images")
except KeyboardInterrupt:
self.connected = False
self.stopper = True
logging.getLogger("HWR").info( "poll images stopped")
return
except:
logging.getLogger("HWR").exception("Could not read image")
def imageUpdated(self, value):
print "<HW> got new image"
print value
def gammaExists(self):
return False
def contrastExists(self):
return False
def brightnessExists(self):
return False
def gainExists(self):
return False
def getWidth(self):
#return 768 #JN ,20140807,adapt the MD2 screen to mxCuBE2
return 659
def getHeight(self):
#return 576 # JN ,20140807,adapt the MD2 screen to mxCuBE2
return 493
def setLive(self, state):
self.liveState = state
return True
def imageType(self):
return None
def takeSnapshot(self,snapshot_filename, bw=True):
return True