package obatcher
A Framework for building Batched Concurrent Data Structures
Install
Dune Dependency
Authors
Maintainers
Sources
obatcher-1.0.tbz
sha256=bad8af8223b14bd6d582e34eba90048d632f2ac611183e120d0faaeaefb7549e
sha512=cc8ede53c694abbb4aabb6f898e57057c6f2726411eb9f64b056652a0de7adf85432d55df5833d4555ccf03e681869ac0af218119c94f8577008ebd9e0601779
doc/src/obatcher.ds/batched_skiplist.ml.html
Source file batched_skiplist.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378
open Picos

(* Batched skip list keyed by an ordered type [V].

   [Sequential] is a plain single-threaded skip list.  [Batched] describes
   the operations accepted by the batcher and how a whole batch of them is
   applied to the sequential structure.  The result of
   [Obatcher.Make (Batched)] is included at the end, together with the
   convenience wrappers [insert], [mem] and [sz]. *)
module Make (V : Stdlib.Map.OrderedType) = struct
  module Sequential = struct
    (* [hdr] is the header node, [level] the highest level currently in use,
       [maxlevel] the highest level a node may ever get, and [nil] the
       sentinel stored in every unused forward slot. *)
    type t = { hdr : node; level : int ref; maxlevel : int; nil : node }

    (* [Hd] carries the forward array of the list header, [Node] a value
       together with its forward array, [Null] marks the end of a level. *)
    and node = Hd of node array | Node of data | Null

    and data = { mutable value : V.t; forward : node array }

    (* Short description of a single node, using [to_string] for values. *)
    let show to_string = function
      | Hd _ -> "Hd"
      | Node { value; _ } -> Printf.sprintf "Node(%s)" (to_string value)
      | Null -> "Null"

    (* Description of a node followed by the contents of its forward array.
       Because of the way the fold concatenates, the entries are emitted from
       the last slot to the first, each one preceded by a semicolon.
       Warning 32 (unused value) is silenced for this debugging helper. *)
    let[@warning "-32"] to_string to_string = function
      | Hd forward ->
          "Hd -> [|"
          ^ Array.fold_right
              (fun node acc -> acc ^ "; " ^ show to_string node)
              forward ""
          ^ "|]"
      | Node { forward; _ } as n ->
          show to_string n ^ "-> [|"
          ^ Array.fold_right
              (fun node acc -> acc ^ "; " ^ show to_string node)
              forward ""
          ^ "|]"
      | Null -> "Null"

    (* Forward array of a header or value node.  Raises [Failure] on
       [Null]. *)
    let ( !> ) = function
      | Null -> failwith "[!>] Tried to dereference Null"
      | Hd forward | Node { forward; _ } -> forward

    (* Payload record of a value node.  Raises [Failure] on [Null] and on
       [Hd]. *)
    let ( !^ ) = function
      | Null -> failwith "[!^] Tried to dereference Null"
      | Hd _ -> failwith "[!^] Tried to dereference Hdr"
      | Node r -> r

    (* Key equality according to [V.compare]. *)
    let ( *= ) v1 v2 = V.compare v1 v2 = 0

    (* Strict key ordering according to [V.compare].  The comparator is only
       required to return some negative integer for a smaller key, so the
       test is on the sign of the result and not on the exact value -1. *)
    let ( *< ) v1 v2 = V.compare v1 v2 < 0

    (* Total order on nodes: the header sorts before everything, [Null] sorts
       after everything, value nodes are ordered by [V.compare] on their
       values. *)
    let compare n1 n2 =
      match (n1, n2) with
      | Null, Null ->
          assert (n1 == n2);
          0
      | _, Null -> -1
      | Null, _ -> 1
      | Hd r1, Hd r2 ->
          assert (r1 == r2);
          0
      | Hd _, _ -> -1
      | _, Hd _ -> 1
      | Node d1, Node d2 -> V.compare d1.value d2.value

    (* Integer base-2 logarithm, rounded down; 0 for any [n <= 1]. *)
    let rec log2 n = if n <= 1 then 0 else 1 + log2 (n asr 1)

    (* Empty skip list whose maximum level is [log2 size]. *)
    let init ~size () =
      let maxlevel = log2 size in
      let nil = Null in
      {
        hdr = Hd (Array.make (maxlevel + 1) nil);
        level = ref 0;
        maxlevel;
        nil;
      }

    (* Draws a level in [0, t.maxlevel]: keeps incrementing while a random
       float in [0, 1) is below 0.5 and the maximum is not reached. *)
    let random_level t =
      let lvl = ref 0 in
      while Random.float 1. < 0.5 && !lvl < t.maxlevel do
        incr lvl
      done;
      !lvl

    (* Fresh value node with [lvl + 1] forward slots, all set to [t.nil]. *)
    let make_node t lvl value =
      Node { value; forward = Array.make (lvl + 1) t.nil }

    (* [mem t elt] is [true] iff a node whose value equals [elt] (according
       to [V.compare]) is reachable on level 0. *)
    let mem t elt =
      let x = ref t.hdr in
      (* Descend from the top level, moving right while the next value is
         strictly smaller than [elt]. *)
      for i = !(t.level) downto 0 do
        while
          match !>(!x).(i) with
          | Null | Hd _ -> false
          | Node { value; _ } -> value *< elt
        do
          x := !>(!x).(i)
        done
      done;
      (* The candidate is the level-0 successor of the last node visited. *)
      x := !>(!x).(0);
      match !x with Null | Hd _ -> false | Node { value; _ } -> value *= elt

    (* [insert t elt] adds [elt] to the list.  If an equal value is already
       present its stored value is overwritten with [elt] and no node is
       added. *)
    let insert t elt =
      (* Search for Node; [update.(i)] records the rightmost node on level
         [i] whose value is strictly smaller than [elt]. *)
      let update = Array.make (t.maxlevel + 1) t.nil in
      let x = ref t.hdr in
      for i = !(t.level) downto 0 do
        while
          match !>(!x).(i) with
          | Null | Hd _ -> false
          | Node { value; _ } -> value *< elt
        do
          x := !>(!x).(i)
        done;
        update.(i) <- !x
      done;
      let x = !>(!x).(0) in
      (* Check if we are at the correct point *)
      if match x with Null | Hd _ -> false | Node { value; _ } -> value *= elt
      then !^x.value <- elt
      else
        let lvl = random_level t in
        (* Levels that were not in use before are reached from the header. *)
        if lvl > !(t.level) then (
          for i = !(t.level) + 1 to lvl do
            update.(i) <- t.hdr
          done;
          t.level := lvl);
        let x = make_node t lvl elt in
        (* Splice the new node in after [update.(i)] on each of its levels. *)
        for i = 0 to lvl do
          !>x.(i) <- !>(update.(i)).(i);
          !>(update.(i)).(i) <- x
        done

    (* Number of value nodes, counted by walking level 0. *)
    let size t =
      let rec aux acc = function
        | Null -> acc
        | Hd forward -> aux acc forward.(0)
        | Node { forward; _ } -> aux (acc + 1) forward.(0)
      in
      aux 0 t.hdr

    (* Walks level 0 and prints a message on standard output for every pair
       of consecutive values that is out of order or equal according to
       [V.compare].  [to_string] is only used to render the values in those
       messages.  An empty list has nothing to check and prints nothing. *)
    let validate ?(to_string = fun _ -> "<opaque>") t =
      let rec walk prev = function
        | Null -> ()
        | Hd forward -> walk prev forward.(0)
        | Node { value; forward; _ } ->
            let vals = value |> to_string in
            let prevs = prev |> to_string in
            (* Compare with the key order of [V], not with the polymorphic
               structural comparison. *)
            if value *< prev then
              Printf.printf "Ordering error %s -> %s\n" vals prevs
            else if value *= prev then
              Printf.printf "Duplicate error %s -> %s\n" vals prevs;
            walk value forward.(0)
      in
      match !>(t.hdr).(0) with
      | Null | Hd _ -> ()
      | Node { value; forward; _ } -> walk value forward.(0)

    (* Prints every level in use, from the highest down to level 0, one line
       per level followed by a blank line. *)
    let print_slist t to_string =
      let print_level t lvl =
        let rec aux = function
          | Null -> print_endline "Null"
          | Hd forward ->
              Printf.printf "Level %d : Hd -> " lvl;
              aux forward.(lvl)
          | Node { value; forward; _ } ->
              let val_str = to_string value in
              Printf.printf "(%s) -> " val_str;
              aux forward.(lvl)
        in
        aux t.hdr
      in
      for lvl = !(t.level) downto 0 do
        print_level t lvl;
        Printf.printf "\n"
      done
  end

  module Batched = struct
    type t = Sequential.t
    type cfg = { size : int }

    (* Operations accepted by the batcher, indexed by their result type. *)
    type 'a op =
      | Insert : V.t -> unit op
      | Member : V.t -> bool op
      | Size : int op

    (* An operation paired with the computation that receives its result. *)
    type wrapped_op = Mk : 'a op * 'a Picos.Computation.t -> wrapped_op

    (* Empty skip list; the default configuration uses a size of
       [2^30 - 1]. *)
    let init ?(cfg = { size = Int.shift_left 1 30 - 1 }) () =
      Sequential.init ~size:cfg.size ()

    (* Scratch state shared by the phases of [par_insert].
       - [batch_size]: number of distinct elements in the batch.
       - [maxinsertlevel]: highest level among the list and the new nodes.
       - [level_arr.(i)]: level drawn for the i-th new node.
       - [new_node_arr.(i)]: the i-th new node; its forward array holds the
         successors it will have once linked.
       - [new_node_back_arr.(i)]: a second node with the same value whose
         forward array is used to record, per level, the predecessor of the
         i-th new node.
       - [prev_node_idx]: flattened table with a stride of
         [!maxinsertlevel + 1]; entry [(i, lvl)] is -1 initially, the batch
         index of the preceding new node on that level once set by
         [relate_nodes], or -2 once [merge_list] decided that the predecessor
         is a node of the existing list. *)
    type intermediate = {
      batch_size : int;
      maxinsertlevel : int ref;
      level_arr : int array;
      new_node_arr : Sequential.node array;
      new_node_back_arr : Sequential.node array;
      prev_node_idx : int array;
    }

    (* Draws a level for batch element [idx], records it, raises
       [maxinsertlevel] if needed and allocates the node and its back
       node. *)
    let build_node t idx elem
        { maxinsertlevel; new_node_arr; new_node_back_arr; level_arr; _ } =
      let rdm_level = Sequential.random_level t in
      level_arr.(idx) <- rdm_level;
      if rdm_level > !maxinsertlevel then maxinsertlevel := rdm_level;
      let new_node = Sequential.make_node t rdm_level elem in
      let new_node_back = Sequential.make_node t rdm_level elem in
      new_node_arr.(idx) <- new_node;
      new_node_back_arr.(idx) <- new_node_back

    (* Links new node [idx] to the other new nodes of the batch: for each of
       its levels, finds the first later batch element that is tall enough
       and records the forward pointer, the back pointer and the predecessor
       index for that pair. *)
    let relate_nodes t idx
        {
          batch_size;
          maxinsertlevel;
          level_arr;
          new_node_arr;
          new_node_back_arr;
          prev_node_idx;
          _;
        } =
      let exception Break in
      let node = new_node_arr.(idx) in
      (* The scan for a higher level resumes from the match found on the
         previous level. *)
      let next = ref (idx + 1) in
      for lvl = 0 to level_arr.(idx) do
        Sequential.(!>node.(lvl) <- t.nil);
        try
          for id = !next to batch_size - 1 do
            if lvl <= level_arr.(id) then (
              (* Set forward pointer *)
              Sequential.(!>node.(lvl) <- new_node_arr.(id));
              (* Set back_pointer *)
              Sequential.(!>(new_node_back_arr.(id)).(lvl) <- node);
              (* Set the previous pointer *)
              prev_node_idx.(((!maxinsertlevel + 1) * id) + lvl) <- idx;
              (* Update next id to start from *)
              next := id;
              raise Break)
          done
        with Break -> ()
      done

    (* Positions new node [idx] relative to the existing list.  Searches the
       list for the value; if an equal value is found on any visited level
       the node is left untouched.  Otherwise, per level, the forward
       pointer is redirected to the existing successor when that one comes
       first, and the back pointer is redirected to the existing predecessor
       (marking the entry with -2) when no new node lies between them. *)
    let merge_list t idx
        {
          maxinsertlevel;
          new_node_arr;
          level_arr;
          new_node_back_arr;
          prev_node_idx;
          _;
        } =
      let open Sequential in
      let exception Return in
      let node = new_node_arr.(idx) in
      let node_back = new_node_back_arr.(idx) in
      let update = Array.make (t.Sequential.maxlevel + 1) t.nil in
      let x = ref t.hdr in
      try
        for i = !(t.level) downto 0 do
          while
            match Sequential.(!>(!x)).(i) with
            | Null | Hd _ -> false
            | Node { value; _ } -> Sequential.(value *< !^node.value)
          do
            x := Sequential.(!>(!x).(i))
          done;
          (* No duplicates *)
          (match !x with
          | Null -> ()
          | Hd forward | Node { forward; _ } ->
              if
                forward.(i) != t.nil
                && (!^(forward.(i)).value *= !^node.value)
              then raise Return);
          update.(i) <- !x
        done;
        for i = 0 to level_arr.(idx) do
          (* Prefer the existing successor when the node has none among the
             new nodes or when the existing one does not sort after it. *)
          if
            !>node.(i) == t.nil
            || compare !>(update.(i)).(i) !>node.(i) <= 0
          then
            if !>(update.(i)).(i) != t.nil then
              !>node.(i) <- !>(update.(i)).(i);
          let prev_node_id =
            prev_node_idx.(((!maxinsertlevel + 1) * idx) + i)
          in
          (* The existing predecessor wins when there is no preceding new
             node on this level or when that new node does not sort after
             it. *)
          if
            prev_node_id = -1
            || compare new_node_arr.(prev_node_id) update.(i) <= 0
          then (
            !>node_back.(i) <- update.(i);
            prev_node_idx.(((!maxinsertlevel + 1) * idx) + i) <- -2)
        done
      with Return -> ()

    (* Compacts, in place, the first [num_elements] entries of the sorted
       array [arr] so that runs of keys that are equal according to
       [V.compare] are reduced to one entry.  Returns the number of entries
       kept; entries beyond that count are left as they were. *)
    let remove_duplicates arr num_elements =
      if num_elements <= 1 then num_elements
      else
        let j = ref 0 in
        for i = 0 to num_elements - 2 do
          (* Use the key order of [V]: the polymorphic structural comparison
             may disagree with it or raise on some values. *)
          if V.compare arr.(i) arr.(i + 1) <> 0 then (
            arr.(!j) <- arr.(i);
            incr j)
        done;
        arr.(!j) <- arr.(num_elements - 1);
        incr j;
        !j

    (* Inserts a batch of elements.  [elems] is sorted and deduplicated in
       place, nodes are built and related to each other sequentially, merged
       against the existing list through [Utils.parallel_for], and finally
       linked into the list sequentially. *)
    let par_insert t (elems : V.t array) =
      let open Sequential in
      (* Sort in ascending order *)
      Array.sort V.compare elems;
      (* Utils.Par_sort.sort pool ~compare:V.compare elems; *)
      let num_elems = remove_duplicates elems (Array.length elems) in
      let intermediary =
        {
          batch_size = num_elems;
          (* This is the very same reference as [t.level], so raising it in
             [build_node] also raises the level of the list. *)
          maxinsertlevel = t.level;
          level_arr = Array.make num_elems 0;
          new_node_arr = Array.make num_elems t.nil;
          new_node_back_arr = Array.make num_elems t.nil;
          prev_node_idx = Array.make ((t.maxlevel + 1) * num_elems) (-1);
        }
      in
      for idx = 0 to num_elems - 1 do
        build_node t idx elems.(idx) intermediary
      done;
      for idx = 0 to num_elems - 1 do
        relate_nodes t idx intermediary
      done;
      Utils.parallel_for ~start:0 ~finish:(num_elems - 1) (fun idx ->
          merge_list t idx intermediary);
      (* Entries marked -2 have a predecessor in the existing list: make that
         predecessor point at the new node. *)
      for i = 0 to num_elems - 1 do
        for j = 0 to intermediary.level_arr.(i) do
          if
            intermediary.prev_node_idx.(((!(intermediary.maxinsertlevel) + 1)
                                         * i)
                                        + j)
            = -2
          then
            let back_node = !>(intermediary.new_node_back_arr.(i)).(j) in
            !>back_node.(j) <- intermediary.new_node_arr.(i)
        done
      done

    (* Membership test for every element of [elems]; the result array has
       the same length and order as [elems]. *)
    let par_search t (elems : V.t array) : bool array =
      let result_arr = Array.make (Array.length elems) false in
      Utils.parallel_for ~start:0
        ~finish:(Array.length elems - 1)
        (fun i -> result_arr.(i) <- Sequential.mem t elems.(i));
      result_arr

    (* Overwrites every slot of [elems] with the current size of the list
       and returns [elems]. *)
    let par_size t (elems : int array) : int array =
      let size = Sequential.size t in
      Utils.parallel_for ~start:0
        ~finish:(Array.length elems - 1)
        (fun i -> elems.(i) <- size);
      elems

    (* Applies a batch of operations.  The size is computed at most once,
       when the first [Size] operation is met, and therefore before any
       insert of this batch is applied.  [Insert] computations are completed
       as soon as they are met, before the insertion itself happens.  All
       membership queries are answered before the inserts are applied. *)
    let run t (ops : wrapped_op array) : unit =
      let inserts : V.t list ref = ref [] in
      let searches : (V.t * (bool -> unit)) list ref = ref [] in
      let size = lazy (Sequential.size t) in
      Array.iter
        (function
          | Mk (Size, comp) -> Computation.return comp (Lazy.force size)
          | Mk (Member vl, comp) ->
              searches := (vl, Computation.return comp) :: !searches
          | Mk (Insert vl, comp) ->
              Computation.return comp ();
              inserts := vl :: !inserts)
        ops;
      (* now, do all searches in parallel *)
      let searches = Array.of_list !searches in
      Utils.parallel_for ~start:0
        ~finish:(Array.length searches - 1)
        (fun i ->
          let key, kont = searches.(i) in
          let result = Sequential.mem t key in
          kont result);
      (* now, all inserts *)
      let inserts = Array.of_list !inserts in
      par_insert t inserts
  end

  include Obatcher.Make (Batched)

  (* Convenience wrappers that submit a single operation through [exec]. *)
  let insert t v = exec t (Batched.Insert v)
  let mem t v = exec t (Batched.Member v)
  let sz t = exec t Batched.Size
end
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>