package moonpool
Pools of threads supported by a pool of domains
Install
Dune Dependency
Authors
Maintainers
Sources
moonpool-0.7.tbz
sha256=c4a1f974200530ab7f6014de3a369fdbb260ff454183640f32e51ba3fec51b15
sha512=865daabb96e3d60f88ecee9fc9030dad8b257fff4121b404e882d8a8d6687b737beb6e22366f52eb14e770dfab28b326853a1d3d883fa19bbd791d8450b40f8b
doc/src/moonpool.dpool/moonpool_dpool.ml.html
Source file moonpool_dpool.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
(** Minimal blocking queue: a [Queue.t] protected by a mutex, with a
    condition variable used to wake up consumers blocked in {!pop}. *)
module Bb_queue = struct
  type 'a t = {
    mutex: Mutex.t;
    cond: Condition.t;
    q: 'a Queue.t;
  }

  (** Create a new, empty queue. *)
  let create () : _ t =
    { mutex = Mutex.create (); cond = Condition.create (); q = Queue.create () }

  (** [push self x] appends [x] at the end of the queue. Waiters are only
      woken up when the queue goes from empty to non-empty. *)
  let push (self : _ t) x : unit =
    Mutex.lock self.mutex;
    let was_empty = Queue.is_empty self.q in
    Queue.push x self.q;
    if was_empty then Condition.broadcast self.cond;
    Mutex.unlock self.mutex

  (** [pop self] removes and returns the first element of the queue,
      blocking on the condition variable for as long as the queue is empty. *)
  let pop (self : 'a t) : 'a =
    Mutex.lock self.mutex;
    let rec loop () =
      if Queue.is_empty self.q then (
        (* [Condition.wait] releases the mutex while waiting and holds it
           again when it returns, so emptiness is re-checked under the lock. *)
        Condition.wait self.cond self.mutex;
        (loop [@tailcall]) ()
      ) else (
        let x = Queue.pop self.q in
        Mutex.unlock self.mutex;
        x
      )
    in
    loop ()
end

(** A mutable value protected by a mutex. *)
module Lock = struct
  type 'a t = {
    mutex: Mutex.t;
    mutable content: 'a;
  }

  (** Wrap [content] together with a fresh mutex. *)
  let create content : _ t = { mutex = Mutex.create (); content }

  (** [with_ self f] calls [f] on the current content while holding the
      mutex. The mutex is released exactly once, whether [f] returns or
      raises; an exception raised by [f] is re-raised with its original
      backtrace. *)
  let with_ (self : _ t) f =
    Mutex.lock self.mutex;
    match f self.content with
    | x ->
      Mutex.unlock self.mutex;
      x
    | exception e ->
      (* grab the backtrace before doing anything else *)
      let bt = Printexc.get_raw_backtrace () in
      Mutex.unlock self.mutex;
      Printexc.raise_with_backtrace e bt

  (** [update_map l f] calls [f] on the current content while holding the
      mutex; [f] returns the new content and a result. The new content is
      stored and the result is returned. *)
  let[@inline] update_map l f =
    with_ l (fun x ->
        let x', y = f x in
        l.content <- x';
        y)

  (** Read the current content, under the mutex. *)
  let get l =
    Mutex.lock l.mutex;
    let x = l.content in
    Mutex.unlock l.mutex;
    x
end

type domain = Domain_.t

(** Messages sent to a domain worker through its queue. *)
type event =
  | Run of (unit -> unit)  (** Run this function *)
  | Decr  (** Decrease count *)

(* State for a domain worker. It should not do too much except for starting
   new threads for pools. *)
type worker_state = {
  q: event Bb_queue.t;
  th_count: int Atomic_.t;  (** Number of threads on this *)
}

(** Array of (optional) workers.

    Workers are started/stopped on demand. For each index we have
    the (currently active) domain's state
    including a work queue and a thread refcount; and the domain itself,
    if any, in a separate option because it might outlive its own state. *)
let domains_ : (worker_state option * Domain_.t option) Lock.t array =
  let n = max 1 (Domain_.recommended_number ()) in
  Array.init n (fun _ -> Lock.create (None, None))

(** main work loop for a domain worker.

    A domain worker does two things:
    - run functions it's asked to (mainly, to start new threads inside it)
    - decrease the refcount when one of these threads stops. The thread
      will notify the domain that it's exiting, so the domain can know
      how many threads are still using it. If all threads exit, the domain
      polls a bit (in case new threads are created really shortly after,
      which happens with a [Pool.with_] or [Pool.create() … Pool.shutdown()]
      in a tight loop), and if nothing happens it tries to stop to free
      resources. *)
