package containers-thread

  1. Overview
  2. Docs

Source file CCTimer.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
type job = Job : float * (unit -> 'a) -> job

let ( <= ) (a : float) b = Stdlib.( <= ) a b
let ( >= ) (a : float) b = Stdlib.( >= ) a b
let ( < ) (a : float) b = Stdlib.( < ) a b
let ( > ) (a : float) b = Stdlib.( > ) a b

module TaskHeap = CCHeap.Make (struct
  type t = job

  let leq (Job (f1, _)) (Job (f2, _)) = f1 <= f2
end)

exception Stopped

type t = {
  mutable stop: bool;
  mutable tasks: TaskHeap.t;
  mutable exn_handler: exn -> unit;
  t_mutex: Mutex.t;
  fifo_in: Unix.file_descr;
  fifo_out: Unix.file_descr;
}

let set_exn_handler timer f = timer.exn_handler <- f
let standby_wait = 10.
(* when no task is scheduled, this is the amount of time that is waited
   in a row for something to happen. This is also the maximal delay
   between the call to {!stop} and the actual termination of the
   thread. *)

let epsilon = 0.0001
(* accepted time diff for actions. *)

let with_lock_ t f =
  Mutex.lock t.t_mutex;
  try
    let x = f t in
    Mutex.unlock t.t_mutex;
    x
  with e ->
    Mutex.unlock t.t_mutex;
    raise e

type command = Quit | Run : (unit -> _) -> command | Wait of float

let pop_task_ t =
  let tasks, _ = TaskHeap.take_exn t.tasks in
  t.tasks <- tasks

let call_ timer f = try ignore (f ()) with e -> timer.exn_handler e

(* check next task *)
let next_task_ timer =
  match TaskHeap.find_min timer.tasks with
  | _ when timer.stop -> Quit
  | None -> Wait standby_wait
  | Some (Job (time, f)) ->
    let now = Unix.gettimeofday () in
    if now +. epsilon > time then (
      (* now! *)
      pop_task_ timer;
      Run f
    ) else
      Wait (time -. now)

(* The main thread function: wait for next event, run it, and loop *)
let serve timer =
  let buf = Bytes.make 1 '_' in
  (* acquire lock, call [process_task] and do as it commands *)
  let rec next () =
    match with_lock_ timer next_task_ with
    | Quit -> ()
    | Run f ->
      call_ timer f;
      (* call outside of any lock *)
      next ()
    | Wait delay -> wait delay
  (* wait for [delay] seconds, or until something happens on [fifo_in] *)
  and wait delay =
    let read = Thread.wait_timed_read timer.fifo_in delay in
    (* remove char from fifo, so that next write can happen *)
    if read then ignore (Unix.read timer.fifo_in buf 0 1);
    next ()
  in
  next ()

let nop_handler_ _ = ()

let create () =
  let fifo_in, fifo_out = Unix.pipe () in
  let timer =
    {
      stop = false;
      tasks = TaskHeap.empty;
      exn_handler = nop_handler_;
      t_mutex = Mutex.create ();
      fifo_in;
      fifo_out;
    }
  in
  (* start a thread to process tasks *)
  let _t = Thread.create serve timer in
  timer

let underscore_ = Bytes.make 1 '_'

(* awake the thread *)
let awaken_ timer = ignore (Unix.single_write timer.fifo_out underscore_ 0 1)

(** [at s t ~f] will run [f ()] at the Unix echo [t] *)
let at timer time ~f =
  if timer.stop then raise Stopped;
  let now = Unix.gettimeofday () in
  if now >= time then
    call_ timer f
  else
    with_lock_ timer (fun timer ->
        if timer.stop then raise Stopped;
        (* time of the next scheduled event *)
        let next_time =
          match TaskHeap.find_min timer.tasks with
          | None -> max_float
          | Some (Job (d, _)) -> d
        in
        (* insert task *)
        timer.tasks <- TaskHeap.insert (Job (time, f)) timer.tasks;
        (* see if the timer thread needs to be awaken earlier *)
        if time < next_time then awaken_ timer)

let after timer delay ~f =
  assert (delay >= 0.);
  let now = Unix.gettimeofday () in
  at timer (now +. delay) ~f

exception ExitEvery

let every ?delay timer d ~f =
  let rec run () =
    try
      ignore (f ());
      schedule ()
    with ExitEvery -> ()
  (* stop *)
  and schedule () = after timer d ~f:run in
  match delay with
  | None -> run ()
  | Some d -> after timer d ~f:run

let active timer = not timer.stop

(** Stop the given timer, cancelling pending tasks *)
let stop timer =
  with_lock_ timer (fun timer ->
      if not timer.stop then (
        timer.stop <- true;
        (* empty heap of tasks *)
        timer.tasks <- TaskHeap.empty;
        (* tell the thread to stop *)
        awaken_ timer
      ))
OCaml

Innovation. Community. Security.