-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathEdnaWorkflow.py
236 lines (197 loc) · 8.12 KB
/
EdnaWorkflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
from HardwareRepository.BaseHardwareObjects import HardwareObject
import os
import time
import gevent
import pprint
import httplib
import logging
#import threading
from lxml import etree
import types
class State(object):
"""
Class for mimic the PyTango state object
"""
def __init__(self, parent):
self._value = "ON"
self._parent = parent
def getValue(self):
return self._value
def setValue(self, newValue):
self._value = newValue
self._parent.state_changed(newValue)
def delValue(self):
pass
value = property(getValue, setValue, delValue, "Property for value")
class EdnaWorkflow(HardwareObject):
"""
This HO acts as a interface to the Passerelle EDM workflow engine.
The previous version of this HO was a Tango client. In order to avoid
too many changes this version of the HO is a drop-in replacement of the
previous version, hence the "State" object which mimics the PyTango state.
Example of a corresponding XML file (currently called "ednaparams.xml"):
<object class = "EdnaWorkflow" role = "workflow">
<bes_host>mxhpc2-1705</bes_host>
<bes_port>8090</bes_port>
<object href="/session" role="session"/>
<workflow>
<title>MXPressA</title>
<path>MXPressA</path>
</workflow>
<workflow>
<title>...</title>
<path>...</path>
</workflow>
"""
def __init__(self, name):
HardwareObject.__init__(self, name)
self._state = State(self)
self._command_failed = False
self._besWorkflowId = None
self._gevent_event = None
self._bes_host = None
self._bes_port = None
def _init(self):
pass
def init(self):
self._session_object = self.getObjectByRole("session")
self._gevent_event = gevent.event.Event()
self._bes_host = self.getProperty("bes_host")
self._bes_port = int(self.getProperty("bes_port"))
self.state.value = "ON"
def getState(self):
return self._state
def setState(self, newState):
self._state = newState
def delState(self):
pass
state = property(getState, setState, delState, "Property for state")
def command_failure(self):
return self._command_failed
def set_command_failed(self, *args):
logging.getLogger("HWR").error("Workflow '%s' Tango command failed!" % args[1])
self._command_failed = True
def state_changed(self, new_value):
new_value = str(new_value)
logging.getLogger("HWR").debug('%s: state changed to %r', str(self.name()), new_value)
self.emit('stateChanged', (new_value, ))
def workflow_end(self):
"""
The workflow has finished, sets the state to 'ON'
"""
# If necessary unblock dialog
if not self._gevent_event.is_set():
self._gevent_event.set()
self.state.value = "ON"
def open_dialog(self, dict_dialog):
# If necessary unblock dialog
if not self._gevent_event.is_set():
self._gevent_event.set()
self.params_dict = dict()
if "reviewData" in dict_dialog and "inputMap" in dict_dialog:
review_data = dict_dialog["reviewData"]
for dictEntry in dict_dialog["inputMap"]:
if "value" in dictEntry:
value = dictEntry["value"]
else:
value = dictEntry["defaultValue"]
self.params_dict[dictEntry["variableName"]] = str(value)
self.emit('parametersNeeded', (review_data, ))
self.state.value = "OPEN"
self._gevent_event.clear()
while not self._gevent_event.is_set():
self._gevent_event.wait()
time.sleep(0.1)
return self.params_dict
def get_values_map(self):
return self.params_dict
def set_values_map(self, params):
self.params_dict = params
self._gevent_event.set()
def get_available_workflows(self):
workflow_list = list()
no_wf = len( self['workflow'] )
for wf_i in range( no_wf ):
wf = self['workflow'][wf_i]
dict_workflow = dict()
dict_workflow["name"] = str(wf.title)
dict_workflow["path"] = str(wf.path)
dict_workflow["requires"] = wf.getProperty('requires')
dict_workflow["doc"] = ""
workflow_list.append(dict_workflow)
return workflow_list
def abort(self):
logging.getLogger("HWR").info('Aborting current workflow')
# If necessary unblock dialog
if not self._gevent_event.is_set():
self._gevent_event.set()
self._command_failed = False
if self._besWorkflowId is not None:
abortWorkflowURL = os.path.join("/BES", "bridge", "rest", "processes", self._besWorkflowId, "STOP?timeOut=0")
logging.info("BES web service URL: %r" % abortWorkflowURL)
conn = httplib.HTTPConnection(self._bes_host, self._bes_port)
conn.request("POST", abortWorkflowURL)
response = conn.getresponse()
if response.status == 200:
workflowStatus=response.read()
logging.info("BES {0}: {1}".format(self._besWorkflowId, workflowStatus))
self.state.value = "ON"
def start(self, listArguments):
# If necessary unblock dialog
if not self._gevent_event.is_set():
self._gevent_event.set()
self.state.value = "RUNNING"
self.dictParameters = {}
iIndex = 0
if (len(listArguments) == 0):
self.error_stream("ERROR! No input arguments!")
return
elif (len(listArguments) % 2 != 0):
self.error_stream("ERROR! Odd number of input arguments!")
return
while iIndex < len(listArguments):
self.dictParameters[listArguments[iIndex]] = listArguments[iIndex+1]
iIndex += 2
logging.info("Input arguments:")
logging.info(pprint.pformat(self.dictParameters))
if "modelpath" in self.dictParameters:
modelPath = self.dictParameters["modelpath"]
if "." in modelPath:
modelPath = modelPath.split(".")[0]
self.workflowName = os.path.basename(modelPath)
else:
self.error_stream("ERROR! No modelpath in input arguments!")
return
time0 = time.time()
self.startBESWorkflow()
time1 = time.time()
logging.info("Time to start workflow: {0}".format(time1-time0))
def startBESWorkflow(self):
logging.info("Starting workflow {0}".format(self.workflowName))
logging.info("Starting a workflow on http://%s:%d/BES" % (self._bes_host, self._bes_port))
startWorkflowURL = os.path.join("/BES", "bridge", "rest", "processes", self.workflowName, "RUN")
isFirstParameter = True
self.dictParameters["initiator"] = self._session_object.endstation_name
self.dictParameters["externalRef"] = self._session_object.get_proposal()
# Build the URL
for key in self.dictParameters:
urlParameter = "%s=%s" % (key, self.dictParameters[key].replace(" ", "_"))
if isFirstParameter:
startWorkflowURL += "?%s" % urlParameter
else:
startWorkflowURL += "&%s" % urlParameter
isFirstParameter = False
logging.info("BES web service URL: %r" % startWorkflowURL)
conn = httplib.HTTPConnection(self._bes_host, self._bes_port)
headers = {"Accept": "text/plain"}
conn.request("POST", startWorkflowURL, headers=headers)
response = conn.getresponse()
if response.status == 200:
self.state.value = "RUNNING"
requestId=response.read()
logging.info("Workflow started, request id: %r" % requestId)
self._besWorkflowId = requestId
else:
logging.error("Workflow didn't start!")
requestId = None
self.state.value = "ON"