Source file workflow.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
(** Locations manipulated by workflows. A location is either a base location
    ([FS_path] or [Cache_id]) or a list of components selected under another
    location ([Cd]). *)
type path =
(* A location given directly as a string (presumably a file-system path; confirm against the execution engine). *)
| FS_path of string
(* A location designated by an identifier string (presumably a key of the result cache; confirm against the execution engine). *)
| Cache_id of string
(* [Cd (root, components)]: the list of components [components] taken under [root]. *)
| Cd of path * string list
(** [cd dir sel] descends into [dir] along the components [sel].

    Nested selections are flattened: when [dir] is already a [Cd], [sel] is
    appended to its existing component list instead of wrapping it in a
    second [Cd]. *)
let cd dir sel =
  match dir with
  | FS_path _ | Cache_id _ -> Cd (dir, sel)
  | Cd (root, prefix) -> Cd (root, prefix @ sel)
(** Naming components of a Docker image. *)
module Docker_image = struct
(* Record holding the four naming components; [tag] and [registry] are optional. *)
type t = {
(* Account part of the image name (presumably the user or organisation owning the image; TODO confirm). *)
account : string ;
(* Name of the image. *)
name : string ;
(* Optional tag; [None] when no tag was given. *)
tag : string option ;
(* Optional registry; [None] when no registry was given. *)
registry : string option ;
}
end
(** Naming components of a Singularity image. The record has the same fields
    as [Docker_image.t]. *)
module Singularity_image = struct
(* Record holding the four naming components; [tag] and [registry] are optional. *)
type t = {
(* Account part of the image name (presumably the user or organisation owning the image; TODO confirm). *)
account : string ;
(* Name of the image. *)
name : string ;
(* Optional tag; [None] when no tag was given. *)
tag : string option ;
(* Optional registry; [None] when no registry was given. *)
registry : string option ;
}
end
(** A container image, tagged by the kind of container technology it
    targets. *)
type container_image =
(* An image described by a [Docker_image.t] record. *)
| Docker_image of Docker_image.t
(* An image described by a [Singularity_image.t] record. *)
| Singularity_image of Singularity_image.t
(** [docker_image ?tag ?registry ~account ~name ()] builds a
    [container_image] of the [Docker_image] kind from its naming components.
    [tag] and [registry] are stored as [None] when omitted. The final [unit]
    argument lets the optional arguments be erased. *)
let docker_image ?tag ?registry ~account ~name () =
  Docker_image { account ; name ; tag ; registry }
