Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

status command support #32

Merged
merged 6 commits into from
Sep 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions Alpheus.CLI/Runner.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ let run (programName:string) (parseResults:ParseResults<AlpheusArgs>) : Result<u
printfn "%s" usage
Ok()
else if parseResults.Contains Init then
result {
result {
let! cwd = (Directory.GetCurrentDirectory(), (fun p -> not (Config.isExperimentDirectory p)), (UserError "The current directory is already an Alpheus experiment directory"))
return API.createExperimentDirectoryAsync cwd |> Async.RunSynchronously |> ignore
}
Expand Down Expand Up @@ -62,16 +62,36 @@ let run (programName:string) (parseResults:ParseResults<AlpheusArgs>) : Result<u
let statusArgs = parseResults.GetResult <@ Status @>
let artefactPath = statusArgs.GetResult <@ StatusArgs.File @>
let! artefact = API.artefactFor artefactPath
return! Error (SystemError "NOT IMPLEMENTED")
//return! API.status artefact
let statusesRes = API.status artefact
match statusesRes with
| Ok(statuses)->
let printStatus artId (status:Angara.Data.MdMap<string,StatusGraph.ArtefactStatus>) =
let statusToString status =
match status with
| StatusGraph.UpToDate location ->
match location with
| DependencyGraph.Local -> "Up to date"
| DependencyGraph.Remote -> "Up to date (in storage)"
| StatusGraph.NeedsRecomputation _ ->
"Need recomputation"
if status.IsScalar then
printfn "%-50s: %s" (artId.ToString()) (status.AsScalar() |> statusToString)
else
printfn "%-50s:" (artId.ToString())
let printItem pair =
let (index:string list),v = pair
printfn "\t%-50s: %s" (String.concat " " index) (statusToString v)
status |> Angara.Data.MdMap.toSeq |> Seq.iter printItem
statuses |> Map.iter printStatus
return ()
| Error e -> return! Error e
}
elif parseResults.Contains Restore then
result {
let restoreArgs = parseResults.GetResult <@ Restore @>
let artefactPath = restoreArgs.GetResult <@ RestoreArgs.Path @>
let! artefact = API.artefactFor artefactPath
return! Error (SystemError "NOT IMPLEMENTED")
//return! API.restoreAsync artefact |> Async.RunSynchronously
return! API.restoreAsync artefact |> Async.RunSynchronously
}
elif parseResults.Contains Save then
result {
Expand Down
43 changes: 22 additions & 21 deletions AlpheusCore/API.fs
Original file line number Diff line number Diff line change
Expand Up @@ -112,24 +112,26 @@ let private restoreSingleItemAsync experimentRoot (path,versionToRestore) =
return! restore path versionToRestore
}

/// Checks whether the specified versions can be extraced from any available storages
let internal checkStoragePresence experimentRoot (versions:HashString seq) : Async<bool array> =
async {
let! config = Config.openExperimentDirectoryAsync experimentRoot
let checker = config.ConfigFile.Storage |> Map.toSeq |> StorageFactory.getPresenseChecker experimentRoot
let nonEmptylist listAsync =
async {
let! l = listAsync
return l |> Array.map (fun x -> not (List.isEmpty x))
}
let res = versions |> Array.ofSeq |> (checker >> nonEmptylist)
return! res
}

/// (Re)Computes the artefact specified
let compute (experimentRoot, artefactId) =
async {
let! g = buildDependencyGraphAsync experimentRoot [artefactId]

let checkStoragePresence (versions:HashString array) : Async<bool array> =
async {
let! config = Config.openExperimentDirectoryAsync experimentRoot
let checker = config.ConfigFile.Storage |> Map.toSeq |> StorageFactory.getPresenseChecker experimentRoot
let nonEmptylist listAsync =
async {
let! l = listAsync
return l |> Array.map (fun x -> not (List.isEmpty x))
}
let res = versions |> (checker >> nonEmptylist)
return! res
}


let restoreFromStorage (pairs:(HashString*string) array) =
async {
let swapedPairs = pairs |> Array.map (fun p -> let x,y = p in y,x )
Expand All @@ -143,20 +145,19 @@ let compute (experimentRoot, artefactId) =
return ()
}

//flow graph to calculate statuses
let flowGraph = ComputationGraph.buildGraph experimentRoot g checkStoragePresence restoreFromStorage
// flow graph to calculate statuses
let flowGraph = ComputationGraph.buildGraph experimentRoot g (checkStoragePresence experimentRoot) restoreFromStorage
logVerbose LogCategory.API "Running computations"
return ComputationGraph.doComputations flowGraph
} |> Async.RunSynchronously

