diff --git a/Alpheus.CLI/Runner.fs b/Alpheus.CLI/Runner.fs index 195fc06..243e698 100644 --- a/Alpheus.CLI/Runner.fs +++ b/Alpheus.CLI/Runner.fs @@ -11,7 +11,7 @@ let run (programName:string) (parseResults:ParseResults) : Result not (Config.isExperimentDirectory p)), (UserError "The current directory is already an Alpheus experiment directory")) return API.createExperimentDirectoryAsync cwd |> Async.RunSynchronously |> ignore } @@ -62,16 +62,36 @@ let run (programName:string) (parseResults:ParseResults) : Result let artefactPath = statusArgs.GetResult <@ StatusArgs.File @> let! artefact = API.artefactFor artefactPath - return! Error (SystemError "NOT IMPLEMENTED") - //return! API.status artefact + let statusesRes = API.status artefact + match statusesRes with + | Ok(statuses)-> + let printStatus artId (status:Angara.Data.MdMap) = + let statusToString status = + match status with + | StatusGraph.UpToDate location -> + match location with + | DependencyGraph.Local -> "Up to date" + | DependencyGraph.Remote -> "Up to date (in storage)" + | StatusGraph.NeedsRecomputation _ -> + "Need recomputation" + if status.IsScalar then + printfn "%-50s: %s" (artId.ToString()) (status.AsScalar() |> statusToString) + else + printfn "%-50s:" (artId.ToString()) + let printItem pair = + let (index:string list),v = pair + printfn "\t%-50s: %s" (String.concat " " index) (statusToString v) + status |> Angara.Data.MdMap.toSeq |> Seq.iter printItem + statuses |> Map.iter printStatus + return () + | Error e -> return! Error e } elif parseResults.Contains Restore then result { let restoreArgs = parseResults.GetResult <@ Restore @> let artefactPath = restoreArgs.GetResult <@ RestoreArgs.Path @> let! artefact = API.artefactFor artefactPath - return! Error (SystemError "NOT IMPLEMENTED") - //return! API.restoreAsync artefact |> Async.RunSynchronously + return! API.restoreAsync artefact |> Async.RunSynchronously } elif parseResults.Contains Save then result { diff --git a/AlpheusCore/API.fs b/AlpheusCore/API.fs index 87de7c1..d727563 100644 --- a/AlpheusCore/API.fs +++ b/AlpheusCore/API.fs @@ -112,24 +112,26 @@ let private restoreSingleItemAsync experimentRoot (path,versionToRestore) = return! restore path versionToRestore } +/// Checks whether the specified versions can be extraced from any available storages +let internal checkStoragePresence experimentRoot (versions:HashString seq) : Async = + async { + let! config = Config.openExperimentDirectoryAsync experimentRoot + let checker = config.ConfigFile.Storage |> Map.toSeq |> StorageFactory.getPresenseChecker experimentRoot + let nonEmptylist listAsync = + async { + let! l = listAsync + return l |> Array.map (fun x -> not (List.isEmpty x)) + } + let res = versions |> Array.ofSeq |> (checker >> nonEmptylist) + return! res + } + /// (Re)Computes the artefact specified let compute (experimentRoot, artefactId) = async { let! g = buildDependencyGraphAsync experimentRoot [artefactId] - let checkStoragePresence (versions:HashString array) : Async = - async { - let! config = Config.openExperimentDirectoryAsync experimentRoot - let checker = config.ConfigFile.Storage |> Map.toSeq |> StorageFactory.getPresenseChecker experimentRoot - let nonEmptylist listAsync = - async { - let! l = listAsync - return l |> Array.map (fun x -> not (List.isEmpty x)) - } - let res = versions |> (checker >> nonEmptylist) - return! res - } - + let restoreFromStorage (pairs:(HashString*string) array) = async { let swapedPairs = pairs |> Array.map (fun p -> let x,y = p in y,x ) @@ -143,20 +145,19 @@ let compute (experimentRoot, artefactId) = return () } - //flow graph to calculate statuses - let flowGraph = ComputationGraph.buildGraph experimentRoot g checkStoragePresence restoreFromStorage + // flow graph to calculate statuses + let flowGraph = ComputationGraph.buildGraph experimentRoot g (checkStoragePresence experimentRoot) restoreFromStorage logVerbose LogCategory.API "Running computations" return ComputationGraph.doComputations flowGraph } |> Async.RunSynchronously /// Prints to the stdout the textural statuses of the artefact and its provenance let status (experimentRoot, artefactId) = - failwith "Not implemented" |> ignore - //async { - // let! g = buildDependencyGraphAsync experimentRoot [artefactId] - // let flowGraph = StatusGraph.buildStatusGraph g - // return StatusGraph.printStatuses flowGraph - //} |> Async.RunSynchronously + async { + let! g = buildDependencyGraphAsync experimentRoot [artefactId] + let flowGraph = StatusGraph.buildStatusGraph g experimentRoot (checkStoragePresence experimentRoot) + return StatusGraph.getStatuses flowGraph + } |> Async.RunSynchronously let private artefactVersionToPaths fullArtPath (artefactVersion:ArtefactVersion) = if artefactVersion.IsScalar then diff --git a/AlpheusCore/AlpheusCore.fsproj b/AlpheusCore/AlpheusCore.fsproj index d363761..492ed8f 100644 --- a/AlpheusCore/AlpheusCore.fsproj +++ b/AlpheusCore/AlpheusCore.fsproj @@ -38,7 +38,8 @@ - + + diff --git a/AlpheusCore/AngaraGraphCommon.fs b/AlpheusCore/AngaraGraphCommon.fs new file mode 100644 index 0000000..414a0f0 --- /dev/null +++ b/AlpheusCore/AngaraGraphCommon.fs @@ -0,0 +1,153 @@ +module ItisLab.Alpheus.AngaraGraphCommon + +open System +open System.IO +open AlphFiles +open Angara.Data +open Angara.Graph +open Angara.Execution +open Angara.States +open ItisLab.Alpheus.DependencyGraph +open ItisLab.Alpheus.PathUtils + +let internal arrayType<'a> rank : Type = + if rank < 0 then invalidArg "rank" "Rank is negative" + else if rank = 0 then typeof + else typeof.MakeArrayType(rank) + +let internal inputRank (v:MethodVertex) = + match v with + | Source src -> 0 + | Command cmd -> cmd.Inputs |> Seq.map(fun a -> a.Artefact.Rank) |> Seq.max + +let internal outputRank (v:MethodVertex) = + match v with + | Source src -> src.Output.Artefact.Rank + | Command cmd -> cmd.Outputs |> Seq.map(fun a -> a.Artefact.Rank) |> Seq.max + +let internal methodRank (v:MethodVertex) = min (outputRank v) (inputRank v) + +let getOutputTypes (v:MethodVertex) = + let rank = methodRank v + match v with + | Source src -> [max 0 (src.Output.Artefact.Rank - rank) |> arrayType] + | Command cmd -> cmd.Outputs |> Seq.map(fun a -> max 0 (a.Artefact.Rank - rank) |> arrayType) |> List.ofSeq + +let getInputTypes (v:MethodVertex) = + let rank = methodRank v + match v with + | Source src -> List.empty + | Command cmd -> cmd.Inputs |> Seq.map(fun a -> max 0 (a.Artefact.Rank - rank) |> arrayType) |> List.ofSeq + + +let rec internal toJaggedArrayOrValue (mapValue: (string list * 'a) -> 'c) (index: string list) (map: MdMapTree) : obj = + let isValue = function + | MdMapTree.Value _ -> true + | MdMapTree.Map _ -> false + + let mapToArray (getElement: (string * MdMapTree) -> 'b) (map: Map>) : 'b[] = + map |> Map.toSeq |> Seq.sortBy fst |> Seq.map getElement |> Seq.toArray + + let append v list = list |> List.append [v] + + match map with + | MdMapTree.Value v -> upcast(mapValue (index, v)) + | MdMapTree.Map subMap -> + match subMap |> Map.forall(fun _ -> isValue) with + | true -> // final level + upcast(subMap |> mapToArray (fun (k,t) -> + let newIndex = index |> append k + match t with + | MdMapTree.Value v -> mapValue (newIndex, v) + | MdMapTree.Map _ -> failwith "Unreachable case")) + | false -> + upcast(subMap |> mapToArray (fun (k,t) -> + let newIndex = index |> append k + match t with + | MdMapTree.Map _ -> toJaggedArrayOrValue mapValue newIndex t + | MdMapTree.Value _ -> failwith "Data is incomplete and has missing elements")) + + +let resolveIndex (index:string list) (map: MdMap) = + let rec resolveInTree (index:string list) (map: MdMapTree) = + match map, index with + | _,[] -> Some map + | MdMapTree.Value value,_ -> Some map // index length > rank of the map + | MdMapTree.Map values, k :: tail -> + match values |> Map.tryFind k with + | Some value -> resolveInTree tail value + | None -> None + match resolveInTree index (map |> MdMap.toTree) with + | Some(MdMapTree.Value v) -> v + | Some(MdMapTree.Map map) when map.IsEmpty -> None + | Some(MdMapTree.Map _) -> invalidOp "Only one-to-one vectors are supported at the moment" + | None -> None + +let extractActualVersionsFromLinks index links = + async { + let! actualVersion = + links + |> Seq.map (fun (a:LinkToArtefact) -> a.Artefact.ActualVersionAsync) + |> Async.Parallel + return actualVersion |> Array.map (fun v -> resolveIndex index v) + } + +let extractExpectedVersionsFromLinks index links = + links |> Seq.map (fun (a:LinkToArtefact) -> resolveIndex index a.ExpectedVersion) + +/// whether the command method needs actual CLI tool execution +/// common code that is used both during the artefact status calculation and artefact production +let getCommandVertexStatus checkStoragePresence (command:CommandLineVertex) index = + let methodItemId = command.MethodId |> applyIndex index + let logVerbose str = Logger.logVerbose Logger.Execution (sprintf "%s: %s" methodItemId str) + async { + let expectedInputItemVersions = extractExpectedVersionsFromLinks index command.Inputs |> Array.ofSeq + let! actualInputItemVersions = extractActualVersionsFromLinks index command.Inputs + + let! inputsStatus = findExpectedArtefacts checkStoragePresence expectedInputItemVersions actualInputItemVersions + + match inputsStatus with + // we can avoid checking outputs to speed up the work + | SomeAreNotFound -> + logVerbose "Needs recomputation as some of the inputs not found" + return Outdated InputsOutdated + | SomeAreLocalUnexpected -> + logVerbose "Needs recomputation as disk version of the input does not match expected version" + return Outdated InputsOutdated + | AllExist _ -> + // checking outputs + let expectedOutputItemVersions = extractExpectedVersionsFromLinks index command.Outputs |> Array.ofSeq + let! actualOutputItemVersions = extractActualVersionsFromLinks index command.Outputs + let! outputsStatus = findExpectedArtefacts checkStoragePresence expectedOutputItemVersions actualOutputItemVersions + match outputsStatus with + | SomeAreNotFound -> + logVerbose "Needs recomputation as some of the outputs not found" + return Outdated OutputsOutdated + | SomeAreLocalUnexpected -> + logVerbose "Needs recomputation as disk version of the output does not match expected version" + return Outdated OutputsOutdated + | AllExist outputs -> + return UpToDate outputs + } + + +/// This type represents an Angara Flow method. +/// Note that execution modifies the given vertex and it is Angara Flow execution runtime who controls +/// the concurrency. +[] +type AngaraGraphNode(producerVertex:MethodVertex) = + inherit ExecutableMethod( + System.Guid.NewGuid(), + getInputTypes producerVertex, + getOutputTypes producerVertex) + + member s.VertexID = + match producerVertex with + | Source(s) -> s.MethodId + | Command(comp) -> comp.MethodId + + override s.ToString() = + match producerVertex with + | Source src -> sprintf "Source %A" src.Output.Artefact.Id + | Command cmd -> sprintf "Command %s" cmd.Command + diff --git a/AlpheusCore/ComputationGraph.fs b/AlpheusCore/ComputationGraph.fs index cbdc23a..2832aad 100644 --- a/AlpheusCore/ComputationGraph.fs +++ b/AlpheusCore/ComputationGraph.fs @@ -9,327 +9,177 @@ open Angara.Execution open Angara.States open ItisLab.Alpheus.DependencyGraph open ItisLab.Alpheus.PathUtils +open ItisLab.Alpheus.AngaraGraphCommon -let private arrayType<'a> rank : Type = - if rank < 0 then invalidArg "rank" "Rank is negative" - else if rank = 0 then typeof - else typeof.MakeArrayType(rank) - -let private inputRank (v:MethodVertex) = - match v with - | Source src -> 0 - | Command cmd -> cmd.Inputs |> Seq.map(fun a -> a.Artefact.Rank) |> Seq.max - -let private outputRank (v:MethodVertex) = - match v with - | Source src -> src.Output.Artefact.Rank - | Command cmd -> cmd.Outputs |> Seq.map(fun a -> a.Artefact.Rank) |> Seq.max - -let private methodRank (v:MethodVertex) = min (outputRank v) (inputRank v) - -let getOutputTypes (v:MethodVertex) = - let rank = methodRank v - match v with - | Source src -> [max 0 (src.Output.Artefact.Rank - rank) |> arrayType] - | Command cmd -> cmd.Outputs |> Seq.map(fun a -> max 0 (a.Artefact.Rank - rank) |> arrayType) |> List.ofSeq - -let getInputTypes (v:MethodVertex) = - let rank = methodRank v - match v with - | Source src -> List.empty - | Command cmd -> cmd.Inputs |> Seq.map(fun a -> max 0 (a.Artefact.Rank - rank) |> arrayType) |> List.ofSeq - type ArtefactItem = { FullPath: string - Index: string list } - -let rec private toJaggedArrayOrValue (mapValue: (string list * 'a) -> 'c) (index: string list) (map: MdMapTree) : obj = - let isValue = function - | MdMapTree.Value _ -> true - | MdMapTree.Map _ -> false - - let mapToArray (getElement: (string * MdMapTree) -> 'b) (map: Map>) : 'b[] = - map |> Map.toSeq |> Seq.sortBy fst |> Seq.map getElement |> Seq.toArray - - let append v list = list |> List.append [v] - - match map with - | MdMapTree.Value v -> upcast(mapValue (index, v)) - | MdMapTree.Map subMap -> - match subMap |> Map.forall(fun _ -> isValue) with - | true -> // final level - upcast(subMap |> mapToArray (fun (k,t) -> - let newIndex = index |> append k - match t with - | MdMapTree.Value v -> mapValue (newIndex, v) - | MdMapTree.Map _ -> failwith "Unreachable case")) - | false -> - upcast(subMap |> mapToArray (fun (k,t) -> - let newIndex = index |> append k - match t with - | MdMapTree.Map _ -> toJaggedArrayOrValue mapValue newIndex t - | MdMapTree.Value _ -> failwith "Data is incomplete and has missing elements")) - + Index: string list + } -/// This type represents an Angara Flow method. -/// Note that execution modifies the given vertex and it is Angara Flow execution runtime who controls -/// the concurrency. -[] -type ComputationGraphNode( - producerVertex:MethodVertex, - experimentRoot:string, - checkStoragePresence:HashString array -> Async, - restoreFromStorage: (HashString*string) array -> Async) = // version*filename - inherit ExecutableMethod( - System.Guid.NewGuid(), - getInputTypes producerVertex, - getOutputTypes producerVertex) - - member s.VertexID = - match producerVertex with - | Source(s) -> s.Output.Artefact.Id - | Command(comp) -> (Seq.head comp.Outputs).Artefact.Id // first output is used as vertex ID - - override s.ToString() = - match producerVertex with - | Source src -> sprintf "Source %A" src.Output.Artefact.Id - | Command cmd -> sprintf "Command %s" cmd.Command - - -type SourceMethod(source: SourceVertex, experimentRoot, checkStoragePresence, restoreFromStorage) = - inherit ComputationGraphNode(DependencyGraph.Source source, experimentRoot, checkStoragePresence, restoreFromStorage) +type SourceMethod(source: SourceVertex, experimentRoot, + checkStoragePresence : HashString seq -> Async) = + inherit AngaraGraphNode(DependencyGraph.Source source) override s.Execute(_, _) = // ignoring checkpoints - let expectedArtefact = source.Output - let artefact = expectedArtefact.Artefact - - // Output of the method is an scalar or a vector of full paths to the data of the artefact. - let outputArtefact : Artefact = - artefact.Id - |> PathUtils.enumerateItems experimentRoot - |> MdMap.toTree - |> toJaggedArrayOrValue (fun (index, fullPath) -> { FullPath = fullPath; Index = index }) [] - + async { + let expectedArtefact = source.Output + let artefact = expectedArtefact.Artefact - let ensureScalar (artefactVersion:ArtefactVersion) = - if not artefactVersion.IsScalar then - failwithf "Source artefacts can't be vectored: %A" artefact.Id - else - artefactVersion.AsScalar() + let ensureScalar (artefactVersion:ArtefactVersion) = + if not artefactVersion.IsScalar then + failwithf "Source artefacts can't be vectored: %A" artefact.Id + else + artefactVersion.AsScalar() - let expectedVersionOpt = expectedArtefact.ExpectedVersion |> ensureScalar + let expectedVersionOpt = expectedArtefact.ExpectedVersion |> ensureScalar - let expectedV = - match expectedVersionOpt with - | None -> - // this could be file/dir without alph file. - (artefact.ActualVersionAsync |> Async.RunSynchronously |> ensureScalar).Value - | Some(v) -> v - - let actualVersionRes = artefact.ActualVersionAsync |> Async.RunSynchronously - let actualVersionOpt = ensureScalar actualVersionRes - - match actualVersionOpt with - | None -> // The artefact does not exist on disk - // This may be OK in case the specified version is available in storages - // todo: note that in case of vectore, only some of the items can exists/restore/etc. - if (checkStoragePresence [|expectedV|] |> Async.RunSynchronously).[0] then - // If some methods needs this artefact as input, the artefact will be restored during the method execution - () - else - failwithf "Source artefact %A is not on disk and can't be found in any of the storages. Consider adding additional storages to look in. Can't proceed with computation." artefact.Id - | Some(_) -> - // we now consider actual version as expected - source.Output.ExpectActualVersionAsync() |> Async.RunSynchronously - // if alph file exists on disk (e.g. isTracked), we need to re-save it to update the expected version - if artefact.IsTracked then - artefact.SaveAlphFile() - - Seq.singleton ([outputArtefact], null) - -type CommandMethod(command: CommandLineVertex, experimentRoot, checkStoragePresence, restoreFromStorage) = - inherit ComputationGraphNode(DependencyGraph.Command command, experimentRoot, checkStoragePresence, restoreFromStorage) - - let resolveIndex (index:string list) (map: MdMap) = - let rec resolveInTree (index:string list) (map: MdMapTree) = - match map, index with - | _,[] -> Some map - | MdMapTree.Value value,_ -> Some map // index length > rank of the map - | MdMapTree.Map values, k :: tail -> - match values |> Map.tryFind k with - | Some value -> resolveInTree tail value - | None -> None - match resolveInTree index (map |> MdMap.toTree) with - | Some(MdMapTree.Value v) -> v - | Some(MdMapTree.Map map) when map.IsEmpty -> None - | Some(MdMapTree.Map _) -> invalidOp "Only one-to-one vectors are supported at the moment" - | None -> None - - /// valid are items that either have actual disk data version match expected version, or actual disk data is missing and expected version is restorable from storage - let areValidItemsVersions expectedVersionHashes actualVersionsHashes = - if Array.exists Option.isNone expectedVersionHashes then - // some of the artefact element was not ever produced, this is invalid - false - else - /// Chooses the pairs that are not valid on disk (filtering out versions match) - let invalidOnDiskChooser hash1 hash2 = - match hash1,hash2 with - | Some(h1),Some(h2) -> if h1 = h2 then None else Some(hash1,hash2) - | _ -> Some(hash1,hash2) - let localInvalid = Seq.map2 invalidOnDiskChooser expectedVersionHashes actualVersionsHashes |> Seq.choose id |> Array.ofSeq - if Array.length localInvalid = 0 then - // valid as actual disk version match expected version. No need to check the storage - true - else - // check if the locally "invalid" are "remote valid" (restorable from storage in case of disk data absence) - let eligibleForRemoteCheckChooser pair = - let expected,actual = pair - match expected,actual with - | Some(v),None -> Some(v) - | _,_ -> None - - let eligibleForRemoteCheck = Array.choose eligibleForRemoteCheckChooser localInvalid - if Array.length eligibleForRemoteCheck = Array.length localInvalid then - // we proceed with the remote checks only if all of the locally invalid items are eligible for remote check - let remotePresence = checkStoragePresence eligibleForRemoteCheck |> Async.RunSynchronously - Array.forall id remotePresence + let! expectedV = + async { + match expectedVersionOpt with + | None -> + // this could be file/dir without alph file. + let! v = artefact.ActualVersionAsync + return (v |> ensureScalar).Value + | Some(v) -> return v + } + + let! actualVersionRes = artefact.ActualVersionAsync + let actualVersionOpt = ensureScalar actualVersionRes + + match actualVersionOpt with + | None -> // The artefact does not exist on disk + // This may be OK in case the specified version is available in storages + // todo: note that in case of vectore, only some of the items can exists/restore/etc. + let! presenceCheck = checkStoragePresence [|expectedV|] + if presenceCheck.[0] then + // If some methods needs this artefact as input, the artefact will be restored during the method execution + () else - // otherwise at least one unrestorable item exists. Thus invalid - false + failwithf "Source artefact %A is not on disk and can't be found in any of the storages. Consider adding additional storages to look in. Can't proceed with computation." artefact.Id + | Some(_) -> + // we now consider actual version as expected + source.Output.ExpectActualVersionAsync() |> Async.RunSynchronously + // if alph file exists on disk (e.g. isTracked), we need to re-save it to update the expected version + if artefact.IsTracked then + artefact.SaveAlphFile() + + // Output of the method is an scalar or a vector of full paths to the data of the artefact. + let outputArtefact : Artefact = + artefact.Id + |> PathUtils.enumerateItems experimentRoot + |> MdMap.toTree + |> toJaggedArrayOrValue (fun (index, fullPath) -> { FullPath = fullPath; Index = index }) [] + - let isValid (actualVersion:ArtefactVersion) (expectedVersion:ArtefactVersion) = - let actVersions = MdMap.toSeq actualVersion |> Array.ofSeq - let expVersions = MdMap.toSeq expectedVersion |> Array.ofSeq - let vectorKeysMatch = (actVersions |> Seq.map fst |> Set.ofSeq) = (expVersions |> Seq.map fst |> Set.ofSeq) - if not vectorKeysMatch then - // vector element keys are different. Thus we cant compare versions and the artefact is invalid - false - else - let actVersionsHashes = actVersions |> Seq.map snd |> Array.ofSeq - let expVersionHashes = expVersions |> Seq.map snd |> Array.ofSeq - areValidItemsVersions expVersionHashes actVersionsHashes + return Seq.singleton ([outputArtefact], null) + } |> Async.RunSynchronously +type CommandMethod(command: CommandLineVertex, + experimentRoot, + checkStoragePresence: HashString seq -> Async, + restoreFromStorage: (HashString*string) array -> Async) = // version*filename + inherit AngaraGraphNode(DependencyGraph.Command command) + override s.Execute(inputs, _) = // ignoring checkpoints - // Rules of execution - // The artefact is valid either if actual disk version matches expected version or if the disk version is absend and expected version is restorable from storage - // We can bypass the computation entirely if inputs and outputs are valid + async{ + // Rules of execution + // The artefact is valid either if actual disk version matches expected version or if the disk version is absent and expected version is restorable from storage + // We can bypass the computation entirely if inputs and outputs are valid - // If any input, output is not valid we need to - // 1) restore inputs if they are absent on disk - // 2) execute the command + // If any input, output is not valid we need to + // 1) restore inputs if they are absent on disk + // 2) execute the command - let inputItems = inputs |> List.map (fun inp -> inp :?> ArtefactItem) + let inputItems = inputs |> List.map (fun inp -> inp :?> ArtefactItem) - let index = - inputItems - |> Seq.map(fun item -> item.Index) - |> Seq.fold(fun (max: string list) index -> if index.Length > max.Length then index else max) [] - let methodItemId = command.MethodId |> applyIndex index - let logVerbose str = Logger.logVerbose Logger.Execution (sprintf "%s: %s" methodItemId str) - // logVerbose "Started." - - // Build the output paths by applying the index of this method. - let outputPaths = - command.Outputs // the order is important here - |> List.map(fun out -> out.Artefact.Id |> PathUtils.idToFullPath experimentRoot |> applyIndex index) - - let extractExpectedVersionsFromLinks links = - links |> Seq.map (fun (a:LinkToArtefact) -> MdMap.find index a.ExpectedVersion) - - let extractActualVersionsFromLinks links = - links - |> Seq.map (fun (a:LinkToArtefact) -> a.Artefact.ActualVersionAsync) - |> Async.Parallel |> Async.RunSynchronously - |> Array.map (fun v -> MdMap.find index v) - - let expectedInputItemVersions = extractExpectedVersionsFromLinks command.Inputs |> Array.ofSeq - let actualInputItemVersions = extractActualVersionsFromLinks command.Inputs - - let areInputsValid = areValidItemsVersions expectedInputItemVersions actualInputItemVersions - - let doComputations = - if not areInputsValid then - // we can avoid checking outputs to speed up the work - // is the inputs are invalid - logVerbose "Needs recomputation due to the outdated inputs" - true - else - // checking outputs - let expectedOutputItemVersions = extractExpectedVersionsFromLinks command.Outputs |> Array.ofSeq - let actualOutputItemVersions = extractActualVersionsFromLinks command.Outputs - let areOutputsValid = areValidItemsVersions expectedOutputItemVersions actualOutputItemVersions - if not areOutputsValid then - logVerbose "Needs recomputation due to the outdated outputs" - not areOutputsValid - + let index = + inputItems + |> Seq.map(fun item -> item.Index) + |> Seq.fold(fun (max: string list) index -> if index.Length > max.Length then index else max) [] + let methodItemId = command.MethodId |> applyIndex index + let logVerbose str = Logger.logVerbose Logger.Execution (sprintf "%s: %s" methodItemId str) + // logVerbose "Started" + + // Build the output paths by applying the index of this method. + let outputPaths = + command.Outputs // the order is important here + |> List.map(fun out -> out.Artefact.Id |> PathUtils.idToFullPath experimentRoot |> applyIndex index) + + let! currentVertexStatus = getCommandVertexStatus checkStoragePresence command index - if doComputations then - // We need to do computation - // 1) deleting outputs if they exist - // 2) restoring inputs from storage if it is needed - // 3) execute external command - // 4) upon 0 exit code hash the outputs - // 5) fill in corresponding method vertex (to fix proper versions) - // 6) write alph files for outputs - - // 1) Deleting outputs - if not command.DoNotCleanOutputs then - outputPaths |> List.iter deletePath - - // 2) restoring inputs from storage if it is needed - let inputChooser (input:LinkToArtefact) = - let actualHashOpt = (extractActualVersionsFromLinks [input]).[0] - if Option.isSome actualHashOpt then - None + match currentVertexStatus with + | Outdated _ -> + // We need to do computation + // 1) deleting outputs if they exist + // 2) restoring inputs from storage if it is needed + // 3) execute external command + // 4) upon 0 exit code hash the outputs + // 5) fill in corresponding method vertex (to fix proper versions) + // 6) write alph files for outputs + + // 1) Deleting outputs + if not command.DoNotCleanOutputs then + outputPaths |> List.iter deletePath + + // 2) restoring inputs from storage if it is needed + let inputChooser (input:LinkToArtefact) = + async { + let! actualHashOpt = extractActualVersionsFromLinks index [input] + if Option.isSome actualHashOpt.[0] then + return None + else + return Some(input) + } + let! toRestoreOts = Seq.map inputChooser command.Inputs |> Array.ofSeq |> Async.Parallel + let toRestore = Array.choose id toRestoreOts + let hashesToRestore = toRestore |> extractExpectedVersionsFromLinks index |> Seq.map (fun x -> x.Value) + let pathsToRestore = toRestore |> Seq.map (fun x -> idToFullPath experimentRoot x.Artefact.Id |> applyIndex index ) + let zipped = Seq.zip hashesToRestore pathsToRestore |> Array.ofSeq + if Array.length zipped > 0 then + logVerbose (sprintf "Restoring missing inputs from storage...") + do! restoreFromStorage zipped + logVerbose (sprintf "Inputs are restored") + + + // 3) executing a command + let print (s:string) = Console.WriteLine s + let input idx = command.Inputs.[idx-1].Artefact.Id |> idToFullPath experimentRoot |> applyIndex index + let output idx = command.Outputs.[idx-1].Artefact.Id |> idToFullPath experimentRoot |> applyIndex index + let context : ComputationContext = { ExperimentRoot = experimentRoot; Print = print } + let exitCode = command |> ExecuteCommand.runCommandLineMethodAndWait context (input, output) + + // 4) upon 0 exit code hash the outputs + if exitCode <> 0 then + raise(InvalidOperationException(sprintf "Process exited with exit code %d" exitCode)) else - Some(input) - let toRestore = Seq.choose inputChooser command.Inputs |> Array.ofSeq - let hashesToRestore = toRestore |> extractExpectedVersionsFromLinks |> Seq.map (fun x -> x.Value) - let pathsToRestore = toRestore |> Seq.map (fun x -> idToFullPath experimentRoot x.Artefact.Id |> applyIndex index ) - let zipped = Seq.zip hashesToRestore pathsToRestore |> Array.ofSeq - if Array.length zipped > 0 then - logVerbose (sprintf "Restoring missing inputs from storage...") - restoreFromStorage zipped |> Async.RunSynchronously - logVerbose (sprintf "Inputs are restored") - - - // 3) executing a command - let print (s:string) = Console.WriteLine s - let input idx = command.Inputs.[idx-1].Artefact.Id |> idToFullPath experimentRoot |> applyIndex index - let output idx = command.Outputs.[idx-1].Artefact.Id |> idToFullPath experimentRoot |> applyIndex index - let context : ComputationContext = { ExperimentRoot = experimentRoot; Print = print } - let exitCode = command |> ExecuteCommand.runCommandLineMethodAndWait context (input, output) - - // 4) upon 0 exit code hash the outputs - if exitCode <> 0 then - raise(InvalidOperationException(sprintf "Process exited with exit code %d" exitCode)) - else - logVerbose (sprintf "Program succeeded. Calculating hashes of the outputs...") - async { + logVerbose (sprintf "Program succeeded. Calculating hashes of the outputs...") // 5a) hashing outputs disk content // 5b) updating dependency versions in dependency graph // 6) dumping updated alph files to disk do! command.OnSucceeded(index) - } |> Async.RunSynchronously - logVerbose "alph file saved" - else - logVerbose "skipping as up to date" - //comp.Outputs |> Seq.map (fun (output:DependencyGraph.VersionedArtefact) -> output.ExpectedVersion) |> List.ofSeq + logVerbose "alph file saved" + | UpToDate _ -> + logVerbose "skipping as up to date" + //comp.Outputs |> Seq.map (fun (output:DependencyGraph.VersionedArtefact) -> output.ExpectedVersion) |> List.ofSeq - Seq.singleton(outputPaths |> List.map(fun outputPath -> upcast { FullPath = outputPath; Index = index }), null) + let outPathToArtefactItem outputPath : Artefact = + upcast { FullPath = outputPath; Index = index } + let result = Seq.singleton(outputPaths |> List.map outPathToArtefactItem, null) + return result + } |> Async.RunSynchronously let buildGraph experimentRoot (g:DependencyGraph.Graph) checkStoragePresence restoreFromStorage = - let factory method : ComputationGraphNode = - match method with - | DependencyGraph.Source src -> upcast SourceMethod(src, experimentRoot, checkStoragePresence, restoreFromStorage) + let factory method : AngaraGraphNode = + match method with + | DependencyGraph.Source src -> upcast SourceMethod(src, experimentRoot, checkStoragePresence) | DependencyGraph.Command cmd -> upcast CommandMethod(cmd, experimentRoot, checkStoragePresence, restoreFromStorage) g |> DependencyGraphToAngaraWrapper |> AngaraTranslator.translate factory -let doComputations (g:FlowGraph) = +let doComputations (g:FlowGraph) = let state = { TimeIndex = 0UL @@ -337,8 +187,9 @@ let doComputations (g:FlowGraph) = Vertices = Map.empty } try - use engine = new Engine(state,Scheduler.ThreadPool()) + use engine = new Engine(state,Scheduler.ThreadPool()) engine.Start() + // engine.Changes.Subscribe(fun x -> x.State.Vertices) let final = Control.pickFinal engine.Changes let finalState = final.GetResult() diff --git a/AlpheusCore/DependencyGraph.fs b/AlpheusCore/DependencyGraph.fs index a3dee42..430f973 100644 --- a/AlpheusCore/DependencyGraph.fs +++ b/AlpheusCore/DependencyGraph.fs @@ -10,13 +10,88 @@ open ItisLab.Alpheus.PathUtils open Angara.Data open FSharp.Control -let ts = TraceSource("Dependency Graph") - // Disable warning on requiring to override GetHashCode in case of Equals overriding // As I want vertices to be alphnumerically sorted, but compared by reference #nowarn "0346" +type ArtefactLocation = + /// Artefact can be restored from storage + | Remote + /// Artefact is currently on disk + | Local + +type OutdatedReason = + | InputsOutdated + | OutputsOutdated + +/// computation status of method instance (e.g. scalar or vector element) +type MethodInstanceStatus = + /// Expected outputs version exist + | UpToDate of outputs: ArtefactLocation list + | Outdated of OutdatedReason + +type LinkToArtefactStatus = + /// there is no artefact locally and no expected version on remotes + | NotFound + /// there is an expected version of artefact on the local disk, and no information about remotes + | Local + /// there is a version of artefact on the local disk, but it has unexpected version, and no information about remotes + | LocalUnexpected + /// there is no any version of artefact on the local disk, but there is a remote artefact of the expected version + | Remote + +type ExpectedArtefactSearchResult = + | SomeAreLocalUnexpected + | SomeAreNotFound + /// All group has expected versions + | AllExist of location:ArtefactLocation list + +/// Checks location of the expected version of the given collection of artefacts and either returns locations of all given artefacts, if they are found; or, otherwise, returns issues. +let findExpectedArtefacts checkStoragePresence expectedVersionHashes actualVersionsHashes = + async { + let expectedVersionHashesArr = Array.ofSeq expectedVersionHashes + let N = Array.length expectedVersionHashesArr + if Seq.exists Option.isNone expectedVersionHashesArr then + // some of the artefact element was not ever produced, this is invalid + return SomeAreLocalUnexpected + else + /// Chooses the pairs that are not valid on disk (filtering out versions match) + let invalidOnDiskChooser hash1 hash2 = + match hash1,hash2 with + | Some(h1),Some(h2) -> if h1 = h2 then None else Some(hash1,hash2) + | _ -> Some(hash1,hash2) + let localInvalid = Seq.map2 invalidOnDiskChooser expectedVersionHashesArr actualVersionsHashes |> Seq.choose id |> Array.ofSeq + if Array.length localInvalid = 0 then + // valid as actual disk version match expected version. No need to check the storage + return AllExist (List.init N (fun _ -> ArtefactLocation.Local)) + else + // check if the locally "invalid" are "remote valid" (restorable from storage in case of disk data absence) + let eligibleForRemoteCheckChooser idx pair = + let expected,actual = pair + match expected,actual with + | Some(v),None -> Some(idx,v) + | _,_ -> None + + let eligibleForRemoteCheck = Seq.mapi eligibleForRemoteCheckChooser localInvalid |> Seq.choose id |> Array.ofSeq + if Array.length eligibleForRemoteCheck = Array.length localInvalid then + // we proceed with the remote checks only if all of the locally invalid items are eligible for remote check + let eligibleForRemoteCheckVersions = Array.map snd eligibleForRemoteCheck + let! remotePresence = checkStoragePresence (Seq.ofArray eligibleForRemoteCheckVersions) + if Array.forall id remotePresence then + let resultArray = Array.create N ArtefactLocation.Local + // as all of the remote check eligible elements are present remotely + eligibleForRemoteCheck + |> Seq.map fst + |> Seq.iter (fun idx -> (resultArray.[idx] <- ArtefactLocation.Remote)) + return AllExist (List.ofArray resultArray) + else + return SomeAreNotFound + else + // otherwise at least one unrestorable item exists on disk. Thus invalid + return SomeAreLocalUnexpected + } + type ArtefactVertex(id:ArtefactId, experimentRoot:string) = // expereiment root is needed to calc actual data versions (via path to the actual data) let mutable producer : MethodVertex option = None @@ -142,17 +217,19 @@ type ArtefactVertex(id:ArtefactId, experimentRoot:string) = override s.ToString() = let version = - let hash = s.ActualVersionAsync |> Async.RunSynchronously - sprintf "%A" (hash |> MdMap.map(Option.map(fun s -> s.Substring(0,6)))) + match actualVersion with + | Some (v) -> + sprintf "%A" (v |> MdMap.map(Option.map(fun s -> s.Substring(0,6)))) + | None -> "disk version not checked" sprintf "Artefact(%s|%s)" (s.Id.ToString()) version /// Represents a link to a specific version of an artefact. and LinkToArtefact(artefact: ArtefactVertex, expectedVersion: ArtefactVersion) = let mutable expected = expectedVersion let lockObj = obj() - + /// Creates a link to the artefact which expects the given actual version. - new(artefact) = LinkToArtefact(artefact, artefact.ActualVersionAsync |> Async.RunSynchronously) + new(artefact) = LinkToArtefact(artefact, MdMap.scalar None) member s.Artefact : ArtefactVertex = artefact @@ -160,13 +237,27 @@ and LinkToArtefact(artefact: ArtefactVertex, expectedVersion: ArtefactVersion) = /// If the actual version differs from the expected, it should be handled specifically. member s.ExpectedVersion : ArtefactVersion = expected + member s.AnalyzeStatus checkStoragePresence index = + async { + let expectedVersion = MdMap.find index expected + let! actualVersion = artefact.ActualVersionAsync + let artefactItemActualVersion = MdMap.find index actualVersion + let! status = findExpectedArtefacts checkStoragePresence (Seq.singleton expectedVersion) [|artefactItemActualVersion|] + match status with + | SomeAreLocalUnexpected -> return LocalUnexpected + | SomeAreNotFound -> return NotFound + | AllExist items -> + match List.head items with + | ArtefactLocation.Local -> return LinkToArtefactStatus.Local + | ArtefactLocation.Remote -> return LinkToArtefactStatus.Remote + } + /// Makes the link to expect the actual version. /// The actual version MUST be available. member s.ExpectActualVersionAsync() = async { let! actual = artefact.ActualVersionAsync - lock lockObj (fun() -> - expected <- actual) + lock lockObj (fun() -> expected <- actual) } /// Makes the link to expect the actual version. @@ -223,9 +314,11 @@ and SourceVertex(methodId: MethodId, output: LinkToArtefact, experimentRoot: str /// Represents a method defined as a command line. and CommandLineVertex(methodId : MethodId, experimentRoot: string, inputs: LinkToArtefact list, outputs: LinkToArtefact list, command: string) = + let exitCodeLockObj = obj() + let outputStatusesLockObj = obj() let mutable workingDirectory: ExperimentRelativePath = String.Empty let mutable doNotClean = false - let mutable commandExitCode: int option = None + let mutable commandExitCode: MdMap = MdMap.empty member s.MethodId = methodId member s.ExperimentRoot = experimentRoot @@ -242,9 +335,12 @@ and CommandLineVertex(methodId : MethodId, experimentRoot: string, inputs: LinkT member s.Command = command /// None if the command execution was not launched or not finished yet, other wise holds the execution exit code - member s.ExitCode - with get() = commandExitCode - and set v = commandExitCode <- v + member s.GetExitCode index = + MdMap.find index commandExitCode + + member s.SetExitCode index exitCode= + lock exitCodeLockObj (fun () -> + commandExitCode <- MdMap.add index (Some exitCode) commandExitCode) /// From where the Command must be executed. /// Experiment root related @@ -281,7 +377,7 @@ and CommandLineVertex(methodId : MethodId, experimentRoot: string, inputs: LinkT else do! input.ExpectActualVersionAsync index input.Artefact.SaveAlphFile() - }) + }) Seq.append outLinksUpdates inputLinksUpdates |> Async.Parallel |> Async.Ignore @@ -410,6 +506,13 @@ and Graph (experimentRoot:string) = | None -> Error (sprintf "ArtefactID %A is not found in the dependency graph" artefactId |> SystemError) + member s.GetMethod methodId = + match methodVertices |> Map.tryFind methodId with + | Some(vertex) -> + Ok vertex + | None -> + Error (sprintf "MethodID %A is not found in the dependency graph" methodId |> SystemError) + /// Adds a method vertex for the given artefact. member private s.AddOrGetSource (output: LinkToArtefact) : SourceVertex = let methodId = getMethodId (Seq.singleton output.Artefact.Id) diff --git a/AlpheusCore/StatusGraph.fs b/AlpheusCore/StatusGraph.fs index b7769f6..f85a917 100644 --- a/AlpheusCore/StatusGraph.fs +++ b/AlpheusCore/StatusGraph.fs @@ -7,104 +7,117 @@ open Angara.States open AlphFiles open System open ItisLab.Alpheus +open System.IO open ItisLab.Alpheus.PathUtils +open ItisLab.Alpheus.AngaraGraphCommon +open DependencyGraph +open Angara.Data + +type ArtefactItem = + { ArtefactId: ArtefactId + Index: string list + Status: MethodInstanceStatus + } + + +type SourceMethod(source: SourceVertex, experimentRoot, checkStoragePresense) = + inherit AngaraGraphNode(DependencyGraph.Source source) + + override s.Execute(_, _) = // ignoring checkpoints + async{ + let expectedArtefact = source.Output + let artefact = expectedArtefact.Artefact + + let getItemStatus (link:LinkToArtefact) index = + link.AnalyzeStatus checkStoragePresense index + + // Output of the method is an scalar or a vector of full paths to the data of the artefact. + let indices = + artefact.Id + |> PathUtils.enumerateItems experimentRoot + |> MdMap.toSeq |> Seq.map fst |> Array.ofSeq + + let! itemStatuses = + indices |> Array.map (getItemStatus expectedArtefact) |> Async.Parallel + + let linkStatusToCommandVertextStatus status = + match status with + | LocalUnexpected -> + // Source methods always updates their output. + // Always valid if there is any disk version of the artefact + UpToDate [ArtefactLocation.Local] + | NotFound -> + // that's bad! as there is not local nor remote version + // we can't get the artefact data by any means + // this is exception + failwith "The artefact data is not found neither on disk nor in any of the available storages" + | Local -> UpToDate [ArtefactLocation.Local] + | Remote -> UpToDate [ArtefactLocation.Remote] + let result = + Array.map2 (fun index status -> {ArtefactId = artefact.Id; Index = index; Status = linkStatusToCommandVertextStatus status}) indices itemStatuses + |> Seq.map (fun x -> x :> Artefact) + |> List.ofSeq + + return seq{ yield (result, null) } + } |> Async.RunSynchronously + -type ArtefactStatus = { - Id: ArtefactId - IsOnDisk: bool - IsUpToDate: bool - IsTracked: bool - ProducedVersionStorages: string list - CurrentDiskVersionStorages: string list -} - -[] -type StatusGraphNode(depCount,outCount) = - inherit ExecutableMethod(System.Guid.NewGuid(), [ for i in 0..(depCount-1) -> typeof] , [for i in 0..(outCount-1) -> typeof]) - - member s.OutputCount = - outCount - -type SourceGraphNode(orphanArtefact:DependencyGraph.LinkToArtefact, experimentRoot: string) = - inherit StatusGraphNode(0,1) - - override s.Execute(_, _) = //ignoring inputs and checkpoint. - // Just utilizing parallel method computation feature of AngaraFlow to check the statuses of all method vertex - - let isOnDisk = orphanArtefact.Artefact.ActualVersion.IsSome - - // source method is always up to date, thus succeeds - let result = { - Id = orphanArtefact.Artefact.Id; - IsUpToDate = (not isOnDisk) || (orphanArtefact.Artefact.ActualHash.Value = orphanArtefact.ExpectedVersion.Value); - IsOnDisk = isOnDisk; - IsTracked = orphanArtefact.Artefact.IsTracked - ProducedVersionStorages = orphanArtefact.StoragesContainingVersion - CurrentDiskVersionStorages = orphanArtefact.Artefact.StoragesContainingActualHash - } - - seq{ yield [result :> Artefact], null } - -type NotSourceGraphNode(methodVertex:DependencyGraph.CommandLineVertex) = - inherit StatusGraphNode(methodVertex.Inputs.Length, methodVertex.Outputs.Length) - - member s.FirstOutputID = - methodVertex.MethodId +type CommandMethod(command: CommandLineVertex, + experimentRoot, + checkStoragePresence: HashString seq -> Async) = + inherit AngaraGraphNode(DependencyGraph.Command command) override s.Execute(inputs, _) = //ignoring checkpoint. - let inputs = inputs |> List.map (fun x -> x:?> ArtefactStatus) - // Just utilizing parallel method computation feature of AngaraFlow to check the statuses of all method vertex - - // There can be 3 reasons why the node can be outdated (execution halts) - // a) inputs are outdated - // b) one of the inputs hash does not match - // c) one of the output hashes does nor match - - let outputs = methodVertex.Outputs - let outputToStatus idx isUpToDate = - let output = outputs.[idx] - { - Id = output.Artefact.Id - IsUpToDate = isUpToDate - IsOnDisk = - match output.Artefact.ActualHash with - | None -> false - | Some(_) -> true - IsTracked = output.Artefact.IsTracked - ProducedVersionStorages = output.StoragesContainingVersion - CurrentDiskVersionStorages = output.Artefact.StoragesContainingActualHash - } - - let outdatedResult = seq { yield (List.init methodVertex.Outputs.Count (fun i -> outputToStatus i false:> Artefact) , null) } - - let isVersionMismatch expected actual = - match expected,actual with - | _,None -> false // if actual file is missing. That's OK. There is no version mismatch - | Some(expected),Some(actual) -> expected <> actual - | _ -> raise(NotImplementedException("need to define behavior")) - - // checking a) - if List.exists (fun i -> not i.IsUpToDate) inputs then - outdatedResult - else - if - // Checking b) - (Seq.exists (fun (input:DependencyGraph.VersionedArtefact) -> isVersionMismatch input.ExpectedVersion input.Artefact.ActualHash) methodVertex.Inputs) || - // Checking c) - (Seq.exists (fun (output:DependencyGraph.VersionedArtefact) -> isVersionMismatch output.ExpectedVersion output.Artefact.ActualHash) methodVertex.Outputs) then - outdatedResult - else - let results = List.init methodVertex.Outputs.Count (fun i -> outputToStatus i true:> Artefact) - seq { yield (results, null) } - -let buildStatusGraph (g:DependencyGraph.Graph) = - let factory method : StatusGraphNode = + async{ + // Rules of execution + // The artefact is valid either if actual disk version matches expected version or if the disk version is absend and expected version is restorable from storage + // We can bypass the computation entirely if inputs and outputs are valid + + let inputItems = inputs |> List.map (fun inp -> inp :?> ArtefactItem) + let index = + inputItems + |> Seq.map(fun item -> item.Index) + |> Seq.fold(fun (max: string list) index -> if index.Length > max.Length then index else max) [] + + let isUpToDate status = + match status with + | UpToDate _ -> true + | Outdated _ -> false + + let areInputsValid = inputItems |> Seq.map (fun x -> x.Status) |> Seq.forall isUpToDate + + let! currentVertexStatus = + async { + if not areInputsValid then + // shortcut: just propagating outdated status down the graph + return Outdated InputsOutdated + else + // actually checking current vertex + return! getCommandVertexStatus checkStoragePresence command index + } + + let outputIds = + command.Outputs // the order is important here + |> List.map(fun out -> out.Artefact.Id) + + let prepareAngaraArtefact artId : Artefact = + upcast {Index = index; Status = currentVertexStatus; ArtefactId= artId } + let result = Seq.singleton(List.map prepareAngaraArtefact outputIds, null) + return result + } |> Async.RunSynchronously + +let buildStatusGraph (g:DependencyGraph.Graph) experimetRoot checkStoragePresence = + let factory method : AngaraGraphNode = match method with - | DependencyGraph.Source(source) -> upcast SourceGraphNode(source.Output) - | DependencyGraph.Command(computed) -> upcast NotSourceGraphNode(computed) + | DependencyGraph.Source(source) -> upcast SourceMethod(source, experimetRoot, checkStoragePresence) + | DependencyGraph.Command(computed) -> upcast CommandMethod(computed, experimetRoot, checkStoragePresence) g |> DependencyGraphToAngaraWrapper |> AngaraTranslator.translate factory + +type ArtefactStatus = +| UpToDate of ArtefactLocation +| NeedsRecomputation of OutdatedReason -let printStatuses (g:FlowGraph) = +let getStatuses (g:FlowGraph) = let state = { TimeIndex = 0UL @@ -112,47 +125,45 @@ let printStatuses (g:FlowGraph) = Vertices = Map.empty } try - use engine = new Engine(state,Scheduler.ThreadPool()) + use engine = new Engine(state,Scheduler.ThreadPool()) engine.Start() // engine.Changes.Subscribe(fun x -> x.State.Vertices) let final = Control.pickFinal engine.Changes let finalState = final.GetResult() + + let vertices = finalState.Vertices + + let vertexStateToStatus state = + let toArtefactItemStatus (x:MethodOutput) = + let outputsCount = (x :> IVertexData).Shape.Length + let methodInstanceStatusToOutputStatus (status:MethodInstanceStatus) outputIdx = + match status with + | MethodInstanceStatus.UpToDate outputs -> + ArtefactStatus.UpToDate(List.item outputIdx outputs) + | Outdated reason -> + NeedsRecomputation reason + let outputNumToRes idx : (ArtefactId * string list* ArtefactStatus) = + let artItem: ArtefactItem = downcast x.TryGet(idx).Value + artItem.ArtefactId, artItem.Index, (methodInstanceStatusToOutputStatus artItem.Status idx) + Seq.init outputsCount outputNumToRes + let itemsStatus = state |> MdMap.toSeq |> Seq.collect (fun x -> let _,v = x in v.Data.Value |> toArtefactItemStatus) + itemsStatus + + let verticesPairs = vertices |> Map.toSeq |> Seq.collect (fun x -> let _,output = x in (vertexStateToStatus output)) + + // assembling MdMap back + let folder (state:Map>) (elem: ArtefactId * string list* ArtefactStatus) = + let artId, index, status = elem + let artMap = + match Map.tryFind artId state with + | Some m -> m + | None -> MdMap.empty + let updatedArtMap = MdMap.add index status artMap + Map.add artId updatedArtMap state + let result = Seq.fold folder Map.empty verticesPairs - let getVertexOutdatedOutputs (vertex:StatusGraphNode) = - let N = vertex.OutputCount - Seq.init N (fun idx -> Control.outputScalar(vertex,idx) finalState ) |> Seq.choose (fun (s:ArtefactStatus) -> if s.IsUpToDate then None else Some(s.Id)) - - let getVertexOutputStatusStrings (vertex:StatusGraphNode) = - let N = vertex.OutputCount - let statusToStr s = - let diskStatus = if s.IsOnDisk then "on disk" else "absent" - let uptToDateStatus = if s.IsUpToDate then "up to date" else "needs (re)computation" - let expectedVerStorages = - if List.length s.ProducedVersionStorages > 0 then - sprintf "restorable from %s" (String.Join(",",s.ProducedVersionStorages)) - else - String.Empty - let actualVerStorages = - if List.length s.CurrentDiskVersionStorages > 0 then - sprintf "Saved in %s" (String.Join(",",s.ProducedVersionStorages)) - else - "Unsaved" - let storagesStatus = - if s.IsTracked then - if s.IsOnDisk then - actualVerStorages - else - expectedVerStorages - else String.Empty - s.Id, (sprintf "%s\t%s\t%s" diskStatus uptToDateStatus storagesStatus) - Seq.init N (fun idx -> Control.outputScalar(vertex,idx) finalState ) |> Seq.map statusToStr - - let outdatedArtefacts = finalState.Graph.Structure.Vertices |> Set.toSeq |> Seq.collect getVertexOutdatedOutputs - let allArtefactStatuses = finalState.Graph.Structure.Vertices |> Set.toSeq |> Seq.collect getVertexOutputStatusStrings |> Seq.sortBy fst |> Seq.map (fun x -> let id,status = x in sprintf "%10A:\t%s" id status) - let statuses = String.Join("\n\t", allArtefactStatuses) - printfn "Statuses:\n\t%s" statuses - Ok() + Ok(result) with | :? Control.FlowFailedException as flowExc -> let failed = String.Join("\n\t", flowExc.InnerExceptions |> Seq.map(fun e -> e.ToString())) - Error(sprintf "Failed to compute the artefacts: \n\t%s" failed) \ No newline at end of file + Error(SystemError(sprintf "Failed to compute the artefacts: \n\t%s" failed)) \ No newline at end of file diff --git a/AlpheusUnitTests/ApiTests.fs b/AlpheusUnitTests/ApiTests.fs index e4b9b8c..3e965c7 100644 --- a/AlpheusUnitTests/ApiTests.fs +++ b/AlpheusUnitTests/ApiTests.fs @@ -11,6 +11,7 @@ open System.IO open ItisLab.Alpheus.DependencyGraph open System open ItisLab.Alpheus +open Angara.Data /// can be used as calssData for XUnit theory. returns all of the artefactIDs for the sample experiment type ArtefactIdSource() = @@ -239,6 +240,11 @@ type DepGraphSaveRestore(output) = } |> toAsyncFact +let equalStatuses expected actual = + let s1 = Map.toSeq expected + let s2 = Map.toSeq actual + Seq.forall2 (fun x y -> let idx1,v1 = x in let idx2,v2 = y in (idx1=idx2) && MdMap.equal (fun _ elem1 elem2 -> elem1=elem2) v1 v2) s1 s2 + type ScalarScenarios(output) = inherit SingleUseOneTimeDirectory(output) @@ -297,6 +303,101 @@ type ScalarScenarios(output) = Environment.CurrentDirectory <- savedWD } |> toAsyncFact + [] + member s.``Status: Uncomputed chain``() = + async { + let savedWD = Environment.CurrentDirectory + try + let path = Path.GetFullPath s.Path + Environment.CurrentDirectory <- s.Path + do! buildExperiment(path) + + let res = API.status(path, ArtefactId.Path "1_2_3.txt") + match res with + | Ok r -> + let expectedStatuses:Map> = + [ + ArtefactId.Path "1.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "2.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "3.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "1_2.txt",MdMap.scalar (StatusGraph.ArtefactStatus.NeedsRecomputation OutdatedReason.InputsOutdated); + ArtefactId.Path "1_2_3.txt",MdMap.scalar (StatusGraph.ArtefactStatus.NeedsRecomputation OutdatedReason.InputsOutdated); + ] |> Map.ofList + Assert.True(equalStatuses expectedStatuses r) + | Error e-> + Assert.True(false, sprintf "Error: %A" e) + + finally + Environment.CurrentDirectory <- savedWD + } + + [] + member s.``Status: Computed chain``() = + async { + let savedWD = Environment.CurrentDirectory + try + let path = Path.GetFullPath s.Path + Environment.CurrentDirectory <- s.Path + do! buildExperiment(path) + output.WriteLine("TEST: experiment graph is constructed") + + assertResultOk <| API.compute(path, ArtefactId.Path "1_2_3.txt") + + let res = API.status(path, ArtefactId.Path "1_2_3.txt") + match res with + | Ok r -> + let expectedStatuses:Map> = + [ + ArtefactId.Path "1.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "2.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "3.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "1_2.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "1_2_3.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ] |> Map.ofList + Assert.True(equalStatuses expectedStatuses r) + | Error e-> + Assert.True(false, sprintf "Error: %A" e) + + finally + Environment.CurrentDirectory <- savedWD + } + + [] + member s.``Status: changed input content of computed graph``() = + async { + let savedWD = Environment.CurrentDirectory + try + let path = Path.GetFullPath s.Path + Environment.CurrentDirectory <- s.Path + + do! buildExperiment(path) + output.WriteLine("TEST: experiment graph is constructed") + + assertResultOk <| API.compute(path, ArtefactId.Path "1_2_3.txt") + + output.WriteLine("TEST: Artefacts are computed") + + // now changing 3.txt + do! File.WriteAllTextAsync(Path.Combine(path,"3.txt"),"File 3 changed\\r\\n") |> Async.AwaitTask + + let res = API.status(path, ArtefactId.Path "1_2_3.txt") + match res with + | Ok r -> + let expectedStatuses:Map> = + [ + ArtefactId.Path "1.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "2.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "3.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); // anyway up to date + ArtefactId.Path "1_2.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "1_2_3.txt",MdMap.scalar (StatusGraph.ArtefactStatus.NeedsRecomputation OutdatedReason.InputsOutdated); // but this is invalid now + ] |> Map.ofList + Assert.True(equalStatuses expectedStatuses r) + | Error e-> + Assert.True(false, sprintf "Error: %A" e) + finally + Environment.CurrentDirectory <- savedWD + } |> toAsyncFact + [] member s.``Changed input content invalidates vertex``() = async { @@ -361,7 +462,43 @@ type ScalarScenarios(output) = Assert.Equal(content1,content2) finally Environment.CurrentDirectory <- savedWD - } |> toAsyncFact + } + + [] + member s.``Status: Changed output content invalidates vertex``() = + async { + let savedWD = Environment.CurrentDirectory + try + let path = Path.GetFullPath s.Path + Environment.CurrentDirectory <- s.Path + + do! buildExperiment(path) + output.WriteLine("TEST: experiment graph is constructed") + + assertResultOk <| API.compute(path, ArtefactId.Path "1_2_3.txt") + + output.WriteLine("TEST: first time computed 1_2_3.txt") + + // now changing 1_2_3.txt + do! File.WriteAllTextAsync(Path.Combine(path,"1_2_3.txt"),"manually changed\\r\\n") |> Async.AwaitTask + + let res = API.status(path, ArtefactId.Path "1_2_3.txt") + match res with + | Ok r -> + let expectedStatuses:Map> = + [ + ArtefactId.Path "1.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "2.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "3.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "1_2.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "1_2_3.txt",MdMap.scalar (StatusGraph.ArtefactStatus.NeedsRecomputation OutdatedReason.OutputsOutdated); // but this is invalid now + ] |> Map.ofList + Assert.True(equalStatuses expectedStatuses r) + | Error e-> + Assert.True(false, sprintf "Error: %A" e) + finally + Environment.CurrentDirectory <- savedWD + } [] @@ -398,6 +535,44 @@ type ScalarScenarios(output) = Environment.CurrentDirectory <- savedWD } |> toAsyncFact + [] + member s.``Status: changed command invalidates vertex``() = + async { + let savedWD = Environment.CurrentDirectory + try + let path = Path.GetFullPath s.Path + Environment.CurrentDirectory <- s.Path + + do! buildExperiment(path) + output.WriteLine("TEST: experiment graph is constructed") + + assertResultOk <| API.compute(path, ArtefactId.Path "1_2_3.txt") + + output.WriteLine("TEST: first time computed 1_2_3.txt") + + + // now changing 1_2_3.txt producing command + let! res2 = API.buildAsync path ["1_2.txt";"3.txt"] ["1_2_3.txt"] concatCommand2 false // double + assertResultOk res2 + + let res = API.status(path, ArtefactId.Path "1_2_3.txt") + match res with + | Ok r -> + let expectedStatuses:Map> = + [ + ArtefactId.Path "1.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "2.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "3.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "1_2.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "1_2_3.txt",MdMap.scalar (StatusGraph.ArtefactStatus.NeedsRecomputation OutdatedReason.InputsOutdated); // Inputs, not outputs, because rebuilding command resets expected input versions + ] |> Map.ofList + Assert.True(equalStatuses expectedStatuses r) + | Error e-> + Assert.True(false, sprintf "Error: %A" e) + finally + Environment.CurrentDirectory <- savedWD + } + [] member s.``Changed inputs order invalidates vertex``() = async { @@ -430,7 +605,48 @@ type ScalarScenarios(output) = Assert.NotEqual(content1,content2) finally Environment.CurrentDirectory <- savedWD - } |> toAsyncFact + } + + [] + member s.``Status: changed inputs order invalidates vertex``() = + async { + let savedWD = Environment.CurrentDirectory + try + let path = Path.GetFullPath s.Path + Environment.CurrentDirectory <- s.Path + + do! buildExperiment(path) + output.WriteLine("TEST: experiment graph is constructed") + + assertResultOk <| API.compute(path, ArtefactId.Path "1_2_3.txt") + + output.WriteLine("TEST: first time computed 1_2_3.txt") + + do! assertNonEmptyFile(Path.Combine(path,"1_2.txt")) + let! content1 = File.ReadAllTextAsync(Path.Combine(path,"1_2_3.txt")) |> Async.AwaitTask + + // now changing inputs order + let! res2 = API.buildAsync path ["3.txt";"1_2.txt"] ["1_2_3.txt"] concatCommand false // double + assertResultOk res2 + + let res = API.status(path, ArtefactId.Path "1_2_3.txt") + match res with + | Ok r -> + let expectedStatuses:Map> = + [ + ArtefactId.Path "1.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "2.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "3.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "1_2.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "1_2_3.txt",MdMap.scalar (StatusGraph.ArtefactStatus.NeedsRecomputation OutdatedReason.InputsOutdated); // Inputs, not outputs, because rebuilding command resets expected input versions + ] |> Map.ofList + Assert.True(equalStatuses expectedStatuses r) + | Error e-> + Assert.True(false, sprintf "Error: %A" e) + + finally + Environment.CurrentDirectory <- savedWD + } [] member s.``recompute missing intermediate while computing final``() = @@ -453,7 +669,40 @@ type ScalarScenarios(output) = finally Environment.CurrentDirectory <- savedWD - } |> toAsyncFact + } + + [] + member s.``Status: missing intermediate``() = + async { + let savedWD = Environment.CurrentDirectory + try + let path = Path.GetFullPath s.Path + Environment.CurrentDirectory <- s.Path + do! buildExperiment(path) + + let res = API.compute(path, ArtefactId.Path "1_2_3.txt") // first compute all + assertResultOk res + + File.Delete(Path.Combine(path,"1_2.txt")) // then delete intermediate + + let res = API.status(path, ArtefactId.Path "1_2_3.txt") + match res with + | Ok r -> + let expectedStatuses:Map> = + [ + ArtefactId.Path "1.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "2.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "3.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "1_2.txt",MdMap.scalar (StatusGraph.ArtefactStatus.NeedsRecomputation OutdatedReason.OutputsOutdated); + ArtefactId.Path "1_2_3.txt",MdMap.scalar (StatusGraph.ArtefactStatus.NeedsRecomputation OutdatedReason.InputsOutdated); + ] |> Map.ofList + Assert.True(equalStatuses expectedStatuses r) + | Error e-> + Assert.True(false, sprintf "Error: %A" e) + + finally + Environment.CurrentDirectory <- savedWD + } [] member s.``skip missing (storage present) intermediate while computing final``() = @@ -480,7 +729,43 @@ type ScalarScenarios(output) = finally Environment.CurrentDirectory <- savedWD - } |> toAsyncFact + } + + [] + member s.``Status: intermediate is remotely available``() = + async { + let savedWD = Environment.CurrentDirectory + try + let path = Path.GetFullPath s.Path + Environment.CurrentDirectory <- s.Path + do! buildExperiment(path) + + assertResultOk <| API.compute(path, ArtefactId.Path "1_2_3.txt") // first compute all + + let! res = API.saveAsync(path, ArtefactId.Path "1_2.txt") "local" false // saving intermediate + assertResultOk res + + File.Delete(Path.Combine(path,"1_2.txt")) // then delete intermediate + + let res = API.status(path, ArtefactId.Path "1_2_3.txt") + match res with + | Ok r -> + let expectedStatuses:Map> = + [ + ArtefactId.Path "1.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "2.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "3.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "1_2.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate ArtefactLocation.Remote); + ArtefactId.Path "1_2_3.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ] |> Map.ofList + Assert.True(equalStatuses expectedStatuses r) + | Error e-> + Assert.True(false, sprintf "Error: %A" e) + + finally + Environment.CurrentDirectory <- savedWD + } + [] member s.``restore only intermediate while computing final``() = @@ -500,6 +785,8 @@ type ScalarScenarios(output) = assertResultOk res let! res = API.saveAsync(path, ArtefactId.Path "2.txt") "local" false // saving initial assertResultOk res + let! res = API.saveAsync(path, ArtefactId.Path "3.txt") "local" false // saving initial + assertResultOk res //Deleting all @@ -508,15 +795,114 @@ type ScalarScenarios(output) = Path.Combine(path,"1_2.txt"); Path.Combine(path,"1.txt"); Path.Combine(path,"2.txt"); + Path.Combine(path,"3.txt"); ] |> List.iter File.Delete assertResultOk <| API.compute(path, ArtefactId.Path "1_2_3.txt") // computing all do! assertNonEmptyFile(Path.Combine(path,"1_2_3.txt")) // final is recomputed do! assertNonEmptyFile(Path.Combine(path,"1_2.txt")) // intermediate is restored + do! assertNonEmptyFile(Path.Combine(path,"3.txt")) // intermediate is restored Assert.False(File.Exists(Path.Combine(path,"1.txt"))) // but initials are not restored, a intermediate is sufficient Assert.False(File.Exists(Path.Combine(path,"2.txt"))) // but initials are not restored, a intermediate is sufficient finally Environment.CurrentDirectory <- savedWD - } |> toAsyncFact + } + + [] + member s.``Status: all but final exist remotely``() = + async { + let savedWD = Environment.CurrentDirectory + try + let path = Path.GetFullPath s.Path + Environment.CurrentDirectory <- s.Path + do! buildExperiment(path) + + assertResultOk <| API.compute(path, ArtefactId.Path "1_2_3.txt") // computing all + + // saving all except final + let! res = API.saveAsync(path, ArtefactId.Path "1_2.txt") "local" false // saving intermediate + assertResultOk res + let! res = API.saveAsync(path, ArtefactId.Path "1.txt") "local" false // saving initial + assertResultOk res + let! res = API.saveAsync(path, ArtefactId.Path "2.txt") "local" false // saving initial + assertResultOk res + + + //Deleting all + [ + Path.Combine(path,"1_2_3.txt"); + Path.Combine(path,"1_2.txt"); + Path.Combine(path,"1.txt"); + Path.Combine(path,"2.txt"); + ] |> List.iter File.Delete + + let res = API.status(path, ArtefactId.Path "1_2_3.txt") + match res with + | Ok r -> + let expectedStatuses:Map> = + [ + ArtefactId.Path "1.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate ArtefactLocation.Remote); + ArtefactId.Path "2.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate ArtefactLocation.Remote); + ArtefactId.Path "3.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local); + ArtefactId.Path "1_2.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate ArtefactLocation.Remote); + ArtefactId.Path "1_2_3.txt",MdMap.scalar (StatusGraph.ArtefactStatus.NeedsRecomputation OutdatedReason.OutputsOutdated); + ] |> Map.ofList + Assert.True(equalStatuses expectedStatuses r) + | Error e-> + Assert.True(false, sprintf "Error: %A" e) + finally + Environment.CurrentDirectory <- savedWD + } + + [] + member s.``Status: all exist remotely``() = + async { + let savedWD = Environment.CurrentDirectory + try + let path = Path.GetFullPath s.Path + Environment.CurrentDirectory <- s.Path + do! buildExperiment(path) + + assertResultOk <| API.compute(path, ArtefactId.Path "1_2_3.txt") // computing all + + // saving all except final + let! res = API.saveAsync(path, ArtefactId.Path "1_2.txt") "local" false // saving intermediate + assertResultOk res + let! res = API.saveAsync(path, ArtefactId.Path "1.txt") "local" false // saving initial + assertResultOk res + let! res = API.saveAsync(path, ArtefactId.Path "2.txt") "local" false // saving initial + assertResultOk res + let! res = API.saveAsync(path, ArtefactId.Path "3.txt") "local" false // saving initial + assertResultOk res + let! res = API.saveAsync(path, ArtefactId.Path "1_2_3.txt") "local" false + assertResultOk res + + + //Deleting all + [ + Path.Combine(path,"1_2_3.txt"); + Path.Combine(path,"1_2.txt"); + Path.Combine(path,"1.txt"); + Path.Combine(path,"2.txt"); + Path.Combine(path,"3.txt"); + ] |> List.iter File.Delete + + let res = API.status(path, ArtefactId.Path "1_2_3.txt") + match res with + | Ok r -> + let expectedStatuses:Map> = + [ + ArtefactId.Path "1.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate ArtefactLocation.Remote); + ArtefactId.Path "2.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate ArtefactLocation.Remote); + ArtefactId.Path "3.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate ArtefactLocation.Remote); + ArtefactId.Path "1_2.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate ArtefactLocation.Remote); + ArtefactId.Path "1_2_3.txt",MdMap.scalar (StatusGraph.ArtefactStatus.UpToDate ArtefactLocation.Remote); + ] |> Map.ofList + Assert.True(equalStatuses expectedStatuses r) + | Error e-> + Assert.True(false, sprintf "Error: %A" e) + finally + Environment.CurrentDirectory <- savedWD + } \ No newline at end of file