From 60b78d06cf296cc4af6fb3240636128175415bf5 Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Wed, 21 Oct 2020 12:49:59 +0200 Subject: [PATCH 01/30] Move slurm job template to separate SLURM directory --- .../common/{slurm.job.template => SLURM/slurm_ESA.job.template} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename toolflow/vivado/common/{slurm.job.template => SLURM/slurm_ESA.job.template} (96%) diff --git a/toolflow/vivado/common/slurm.job.template b/toolflow/vivado/common/SLURM/slurm_ESA.job.template similarity index 96% rename from toolflow/vivado/common/slurm.job.template rename to toolflow/vivado/common/SLURM/slurm_ESA.job.template index 8fd31317..c9b25b97 100644 --- a/toolflow/vivado/common/slurm.job.template +++ b/toolflow/vivado/common/SLURM/slurm_ESA.job.template @@ -26,7 +26,7 @@ #SBATCH -t @@TIMELIMIT@@ #SBATCH --comment="@@COMMENT@@" -source @@TAPASCO_HOME@@/setup.sh +source @@TAPASCO_HOME@@/tapasco-setup.sh # user commands begin here echo "SLURM job #$SLURM_JOB_ID started at $(date)" From 68f49fb18519ea9749ac26b4fba7eddc95bf9b79 Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Wed, 21 Oct 2020 13:10:23 +0200 Subject: [PATCH 02/30] Add parser for new JSON file type, that describes a remote SLURM node --- .../tapasco/base/SlurmRemoteConfig.scala | 42 +++++++++++++++++++ .../scala/tapasco/base/json/package.scala | 12 ++++++ toolflow/vivado/common/SLURM/ESA.json | 9 ++++ 3 files changed, 63 insertions(+) create mode 100644 toolflow/scala/src/main/scala/tapasco/base/SlurmRemoteConfig.scala create mode 100644 toolflow/vivado/common/SLURM/ESA.json diff --git a/toolflow/scala/src/main/scala/tapasco/base/SlurmRemoteConfig.scala b/toolflow/scala/src/main/scala/tapasco/base/SlurmRemoteConfig.scala new file mode 100644 index 00000000..68de728f --- /dev/null +++ b/toolflow/scala/src/main/scala/tapasco/base/SlurmRemoteConfig.scala @@ -0,0 +1,42 @@ +/* + * + * Copyright (c) 2014-2020 Embedded Systems and Applications, TU Darmstadt. 
+ * + * This file is part of TaPaSCo + * (see https://github.com/esa-tu-darmstadt/tapasco). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see . + * + */ +/** + * @file SlurmRemoteConfig.scala + * @brief Model: TPC remote Slurm Configuration. + * @authors M. Hartmann, TU Darmstadt + **/ + +package tapasco.base + +import java.nio.file.Path +import tapasco.base.builder.Builds + +case class SlurmRemoteConfig( + name: String, + host: String, + workstation: String, + workdir: Path, + installdir: Path, + jobFile: String + ) + +object SlurmRemoteConfig extends Builds[SlurmRemoteConfig] diff --git a/toolflow/scala/src/main/scala/tapasco/base/json/package.scala b/toolflow/scala/src/main/scala/tapasco/base/json/package.scala index 2d5cf570..468553d9 100644 --- a/toolflow/scala/src/main/scala/tapasco/base/json/package.scala +++ b/toolflow/scala/src/main/scala/tapasco/base/json/package.scala @@ -437,6 +437,18 @@ package object json { } /* Configuration @} */ + + /* @{ SlurmRemoteConfig */ + implicit val slurmRemoteConfigReads: Reads[SlurmRemoteConfig] = ( + (JsPath \ "Name").read[String](minimumLength(length = 1)) ~ + (JsPath \ "SlurmHost").read[String](minimumLength(length = 1)) ~ + (JsPath \ "WorkstationHost").read[String](minimumLength(length = 1)) ~ + (JsPath \ "Workdir").read[Path] ~ + (JsPath \ "TapascoInstallDir").read[Path] ~ + (JsPath \ "JobFile").read[String](minimumLength(length 
= 1)) + ) (SlurmRemoteConfig.apply _) + /* SlurmRemoteConfig @} */ + } // vim: foldmarker=@{,@} foldmethod=marker foldlevel=0 diff --git a/toolflow/vivado/common/SLURM/ESA.json b/toolflow/vivado/common/SLURM/ESA.json new file mode 100644 index 00000000..aac21b38 --- /dev/null +++ b/toolflow/vivado/common/SLURM/ESA.json @@ -0,0 +1,9 @@ +{ + "Name" : "ESA Cluster", + "SlurmHost" : "slurm", + "WorkstationHost" : "balin", + "Workdir" : "/scratch/tapasco_workdir", + "TapascoInstallDir" : "/scratch/tapasco", + "JobFile" : "slurm_ESA.job.template" +} + From f5ea82d9bad9c8998c0545665ab2098a9a4ec0eb Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Wed, 21 Oct 2020 16:09:10 +0200 Subject: [PATCH 03/30] Change CLI such that different SLURM node templates can be selected --- .../main/scala/tapasco/base/Configuration.scala | 4 ++-- .../scala/tapasco/base/ConfigurationImpl.scala | 14 ++++++++------ .../src/main/scala/tapasco/base/json/package.scala | 4 ++-- .../main/scala/tapasco/parser/GlobalOptions.scala | 6 +++--- .../src/main/scala/tapasco/parser/Usage.scala | 3 ++- 5 files changed, 17 insertions(+), 14 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/base/Configuration.scala b/toolflow/scala/src/main/scala/tapasco/base/Configuration.scala index e05cb57b..11fe8214 100644 --- a/toolflow/scala/src/main/scala/tapasco/base/Configuration.scala +++ b/toolflow/scala/src/main/scala/tapasco/base/Configuration.scala @@ -65,9 +65,9 @@ trait Configuration { def logFile(p: Option[Path]): Configuration - def slurm: Boolean + def slurm: Option[String] - def slurm(enabled: Boolean): Configuration + def slurm(template: Option[String]): Configuration def parallel: Boolean diff --git a/toolflow/scala/src/main/scala/tapasco/base/ConfigurationImpl.scala b/toolflow/scala/src/main/scala/tapasco/base/ConfigurationImpl.scala index 8d91b2b1..2d1da73b 100644 --- a/toolflow/scala/src/main/scala/tapasco/base/ConfigurationImpl.scala +++ 
b/toolflow/scala/src/main/scala/tapasco/base/ConfigurationImpl.scala @@ -46,7 +46,7 @@ private case class ConfigurationImpl( private val _coreDir: Path = BasePathManager.DEFAULT_DIR_CORES, private val _compositionDir: Path = BasePathManager.DEFAULT_DIR_COMPOSITIONS, private val _logFile: Option[Path] = None, - slurm: Boolean = false, + slurm: Option[String] = None, parallel: Boolean = false, maxThreads: Option[Int] = None, maxTasks: Option[Int] = None, @@ -81,7 +81,7 @@ private case class ConfigurationImpl( def logFile(op: Option[Path]): Configuration = this.copy(_logFile = op) - def slurm(enabled: Boolean): Configuration = this.copy(slurm = enabled) + def slurm(template: Option[String]): Configuration = this.copy(slurm = template) def parallel(enabled: Boolean): Configuration = this.copy(parallel = enabled) @@ -97,8 +97,10 @@ private case class ConfigurationImpl( def jobs(js: Seq[Job]): Configuration = this.copy(jobs = js) - // these directories must exist - for ((d, n) <- Seq((archDir, "architectures"), - (platformDir, "platforms"))) - require(mustExist(d), "%s directory %s does not exist".format(n, d.toString)) + // these directories must exist, unless we execute on remote SLURM node + if (this.slurm.getOrElse("local").equals("local")) { + for ((d, n) <- Seq((archDir, "architectures"), + (platformDir, "platforms"))) + require(mustExist(d), "%s directory %s does not exist".format(n, d.toString)) + } } diff --git a/toolflow/scala/src/main/scala/tapasco/base/json/package.scala b/toolflow/scala/src/main/scala/tapasco/base/json/package.scala index 468553d9..bb571e81 100644 --- a/toolflow/scala/src/main/scala/tapasco/base/json/package.scala +++ b/toolflow/scala/src/main/scala/tapasco/base/json/package.scala @@ -402,7 +402,7 @@ package object json { (JsPath \ "CoreDir").readNullable[Path].map(_ getOrElse BasePathManager.DEFAULT_DIR_CORES) ~ (JsPath \ "CompositionDir").readNullable[Path].map(_ getOrElse BasePathManager.DEFAULT_DIR_COMPOSITIONS) ~ (JsPath \
"LogFile").readNullable[Path] ~ - (JsPath \ "Slurm").readNullable[Boolean].map(_ getOrElse false) ~ + (JsPath \ "Slurm").readNullable[String] ~ (JsPath \ "Parallel").readNullable[Boolean].map(_ getOrElse false) ~ (JsPath \ "MaxThreads").readNullable[Int] ~ (JsPath \ "MaxTasks").readNullable[Int] ~ @@ -419,7 +419,7 @@ package object json { (JsPath \ "CoreDir").write[Path] ~ (JsPath \ "CompositionDir").write[Path] ~ (JsPath \ "LogFile").writeNullable[Path] ~ - (JsPath \ "Slurm").write[Boolean] ~ + (JsPath \ "Slurm").writeNullable[String] ~ (JsPath \ "Parallel").write[Boolean] ~ (JsPath \ "MaxThreads").writeNullable[Int] ~ (JsPath \ "HlsTimeOut").writeNullable[Int] ~ diff --git a/toolflow/scala/src/main/scala/tapasco/parser/GlobalOptions.scala b/toolflow/scala/src/main/scala/tapasco/parser/GlobalOptions.scala index c31fa1a7..7572d503 100644 --- a/toolflow/scala/src/main/scala/tapasco/parser/GlobalOptions.scala +++ b/toolflow/scala/src/main/scala/tapasco/parser/GlobalOptions.scala @@ -96,8 +96,8 @@ private object GlobalOptions { def inputFiles: Parser[(String, Path)] = jobsFile | configFile | logFile - def slurm: Parser[(String, Boolean)] = - longOption("slurm", "Slurm").map((_, true)) ~ ws + def slurm: Parser[(String, String)] = + longOption("slurm", "Slurm") ~ ws ~/ string.opaque("slurm template name") ~ ws def parallel: Parser[(String, Boolean)] = longOption("parallel", "Parallel").map((_, true)) ~ ws @@ -131,7 +131,7 @@ private object GlobalOptions { case ("Core", p: Path) => mkConfig(as, Some(c getOrElse Configuration() coreDir p)) case ("Kernel", p: Path) => mkConfig(as, Some(c getOrElse Configuration() kernelDir p)) case ("Platform", p: Path) => mkConfig(as, Some(c getOrElse Configuration() platformDir p)) - case ("Slurm", e: Boolean) => mkConfig(as, Some(c getOrElse Configuration() slurm e)) + case ("Slurm", t: String) => mkConfig(as, Some(c getOrElse Configuration() slurm Some(t))) case ("Parallel", e: Boolean) => mkConfig(as, Some(c getOrElse Configuration() 
parallel e)) case ("JobsFile", p: Path) => mkConfig(as, Some(c getOrElse Configuration() jobs readJobsFile(p))) case ("LogFile", p: Path) => mkConfig(as, Some(c getOrElse Configuration() logFile Some(p))) diff --git a/toolflow/scala/src/main/scala/tapasco/parser/Usage.scala b/toolflow/scala/src/main/scala/tapasco/parser/Usage.scala index 3cd54c2d..390f9624 100644 --- a/toolflow/scala/src/main/scala/tapasco/parser/Usage.scala +++ b/toolflow/scala/src/main/scala/tapasco/parser/Usage.scala @@ -88,7 +88,8 @@ configuration via `tapasco -n config.json`. Arg("--logFile FILE", "Path to output log file") & Arg("--configFile FILE", "Path to Json file with Configuration") & Arg("--jobsFile FILE", "Path to Json file with Jobs array") & - Arg("--slurm", "Activate SLURM cluster execution (requires sbatch)") & + Arg("--slurm TEMPLATE", "Activate SLURM cluster execution." ~ + "TEMPLATE describes a remote SLURM node, use 'local' for local execution (requires sbatch).") & Arg("--parallel", "Execute all jobs in parallel (careful!)") & Arg("--maxThreads NUM", "Limit internal parallelism of tasks (e.g., Vivado)" ~ "to the given number of threads.") & From 0ba49cbd31a64cb0110d7a9d3269a25cf62abb26 Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Wed, 28 Oct 2020 13:04:48 +0100 Subject: [PATCH 04/30] Add additional fields to Slurm.Job case class Job scheduling logic shall be moved from {Compose,HLS}-Task into the Slurm object. For this, some additional information is required. 
--- .../src/main/scala/tapasco/slurm/Slurm.scala | 19 +++++++--- .../main/scala/tapasco/task/ComposeTask.scala | 33 +++++++--------- .../tapasco/task/HighLevelSynthesisTask.scala | 38 ++++++++----------- 3 files changed, 44 insertions(+), 46 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index afd0049d..2eefaf29 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -46,11 +46,14 @@ final object Slurm extends Publisher { /** Name of the job. */ name: String, - /** File name of the stdout logfile. */ - slurmLog: String, + /** File name of the tapasco logfile. */ + log: Path, - /** File name of the stderr logfile. */ - errorLog: String, + /** File name of the stdout slurm logfile. */ + slurmLog: Path, + + /** File name of the stderr slurm logfile. */ + errorLog: Path, /** Consumer to schedule. */ consumer: ResourceConsumer, @@ -62,7 +65,13 @@ final object Slurm extends Publisher { commands: Seq[String], /** Optional comment. */ - comment: Option[String] = None + comment: Option[String] = None, + + /** The job to execute */ + job: tapasco.jobs.Job, + + /** Filename of the tapasco configuration file */ + cfg_file: Path ) /** Exception class for negative SLURM responses. 
*/ diff --git a/toolflow/scala/src/main/scala/tapasco/task/ComposeTask.scala b/toolflow/scala/src/main/scala/tapasco/task/ComposeTask.scala index ee5703af..94450972 100644 --- a/toolflow/scala/src/main/scala/tapasco/task/ComposeTask.scala +++ b/toolflow/scala/src/main/scala/tapasco/task/ComposeTask.scala @@ -56,7 +56,6 @@ class ComposeTask(composition: Composition, private[this] var _composerResult: Option[Composer.Result] = None private[this] val _outDir = cfg.outputDir(composition, target, designFrequency, features getOrElse Seq()) private[this] val _logFile = logFile getOrElse _outDir.resolve("tapasco.log").toString - private[this] val _errorLogFile = Paths.get(_logFile).resolveSibling("slurm-compose.errors.log") import LogFormatter._ @@ -108,44 +107,40 @@ class ComposeTask(composition: Composition, } private def slurmExecution: Boolean = { + val l = Paths.get(_logFile).toAbsolutePath().normalize() val cfgFile = l.resolveSibling("slurm-compose.cfg") // Configuration Json - val jobFile = l.resolveSibling("slurm-compose.slurm") // SLURM job script val slgFile = l.resolveSibling("slurm-compose.log") // SLURM job stdout log + val errFile = Paths.get(_logFile).resolveSibling("slurm-compose.errors.log") + val cmpsJob = ComposeJob( composition, designFrequency, implementation.toString, Some(Seq(target.ad.name)), Some(Seq(target.pd.name)), features, debugMode ) + // define SLURM job val job = Slurm.Job( name = l.getParent.getParent.getFileName.resolve(l.getParent.getFileName).toString, - slurmLog = slgFile.toString, - errorLog = _errorLogFile.toString, + log = l, + slurmLog = slgFile, + errorLog = errFile, consumer = this, maxHours = ComposeTask.MAX_COMPOSE_HOURS, commands = Seq("tapasco --configFile %s".format(cfgFile.toString)), - comment = Some(_outDir.toString) + comment = Some(_outDir.toString), + job = cmpsJob, + cfg_file = cfgFile ) - // generate non-SLURM config with single job - val newCfg = cfg - .logFile(Some(l)) - .slurm(false) - .jobs(Seq(cmpsJob)) - - 
_logger.info("launching Compose job on SLURM ({})", cfgFile) - - catchAllDefault(false, "error during SLURM job execution (%s): ".format(jobFile)) { - Files.createDirectories(jobFile.getParent()) // create base directory - Slurm.writeJobScript(job, jobFile) // write job script - Configuration.to(newCfg, cfgFile) // write Configuration to file - Slurm(jobFile) foreach (Slurm.waitFor(_)) // execute and wait + + Slurm(job)(cfg) foreach (Slurm.waitFor(_)) // execute and wait + _composerResult = if (debugMode.isEmpty) { ComposeTask.parseResultInLog(l.toString) } else { ComposeTask.makeDebugResult(debugMode.get) } (_composerResult map (_.result) getOrElse false) == ComposeResult.Success - } + } private def elementdesc = "%s [F=%2.2f]".format(logformat(composition), designFrequency.toDouble) diff --git a/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala b/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala index 99edeb23..02b54099 100644 --- a/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala +++ b/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala @@ -40,7 +40,6 @@ class HighLevelSynthesisTask(val k: Kernel, val t: Target, val cfg: Configuratio private[this] val slurm = Slurm.enabled private[this] val r = HighLevelSynthesizer(hls) private[this] val l = r.logFile(k, t)(cfg).resolveSibling("hls.log") - private[this] val e = l.resolveSibling("hls-slurm.errors.log") def synthesizer: HighLevelSynthesizer = r @@ -56,35 +55,30 @@ class HighLevelSynthesisTask(val k: Kernel, val t: Target, val cfg: Configuratio LogFileTracker.stopLogFileAppender(appender) result map (_.toBoolean) getOrElse false } else { - val cfgFile = l.resolveSibling("slurm-hls.cfg") // Configuration Json - val jobFile = l.resolveSibling("hls.slurm") // SLURM job script - val slurmLog = l.resolveSibling("slurm-hls.log") // raw log file (stdout w/colors) + + val cfgFile = l.resolveSibling("slurm-hls.cfg") // Configuration Json 
+ val slurmLog = l.resolveSibling("slurm-hls.log") // raw log file (stdout w/colors) + val e = l.resolveSibling("hls-slurm.errors.log") + val hlsJob = HighLevelSynthesisJob(hls.toString, Some(Seq(t.ad.name)), Some(Seq(t.pd.name)), Some(Seq(k.name))) + // define SLURM job val job = Slurm.Job( name = "hls-%s-%s-%s".format(t.ad.name, t.pd.name, k.name), - slurmLog = slurmLog.toString, - errorLog = e.toString, + log = l, + slurmLog = slurmLog, + errorLog = e, consumer = this, maxHours = HighLevelSynthesisTask.MAX_SYNTH_HOURS, - commands = Seq("tapasco --configFile %s".format(cfgFile.toString, k.name.toString)) + commands = Seq("tapasco --configFile %s".format(cfgFile.toString, k.name.toString)), + job = hlsJob, + cfg_file = cfgFile ) - // generate non-SLURM config with single job - val newCfg = cfg - .logFile(Some(l)) - .jobs(Seq(hlsJob)) - .slurm(false) - - logger.info("starting HLS job on SLURM ({})", cfgFile) - catchAllDefault(false, "error during SLURM job execution (%s): ".format(jobFile)) { - Files.createDirectories(l.getParent()) // create base directory - Slurm.writeJobScript(job, jobFile) // write job script - Configuration.to(newCfg, cfgFile) // write Configuration to file - val r = (Slurm(jobFile) map (Slurm.waitFor(_))).nonEmpty // execute sbatch to enqueue job, then wait for it - FileAssetManager.reset() - r - } + // execute sbatch to enqueue job, then wait for it + val r = (Slurm(job)(cfg) map (Slurm.waitFor(_))).nonEmpty + FileAssetManager.reset() + r } def logFiles: Set[String] = Set(l.toString) From 4b01364e4d83a30674d81f09433cbecf1a2b1ca9 Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Wed, 28 Oct 2020 13:17:56 +0100 Subject: [PATCH 05/30] Add option for remote execution to Slurm object --- .../src/main/scala/tapasco/Tapasco.scala | 7 +++- .../src/main/scala/tapasco/slurm/Slurm.scala | 41 +++++++++++++++---- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/Tapasco.scala 
b/toolflow/scala/src/main/scala/tapasco/Tapasco.scala index 29c57ce5..33c548e7 100644 --- a/toolflow/scala/src/main/scala/tapasco/Tapasco.scala +++ b/toolflow/scala/src/main/scala/tapasco/Tapasco.scala @@ -87,7 +87,12 @@ object Tapasco { logger.trace("configuring FileAssetManager...") FileAssetManager(cfg) logger.trace("SLURM: {}", cfg.slurm) - if (cfg.slurm) Slurm.enabled = cfg.slurm + if (cfg.slurm.isDefined) { + Slurm.set_cfg(cfg.slurm.get match { + case "local" => Slurm.EnabledLocal() + case t => Slurm.EnabledRemote(t) + }) + } FileAssetManager.start() logger.trace("parallel: {}", cfg.parallel) cfg.logFile map { logfile: Path => setupLogFileAppender(logfile.toString) } diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index 2eefaf29..561f56cf 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -110,17 +110,32 @@ final object Slurm extends Publisher { } /** Enables or disables SLURM, returns new value for enabled. 
*/ - def enabled_=(en: Boolean): Boolean = if (en && available) { - Slurm.synchronized { - _enabled = en + def set_cfg(cfg: SlurmConfig): Boolean = cfg match { + case Disabled() => false + case EnabledLocal() => if (available) { + Slurm.synchronized { + _enabled = true + } + publish(Events.SlurmModeEnabled(true)) + true + } else { + logger.warn("SLURM local mode was selected, but could be not activated (sbatch not found)") + false } - publish(Events.SlurmModeEnabled(en)) - enabled - } else { - if (en) { - logger.warn("SLURM mode was selected, but could be not activated (sbatch not found)") + case EnabledRemote(template_name) => { + val template_path = SLURM_TEMPLATE_DIR.resolve(template_name + ".json") + if (template_path.toFile.exists()) { + Slurm.synchronized { + _enabled = true + slurm_remote_cfg = SlurmRemoteConfig.from(template_path).toOption + } + publish(Events.SlurmModeEnabled(true)) + true + } else { + logger.warn("SLURM mode was selected, but the specified template was not found") + false + } } - false } /** Helper function: Sets correct file permissions on job scripts. */ @@ -230,4 +245,12 @@ final object Slurm extends Publisher { /** Use SLURM? 
*/ private var _enabled = false + + /** Host for executing SLURM */ + private var slurm_remote_cfg: Option[SlurmRemoteConfig] = None + + sealed trait SlurmConfig + final case class EnabledLocal() extends SlurmConfig + final case class EnabledRemote(template_name: String) extends SlurmConfig + final case class Disabled() extends SlurmConfig } From b35dc8186880aa0409e0d0e1efba1308c36da174 Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Wed, 28 Oct 2020 14:07:50 +0100 Subject: [PATCH 06/30] Add a wrapper that handles local/remote execution of shell commands --- .../src/main/scala/tapasco/slurm/Slurm.scala | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index 561f56cf..0a82dbac 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -183,7 +183,7 @@ final object Slurm extends Publisher { catchAllDefault[Option[Int]](None, "Slurm scheduling failed: ") { val cmd = "sbatch %s".format(script.toAbsolutePath().normalize().toString) logger.debug("running slurm batch job: '%s'".format(cmd)) - val res = cmd.!! + val res = exec_cmd(cmd) val id = slurmSubmissionAck.findFirstMatchIn(res) map (_ group (1) toInt) if (id.isEmpty) { if (retries > 0) { @@ -201,7 +201,7 @@ final object Slurm extends Publisher { /** Check via `squeue` if the SLURM job is still running. */ def isRunning(id: Int): Boolean = catchAllDefault[Boolean](true, "Slurm `squeue` failed: ") { - val squeue = "squeue -h".!! + val squeue = exec_cmd("squeue -h") logger.trace("squeue output: {}", squeue) !"%d".format(id).r.findFirstIn(squeue).isEmpty } @@ -220,7 +220,7 @@ final object Slurm extends Publisher { Seq() } else { catchAllDefault(Seq[Int](), "could not get squeue output: ") { - val lines = "squeue -u %s".format(sys.env("USER")).!! 
+ val lines = exec_cmd("squeue -u %s".format(sys.env("USER"))) val ids = ("""\n\s*(\d+)""".r.unanchored.findAllMatchIn(lines) map (m => m.group(1).toInt)).toSeq logger.debug("running SLURM jobs: {}", ids mkString " ") ids @@ -229,7 +229,7 @@ final object Slurm extends Publisher { /** Cancels the SLURM job with the given ID. */ def cancel(id: Int): Unit = catchAllDefault((), "canceling SLURM job %d failed: ".format(id)) { - "scancel %d".format(id).!! + exec_cmd("scancel %d".format(id)) } /** Cancels all currently running SLURM jobs. */ @@ -239,10 +239,21 @@ final object Slurm extends Publisher { val cmd = "scancel %s" format (ids mkString " ") logger.info("canceling SLURM jobs: {}", ids mkString ", ") logger.debug("command: '{}'", cmd) - cmd.! + exec_cmd(cmd, get_ret_code = true).toInt } } + /** Execute a SLURM command, either locally or on a remote host */ + def exec_cmd(c: String, get_ret_code: Boolean = false, hostname: Option[String] = None): String = { + val cmd = if (slurm_remote_cfg.isEmpty) c else { + val host = hostname.getOrElse(slurm_remote_cfg.get.host) + "ssh %s %s".format(host, c) + } + + logger.info("Executing command: %s".format(cmd)) + if (get_ret_code) cmd.!.toString else cmd.!! + } + /** Use SLURM? */ private var _enabled = false From 96839255e27fc8a3c9145b19dabc1d126d83843b Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Wed, 28 Oct 2020 14:54:42 +0100 Subject: [PATCH 07/30] Add Pre/Postamble that runs before/after SLURM Job Preamble copies all files that are required for the current job to the SLURM node, postamble copies all generated artefacts back from node. 
--- .../src/main/scala/tapasco/slurm/Slurm.scala | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index 0a82dbac..3b53f19f 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -173,6 +173,85 @@ final object Slurm extends Publisher { true } + /** + * Preamble is run before the SLURM job is started. + * Copy required files from host to SLURM workstation. + * @param slurm_job Job to execute. + * @param files List of files that need to be copied to SLURM node + * @param update_paths Function that converts local workdir file paths to valid paths on a remote SLURM node. + **/ + def slurm_preamble(slurm_job: Job, files: Seq[Path], update_paths: Path => Path)(implicit cfg: Configuration): Unit = { + val local_files: Seq[Path] = slurm_job.job match { + case ComposeJob(c, _, _, a, p, _, _, _, _, _) => { + val tgt = Target.fromString(a.get.head, p.get.head).get + val cores = c.composition.map(ce => FileAssetManager.entities.core(ce.kernel, tgt)) + + // TODO: In case there are no local ipcores, they are synth'ed prior to compose job, This is done LOCALLY + files ++ cores.map(_.get.zipPath) ++ cores.map(_.get.descPath) + } + case HighLevelSynthesisJob(_, _, _, k, _) => { + val kernels = FileAssetManager.entities.kernels.filter( kernel => k.get.contains(kernel.name) ).toSeq + files ++ kernels.map(_.descPath.getParent) + } + case _ => files + } + + val remote_files = local_files map update_paths + file_transfer(local_files.zip(remote_files).toMap, tx = true) + } + + /** + * Postamble is run after the SLURM job is finished. + * Copy generated artefacts back from the SLURM node. + * @param slurm_job Job to execute. 
+ * @param files List of (local) filenames that need to be copied from SLURM node to local machine + * @param update_paths Function that converts local workdir file paths to valid paths on a remote SLURM node. + **/ + def slurm_postamble(slurm_job: Job, files: Seq[Path], update_paths: Path => Path): Unit = { + val loc_files = slurm_job.job match { + case ComposeJob(c, f, _, a, p, _, _, _, _, _) => { + val tgt = Target.fromString(a.get.head, p.get.head).get + val bit = slurm_job.log.resolveSibling( Composer.mkProjectName(c, tgt, f) + ".bit" ) + val bin = slurm_job.log.resolveSibling( Composer.mkProjectName(c, tgt, f) + ".bit.bin" ) + + // TODO: Is this sufficient, or do we also need the timing/utilization report (or simply pull whole composition folder) ? + files ++ Seq(bit, bin) + } + case HighLevelSynthesisJob(_, a,p, kernels, _) => { + val tgt = Target.fromString(a.get.head, p.get.head).get + val cores = kernels.get.map(k => FileAssetManager.entities.core(k, tgt)) + files ++ cores.map(_.get.zipPath) ++ cores.map(_.get.descPath) + } + case _ => files + } + + val remote_files = loc_files map update_paths + file_transfer(remote_files.zip(loc_files).toMap, tx=false) + } + + /** + * Copy a set of files either from a host to a remote SLURM node or vice versa, depending on the @param tx + * @param tfer A map from SRC to DST file paths + * @param tx indicates the direction of transfer. If value is true (false), the direction is push (pull). + **/ + def file_transfer(tfer: Map[Path, Path], tx: Boolean): Boolean = { + for ((from, to) <- tfer) { + val target_host = slurm_remote_cfg.get.workstation; + logger.info("Copying %s to %s on %s".format(from, to, target_host)) + + // parent directory may not exist + exec_cmd("mkdir -p %s".format(to.getParent), hostname = Some(target_host)) + + val cpy_cmd = if (tx) + "scp -r %s %s:%s".format(from, target_host, to) + else + "scp -r %s:%s %s".format(target_host, from, to) + logger.info("Copy Command: " + cpy_cmd) + if (cpy_cmd.! 
!= 0) throw new Exception("Could not copy file %s to %s!".format(from, to)) + } + true + } + /** * Schedules a job on SLURM. * From 3b390c808250768f99431c5019570dd0abe654a5 Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Wed, 28 Oct 2020 15:46:22 +0100 Subject: [PATCH 08/30] Make writeJobScript() work with remote paths --- .../src/main/scala/tapasco/slurm/Slurm.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index 3b53f19f..efe3a6f8 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -146,26 +146,27 @@ final object Slurm extends Publisher { * * @param job Job to execute. * @param file File to write script to. + * @param upd_wd Function that converts local workdir file paths to valid paths on a remote SLURM node. * @return True, iff successful. **/ - def writeJobScript(job: Job, file: Path): Boolean = (catchDefault[Boolean](false, Seq(classOf[java.io.IOException]), - prefix = "could not write %s: ".format(file.toString)) _) { + def writeJobScript(job: Job, file: Path, upd_wd: Path => Path): Boolean = + (catchDefault[Boolean](false, Seq(classOf[java.io.IOException]), prefix = "could not write %s: ".format(file.toString)) _) { // fill in template needles val jobScript = new Template jobScript("JOB_NAME") = job.name - jobScript("SLURM_LOG") = job.slurmLog - jobScript("ERROR_LOG") = job.errorLog + jobScript("SLURM_LOG") = upd_wd(job.slurmLog).toString + jobScript("ERROR_LOG") = upd_wd(job.errorLog).toString jobScript("MEM_PER_CPU") = (job.consumer.memory / 1024).toString jobScript("CPUS") = (job.consumer.cpus).toString jobScript("TIMELIMIT") = "%02d:00:00".format(job.maxHours) - jobScript("TAPASCO_HOME") = FileAssetManager.TAPASCO_HOME.toString - jobScript("COMMANDS") = job.commands mkString "\n" + jobScript("TAPASCO_HOME") = 
upd_wd(FileAssetManager.TAPASCO_WORK_DIR).toString + jobScript("COMMANDS") = "tapasco --configFile %s".format(upd_wd(job.cfg_file).toString) jobScript("COMMENT") = job.comment getOrElse "" // create parent directory Files.createDirectories(file.getParent()) // write file val fw = new java.io.FileWriter(file.toString) - fw.append(jobScript.interpolateFile(Slurm.slurmTemplate.toString)) + fw.append(jobScript.interpolateFile(SLURM_TEMPLATE_DIR.resolve(slurm_remote_cfg.get.jobFile).toString)) fw.flush() fw.close() // set executable permissions From b029b7715a9ea42afde241dcb0e9c342e1dc2da9 Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Wed, 28 Oct 2020 16:00:14 +0100 Subject: [PATCH 09/30] Adapt Slurm.apply() to remote execution --- .../src/main/scala/tapasco/slurm/Slurm.scala | 83 +++++++++++++++---- 1 file changed, 65 insertions(+), 18 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index efe3a6f8..e74a252f 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -24,13 +24,18 @@ package tapasco.slurm import java.nio.file._ import java.nio.file.attribute.PosixFilePermission._ +import tapasco.Common import tapasco.Logging._ +import tapasco.activity.composers.Composer +import tapasco.base.{Configuration, SlurmRemoteConfig, Target} import tapasco.filemgmt._ import tapasco.task.ResourceConsumer import tapasco.util.{Publisher, Template} import scala.collection.JavaConverters._ import scala.sys.process._ +import tapasco.base.json._ +import tapasco.jobs.{ComposeJob, HighLevelSynthesisJob} /** * Primitive interface to SLURM scheduler: @@ -88,7 +93,7 @@ final object Slurm extends Publisher { } /** Template file for job script. 
*/ - final val slurmTemplate = FileAssetManager.TAPASCO_HOME.resolve("common").resolve("slurm.job.template") + final val SLURM_TEMPLATE_DIR = Common.commonDir.resolve("SLURM"); /** Default output directory for SLURM-related outputs. */ final val slurmOutput = FileAssetManager.TAPASCO_HOME.resolve("slurm") /** Regular expression: Positive ACK from `sbatch`. */ @@ -256,32 +261,74 @@ final object Slurm extends Publisher { /** * Schedules a job on SLURM. * - * @param script Job script file to schedule via `sbatch`. + * @param slurm_job Job script to schedule via `sbatch`. * @return Either a positive integer (SLURM id), or an Exception. **/ - def apply(script: Path, retries: Int = SLURM_RETRIES): Option[Int] = - catchAllDefault[Option[Int]](None, "Slurm scheduling failed: ") { - val cmd = "sbatch %s".format(script.toAbsolutePath().normalize().toString) + def apply(slurm_job: Job)(implicit cfg: Configuration): Option[Int] = { + val local_base = slurm_job.cfg_file.getParent + val jobFile = local_base.resolveSibling("slurm-job.slurm") // SLURM job script + + /** replace a prefix of a Path by a different prefix. 
Used to convert local file paths to paths that are valid on SLURM node */ + def prefix_subst(old_pre: Path, new_pre: Path): (Path => Path) = { + f => { + val postfix = f.toString.stripPrefix(old_pre.toString).stripPrefix("/") + new_pre.resolve(postfix) + } + } + val wd_to_rmt = if (slurm_remote_cfg.isDefined) prefix_subst(cfg.kernelDir.getParent, slurm_remote_cfg.get.workdir) else (x: Path) => x + val tpsc_to_rmt = if (slurm_remote_cfg.isDefined) prefix_subst(cfg.platformDir.getParent.getParent.getParent, slurm_remote_cfg.get.installdir) else (x: Path) => x + + /** Create non-slurm cfg, with updated paths such that they match the folder structure on SLURM node */ + val newCfg = cfg + .descPath(wd_to_rmt(cfg.descPath)) + .compositionDir(wd_to_rmt(cfg.compositionDir)) + .coreDir(wd_to_rmt(cfg.coreDir)) + .kernelDir(wd_to_rmt(cfg.kernelDir)) + .platformDir(tpsc_to_rmt(cfg.platformDir)) + .archDir(tpsc_to_rmt(cfg.archDir)) + .jobs(Seq(slurm_job.job)) + .slurm(None) + + logger.info("starting " + slurm_job.name + " job on SLURM ({})", slurm_job.cfg_file) + catchAllDefault[Option[Int]](None, "error during SLURM job execution (%s): ".format(jobFile)) { + Files.createDirectories(local_base) // create base directory + + Slurm.writeJobScript(slurm_job, jobFile, wd_to_rmt) // write job script + Configuration.to(newCfg, slurm_job.cfg_file) // write Configuration to file + + /** preamble: copy required files to SLURM node */ + if (slurm_remote_cfg.isDefined) { + val files_to_copy = Seq(jobFile, slurm_job.cfg_file) + slurm_preamble(slurm_job, files_to_copy, wd_to_rmt) + } + + val cmd = "sbatch %s".format(wd_to_rmt(jobFile.toAbsolutePath()).normalize().toString) logger.debug("running slurm batch job: '%s'".format(cmd)) - val res = exec_cmd(cmd) - val id = slurmSubmissionAck.findFirstMatchIn(res) map (_ group (1) toInt) - if (id.isEmpty) { - if (retries > 0) { - // wait for 10 secs + random up to 5 secs to avoid congestion - Thread.sleep(slurmRetryDelay + 
scala.util.Random.nextInt() % (slurmRetryDelay / 2)) - apply(script, retries - 1) - } else { - throw new SlurmException(script.toString, res) + + var id: Option[Int] = None + var retries = SLURM_RETRIES + while (id.isEmpty) { + val res = exec_cmd(cmd) + id = slurmSubmissionAck.findFirstMatchIn(res) map (_ group (1) toInt) + if (id.isEmpty) { + if (retries > 0) { + // wait for 10 secs + random up to 5 secs to avoid congestion + Thread.sleep(slurmRetryDelay + scala.util.Random.nextInt() % (slurmRetryDelay / 2)) + retries -= 1 + } else { + throw new SlurmException(jobFile.toString, res) + } } - } else { - logger.debug("received SLURM id: {}", id) - id } + logger.debug("received SLURM id: {}", id) + + id } + } /** Check via `squeue` if the SLURM job is still running. */ def isRunning(id: Int): Boolean = catchAllDefault[Boolean](true, "Slurm `squeue` failed: ") { - val squeue = exec_cmd("squeue -h") + val squeue = exec_cmd("squeue -h") logger.trace("squeue output: {}", squeue) !"%d".format(id).r.findFirstIn(squeue).isEmpty } From 5e43cfa02bcf73e17ac57dd3f5ad5bd1fc54a245 Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Wed, 28 Oct 2020 17:50:21 +0100 Subject: [PATCH 10/30] Add postamble that pulls generated files from node --- .../scala/src/main/scala/tapasco/slurm/Slurm.scala | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index e74a252f..4499925d 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -105,6 +105,8 @@ final object Slurm extends Publisher { final val slurmScriptPermissions = Set(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE, GROUP_READ, OTHERS_READ).asJava /** Wait interval between retries. */ final val slurmRetryDelay = 10000 // 10 secs + /** Stores a closure for every slurm job id, which is called once that job finishes. 
*/ + var postambles: Map[Int, Int => Unit] = Map() /** Returns true if SLURM is available on host running iTPC. */ lazy val available: Boolean = "which sbatch".! == 0 @@ -322,6 +324,13 @@ final object Slurm extends Publisher { } logger.debug("received SLURM id: {}", id) + /** define postamble that shall be run once job is finished */ + if (slurm_remote_cfg.isDefined) { + postambles += (id.get -> {slurm_id => + logger.info("Running postamble for SLURM id: {}", slurm_id) + slurm_postamble(slurm_job, Seq(slurm_job.log, slurm_job.slurmLog, slurm_job.errorLog), wd_to_rmt) + }) + } id } } @@ -339,6 +348,9 @@ final object Slurm extends Publisher { logger.trace("SLURM job #%d is still running, sleeping for %d secs ...".format(id, slurmDelay / 1000)) Thread.sleep(slurmDelay) } + + // callback that pulls generated files from remote node + postambles(id)(id) } /** Returns a list of all SLURM job ids which are registered under the From c54e9881ea9109e9d8fae82e40f4ab1d8380d518 Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Thu, 5 Nov 2020 13:58:29 +0100 Subject: [PATCH 11/30] Use explicit identity func instead of custom lambda --- toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index 4499925d..bc20c93e 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -277,8 +277,12 @@ final object Slurm extends Publisher { new_pre.resolve(postfix) } } - val wd_to_rmt = if (slurm_remote_cfg.isDefined) prefix_subst(cfg.kernelDir.getParent, slurm_remote_cfg.get.workdir) else (x: Path) => x - val tpsc_to_rmt = if (slurm_remote_cfg.isDefined) prefix_subst(cfg.platformDir.getParent.getParent.getParent, slurm_remote_cfg.get.installdir) else (x: Path) => x + val wd_to_rmt = if (slurm_remote_cfg.isDefined) + 
prefix_subst(cfg.kernelDir.getParent, slurm_remote_cfg.get.workdir) + else identity[Path] _ + val tpsc_to_rmt = if (slurm_remote_cfg.isDefined) + prefix_subst(cfg.platformDir.getParent.getParent.getParent, slurm_remote_cfg.get.installdir) + else identity[Path] _ /** Create non-slurm cfg, with updated paths such that they match the folder structure on SLURM node */ val newCfg = cfg From 92415bac269ea5a18be28b6ea9cee28e441fcef2 Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Thu, 5 Nov 2020 13:59:14 +0100 Subject: [PATCH 12/30] Postamble: add timing and utilization report --- toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index bc20c93e..80aac358 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -218,12 +218,10 @@ final object Slurm extends Publisher { def slurm_postamble(slurm_job: Job, files: Seq[Path], update_paths: Path => Path): Unit = { val loc_files = slurm_job.job match { case ComposeJob(c, f, _, a, p, _, _, _, _, _) => { - val tgt = Target.fromString(a.get.head, p.get.head).get - val bit = slurm_job.log.resolveSibling( Composer.mkProjectName(c, tgt, f) + ".bit" ) - val bin = slurm_job.log.resolveSibling( Composer.mkProjectName(c, tgt, f) + ".bit.bin" ) + val bit_name = Composer.mkProjectName(c, Target.fromString(a.get.head, p.get.head).get, f) + val fnames = Seq(bit_name + ".bit", bit_name + ".bit.bin", "timing.txt", "utilization.txt") - // TODO: Is this sufficient, or do we also need the timing/utilization report (or simply pull whole composition folder) ? 
- files ++ Seq(bit, bin) + files ++ fnames.map(f => slurm_job.log.resolveSibling(f)) } case HighLevelSynthesisJob(_, a,p, kernels, _) => { val tgt = Target.fromString(a.get.head, p.get.head).get From 3e2183ffe105160e796b6f186c607d14565a0058 Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Thu, 5 Nov 2020 14:22:12 +0100 Subject: [PATCH 13/30] Fix local SLURM execution --- .../src/main/scala/tapasco/slurm/Slurm.scala | 9 +++-- .../vivado/common/SLURM/default.job.template | 34 +++++++++++++++++++ 2 files changed, 41 insertions(+), 2 deletions(-) create mode 100644 toolflow/vivado/common/SLURM/default.job.template diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index 80aac358..75a86ed0 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -173,7 +173,11 @@ final object Slurm extends Publisher { Files.createDirectories(file.getParent()) // write file val fw = new java.io.FileWriter(file.toString) - fw.append(jobScript.interpolateFile(SLURM_TEMPLATE_DIR.resolve(slurm_remote_cfg.get.jobFile).toString)) + val template_name = slurm_remote_cfg match { + case Some(c) => c.jobFile + case None => "default.job.template" + } + fw.append(jobScript.interpolateFile(SLURM_TEMPLATE_DIR.resolve(template_name).toString)) fw.flush() fw.close() // set executable permissions @@ -352,7 +356,8 @@ final object Slurm extends Publisher { } // callback that pulls generated files from remote node - postambles(id)(id) + if (slurm_remote_cfg.isDefined) + postambles(id)(id) } /** Returns a list of all SLURM job ids which are registered under the diff --git a/toolflow/vivado/common/SLURM/default.job.template b/toolflow/vivado/common/SLURM/default.job.template new file mode 100644 index 00000000..c9b25b97 --- /dev/null +++ b/toolflow/vivado/common/SLURM/default.job.template @@ -0,0 +1,34 @@ +#!/bin/bash +# Copyright (c) 2014-2020 Embedded 
Systems and Applications, TU Darmstadt. +# +# This file is part of TaPaSCo +# (see https://github.com/esa-tu-darmstadt/tapasco). +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . +# + +#SBATCH -J "@@JOB_NAME@@" +#SBATCH -o "@@SLURM_LOG@@" +#SBATCH -e "@@ERROR_LOG@@" +#SBATCH --mem-per-cpu=@@MEM_PER_CPU@@ +#SBATCH -n @@CPUS@@ +#SBATCH -t @@TIMELIMIT@@ +#SBATCH --comment="@@COMMENT@@" + +source @@TAPASCO_HOME@@/tapasco-setup.sh + +# user commands begin here +echo "SLURM job #$SLURM_JOB_ID started at $(date)" +@@COMMANDS@@ +echo "SLURM job #$SLURM_JOB_ID finished at $(date)" From 619893e181f20bffdd8780a7f6c875ea49997c73 Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Thu, 5 Nov 2020 14:41:59 +0100 Subject: [PATCH 14/30] Add optional sbatch CLI options to slurm json --- .../src/main/scala/tapasco/base/SlurmRemoteConfig.scala | 3 ++- .../scala/src/main/scala/tapasco/base/json/package.scala | 3 ++- toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala | 8 +++++++- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/base/SlurmRemoteConfig.scala b/toolflow/scala/src/main/scala/tapasco/base/SlurmRemoteConfig.scala index 68de728f..9a7bbaf4 100644 --- a/toolflow/scala/src/main/scala/tapasco/base/SlurmRemoteConfig.scala +++ b/toolflow/scala/src/main/scala/tapasco/base/SlurmRemoteConfig.scala @@ -36,7 +36,8 @@ case class SlurmRemoteConfig( 
workstation: String, workdir: Path, installdir: Path, - jobFile: String + jobFile: String, + SbatchOptions: String ) object SlurmRemoteConfig extends Builds[SlurmRemoteConfig] diff --git a/toolflow/scala/src/main/scala/tapasco/base/json/package.scala b/toolflow/scala/src/main/scala/tapasco/base/json/package.scala index bb571e81..2d7b1f07 100644 --- a/toolflow/scala/src/main/scala/tapasco/base/json/package.scala +++ b/toolflow/scala/src/main/scala/tapasco/base/json/package.scala @@ -445,7 +445,8 @@ package object json { (JsPath \ "WorkstationHost").read[String](minimumLength(length = 1)) ~ (JsPath \ "Workdir").read[Path] ~ (JsPath \ "TapascoInstallDir").read[Path] ~ - (JsPath \ "JobFile").read[String](minimumLength(length = 1)) + (JsPath \ "JobFile").read[String](minimumLength(length = 1)) ~ + (JsPath \ "SbatchOptions").readNullable[String].map(_.getOrElse("")) ) (SlurmRemoteConfig.apply _) /* SlurmRemoteConfig @} */ diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index 75a86ed0..8888f6fb 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -310,7 +310,13 @@ final object Slurm extends Publisher { slurm_preamble(slurm_job, files_to_copy, wd_to_rmt) } - val cmd = "sbatch %s".format(wd_to_rmt(jobFile.toAbsolutePath()).normalize().toString) + val cmd = "sbatch %s %s".format( + slurm_remote_cfg match { + case Some(c) => c.SbatchOptions + case None => "" + }, + wd_to_rmt(jobFile.toAbsolutePath()).normalize().toString + ) logger.debug("running slurm batch job: '%s'".format(cmd)) var id: Option[Int] = None From 16b4464a9ce9f6d3fcc481d35a3a5ef1befb0b5c Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Thu, 5 Nov 2020 15:26:04 +0100 Subject: [PATCH 15/30] Add support for user-defined pre/postamble The absolute path to both scripts may be supplied via the key "PreambleScript" resp. 
"PostambleScript" in the SLURM JSON cfg file. --- .../main/scala/tapasco/base/SlurmRemoteConfig.scala | 4 +++- .../src/main/scala/tapasco/base/json/package.scala | 4 +++- .../scala/src/main/scala/tapasco/slurm/Slurm.scala | 10 ++++++++-- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/base/SlurmRemoteConfig.scala b/toolflow/scala/src/main/scala/tapasco/base/SlurmRemoteConfig.scala index 9a7bbaf4..05d5400c 100644 --- a/toolflow/scala/src/main/scala/tapasco/base/SlurmRemoteConfig.scala +++ b/toolflow/scala/src/main/scala/tapasco/base/SlurmRemoteConfig.scala @@ -37,7 +37,9 @@ case class SlurmRemoteConfig( workdir: Path, installdir: Path, jobFile: String, - SbatchOptions: String + SbatchOptions: String, + PreambleScript: Option[String], + PostambleScript: Option[String] ) object SlurmRemoteConfig extends Builds[SlurmRemoteConfig] diff --git a/toolflow/scala/src/main/scala/tapasco/base/json/package.scala b/toolflow/scala/src/main/scala/tapasco/base/json/package.scala index 2d7b1f07..3dac4fc4 100644 --- a/toolflow/scala/src/main/scala/tapasco/base/json/package.scala +++ b/toolflow/scala/src/main/scala/tapasco/base/json/package.scala @@ -446,7 +446,9 @@ package object json { (JsPath \ "Workdir").read[Path] ~ (JsPath \ "TapascoInstallDir").read[Path] ~ (JsPath \ "JobFile").read[String](minimumLength(length = 1)) ~ - (JsPath \ "SbatchOptions").readNullable[String].map(_.getOrElse("")) + (JsPath \ "SbatchOptions").readNullable[String].map(_.getOrElse("")) ~ + (JsPath \ "PreambleScript").readNullable[String] ~ + (JsPath \ "PostambleScript").readNullable[String] ) (SlurmRemoteConfig.apply _) /* SlurmRemoteConfig @} */ diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index 8888f6fb..8081e8a2 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -207,9 +207,12 @@ final 
object Slurm extends Publisher { } case _ => files } - val remote_files = local_files map update_paths file_transfer(local_files.zip(remote_files).toMap, tx = true) + + // run preamble script, if specified + if (slurm_remote_cfg.get.PreambleScript.isDefined) + "sh %s".format(slurm_remote_cfg.get.PreambleScript.get).! } /** @@ -234,9 +237,12 @@ final object Slurm extends Publisher { } case _ => files } - val remote_files = loc_files map update_paths file_transfer(remote_files.zip(loc_files).toMap, tx=false) + + // run postamble script, if specified + if (slurm_remote_cfg.get.PostambleScript.isDefined) + "sh %s".format(slurm_remote_cfg.get.PostambleScript.get).! } /** From 773c47b5eda83a68ffe4e33b71d224f7086eba2a Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Thu, 5 Nov 2020 18:01:35 +0100 Subject: [PATCH 16/30] Remove unnecessary func parameter --- toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index 8081e8a2..abc4221d 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -397,19 +397,19 @@ final object Slurm extends Publisher { val cmd = "scancel %s" format (ids mkString " ") logger.info("canceling SLURM jobs: {}", ids mkString ", ") logger.debug("command: '{}'", cmd) - exec_cmd(cmd, get_ret_code = true).toInt + exec_cmd(cmd) } } /** Execute a SLURM command, either locally or on a remote host */ - def exec_cmd(c: String, get_ret_code: Boolean = false, hostname: Option[String] = None): String = { + def exec_cmd(c: String, hostname: Option[String] = None): String = { val cmd = if (slurm_remote_cfg.isEmpty) c else { val host = hostname.getOrElse(slurm_remote_cfg.get.host) "ssh %s %s".format(host, c) } logger.info("Executing command: %s".format(cmd)) - if (get_ret_code) cmd.!.toString else 
cmd.!! + cmd.!! } /** Use SLURM? */ From 0d4c832e5b7c97960fd6bb13bb381630c087b5c9 Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Thu, 5 Nov 2020 21:36:40 +0100 Subject: [PATCH 17/30] Fix: Copy SLURM job script to correct host --- .../src/main/scala/tapasco/slurm/Slurm.scala | 55 +++++++++---------- 1 file changed, 25 insertions(+), 30 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index abc4221d..14ca4bc7 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -189,24 +189,21 @@ final object Slurm extends Publisher { * Preamble is run before the SLURM job is started. * Copy required files from host to SLURM workstation. * @param slurm_job Job to execute. - * @param files List of files that need to be copied to SLURM node * @param update_paths Function that converts local workdir file paths to valid paths on a remote SLURM node. 
**/ - def slurm_preamble(slurm_job: Job, files: Seq[Path], update_paths: Path => Path)(implicit cfg: Configuration): Unit = { - val local_files: Seq[Path] = slurm_job.job match { + def slurm_preamble(slurm_job: Job, update_paths: Path => Path)(implicit cfg: Configuration): Unit = { + val local_files = Seq(slurm_job.cfg_file) ++ (slurm_job.job match { case ComposeJob(c, _, _, a, p, _, _, _, _, _) => { val tgt = Target.fromString(a.get.head, p.get.head).get val cores = c.composition.map(ce => FileAssetManager.entities.core(ce.kernel, tgt)) - - // TODO: In case there are no local ipcores, they are synth'ed prior to compose job, This is done LOCALLY - files ++ cores.map(_.get.zipPath) ++ cores.map(_.get.descPath) + cores.map(_.get.zipPath) ++ cores.map(_.get.descPath) } case HighLevelSynthesisJob(_, _, _, k, _) => { val kernels = FileAssetManager.entities.kernels.filter( kernel => k.get.contains(kernel.name) ).toSeq - files ++ kernels.map(_.descPath.getParent) + kernels.map(_.descPath.getParent) } - case _ => files - } + case _ => Seq() + }) val remote_files = local_files map update_paths file_transfer(local_files.zip(remote_files).toMap, tx = true) @@ -219,24 +216,22 @@ final object Slurm extends Publisher { * Postamble is run after the SLURM job is finished. * Copy generated artefacts back from the SLURM node. * @param slurm_job Job to execute. - * @param files List of (local) filenames that need to be copied from SLURM node to local machine * @param update_paths Function that converts local workdir file paths to valid paths on a remote SLURM node. 
**/ - def slurm_postamble(slurm_job: Job, files: Seq[Path], update_paths: Path => Path): Unit = { - val loc_files = slurm_job.job match { + def slurm_postamble(slurm_job: Job, update_paths: Path => Path): Unit = { + val loc_files = Seq(slurm_job.log, slurm_job.slurmLog, slurm_job.errorLog) ++ (slurm_job.job match { case ComposeJob(c, f, _, a, p, _, _, _, _, _) => { val bit_name = Composer.mkProjectName(c, Target.fromString(a.get.head, p.get.head).get, f) val fnames = Seq(bit_name + ".bit", bit_name + ".bit.bin", "timing.txt", "utilization.txt") - - files ++ fnames.map(f => slurm_job.log.resolveSibling(f)) + fnames.map(f => slurm_job.log.resolveSibling(f)) } case HighLevelSynthesisJob(_, a,p, kernels, _) => { val tgt = Target.fromString(a.get.head, p.get.head).get val cores = kernels.get.map(k => FileAssetManager.entities.core(k, tgt)) - files ++ cores.map(_.get.zipPath) ++ cores.map(_.get.descPath) + cores.map(_.get.zipPath) ++ cores.map(_.get.descPath) } - case _ => files - } + case _ => Seq() + }) val remote_files = loc_files map update_paths file_transfer(remote_files.zip(loc_files).toMap, tx=false) @@ -250,13 +245,13 @@ final object Slurm extends Publisher { * @param tfer A map from SRC to DST file paths * @param tx indicates the direction of transfer. If value is true (false), the direction is push (pull). 
**/ - def file_transfer(tfer: Map[Path, Path], tx: Boolean): Boolean = { + def file_transfer(tfer: Map[Path, Path], tx: Boolean, host: Option[String] = None): Boolean = { for ((from, to) <- tfer) { - val target_host = slurm_remote_cfg.get.workstation; + val target_host = host.getOrElse(slurm_remote_cfg.get.workstation) logger.info("Copying %s to %s on %s".format(from, to, target_host)) // parent directory may not exist - exec_cmd("mkdir -p %s".format(to.getParent), hostname = Some(target_host)) + if (tx) exec_cmd("mkdir -p %s".format(to.getParent), hostname = Some(target_host)) val cpy_cmd = if (tx) "scp -r %s %s:%s".format(from, target_host, to) @@ -312,17 +307,17 @@ final object Slurm extends Publisher { /** preamble: copy required files to SLURM node */ if (slurm_remote_cfg.isDefined) { - val files_to_copy = Seq(jobFile, slurm_job.cfg_file) - slurm_preamble(slurm_job, files_to_copy, wd_to_rmt) + // copy all required files to workstation + slurm_preamble(slurm_job, wd_to_rmt) + + // copy slurm job file to slurm login node + file_transfer(Map(jobFile -> Path.of("~/slurm-job.slurm")), tx = true, host=Some(slurm_remote_cfg.get.host)) } - val cmd = "sbatch %s %s".format( - slurm_remote_cfg match { - case Some(c) => c.SbatchOptions - case None => "" - }, - wd_to_rmt(jobFile.toAbsolutePath()).normalize().toString - ) + val cmd = "sbatch " ++ (slurm_remote_cfg match { + case Some(c) => "%s %s".format(c.SbatchOptions, "~/slurm-job.slurm") + case None => jobFile.toAbsolutePath().normalize().toString + }) logger.debug("running slurm batch job: '%s'".format(cmd)) var id: Option[Int] = None @@ -346,7 +341,7 @@ final object Slurm extends Publisher { if (slurm_remote_cfg.isDefined) { postambles += (id.get -> {slurm_id => logger.info("Running postamble for SLURM id: {}", slurm_id) - slurm_postamble(slurm_job, Seq(slurm_job.log, slurm_job.slurmLog, slurm_job.errorLog), wd_to_rmt) + slurm_postamble(slurm_job, wd_to_rmt) }) } id From c5911efb38f3a42f8449cc83f2230efa1c3e45fe Mon 
Sep 17 00:00:00 2001 From: Marco Hartmann Date: Thu, 5 Nov 2020 21:46:27 +0100 Subject: [PATCH 18/30] Finalize SLURM job template for ESA cluster --- toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala | 2 ++ toolflow/vivado/common/SLURM/ESA.json | 4 ++-- toolflow/vivado/common/SLURM/slurm_ESA.job.template | 4 ++++ 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index 14ca4bc7..4c37f1b9 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -169,6 +169,8 @@ final object Slurm extends Publisher { jobScript("TAPASCO_HOME") = upd_wd(FileAssetManager.TAPASCO_WORK_DIR).toString jobScript("COMMANDS") = "tapasco --configFile %s".format(upd_wd(job.cfg_file).toString) jobScript("COMMENT") = job.comment getOrElse "" + if (slurm_remote_cfg.isDefined) + jobScript("WORKSTATION") = slurm_remote_cfg.get.workstation // create parent directory Files.createDirectories(file.getParent()) // write file diff --git a/toolflow/vivado/common/SLURM/ESA.json b/toolflow/vivado/common/SLURM/ESA.json index aac21b38..1245260b 100644 --- a/toolflow/vivado/common/SLURM/ESA.json +++ b/toolflow/vivado/common/SLURM/ESA.json @@ -2,8 +2,8 @@ "Name" : "ESA Cluster", "SlurmHost" : "slurm", "WorkstationHost" : "balin", - "Workdir" : "/scratch/tapasco_workdir", - "TapascoInstallDir" : "/scratch/tapasco", + "Workdir" : "/scratch/SLURM/tapasco_workdir", + "TapascoInstallDir" : "/scratch/SLURM/tapasco", "JobFile" : "slurm_ESA.job.template" } diff --git a/toolflow/vivado/common/SLURM/slurm_ESA.job.template b/toolflow/vivado/common/SLURM/slurm_ESA.job.template index c9b25b97..b25df658 100644 --- a/toolflow/vivado/common/SLURM/slurm_ESA.job.template +++ b/toolflow/vivado/common/SLURM/slurm_ESA.job.template @@ -26,9 +26,13 @@ #SBATCH -t @@TIMELIMIT@@ #SBATCH --comment="@@COMMENT@@" +# Setup env source 
@@TAPASCO_HOME@@/tapasco-setup.sh +export PATH="/opt/cad/xilinx/vivado/Vivado/2019.2/bin/:$PATH" # user commands begin here echo "SLURM job #$SLURM_JOB_ID started at $(date)" +rsync -a /net/@@WORKSTATION@@/SLURM/tapasco_workdir/ @@TAPASCO_HOME@@ @@COMMANDS@@ +rsync -a @@TAPASCO_HOME@@/ /net/@@WORKSTATION@@/SLURM/tapasco_workdir echo "SLURM job #$SLURM_JOB_ID finished at $(date)" From 6b01e254e4cd525fe8ac419399111afef3ed4524 Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Mon, 16 Nov 2020 12:58:36 +0100 Subject: [PATCH 19/30] Use Paths.get() instead of Path.of() --- toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index 4c37f1b9..6bccde88 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -313,7 +313,7 @@ final object Slurm extends Publisher { slurm_preamble(slurm_job, wd_to_rmt) // copy slurm job file to slurm login node - file_transfer(Map(jobFile -> Path.of("~/slurm-job.slurm")), tx = true, host=Some(slurm_remote_cfg.get.host)) + file_transfer(Map(jobFile -> Paths.get("~/slurm-job.slurm")), tx = true, host=Some(slurm_remote_cfg.get.host)) } val cmd = "sbatch " ++ (slurm_remote_cfg match { From 74436f468fe25339e8e6e6baf643101177e7f6b6 Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Mon, 16 Nov 2020 21:28:35 +0100 Subject: [PATCH 20/30] Use unique slurm script names --- toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index 6bccde88..d9fb1da9 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -273,7 +273,7 @@ final 
object Slurm extends Publisher { **/ def apply(slurm_job: Job)(implicit cfg: Configuration): Option[Int] = { val local_base = slurm_job.cfg_file.getParent - val jobFile = local_base.resolveSibling("slurm-job.slurm") // SLURM job script + val jobFile = local_base.resolveSibling("%s.slurm".format(slurm_job.name)) // SLURM job script /** replace a prefix of a Path by a different prefix. Used to convert local file paths to paths that are valid on SLURM node */ def prefix_subst(old_pre: Path, new_pre: Path): (Path => Path) = { @@ -313,11 +313,12 @@ final object Slurm extends Publisher { slurm_preamble(slurm_job, wd_to_rmt) // copy slurm job file to slurm login node - file_transfer(Map(jobFile -> Paths.get("~/slurm-job.slurm")), tx = true, host=Some(slurm_remote_cfg.get.host)) + file_transfer(Map(jobFile -> Paths.get("~/%s.slurm".format(slurm_job.name))), + tx = true, host=Some(slurm_remote_cfg.get.host)) } val cmd = "sbatch " ++ (slurm_remote_cfg match { - case Some(c) => "%s %s".format(c.SbatchOptions, "~/slurm-job.slurm") + case Some(c) => "%s ~/%s.slurm".format(c.SbatchOptions, slurm_job.name) case None => jobFile.toAbsolutePath().normalize().toString }) logger.debug("running slurm batch job: '%s'".format(cmd)) From 48c5fdebe7ee37036c7e35899e13a39ab1581c84 Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Mon, 16 Nov 2020 22:13:03 +0100 Subject: [PATCH 21/30] Fix hls postamble --- toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index d9fb1da9..f8f9b6bf 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -229,8 +229,9 @@ final object Slurm extends Publisher { } case HighLevelSynthesisJob(_, a,p, kernels, _) => { val tgt = Target.fromString(a.get.head, p.get.head).get - val cores = 
kernels.get.map(k => FileAssetManager.entities.core(k, tgt)) - cores.map(_.get.zipPath) ++ cores.map(_.get.descPath) + val core_dir = slurm_job.log.getParent.resolveSibling("ipcore") + val core_zip = kernels.get.map(k => core_dir.resolve("%s_%s.zip".format(k, tgt.ad.name))) + core_zip ++ core_zip.map(z => z.resolveSibling("core.json")) } case _ => Seq() }) @@ -253,7 +254,8 @@ final object Slurm extends Publisher { logger.info("Copying %s to %s on %s".format(from, to, target_host)) // parent directory may not exist - if (tx) exec_cmd("mkdir -p %s".format(to.getParent), hostname = Some(target_host)) + val mkdir = "mkdir -p %s".format(to.getParent) + if (tx) exec_cmd(mkdir, hostname = Some(target_host)) else mkdir.! val cpy_cmd = if (tx) "scp -r %s %s:%s".format(from, target_host, to) From 6c92d0c0651925a53f0b22b1b9d1a463739afd56 Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Tue, 17 Nov 2020 12:47:47 +0100 Subject: [PATCH 22/30] Only pull SLURM artefacts if job was successful --- .../src/main/scala/tapasco/slurm/Slurm.scala | 53 +++++++++++++------ 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index f8f9b6bf..2bdfb779 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -99,14 +99,14 @@ final object Slurm extends Publisher { /** Regular expression: Positive ACK from `sbatch`. */ final val slurmSubmissionAck = """[Ss]ubmitted batch job (\d+)""".r - /** Polling interval for `squeue`. */ + /** Polling interval for `sacct`. */ final val slurmDelay = 15000 // 15 secs /** Set of POSIX permissions for SLURM job scripts. */ final val slurmScriptPermissions = Set(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE, GROUP_READ, OTHERS_READ).asJava /** Wait interval between retries. 
*/ final val slurmRetryDelay = 10000 // 10 secs /** Stores a closure for every slurm job id, which is called once that job finishes. */ - var postambles: Map[Int, Int => Unit] = Map() + var postambles: Map[Int, Int => Boolean => Unit] = Map() /** Returns true if SLURM is available on host running iTPC. */ lazy val available: Boolean = "which sbatch".! == 0 @@ -218,16 +218,17 @@ final object Slurm extends Publisher { * Postamble is run after the SLURM job is finished. * Copy generated artefacts back from the SLURM node. * @param slurm_job Job to execute. + * @param slurm_success Indicates if the SLURM job finished successfully. * @param update_paths Function that converts local workdir file paths to valid paths on a remote SLURM node. **/ - def slurm_postamble(slurm_job: Job, update_paths: Path => Path): Unit = { + def slurm_postamble(slurm_job: Job, slurm_success: Boolean, update_paths: Path => Path): Unit = { val loc_files = Seq(slurm_job.log, slurm_job.slurmLog, slurm_job.errorLog) ++ (slurm_job.job match { - case ComposeJob(c, f, _, a, p, _, _, _, _, _) => { + case ComposeJob(c, f, _, a, p, _, _, _, _, _) if slurm_success => { val bit_name = Composer.mkProjectName(c, Target.fromString(a.get.head, p.get.head).get, f) val fnames = Seq(bit_name + ".bit", bit_name + ".bit.bin", "timing.txt", "utilization.txt") fnames.map(f => slurm_job.log.resolveSibling(f)) } - case HighLevelSynthesisJob(_, a,p, kernels, _) => { + case HighLevelSynthesisJob(_, a,p, kernels, _) if slurm_success => { val tgt = Target.fromString(a.get.head, p.get.head).get val core_dir = slurm_job.log.getParent.resolveSibling("ipcore") val core_zip = kernels.get.map(k => core_dir.resolve("%s_%s.zip".format(k, tgt.ad.name))) @@ -344,32 +345,48 @@ final object Slurm extends Publisher { /** define postamble that shall be run once job is finished */ if (slurm_remote_cfg.isDefined) { - postambles += (id.get -> {slurm_id => + postambles += (id.get -> {slurm_id:Int => slurm_success:Boolean => 
logger.info("Running postamble for SLURM id: {}", slurm_id) - slurm_postamble(slurm_job, wd_to_rmt) + slurm_postamble(slurm_job, slurm_success, wd_to_rmt) }) } id } } - /** Check via `squeue` if the SLURM job is still running. */ - def isRunning(id: Int): Boolean = catchAllDefault[Boolean](true, "Slurm `squeue` failed: ") { - val squeue = exec_cmd("squeue -h") - logger.trace("squeue output: {}", squeue) - !"%d".format(id).r.findFirstIn(squeue).isEmpty + /** Check via `sacct` if the SLURM job is still running. */ + def getSlurmStatus(id: Int): SlurmStatus = catchAllDefault[SlurmStatus](Unknown(), "Slurm `sacct` failed: ") { + val sacct = exec_cmd("sacct -pn") + val pattern = """%d\|([^|]*)\|[^|]*\|[^|]*\|[^|]*\|([A-Z]*)( [^|]*)?\|[^|]*\|""".format(id).r + pattern.findFirstIn(sacct) match { + case None => + logger.warn("Job ID %d not listed in sacct".format(id)) + Slurm.Unknown() + case Some(m) => m match { + case pattern(name, status, cancelledBy) => status match { + case "RUNNING" => Slurm.Running () + case "COMPLETED" => Slurm.Completed () + case "CANCELLED" => Slurm.Cancelled (cancelledBy) + case _ => + logger.warn ("Job %s (ID=%d) has status %s".format (name, id, status) ) + Slurm.Unknown () + } + } + } } - /** Wait until the given SLURM job disappears from `squeue` output. */ + /** Wait until the given SLURM job is not listed as RUNNING anymore in `sacct` output. 
*/ def waitFor(id: Int): Unit = { - while (isRunning(id)) { + var status: SlurmStatus = Slurm.Running() + while (status == Running()) { logger.trace("SLURM job #%d is still running, sleeping for %d secs ...".format(id, slurmDelay / 1000)) Thread.sleep(slurmDelay) + status = getSlurmStatus(id) } // callback that pulls generated files from remote node if (slurm_remote_cfg.isDefined) - postambles(id)(id) + postambles(id)(id)(status == Slurm.Completed()) } /** Returns a list of all SLURM job ids which are registered under the @@ -418,6 +435,12 @@ final object Slurm extends Publisher { /** Host for executing SLURM */ private var slurm_remote_cfg: Option[SlurmRemoteConfig] = None + sealed trait SlurmStatus + final case class Completed() extends SlurmStatus + final case class Cancelled(by: String) extends SlurmStatus + final case class Running() extends SlurmStatus + final case class Unknown() extends SlurmStatus + sealed trait SlurmConfig final case class EnabledLocal() extends SlurmConfig final case class EnabledRemote(template_name: String) extends SlurmConfig From 27d8af34424a304dc02eb75449487f9a7a0eeb12 Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Tue, 17 Nov 2020 13:17:18 +0100 Subject: [PATCH 23/30] HLS task: check slurm return code --- toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala | 3 ++- .../src/main/scala/tapasco/task/HighLevelSynthesisTask.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index 2bdfb779..ab9336a8 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -376,7 +376,7 @@ final object Slurm extends Publisher { } /** Wait until the given SLURM job is not listed as RUNNING anymore in `sacct` output. 
*/ - def waitFor(id: Int): Unit = { + def waitFor(id: Int): SlurmStatus = { var status: SlurmStatus = Slurm.Running() while (status == Running()) { logger.trace("SLURM job #%d is still running, sleeping for %d secs ...".format(id, slurmDelay / 1000)) @@ -387,6 +387,7 @@ final object Slurm extends Publisher { // callback that pulls generated files from remote node if (slurm_remote_cfg.isDefined) postambles(id)(id)(status == Slurm.Completed()) + status } /** Returns a list of all SLURM job ids which are registered under the diff --git a/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala b/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala index 02b54099..2322717d 100644 --- a/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala +++ b/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala @@ -76,7 +76,7 @@ class HighLevelSynthesisTask(val k: Kernel, val t: Target, val cfg: Configuratio ) // execute sbatch to enqueue job, then wait for it - val r = (Slurm(job)(cfg) map (Slurm.waitFor(_))).nonEmpty + val r = (Slurm(job)(cfg) map Slurm.waitFor).getOrElse(false) == Slurm.Completed() FileAssetManager.reset() r } From 9d496b33843209f3475e7ff753ed7b51f8e64ac4 Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Tue, 17 Nov 2020 13:18:24 +0100 Subject: [PATCH 24/30] Handle return value correctly in slurm job --- toolflow/vivado/common/SLURM/slurm_ESA.job.template | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/toolflow/vivado/common/SLURM/slurm_ESA.job.template b/toolflow/vivado/common/SLURM/slurm_ESA.job.template index b25df658..33f88221 100644 --- a/toolflow/vivado/common/SLURM/slurm_ESA.job.template +++ b/toolflow/vivado/common/SLURM/slurm_ESA.job.template @@ -34,5 +34,13 @@ export PATH="/opt/cad/xilinx/vivado/Vivado/2019.2/bin/:$PATH" echo "SLURM job #$SLURM_JOB_ID started at $(date)" rsync -a /net/@@WORKSTATION@@/SLURM/tapasco_workdir/ @@TAPASCO_HOME@@ @@COMMANDS@@ + 
+retVal=$? +if [ $retVal -ne 0 ]; then + echo "SLURM job #$SLURM_JOB_ID failed at $(date)" +else + echo "SLURM job #$SLURM_JOB_ID finished at $(date)" +fi + rsync -a @@TAPASCO_HOME@@/ /net/@@WORKSTATION@@/SLURM/tapasco_workdir -echo "SLURM job #$SLURM_JOB_ID finished at $(date)" +exit $retVal From 3207acd108c4674b96ddd1911f06c3397e23a09f Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Tue, 17 Nov 2020 15:14:44 +0100 Subject: [PATCH 25/30] Fix vivado hang --- toolflow/vivado/common/SLURM/slurm_ESA.job.template | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/toolflow/vivado/common/SLURM/slurm_ESA.job.template b/toolflow/vivado/common/SLURM/slurm_ESA.job.template index 33f88221..d51ad687 100644 --- a/toolflow/vivado/common/SLURM/slurm_ESA.job.template +++ b/toolflow/vivado/common/SLURM/slurm_ESA.job.template @@ -1,4 +1,4 @@ -#!/bin/bash +#!/bin/bash -x # Copyright (c) 2014-2020 Embedded Systems and Applications, TU Darmstadt. # # This file is part of TaPaSCo @@ -29,6 +29,8 @@ # Setup env source @@TAPASCO_HOME@@/tapasco-setup.sh export PATH="/opt/cad/xilinx/vivado/Vivado/2019.2/bin/:$PATH" +# Vivado will hang otherwise, since it has no write permissions to real $HOME +export HOME=@@TAPASCO_HOME@@/../ # user commands begin here echo "SLURM job #$SLURM_JOB_ID started at $(date)" From 1d81cd62a5a8f0ed6f9915137594f8d3b7e194b9 Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Tue, 17 Nov 2020 15:24:42 +0100 Subject: [PATCH 26/30] Reduce verbosity, cleanup --- .../src/main/scala/tapasco/slurm/Slurm.scala | 29 +++++++++---------- .../tapasco/task/HighLevelSynthesisTask.scala | 10 +++++-- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index ab9336a8..5529ee25 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -210,8 +210,7 @@ final object 
Slurm extends Publisher { file_transfer(local_files.zip(remote_files).toMap, tx = true) // run preamble script, if specified - if (slurm_remote_cfg.get.PreambleScript.isDefined) - "sh %s".format(slurm_remote_cfg.get.PreambleScript.get).! + slurm_remote_cfg.get.PreambleScript map ("sh %s".format(_).!) } /** @@ -231,7 +230,7 @@ final object Slurm extends Publisher { case HighLevelSynthesisJob(_, a,p, kernels, _) if slurm_success => { val tgt = Target.fromString(a.get.head, p.get.head).get val core_dir = slurm_job.log.getParent.resolveSibling("ipcore") - val core_zip = kernels.get.map(k => core_dir.resolve("%s_%s.zip".format(k, tgt.ad.name))) + val core_zip = kernels.get.map(k => core_dir.resolve("%s.zip".format(k))) core_zip ++ core_zip.map(z => z.resolveSibling("core.json")) } case _ => Seq() @@ -240,8 +239,7 @@ final object Slurm extends Publisher { file_transfer(remote_files.zip(loc_files).toMap, tx=false) // run postamble script, if specified - if (slurm_remote_cfg.get.PostambleScript.isDefined) - "sh %s".format(slurm_remote_cfg.get.PostambleScript.get).! + slurm_remote_cfg.get.PostambleScript map ("sh %s".format(_).!) } /** @@ -258,11 +256,12 @@ final object Slurm extends Publisher { val mkdir = "mkdir -p %s".format(to.getParent) if (tx) exec_cmd(mkdir, hostname = Some(target_host)) else mkdir.! - val cpy_cmd = if (tx) + val cpy_cmd = if (tx) { "scp -r %s %s:%s".format(from, target_host, to) - else + } else { "scp -r %s:%s %s".format(target_host, from, to) - logger.info("Copy Command: " + cpy_cmd) + } + logger.debug("Copy Command: " + cpy_cmd) if (cpy_cmd.! 
!= 0) throw new Exception("Could not copy file %s to %s!".format(from, to)) } true @@ -285,12 +284,10 @@ final object Slurm extends Publisher { new_pre.resolve(postfix) } } - val wd_to_rmt = if (slurm_remote_cfg.isDefined) - prefix_subst(cfg.kernelDir.getParent, slurm_remote_cfg.get.workdir) - else identity[Path] _ - val tpsc_to_rmt = if (slurm_remote_cfg.isDefined) - prefix_subst(cfg.platformDir.getParent.getParent.getParent, slurm_remote_cfg.get.installdir) - else identity[Path] _ + val (wd_to_rmt, tpsc_to_rmt) = if (slurm_remote_cfg.isDefined) + (prefix_subst(cfg.kernelDir.getParent, slurm_remote_cfg.get.workdir), + prefix_subst(cfg.platformDir.getParent.getParent.getParent, slurm_remote_cfg.get.installdir)) + else (identity[Path] _, identity[Path] _) /** Create non-slurm cfg, with updated paths such that they match the folder structure on SLURM node */ val newCfg = cfg @@ -379,7 +376,7 @@ final object Slurm extends Publisher { def waitFor(id: Int): SlurmStatus = { var status: SlurmStatus = Slurm.Running() while (status == Running()) { - logger.trace("SLURM job #%d is still running, sleeping for %d secs ...".format(id, slurmDelay / 1000)) + logger.info("SLURM job #%d is still running, sleeping for %d secs ...".format(id, slurmDelay / 1000)) Thread.sleep(slurmDelay) status = getSlurmStatus(id) } @@ -426,7 +423,7 @@ final object Slurm extends Publisher { "ssh %s %s".format(host, c) } - logger.info("Executing command: %s".format(cmd)) + logger.debug("Executing command: %s".format(cmd)) cmd.!! 
} diff --git a/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala b/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala index 2322717d..90d6e5d9 100644 --- a/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala +++ b/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala @@ -60,7 +60,13 @@ class HighLevelSynthesisTask(val k: Kernel, val t: Target, val cfg: Configuratio val slurmLog = l.resolveSibling("slurm-hls.log") // raw log file (stdout w/colors) val e = l.resolveSibling("hls-slurm.errors.log") - val hlsJob = HighLevelSynthesisJob(hls.toString, Some(Seq(t.ad.name)), Some(Seq(t.pd.name)), Some(Seq(k.name))) + val hlsJob = HighLevelSynthesisJob( + hls.toString, + Some(Seq(t.ad.name)), + Some(Seq(t.pd.name)), + Some(Seq(k.name)), + Some(true) // skip Evaluation on cluster + ) // define SLURM job val job = Slurm.Job( @@ -70,7 +76,7 @@ class HighLevelSynthesisTask(val k: Kernel, val t: Target, val cfg: Configuratio errorLog = e, consumer = this, maxHours = HighLevelSynthesisTask.MAX_SYNTH_HOURS, - commands = Seq("tapasco --configFile %s".format(cfgFile.toString, k.name.toString)), + commands = Seq("tapasco --configFile %s".format(cfgFile.toString)), job = hlsJob, cfg_file = cfgFile ) From ff4a8e9ee25460155b95aba4925ba7be1f80a4ad Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Sat, 28 Nov 2020 18:59:45 +0100 Subject: [PATCH 27/30] Cancel SLURM job if tapasco is terminated --- toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index 5529ee25..81d56c06 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -33,6 +33,7 @@ import tapasco.task.ResourceConsumer import tapasco.util.{Publisher, Template} import 
scala.collection.JavaConverters._ +import scala.sys.ShutdownHookThread import scala.sys.process._ import tapasco.base.json._ import tapasco.jobs.{ComposeJob, HighLevelSynthesisJob} @@ -228,7 +229,6 @@ final object Slurm extends Publisher { fnames.map(f => slurm_job.log.resolveSibling(f)) } case HighLevelSynthesisJob(_, a,p, kernels, _) if slurm_success => { - val tgt = Target.fromString(a.get.head, p.get.head).get val core_dir = slurm_job.log.getParent.resolveSibling("ipcore") val core_zip = kernels.get.map(k => core_dir.resolve("%s.zip".format(k))) core_zip ++ core_zip.map(z => z.resolveSibling("core.json")) @@ -374,12 +374,14 @@ final object Slurm extends Publisher { /** Wait until the given SLURM job is not listed as RUNNING anymore in `sacct` output. */ def waitFor(id: Int): SlurmStatus = { + val hook = ShutdownHookThread(Slurm.cancel(id)) var status: SlurmStatus = Slurm.Running() - while (status == Running()) { + while (status == Running()) { // can be cancelled by SIGINT logger.info("SLURM job #%d is still running, sleeping for %d secs ...".format(id, slurmDelay / 1000)) Thread.sleep(slurmDelay) status = getSlurmStatus(id) } + hook.remove() // callback that pulls generated files from remote node if (slurm_remote_cfg.isDefined) From b7c0a206863b8d030b2dcd1ac525ebe5bea4b7c6 Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Sat, 28 Nov 2020 19:12:19 +0100 Subject: [PATCH 28/30] Show duration for which SLURM job has been running --- toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index 81d56c06..3edb794a 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -32,6 +32,7 @@ import tapasco.filemgmt._ import tapasco.task.ResourceConsumer import tapasco.util.{Publisher, Template} +import 
scala.concurrent.duration.Duration import scala.collection.JavaConverters._ import scala.sys.ShutdownHookThread import scala.sys.process._ @@ -375,9 +376,12 @@ final object Slurm extends Publisher { /** Wait until the given SLURM job is not listed as RUNNING anymore in `sacct` output. */ def waitFor(id: Int): SlurmStatus = { val hook = ShutdownHookThread(Slurm.cancel(id)) + val start = System.currentTimeMillis() var status: SlurmStatus = Slurm.Running() while (status == Running()) { // can be cancelled by SIGINT - logger.info("SLURM job #%d is still running, sleeping for %d secs ...".format(id, slurmDelay / 1000)) + val dur = Duration(System.currentTimeMillis() - start, "millis") + logger.info("SLURM job #%d is running since %dh %02dm %02ds" + .format(id, dur.toHours, dur.toMinutes % 60, dur.toSeconds % 60)) Thread.sleep(slurmDelay) status = getSlurmStatus(id) } From 259e6ee1e7e32fb4bc76774a6a70c174e84e799b Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Tue, 29 Dec 2020 16:05:28 +0100 Subject: [PATCH 29/30] Refactor Slurm tasks into slurm jobs Previously, a job would be broken into its tasks, and a new tapasco job would be created for each task. These jobs were then executed on the SLURM cluster. Refactor this, such that the original job is executed on the SLURM cluster as-is, which simplifies the SLURM logic. 
--- .../tapasco/jobs/executors/Compose.scala | 57 ++++++++++++++++--- .../jobs/executors/HighLevelSynthesis.scala | 44 +++++++++++++- .../main/scala/tapasco/task/ComposeTask.scala | 47 +-------------- .../tapasco/task/HighLevelSynthesisTask.scala | 36 +----------- 4 files changed, 96 insertions(+), 88 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/jobs/executors/Compose.scala b/toolflow/scala/src/main/scala/tapasco/jobs/executors/Compose.scala index 50ff9005..db42774d 100644 --- a/toolflow/scala/src/main/scala/tapasco/jobs/executors/Compose.scala +++ b/toolflow/scala/src/main/scala/tapasco/jobs/executors/Compose.scala @@ -28,18 +28,20 @@ package tapasco.jobs.executors import java.util.concurrent.Semaphore +import tapasco.activity.composers.Composer import tapasco.base._ import tapasco.filemgmt._ import tapasco.jobs.{ComposeJob, HighLevelSynthesisJob} +import tapasco.slurm.Slurm.Completed import tapasco.task._ +import tapasco.slurm._ private object Compose extends Executor[ComposeJob] { private implicit val logger = tapasco.Logging.logger(getClass) + private[this] val _slurm = Slurm.enabled def execute(job: ComposeJob) (implicit cfg: Configuration, tsk: Tasks): Boolean = { - val signal = new Semaphore(0) - logger.trace("composition: {}", job.composition) // first, collect all kernels and trigger HLS if not built yet @@ -74,7 +76,18 @@ private object Compose extends Executor[ComposeJob] { logger.info("all HLS tasks finished successfully, beginning compose run...") logger.debug("job: {}", job) - val composeTasks = for { + if (!_slurm) nodeExecution(job) else slurmExecution(job) + } else { + logger.error("HLS tasks failed, aborting composition") + false + } + } + + private def nodeExecution(job: ComposeJob) + (implicit cfg: Configuration, tsk: Tasks): Boolean = { + val signal = new Semaphore(0) + + val composeTasks = for { p <- job.platforms a <- job.architectures t = Target(a, p) @@ -104,10 +117,40 @@ private object Compose extends 
Executor[ComposeJob] { // successful, if all successful (composeTasks map (_.result) fold true) (_ && _) - } else { - logger.error("HLS tasks failed, aborting composition") - false + } + + private def slurmExecution(job: ComposeJob) + (implicit cfg: Configuration, tsk: Tasks): Boolean = { + + val ComposeJob(c, f, i, _, _, _, _, _, _, _) = job + val name = c.composition.map(_.kernel).fold("compose")(_ ++ "-" ++ _) + val outDir = FileAssetManager.TAPASCO_WORK_DIR.resolve("Slurm").resolve("Compose").resolve(name) + // needed for resource-based scheduling + val consumer = new ComposeTask( + composition = c, + designFrequency = f, + implementation = Composer.Implementation(i), + target = Target(job.architectures.head, job.platforms.head), + onComplete = _ => () + ) + + // define SLURM job + val sjob = Slurm.Job( + name = name, + log = outDir.resolve("tapasco.log"), + slurmLog = outDir.resolve("slurm-compose.log"), + errorLog = outDir.resolve("slurm-compose.errors.log"), + consumer = consumer, + maxHours = ComposeTask.MAX_COMPOSE_HOURS, + comment = Some(outDir.toString), + job = job, + cfg_file = outDir.resolve("slurm-compose.cfg") + ) + + // start slurm job and wait for finish + Slurm(sjob)(cfg) match { + case Some(id) => Slurm.waitFor(id) == Completed() + case None => false } } } - diff --git a/toolflow/scala/src/main/scala/tapasco/jobs/executors/HighLevelSynthesis.scala b/toolflow/scala/src/main/scala/tapasco/jobs/executors/HighLevelSynthesis.scala index 80fbe765..7f2d04f4 100644 --- a/toolflow/scala/src/main/scala/tapasco/jobs/executors/HighLevelSynthesis.scala +++ b/toolflow/scala/src/main/scala/tapasco/jobs/executors/HighLevelSynthesis.scala @@ -22,7 +22,6 @@ package tapasco.jobs.executors import java.util.concurrent.Semaphore - import tapasco.Logging import tapasco.activity.hls.HighLevelSynthesizer import tapasco.activity.hls.HighLevelSynthesizer.Implementation._ @@ -30,12 +29,18 @@ import tapasco.activity.hls.HighLevelSynthesizer._ import tapasco.base._ import 
tapasco.filemgmt.FileAssetManager import tapasco.jobs._ +import tapasco.slurm.Slurm +import tapasco.slurm.Slurm.Completed import tapasco.task._ protected object HighLevelSynthesis extends Executor[HighLevelSynthesisJob] { private implicit final val logger = Logging.logger(getClass) + private[this] val _slurm = Slurm.enabled + + def execute(job: HighLevelSynthesisJob)(implicit cfg: Configuration, tsk: Tasks): Boolean = + if (!_slurm) nodeExecution(job) else slurmExecution(job) - def execute(job: HighLevelSynthesisJob)(implicit cfg: Configuration, tsk: Tasks): Boolean = { + def nodeExecution(job: HighLevelSynthesisJob)(implicit cfg: Configuration, tsk: Tasks): Boolean = { val signal = new Semaphore(0) val runs: Seq[(Kernel, Target)] = for { a <- job.architectures.toSeq.sortBy(_.name) @@ -94,4 +99,39 @@ protected object HighLevelSynthesis extends Executor[HighLevelSynthesisJob] { // success, if all tasks were successful ((tasks ++ importTasks) map (_.result) fold true) (_ && _) } + + def slurmExecution(job: HighLevelSynthesisJob) + (implicit cfg: Configuration, tsk: Tasks): Boolean = { + + val name = job.kernels.map(_.name).fold("hls")(_++"-"++_) + val outDir = FileAssetManager.TAPASCO_WORK_DIR.resolve("Slurm").resolve("HLS").resolve(name) + // needed for resource-based scheduling + val consumer = new HighLevelSynthesisTask( + job.kernels.head, + Target(job.architectures.head, job.platforms.head), + cfg, + VivadoHLS, + _ => () + ) + + // define SLURM job + val sjob = Slurm.Job( + name = name, + log = outDir.resolve("tapasco.log"), + slurmLog = outDir.resolve("slurm-hls.log"), + errorLog = outDir.resolve("hls-slurm.errors.log"), + consumer = consumer, + maxHours = HighLevelSynthesisTask.MAX_SYNTH_HOURS, + job = job, + cfg_file = outDir.resolve("slurm-hls.cfg") + ) + + // execute sbatch to enqueue job, then wait for it + val r = Slurm(sjob)(cfg) match { + case Some(id) => Slurm.waitFor(id) == Completed() + case None => false + } + FileAssetManager.reset() + r + } } diff 
--git a/toolflow/scala/src/main/scala/tapasco/task/ComposeTask.scala b/toolflow/scala/src/main/scala/tapasco/task/ComposeTask.scala index 94450972..d07fab1f 100644 --- a/toolflow/scala/src/main/scala/tapasco/task/ComposeTask.scala +++ b/toolflow/scala/src/main/scala/tapasco/task/ComposeTask.scala @@ -26,10 +26,7 @@ import java.nio.file._ import tapasco.Logging._ import tapasco.activity.composers._ import tapasco.base._ -import tapasco.base.json._ import tapasco.dse.Heuristics -import tapasco.jobs._ -import tapasco.slurm._ import tapasco.util._ import scala.util.Properties.{lineSeparator => NL} @@ -52,7 +49,6 @@ class ComposeTask(composition: Composition, val onComplete: Boolean => Unit) (implicit cfg: Configuration) extends Task with LogTracking { private[this] implicit val _logger = tapasco.Logging.logger(getClass) - private[this] val _slurm = Slurm.enabled private[this] var _composerResult: Option[Composer.Result] = None private[this] val _outDir = cfg.outputDir(composition, target, designFrequency, features getOrElse Seq()) private[this] val _logFile = logFile getOrElse _outDir.resolve("tapasco.log").toString @@ -62,9 +58,7 @@ class ComposeTask(composition: Composition, def composerResult: Option[Composer.Result] = _composerResult /** @inheritdoc**/ - def job: Boolean = if (!_slurm) nodeExecution else slurmExecution - - private def nodeExecution: Boolean = { + def job: Boolean = { val appender = LogFileTracker.setupLogFileAppender(_logFile.toString) val composer = Composer(implementation)(cfg) _logger.debug("launching compose run for {}@{} [current thread: {}], logfile {}", @@ -106,43 +100,6 @@ class ComposeTask(composition: Composition, result } - private def slurmExecution: Boolean = { - - val l = Paths.get(_logFile).toAbsolutePath().normalize() - val cfgFile = l.resolveSibling("slurm-compose.cfg") // Configuration Json - val slgFile = l.resolveSibling("slurm-compose.log") // SLURM job stdout log - val errFile = 
Paths.get(_logFile).resolveSibling("slurm-compose.errors.log") - - val cmpsJob = ComposeJob( - composition, designFrequency, implementation.toString, Some(Seq(target.ad.name)), Some(Seq(target.pd.name)), - features, debugMode - ) - - // define SLURM job - val job = Slurm.Job( - name = l.getParent.getParent.getFileName.resolve(l.getParent.getFileName).toString, - log = l, - slurmLog = slgFile, - errorLog = errFile, - consumer = this, - maxHours = ComposeTask.MAX_COMPOSE_HOURS, - commands = Seq("tapasco --configFile %s".format(cfgFile.toString)), - comment = Some(_outDir.toString), - job = cmpsJob, - cfg_file = cfgFile - ) - - Slurm(job)(cfg) foreach (Slurm.waitFor(_)) // execute and wait - - _composerResult = if (debugMode.isEmpty) { - ComposeTask.parseResultInLog(l.toString) - } else { - ComposeTask.makeDebugResult(debugMode.get) - } - (_composerResult map (_.result) getOrElse false) == ComposeResult.Success - - } - private def elementdesc = "%s [F=%2.2f]".format(logformat(composition), designFrequency.toDouble) /** @inheritdoc*/ @@ -175,7 +132,7 @@ object ComposeTask { import scala.io._ - private final val MAX_COMPOSE_HOURS = 23 + final val MAX_COMPOSE_HOURS = 23 private final val RE_RESULT = """compose run .*result: ([^,]+)""".r.unanchored private final val RE_LOG = """compose run .*result: \S+.*logfile: '([^']+)'""".r.unanchored private final val RE_TIMING = """compose run .*result: \S+.*timing report: '([^']+)'""".r.unanchored diff --git a/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala b/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala index 90d6e5d9..3d89ae0c 100644 --- a/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala +++ b/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala @@ -37,7 +37,6 @@ class HighLevelSynthesisTask(val k: Kernel, val t: Target, val cfg: Configuratio val onComplete: Boolean => Unit) extends Task with LogTracking { private[this] implicit val logger 
= tapasco.Logging.logger(getClass) private[this] var result: Option[HighLevelSynthesizer.Result] = None - private[this] val slurm = Slurm.enabled private[this] val r = HighLevelSynthesizer(hls) private[this] val l = r.logFile(k, t)(cfg).resolveSibling("hls.log") @@ -48,43 +47,12 @@ class HighLevelSynthesisTask(val k: Kernel, val t: Target, val cfg: Configuratio def description: String = "High-Level-Synthesis for '%s' with target %s @ %s".format(k.name, t.pd.name, t.ad.name) - def job: Boolean = if (!slurm) { + def job: Boolean = { val appender = LogFileTracker.setupLogFileAppender(l.toString) logger.trace("current thread name: {}", Thread.currentThread.getName()) result = Some(r.synthesize(k, t)(cfg)) LogFileTracker.stopLogFileAppender(appender) result map (_.toBoolean) getOrElse false - } else { - - val cfgFile = l.resolveSibling("slurm-hls.cfg") // Configuration Json - val slurmLog = l.resolveSibling("slurm-hls.log") // raw log file (stdout w/colors) - val e = l.resolveSibling("hls-slurm.errors.log") - - val hlsJob = HighLevelSynthesisJob( - hls.toString, - Some(Seq(t.ad.name)), - Some(Seq(t.pd.name)), - Some(Seq(k.name)), - Some(true) // skip Evaluation on cluster - ) - - // define SLURM job - val job = Slurm.Job( - name = "hls-%s-%s-%s".format(t.ad.name, t.pd.name, k.name), - log = l, - slurmLog = slurmLog, - errorLog = e, - consumer = this, - maxHours = HighLevelSynthesisTask.MAX_SYNTH_HOURS, - commands = Seq("tapasco --configFile %s".format(cfgFile.toString)), - job = hlsJob, - cfg_file = cfgFile - ) - - // execute sbatch to enqueue job, then wait for it - val r = (Slurm(job)(cfg) map Slurm.waitFor).getOrElse(false) == Slurm.Completed() - FileAssetManager.reset() - r } def logFiles: Set[String] = Set(l.toString) @@ -100,6 +68,6 @@ class HighLevelSynthesisTask(val k: Kernel, val t: Target, val cfg: Configuratio ) } -private object HighLevelSynthesisTask { +object HighLevelSynthesisTask { final val MAX_SYNTH_HOURS = 8 } From 
f84e5cc5e4657c4bdc9dfb449daab59be340ac2d Mon Sep 17 00:00:00 2001 From: Marco Hartmann Date: Tue, 29 Dec 2020 16:42:41 +0100 Subject: [PATCH 30/30] Adapt Slurm pre/post-amble to changes in 259e6ee1e Since SLURM cluster now processes whole jobs (instead of single tasks), dependencies (preamble) and produced artefacts (postamble) of multiple platform/architecture pairs may need to be transferred. --- .../src/main/scala/tapasco/slurm/Slurm.scala | 55 +++++++++++++------ 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index 3edb794a..6958fc24 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -68,9 +68,6 @@ final object Slurm extends Publisher { /** Time limit (in hours). */ maxHours: Int, - /** Sequence of commands to execute (bash). */ - commands: Seq[String], - /** Optional comment. 
*/ comment: Option[String] = None, @@ -197,10 +194,16 @@ final object Slurm extends Publisher { **/ def slurm_preamble(slurm_job: Job, update_paths: Path => Path)(implicit cfg: Configuration): Unit = { val local_files = Seq(slurm_job.cfg_file) ++ (slurm_job.job match { - case ComposeJob(c, _, _, a, p, _, _, _, _, _) => { - val tgt = Target.fromString(a.get.head, p.get.head).get - val cores = c.composition.map(ce => FileAssetManager.entities.core(ce.kernel, tgt)) - cores.map(_.get.zipPath) ++ cores.map(_.get.descPath) + case j@ComposeJob(c, _, _, _, _, _, _, _, _, _) => { + val cores = for { + p <- j.platforms + a <- j.architectures + tgt = Target(a, p) + } yield { + val tgt_cores = c.composition.map(ce => FileAssetManager.entities.core(ce.kernel, tgt)) + tgt_cores.map(_.get.zipPath) ++ tgt_cores.map(_.get.descPath) + } + cores flatten } case HighLevelSynthesisJob(_, _, _, k, _) => { val kernels = FileAssetManager.entities.kernels.filter( kernel => k.get.contains(kernel.name) ).toSeq @@ -222,17 +225,32 @@ final object Slurm extends Publisher { * @param slurm_success Indicates if the SLURM job finished successfully. * @param update_paths Function that converts local workdir file paths to valid paths on a remote SLURM node. 
**/ - def slurm_postamble(slurm_job: Job, slurm_success: Boolean, update_paths: Path => Path): Unit = { + def slurm_postamble(slurm_job: Job, cfg: Configuration, slurm_success: Boolean, update_paths: Path => Path): Unit = { val loc_files = Seq(slurm_job.log, slurm_job.slurmLog, slurm_job.errorLog) ++ (slurm_job.job match { - case ComposeJob(c, f, _, a, p, _, _, _, _, _) if slurm_success => { - val bit_name = Composer.mkProjectName(c, Target.fromString(a.get.head, p.get.head).get, f) - val fnames = Seq(bit_name + ".bit", bit_name + ".bit.bin", "timing.txt", "utilization.txt") - fnames.map(f => slurm_job.log.resolveSibling(f)) + case j@ComposeJob(c, f, _, _, _, feat, _, _, _, _) if slurm_success => { + val compose_out = for { + p <- j.platforms + a <- j.architectures + tgt = Target(a, p) + } yield { + val out_dir = cfg.outputDir(c,tgt,f,feat.getOrElse(Seq())) + val bit_name = Composer.mkProjectName(c, tgt, f) + val fnames = Seq(bit_name + ".bit", "timing.txt", "utilization.txt") + fnames.map(out_dir.resolve) + } + compose_out flatten } - case HighLevelSynthesisJob(_, a,p, kernels, _) if slurm_success => { - val core_dir = slurm_job.log.getParent.resolveSibling("ipcore") - val core_zip = kernels.get.map(k => core_dir.resolve("%s.zip".format(k))) - core_zip ++ core_zip.map(z => z.resolveSibling("core.json")) + case j@HighLevelSynthesisJob(_, _,_, _, _) if slurm_success => { + val cores = for { + p <- j.platforms + a <- j.architectures + k <- j.kernels + tgt = Target(a, p) + } yield { + val core_dir = cfg.outputDir(k, tgt).resolve("ipcore") + Seq(core_dir.resolve("%s.zip".format(k.name)), core_dir.resolve("core.json")) + } + cores flatten } case _ => Seq() }) @@ -276,7 +294,7 @@ final object Slurm extends Publisher { **/ def apply(slurm_job: Job)(implicit cfg: Configuration): Option[Int] = { val local_base = slurm_job.cfg_file.getParent - val jobFile = local_base.resolveSibling("%s.slurm".format(slurm_job.name)) // SLURM job script + val jobFile = 
local_base.resolve("%s.slurm".format(slurm_job.name)) // SLURM job script /** replace a prefix of a Path by a different prefix. Used to convert local file paths to paths that are valid on SLURM node */ def prefix_subst(old_pre: Path, new_pre: Path): (Path => Path) = { @@ -300,6 +318,7 @@ final object Slurm extends Publisher { .archDir(tpsc_to_rmt(cfg.archDir)) .jobs(Seq(slurm_job.job)) .slurm(None) + .logFile(Some(wd_to_rmt(slurm_job.log))) logger.info("starting " + slurm_job.name + " job on SLURM ({})", slurm_job.cfg_file) catchAllDefault[Option[Int]](None, "error during SLURM job execution (%s): ".format(jobFile)) { @@ -345,7 +364,7 @@ final object Slurm extends Publisher { if (slurm_remote_cfg.isDefined) { postambles += (id.get -> {slurm_id:Int => slurm_success:Boolean => logger.info("Running postamble for SLURM id: {}", slurm_id) - slurm_postamble(slurm_job, slurm_success, wd_to_rmt) + slurm_postamble(slurm_job, cfg, slurm_success, wd_to_rmt) }) } id