let work_ idx (st : worker_state) : unit =
  let main_loop () =
    let continue = ref true in
    while !continue do
      match Bb_queue.pop st.q with
      | Run f ->
        (* best effort: an exception escaping [f] is dropped so that it
           cannot terminate the worker loop *)
        (try f () with _ -> ())
      | Decr ->
        (* [fetch_and_add] returns the previous value: 1 means that the
           count just dropped to 0 *)
        if Atomic_.fetch_and_add st.th_count (-1) = 1 then (
          continue := false;
          (* wait a bit, we might be needed again in a short amount of time *)
          try
            for _n_attempt = 1 to 50 do
              Thread.delay 0.001;
              if Atomic_.get st.th_count > 0 then (
                (* needed again! *)
                continue := true;
                raise Exit
              )
            done
          with Exit -> ()
        )
    done
  in

  while
    main_loop ();

    (* exit: try to remove ourselves from [domains]. If that fails, keep living. *)
    let is_alive =
      Lock.update_map domains_.(idx) (function
        | None, _ -> assert false
        | Some _st', dom ->
          assert (st == _st');

          if Atomic_.get st.th_count > 0 then
            (* still alive! *)
            (Some st, dom), true
          else
            (None, dom), false)
    in

    is_alive
  do
    ()
  done;
  ()

(* special case for main domain: we start a worker immediately *)
let () =
  assert (Domain_.is_main_domain ());
  let w = { th_count = Atomic_.make 1; q = Bb_queue.create () } in
  (* thread that stays alive *)
  ignore (Thread.create (fun () -> work_ 0 w) () : Thread.t);
  domains_.(0) <- Lock.create (Some w, None)

(** Number of slots in the worker array, i.e. the number of valid
    indices accepted by {!run_on}, {!decr_on} and {!run_on_and_wait}. *)
let[@inline] max_number_of_domains () : int = Array.length domains_

(** [run_on i f] schedules [f] to be run by the worker at index [i].

    If a worker is currently registered at [i], its thread count is
    incremented; otherwise the previous domain (if any) is joined and a new
    domain running {!work_} is spawned with a thread count of 1. In both
    cases [Run f] is then pushed on the worker's queue.

    NOTE(review): each call adds one to the thread count, which presumably
    has to be balanced by a later {!decr_on} on the same index; confirm
    against callers. *)
let run_on (i : int) (f : unit -> unit) : unit =
  assert (i < Array.length domains_);
  let w =
    Lock.update_map domains_.(i) (function
      | (Some w, _) as st ->
        Atomic_.incr w.th_count;
        st, w
      | None, dying_dom ->
        (* join previous dying domain, to free its resources, if any *)
        Option.iter Domain_.join dying_dom;
        let w = { th_count = Atomic_.make 1; q = Bb_queue.create () } in
        let worker : domain = Domain_.spawn (fun () -> work_ i w) in
        (Some w, Some worker), w)
  in
  Bb_queue.push w.q (Run f)

(** [decr_on i] asks the worker at index [i], if there is one, to decrease
    its thread count by pushing [Decr] on its queue. Does nothing if no
    worker is registered at [i]. *)
let decr_on (i : int) : unit =
  assert (i < Array.length domains_);
  match Lock.get domains_.(i) with
  | Some st, _ -> Bb_queue.push st.q Decr
  | None, _ -> ()

(** [run_on_and_wait i f] runs [f ()] on the worker at index [i] (see
    {!run_on}) and blocks the caller until it has completed.

    @return the value returned by [f ()].
    @raise e if [f ()] raised [e]; the exception is re-raised in the caller,
    with the backtrace captured on the worker side. *)
let run_on_and_wait (i : int) (f : unit -> 'a) : 'a =
  let q = Bb_queue.create () in
  run_on i (fun () ->
      (* Always send an answer back: the worker loop drops exceptions raised
         by tasks, so if a failure of [f] were not captured here, nothing
         would ever be pushed and the caller would block forever in [pop]. *)
      let res =
        match f () with
        | x -> Ok x
        | exception e -> Error (e, Printexc.get_raw_backtrace ())
      in
      Bb_queue.push q res);
  match Bb_queue.pop q with
  | Ok x -> x
  | Error (e, bt) -> Printexc.raise_with_backtrace e bt
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>