%% This is an article based on superfri.cls class file of
%% ``Supercomputing Frontiers and Innovations. An International Journal''
%% http://superfri.org/.
\documentclass{superfri}
\usepackage{graphicx}
\usepackage[hidelinks]{hyperref}
\usepackage{array}
\usepackage{todonotes}
\usepackage[capitalize]{cleveref}
\Crefname{lstlisting}{Listing}{Listings}
\usepackage{subcaption}
\usepackage{comment}
\usepackage{float}
\usepackage[utf8]{inputenc}
\usepackage{listings}
\usepackage{textcomp} %for upquote=true (straight quotes)
\definecolor{pblue}{rgb}{0.13,0.13,1}
\definecolor{pgreen}{rgb}{0,0.5,0}
\definecolor{pred}{rgb}{0.9,0,0}
\definecolor{pgrey}{rgb}{0.46,0.45,0.48}
\lstset{
basicstyle=\ttfamily\scriptsize,
numberstyle=\ttfamily\tiny,
frame=single,
numbers=left,
numbersep=1em,
xleftmargin=2em,
language=XML, % to avoid type being blue
breaklines=true,
breakatwhitespace=true,
postbreak=\hbox{$\hookrightarrow$ },
showstringspaces=false,
tabsize=2,
commentstyle=\color{pgreen},
keywordstyle=\color{pblue},
stringstyle=\bfseries,
moredelim=[il][\textcolor{pgrey}]{\[\]},
moredelim=[is][\textcolor{pgrey}]{\%\%}{\%\%},
columns=fullflexible,
upquote=true
}
\pagestyle{plain}
% ------------
\bibliographystyle{superfri}
\begin{document}
\author{Julian M. Kunkel\footnote{\label{uread}University of Reading, Reading, United Kingdom}\orcidID{0000-0002-6915-1179} \and Luciana R. Pedro\footnoteref{uread}\orcidID{0000-0001-8365-6264}
}
\title{Potential of I/O Aware Workflows in Climate and Weather}
\maketitle{}
\begin{abstract}
The efficient, convenient, and robust execution of data-driven workflows and enhanced data management are essential for productivity in scientific computing.
In HPC, the concerns of storage and computing are traditionally separated and optimised independently from each other and the needs of the end-to-end user. However, in complex workflows, this is becoming problematic. These problems are particularly acute in climate and weather workflows, which as well as becoming increasingly complex and exploiting deep storage hierarchies, can involve multiple data centres.
The key contributions of this paper are:
1) A sketch of a vision for an integrated data-driven approach, with a discussion of the associated challenges and implications, and 2) An architecture and roadmap consistent with this vision that would allow a seamless integration into current climate and weather workflows as it utilises versions of existing tools (ESDM, Cylc, XIOS, and DDN's IME).
The vision proposed here is built on the belief that workflows composed of data, computing, and communication-intensive tasks should drive interfaces and hardware configurations to better support the programming models.
When delivered, this work will increase the opportunity for smarter scheduling of computing by considering storage in heterogeneous storage systems.
We illustrate the performance impact on an example workload using a model built on performance data measured using ESDM at DKRZ.
\keywords{workflow, heterogeneous storage, data-driven, climate/weather}
\end{abstract}
% -----------------------------------------------------------------------
\section*{Introduction}
\label{sec:intro}
High-Performance Computing (HPC) harnesses the fastest available hardware components to enable the execution of tightly coupled applications from science and industry.
Typical use-cases include numerical simulation of physical systems and analysis of large-scale observational data.
In the domain of climate and weather, there is a considerable demand for the orchestration of ensembles of simulation models and the generation of data products.
A service such as the operational weather forecast workflow at the Met Office writes around 200\,TB and reads around 600\,TB every day.
In total, at the Met Office, on average 1.5\,PB and 14\,PB are written and read per day, respectively, for all climate and weather forecasts across all HPC clusters.
Based on the needs of climate and weather researchers, the HPC community has developed a software ecosystem that supports scientists to execute their large-scale workflows.
While the current advances correspond to a big leap forward, many processes still require experts.
For example, porting a workflow from one system to another requires adjusting runtime parameters of applications and deciding on how data is managed.
Since performance is of crucial importance to large-scale workflows, careful attention must be paid to exploit the system characteristics of the target computing centre.
For instance, a data-driven workflow may benefit from the explicit and simultaneous use of a locally heterogeneous set of computing and storage technologies.
This aspect means that substantial changes may be required to a workflow to tailor it to a particular supercomputer environment in order to obtain the best performance.
Knowing the capabilities, interfaces, and performance characteristics of individual components is mandatory to make the best use of them.
As the complexity of systems expands and alternative storage and computing technologies provide unique characteristics, it becomes increasingly difficult, even for experts, to manually optimise the usage of resources in workflows.
In many cases, modifications are not performed because: 1) They are labour-intensive: any change to the workflow requires careful validation, which may not pay off for small-scale runs; 2) It is a one-time explorative workflow; and 3) Users are not aware of the potential of the complex system.
In this paper, we illustrate how knowing the Input/Output (I/O) characteristics of workflow tasks and overall experimental design helps to optimise the execution of climate and weather workflows.
Exploiting this information automatically may increase the performance, throughput and cost-efficiency of the systems, providing an incentive to users and data-centres that cannot be neglected any longer.
Our approach intends to reduce the burden on researchers and, at the same time, optimise the decisions about jobs running on HPC systems.
This paper is structured as follows.
First, we describe the software stack involved in executing workflows in climate and weather in \Cref{sec:workflows}.
Related work in heterogeneous storage environments and solutions for workflow processing is presented in \Cref{sec:art}.
Next, the vision for including knowledge about data requirements and characteristics is sketched in \Cref{sec:vision} outlining the potential benefit the automatic exploitation might bring.
Our design, based on existing components in climate and weather, is described in \Cref{sec:design}.
An example use case demonstrates the impact on running a workload at the Mistral supercomputer in \Cref{sec:evaluation}.
The paper is concluded in \Cref{sec:conclusions}.
\section{Workflows in Climate/Weather}
\label{sec:workflows}
In this section, we describe how workflows are executed in a conventional software stack and the typical hardware and software environment involved in running a climate and weather application.
\subsection{Cylc}
Cylc~\cite{8675433} is a general-purpose workflow engine in charge of executing and monitoring cyclic workflows in which each step is submitted to the batch scheduler of a data centre.
With Cylc, tasks from multiple cycles can run concurrently without violating dependencies, which avoids the problem of delays causing one cycle to run into another.
Cylc was written in Python and built around a new scheduling algorithm that can manage infinite workflows of cycling tasks without a sequential cycle loop.
At any point during workflow execution, only the dependence between the individual tasks matters, regardless of their particular cycle points.
The information Cylc uses to control a given workflow is the set of task dependencies.
In a script file, the developers define, for each task, the parallelism settings and where data is to be stored.
\fig{width=0.9\columnwidth}{pic/cylc1}{Example of a Cylc workflow with its configuration file \cite{8675433}}
Consider the toy monthly cycling workflow in \cref{pic/cylc1}.
In this workflow, an atmospheric model (labelled as \texttt{model} in the figure) simulates the physics from a current state to predict the future, for example, a month later.
In climate research, this process is repeated in the model to simulate years into the future.
Once the simulation of any month is computed, data for this month becomes available and can now be analysed.
In this workflow, the task \texttt{model} is followed by tasks postprocessing (\texttt{post}), forecast verification (\texttt{ver}), and product generation (\texttt{prod}), all specified as a workflow in a Cylc configuration file (\texttt{flow.cylc}).
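
As a minimal sketch, the toy workflow described above could be written in \texttt{flow.cylc} roughly as follows.
The initial cycle point, the dependency of \texttt{model} on its own run in the previous month, the exact ordering among \texttt{post}, \texttt{ver}, and \texttt{prod}, and the command name are assumptions made for illustration; the configuration shown in \cref{pic/cylc1} remains the reference.

\begin{lstlisting}[language={}]
# Illustrative sketch only: cycle point, graph shape and command are assumptions.
[scheduling]
    initial cycle point = 20210101T00Z
    [[graph]]
        # One cycle per month; each model run waits for the previous month.
        P1M = """
            model[-P1M] => model
            model => post => ver => prod
        """
[runtime]
    [[model]]
        # Hypothetical command that starts the atmospheric model.
        script = run-model.sh
\end{lstlisting}

The graph only states dependencies; where each task stores its data is left to the task scripts.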
\subsection{Workflow Execution}
While Cylc directs the execution of workflows, several other components are involved in the implementation.
The software stack involved in a general workflow is depicted in \cref{pic/stages}.
Next, each stage of the execution is further described.
\fig{width=0.7\textwidth}{pic/stages}{Software stack and stages of execution}
\begin{enumerate}
\item \textbf{Scientist} specifies the workflow and provides a command or a script for each task.
The Cylc configuration includes the command(s) to be run, any environment variables used by these application(s), and any workload manager directives.
After that, the user invokes Cylc to start the workflow.
\item \textbf{Cylc} parses the workflow configuration file (\texttt{flow.cylc}), generates task dependencies, defines a schedule for the execution, and monitors the progress of the workflow.
Once a task can be executed (dependencies are fulfilled), the workflow engine submits a \textit{job script} for the workload manager with the required metadata that will run the Cylc task script.
\item \textbf{Workload Manager} such as Slurm~\cite{Jette02slurm:simple} is responsible for allocating compute resources to a batch job and performing the job scheduling.
The selected tool queues the job that represents the Cylc task and plans its execution, considering the scheduling policy of the data centre.
Once the job is scheduled to be dispatched, i.e., resources are available, and the job priority is the highest, it is started on the supercomputer.
\item \textbf{Job} provides the environment with the resources and runs the user-provided program or script on one of the nodes allocated for it.
Variables containing information about the environment of the batch job, e.g., the compute nodes allocated, are set, and the Cylc-provided script is then started on the node.
\item \textbf{Script} starts the commands sequentially (a command can be a parallel application).
During the creation of the script, Cylc has included variables that describe the task in the workflow.
This information is typically fed into the application(s) representing the task and is used to define the storage location.
The script uses commands to generate filenames considering the cycle and may store data in a workflow-specific shared directory.
These commands are either set in the Cylc workflow and then injected as environment variables, or used directly as part of the user-provided script.
\item \textbf{Application} is executed using the filenames set by the script.
\end{enumerate}
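
The filename handling in the \textbf{Script} stage can be sketched as follows.
This is a minimal illustration in Python, not code taken from Cylc or from any workflow discussed here: the helper \texttt{output\_path} and the naming scheme are hypothetical, and we assume that the variables \texttt{CYLC\_TASK\_CYCLE\_POINT} and \texttt{CYLC\_WORKFLOW\_SHARE\_DIR} are exported by Cylc into the job environment.

\begin{lstlisting}[language=Python]
import os
from pathlib import Path


def output_path(task, env=None):
    """Build a cycle-specific output filename in the shared directory."""
    env = os.environ if env is None else env
    # Variables assumed to be exported by Cylc for the running task.
    cycle = env["CYLC_TASK_CYCLE_POINT"]
    share = Path(env["CYLC_WORKFLOW_SHARE_DIR"])
    # Hypothetical naming scheme: <share>/<cycle>/<task>.nc
    return share / cycle / f"{task}.nc"


if __name__ == "__main__":
    # Stand-in environment so the sketch runs outside of a Cylc job.
    demo = {"CYLC_TASK_CYCLE_POINT": "20210101T00Z",
            "CYLC_WORKFLOW_SHARE_DIR": "/work/run1/share"}
    print(output_path("model", demo))
\end{lstlisting}

Because the storage location is computed inside the script, changing it means editing this task's script and the script of every task that reads the file.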
\subsection{I/O Stack of a Parallel Application}
Climate applications may have complex I/O stacks, as can be seen in \cref{pic/layers-xios}.
In this case, we assume the application uses XIOS~\cite{xios}, which provides domain-specific semantics for climate and weather.
It may gather data from individual fields distributed across the machine (exploiting MPI for parallelism) and then use NetCDF~\cite{netcdf} to store data as a file.
Under the hood, NetCDF uses the HDF5 API with its file format.
Internally, HDF5 uses MPI and its data types to specify the nature of data stored.
Finally, data is stored on a parallel file system like Lustre which, on the server-side, stores data in a local file system on block devices such as SSDs and HDDs.
\begin{figure}[b]
\begin{minipage}{.42\linewidth}
\centering
\includegraphics[width=0.42\textwidth]{pic/layers-xios}
\subcaption{I/O path for an MPI application}\label{pic/layers-xios}
\end{minipage}
\begin{minipage}{.58\linewidth}
\centering
\includegraphics[width=1\textwidth]{pic/system}
\subcaption{Example of a heterogeneous HPC landscape}\label{pic/system}
\end{minipage}
\vspace{5pt}
\caption{Typical hardware and software environment for applications}
\label{fig:34}
\end{figure}
Different applications involved in a workflow may use different I/O stacks to store their outputs.
Naturally, the application which uses previously generated data as its inputs must use a compatible API to read the specific data format.
In \cref{pic/layers-xios}, for example, XIOS may perform parallel I/O via the NetCDF API, allowing subsequent processes to read data directly using NetCDF.
Within the ESiWACE project\footnote{\url{https://www.esiwace.eu/}}, we are developing the Earth System Data Middleware (ESDM)~\cite{esdm} to allow applications with this kind of software stack to exploit heterogeneous storage resources in data centres.
The goal of ESDM is to provide parallel I/O for parallel applications with advanced features to optimise subsequent read accesses.
Implemented as a standalone API, it also provides NetCDF integration allowing its usage in existing applications.
Hence, in \cref{pic/layers-xios}, the HDF5 layer can be replaced with ESDM.
\subsection{Data Centre Infrastructure}
At present, data centres provide an infrastructure consisting of computing and storage devices with different characteristics, making them more efficient for specific tasks and satisfying the needs of different workflows.
Take, for example, the supercomputer Mistral at DKRZ, which consists of 3,321 nodes\footnote{\url{https://www.dkrz.de/up/systems/mistral}} and offers two types of compute nodes equipped with different CPUs as well as GPU nodes.
Each node has an SSD for local storage, and DKRZ additionally has two shared Lustre file systems with different performance characteristics.
Individual users and projects are mapped to one file system explicitly, and users can access it with \texttt{work} or \texttt{scratch} semantics.
While data is kept on the \texttt{work} file system indefinitely, available space is limited by a quota.
The \texttt{scratch} file system allows storing additional data, but data is automatically purged after some time.
Future centres are expected to have even more heterogeneity. A variety of accelerators (GPU, TPU, FPGAs), active storage, in-memory, and in-network computing technologies will provide further storage and processing capabilities.
\cref{pic/system} shows such a system with a focus on computation and storage.
Some of these technologies might be locally (specific compute nodes) or globally available.
Depending on the need, the storage characteristics range from predictable low-latency (in-memory storage, NVMe) to online storage (SSD, HDD), and also cheap storage for long-term archival (tape).
The tasks within any given workflow could benefit from utilising different combinations of storage and computing infrastructure.
\subsection{Data Management}
\label{sec:datamanagement}
Usually, the scripts representing tasks define how data is placed on the available storage system.
Many current workflows ignore the benefits of using multiple file systems concurrently and of exploiting data locality by co-locating tasks.
On top of that, in the current state of the art, scientists optimise the use of the available storage resources intuitively and compile the information about this decision-making process manually.
If a user knows the workflow and the system characteristics, data placement decisions can be optimised.
Consider, for instance, the situation where each computing node has access to three file systems: a fast \texttt{scratch} file system on which data may reside only for a week, a slower \texttt{work} file system, and a local file system.
Most current workflows utilise \texttt{work} and \texttt{scratch} systems.
When a task is set to run, the corresponding dataset would be moved from \texttt{work} to \texttt{scratch}, processed, and the resulting dataset would be transferred back to \texttt{work}.
If the \texttt{scratch} file system reaches its capacity, the dataset would be moved back to \texttt{work}, and the task would continue running until it is finished, which might be inefficient.
In this situation, there are many straightforward opportunities to utilise data migration to optimise performance, and also other criteria (e.g., costs).
However, with a multitude of file systems that differ at each data centre, such optimisations would be difficult to achieve manually by users.
Policy-driven systems and burst buffers perform such optimisations automatically to some extent. However, as they lack information about the workflow, they cannot optimise the workflow as a whole.
\section{State-of-the-Art}
\label{sec:art}
Related work to the proposed approach can be categorised into:
1) Technology that exploits heterogeneous storage environments and supports user-directed policies and
2) Solutions for workflow processing.\\
\textbf{Technology.}
Manual tiering requires the user or application to control data placement, i.e., storing data typically in the form of files on a particular storage system and, usually, moving data between storage systems using scripts.
One limitation of such an approach is that decisions about how data are mapped and packaged into files are made by the producing application, and cannot be changed without manual intervention by a downstream application.
Burst buffer solutions provide a tiered storage system that aims to exploit a storage hierarchy.
They can be integrated with hardware, as in DDN's Infinite Memory Engine (IME)~\cite{BODIAIFSFI19}, or provided as simple software solutions.
A policy system, e.g., deployed on a burst buffer~\cite{RomanusRP15}, aims to simplify data movement for the user, but typically migrates objects in the coarse granularity of files.
File systems and data management software such as IBM Spectrum Scale~\cite{SchmuckH02}, HPSS~\cite{528214}, BeeGFS~\cite{beegfs}, and Lustre~\cite{abs-1903-01955} (e.g., using the progressive file layouts feature) provide hierarchical storage management, allowing data to be stored on different storage technologies according to administrator-provided policies.
However, the semantic information that can be used by this type of system to make decisions is limited, e.g., data location, file extension, file age, etc.
The storage community has also adapted various higher-level software to support storage tiering on top of several storage systems. For instance, ADIOS provides in-memory staging that has been exploited by a variety of applications~\cite{slawinska2013maya}.
Hermes~\cite{kougkas2020acceleration} introduces a multi-tiered I/O buffering system with a prefetcher that provides several data placement policies.
iRODS~\cite{rajasekar2010irods} is a rule-oriented data system that allows scientists to organise data into shareable collections and provides several patterns for workflows considering data locality and data migration/replication.
Finally, there have also been extensions to batch schedulers to perform data staging for utilising node-local storage, for example, NORNS~\cite{miranda2019norns} as an extension to Slurm.\\
\textbf{Workflows.}
A good overview of the flavours of Scientific Workflow Management Systems (SWfMS) and their application to data-intensive workflows is given in \cite{liu2015survey}.
The article states that SWfMS should enable the parallel execution of data-intensive scientific workflows and exploit vast amounts of distributed resources.
Existing solutions recognise challenges in data variety (formats of the input data), opportunities to optimise the schedule by moving code to data, specification of the data dependencies for tasks, and they even may consider the capacity of the available data storage.
The execution engine Dryad~\cite{isard2007dryad}, for example, allows transferring data between tasks via files or directly using TCP connections and attempts to schedule tasks on the same nodes or racks.
%Swift/T is a scripting language for describing dataflow processing enabling the execution of ensembles of applications~\cite{ozik2016desktop}.
%Recent improvements aim to migrate data to a local cache allowing to exploit locality.
%For instance, in \cite{dai2018cross}, information about locality is proposed to be stored in extended attributes.
%In \cite{TUIBIHWLSC19}, an approach was presented to monitor and analyse I/O behaviour of HPC workflows.
In \cite{TUIBIHWLSC19}, an approach was presented to monitor and analyse I/O behaviour of HPC workflows.
Swift/T~\cite{6546066}, a scripting language for describing dataflow processing enabling the execution of ensembles of applications, is now openly used as a prototype platform~\cite{ozik2016desktop}.
Recent improvements aim to migrate data to a local cache so that locality can be exploited.
For instance, in \cite{dai2018cross}, information about locality is proposed to be stored in extended attributes.
Several early research efforts on grid workflows and, more recently, cloud workflows address use cases in which maximising data locality is of interest.
Economic factors (including storage costs) for workflow execution are discussed in \cite{alkhanak2016cost}.
In \cite{deelman2019role}, the authors discuss the role of Machine Learning (ML) for workflow execution and elaborate on its general potential for resource provisioning, such as the optimisation of runtime parameters, data movement, and hierarchical storage.
In \cite{subedi2019leveraging}, an ML model that stages data for in-situ analysis by exploiting the access patterns is introduced.
Workflow systems can also be specifically utilised to reproduce scientific results, i.e., recompute the results.
Those scalable workflow solutions typically utilise a container solution to allow execution in an arbitrary software environment.
Popper~\cite{jimenez2017popper}, Snakemake~\cite{bts480}, and Nextflow~\cite{Nextflow} provide a language to specify workflows and to execute them.
Snakemake is interesting as it supports the definition and inference of input and output filenames.
While various aspects of our vision have been addressed individually by related work for different domains, the high level of abstraction that we aim for and the potential it unleashes goes beyond the capabilities of existing approaches.
\section{Vision for I/O-Aware Workflows}
\label{sec:vision}
Nowadays, in order to run a job in an HPC environment efficiently, researchers have to develop profound knowledge, not only about their workflow, which is expected, but also about decisions regarding storage, communication, computing, and considerations regarding cost-efficiency of those operations.
However, applied scientists should not have to spend much time understanding hardware characteristics and acquiring operational knowledge of running a data centre; they should instead use their expertise to develop their work and simply collect and analyse the results of their experiments.
We aim to achieve an automatic and dynamic mapping of I/O resources to workflows.
Once we have an automated decision about where the job will run and how the storage will be managed, scientists can then reuse their workflow specification on any system without further modification and even without previous knowledge about the system architecture.
There are several approaches to implement the technology for the vision proposed in this work, and changes are needed in the software components to realise it.
In \Cref{sec:design}, we will discuss a specific design for our transitional roadmap considering climate and weather workflows and tools scientists from this field already use in their routine research.
Our vision for I/O-aware workflows requires two additional pieces of information.
Firstly, the user must augment the workflow description with information about I/O specifications and explicitly annotate dependencies to datasets.
Secondly, details about the storage architecture must be available.
\subsection{System Information}
While many optimisations are possible once an abstraction is in place, the improvements we discuss here are related to the life cycle and placement of datasets into specific storage according to system performance characteristics and workflow specification.
To achieve that, the system information shall comprise all available storage systems, the system topology, and details of each available component.
Simplified and complex models of the components can be included to approximate expected performance for specific I/O patterns.
It is expected that the data centre (or expert user) can create such a configuration file, e.g., by using vendor-provided information or by executing benchmarks.
With this information, a scheduler can make the initial data placement, transformation, and migration decisions for individual datasets during their life cycle.
This separation of concerns allows us to abstract from the workflow what is essential and what a system should optimise to ensure smart usage of the available resources.
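
To illustrate how such system information could drive a placement decision, the following Python sketch describes each storage system by a simple latency and throughput model and selects the system with the lowest estimated access time that still has enough free capacity.
All names and numbers are hypothetical placeholders rather than measurements, and the linear cost model is an assumption; a real configuration would be derived from vendor-provided information or benchmarks as described above.

\begin{lstlisting}[language=Python]
from dataclasses import dataclass


@dataclass(frozen=True)
class Storage:
    name: str
    latency_s: float       # per-access latency in seconds
    throughput_bps: float  # sustained throughput in bytes per second
    free_bytes: float      # remaining capacity in bytes

    def access_time(self, size_bytes, accesses=1):
        """Estimated time to move size_bytes using the given number of accesses."""
        return accesses * self.latency_s + size_bytes / self.throughput_bps


def place(size_bytes, accesses, systems):
    """Return the fastest storage system that can hold the dataset."""
    fitting = [s for s in systems if s.free_bytes >= size_bytes]
    if not fitting:
        raise ValueError("no storage system has enough free capacity")
    return min(fitting, key=lambda s: s.access_time(size_bytes, accesses))


# Placeholder values, not measurements.
SYSTEMS = (
    Storage("local-ssd", 1e-4, 5e8, 2e11),
    Storage("scratch", 1e-3, 5e9, 1e14),
    Storage("work", 5e-3, 2e9, 1e15),
)

if __name__ == "__main__":
    print(place(1e9, 10, SYSTEMS).name)     # few large accesses
    print(place(1e6, 10000, SYSTEMS).name)  # many small accesses
\end{lstlisting}

With these placeholder values, the throughput-bound dataset is placed on \texttt{scratch}, whereas the latency-bound one is placed on the node-local SSD, which shows why the access pattern must be part of the decision.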
\subsection{Extended Workflow Description}
In general, climate and weather workflows allow specifying tasks and dependencies among them.
We aim to enhance the current information with characteristics for input and output, i.e., the datasets.
An example workflow with \texttt{N} cycles containing input datasets and (intermediate) products is illustrated in \cref{pic/workflow-s3-n}.
Round nodes represent tasks, squared nodes represent data, and arrows indicate dependencies.
In the example, Task\,1 needs two datasets to perform its work, it produces Product\,1, and directly communicates with Task\,2.
For each new cycle, the \texttt{checkpoint} from the previous cycle (Product\,1) is used as input to start the next cycle.
Most of the workflow can run automatically, except for the manual quality control of the products and the final data usage of Product\,3.
This last step represents the typical uncertainty of data reuse, i.e., it is unclear how Product\,3 will be used further.
In the approach proposed in this work, each task is annotated with the required input datasets, and the generated products must include metadata such as data life cycle, the value of data, and how long it should be kept.
The idea here is to embrace the concept that task dependencies are really imposed by dataset dependencies.
\fig{width=0.8\columnwidth}{pic/workflow-s3-n}{Example of a high-level workflow with tasks and data dependencies}
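
The idea that task dependencies follow from dataset dependencies can be sketched in Python as below.
The task and dataset names loosely mirror \cref{pic/workflow-s3-n}, but the concrete annotations, the \texttt{Task} structure, and the helper \texttt{task\_dependencies} are hypothetical and only illustrate the principle.

\begin{lstlisting}[language=Python]
from dataclasses import dataclass


@dataclass(frozen=True)
class Task:
    name: str
    inputs: frozenset   # names of the datasets the task reads
    outputs: frozenset  # names of the datasets the task produces


def task_dependencies(tasks):
    """Map each task name to the set of tasks producing one of its inputs."""
    producer = {d: t.name for t in tasks for d in t.outputs}
    return {
        t.name: {producer[d] for d in t.inputs if d in producer}
        for t in tasks
    }


# Hypothetical annotations for a single cycle.
TASKS = (
    Task("task1", frozenset({"input1", "input2"}), frozenset({"product1"})),
    Task("task2", frozenset({"product1"}), frozenset({"product2"})),
    Task("task3", frozenset({"product2"}), frozenset({"product3"})),
)

if __name__ == "__main__":
    for name, deps in task_dependencies(TASKS).items():
        print(name, "<-", sorted(deps))
\end{lstlisting}

Datasets without a producing task, such as the initial inputs, impose no task dependency; a scheduler would treat them as externally provided.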
\subsection{Smarter I/O Scheduling}
The abstraction and automation of the I/O inside a workflow allow a runtime system to improve data placement and apply data reduction on heterogeneous storage systems.
Taking into consideration the architecture and workflow information, a smarter schedule can now be realised by exploiting the additional information.
Value and priority can influence fault-tolerance strategies and imply the quality of service for performance and availability.
Aspects like data reproducibility (can it be recomputed easily), type of the experiment (test, production), and runtime constraints for the overall and potential workflow could allow reducing costs and, hence, increasing scientific output.
Next, we outline two core strategies and the potential the proposed vision can bring to the improvement of current workflows.
In the design proposed in this work (\Cref{sec:design}), we will focus on the data placement strategy.
\paragraph{Strategy: Data Placement}
Data placement encompasses all data movement-related activities such as transfer, staging, replication, space allocation and de-allocation, registering and unregistering metadata, locating and retrieving data\footnote{\url{https://www.igi-global.com/dictionary/data-aware-distributed-batch-scheduling/6782}}.
The general idea is to host a dataset on the storage system that is most favourable in terms of performance, cost-effectiveness, and availability for the access pattern observed in the workflow.
Here we are considering the optimisation of data locality, where locality is twofold, spatial and temporal, on a variety of characteristics.
For optimising data placement, we introduce four approaches: data allocation, data migration, data replication and direct-coupling.
\begin{description}
\item[Data Allocation] is the assignment of a specific area of an available storage system to particular data.
In current workflows, the user usually has a script for each task defining the filenames with a prefix that places datasets generated by the same task into a specific storage\footnote{More complicated scripts would allow changing the storage type depending on the cycle, but this places a significant burden on the user.}.
Because there is one script responsible for generating the configuration, the decision in which directory the dataset will be stored is somewhat fixed.
Such configuration is done manually and with restricted information about the system architecture.
It would be interesting to explore storage options for the datasets and, e.g., to have datasets from different cycles placed at different storage systems.
For instance, in \cref{pic/workflow-s3-n}, alternating the storage location for Product\,2 between two \texttt{scratch} file systems would be a simple job for an I/O-aware scheduler.
Currently, however, it requires providing the scripts for that task and for all tasks depending on it with information about the different storage placements.
\item[Data Migration] is the process of transferring data from one storage system to another. Typically, it involves deleting the data at the source, but this decision can be delayed to provide read access on multiple storage systems.
Data movement involves a significant overhead in terms of both latency and energy consumption, as data must be read from one storage system and written to another.
Hence, it needs to be considered carefully.
\Cref{fig:lifecycle} introduces three possible life cycles for a specific dataset and illustrates how migrations can improve dataset accessibility.
In \cref{fig:lifecycle1}, the dataset is first stored on local storage to avoid congestion on the \texttt{work} file system and is then migrated to the \texttt{work} file system, where subsequent tasks of the workflow may read it multiple times.
In the end, this dataset may be an intermediate product that can then be deleted.
In \cref{fig:lifecycle2}, the dataset is stored on the \texttt{scratch} file system immediately and accessed there.
However, the last read access must happen before files on \texttt{scratch} are automatically removed.
Alternatively, \cref{fig:lifecycle3} presents the case where the dataset is created on \texttt{work} and then copied to a local node.
The local copy serves the read accesses of subsequent tasks, which might be beneficial for small random accesses.
For the last two scenarios, subsequent tasks would have to be placed on the same node where previous data was stored.
\begin{figure}[b]
\begin{minipage}{.33\linewidth}
\centering
\includegraphics[width=0.9\columnwidth]{pic/lifecycle-1}
\subcaption{Local and \texttt{work} file systems}\label{fig:lifecycle1}
\end{minipage}
\begin{minipage}{.33\linewidth}
\centering
\includegraphics[width=0.9\columnwidth]{pic/lifecycle-2}
\subcaption{\texttt{Scratch} file system only}\label{fig:lifecycle2}
\end{minipage}
\begin{minipage}{.33\linewidth}
\centering
\includegraphics[width=0.9\columnwidth]{pic/lifecycle-3}
\subcaption{Local and \texttt{work} file systems}\label{fig:lifecycle3}
\end{minipage}
\vspace{5pt}
\caption{Alternative life cycles for mapping a dataset to storage and the operations: \textbf{A}llocation, \textbf{M}igration, \textbf{R}eading, and \textbf{D}eleting}
\label{fig:lifecycle}
\end{figure}
\item[Data Replication] in computing involves sharing information to ensure consistency between redundant resources, such as software or hardware components, to improve reliability, fault-tolerance, or accessibility.
Data might be replicated by enabling the system to rerun parts of the workflow in case of a data loss.
In addition, the system may combine replication with \textbf{transforming} data into a different representation, which allows achieving better performance across a variety of access patterns.
\item[Direct-Coupling] replaces I/O by communicating data between subsequent steps of a workflow directly without storing intermediate data products on persistent storage.
As an example, in \cref{pic/workflow-s3-n}, the outcome of Task\,1 may be used directly by Task\,2.
Data may also be kept in memory and cached to achieve a certain level of independence between producer and consumer.
\end{description}
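The life cycles in \cref{fig:lifecycle} can also be read as sequences of the four operations.
The following Python sketch is purely illustrative: the \texttt{Op} class and the \texttt{replay} function are hypothetical names, and the sketch assumes that a migration keeps the source copy until it is deleted explicitly.
It replays a sequence and rejects reads from a storage system that does not hold the dataset.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Op:
    kind: str     # "A" (allocate), "M" (migrate), "R" (read) or "D" (delete)
    storage: str  # storage the operation acts on (the target for "M")


def replay(ops):
    """Replay a life cycle; return the storages holding the dataset at the end."""
    holders = set()
    for op in ops:
        if op.kind == "A":
            holders.add(op.storage)
        elif op.kind == "M":
            if not holders:
                raise ValueError("migration before allocation")
            # Assumption: the source copy stays until an explicit "D".
            holders.add(op.storage)
        elif op.kind in ("R", "D"):
            if op.storage not in holders:
                raise ValueError(f"dataset is not on {op.storage}")
            if op.kind == "D":
                holders.remove(op.storage)
        else:
            raise ValueError(f"unknown operation {op.kind}")
    return holders


# First life cycle: allocate locally, migrate to work, read there, delete.
LIFECYCLE_1 = [Op("A", "local"), Op("M", "work"), Op("D", "local"),
               Op("R", "work"), Op("R", "work"), Op("D", "work")]
```

Calling \texttt{replay(LIFECYCLE\_1)} returns an empty set, i.e., the intermediate product has been removed from all storage systems at the end of its life cycle.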
\paragraph{Strategy: Data Reduction}
Data reduction decreases the amount of data stored.
We discuss here two potential optimisations: data compression and data recomputation.
\begin{description}
\item[Data Compression] is the process of encoding information using fewer bits than the original representation.
Knowing the characteristics of data production and usage allows scientists to annotate the required precision of data in those workflows.
The storage system can exploit such information by reducing the precision of data and automatically picking an appropriate compression algorithm.
\item[Data Recomputation] Climate and weather scientists currently trade recomputation against storage space manually.
By knowing how to rerun the workflow behind the data creation, a smarter storage system can automatically trade data availability for potential recomputation opportunities to optimise the cost-efficiency of the system.
Intermediate states could be rerun by utilising virtualisation and container technologies.
Consider \cref{pic/workflow-s3-n} again and assume that, every \texttt{K} cycles of the workflow, the generated Product\,3 datasets (from Cycle 1 to Cycle \texttt{K}) are used in a validation task, called here \texttt{check}.
From the workflow, we know that $P_3 C_1$\footnote{The $P_i C_j$ notation represents the Product\,$i$ generated in the Cycle $j$.} will be used to construct $P_3 C_2$ and then \texttt{check}.
This dataset would probably be stored somewhere, and it will not be used until the workflow reaches the \texttt{K}-th cycle.
One alternative is to delete it after its first use and recompute it when the time is right.
The cost of doing so is storing \texttt{checkpoint} and then using it to reconstruct $P_3 C_1$.
If, for instance, $P_3 C_1$ is a large dataset, \texttt{checkpoint} is small, and computing time is short, it is easy to see that deleting and recomputing it may improve the costs of running the workflow.
That is just an example, and, currently, scientists perform those optimisations manually.
\end{description}
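Under a deliberately simple cost model, the trade-off in this example can be written down explicitly.
The symbols are assumptions introduced here for illustration only: $S_P$ and $S_c$ denote the sizes of $P_3 C_1$ and of \texttt{checkpoint}, $T$ the time until the \texttt{K}-th cycle, $c_s$ the storage cost per byte and unit of time, and $C_r$ the cost of the compute time needed to reconstruct $P_3 C_1$.
Deleting and recomputing then pays off when
\[
c_s\,T\,S_c + C_r \;<\; c_s\,T\,S_P
\quad\Longleftrightarrow\quad
C_r \;<\; c_s\,T\,(S_P - S_c),
\]
which matches the intuition of the example: the larger the dataset relative to the checkpoint and the cheaper the recomputation, the more attractive deletion becomes.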
\subsection{Benefit}
The benefits of the proposed vision are:
\begin{description}
\item[Abstraction] Providing the abstraction that enables a separation of concerns.
Once the I/O characteristics of a workflow are defined, the user does not have to know the architecture of the target system on which the workflow will run.
Thus, this level of abstraction can remove the specialist from the decision-making process of individual workflows.
\item[Optimisation] The workflow will be optimised specifically for the available system infrastructure and information about data.
In particular, by exposing the heterogeneous architecture, potential runtime characteristics can be considered.
By using information about the value of data, policies for data management (storage resilience, recomputation, replication, etc.) can be decided.
\item[Performance-portability] With both abstraction and optimisation, the user can specify the I/O requirements only once for the tasks of a specific workflow, and the I/O-aware workflow can now run with optimised data storage on any system without user intervention.
Even more, if the system characteristics change, e.g., it gets upgraded, an additional storage tier becomes available, or if storage degrades, the I/O-aware workflow could automatically adapt and make use of this new environment.
\end{description}
\section{Design}
\label{sec:design}
This section describes our first approach to incrementally extend workflows for climate and weather that realises parts of our vision.
While individual components such as ESDM and Cylc exist, we have not yet implemented the described scheduler.
To automatically make scheduling decisions, the software stack needs to: \vspace{0.3cm}
\begin{enumerate}
\item Deliver information about dataset life cycle together with the workflow, and
\item Adapt the resulting workflow, individual scripts, and application executions to consider the potential for data placement strategies.
\end{enumerate}
\subsection{System Information}
The system information of the design is realised using capabilities already available in the ESDM middleware.
We assume ESDM is used as the I/O middleware in the parallel application (with NetCDF or directly) and orchestrates the I/O according to a simplified ESDM configuration file (\texttt{esdm.conf}).
This file contains information about the available technology in the data centre, its I/O characteristics, and will be used to make decisions about how to prioritise I/O targets.
In the example presented in \cref{lst:esdm.conf}, we have three storage targets: two globally accessible file systems (\texttt{lustre01} and \texttt{lustre02}) and one local file system in \texttt{/tmp} that can be accessed via the POSIX backend.
Each of them comes with a lightweight performance model and the maximum size of data fragments.
The metadata section (Line 24) uses a POSIX interface to store the information about ESDM objects. Internally, ESDM creates so-called containers and dataset objects to manage data fragments.
\begin{figure}[!ht]
\begin{lstlisting}
"backends": [
{"type": "POSIX", "id": "work1", "target": "/work/lustre01/projectX/",
"performance-model" : {"latency" : 0.00001, "throughput" : 500000.0},
"max-threads-per-node" : 8,
"max-fragment-size" : 104857600,
"max-global-threads" : 200,
"accessibility" : "global"
},
{"type": "POSIX", "id": "work2", "target": "/work/lustre02/projectX/",
"performance-model" : {"latency" : 0.00001, "throughput" : 200000.0},
"max-threads-per-node" : 8,
"max-fragment-size" : 104857600,
"max-global-threads" : 200,
"accessibility" : "global"
},
{"type": "POSIX", "id": "tmp", "target": "/tmp/esdm/",
"performance-model" : {"latency" : 0.00001, "throughput" : 200.0},
"max-threads-per-node" : 0,
"max-fragment-size" : 10485760,
"max-global-threads" : 0,
"accessibility" : "local"
}
],
"metadata": {"type": "POSIX",
"id" : "md",
"target" : "./metadata",
"accessibility" : "global"
}
\end{lstlisting}
\caption{Example of an ESDM configuration file (\texttt{esdm.conf})}
\label{lst:esdm.conf}
\end{figure}
ESDM manages a pool of threads that should be created per compute node to achieve better performance and delegates the assignment of optimal block sizes to the storage backend.
Since ESDM supports several (non-POSIX) storage backends, an application can utilise all available storage systems without any modifications to the code.
%The maximum number of threads is defined in the configuration file.
The configuration file is read by the application utilising ESDM and steers the distribution of data during I/O.
For example, ESDM distributes a single dataset across multiple storage devices depending on their characteristics.
While the current system information and performance model are based on latency and throughput only, ESDM shows that decisions can be made automatically on behalf of the user.
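To make the role of the performance model concrete, the following Python sketch ranks the backends of \cref{lst:esdm.conf} by an estimated access time of latency plus size divided by throughput.
This is not ESDM's actual decision logic: the function names are hypothetical, the units of \texttt{latency} and \texttt{throughput} are assumed to be mutually consistent, and \texttt{max-fragment-size} is assumed to be given in bytes.

```python
import json

# Backends copied from the esdm.conf example (fields unused here are omitted).
BACKENDS = json.loads("""
[
  {"id": "work1", "accessibility": "global", "max-fragment-size": 104857600,
   "performance-model": {"latency": 0.00001, "throughput": 500000.0}},
  {"id": "work2", "accessibility": "global", "max-fragment-size": 104857600,
   "performance-model": {"latency": 0.00001, "throughput": 200000.0}},
  {"id": "tmp", "accessibility": "local", "max-fragment-size": 10485760,
   "performance-model": {"latency": 0.00001, "throughput": 200.0}}
]
""")


def estimated_time(backend, size):
    """Latency plus size/throughput, in the (assumed consistent) config units."""
    model = backend["performance-model"]
    return model["latency"] + size / model["throughput"]


def fragments_needed(backend, size_bytes):
    """Fragments required if each holds at most max-fragment-size bytes."""
    return -(-size_bytes // backend["max-fragment-size"])  # ceiling division


def rank_backends(backends, size, allow_local):
    """Order candidate backends by estimated access time, fastest first."""
    candidates = [b for b in backends
                  if allow_local or b["accessibility"] == "global"]
    return sorted(candidates, key=lambda b: estimated_time(b, size))
```

With the values of the example, \texttt{rank\_backends(BACKENDS, 1000.0, allow\_local=True)} orders the targets as \texttt{work1}, \texttt{work2}, \texttt{tmp}; excluding local storage, for instance when the consumer runs on a different node, leaves only the two global file systems.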
%As an example, based on the number of Object Storage Targets (OSTs) provided by both Lustre systems at DKRZ, performance tests already developed using ESDM~\cite{2019_3361225} shows that no more than 200 threads in total should be used to perform I/O to extract the best performance.
\subsection{Extended Workflow Description}
The user now has to provide information about the datasets required as input and generated as output by each workflow task in a file called the I/O-workflow configuration file (\texttt{io.cylc}).
An example of an \texttt{io.cylc} file is shown in \cref{lst:cylc}. In this file, information about Task\,1 is given as an example, and we expect the extra information about all tasks in the same file.
This file could define a cycle flexibly to be a month or a year according to the file \texttt{flow.cylc}.
The notation is similar to the specification of Cylc workflows using a nested INI format and, ultimately, files \texttt{io.cylc} and \texttt{flow.cylc} can be merged.
For each task, inputs and outputs are defined.
In the \textbf{input} section, each entry specifies the virtual name that is used by ESDM as a filename inside NetCDF.
Line~5, for example, defines that the filename \texttt{topography} is mapped to a specific input file and that this dataset does not depend on any previous step of the workflow.
The next line specifies that the input filename \texttt{checkpoint} should be mapped to the \texttt{checkpoint} dataset output by Task~1 in the previous cycle (e.g., the checkpoint generated after completing the last year's production).
For the initial cycle, the checkpoint file will be empty, and the application will load \texttt{init} data.
In the \textbf{output} section, the datasets are annotated with their characteristics more precisely.
For each variable, a pattern defining how frequently data is output according to the workflow must be provided.
Most data is input and output in the periodicity of the cycle. Still, we can have variables with different patterns, such as \texttt{varA}, which is output per day regardless of the cycle.
\begin{figure}[!ht]
\begin{lstlisting}
[Task 1]
[[inputs]]
topography = "/pool/input/app/config/topography.dat"
checkpoint = "[Task 1].checkpoint$(CYCLE - 1)"
init = "/pool/input/app/config/init.dat"
[[outputs]]
[[[varA]]] # This is the name of the variable
pattern = 1 day
lifetime = 5 years
type = product
datatype = float
size = 100 GB
precision.absolute_tolerance = 0.1
[[[checkpoint]]]
pattern = $(CYCLE)
lifetime = 7 days
type = checkpoint
datatype = float
dimension = (100,100,100,50)
[[[log]]]
type = logfile
datatype = text
size = small
\end{lstlisting}
\caption{External Cylc I/O-workflow configuration file (\texttt{io.cylc})}
\label{lst:cylc}
\end{figure}
Next, we formally define the expected annotations in all the fields envisioned in the I/O-workflow configuration file:
\begin{description}[itemsep=0pt]
\item[Name] A primary name for the field/data generated. It is extended by a pattern defined in a variable (Lines: 11, 19, 26).
\item[Pattern] The frequency of data output (Lines: 12, 20).
\item[Lifetime] How long data must be retained on storage (if at all) (Lines: 13, 21).
\item[Type] The class of data, e.g., checkpoint, diagnostics, temporary (Lines: 14, 22, 27).
\item[Datatype] The data type of data (Lines: 15, 23, 28).
\item[Size] An estimate of data size\footnote{This field can be inferred if dimension and data type are provided.} (Lines: 16, 29).
\item[Dimension] The data dimension (Line: 24).
\item[Accuracy] Characteristics quantifying the required level of data precision (Line: 17).
\end{description}
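As noted for the Size field, the size can be derived when the dimension and the data type are known.
A minimal Python sketch of this inference is given below; the function name and the element sizes (4 bytes for \texttt{float}, 8 bytes for \texttt{double}) are assumptions, since the configuration file does not state them.

```python
import ast
import math

# Assumed element sizes in bytes; io.cylc does not define them.
ELEMENT_BYTES = {"float": 4, "double": 8}


def infer_size_bytes(dimension, datatype):
    """Estimate the size of one output from its dimension string and data type."""
    dims = ast.literal_eval(dimension)  # e.g. "(100,100,100,50)" -> tuple of ints
    return math.prod(dims) * ELEMENT_BYTES[datatype]
```

For the \texttt{checkpoint} output in \cref{lst:cylc}, \texttt{infer\_size\_bytes("(100,100,100,50)", "float")} yields 200\,000\,000 bytes, i.e., about 200\,MB per checkpoint under the stated assumption.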
Note that the user may not be able to provide all required information; this can be handled by assuming a safe default behaviour.
For instance, in the case of missing data precision, data should be retained in the original form.
Knowing the dimension or size a priori might be difficult for scientists, e.g., the log file size is unclear.
In this case, the user may provide a qualitative estimate such as \texttt{small} or \texttt{big}, since any information is better than no information at all.
In future, we will explore ways to infer the output volume from the input automatically.
For instance, by running the workflow without I/O specification and monitoring I/O accesses for one cycle, we can propose an I/O description to the user to simplify the specification and generate an experimental I/O configuration file.
\subsection{Smarter I/O Scheduling}
From the list of opportunities, we realise data placement and migration in a heterogeneous (multi-storage) environment.
These goals will be achieved via the proposed I/O-aware scheduler, called here EIOS (ESDM I/O Scheduler).
EIOS will derive the schedule from the Cylc workflow and the system characteristics provided by ESDM.
We are working together with the Cylc team on how EIOS interfaces with Cylc.
While Cylc schedules the workflow, EIOS can provide hints about colocating tasks, which creates the opportunity to keep data in local storage.
Our design imposes only minor changes to Cylc, as existing functionality covers the core requirements:
\begin{description}
\item[The ability to dynamically set the job (Slurm) directives for a task]
\ \\ \vspace{-0.6cm}
This will be achieved by calling an external command (run on the Cylc scheduler host) which adds additional directives to be used by the job.
This command, provided by EIOS, will determine attributes of previous tasks through simple SQL queries to the Cylc database.
We plan on using the Cylc broadcast functionality to change the instructions used by a task by running an external program before any task where we may want to alter the directives.
\item[The ability to dynamically set storage locations]
\ \\ \vspace{-0.6cm}
This will be achieved by defining environment variables in the job script which are set to the output of another external command (run on the job host).
This command, also provided by EIOS, will have access to all the standard Cylc environment variables with details about the current task. \vspace{-0.1cm}
\end{description}
We plan on utilising DDN's IME API to pin data in IME and trigger migrations between IME and a storage backend explicitly.
Decisions about data locality will not be made for a whole (and potentially big) workflow.
Instead, the system will make decisions by looking ahead several steps of the workflow, which allows it to react to the observed dynamics of the execution.
Ultimately, when a user-script runs, the information about the intended I/O schedule is communicated from EIOS through a modified filename, which is then used by the ESDM-aware application to determine the data placement.
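The encoding of the pseudo filename is not specified here.
As a purely hypothetical illustration, the Python sketch below appends an ordered list of backend ids (as found in \texttt{esdm.conf}) to the virtual name after a \texttt{\#} separator; neither the separator nor the function names are part of ESDM or EIOS.

```python
SEPARATOR = "#"  # hypothetical separator between name and placement hints


def encode_pseudo_filename(name, backend_priority):
    """Attach an ordered list of preferred backend ids to a virtual filename."""
    parts = [name, *backend_priority]
    if any(SEPARATOR in p or "," in p for p in parts):
        raise ValueError("names and backend ids must not contain '#' or ','")
    return name + SEPARATOR + ",".join(backend_priority)


def decode_pseudo_filename(pseudo):
    """Split a pseudo filename into the virtual name and the backend priority."""
    name, _, hints = pseudo.partition(SEPARATOR)
    priority = hints.split(",") if hints else []
    return name, priority
```

In this scheme, a script would pass \texttt{checkpoint\#tmp,work1} to the application instead of \texttt{checkpoint}, and the I/O layer would recover both the virtual name and the preferred order of storage targets from it.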
\subsection{Modified Workflow Execution}
The steps to execute a workflow enriched with I/O information and perform smarter scheduling are depicted in \cref{pic/stages-io}.
Components of EIOS are involved in different steps of the workflow and the I/O path.
The suggested alterations are shown in the boxes pointed to by red arrows; the remaining components represent the current state of the art for workflows in climate and weather from \cref{pic/stages}.
In the following, we describe the modifications we propose in this vision paper for each component involved in the software stack.
\fig{width=0.7\columnwidth}{pic/stages-io}{Software stack and stages of execution with the I/O-aware scheduler (EIOS). The red arrows and boxes indicate additions to the workflow compared to \cref{pic/stages}.}
\begin{enumerate}
\item \textbf{Scientist} The user now has to provide an additional file that covers the I/O information for each task and slight changes have to be made to the current scripts.
\item \textbf{Cylc} EIOS is invoked by Cylc to identify potential optimisations in the schedule before generating the Slurm script.
\item \textbf{EIOS} The ESDM I/O Scheduler reads the information about the workflow (\texttt{flow.cylc} and \texttt{io.cylc} configuration files) and acts depending on the stage of the execution.
EIOS consists of several subcomponents:
\begin{itemize}
\item The high-level scheduler that interfaces with Cylc.
\item A tool to generate pseudo filenames used by the ESDM-aware applications.
\item A data management service (not shown in the figure) that migrates data and purges it at the end of the life cycle.
\end{itemize}
EIOS components use knowledge about the system by parsing the \texttt{esdm.conf} file.
EIOS may decide that subsequent jobs shall be placed on the same node, reorder the execution of some jobs, and provide information for conducting data migration.
\item \textbf{Slurm}
Cylc may now have added an optimisation identified by EIOS which is promptly handled by a modified Slurm.
Also, if migrations have to be performed, Slurm will administer them according to the specification in the job script.
\item \textbf{Job}
A job might run on the same node as a previous job to utilise local storage.
\item \textbf{Script}
Filenames are now generated by a replacement command that calls EIOS to create a pseudo filename.
This filename will encode additional information for ESDM about how to prioritise data placement according to data access.
\item \textbf{Application}
The application may either use XIOS, NetCDF with ESDM support or ESDM directly to access datasets.
% Hence, in \cref{pic/layers-xios}, the HDF5 layer is replaced with ESDM.
ESDM loads the file \texttt{esdm.conf} that contains the information about the available storage backends and their characteristics.
ESDM extracts the long-term schedule information from the generated pseudo filenames and employs them during the I/O scheduling to optimise the storage considering data locality among tasks.
Basically, ESDM can now change the priorities for data placement in the different storage locations that would typically be encoded in Cylc's configuration file.
\end{enumerate}
\section{Potential Benefit}
\label{sec:evaluation}
In this section, we discuss the potential performance benefit that our vision for I/O-aware workflows may have, considering the DKRZ Mistral supercomputer, the current version of ESDM, and a hypothetical workload related to the workflow in \cref{pic/cylc1}.
In our scenario, we compare the usage of the node-local file system\footnote{%
We assume that the availability and fault-tolerance of the nodes are not critical for the particular workload -- typically nodes can be repaired and returned to the pool within days.} with a globally shared Lustre file system to store intermediate data.
We focus on the model execution and subsequent verification and postprocessing steps.
Firstly, checkpoints of a long model execution chain could be stored locally, and the model could restart from there.
When the subsequent jobs require the whole data to generate a product, they can be run on the same nodes.
\fig{width=0.6\columnwidth}{pic/data}{Lustre performance for 100, 200, and 500 nodes}
\cref{pic/data} shows the read/write performance when using ESDM to store a time series of 10 steps of a variable with 200k $\times$ 200k dimension (about 1\,km global resolution of the model, equivalent to 3\,TB of data in total).
Note that, while we only consider the volume data for one variable with one level and ten timesteps, this value could be multiplied by a sensible number of levels and timesteps of the model.
Data is stored on either Lustre02 or both Lustre file systems -- ESDM splits data of a single variable internally and distributes them across the file systems.
While the mapping is not yet optimal, the figure shows that the write performance benefits from this approach.
In our observations, performance does not improve beyond 500 nodes, which might be because each Lustre file system has 128 Object Storage Targets (256 in total).
Another improvement can be achieved by using local storage.
Each local SSD of Mistral is a Micron M600 MTFD\footnote{https://www.anandtech.com/show/8528/micron-m600-128gb-256gb-1tb-ssd-review-nda-placeholder} (256\,GB), which has a nominal sequential read/write performance slightly above 500\,MB/s.
Hence, with 500 nodes, we could achieve 250\,GB/s, which surpasses the Lustre performance observed in \cref{pic/data}.
Even more, our 3\,TB of data would be about 6\,GB per node, which could be cached in memory so that I/O overlaps with the computation phases.
An additional benefit of using local storage is that the interference with I/O activities of other jobs would be minimised.
Indeed, for all reasonable sizes of the experimental data with 500+ nodes, the observed performance of node-local storage would be higher, thus improving the workflow execution time.
Since DKRZ has more than 3,000 nodes, using the local SSDs would sum up to 1.5\,TB/s, speeding up the I/O phase by 7x.
For model runs with 1\,km resolution, such configurations would be reasonable.
It might also be suitable to couple the model with a parallel analysis process directly using an in-memory file system such as \texttt{tmpfs}.
In this case, the performance per node can be assumed to be 3\,GiB/s, making it a viable option for smaller runs.
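The estimates for node-local storage follow from simple arithmetic.
As a sketch, assuming that the nominal 500\,MB/s per SSD is sustained on every node and scales linearly with the node count (an idealisation, not a measurement):
\[
500 \times 500\,\mathrm{MB/s} = 250\,\mathrm{GB/s}, \qquad
\frac{3\,\mathrm{TB}}{500} = 6\,\mathrm{GB}\ \mbox{per node}, \qquad
3000 \times 500\,\mathrm{MB/s} = 1.5\,\mathrm{TB/s}.
\]
The data volume itself is consistent with $200\mathrm{k} \times 200\mathrm{k} \times 10 = 4 \times 10^{11}$ values of 8 bytes each, i.e., $3.2 \times 10^{12}$ bytes; the 8-byte element size is an assumption, as the element type is not stated.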
Users can always use the node-local storage manually and create specific run-scripts to reproduce similar behaviours utilising the local storage.
However, this would be tedious and error-prone.
The purpose of our proposal is to establish an abstraction layer that allows for semi-automatic decision making and reduces, or even removes, manual intervention.
\section*{Conclusions}
\label{sec:conclusions}
In the domain of climate and weather, organising data placement on storage tiers is performed by the users or via policies, often leading to suboptimal decisions.
Additionally, manual optimisation and hard-coding of storage locations are non-portable and error-prone.
We believe users must be able to express their workflows abstractly.
By increasing the abstraction level for scientists, not only could tedious manual optimisation be automated, but strategies for data placement and data reduction could also be harnessed.
With knowledge about the data pattern, the runtime system could generate optimised execution plans and monitor their execution.
In this work, we describe the overall vision and a specific design for the software stack in the domain of climate and weather that we work on in the ESiWACE project.
The proposed changes increase the opportunity for smarter scheduling of storage in heterogeneous storage environments by considering the characteristics of data and system architecture in the workflow.
\section*{Acknowledgements}
\small
This project is funded by the European Union's Horizon 2020 research and innovation programme under grant agreement No. 823988.
We thank our collaborators Bryan Lawrence, Glenn Greed, David Matthews, and Hua Huang for their input to this paper, and the NGI initiative for contributions to the vision.
\openaccess
\bibliography{paper}
\end{document}