package bistro

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file repo.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
open Lwt.Infix
open Core
open Bistro
open Bistro_engine

module W = Bistro_internals.Workflow

(** An entry of a repository.

    - [Item (path, w)] pairs a list of path components with a path workflow;
      [item_to_workflow] turns it into one normalized entry.
    - [Precious_item w] carries only a workflow: it yields no normalized entry
      in [item_to_workflow] and is handed to [Scheduler.protect] by
      [protect]. *)
type item =
  | Item  : string list * _ path workflow -> item
  | Precious_item : _ path workflow -> item

(** A repository: a plain list of {!item}s. *)
type t = item list

(** Flattened, string-only description of an [Item], as consumed by
    [generate]. *)
type normalized_repo_item = {
  (* Location of the entry; [generate] prefixes it with the output
     directory and links it to [file_path]. *)
  repo_path  : string ;
  (* Intermediate location, built as "_files"/<id> by the
     [normalized_repo_item] constructor function; [generate] prefixes it with
     the output directory and links it to [cache_path]. *)
  file_path  : string ;
  (* Final link target; [generate] uses it as is when absolute and prefixes
     it with the output directory otherwise. *)
  cache_path : string ;
}

(** [normalized_repo_item ~repo_path ~id ~cache_path] builds a normalized
    entry: [repo_path] is rendered with [Path.to_string], [file_path] is
    "_files"/[id], and [cache_path] is stored unchanged. *)
let normalized_repo_item ~repo_path ~id ~cache_path =
  let repo_path = Path.to_string repo_path in
  let file_path = Filename.concat "_files" id in
  { repo_path ; file_path ; cache_path }

(** [item path w] is [Item (path, w)]. *)
let item path w = Item (path, w)

(** [precious_item w] is [Precious_item w]. *)
let precious_item w = Precious_item w

(** Infix form of the [Item] constructor: [path %> w] is [Item (path, w)]. *)
let ( %> ) path w = Item (path, w)

(** [is_strict_prefix ~prefix u] holds when [u] starts with [prefix] and is
    strictly longer than it. This is a plain string test, with no notion of
    path components. *)
let is_strict_prefix ~prefix u =
  String.is_prefix u ~prefix
  && String.length u > String.length prefix

(** [find_bottom items item] looks for an element of [items] whose
    [cache_path] is a directory strictly containing [item.cache_path].

    The search starts from [item] and walks [items] once, replacing the
    current choice by any element whose [cache_path] is a strict ancestor of
    the current choice. When no element qualifies, [item] itself is returned.

    Ancestry is checked on path-component boundaries: "d/out" is an ancestor
    of "d/out/x" but not of "d/out.log". A bare string-prefix test would
    accept the latter and make [remove_redundancies] compute a bogus link
    target. *)
let find_bottom items item =
  let is_strict_ancestor ~dir path =
    is_strict_prefix ~prefix:dir path
    (* [is_strict_prefix] guarantees [path] is longer than [dir], so the
       index below is always in bounds. *)
    && (String.is_suffix dir ~suffix:"/"
        || Char.equal path.[String.length dir] '/')
  in
  let f min_item candidate =
    if is_strict_ancestor ~dir:candidate.cache_path min_item.cache_path
    then candidate
    else min_item
  in
  List.fold items ~init:item ~f

(* FIXME: quadratic complexity *)
(** [remove_redundancies repo] rewrites the [cache_path] of every entry for
    which [find_bottom] returns a different entry: the new [cache_path] is the
    [file_path] of that other entry, followed by what remains of the original
    [cache_path] once the other entry's [cache_path] is chopped off its
    front. Entries that are their own bottom are returned unchanged. *)
let remove_redundancies repo =
  let relocate entry =
    let bottom = find_bottom repo entry in
    if Poly.(bottom = entry) then entry
    else
      let remainder =
        String.chop_prefix_exn entry.cache_path ~prefix:bottom.cache_path
      in
      { entry with cache_path = Filename.concat bottom.file_path remainder }
  in
  List.map repo ~f:relocate

(** [make_absolute p] returns [p] when it is already absolute, and [p]
    appended to the current working directory otherwise. *)
let make_absolute p =
  match Filename.is_absolute p with
  | true -> p
  | false -> Filename.concat (Sys.getcwd ()) p

(** [make_relative ~from p] is the string rendering ([Path.to_string]) of
    [Path.make_relative ~from p]. *)
let make_relative ~from p =
  Path.to_string (Path.make_relative ~from p)

(** [link dst p_u] replaces whatever exists at [dst] with a symbolic link to
    [p_u].

    The directory of [dst] is created if needed, and the link target is
    written relative to that directory. The work is delegated to a shell
    command ([rm -rf] followed by [ln -s]); every path is passed through
    [Filename.quote] so that names containing quotes, spaces or other shell
    metacharacters cannot break or alter the command.

    The exit status of the command is ignored, as it always has been. *)
