Stagein Stageout Interfaces
Why do we need a stage-in and a stage-out step? If we were to run the application package below locally using the cwltool command-line tool, the input_reference input value would be a path pointing to a Directory on the machine, and the result of the execution would be stored on the local file system. Wrapping the application workflow with a stage-in and a stage-out step eases the process of providing a heavy load of inputs to your application and allows the results to be stored directly in a remote storage location.
$graph:
- baseCommand: s-expression
  class: CommandLineTool
  hints:
    DockerRequirement:
      dockerPull: eoepca/s-expression:dev0.0.2
  id: clt
  inputs:
    input_reference:
      inputBinding:
        position: 1
        prefix: --input_reference
      type: Directory
    s_expression:
      inputBinding:
        position: 2
        prefix: --s-expression
      type: string
    cbn:
      inputBinding:
        position: 3
        prefix: --cbn
      type: string
  outputs:
    results:
      outputBinding:
        glob: .
      type: Directory
  requirements:
    EnvVarRequirement:
      envDef:
        PATH: /srv/conda/envs/env_app_snuggs/bin:/srv/conda/envs/env_app_snuggs/bin:/srv/conda/bin:/srv/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
    ResourceRequirement: {}
  #stderr: std.err
  #stdout: std.out
- class: Workflow
  doc: Applies s expressions to EO acquisitions
  id: s-expression
  inputs:
    input_reference:
      doc: Input product reference
      label: Input product reference
      type: Directory
    s_expression:
      doc: s expression
      label: s expression
      type: string
    cbn:
      doc: Common band name
      label: Common band name
      type: string
  label: s expressions
  outputs:
  - id: wf_outputs
    outputSource:
    - step_1/results
    type: Directory
  steps:
    step_1:
      in:
        input_reference: input_reference
        s_expression: s_expression
        cbn: cbn
      out:
      - results
      run: '#clt'
$namespaces:
  s: https://schema.org/
cwlVersion: v1.0
s:softwareVersion: 0.0.2
schemas:
- http://schema.org/version/9.0/schemaorg-current-http.rdf
Staging in the data is the first step of data processing.
Data is pulled from available sources, such as resource catalogues.
Once the data is pulled, it enters the data preparation stage. Data preparation, often referred to as “pre-processing”, is the stage at which raw data is cleaned up and organized for the following stage of data processing. During preparation, raw data is checked for errors, then formatted and standardized so that it can be easily used during the processing phase.
In the Ades, the data stage-in and data preparation phases can be set up in a CWL workflow, which is concatenated to the main processing CWL workflow.
The final step of data processing is the stage-out. After all of the data is processed, it is prepared for storage.
During preparation, the processed data is checked for errors, then formatted and standardized using common specifications, such as STAC, so that it can be easily used in the future. Once the preparation is over, the data can be stored.
Just as for the stage-in phase, the preparation and storage operations can be set up using a CWL workflow, which is concatenated to the main processing CWL workflow.
The Ades provides a default stage-in and stage-out setup in the Helm chart values file.
The setup can be customized by modifying the following properties:
Parameter | Description |
---|---|
workflowExecutor.main.cwl | Main CWL workflow used by cwl-wrapper |
workflowExecutor.stagein.cwl | Stage-in CWL workflow |
workflowExecutor.stageout.cwl | Stage-out CWL workflow |
workflowExecutor.rulez.cwl | Data structure for defining the CWL parameter used by cwl-wrapper |
Following is the Ades default stage-in CWL CommandLineTool. The stage-in command is applied to all inputs of type Directory or Directory[]; its goal is to download the data and convert it to a STAC item.
cwlVersion: v1.0
baseCommand: Stars
doc: "Run Stars for staging input data"
class: CommandLineTool
hints:
  DockerRequirement:
    dockerPull: terradue/stars:2.9.2
id: stars
arguments:
  - copy
  - -v
  - --harvest
  - -rel
  - -r
  - '4'
  - valueFrom: ${ if (inputs.input.split("#").length == 2)
               { return ["-af", inputs.input.split("#")[1]]; }
               else {return '--empty'}
               }
  - -o
  - ./
  - valueFrom: ${ return inputs.input.split("#")[0]; }
inputs:
  ADES_STAGEIN_AWS_SERVICEURL:
    type: string?
  ADES_STAGEIN_AWS_REGION:
    type: string?
  ADES_STAGEIN_AWS_ACCESS_KEY_ID:
    type: string?
  ADES_STAGEIN_AWS_SECRET_ACCESS_KEY:
    type: string?
outputs: {}
requirements:
  EnvVarRequirement:
    envDef:
      PATH: /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
      # AWS__Profile: $(inputs.aws_profile)
      # AWS__ProfilesLocation: $(inputs.aws_profiles_location.path)
      AWS__ServiceURL: $(inputs.ADES_STAGEIN_AWS_SERVICEURL)
      AWS__Region: $(inputs.ADES_STAGEIN_AWS_REGION)
      AWS__AuthenticationRegion: $(inputs.ADES_STAGEIN_AWS_REGION)
      AWS_ACCESS_KEY_ID: $(inputs.ADES_STAGEIN_AWS_ACCESS_KEY_ID)
      AWS_SECRET_ACCESS_KEY: $(inputs.ADES_STAGEIN_AWS_SECRET_ACCESS_KEY)
  ResourceRequirement: {}
  InlineJavascriptRequirement: {}
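To make the two valueFrom expressions of the stage-in tool more concrete, the following JavaScript sketch reproduces them outside of CWL. It assumes that the input reference is a string that may carry an optional fragment after a "#" character. The helper name stageInArgs and the example references are hypothetical, and the meaning of the -af flag is defined by Stars, not by this sketch.

```javascript
// Sketch only: mirrors the two `valueFrom` expressions of the stage-in tool.
// `stageInArgs` is a hypothetical helper name, not part of the Ades or Stars.
function stageInArgs(input) {
  var parts = input.split("#");
  // Exactly one "#" in the reference: pass the fragment through "-af".
  // Any other case: pass "--empty", as the CWL expression does.
  var fragmentArgs = parts.length == 2 ? ["-af", parts[1]] : ["--empty"];
  // Fixed arguments, in the same order as the `arguments` list of the tool.
  return ["copy", "-v", "--harvest", "-rel", "-r", "4"]
    .concat(fragmentArgs)
    .concat(["-o", "./", parts[0]]);
}

// Hypothetical catalogue references, for illustration only.
console.log(stageInArgs("https://catalog.example.org/items/item-1").join(" "));
console.log(stageInArgs("https://catalog.example.org/items/item-1#B04").join(" "));
```

In both cases the part of the reference before the "#" is what is handed to Stars as the resource to copy into the current directory.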
Following is an example of two inputs that would be staged in:
inputs:
  pre_event:
    doc: Optical calibrated pre-event acquisition with red, nir and swir22
    label: Optical calibrated pre-event acquisition with red, nir and swir22
    type: Directory
  post_event:
    doc: Optical calibrated post-event acquisition with red, nir and swir22
    label: Optical calibrated post-event acquisition with red, nir and swir22
    type: Directory
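The selection rule stated earlier (only inputs of type Directory or Directory[] are staged in) can be illustrated with a small JavaScript sketch. This is not the cwl-wrapper implementation: the helper name stagedInputs and the extra threshold input are hypothetical, and the sketch only recognizes the two type spellings named in the text.

```javascript
// Sketch only: list the workflow inputs that would be staged in,
// i.e. those typed Directory or Directory[]. Not the cwl-wrapper code.
function stagedInputs(inputs) {
  return Object.keys(inputs).filter(function (name) {
    var type = inputs[name].type;
    return type === "Directory" || type === "Directory[]";
  });
}

var exampleInputs = {
  pre_event: { type: "Directory" },
  post_event: { type: "Directory" },
  threshold: { type: "string" } // hypothetical extra input, not staged in
};

console.log(stagedInputs(exampleInputs));
```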
Following is the Ades default stage-out CWL CommandLineTool. It is applied to all outputs of type Directory or Directory[]; its goal is to convert the results of the processing phase into STAC items and to store them in an object storage repository. The output of the command is a JSON string with the path to the object storage bucket where the processing result is saved.
cwlVersion: v1.0
baseCommand: Stars
doc: "Run Stars for staging results"
class: CommandLineTool
hints:
  DockerRequirement:
    dockerPull: terradue/stars:1.0.0-beta.11
id: stars
arguments:
  - copy
  - -v
  - -r
  - '4'
  - -o
  - $( inputs.ADES_STAGEOUT_OUTPUT + "/" + inputs.process )
  - -res
  - $( inputs.process + ".res" )
  - valueFrom: |
      ${
          if( !Array.isArray(inputs.wf_outputs) )
          {
              return inputs.wf_outputs.path + "/catalog.json";
          }
          var args=[];
          for (var i = 0; i < inputs.wf_outputs.length; i++)
          {
              args.push(inputs.wf_outputs[i].path + "/catalog.json");
          }
          return args;
      }
inputs:
  ADES_STAGEOUT_AWS_PROFILE:
    type: string?
  ADES_STAGEOUT_AWS_SERVICEURL:
    type: string?
  ADES_STAGEOUT_AWS_ACCESS_KEY_ID:
    type: string?
  ADES_STAGEOUT_AWS_SECRET_ACCESS_KEY:
    type: string?
  aws_profiles_location:
    type: File?
  ADES_STAGEOUT_OUTPUT:
    type: string?
  ADES_STAGEOUT_AWS_REGION:
    type: string?
  process:
    type: string?
outputs:
  s3_catalog_output:
    outputBinding:
      outputEval: ${ return inputs.ADES_STAGEOUT_OUTPUT + "/" + inputs.process + "/catalog.json"; }
    type: string
requirements:
  InlineJavascriptRequirement: {}
  EnvVarRequirement:
    envDef:
      PATH: /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
      AWS__ServiceURL: $(inputs.ADES_STAGEOUT_AWS_SERVICEURL)
      AWS__SignatureVersion: "2"
      AWS_ACCESS_KEY_ID: $(inputs.ADES_STAGEOUT_AWS_ACCESS_KEY_ID)
      AWS_SECRET_ACCESS_KEY: $(inputs.ADES_STAGEOUT_AWS_SECRET_ACCESS_KEY)
  ResourceRequirement: {}
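The two JavaScript expressions of the stage-out tool can be tried outside of CWL with the sketch below. It assumes that wf_outputs is either a single Directory object or an array of them, each exposing a path property, as in the expression above. The helper names catalogArgs and s3CatalogOutput and the local paths are hypothetical. Unlike the CWL expression, the single-directory case returns a one-element list here so that both cases have the same shape.

```javascript
// Sketch only: mirrors the `valueFrom` and `outputEval` expressions of the
// stage-out tool. Helper names are hypothetical.
function catalogArgs(wfOutputs) {
  if (!Array.isArray(wfOutputs)) {
    // Single Directory: one catalog.json to copy.
    return [wfOutputs.path + "/catalog.json"];
  }
  // Directory[]: one catalog.json per directory, in order.
  return wfOutputs.map(function (dir) {
    return dir.path + "/catalog.json";
  });
}

// Same string concatenation as the `outputEval` of s3_catalog_output.
function s3CatalogOutput(stageOutOutput, processId) {
  return stageOutOutput + "/" + processId + "/catalog.json";
}

console.log(catalogArgs({ path: "/tmp/results" }));
console.log(catalogArgs([{ path: "/tmp/a" }, { path: "/tmp/b" }]));
console.log(s3CatalogOutput("s3://processing-results", "wf-75551ad0-6b0e-11ed-8458-62e4e1d63907"));
```

Note that s3_catalog_output is computed from the inputs alone, so it describes where the catalog is expected to be written rather than confirming that the upload succeeded.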
Following is an example of an output that would be staged out:
outputs:
- id: wf_outputs
  outputSource:
  - step_1/results
  type: Directory
As shown in the following example, the output definition of the combined workflow contains the original workflow output definition (like the one above) and the output definition of the stage-out template, named s3_catalog_output.
$graph:
- class: Workflow
  doc: Main stage manager
  id: main
  inputs:
    ADES_STAGEIN_AWS_ACCESS_KEY_ID:
      type: string?
    ADES_STAGEIN_AWS_REGION:
      type: string?
    ADES_STAGEIN_AWS_SECRET_ACCESS_KEY:
      type: string?
    ADES_STAGEIN_AWS_SERVICEURL:
      type: string?
    ADES_STAGEOUT_AWS_ACCESS_KEY_ID:
      type: string?
    ADES_STAGEOUT_AWS_PROFILE:
      type: string?
    ADES_STAGEOUT_AWS_REGION:
      type: string?
    ADES_STAGEOUT_AWS_SECRET_ACCESS_KEY:
      type: string?
    ADES_STAGEOUT_AWS_SERVICEURL:
      type: string?
    ADES_STAGEOUT_OUTPUT:
      type: string?
    aws_profiles_location:
      type: File?
    cbn:
      doc: Common band name
      id: cbn
      label: Common band name
      type: string
    input_reference:
      doc: Input product reference
      id: input_reference
      label: Input product reference
      type: string
    process:
      type: string?
    s_expression:
      doc: s expression
      id: s_expression
      label: s expression
      type: string
  label: macro-cwl
  outputs:
    s3_catalog_output:
      id: s3_catalog_output
      outputSource:
      - node_stage_out/s3_catalog_output
      type: string
    wf_outputs:
      outputSource:
      - node_stage_out/wf_outputs_out
      type: Directory
  requirements:
    InlineJavascriptRequirement: {}
    ScatterFeatureRequirement: {}
    SubworkflowFeatureRequirement: {}
  steps:
    node_stage_in:
      in:
        ADES_STAGEIN_AWS_ACCESS_KEY_ID: ADES_STAGEIN_AWS_ACCESS_KEY_ID
        ADES_STAGEIN_AWS_REGION: ADES_STAGEIN_AWS_REGION
        ADES_STAGEIN_AWS_SECRET_ACCESS_KEY: ADES_STAGEIN_AWS_SECRET_ACCESS_KEY
        ADES_STAGEIN_AWS_SERVICEURL: ADES_STAGEIN_AWS_SERVICEURL
        input: input_reference
      out:
      - input_reference_out
      run:
        arguments:
        - copy
        - -v
        - --harvest
        - -rel
        - -r
        - '4'
        - valueFrom: ${ if (inputs.input.split("#").length == 2) { return ["-af",
            inputs.input.split("#")[1]]; } else {return '--empty'} }
        - -o
        - ./
        - valueFrom: ${ return inputs.input.split("#")[0]; }
        baseCommand: Stars
        class: CommandLineTool
        cwlVersion: v1.0
        doc: Run Stars for staging input data
        hints:
          DockerRequirement:
            dockerPull: terradue/stars:2.9.2
        id: stars
        inputs:
          ADES_STAGEIN_AWS_ACCESS_KEY_ID:
            type: string?
          ADES_STAGEIN_AWS_REGION:
            type: string?
          ADES_STAGEIN_AWS_SECRET_ACCESS_KEY:
            type: string?
          ADES_STAGEIN_AWS_SERVICEURL:
            type: string?
          input:
            type: string?
        outputs:
          input_reference_out:
            outputBinding:
              glob: .
            type: Directory
        requirements:
          EnvVarRequirement:
            envDef:
              AWS_ACCESS_KEY_ID: $(inputs.ADES_STAGEIN_AWS_ACCESS_KEY_ID)
              AWS_SECRET_ACCESS_KEY: $(inputs.ADES_STAGEIN_AWS_SECRET_ACCESS_KEY)
              AWS__AuthenticationRegion: $(inputs.ADES_STAGEIN_AWS_REGION)
              AWS__Region: $(inputs.ADES_STAGEIN_AWS_REGION)
              AWS__ServiceURL: $(inputs.ADES_STAGEIN_AWS_SERVICEURL)
              PATH: /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
          InlineJavascriptRequirement: {}
          ResourceRequirement: {}
    node_stage_out:
      in:
        ADES_STAGEOUT_AWS_ACCESS_KEY_ID: ADES_STAGEOUT_AWS_ACCESS_KEY_ID
        ADES_STAGEOUT_AWS_PROFILE: ADES_STAGEOUT_AWS_PROFILE
        ADES_STAGEOUT_AWS_REGION: ADES_STAGEOUT_AWS_REGION
        ADES_STAGEOUT_AWS_SECRET_ACCESS_KEY: ADES_STAGEOUT_AWS_SECRET_ACCESS_KEY
        ADES_STAGEOUT_AWS_SERVICEURL: ADES_STAGEOUT_AWS_SERVICEURL
        ADES_STAGEOUT_OUTPUT: ADES_STAGEOUT_OUTPUT
        aws_profiles_location: aws_profiles_location
        process: process
        wf_outputs: on_stage/wf_outputs
      out:
      - s3_catalog_output
      - wf_outputs_out
      run:
        arguments:
        - copy
        - -v
        - -r
        - '4'
        - -o
        - $( inputs.ADES_STAGEOUT_OUTPUT + "/" + inputs.process )
        - -res
        - $( inputs.process + ".res" )
        - valueFrom: "${\n if( !Array.isArray(inputs.wf_outputs) )\n {\n \
            \ return inputs.wf_outputs.path + \"/catalog.json\";\n }\n var\
            \ args=[];\n for (var i = 0; i < inputs.wf_outputs.length; i++)\n \
            \ {\n args.push(inputs.wf_outputs[i].path + \"/catalog.json\"\
            );\n }\n return args;\n}\n"
        baseCommand: Stars
        class: CommandLineTool
        cwlVersion: v1.0
        doc: Run Stars for staging results
        hints:
          DockerRequirement:
            dockerPull: terradue/stars:1.0.0-beta.11
        id: stars
        inputs:
          ADES_STAGEOUT_AWS_ACCESS_KEY_ID:
            type: string?
          ADES_STAGEOUT_AWS_PROFILE:
            type: string?
          ADES_STAGEOUT_AWS_REGION:
            type: string?
          ADES_STAGEOUT_AWS_SECRET_ACCESS_KEY:
            type: string?
          ADES_STAGEOUT_AWS_SERVICEURL:
            type: string?
          ADES_STAGEOUT_OUTPUT:
            type: string?
          aws_profiles_location:
            type: File?
          process:
            type: string?
          wf_outputs:
            type: Directory
        outputs:
          s3_catalog_output:
            outputBinding:
              outputEval: ${ return inputs.ADES_STAGEOUT_OUTPUT + "/" + inputs.process
                + "/catalog.json"; }
            type: string
          wf_outputs_out:
            outputBinding:
              glob: .
            type: Directory
        requirements:
          EnvVarRequirement:
            envDef:
              AWS_ACCESS_KEY_ID: $(inputs.ADES_STAGEOUT_AWS_ACCESS_KEY_ID)
              AWS_SECRET_ACCESS_KEY: $(inputs.ADES_STAGEOUT_AWS_SECRET_ACCESS_KEY)
              AWS__ServiceURL: $(inputs.ADES_STAGEOUT_AWS_SERVICEURL)
              AWS__SignatureVersion: '2'
              PATH: /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
          InlineJavascriptRequirement: {}
          ResourceRequirement: {}
    on_stage:
      in:
        cbn: cbn
        input_reference: node_stage_in/input_reference_out
        s_expression: s_expression
      out:
      - wf_outputs
      run: '#s-expression'
- baseCommand: s-expression
  class: CommandLineTool
  hints:
    DockerRequirement:
      dockerPull: eoepca/s-expression:dev0.0.2
  id: clt
  inputs:
    cbn:
      inputBinding:
        position: 3
        prefix: --cbn
      type: string
    input_reference:
      inputBinding:
        position: 1
        prefix: --input_reference
      type: Directory
    s_expression:
      inputBinding:
        position: 2
        prefix: --s-expression
      type: string
  outputs:
    results:
      outputBinding:
        glob: .
      type: Directory
  requirements:
    EnvVarRequirement:
      envDef:
        PATH: /srv/conda/envs/env_app_snuggs/bin:/srv/conda/envs/env_app_snuggs/bin:/srv/conda/bin:/srv/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
    ResourceRequirement: {}
- class: Workflow
  doc: Applies s expressions to EO acquisitions
  id: s-expression
  inputs:
    cbn:
      doc: Common band name
      label: Common band name
      type: string
    input_reference:
      doc: Input product reference
      label: Input product reference
      type: Directory
    s_expression:
      doc: s expression
      label: s expression
      type: string
  label: s expressions
  outputs:
  - id: wf_outputs
    outputSource:
    - step_1/results
    type: Directory
  steps:
    step_1:
      in:
        cbn: cbn
        input_reference: input_reference
        s_expression: s_expression
      out:
      - results
      run: '#clt'
$namespaces:
  s: https://schema.org/
cwlVersion: v1.0
s:author:
- class: s:Person
  s:affiliation: EOEPCA
  s:email: [email protected]
  s:name: Eoepca Developer
s:citation: https://github.com/EOEPCA/app-s-expression#readme
s:codeRepository: https://github.com/EOEPCA/app-s-expression
s:contributor:
- class: s:Person
  s:affiliation: Community
  s:email: s:[email protected]
  s:name: Community Developer
s:dateCreated: '2022-05-03'
s:keywords: s-expression, processing, EO, bands
s:license: https://raw.githubusercontent.com/EOEPCA/eoepca/develop/LICENSE
s:releaseNotes: https://raw.githubusercontent.com/EOEPCA/app-s-expression/main/README.md
s:softwareVersion: 0.0.2
s:version: 0.0.2
schemas:
- http://schema.org/version/9.0/schemaorg-current-http.rdf
An instance of such an output would look similar to the example below:
{
  "s3_catalog_output": "s3://processing-results/wf-75551ad0-6b0e-11ed-8458-62e4e1d63907/catalog.json",
  "wf_outputs": {
    "location": "file:///workflow/output-data/ha0l463y",
    "basename": "ha0l463y",
    "class": "Directory",
    "listing": [
      {
        "class": "File",
        "location": "file:///workflow/output-data/ha0l463y/wf-75551ad0-6b0e-11ed-8458-62e4e1d63907.res",
        "basename": "wf-75551ad0-6b0e-11ed-8458-62e4e1d63907.res",
        "checksum": "sha1$796c7cd620b241386724469a7a63e5d4a7d1bbbd",
        "size": 112,
        "path": "/workflow/output-data/ha0l463y/wf-75551ad0-6b0e-11ed-8458-62e4e1d63907.res"
      }
    ],
    "path": "/workflow/output-data/ha0l463y"
  }
}
Note that the location path in wf_outputs cannot be accessed: it refers to the folder in the container where the processing job ran, and everything is cleared once the job completes.
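Since only the object storage reference remains usable after the job, a client would typically read s3_catalog_output and ignore wf_outputs. The JavaScript sketch below shows one way to do that, assuming the job output is available as a JSON string shaped like the example above. The helper names remoteCatalog and splitS3Url are hypothetical and not part of the Ades.

```javascript
// Sketch only: extract the usable result reference from a job output document.
// `remoteCatalog` and `splitS3Url` are hypothetical helper names.
function remoteCatalog(outputJson) {
  var output = JSON.parse(outputJson);
  // wf_outputs points inside the already cleared container, so only the
  // object storage reference is returned.
  return output.s3_catalog_output;
}

// Split an s3:// URL into its bucket name and object key.
function splitS3Url(url) {
  var match = /^s3:\/\/([^\/]+)\/(.+)$/.exec(url);
  if (match === null) {
    throw new Error("Not an s3:// URL: " + url);
  }
  return { bucket: match[1], key: match[2] };
}

// Abridged copy of the example output shown earlier.
var exampleOutput = JSON.stringify({
  s3_catalog_output:
    "s3://processing-results/wf-75551ad0-6b0e-11ed-8458-62e4e1d63907/catalog.json",
  wf_outputs: { class: "Directory", path: "/workflow/output-data/ha0l463y" }
});

var target = splitS3Url(remoteCatalog(exampleOutput));
console.log(target.bucket + " " + target.key);
```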
The Stars tool used in the above stage-in and stage-out CWL workflows provides a set of services for common operations with catalogs, such as:
- Downloading data as a STAC catalog entry from an OGC OpenSearch catalog or from an object storage repository
- Uploading data to an object storage repository as a STAC catalog entry
For further details on Stars, please refer to the GitHub repository of the tool.
Now that we have introduced the concepts of stage-in and stage-out, you know what goes on under the hood of the Ades when you execute an application package with specific types of inputs and outputs.
⏭️ If you are happy using the Ades default stage-in and stage-out setup, please proceed to the Get Started section.
⏭️ If you want to implement your own stage-in and stage-out, please proceed to the Custom Stage in and Stage out section.