package devkit
Development kit - general purpose library
Install
Dune Dependency
Authors
Maintainers
Sources
devkit-1.20240429.tbz
sha256=222f8ac131b1d970dab7eeb2714bfd6b9338b88b1082e6e01c136ae19e7eaef4
sha512=c9e6d93e3d21e5530c0f4d5baca51bf1f0a5d19248f8af7678d0665bb5cdf295d7aaaaa3e50eb2e44b8720e55097cc675af4dc8ec45acf9da39feb3eae1405d5
doc/src/devkit.core/extThread.ml.html
Source file extThread.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
let log = Log.self type 'a t = [ `Exn of exn | `None | `Ok of 'a ] ref * Thread.t let detach f x = let result = ref `None in result, Thread.create (fun () -> result := Exn.map f x) () let join (result,thread) = Thread.join thread; match !result with `None -> assert false | (`Ok _ | `Exn _ as x) -> x let join_exn t = match join t with `Ok x -> x | `Exn exn -> raise exn let map f a = Array.map join_exn @@ Array.map (detach f) a let mapn ?(n=8) f l = assert (n > 0); Action.distribute n l |> map (List.map @@ Exn.map f) |> Action.undistribute let locked mutex f = Mutex.lock mutex; Std.finally (fun () -> Mutex.unlock mutex) f () module LockMutex = struct type t = Mutex.t let create = Mutex.create let locked = locked end module Async_fin = struct open Async module U = ExtUnix.All type t = { q : (unit -> unit) Mtq.t; evfd : Unix.file_descr; } let is_available () = ExtUnix.Config.have `EVENTFD let setup events = let fin = { q = Mtq.create (); evfd = U.eventfd 0; } in let rec loop () = match Mtq.try_get fin.q with | None -> () | Some f -> begin try f () with exn -> log #warn ~exn "fin loop" end; loop () in let reset fd = try ignore (U.eventfd_read fd) with | Unix.Unix_error (Unix.EAGAIN, _, _) -> () | exn -> log #warn ~exn "fin reset"; () in setup_simple_event events fin.evfd [Ev.READ] begin fun _ fd _ -> reset fd; loop () end; fin let shutdown { q; evfd } = Mtq.clear q; Unix.close evfd let callback fin f = Mtq.put fin.q f; U.eventfd_write fin.evfd 1L end let log_create ?name f x = Thread.create (fun () -> Action.log ?name f x) () let run_periodic ~delay ?(now=false) f = let (_:Thread.t) = Thread.create begin fun () -> if not now then Nix.sleep delay; while try f () with exn -> Log.self #warn ~exn "ExtThread.run_periodic"; true do Nix.sleep delay done end () in () module type WorkerT = sig type task type result end module type Workers = sig type task type result type t val create : (task -> result) -> int -> t val perform : t -> ?autoexit:bool -> task Enum.t -> (result -> unit) -> unit val stop : ?wait:int -> t -> unit end module Workers(T:WorkerT) = struct type task = T.task type result = T.result type t = task Mtq.t * result Mtq.t * int let worker qi f qo = while true do Mtq.put qo (f (Mtq.get qi)) done let stop ?wait:_ (qi,_,_) = Mtq.clear qi let create f n = let qi = Mtq.create () and qo = Mtq.create () in for _ = 1 to n do ignore (Thread.create (fun () -> worker qi f qo) ()) done; qi,qo,n let perform (qi,qo,n) ?autoexit:_ e f = let active = ref 0 in for _ = 1 to n do match Enum.get e with | Some x -> Mtq.put qi x; incr active | None -> () done; while !active > 0 do let res = Mtq.get qo in begin match Enum.get e with | Some x -> Mtq.put qi x | None -> decr active end; f res done end let atomic_incr = incr let atomic_decr = decr let atomic_get x = !x module Pool = struct type t = { q : (unit -> unit) Mtq.t; total : int; free : int ref; mutable blocked : bool; } let create n = let t = { q = Mtq.create (); total = n; free = ref (-1); blocked = false;} in t let init t = let worker _i = while true do let f = Mtq.get t.q in atomic_decr t.free; begin try f () with exn -> log #warn ~exn "ThreadPool" end; atomic_incr t.free; done in t.free := t.total; for i = 1 to t.total do let (_:Thread.t) = log_create worker i in () done let status t = Printf.sprintf "queue %d threads %d of %d" (Mtq.length t.q) (atomic_get t.free) t.total let put t = if atomic_get t.free = -1 then init t; while t.blocked do Nix.sleep 0.05 done; Mtq.put t.q let wait_blocked ?(n=0) t = if (atomic_get t.free <> -1) then begin while t.blocked do Nix.sleep 0.05 done;(* Wait for unblock *) t.blocked <- true; assert(n>=0); let i = ref 1 in while Mtq.length t.q + (t.total - atomic_get t.free)> n do (* Notice that some workers can be launched! *) if !i = 100 || !i mod 1000 = 0 then log #info "Thread Pool - waiting block : %s" (status t); Nix.sleep 0.05; incr i done; t.blocked <- false end end
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>