Skip to content

Commit

Permalink
Generalize saving of logs, describe
Browse files Browse the repository at this point in the history
This should completely mitigate #100 and #112, we display the archived
output only if it is different, cf. `plugin.ml`:

    if Some (out ^ err) = s.archived
    then should_display_archived := false

It also makes the saving work for the AWS and Local-docker backends.
  • Loading branch information
smondet committed May 23, 2017
1 parent a9b5a27 commit 83cbb5d
Show file tree
Hide file tree
Showing 15 changed files with 271 additions and 199 deletions.
67 changes: 50 additions & 17 deletions src/ketrew_backend/plugin.ml
Original file line number Diff line number Diff line change
Expand Up @@ -344,21 +344,58 @@ module Long_running_implementation : Ketrew.Long_running.LONG_RUNNING = struct
| `Error (`Client ce) ->
fail (Ketrew_pure.Internal_pervasives.Log.verbatim (Client.Error.to_string ce))

let freshness_list_to_markup l ~how =
let job_query_result_to_markup l =
let open Ketrew_pure.Internal_pervasives.Display_markup in
List.map l
~f:(fun (`Id id, `Describe_output o, `Freshness frns) ->
description_list [
"Job-id", command id;
"Freshness", text frns;
begin match how with
| `Block -> "Output", code_block o
| `Link -> "Link", uri o
end;
])
List.map l ~f:(fun (id, qr) ->
description_list
(("Job-id", command id)
:: List.map qr ~f:(fun (section, result) ->
section,
begin match result with
| `Url u -> uri u
| `Saved_command s ->
let open Hyper_shell.Saved_command in
let code_block_or_empty =
function
| s when String.strip s = "" -> text "Empty"
| other -> code_block other in
let should_display_archived = ref true in
let command_and_result = [
"Command", command s.command;
begin match s.outcome with
| `Error e ->
"Error",
let open Hyper_shell.Error in
description_list [
"STDOUT", option ~f:code_block_or_empty e.stdout;
"STDERR", option ~f:code_block_or_empty e.stderr;
"Status",
option e.status
~f:(text_of_stringable Pvem_lwt_unix.System.Shell.status_to_string);
"Exception", option e.exn ~f:command;
]
| `Ok (out, err) ->
begin
if Some (out ^ err) = s.archived
then should_display_archived := false
end;
"Success", description_list [
"STDOUT", code_block_or_empty out;
"STDERR", code_block_or_empty err;
]
end;
] in
let archived =
if !should_display_archived then [
"Archived content", option ~f:code_block_or_empty s.archived;
] else [] in
command_and_result @ archived |> description_list
end)))
|> concat
|> serialize



module Markup = Ketrew_pure.Internal_pervasives.Display_markup

let markup_job_state : Coclobas.Job.t -> Markup.t = fun job ->
Expand Down Expand Up @@ -450,16 +487,12 @@ module Long_running_implementation : Ketrew.Long_running.LONG_RUNNING = struct
| ds, `Running {job_id; _} when ds = Query_names.describe ->
client_query begin
Client.get_job_descriptions created.client [job_id]
>>| freshness_list_to_markup ~how:`Block
>>| job_query_result_to_markup
end
| ds, `Running {job_id; _} when ds = Query_names.logs ->
let how =
match Job.Specification.kind created.specification with
| `Aws_batch -> `Link
| `Kube | `Local_docker -> `Block in
client_query begin
Client.get_job_logs created.client [job_id]
>>| freshness_list_to_markup ~how
>>| job_query_result_to_markup
end
| other, _ -> fail (s "Unknown query: " % s other)

Expand Down
19 changes: 12 additions & 7 deletions src/lib/aws_batch_job.ml
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,18 @@ let get_update ~log ~id ~state =
fail (Error.status ~id ~aws_id:state.State.aws_id (`Parsing_status status))
end

let describe ~log ~id ~state =
ksprintf
(command_must_succeed_with_output ~log ~id)
let describe ~storage ~log ~id ~state =
let cmd =
sprintf
"aws batch describe-jobs --jobs %s"
state.State.aws_id
>>= fun (out, err) ->
return (`Fresh, out)
state.State.aws_id in
let save_path = Job_common.save_path id `Describe_output in
Hyper_shell.Saved_command.run
~storage ~log ~cmd ~path:save_path
~section:(Job_common.job_section id)
~keep_the:`Latest
>>= fun logres ->
return (Job_common.Query_result.one_saved "Description" logres)

let get_logs ~log ~id ~state =
let cloudwatch_url =
Expand All @@ -215,7 +220,7 @@ let get_logs ~log ~id ~state =
streamFilter=typeLogStreamPrefix"
(job_name id) state.State.aws_id
in
return (`Fresh, cloudwatch_url)
return (Job_common.Query_result.one_url "Logs-link" cloudwatch_url)