/// Prints to the stdout the textural statuses of the artefact and its provenance
let status (experimentRoot, artefactId) =
failwith "Not implemented" |> ignore
//async {
// let! g = buildDependencyGraphAsync experimentRoot [artefactId]
// let flowGraph = StatusGraph.buildStatusGraph g
// return StatusGraph.printStatuses flowGraph
//} |> Async.RunSynchronously
async {
let! g = buildDependencyGraphAsync experimentRoot [artefactId]
let flowGraph = StatusGraph.buildStatusGraph g experimentRoot (checkStoragePresence experimentRoot)
return StatusGraph.getStatuses flowGraph
} |> Async.RunSynchronously

let private artefactVersionToPaths fullArtPath (artefactVersion:ArtefactVersion) =
if artefactVersion.IsScalar then
Expand Down
3 changes: 2 additions & 1 deletion AlpheusCore/AlpheusCore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
<Compile Include="DependencyGraph.fs" />
<Compile Include="DependencyGraphUtils.fs" />
<Compile Include="DependencyGraphToAngaraWrapper.fs" />
<None Include="StatusGraph.fs" />
<Compile Include="AngaraGraphCommon.fs" />
<Compile Include="StatusGraph.fs" />
<Compile Include="ComputationContext.fs" />
<Compile Include="ExecuteCommand.fs" />
<Compile Include="ComputationGraph.fs" />
Expand Down
153 changes: 153 additions & 0 deletions AlpheusCore/AngaraGraphCommon.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
module ItisLab.Alpheus.AngaraGraphCommon

open System
open System.IO
open AlphFiles
open Angara.Data
open Angara.Graph
open Angara.Execution
open Angara.States
open ItisLab.Alpheus.DependencyGraph
open ItisLab.Alpheus.PathUtils

let internal arrayType<'a> rank : Type =
if rank < 0 then invalidArg "rank" "Rank is negative"
else if rank = 0 then typeof<ArtefactId>
else typeof<ArtefactId>.MakeArrayType(rank)

let internal inputRank (v:MethodVertex) =
match v with
| Source src -> 0
| Command cmd -> cmd.Inputs |> Seq.map(fun a -> a.Artefact.Rank) |> Seq.max

let internal outputRank (v:MethodVertex) =
match v with
| Source src -> src.Output.Artefact.Rank
| Command cmd -> cmd.Outputs |> Seq.map(fun a -> a.Artefact.Rank) |> Seq.max

let internal methodRank (v:MethodVertex) = min (outputRank v) (inputRank v)

let getOutputTypes (v:MethodVertex) =
let rank = methodRank v
match v with
| Source src -> [max 0 (src.Output.Artefact.Rank - rank) |> arrayType]
| Command cmd -> cmd.Outputs |> Seq.map(fun a -> max 0 (a.Artefact.Rank - rank) |> arrayType) |> List.ofSeq

let getInputTypes (v:MethodVertex) =
let rank = methodRank v
match v with
| Source src -> List.empty
| Command cmd -> cmd.Inputs |> Seq.map(fun a -> max 0 (a.Artefact.Rank - rank) |> arrayType) |> List.ofSeq