(** Typed workflow terms, encoded as a GADT: the type index of ['a t] is the
    type of value the term stands for. Every term carries a string [id]; the
    constructor functions of this file compute it as a digest of the
    components of the term. *)
type _ t =
(* A constant [value] together with an [id] supplied by the caller. *)
| Pure : { id : string ; value : 'a } -> 'a t
(* Application of a function-valued term [f] to an argument term [x]. *)
| App : {
id : string ;
f : ('a -> 'b) t ;
x : 'a t ;
} -> 'b t
(* Pair of two terms. *)
| Both : {
id : string ;
fst : 'a t ;
snd : 'b t ;
} -> ('a *'b) t
(* A list built from a list of terms. *)
| List : {
id : string ;
elts : 'a t list ;
} -> 'a list t
(* Turns a path-valued term into a string-valued one. *)
| Eval_path : { id : string ; workflow : path t } -> string t
(* Maps [f] over the list-valued term [elts]. [deps] holds the type-erased
   terms this node depends on (see [spawn] for how they are computed). *)
| Spawn : {
id : string ;
elts : 'a list t ;
f : 'a t -> 'b t ;
deps : any list ;
} -> 'b list t
(* Element number [index] of the list-valued term [elts]. *)
| List_nth : {
id : string ;
elts : 'a list t ;
index : int ;
} -> 'a t
(* An input designated by the string [path], with an optional [version]. *)
| Input : { id : string ; path : string ; version : int option } -> path t
(* The components [sel] selected under the path-valued term [dir]. *)
| Select : {
id : string ;
dir : path t ;
sel : string list ;
} -> path t
(* A step whose task is a plugin, i.e. a function wrapped in a term. *)
| Plugin : ('a plugin, any) step -> 'a t
(* A step whose task is a shell command; it stands for a path. *)
| Shell : (shell_command, any) step -> path t
(* Listing of the path-valued term [dir], optionally restricted by [pattern]
   and by entry kind ([type_selection]). *)
| Glob : {
id : string ;
pattern : string option ;
type_selection : [`File | `Directory] option ;
dir : path t ;
} -> path list t
(* Term [w] paired with an alternative [failsafe] of the same type
   (presumably used when [w] fails; the evaluation is not in this file). *)
| Trywith : {
id : string ;
w : 'a t ;
failsafe : 'a t ;
} -> 'a t
(* Conditional: a boolean term and two branches of the same type. *)
| Ifelse : {
id : string ;
cond : bool t ;
_then_ : 'a t ;
_else_ : 'a t ;
} -> 'a t
(* Data shared by [Plugin] and [Shell] nodes: ['a] is the type of the task
   and ['b] the type of the recorded dependencies. *)
and ('a, 'b) step = {
id : string ;
(* Free-form description; the constructor functions default it to the empty string. *)
descr : string ;
task : 'a ;
np : int ; (** Required number of processors *)
mem : int t option ; (** Required memory in MB *)
version : int option ; (** Version number of the wrapper *)
deps : 'b list ;
}
(* Task of a [Plugin] step. *)
and 'a plugin =
(* A term holding a thunk that returns a value of type ['a]. *)
| Value_plugin : (unit -> 'a) t -> 'a plugin
(* A term holding a function from a string to unit; the plugin stands for a path
   (presumably the string is the destination to write to; TODO confirm). *)
| Path_plugin : (string -> unit) t -> path plugin
(* Task of a [Shell] step: a command template and a list of container images. *)
and shell_command = {
cmd : token Command.t ;
images : container_image list ;
}
(* Workflow-valued holes that can appear inside a command template. *)
and token =
(* A path-valued term. *)
| Path_token of path t
(* A term holding a list of paths, with a separator string and an optional quote character. *)
| Path_list_token of {
elts : path list t ;
sep : string ;
quote : char option ;
}
(* A string-valued term. *)
| String_token of string t
(* Existential wrapper: a term whose type index has been forgotten. *)
and any = Any : _ t -> any
(** [digest x] is the hexadecimal form of the [Digest] (MD5) checksum of the
    marshalled representation of [x]. Marshalling is done with the
    [No_sharing] flag.

    Any exception raised by [Marshal.to_string] (for instance on functional
    values) propagates to the caller. *)
let digest x =
  Marshal.to_string x [ Marshal.No_sharing ]
  |> Digest.string
  |> Digest.to_hex
(** [id w] is the identifier stored in the workflow term [w], whatever its
    constructor. Cases follow the declaration order of the constructors. *)
let id : type s. s t -> string = fun w ->
  match w with
  | Pure r -> r.id
  | App r -> r.id
  | Both r -> r.id
  | List r -> r.id
  | Eval_path r -> r.id
  | Spawn r -> r.id
  | List_nth r -> r.id
  | Input r -> r.id
  | Select r -> r.id
  | Plugin step -> step.id
  | Shell step -> step.id
  | Glob r -> r.id
  | Trywith r -> r.id
  | Ifelse r -> r.id
(** [any w] forgets the type index of [w] by wrapping it in [Any]. *)
let any w = Any w
(** [compare_token x y] is a total order on command tokens.

    Tokens of different kinds are ranked
    [Path_token] < [Path_list_token] < [String_token]. Tokens of the same
    kind are ordered by the [id] of the workflow they carry; two
    [Path_list_token]s with the same workflow are further ordered by their
    [(sep, quote)] pair.

    Every cross-constructor case is listed explicitly so that the result is
    antisymmetric: [compare_token x y] and [compare_token y x] always have
    opposite signs. A catch-all for [Path_list_token] would rank it below
    [Path_token] while [Path_token] is also ranked below it. *)
let compare_token x y =
  match x, y with
  (* Same constructor: compare payloads. *)
  | Path_token wx, Path_token wy ->
    String.compare (id wx) (id wy)
  | Path_list_token x, Path_list_token y -> (
      match String.compare (id x.elts) (id y.elts) with
      | 0 -> compare (x.sep, x.quote) (y.sep, y.quote)
      | c -> c
    )
  | String_token wx, String_token wy ->
    String.compare (id wx) (id wy)
  (* Different constructors: compare ranks. *)
  | Path_token _, (Path_list_token _ | String_token _) -> -1
  | Path_list_token _, Path_token _ -> 1
  | Path_list_token _, String_token _ -> -1
  | String_token _, (Path_token _ | Path_list_token _) -> 1
(** Operations on type-erased workflows. Two values are considered equal
    when their [id]s are equal; ordering and hashing are also based on the
    [id] only. *)
module Any = struct
  module T = struct
    type t = any

    (* Identifier of the wrapped workflow (delegates to the top-level [id]). *)
    let id (Any w) = id w

    let compare a b = String.compare (id a) (id b)

    let equal a b = String.equal (id a) (id b)

    let hash a = Hashtbl.hash (id a)
  end

  module Set = Set.Make (T)
  module Table = Hashtbl.Make (T)
  module Map = Map.Make (T)
  include T

  (** [deps a] lists the workflows directly referenced by [a]. For [Spawn],
      [Plugin] and [Shell] nodes this is the dependency list stored in the
      node; for the other constructors it is built from their sub-terms. *)
  let deps (Any w) =
    match w with
    | Pure _ -> []
    | Input _ -> []
    | App { f ; x ; _ } -> [ Any f ; Any x ]
    | Both { fst ; snd ; _ } -> [ Any fst ; Any snd ]
    | List { elts ; _ } -> List.map (fun e -> Any e) elts
    | Eval_path { workflow ; _ } -> [ Any workflow ]
    | List_nth { elts ; _ } -> [ Any elts ]
    | Select { dir ; _ } -> [ Any dir ]
    | Glob { dir ; _ } -> [ Any dir ]
    | Trywith { w ; failsafe ; _ } -> [ Any w ; Any failsafe ]
    | Ifelse { cond ; _then_ ; _else_ ; _ } ->
      [ Any cond ; Any _then_ ; Any _else_ ]
    | Spawn spawn -> spawn.deps
    | Plugin step -> step.deps
    | Shell step -> step.deps

  (** [descr a] is a short description of [a] when one is available: the
      [descr] field of a step, the path of an input, or the selected
      components of a [Select] joined with [Filename.concat]. It is [None]
      for every other constructor. *)
  let descr (Any w) =
    match w with
    | Input { path ; _ } -> Some path
    | Select { sel ; _ } -> Some (List.fold_left Filename.concat "" sel)
    | Plugin step -> Some step.descr
    | Shell step -> Some step.descr
    | _ -> None

  (* Depth-first traversal threading both the accumulator and the set of
     nodes already visited. Dependencies are folded before the node itself,
     and a node already in [seen] is skipped. *)
  let rec fold_aux w ~seen ~init ~f =
    if Set.mem w seen then (init, seen)
    else
      let visit (acc, seen) dep = fold_aux dep ~seen ~init:acc ~f in
      let acc, seen = List.fold_left visit (init, seen) (deps w) in
      (f acc w, Set.add w seen)

  (** [fold w ~init ~f] folds [f] over [w] and everything reachable from it
      through [deps], visiting each workflow (up to [id] equality) once and
      visiting dependencies before the workflows that use them. *)
  let fold w ~init ~f = fst (fold_aux w ~seen:Set.empty ~init ~f)
end
(** [input ?version path] is an [Input] node for [path]. Its id is the digest
    of the [`Input] tag, [path] and [version]. *)
let input ?version path =
  Input { id = digest (`Input, path, version) ; path ; version }
(** [select dir sel] selects the components [sel] under the path-valued
    workflow [dir].

    Selections are flattened: when [dir] is itself a [Select], the new node
    points at the directory of that inner selection and its component list
    is the inner one followed by [sel]. The id is the digest of the string
    tag, the id of the resulting directory and the resulting component list.

    @raise Assert_failure when [dir] is built with a constructor other than
    [Select], [Input], [Plugin] or [Shell]. *)
let select dir sel =
  let root, components =
    match dir with
    | Input _ | Plugin _ | Shell _ -> dir, sel
    | Select { dir = parent ; sel = prefix ; _ } -> parent, prefix @ sel
    | _ -> assert false
  in
  Select {
    id = digest ("select", id root, components) ;
    dir = root ;
    sel = components ;
  }
(** [pure ~id value] wraps [value] in a [Pure] node carrying the
    caller-supplied identifier [id]. *)
let pure ~id:ident value = Pure { id = ident ; value }
(** [data value] is a [Pure] node for [value] whose id is [digest value], so
    the value must be marshallable. *)
let data value =
  let id = digest value in
  pure ~id value
(** [int v] is [data v]; an alias intended for integer constants. *)
let int value = data value

(** [string v] is [data v]; an alias intended for string constants. *)
let string value = data value
(** [app f x] is the application of the function-valued workflow [f] to the
    workflow [x]. Its id is the digest of the [`App] tag and the ids of [f]
    and [x]. *)
let app f x = App { id = digest (`App, id f, id x) ; f ; x }
(** Infix form of [app]: [f $ x] is [app f x]. *)
let ( $ ) f x = app f x
(** [both fst snd] pairs two workflows. Its id is the digest of the [`Both]
    tag and the ids of the two components. *)
let both fst snd = Both { id = digest (`Both, id fst, id snd) ; fst ; snd }
(** [add_mem_dep mem deps] prepends the optional memory-requirement workflow
    [mem] (type-erased) to [deps]; [deps] is returned unchanged when [mem]
    is [None]. *)
let add_mem_dep mem deps =
  match mem with
  | Some m -> Any m :: deps
  | None -> deps
(** [plugin ?descr ?np ?mem ?version workflow] is a [Plugin] step whose task
    is the value-producing thunk held by [workflow].

    [descr] defaults to the empty string and [np] to [1]. The id is the
    digest of the [`Value] tag, the id of [workflow] and [version]. The
    dependencies are [workflow] itself, preceded by [mem] when it is
    given. *)
let plugin ?(descr = "") ?(np = 1) ?mem ?version workflow =
  let id = digest (`Value, id workflow, version) in
  let deps = add_mem_dep mem [ Any workflow ] in
  let task = Value_plugin workflow in
  Plugin { id ; descr ; task ; np ; mem ; version ; deps }
(** [path_plugin ?descr ?np ?mem ?version workflow] is a [Plugin] step whose
    task is the string-consuming function held by [workflow]; the step
    stands for a path.

    [descr] defaults to the empty string and [np] to [1]. The dependencies
    are [workflow] itself, preceded by [mem] when it is given.

    NOTE(review): the id is computed with the [`Value] tag, exactly as in
    [plugin]. This is kept as is because changing the tag would change every
    existing identifier; confirm whether a distinct tag was intended. *)
let path_plugin ?(descr = "") ?(np = 1) ?mem ?version workflow =
  let id = digest (`Value, id workflow, version) in
  let deps = add_mem_dep mem [ Any workflow ] in
  let task = Path_plugin workflow in
  Plugin { id ; descr ; task ; np ; mem ; version ; deps }
(** [path w] turns the path-valued workflow [w] into a string-valued one
    ([Eval_path]). Its id is the digest of the [`Eval_path] tag and the id
    of [w]. *)
let path workflow =
  let id = digest (`Eval_path, id workflow) in
  Eval_path { id ; workflow }
(** [digestible_cmd] maps every token of a command to a string, so that the
    command can be fed to [digest]: a path or string token becomes the id of
    its workflow, and a path-list token becomes the digest of the id of its
    workflow, its separator and its quote character. *)
let digestible_cmd =
  let token_digest = function
    | Path_token w -> id w
    | String_token w -> id w
    | Path_list_token { elts ; sep ; quote } -> digest (id elts, sep, quote)
  in
  Command.map ~f:token_digest
(** [shell ?descr ?mem ?np ?version ?img cmds] is a [Shell] step running the
    commands [cmds], combined with [Command.And_list].

    [descr] defaults to the empty string, [np] to [1] and [img] to the empty
    list. The id is the digest of the string tag, [version] and the
    digestible form of the command; note that [img], [np], [mem] and [descr]
    do not take part in it. The dependencies are the workflows carried by
    the tokens returned by [Command.deps], preceded by [mem] when it is
    given. *)
let shell ?(descr = "") ?mem ?(np = 1) ?version ?(img = []) cmds =
  let cmd = Command.And_list cmds in
  let task = { cmd ; images = img } in
  let id = digest ("shell", version, digestible_cmd cmd) in
  (* Type-erase the workflow carried by a token. *)
  let dep_of_token = function
    | Path_token w -> Any w
    | Path_list_token { elts ; _ } -> Any elts
    | String_token s -> Any s
  in
  let token_deps =
    List.map dep_of_token (Command.deps cmd ~compare:compare_token)
  in
  let deps = add_mem_dep mem token_deps in
  Shell { id ; descr ; task ; np ; mem ; version ; deps }
(** [list elts] gathers a list of workflows into a single list-valued
    workflow. Its id is the digest of the string tag and the ids of the
    elements, in order. *)
let list elts =
  let elt_ids = List.map id elts in
  List { id = digest ("list", elt_ids) ; elts }
(** [independent_workflows_aux cache w ~from:u] extends [cache] with an entry
    for [w] and for every workflow visited while exploring its dependencies.

    An entry [(depends, set)] means:
    - [depends] tells whether the workflow is [u] or reaches [u] through
      [Any.deps];
    - when [depends] is [false], [set] is the singleton holding the workflow
      itself;
    - when [depends] is [true], [set] is the union of the sets of its direct
      dependencies (the empty set for [u] itself).

    Workflows already present in [cache] are not explored again. *)
let rec independent_workflows_aux cache w ~from:u =
  if Any.equal w u then Any.Map.add w (true, Any.Set.empty) cache
  else if Any.Map.mem w cache then cache
  else
    let deps = Any.deps w in
    (* Make sure every direct dependency has an entry before reading them. *)
    let cache =
      List.fold_left
        (fun acc dep -> independent_workflows_aux acc dep ~from:u)
        cache deps
    in
    let results = List.map (fun dep -> Any.Map.find dep cache) deps in
    let entry =
      if List.exists fst results then
        let merged =
          List.fold_left
            (fun acc (_, set) -> Any.Set.union acc set)
            Any.Set.empty results
        in
        (true, merged)
      else (false, Any.Set.singleton w)
    in
    Any.Map.add w entry cache
(** [independent_workflows w ~from:u] is the list of workflows recorded for
    [w] by [independent_workflows_aux] started from an empty cache: [w]
    itself when it does not reach [u], otherwise the sub-workflows of [w]
    that do not reach [u] and are used directly by a workflow that does. *)
let independent_workflows w ~from:u =
  independent_workflows_aux Any.Map.empty w ~from:u
  |> Any.Map.find w
  |> snd
  |> Any.Set.elements
(** [spawn elts ~f] maps [f] over the list-valued workflow [elts].

    [f] is applied once, at construction time, to a placeholder workflow
    (the application of [List.hd] to [elts], under a fixed sentinel id). The
    resulting workflow is used for two things only: its id contributes to
    the id of the [Spawn] node, and the parts of it that do not depend on
    the placeholder are recorded, after [elts], as the dependencies of the
    node. *)
let spawn elts ~f =
  let head = pure ~id:"__should_never_be_executed__" List.hd in
  let placeholder = app head elts in
  let body = f placeholder in
  let id = digest (`Spawn, id elts, id body) in
  let outer = independent_workflows (Any body) ~from:(Any placeholder) in
  Spawn { id ; elts ; f ; deps = Any elts :: outer }
(** [list_nth w i] is the workflow standing for element number [i] of the
    list-valued workflow [w]. Its id is the digest of the [`List_nth] tag,
    the id of [w] and [i]. *)
let list_nth w i =
  List_nth { id = digest (`List_nth, id w, i) ; elts = w ; index = i }
(** [glob ?pattern ?type_selection dir] is a [Glob] node over the path-valued
    workflow [dir], with an optional [pattern] and an optional restriction
    to files or directories. Its id is the digest of the [`Glob] tag, the id
    of [dir], [pattern] and [type_selection]. *)
let glob ?pattern ?type_selection dir =
  Glob {
    id = digest (`Glob, id dir, pattern, type_selection) ;
    pattern ;
    type_selection ;
    dir ;
  }
(** [trywith w failsafe] is a [Trywith] node pairing [w] with the alternative
    workflow [failsafe] of the same type. Its id is the digest of the
    [`Trywith] tag and the ids of [w] and [failsafe]. *)
let trywith w failsafe =
  Trywith { id = digest (`Trywith, id w, id failsafe) ; w ; failsafe }
(** [ifelse cond _then_ _else_] is a conditional workflow with a boolean
    condition and two branches of the same type. Its id is the digest of the
    [`Ifelse] tag and the ids of the three components. *)
let ifelse cond _then_ _else_ =
  Ifelse {
    id = digest (`Ifelse, id cond, id _then_, id _else_) ;
    cond ;
    _then_ ;
    _else_ ;
  }