let link dst p_u =
  let target = make_absolute p_u in
  let dst_dir = Filename.dirname dst in
  let target_from_dst_dir =
    make_relative ~from:(make_absolute dst_dir) target
  in
  Unix.mkdir_p dst_dir ;
  let cmd =
    sprintf "rm -rf %s && ln -s %s %s"
      (Filename.quote dst)
      (Filename.quote target_from_dst_dir)
      (Filename.quote dst)
  in
  ignore (Sys.command cmd : int)

(** [generate outdir items] materializes [items] under [outdir] as two levels
    of links: for each entry (after [remove_redundancies]), [repo_path] is
    linked to [file_path], then [file_path] is linked to [cache_path].

    [repo_path] and [file_path] are always taken relative to [outdir];
    [cache_path] is taken relative to [outdir] only when it is not
    absolute. *)
let generate outdir items =
  let in_outdir p = Filename.concat outdir p in
  let install { repo_path ; file_path ; cache_path } =
    let repo_path = in_outdir repo_path in
    let file_path = in_outdir file_path in
    let cache_path =
      if Filename.is_absolute cache_path then cache_path
      else in_outdir cache_path
    in
    link repo_path file_path ;
    link file_path cache_path
  in
  List.iter (remove_redundancies items) ~f:install

(** [item_to_workflow it] turns a repository item into a workflow whose value
    is a list of normalized entries.

    - For [Item (repo_path, w)] the list has exactly one element, built by
      [normalized_repo_item] from [repo_path], the id of [w] ([W.id] applied
      to the revealed workflow) and the value of [[%path w]].
    - For [Precious_item _] the result is [Workflow.data []]: precious items
      contribute no entry. *)
let item_to_workflow = function
  | Item (repo_path, w) ->
    let%workflow id = W.id (Private.reveal w) in
    [normalized_repo_item ~repo_path ~id ~cache_path:[%path w]]
  | Precious_item _ -> Workflow.data []

(** [to_workflow ~outdir items] builds a workflow which, once evaluated,
    flattens the normalized entries of all [items], passes them through
    [remove_redundancies] and calls [generate outdir] on the result. *)
let to_workflow ~outdir items =
  (* One workflow per item, combined with [Workflow.list]; the combined value
     is flattened with [List.concat] below. *)
  let normalized_items =
    List.map items ~f:item_to_workflow
    |> Workflow.list
  in
  (* NOTE(review): [generate] itself begins by calling [remove_redundancies],
     so the explicit pass below is followed by a second one inside
     [generate]. *)
  [%workflow
    [%eval normalized_items]
    |> List.concat
    |> remove_redundancies
    |> generate outdir]

(** [partition_results xs] splits a list of results into the payloads of its
    [Ok] elements and the payloads of its [Error] elements. Both output lists
    are in reverse order of appearance in [xs]. *)
let partition_results xs =
  let rec go acc = function
    | [] -> acc
    | r :: rest ->
      let oks, errs = acc in
      (match r with
       | Ok v -> go (v :: oks, errs) rest
       | Error e -> go (oks, e :: errs) rest)
  in
  go ([], []) xs

(** [protect sched items] calls [Scheduler.protect sched] on the workflow of
    every [Precious_item] in [items]; [Item]s are skipped. *)
let protect sched items =
  let protect_item = function
    | Item _ -> ()
    | Precious_item w -> Scheduler.protect sched w
  in
  List.iter items ~f:protect_item

(** [build ... ~outdir repo] evaluates the workflows of [repo] with a fresh
    scheduler and then lays out the results under [outdir].

    Steps, in order: open the database in [bistro_dir] (default "_bistro"),
    create the scheduler, register precious items with [protect], submit one
    evaluation per item, start the scheduler, wait for all evaluations, stop
    the scheduler, and call [generate] on the entries that evaluated
    successfully.

    [generate] runs even when some evaluations failed. In that case an error
    report is then printed on stderr and the returned promise is rejected.

    @raise Failure (through the returned promise) when at least one workflow
    failed. *)
let build ?np ?mem ?loggers ?allowed_containers ?(bistro_dir = "_bistro") ?collect ~outdir repo =
  let db = Db.init_exn bistro_dir in
  let expressions = List.map repo ~f:item_to_workflow in
  let sched = Scheduler.create ?np ?mem ?loggers ?allowed_containers ?collect db in
  protect sched repo ;
  (* Evaluations are submitted before the scheduler is started. *)
  let pending = Lwt_list.map_p (Scheduler.eval sched) expressions in
  Scheduler.start sched ;
  pending >|= partition_results >>= fun (successes, failures) ->
  Scheduler.stop sched >|= fun () ->
  generate outdir (List.concat successes) ;
  if List.is_empty failures then ()
  else (
    let traces = Execution_trace.gather_failures (List.concat failures) in
    prerr_endline (Scheduler.error_report sched traces) ;
    failwith "Some workflow failed!"
  )

(** [build_main] takes the same arguments as [build] and runs the resulting
    promise to completion with [Lwt_main.run]. *)
let build_main ?np ?mem ?loggers ?allowed_containers ?bistro_dir ?collect ~outdir repo =
  Lwt_main.run
    (build ?np ?mem ?loggers ?allowed_containers ?bistro_dir ?collect ~outdir repo)

