package shexp

  1. Overview
  2. Docs

Source file job.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
open Import

type 'a state =
  | Pending
  | Finished of ('a, exn * Printexc.raw_backtrace) result
  | Waiting_for_result of Condition.t

type 'a t =
  { work : unit -> 'a
  ; mutable state : 'a state
  ; mutex : Mutex.t
  }

type packed = T : _ t -> packed

let run t =
  let state =
    Finished
      (match t.work () with
       | x -> Ok x
       | exception e ->
         let backtrace = Printexc.get_raw_backtrace () in
         Error (e, backtrace))
  in
  Mutex.lock t.mutex;
  let old_state = t.state in
  t.state <- state;
  (match old_state with
   | Waiting_for_result cond -> Condition.broadcast cond
   | _ -> ());
  Mutex.unlock t.mutex
;;

module Worker = struct
  type t = { next_job : packed Event.channel }

  let workers = Queue.create ()
  let count = ref 0
  let mutex = Mutex.create ()

  let rec loop t =
    Mutex.lock mutex;
    Queue.push t workers;
    Mutex.unlock mutex;
    (* Wait for a job *)
    let (T job) = Event.sync (Event.receive t.next_job) in
    run job;
    loop t
  ;;

  let start job =
    run job;
    loop { next_job = Event.new_channel () }
  ;;
end

let pid = ref 0

let detach ~f =
  let t = { work = f; state = Pending; mutex = Mutex.create () } in
  Mutex.lock Worker.mutex;
  (* Detect forks *)
  let current_pid = Unix.getpid () in
  if !pid <> current_pid
  then (
    pid := current_pid;
    Queue.clear Worker.workers;
    Worker.count := 0);
  if not (Queue.is_empty Worker.workers)
  then (
    let worker = Queue.pop Worker.workers in
    Mutex.unlock Worker.mutex;
    Event.sync (Event.send worker.next_job (T t)))
  else (
    let f =
      if !Worker.count = 16
      then run
      else (
        incr Worker.count;
        Worker.start)
    in
    Mutex.unlock Worker.mutex;
    ignore (Thread.create f t : Thread.t));
  t
;;

let really_wait t cond =
  Condition.wait cond t.mutex;
  match t.state with
  | Finished res -> res
  | _ -> assert false
;;

let wait t =
  Mutex.lock t.mutex;
  let res =
    match t.state with
    | Finished res -> res
    | Waiting_for_result cond -> really_wait t cond
    | Pending ->
      let cond = Condition.create () in
      t.state <- Waiting_for_result cond;
      really_wait t cond
  in
  Mutex.unlock t.mutex;
  match res with
  | Ok x -> x
  | Error (exn, backtrace) -> Printexc.raise_with_backtrace exn backtrace
;;
OCaml

Innovation. Community. Security.