Source file task_seg_place_gens.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
open Int64_utils
let single_task_seg_shift ~incre ~cur_pos ~(task_seg : Task.task_seg)
(time_slots : Time_slot.t Seq.t) : Task.task_seg_place Seq.t =
let rec aux incre cur_pos ((task_seg_id, task_seg_size) as task_seg)
time_slots =
let time_slots = Time_slots.Slice.slice ~start:cur_pos time_slots in
match time_slots () with
| Seq.Nil -> Seq.empty
| Seq.Cons ((start, end_exc), slots) ->
if end_exc -^ start >= task_seg_size then
fun () ->
Seq.Cons
( (task_seg_id, start, start +^ task_seg_size),
aux incre (start +^ incre) task_seg time_slots )
else
aux incre cur_pos task_seg slots
in
assert (incre > 0L);
aux incre cur_pos task_seg time_slots
let single_task_seg_shift_rev ~incre ~cur_end_pos_exc
~(task_seg : Task.task_seg) (time_slots : Time_slot.t Seq.t) :
('a * int64 * int64) Seq.t =
let rec aux incre cur_end_pos_exc ((task_seg_id, task_seg_size) as task_seg)
time_slots =
let time_slots =
Time_slots.Slice.slice_rev ~end_exc:cur_end_pos_exc time_slots
in
match time_slots () with
| Seq.Nil -> Seq.Nil
| Seq.Cons ((start, end_exc), slots) ->
if end_exc -^ start >= task_seg_size then
Seq.Cons
( (task_seg_id, end_exc -^ task_seg_size, end_exc),
fun () -> aux incre (end_exc -^ incre) task_seg time_slots )
else
aux incre cur_end_pos_exc task_seg slots
in
assert (incre > 0L);
let time_slots = time_slots |> List.of_seq |> List.rev |> List.to_seq in
fun () -> aux incre cur_end_pos_exc task_seg time_slots
let multi_task_segs_shift ~incre ~(task_segs : Task.task_seg list)
(time_slots : Time_slot.t Seq.t) : Task.task_seg_place list Seq.t =
assert (incre > 0L);
match task_segs with
| [] -> Seq.empty
| _ ->
List.fold_left
(fun places_seq task_seg ->
Seq.flat_map
(fun places ->
match places with
| [] ->
single_task_seg_shift ~incre ~cur_pos:0L ~task_seg time_slots
|> Seq.map (fun x -> [ x ])
| (last_id, last_start, last_end_exc) :: pos_s ->
let time_slots =
Time_slots.Slice.slice ~start:last_end_exc time_slots
in
single_task_seg_shift ~incre ~cur_pos:last_end_exc ~task_seg
time_slots
|> Seq.map (fun (id, start, end_exc) ->
(id, start, end_exc)
:: (last_id, last_start, last_end_exc)
:: pos_s))
places_seq)
(Seq.return []) task_segs
|> Seq.map List.rev
let single_task_seg_single_split ~min_seg_size ~max_seg_size ~cur_split_pos
~(task_seg : Task.task_seg) : (Task.task_seg * Task.task_seg) Seq.t =
let rec aux min_seg_size max_seg_size cur_split_pos
((task_seg_id, task_seg_size) as task_seg) =
if cur_split_pos >= task_seg_size then Seq.empty
else
let l_split_size = cur_split_pos -^ 0L in
let r_split_size = task_seg_size -^ cur_split_pos in
if
l_split_size < min_seg_size
|| r_split_size < min_seg_size
|| max_seg_size < l_split_size
|| max_seg_size < r_split_size
then aux min_seg_size max_seg_size (Int64.succ cur_split_pos) task_seg
else fun () ->
Seq.Cons
( ( (task_seg_id, l_split_size),
(Task.Id.succ_task_seg_sub_id task_seg_id, r_split_size) ),
aux min_seg_size max_seg_size (Int64.succ cur_split_pos) task_seg )
in
let _, task_seg_size = task_seg in
assert (min_seg_size > 0L);
assert (max_seg_size > 0L);
assert (cur_split_pos >= 0L);
assert (task_seg_size > 0L);
let task_seg = Task.Id.init_task_seg_sub_id task_seg in
aux min_seg_size max_seg_size cur_split_pos task_seg
let single_task_seg_multi_splits_exact ~min_seg_size ~max_seg_size
~(split_count : int64) ~(task_seg : Task.task_seg) :
Task.task_seg list Seq.t =
let _, task_seg_size = task_seg in
assert (min_seg_size > 0L);
Option.iter (fun max_seg_size -> assert (max_seg_size > 0L)) max_seg_size;
assert (task_seg_size > 0L);
Seq.fold_left
(fun splits_seq _ ->
Seq.flat_map
(fun splits ->
match splits with
| [] ->
if min_seg_size <= task_seg_size then Seq.return [ task_seg ]
else Seq.empty
| first :: rest ->
let splits_with_first_sub_splits =
single_task_seg_single_split ~min_seg_size
~max_seg_size:task_seg_size ~cur_split_pos:0L ~task_seg:first
|> Seq.map (fun (s1, s2) -> s2 :: s1 :: rest)
in
splits_with_first_sub_splits)
splits_seq)
(Seq.return [])
(Seq_utils.zero_to_n_inc_int64 split_count)
|> (fun s ->
match max_seg_size with
| None -> s
| Some max_seg_size ->
Seq.filter
(fun l -> List.for_all (fun (_, s) -> s <= max_seg_size) l)
s)
|> Seq.map List.rev
let single_task_seg_multi_splits_max ~min_seg_size ~max_seg_size
~(split_count : int64) ~(task_seg : Task.task_seg) :
Task.task_seg list Seq.t =
Seq.flat_map
(fun split_count ->
single_task_seg_multi_splits_exact ~min_seg_size ~max_seg_size
~split_count ~task_seg)
(Seq_utils.zero_to_n_inc_int64 split_count)
let single_task_seg_multi_splits_exact_shift ~min_seg_size ~max_seg_size
~split_count ~(incre : int64) ~(task_seg : Task.task_seg)
(time_slots : Time_slot.t Seq.t) : Task.task_seg_place list Seq.t =
single_task_seg_multi_splits_exact ~min_seg_size ~max_seg_size ~split_count
~task_seg
|> Seq.flat_map (fun task_segs ->
multi_task_segs_shift ~incre ~task_segs time_slots)
let single_task_seg_multi_splits_max_shift ~min_seg_size ~max_seg_size
~split_count ~(incre : int64) ~(task_seg : Task.task_seg)
(time_slots : Time_slot.t Seq.t) : Task.task_seg_place list Seq.t =
single_task_seg_multi_splits_max ~min_seg_size ~max_seg_size ~split_count
~task_seg
|> Seq.flat_map (fun task_segs ->
multi_task_segs_shift ~incre ~task_segs time_slots)
let multi_task_segs_interleave ~interval_size ~(task_segs : Task.task_seg list)
(time_slots : Time_slot.t Seq.t) : Task.task_seg_place Seq.t =
assert (interval_size > 0L);
let quota =
List.fold_left
(fun m ((id1, id2, id3, id4, _), len) ->
Task_seg_id_map.add (id1, id2, id3, id4, None) len m)
Task_seg_id_map.empty task_segs
in
match task_segs with
| [] -> Seq.empty
| _ ->
let max_round_count =
let max_len =
List.fold_left (fun acc (_, len) -> max acc len) 0L task_segs
in
max_len /^ interval_size
in
let max_seg_count =
Seq.fold_left
(fun acc (start, end_exc) ->
acc +^ ((end_exc -^ start) /^ interval_size))
0L time_slots
|> Int64.to_int
in
let time_slots_chunked =
Time_slots.chunk ~chunk_size:interval_size ~drop_partial:true time_slots
in
let task_segs =
Seq_utils.zero_to_n_exc_int64 max_round_count
|> Seq.flat_map (fun round ->
quota
|> Task_seg_id_map.to_seq
|> Seq.filter (fun (_id, len) ->
let quota_left = len -^ (round *^ interval_size) in
quota_left >= interval_size)
|> Seq.map (fun ((id1, id2, id3, id4, _), _) ->
(id1, id2, id3, id4, Some round)))
|> OSeq.take max_seg_count
in
OSeq.map2
(fun id (start, end_exc) -> (id, start, end_exc))
task_segs time_slots_chunked
let single_task_seg_multi_even_splits ~incre ~(task_seg : Task.task_seg)
~(buckets : Time_slot.t list) ~(usable_time_slots : Time_slot.t Seq.t) :
Task.task_seg_place list Seq.t =
let rec aux task_seg_size n buckets =
if n = 0L then (None, [])
else
let bucket_count = List.length buckets in
let seg_part_size = Int64_utils.div_round_up task_seg_size n in
let usable_buckets =
buckets
|> List.filter (fun bucket_parts ->
List.for_all
(fun (start, end_exc) -> end_exc -^ start >= seg_part_size)
bucket_parts)
in
let usable_bucket_count = List.length usable_buckets in
if usable_bucket_count > 0 && usable_bucket_count = bucket_count then
(Some seg_part_size, usable_buckets)
else aux task_seg_size (Int64.pred n) usable_buckets
in
let (id1, id2, id3, id4, _), task_seg_size = task_seg in
let possibly_usable_buckets =
buckets
|> List.map (fun bucket ->
Time_slots.inter (Seq.return bucket) usable_time_slots |> List.of_seq)
in
let possibly_usable_bucket_count =
List.length possibly_usable_buckets |> Int64.of_int
in
match
aux task_seg_size possibly_usable_bucket_count possibly_usable_buckets
with
| None, _ -> Seq.empty
| Some task_seg_part_size, l ->
l
|> List.to_seq
|> OSeq.mapi (fun i bucket -> (Int64.of_int i, List.to_seq bucket))
|> Seq.fold_left
(fun places_seq (bucket_id, bucket) ->
let id = (id1, id2, id3, id4, Some bucket_id) in
let task_seg = (id, task_seg_part_size) in
Seq.flat_map
(fun places ->
single_task_seg_shift ~incre ~cur_pos:0L ~task_seg bucket
|> Seq.map (fun place -> place :: places))
places_seq)
(Seq.return [])
|> Seq.map List.rev