-
Notifications
You must be signed in to change notification settings - Fork 3
/
tool.py
132 lines (108 loc) · 4.66 KB
/
tool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from . import Metadata
from pycompss.api.task import task
from pycompss.api.constraint import constraint
from pycompss.api.parameter import FILE_IN, FILE_OUT
# ------------------------------------------------------------------------------
# Main Tool interface
# ------------------------------------------------------------------------------
class Tool(object):
    """
    Abstract class describing a specific operation on a precise input data type
    to produce a precise output data type.

    The tool is executed by calling its "run()" method, which should support
    multiple inputs and outputs. Inputs and outputs are valid file names
    locally accessible to the Tool.

    The "run()" method also receives an instance of Metadata for each of the
    input data elements. It is the Tool's responsibility to generate the
    metadata for each of the output data elements, which are returned in a
    tuple (see code below).

    The "run()" method calls the relevant methods that perform the operations
    required to implement the Tool's functionality. Each of these methods
    should be decorated using the "@task" decorator. Further, the execution
    environment in which each operation is run can be configured by decorating
    the appropriate method(s) using the "@constraint" decorator.

    See also Workflow.
    """

    # Data types consumed and produced by the Tool; None in this base class.
    input_data_type = None
    output_data_type = None

    # Class-level default configuration. Instances never mutate this dict:
    # __init__ copies it into a per-instance dict before applying overrides.
    configuration = {}

    def __init__(self, configuration=None):
        """
        Initialise the tool with its configuration.

        The class-level "configuration" dict is used as the set of defaults
        and is copied, so that settings passed to one instance do not leak
        into other instances or into the class itself.

        Parameters
        ----------
        configuration : dict, optional
            a dictionary containing parameters that define how the operation
            should be carried out, which are specific to each Tool. When
            omitted (or None), only the class-level defaults are used.
        """
        # Copy first: updating the class attribute in place would share the
        # settings between every Tool instance.
        merged = dict(self.configuration)
        if configuration is not None:
            merged.update(configuration)
        self.configuration = merged

    # @constraint()
    @task(input_file=FILE_IN, output_file=FILE_OUT, isModifier=False)
    def _taskMethod(self, input_file, output_file):
        """
        This method performs the actions required to achieve the Tool's
        functionality. Note the use of the "@task" and "@constraint"
        decorators.

        Parameters
        ----------
        input_file : str
            valid file name locally accessible to the Tool.
        output_file : str
            valid file name locally accessible to the Tool.

        Returns
        -------
        The returned value(s) is specific to the implementation. Note that
        Exceptions cannot be raised from taskMethods, and therefore will need
        to be returned to "run" in order to be handled.
        """
        pass

    def run(self, input_files, metadata=None):
        """
        Perform the required operations to achieve the functionality of the
        Tool. This usually involves:

        0. Import tool-specific libraries
        1. Perform relevant checks on input data
        2. Optionally convert input data to internal formats
        3. Perform tool-specific operations
        4. Optionally convert output data to the output format
        5. Write metadata for the output data
        6. Handle failure in any of the above

        In case of failure, the Tool should return None instead of the output
        file name, AND attach an Exception instance to the output metadata (see
        Metadata.set_exception), to allow the wrapping App to report the
        error (see App).

        Note that this method calls the actual task(s). Ideally, each task
        should have a unique name that identifies the operation: these will be
        used by the COMPSs runtime to build a graph and trace.

        Parameters
        ----------
        input_files : list
            List of valid file names (str) locally accessible to the Tool.
        metadata : list
            List of Metadata instances, one for each of the input_files.
            Not used by this template implementation.

        Returns
        -------
        list, list
            1. a list of output files (str), each a valid file name locally
               accessible to the Tool
            2. a list of Metadata instances, one for each of the
               output_files

        Example
        -------
        >>> import Tool
        >>> tool = Tool(configuration = {})
        >>> tool.run([<input_1>, <input_2>], [<in_data_1>, <in_data_2>])
        ([<output_1>, <output_2>], [<out_data_1>, <out_data_2>])
        """
        # 0: not required

        # 1: this template implementation handles exactly one input file
        assert len(input_files) == 1, "expected exactly one input file"

        # 2: not required

        # 3: placeholder output location and format, to be replaced by
        # concrete Tools
        output_file = "/path/to/output_file"
        output_format = "OUTPUT_FILE_FORMAT"
        # The task takes the input and the output file as two separate
        # arguments.
        self._taskMethod(input_files[0], output_file)

        # 4: not required

        # 5:
        output_metadata = Metadata(self.output_data_type, output_format)
        return ([output_file], [output_metadata])