let rec internal toJaggedArrayOrValue (mapValue: (string list * 'a) -> 'c) (index: string list) (map: MdMapTree<string,'a>) : obj =
let isValue = function
| MdMapTree.Value _ -> true
| MdMapTree.Map _ -> false

let mapToArray (getElement: (string * MdMapTree<string,'a>) -> 'b) (map: Map<string,MdMapTree<string,'a>>) : 'b[] =
map |> Map.toSeq |> Seq.sortBy fst |> Seq.map getElement |> Seq.toArray

let append v list = list |> List.append [v]

match map with
| MdMapTree.Value v -> upcast(mapValue (index, v))
| MdMapTree.Map subMap ->
match subMap |> Map.forall(fun _ -> isValue) with
| true -> // final level
upcast(subMap |> mapToArray (fun (k,t) ->
let newIndex = index |> append k
match t with
| MdMapTree.Value v -> mapValue (newIndex, v)
| MdMapTree.Map _ -> failwith "Unreachable case"))
| false ->
upcast(subMap |> mapToArray (fun (k,t) ->
let newIndex = index |> append k
match t with
| MdMapTree.Map _ -> toJaggedArrayOrValue mapValue newIndex t
| MdMapTree.Value _ -> failwith "Data is incomplete and has missing elements"))


let resolveIndex (index:string list) (map: MdMap<string, 'a option>) =
let rec resolveInTree (index:string list) (map: MdMapTree<string, 'a option>) =
match map, index with
| _,[] -> Some map
| MdMapTree.Value value,_ -> Some map // index length > rank of the map
| MdMapTree.Map values, k :: tail ->
match values |> Map.tryFind k with
| Some value -> resolveInTree tail value
| None -> None
match resolveInTree index (map |> MdMap.toTree) with
| Some(MdMapTree.Value v) -> v
| Some(MdMapTree.Map map) when map.IsEmpty -> None
| Some(MdMapTree.Map _) -> invalidOp "Only one-to-one vectors are supported at the moment"
| None -> None

let extractActualVersionsFromLinks index links =
async {
let! actualVersion =
links
|> Seq.map (fun (a:LinkToArtefact) -> a.Artefact.ActualVersionAsync)
|> Async.Parallel
return actualVersion |> Array.map (fun v -> resolveIndex index v)
}

let extractExpectedVersionsFromLinks index links =
links |> Seq.map (fun (a:LinkToArtefact) -> resolveIndex index a.ExpectedVersion)

/// whether the command method needs actual CLI tool execution
/// common code that is used both during the artefact status calculation and artefact production
let getCommandVertexStatus checkStoragePresence (command:CommandLineVertex) index =
let methodItemId = command.MethodId |> applyIndex index
let logVerbose str = Logger.logVerbose Logger.Execution (sprintf "%s: %s" methodItemId str)
async {
let expectedInputItemVersions = extractExpectedVersionsFromLinks index command.Inputs |> Array.ofSeq
let! actualInputItemVersions = extractActualVersionsFromLinks index command.Inputs

let! inputsStatus = findExpectedArtefacts checkStoragePresence expectedInputItemVersions actualInputItemVersions

match inputsStatus with
// we can avoid checking outputs to speed up the work
| SomeAreNotFound ->
logVerbose "Needs recomputation as some of the inputs not found"
return Outdated InputsOutdated
| SomeAreLocalUnexpected ->
logVerbose "Needs recomputation as disk version of the input does not match expected version"
return Outdated InputsOutdated
| AllExist _ ->
// checking outputs
let expectedOutputItemVersions = extractExpectedVersionsFromLinks index command.Outputs |> Array.ofSeq
let! actualOutputItemVersions = extractActualVersionsFromLinks index command.Outputs
let! outputsStatus = findExpectedArtefacts checkStoragePresence expectedOutputItemVersions actualOutputItemVersions
match outputsStatus with
| SomeAreNotFound ->
logVerbose "Needs recomputation as some of the outputs not found"
return Outdated OutputsOutdated
| SomeAreLocalUnexpected ->
logVerbose "Needs recomputation as disk version of the output does not match expected version"
return Outdated OutputsOutdated
| AllExist outputs ->
return UpToDate outputs
}


/// This type represents an Angara Flow method.
/// Note that execution modifies the given vertex and it is Angara Flow execution runtime who controls
/// the concurrency.
[<AbstractClass>]
type AngaraGraphNode(producerVertex:MethodVertex) =
inherit ExecutableMethod(
System.Guid.NewGuid(),
getInputTypes producerVertex,
getOutputTypes producerVertex)

member s.VertexID =
match producerVertex with
| Source(s) -> s.MethodId
| Command(comp) -> comp.MethodId

override s.ToString() =
match producerVertex with
| Source src -> sprintf "Source %A" src.Output.Artefact.Id
| Command cmd -> sprintf "Command %s" cmd.Command

Loading