let kill ~log ~id ~state =
ksprintf
Expand Down
72 changes: 24 additions & 48 deletions src/lib/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -101,59 +101,35 @@ let get_json_keys ~uri ~parsers json =
| other -> fail (`Client (`Json_parsing (uri, "Not a List", other)))
end

let get_job_descriptions t ids =
let path = "job/describe" in
(* For describe or logs *)
let get_job_query_result ~path t ids =
get_job_jsons t ~path ~ids
>>= fun json ->
let uri = uri_of_ids t.base_url path ids in (* Only for error values: *)
let get_string name =
function
| `String i -> `Ok i
| other -> `Error (sprintf "%s not a string" name)
in
get_json_keys ~uri json ~parsers:[
"id", get_string "id";
"description", get_string "description";
"freshness", get_string "freshness";
]
>>= fun (res : string list list) ->
Deferred_list.while_sequential res ~f:(
function
| [id; descr; freshness] ->
return (`Id id, `Describe_output descr, `Freshness freshness)
| other ->
ksprintf failwith
"This should never happen: 3 parsers Vs %d results: [%s]"
(List.length other)
(String.concat ~sep:", " other)
)
begin match json with
| `List l ->
Deferred_list.while_sequential l ~f:(function
| `Assoc ["id", `String id;
"output", yoj] ->
begin match (Job_common.Query_result.of_yojson yoj) with
| Ok output ->
return (id, output)
| Error e ->
fail (`Client (`Json_parsing
(uri, "Not an query-result", yoj)))
end
| other ->
fail (`Client (`Json_parsing
(uri, "Not an {id: ... output: ...}", other)))
)
| other -> fail (`Client (`Json_parsing (uri, "Not a List", other)))
end

let get_job_descriptions t ids =
get_job_query_result ~path:"job/describe" t ids

let get_job_logs t ids =
let path = "job/logs" in
get_job_jsons t ~path ~ids
>>= fun json ->
let uri = uri_of_ids t.base_url path ids in (* Only for error values: *)
let get_string name =
function
| `String i -> `Ok i
| other -> `Error (sprintf "%s not a string" name)
in
get_json_keys ~uri json ~parsers:[
"id", get_string "id";
"output", get_string "output";
"freshness", get_string "freshness";
]
>>= fun (res : string list list) ->
Deferred_list.while_sequential res ~f:(
function
| [id; output; freshness] ->
return (`Id id, `Describe_output output, `Freshness freshness)
| other ->
ksprintf failwith
"This should never happen: 3 parsers Vs %d results: [%s]"
(List.length other)
(String.concat ~sep:", " other)
)
get_job_query_result ~path:"job/logs" t ids

let kill_jobs {base_url} ids =
let uri = uri_of_ids base_url "job/kill" ids in
Expand Down
5 changes: 2 additions & 3 deletions src/lib/command_line.ml
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,9 @@ let client ~base_url action ids =
Client.get_job_descriptions client ids
>>= fun descs ->
List.iter descs
~f:(fun (`Id id, `Describe_output d, `Freshness f) ->
~f:(fun (id, d) ->
printf "ID: %s\n\
Freshness: %s\n\
%s\n\n" id f d)
%s\n\n" id (Job_common.Query_result.show d))
|> return
| `Status ->
Client.get_job_states client ids
Expand Down
47 changes: 46 additions & 1 deletion src/lib/hyper_shell.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Error = struct
stderr: string option;
status: [`Exited of int | `Signaled of int | `Stopped of int] option;
exn: string option;
} [@@deriving yojson,make]
} [@@deriving yojson,make,show]

let of_result cmd =
function
Expand Down Expand Up @@ -74,3 +74,48 @@ let command_must_succeed
~log ?section ?additional_json cmd
>>= fun (_, _) ->
return ()


module Saved_command = struct
type t = {
command: string;
outcome: [
| `Ok of string * string
| `Error of Error.t
];
archived: string option;
} [@@deriving yojson,show,make]


let run ~storage ~log ~section ~cmd ~path ~keep_the =
begin
command_must_succeed_with_output ~log ~section cmd
>>< function
| `Ok (out, err) ->
let new_output = out ^ err in
begin match keep_the with
| `Latest ->
Storage.update storage path new_output
| `Largest ->
Storage.read storage path
>>= begin function
| Some old when String.length old > String.length new_output ->
return ()
| Some _ (* smaller *) | None ->
Storage.update storage path new_output
end
end
>>= fun () ->
Storage.read storage path
>>= fun archived ->
return (make
~command:cmd ~outcome:(`Ok (out, err))
?archived ())
| `Error (`Shell_command e) ->
Storage.read storage path
>>= fun archived ->
return (make
~command:cmd ~outcome:(`Error e) ?archived ())
| `Error (`Log _ as e) -> fail e
end
end
22 changes: 21 additions & 1 deletion src/lib/hyper_shell.mli
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module Error: sig
stderr: string option;
status: [`Exited of int | `Signaled of int | `Stopped of int] option;
exn: string option;
} [@@deriving yojson,make]
} [@@deriving yojson,make,show]
val to_display_string: t -> string
end

