package current_web

  1. Overview
  2. Docs

Source file job.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
open Tyxml.Html
open Astring
open Lwt.Infix

(** Placeholder text that [render] puts inside the page template's [<pre>]
    element. The rendered template is then cut at this marker so that the log
    contents can be streamed between the two halves. *)
let sep = "@@LOG@@"

(** Upper bound, in bytes, on the amount of log data returned by one call to
    [read]. *)
let max_log_chunk_size = 102_400L  (* 100K at a time *)

(** [read ~start path] returns [(data, next)] where [data] is at most
    [max_log_chunk_size] bytes of the file at [path], taken from offset
    [start], and [next] is the offset immediately after [data].

    A negative [start] is interpreted relative to the end of the file. The
    resulting offset is then clamped to the range [0 .. file length], so
    reading at or past the end yields [("", file length)].

    The file is opened in binary mode and the channel is closed before
    returning, including when an exception escapes.

    @raise Sys_error if the file cannot be opened. *)
let read ~start path =
  let ic = open_in_bin (Fpath.to_string path) in
  let fetch () =
    let size = LargeFile.in_channel_length ic in
    (* Negative offsets count backwards from the end of the file. *)
    let requested = if start < 0L then Int64.add size start else start in
    (* Keep the offset inside the file. *)
    let offset = max 0L (min size requested) in
    LargeFile.seek_in ic offset;
    (* Never hand back more than one chunk's worth of data. *)
    let count = min max_log_chunk_size (Int64.sub size offset) in
    let data = really_input_string ic (Int64.to_int count) in
    data, Int64.add offset count
  in
  Fun.protect ~finally:(fun () -> close_in ic) fetch

(** [render ctx ~actions ~job_id ~log] builds the streamed response body for
    the page of job [job_id], whose log file is at path [log].

    The page is produced by [Context.template] from: a script tag for the
    line-numbers script, the build history (if any), the "Rebuild", "Cancel"
    and "Start now" forms that currently apply, and a [<pre>] element holding
    the [sep] placeholder. The rendered template is cut at [sep]; the stream
    yields the first half, then the log in chunks obtained from [read] and
    passed through [Ansi.process], and finally the second half.

    While [read] returns no data and the job is still found by
    [Current.Job.lookup_running], the stream waits on
    [Current.Job.wait_for_log_data] before trying again. *)
let render ctx ~actions ~job_id ~log:path =
  let ansi = Ansi.create () in
  let csrf = Context.csrf ctx in
  (* A single-button POST form aimed at [/job/<job_id>/<op>], carrying the
     CSRF token in a hidden field. *)
  let post_form ~op ~caption =
    form ~a:[a_action (Fmt.str "/job/%s/%s" job_id op); a_method `Post] [
      input ~a:[a_input_type `Submit; a_value caption] ();
      input ~a:[a_name "csrf"; a_input_type `Hidden; a_value csrf] ();
    ]
  in
  let rebuild_button =
    match actions#rebuild with
    | None -> []
    | Some _ -> [post_form ~op:"rebuild" ~caption:"Rebuild"]
  in
  (* Only offered for a running job whose cancelled state is still [Ok ()]. *)
  let cancel_button =
    match Current.Job.lookup_running job_id with
    | None -> []
    | Some job ->
      begin match Current.Job.cancelled_state job with
        | Ok () -> [post_form ~op:"cancel" ~caption:"Cancel"]
        | Error _ -> []
      end
  in
  (* Only offered for a running job that is waiting for confirmation. *)
  let start_button =
    match Current.Job.lookup_running job_id with
    | None -> []
    | Some job ->
      if Current.Job.is_waiting_for_confirmation job
      then [post_form ~op:"start" ~caption:"Start now"]
      else []
  in
  (* The job being displayed is shown in bold; any other job is a link. *)
  let job_item ~label id =
    let text = txt label in
    if id <> job_id then a ~a:[a_href (Fmt.str "/job/%s" id)] [text]
    else b [text]
  in
  let history =
    match Current_cache.Db.history ~limit:10 ~job_id with
    | None, [] -> []
    | current, past ->
      let past_items =
        List.map (fun entry ->
            let label = Int64.to_string entry.Current_cache.Db.build in
            li [job_item ~label entry.job_id]
          ) past
      in
      let items =
        match current with
        | Some id -> li [job_item ~label:"(building)" id] :: past_items
        | None -> past_items
      in
      [div ~a:[a_class ["build-history"]] [txt "Build: "; ol items]]
  in
  let line_numbers_js =
    [script ~a:[a_src (Xml.uri_of_string "/js/line-numbers.js")] (txt "")]
  in
  let page =
    Context.template ctx (List.concat [
        line_numbers_js;
        history;
        rebuild_button;
        cancel_button;
        start_button;
        [pre [txt sep]];
      ])
  in
  match String.cut ~sep page with
  | None -> assert false  (* [sep] was placed in the page just above *)
  | Some (header, footer) ->
    (* What the stream should produce next. *)
    let state = ref `Pre in
    (* Yield the next chunk of the log starting at [start]. When there is
       nothing new, either finish the page (job no longer running) or wait
       for more log data and retry. *)
    let rec next_chunk start () =
      match read ~start path with
      | "", _ ->
        begin match Current.Job.lookup_running job_id with
          | Some job -> Current.Job.wait_for_log_data job >>= next_chunk start
          | None -> state := `Done; Lwt.return_some footer
        end
      | chunk, next ->
        state := `Log next;
        Lwt.return_some (Ansi.process ansi chunk)
    in
    let step () =
      match !state with
      | `Pre -> state := `Log 0L; Lwt.return_some header
      | `Log start -> next_chunk start ()
      | `Done -> Lwt.return_none
    in
    Cohttp_lwt.Body.of_stream (Lwt_stream.from step)

(** Operations available on a job. [rebuild] is [None] when the job cannot be
    rebuilt; otherwise it holds a function returning a string, which the
    [rebuild] resource uses as the job ID to redirect to. *)
type actions = < rebuild : (unit -> string) option >

(** [lookup_actions ~engine job_id] looks [job_id] up in the [jobs] map of the
    engine's current state and returns its entry coerced to [actions]. When
    there is no entry, the result is an object whose [rebuild] is [None]. *)
let lookup_actions ~engine job_id =
  let jobs = (Current.Engine.state engine).Current.Engine.jobs in
  match Current.Job.Map.find_opt job_id jobs with
  | None -> object method rebuild = None end
  | Some found -> (found :> actions)

(** [job ~engine ~job_id] is the resource for viewing job [job_id]. Its GET
    handler responds with the page produced by [render], or with a
    [`Bad_request] error when [Current.Job.log_path] rejects [job_id]. *)
let job ~engine ~job_id = object
  inherit Resource.t

  (* Overrides the inherited value; presumably the role needed for GET
     requests (confirm against [Resource.t]). *)
  val! can_get = `Viewer

  method! private get ctx =
    let actions = lookup_actions ~engine job_id in
    let serve path =
      let body = render ctx ~actions ~job_id ~log:path in
      (* Otherwise, an nginx reverse proxy will wait for the whole log before sending anything. *)
      let headers = Cohttp.Header.init_with "X-Accel-Buffering" "no" in
      Utils.Server.respond ~status:`OK ~headers ~body ()
    in
    match Current.Job.log_path job_id with
    | Ok path -> serve path
    | Error (`Msg msg) -> Context.respond_error ctx `Bad_request msg
end

(** [rebuild ~engine ~job_id] is the resource behind the "Rebuild" form. Its
    POST handler calls the job's [rebuild] action and redirects to
    [/job/<new id>], or responds with a [`Bad_request] error when the job has
    no such action. The request body is ignored. *)
let rebuild ~engine ~job_id = object
  inherit Resource.t

  (* Overrides the inherited value; presumably the role needed for POST
     requests (confirm against [Resource.t]). *)
  val! can_post = `Builder

  method! private post ctx _body =
    match (lookup_actions ~engine job_id)#rebuild with
    | Some trigger ->
      let new_id = trigger () in
      (* NOTE(review): the [cancel] and [start] resources redirect through
         [Context.respond_redirect]; confirm whether the difference here is
         intentional. *)
      let uri = Uri.of_string ("/job/" ^ new_id) in
      Utils.Server.respond_redirect ~uri ()
    | None ->
      Context.respond_error ctx `Bad_request "Job does not support rebuild"
end

(** [cancel ~job_id] is the resource behind the "Cancel" form. Its POST
    handler cancels the job if [Current.Job.lookup_running] still finds it,
    then redirects to [/]; otherwise it responds with a [`Bad_request] error.
    The request body is ignored. *)
let cancel ~job_id = object
  inherit Resource.t

  (* Overrides the inherited value; presumably the role needed for POST
     requests (confirm against [Resource.t]). *)
  val! can_post = `Builder

  method! private post ctx _body =
    match Current.Job.lookup_running job_id with
    | Some running ->
      Current.Job.cancel running "Cancelled by user";
      Context.respond_redirect ctx (Uri.of_string "/")
    | None ->
      Context.respond_error ctx `Bad_request
        "Job does not support cancel (already finished?)"
end

(** [start ~job_id] is the resource behind the "Start now" form. Its POST
    handler calls [Current.Job.approve_early_start] on the job if
    [Current.Job.lookup_running] finds it, then redirects to the job's page;
    otherwise it responds with a [`Bad_request] error. The request body is
    ignored.

    Note that the handler does not itself check
    [Current.Job.is_waiting_for_confirmation]: any job that is found is
    approved. *)
let start ~job_id = object
  inherit Resource.t

  (* Overrides the inherited value; presumably the role needed for POST
     requests (confirm against [Resource.t]). *)
  val! can_post = `Admin

  method! private post ctx _body =
    match Current.Job.lookup_running job_id with
    | Some running ->
      Current.Job.approve_early_start running;
      let target = Uri.of_string ("/job/" ^ Current.Job.id running) in
      Context.respond_redirect ctx target
    | None ->
      Context.respond_error ctx `Bad_request "Job is not awaiting confirmation"
end

(** [id ~date ~log] joins the two path segments of a job URL into a job ID of
    the form [<date>/<log>]. *)
let id ~date ~log = date ^ "/" ^ log

(** [routes ~engine] is the routing table for job pages. Each route captures
    two path segments after [/job/], combines them with [id] into a job ID and
    builds the matching resource:

    - [/job/<date>/<log>]: [job]
    - [/job/<date>/<log>/rebuild]: [rebuild]
    - [/job/<date>/<log>/cancel]: [cancel]
    - [/job/<date>/<log>/start]: [start] *)
let routes ~engine =
  let view date log = job ~engine ~job_id:(id ~date ~log) in
  let do_rebuild date log = rebuild ~engine ~job_id:(id ~date ~log) in
  let do_cancel date log = cancel ~job_id:(id ~date ~log) in
  let do_start date log = start ~job_id:(id ~date ~log) in
  Routes.[
    s "job" / str / str /? nil @--> view;
    s "job" / str / str / s "rebuild" /? nil @--> do_rebuild;
    s "job" / str / str / s "cancel" /? nil @--> do_cancel;
    s "job" / str / str / s "start" /? nil @--> do_start;
  ]
OCaml

Innovation. Community. Security.