(** [add_prefix prefix items] prepends the components of [prefix] to the path
    of every [Item]; [Precious_item]s, which have no path, are returned
    unchanged. *)
let add_prefix prefix items =
  let prepend = function
    | Precious_item _ as precious -> precious
    | Item (p, w) -> Item (List.append prefix p, w)
  in
  List.map items ~f:prepend

(** [shift dir items] is [add_prefix [ dir ] items]: it prepends the single
    component [dir] to the path of every [Item]. *)
let shift dir items = add_prefix [ dir ] items

(** [singleton dir w] is the one-element repository holding [w] under the
    single path component [dir]. *)
let singleton dir w = [ item [ dir ] w ]

(** [protected_set repo] collects, in a [String.Set.t], the ids ([W.id]) of
    the [Shell] and [Plugin] workflows reached when traversing the workflows
    of all items of [repo] ([Item] and [Precious_item] alike).

    @raise Assert_failure when the traversal meets one of the constructors
    grouped under the final [assert false] branch. *)
let protected_set repo =
  let rec fold_path_workflow acc (W.Any w) =
    match w with
    (* A selection contributes whatever the directory it selects from
       contributes. *)
    | Select s -> fold_path_workflow acc (W.Any s.dir)
    (* Inputs add nothing to the set. *)
    | Input _ -> acc
    (* These are the only constructors whose id is recorded. *)
    | Shell _
    | Plugin _ -> String.Set.add acc (W.id w)
    (* Both the main workflow and its fallback are traversed. *)
    | Trywith tw ->
      fold_path_workflow (fold_path_workflow acc (W.Any tw.w)) (W.Any tw.failsafe)
    (* NOTE(review): [ie.cond] is traversed with this same function. If the
       condition can be built from one of the constructors of the
       [assert false] group below, this raises -- to be confirmed against the
       definition of [W.t]. *)
    | Ifelse ie ->
      let acc = fold_path_workflow acc (W.Any ie.cond) in
      let acc = fold_path_workflow acc (W.Any ie._then_) in
      fold_path_workflow acc (W.Any ie._else_)
    (* Treated as unreachable here; presumably these never denote a path
       workflow -- TODO confirm. *)
    | App _
    | Both _
    | Eval_path _
    | Glob _
    | List _
    | List_nth _
    | Pure _
    | Spawn _ -> assert false
  in
  (* Traverse one user-level workflow after revealing its internal form. *)
  let k acc w = fold_path_workflow acc (W.Any (Bistro.Private.reveal w)) in
  List.fold repo ~init:String.Set.empty ~f:(fun acc it ->
      match it with
      | Item (_, w) -> k acc w
      | Precious_item w -> k acc w
    )

(** [cache_clip_fold ~bistro_dir repo ~f ~init] folds [f] over the ids
    enumerated by [Db.fold_cache] on the database opened in [bistro_dir].

    [f] receives the database, the accumulator and the id tagged
    [`Protected] when it belongs to [protected_set repo], [`Unprotected]
    otherwise. *)
let cache_clip_fold ~bistro_dir repo ~f ~init =
  let protected = protected_set repo in
  let db = Db.init_exn bistro_dir in
  let classify id =
    if String.Set.mem protected id then `Protected id else `Unprotected id
  in
  Db.fold_cache db ~init ~f:(fun acc id -> f db acc (classify id))

(** [cache_clip_dry_run ~bistro_dir repo] reports what [cache_clip] would
    remove, without removing anything.

    The result is [(total_files, total_size, deleted_files, deleted_size)]:
    the first two count every cache entry and sum their sizes as given by
    [Misc.du]; the last two do the same for unprotected entries only.

    @raise Failure when [Misc.du] returns an error for some entry. *)
let cache_clip_dry_run ~bistro_dir repo =
  let count db (total_files, total_size, deleted_files, deleted_size) entry =
    let id, deletable = match entry with
      | `Protected id -> id, false
      | `Unprotected id -> id, true
    in
    match Misc.du (Db.cache db id) with
    | Error (`Msg msg) ->
      failwithf "du: %s" msg ()
    | Ok size ->
      let deleted_files, deleted_size =
        if deletable then deleted_files + 1, deleted_size + size
        else deleted_files, deleted_size
      in
      (total_files + 1, total_size + size, deleted_files, deleted_size)
  in
  cache_clip_fold ~bistro_dir repo ~init:(0,0,0,0) ~f:count

(** [cache_clip ~bistro_dir repo] calls [Db.remove] on every cache entry that
    is not in the protected set of [repo]; protected entries are left alone.

    @raise Failure at the first entry for which [Db.remove] returns an
    error. *)
let cache_clip ~bistro_dir repo =
  let remove_unprotected db () = function
    | `Protected _ -> ()
    | `Unprotected id ->
      (match Db.remove db id with
       | Ok () -> ()
       | Error (`Msg msg) -> failwithf "cache_clip: %s" msg ())
  in
  cache_clip_fold ~bistro_dir repo ~init:() ~f:remove_unprotected
OCaml

Innovation. Community. Security.