Expand All @@ -28,3 +28,23 @@ val command_must_succeed :
[> `Shell_command of Error.t
| `Log of Log.Error.t ])
Internal_pervasives.Deferred_result.t

module Saved_command : sig
type t = {
command: string;
outcome: [
| `Ok of string * string
| `Error of Error.t
];
archived: string option;
} [@@deriving yojson,show,make]
val run :
storage:Storage.t ->
log:Log.t ->
section:string list ->
cmd:string ->
path:Storage.key ->
keep_the:[ `Largest | `Latest ] ->
(t, [> `Log of Log.Error.t | `Storage of [> Storage.Error.common ] ])
Internal_pervasives.Deferred_result.t
end
5 changes: 5 additions & 0 deletions src/lib/internal_pervasives.ml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ let return = Deferred_result.return
let fail = Deferred_result.fail


let of_ocaml_result =
function
| Ok o -> return o
| Error s -> fail s

let dbg fmt =
ksprintf (fun s -> printf "Cocldebug>> %s\n%!" s) fmt

Expand Down
23 changes: 7 additions & 16 deletions src/lib/job.ml
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,14 @@ let fresh spec =
make ~id spec


let make_path id =
(* TODO: use this in Kube_jobs uses of `~section` *)
function
| `Saved_state -> ["job"; id; "saved_state.json"]
| `Describe_output -> ["job"; id; "describe.out"]
| `Logs_output -> ["job"; id; "logs.out"]

let save st job =
Storage.Json.save_jsonable st
~path:(make_path (id job) `Saved_state)
~path:(Job_common.save_path (id job) `Saved_state)
(to_yojson job)

let get st job_id =
Storage.Json.get_json st
~path:(make_path job_id `Saved_state)
~path:(Job_common.save_path job_id `Saved_state)
~parse:of_yojson

let kind t = Specification.kind t.specification
Expand All @@ -96,10 +89,9 @@ let aws_state t =
let get_logs ~storage ~log t =
match kind t with
| `Kube ->
let save_path = make_path t.id `Logs_output in
Kube_job.get_logs ~storage ~log ~id:t.id ~save_path
Kube_job.get_logs ~storage ~log ~id:t.id
| `Local_docker ->
Local_docker_job.get_logs ~log ~id:t.id
Local_docker_job.get_logs ~storage ~log ~id:t.id
| `Aws_batch ->
aws_state t
>>= fun state ->
Expand All @@ -108,14 +100,13 @@ let get_logs ~storage ~log t =
let describe ~storage ~log t =
match kind t with
| `Kube ->
let save_path = make_path t.id `Describe_output in
Kube_job.describe ~storage ~log ~id:t.id ~save_path
Kube_job.describe ~storage ~log ~id:t.id
| `Local_docker ->
Local_docker_job.describe ~log ~id:t.id
Local_docker_job.describe ~log ~id:t.id ~storage
| `Aws_batch ->
aws_state t
>>= fun state ->
Aws_batch_job.describe ~log ~id:t.id ~state
Aws_batch_job.describe ~storage ~log ~id:t.id ~state

let kill ~log t =
match kind t with
Expand Down
6 changes: 2 additions & 4 deletions src/lib/job.mli
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ val get_logs :
storage:Storage.t ->
log:Log.t ->
t ->
([ `Archived of [ `Shell_command of Hyper_shell.Error.t ] | `Fresh ] *
string,
(Job_common.Query_result.t,
[> `Log of Log.Error.t
| `Shell_command of Hyper_shell.Error.t
| `Job of [> `Missing_aws_state of string ]
Expand All @@ -66,8 +65,7 @@ val describe :
storage:Storage.t ->
log:Log.t ->
t ->
([ `Archived of [ `Shell_command of Hyper_shell.Error.t ] | `Fresh ] *
string,
(Job_common.Query_result.t,
[> `Log of Log.Error.t
| `Shell_command of Hyper_shell.Error.t
| `Job of [> `Missing_aws_state of string ]
Expand Down
Loading

0 comments on commit 83cbb5d

Please sign in to comment.