package picos_io

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file picos_io_select.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
open Picos
open Picos_std_event

(* Flags stored in [config.bits]. *)
let handle_sigchld_bit = 1 lsl 0
let select_thread_running_on_main_domain_bit = 1 lsl 1
let ignore_sigpipe_bit = 1 lsl 2

(* Process-wide configuration, written by [try_configure] and [select_thread]:
   - [bits]: a combination of the flags above;
   - [intr_sig]: the signal used to interrupt blocking calls;
   - [intr_sigs]: the list [[intr_sig]] once configured, empty before. *)
type config = {
  mutable bits : int;
  mutable intr_sig : int;
  mutable intr_sigs : int list;
}

let config = { intr_sigs = []; intr_sig = 0; bits = 0 }

(* *)

(* A [Sys.sigchld] awaiter: [handle_signal] completes [computation] with
   [value].  [alive] is set to [false] when the awaiter is completed by
   [handle_signal] or by the remover trigger created in
   [return_on_sigchld]. *)
type return =
  | Return : {
      value : 'a;
      computation : 'a Computation.t;
      mutable alive : bool;
    }
      -> return

(** Hashed type for the awaiter keys.  The keys are random integers, so each
    key can serve directly as its own hash. *)
module RandomInt = struct
  type t = int

  let equal (x : t) (y : t) = x = y
  let hash (x : t) = x
end

(* Concurrent hash table from picos (NOTE(review): presumably safe to use from
   several domains and from signal handlers; confirm in Picos_aux_htbl). *)
module Htbl = Picos_aux_htbl

(* Registered [Sys.sigchld] awaiters, keyed by random integers chosen by
   [insert].  [handle_signal] removes and completes all of them at once. *)
let chld_awaiters = Htbl.create ~hashed_type:(module RandomInt) ()

(* *)

(* A scheduled cancelation.  Once [Mtime_clock.elapsed ()] reaches [time], the
   select thread cancels [computation] with [exn] and [bt] (see
   [process_timeouts]). *)
type cancel_at =
  | Cancel_at : {
      time : Mtime.span;
      exn : exn;
      bt : Printexc.raw_backtrace;
      computation : 'a Computation.t;
    }
      -> cancel_at

(* Priority search queue of pending cancelations.  Keys are integer ids and
   priorities are ordered by deadline, earliest first. *)
module Q =
  Psq.Make
    (Int)
    (struct
      type t = cancel_at

      let compare x y =
        match (x, y) with
        | Cancel_at a, Cancel_at b -> Mtime.Span.compare a.time b.time
    end)

(* A request to complete [computation] with [value] once the select thread
   sees [file_descr] as ready.  The request holds a reference on [file_descr]
   (taken in [return_on], released by [process_fds] or [insert_fd]).  [alive]
   is set to [false] when the select thread completes the request. *)
type return_on =
  | Return_on : {
      file_descr : Picos_io_fd.t;
      value : 'a;
      computation : 'a Computation.t;
      mutable alive : bool;
    }
      -> return_on

(* Phase of the wakeup protocol between the select thread and [wakeup]:
   - [Continue]: the select thread has not entered [Unix.select];
   - [Select]: the select thread is about to call, or is blocked in,
     [Unix.select];
   - [Waking_up]: a waker took responsibility for writing a byte to the pipe,
     and the select thread will read that byte;
   - [Process]: a wakeup arrived before [Select] was entered, so the select
     thread skips its next [Unix.select]. *)
type phase = Continue | Select | Waking_up | Process

(* Per-domain state of the select thread (created by [key]). *)
type state = {
  (* Wakeup protocol phase, see [phase] and [wakeup]. *)
  phase : phase Atomic.t;
  (* Lifecycle of the select thread, changed through [try_transition] and
     [transition]. *)
  mutable state : [ `Initial | `Starting | `Alive | `Stopping | `Stopped ];
  (* Exception that terminated the select thread, or [exit_bt] if none.  It is
     re-raised by the domain exit handler installed in [init]. *)
  mutable exn_bt : exn * Printexc.raw_backtrace;
  (* Self-pipe used to wake the select thread from [Unix.select].
     [Unix.stdin] is a placeholder until the pipe has been created. *)
  mutable pipe_inn : Unix.file_descr;
  mutable pipe_out : Unix.file_descr;
  (* One-byte buffer used to write to and read from the pipe. *)
  byte : Bytes.t;
  (* *)
  (* Pending cancelations ordered by deadline, and the next id to assign. *)
  timeouts : Q.t Atomic.t;
  mutable next_id : int;
  (* *)
  (* Requests not yet taken in by the select thread, for read, write and
     exceptional readiness respectively. *)
  new_rd : return_on list ref;
  new_wr : return_on list ref;
  new_ex : return_on list ref;
}

(* Outcome of an interrupt request.  [Cleared] means [Intr.clr] cleared the
   request.  [Signaled] means [handle_signal] received [config.intr_sig] for
   it. *)
type intr_status = Cleared | Signaled

(* An interrupt request, or the absence of one.  [Req] records the select
   thread [state] holding its timeout.  [unused] is cleared by [Intr.use].
   [computation] is completed when the request is cleared, signaled or times
   out. *)
type _ tdt =
  | Nothing : [> `Nothing ] tdt
  | Req : {
      state : state;
      mutable unused : bool;
      mutable computation : intr_status Computation.t;
    }
      -> [> `Req ] tdt

(* Either kind of request behind one type.  [@@unboxed] makes [R x]
   represented as [x] itself. *)
type req = R : [< `Nothing | `Req ] tdt -> req [@@unboxed]
(* [value] counts pending interrupts that the select thread must keep
   delivering.  [req] is the request that performed the last increment, or
   [R Nothing] after a decrement. *)
type counter_state = { value : int; req : req }

(* Global counter of pending interrupts, shared by all select threads. *)
let intr_pending = Atomic.make { value = 0; req = R Nothing }
(* An empty backtrace, used where no meaningful backtrace exists. *)
let empty_bt = Printexc.get_callstack 0
(* Sentinel stored in [state.exn_bt] meaning the select thread has not
   failed. *)
let exit_bt = (Exit, empty_bt)

(* An already completed computation, used as a placeholder in [Intr.req]
   before the real computation of the request is created. *)
let cleared =
  let computation = Computation.create () in
  Computation.return computation Cleared;
  computation

(* The current interrupt request of a thread, read by [handle_signal]. *)
let intr_key : [ `Req ] tdt Picos_thread.TLS.t = Picos_thread.TLS.create ()

(* Domain-local key giving each domain its own select thread state.  The
   state starts in [`Initial], and [Unix.stdin] is a placeholder meaning the
   pipe has not been created yet. *)
let key =
  let fresh_state () =
    {
      phase = Atomic.make Continue;
      state = `Initial;
      exn_bt = exit_bt;
      pipe_inn = Unix.stdin;
      pipe_out = Unix.stdin;
      byte = Bytes.create 1;
      timeouts = Atomic.make Q.empty;
      next_id = 0;
      new_rd = ref [];
      new_wr = ref [];
      new_ex = ref [];
    }
  in
  Picos_domain.DLS.new_key fresh_state

(* [try_transition s from into] sets [s.state] to [into] and returns [true] if
   [s.state] is [from].  Otherwise it returns [false] and changes nothing.
   [@poll error] makes the compiler reject any poll point in the body, so the
   test and the update cannot be separated by a switch to another systhread.
   NOTE(review): this does not make the update atomic with respect to other
   domains. *)
let[@poll error] [@inline never] try_transition s from into =
  s.state == from
  && begin
       s.state <- into;
       true
     end

(* [transition s into] unconditionally sets [s.state] to [into] and returns
   the previous state, again without any poll point in between. *)
let[@poll error] [@inline never] transition s into =
  let from = s.state in
  s.state <- into;
  from

(* [wakeup s from] ensures that the select thread of [s] soon processes new
   requests and timeouts:
   - in [Continue] (the thread is not in [Unix.select] yet), the phase is set
     to [Process], so the thread skips its next [Unix.select];
   - in [Select], the caller that wins the CAS to [Waking_up] writes one byte
     to the pipe to make [Unix.select] return.  It writes only when [s.state]
     is still [from], presumably so that nothing is written once the thread
     is stopping and about to close the pipe;
   - in [Waking_up] or [Process], a wakeup is already pending.
   NOTE(review): suppose a waker with [from = `Alive] wins the CAS to
   [Waking_up] just as [s.state] becomes [`Stopping].  It skips the write.
   A later [wakeup s `Stopping] then sees [Waking_up] and returns without
   writing either, so the select thread may stay blocked in an untimed
   [Unix.select].  Confirm whether this race is reachable. *)
let rec wakeup s from =
  match Atomic.get s.phase with
  | Process | Waking_up ->
      (* The thread will process the fds and timeouts before next select. *)
      ()
  | Continue ->
      if Atomic.compare_and_set s.phase Continue Process then
        (* We managed to signal the wakeup before the thread was ready to call
           select and the thread will notice this without us needing to write to
           the pipe. *)
        ()
      else
        (* Either the thread called select or another wakeup won the race.  We
           need to retry. *)
        wakeup s from
  | Select ->
      if Atomic.compare_and_set s.phase Select Waking_up then
        if s.state == from then
          (* We are now responsible for writing to the pipe to force the thread
             to exit the select. *)
          let n = Unix.write s.pipe_out s.byte 0 1 in
          assert (n = 1)

(* What the select thread watches for one kind of readiness:
   - [unique_fds]: the distinct fds passed to [Unix.select];
   - [ops]: the pending requests on those fds;
   - [n]: a size hint for the hash table built by [process_fds]. *)
type fos = { n : int; unique_fds : Unix.file_descr list; ops : return_on list }

(* Nothing to watch.  [n = 1] is only the size hint. *)
let fos_empty = { n = 1; unique_fds = []; ops = [] }

(* Hash tables keyed by the underlying [Unix.file_descr] of requests. *)
module Ht = Hashtbl.Make (Picos_io_fd.Resource)

(* [process_fds ht unique_fds ops todo] handles the requests in [todo].  [ht]
   maps fds to [`Return] (reported ready by [Unix.select]) or [`Alive]
   (watched, not ready).  Each request is handled as follows:
   - if its computation is no longer running, it releases its fd reference
     and is dropped;
   - if its fd is ready, it releases its fd reference, is marked not alive,
     and its computation is completed with its value;
   - otherwise it is kept in [ops], and its fd is added to [unique_fds] the
     first time that fd is seen.
   The result is [fos_empty] when nothing is left to watch. *)
let rec process_fds ht unique_fds ops = function
  | [] ->
      if unique_fds == [] && ops == [] then fos_empty
      else { n = Ht.length ht; unique_fds; ops }
  | (Return_on r as op) :: ops_todo ->
      if Computation.is_running r.computation then begin
        let file_descr = Picos_io_fd.unsafe_get r.file_descr in
        match Ht.find ht file_descr with
        | `Return ->
            Picos_io_fd.decr r.file_descr;
            r.alive <- false;
            Computation.return r.computation r.value;
            process_fds ht unique_fds ops ops_todo
        | `Alive -> process_fds ht unique_fds (op :: ops) ops_todo
        | exception Not_found ->
            Ht.add ht file_descr `Alive;
            process_fds ht (file_descr :: unique_fds) (op :: ops) ops_todo
      end
      else begin
        Picos_io_fd.decr r.file_descr;
        process_fds ht unique_fds ops ops_todo
      end

(* [process_fds ready fos new_ops] computes the next set to watch.  It takes
   the fds [ready] reported by [Unix.select], the previous set [fos], and the
   newly submitted requests [new_ops]. *)
let process_fds unique_fds fos new_ops =
  if fos.ops == [] && new_ops == [] then fos_empty
  else
    let ht = Ht.create fos.n in
    unique_fds |> List.iter (fun fd -> Ht.add ht fd `Return);
    let r = process_fds ht [] [] fos.ops in
    if new_ops == [] then r else process_fds ht r.unique_fds r.ops new_ops

(* Cancels every computation whose deadline has passed.  Returns the number
   of seconds until the earliest remaining deadline, or [-1.0] when no timeout
   is pending.  An expired entry is canceled only by the caller whose
   compare-and-set removed it from the queue. *)
let rec process_timeouts s =
  let snapshot = Atomic.get s.timeouts in
  match Q.pop snapshot with
  | None -> -1.0
  | Some ((_, Cancel_at entry), rest) ->
      let now = Mtime_clock.elapsed () in
      if Mtime.Span.compare entry.time now > 0 then
        Mtime.Span.to_float_ns (Mtime.Span.abs_diff entry.time now)
        *. (1. /. 1_000_000_000.)
      else begin
        if Atomic.compare_and_set s.timeouts snapshot rest then
          Computation.cancel entry.computation entry.exn entry.bt;
        process_timeouts s
      end

(* Operations on the [ref] cells holding new requests, used like [Atomic]
   operations (see [Picos_io_thread_atomic]). *)
module Thread_atomic = Picos_io_thread_atomic

(* Main loop of the select thread.  While [s.state] is [`Alive], each
   iteration does the following:
   1. waits in [Unix.select] on the wakeup pipe and the watched fds for at
      most [timeout] seconds (a negative value means no limit).  This step is
      skipped if a wakeup arrived before the thread entered [Select];
   2. consumes the wakeup byte if a waker wrote one;
   3. completes requests on ready fds and takes in newly submitted requests;
   4. fires expired timeouts and computes the next timeout;
   5. if interrupts are pending, sends [config.intr_sig] to the process and
      limits the timeout to 1 microsecond, so the signal is sent again soon.
   Exceptions other than [EINTR] from [Unix.select] propagate to the caller. *)
let rec select_thread s timeout rd wr ex =
  if s.state == `Alive then begin
    let rd_fds, wr_fds, ex_fds =
      if Atomic.compare_and_set s.phase Continue Select then begin
        (* An interrupted select is treated as if nothing became ready. *)
        try
          Unix.select
            (s.pipe_inn :: rd.unique_fds)
            wr.unique_fds ex.unique_fds timeout
        with Unix.Unix_error (EINTR, _, _) -> ([], [], [])
      end
      else ([], [], [])
    in
    begin
      (* Back to [Continue]; if a waker wrote to the pipe, drain its byte. *)
      match Atomic.exchange s.phase Continue with
      | Select | Process | Continue -> ()
      | Waking_up ->
          let n = Unix.read s.pipe_inn s.byte 0 1 in
          assert (n = 1)
    end;
    let rd = process_fds rd_fds rd (Thread_atomic.exchange s.new_rd []) in
    let wr = process_fds wr_fds wr (Thread_atomic.exchange s.new_wr []) in
    let ex = process_fds ex_fds ex (Thread_atomic.exchange s.new_ex []) in
    let timeout = process_timeouts s in
    let timeout =
      let state = Atomic.get intr_pending in
      if state.value = 0 then timeout
      else begin
        assert (0 < state.value);
        Unix.kill (Unix.getpid ()) config.intr_sig;
        let idle = 0.000_001 (* 1μs *) in
        if timeout < 0.0 || idle <= timeout then idle else timeout
      end
    in
    select_thread s timeout rd wr ex
  end

(* Entry point of the select thread of a domain.  In order, it:
   - records whether it runs on the main domain;
   - blocks the interrupt signals in this thread;
   - unblocks [Sys.sigchld] here if sigchld handling is configured, and
     blocks it otherwise;
   - creates the wakeup pipe;
   - runs the loop once it has moved [s.state] from [`Starting] to [`Alive].
   Any exception is stored in [s.exn_bt].  In every case the state ends as
   [`Stopped], and the pipe, if created, is closed. *)
let select_thread s =
  if Picos_domain.is_main_domain () then
    config.bits <- select_thread_running_on_main_domain_bit lor config.bits;
  if not Sys.win32 then begin
    Thread.sigmask SIG_BLOCK config.intr_sigs |> ignore;
    Thread.sigmask
      (if config.bits land handle_sigchld_bit <> 0 then SIG_UNBLOCK
       else SIG_BLOCK)
      [ Sys.sigchld ]
    |> ignore
  end;
  begin
    try
      let pipe_inn, pipe_out = Unix.pipe ~cloexec:true () in
      s.pipe_inn <- pipe_inn;
      s.pipe_out <- pipe_out;
      if try_transition s `Starting `Alive then
        select_thread s (-1.0) fos_empty fos_empty fos_empty
    with exn ->
      let bt = Printexc.get_raw_backtrace () in
      s.exn_bt <- (exn, bt)
  end;
  transition s `Stopped |> ignore;
  (* [Unix.stdin] is the placeholder for a pipe end that was never created. *)
  if s.pipe_inn != Unix.stdin then Unix.close s.pipe_inn;
  if s.pipe_out != Unix.stdin then Unix.close s.pipe_out

(* Stores the configuration and returns [true], unless a configuration is
   already present ([config.intr_sigs] non-empty), in which case it returns
   [false].  [Bool.to_int handle_sigchld] relies on [handle_sigchld_bit]
   being [1].  [ignore_sigpipe_bit land -Bool.to_int b] yields the bit when
   [b] is true (since [-1] has all bits set) and [0] otherwise.
   [@poll error] keeps the check and the updates free of poll points. *)
let[@poll error] [@inline never] try_configure ~intr_sig ~intr_sigs
    ~handle_sigchld ~ignore_sigpipe =
  config.intr_sigs == []
  && begin
       config.bits <-
         Bool.to_int handle_sigchld
         lor (ignore_sigpipe_bit land -Bool.to_int ignore_sigpipe);
       config.intr_sig <- intr_sig;
       config.intr_sigs <- intr_sigs;
       true
     end

(* Tells whether [signum] is the configured interrupt signal. *)
let is_intr_sig signum = Int.equal config.intr_sig signum

(* Signal handler installed by [reconfigure_signal_handlers]:
   - on [Sys.sigchld], completes and removes every registered sigchld
     awaiter;
   - on [config.intr_sig], completes the interrupt request of the current
     thread (stored in [intr_key]) with [Signaled]. *)
let handle_signal signal =
  if signal = Sys.sigchld then
    Seq.iter
      (fun (_, Return r) ->
        r.alive <- false;
        Computation.return r.computation r.value)
      (Htbl.remove_all chld_awaiters)
  else if signal = config.intr_sig then
    match Picos_thread.TLS.get_exn intr_key with
    | Req r -> Computation.return r.computation Signaled

(* Installs [handle_signal] for [config.intr_sig] and blocks the interrupt
   signals in the calling thread.  When sigchld handling is configured, it
   also installs [handle_signal] for [Sys.sigchld] and blocks that signal in
   the calling thread.  When configured, [Sys.sigpipe] is ignored.  Does
   nothing on Windows. *)
let reconfigure_signal_handlers () =
  let enabled bit = config.bits land bit <> 0 in
  if Sys.win32 then ()
  else begin
    let handler = Sys.Signal_handle handle_signal in
    ignore (Sys.signal config.intr_sig handler);
    ignore (Thread.sigmask SIG_BLOCK config.intr_sigs);
    if enabled handle_sigchld_bit then begin
      ignore (Sys.signal Sys.sigchld handler);
      ignore (Thread.sigmask SIG_BLOCK [ Sys.sigchld ])
    end;
    if enabled ignore_sigpipe_bit then
      ignore (Sys.signal Sys.sigpipe Sys.Signal_ignore)
  end

(* [configure ?intr_sig ?handle_sigchld ?ignore_sigpipe ()] stores the
   process-wide signal configuration and installs the signal handlers.
   @raise Invalid_argument if called from a thread other than the main
   thread, if [intr_sig] is out of range or equal to [Sys.sigchld], or if a
   configuration is already present. *)
let configure ?(intr_sig = Sys.sigusr2) ?(handle_sigchld = true)
    ?(ignore_sigpipe = true) () =
  if not (Picos_thread.is_main_thread ()) then
    invalid_arg "must be called from the main thread on the main domain";
  (* The range check below assumes the OCaml signal numbers are negative,
     running from [Sys.sigxfsz] up to [Sys.sigabrt = -1]. *)
  assert (Sys.sigabrt = -1 && Sys.sigxfsz < Sys.sigabrt);
  let valid_intr_sig =
    Sys.sigxfsz <= intr_sig && intr_sig < 0 && intr_sig <> Sys.sigchld
  in
  if not valid_intr_sig then invalid_arg "invalid interrupt signal number";
  let newly_configured =
    try_configure ~intr_sig ~intr_sigs:[ intr_sig ] ~handle_sigchld
      ~ignore_sigpipe
  in
  if not newly_configured then invalid_arg "already configured";
  reconfigure_signal_handlers ()

(* Applies the default configuration on first use.  Otherwise, reinstalls the
   configured signal handlers and masks for the calling thread. *)
let check_configured () =
  (* [instantaneous_domain_index] uses [Domain.at_exit] and we want to ensure
     it is called as early as possible. *)
  ignore (Multicore_magic.instantaneous_domain_index ());
  match config.intr_sigs with
  | [] -> configure ()
  | _ :: _ -> reconfigure_signal_handlers ()

(* Starts the select thread of [s] unless it has already been started, then
   waits while it is still [`Starting].  The caller that performs the start
   also registers a domain exit handler.  That handler asks the thread to
   stop, joins it, and re-raises the exception that terminated the thread,
   if any.
   @raise Invalid_argument if the select thread is not [`Alive] afterwards,
   e.g. because the domain has been terminated. *)
let[@inline never] init s =
  check_configured ();
  if try_transition s `Initial `Starting then begin
    match Thread.create select_thread s with
    | thread ->
        Picos_domain.at_exit @@ fun () ->
        if try_transition s `Alive `Stopping then wakeup s `Stopping;
        Thread.join thread;
        if s.exn_bt != exit_bt then
          Printexc.raise_with_backtrace (fst s.exn_bt) (snd s.exn_bt)
    | exception exn ->
        transition s `Stopped |> ignore;
        raise exn
  end;
  (* Another caller may be starting the thread; wait for it to finish. *)
  while s.state == `Starting do
    Thread.yield ()
  done;
  if s.state != `Alive then invalid_arg "domain has been terminated"

(* Returns the select thread state of the current domain, first starting the
   select thread if it is not alive.
   @raise Invalid_argument if the domain has been terminated. *)
let get () =
  let s = Picos_domain.DLS.get key in
  if s.state == `Alive then s
  else begin
    init s;
    s
  end

(* *)

(* Returns a fresh id from the select thread state [t] for a timeout entry.
   [@poll error] keeps the read and the increment free of poll points. *)
let[@poll error] [@inline never] next_id t =
  let id = t.next_id in
  t.next_id <- id + 1;
  id

(* Inserts [entry] under [id] into the timeout queue of [s], retrying until
   the compare-and-set succeeds.  If the new entry is the earliest deadline,
   the select thread is woken, since it might otherwise sleep past it. *)
let rec add_timeout s id entry =
  let before = Atomic.get s.timeouts in
  let after = Q.add id entry before in
  if not (Atomic.compare_and_set s.timeouts before after) then
    add_timeout s id entry
  else
    match Q.min after with
    | None -> ()
    | Some (first_id, _) -> if first_id = id then wakeup s `Alive

(* Removes the entry [id] from the timeout queue of [s], retrying the
   compare-and-set until it succeeds. *)
let rec remove_timeout s id =
  let before = Atomic.get s.timeouts in
  let after = Q.remove id before in
  if not (Atomic.compare_and_set s.timeouts before after) then
    remove_timeout s id

(* [remove_action _trigger s id] is a trigger action (used by [cancel_after]
   and [Intr]) that removes the timeout entry [id] from the queue of [s].
   The trigger argument is ignored.  Retries go through [remove_timeout], so
   no placeholder trigger has to be fabricated for the recursive call. *)
let remove_action _trigger s id = remove_timeout s id

(* [to_deadline ~seconds] is the [Mtime_clock.elapsed] time [seconds] from
   now.
   @raise Invalid_argument when [seconds] cannot be converted to a span. *)
let to_deadline ~seconds =
  let nanoseconds = seconds *. 1_000_000_000. in
  match Mtime.Span.of_float_ns nanoseconds with
  | Some span -> Mtime.Span.add (Mtime_clock.elapsed ()) span
  | None -> invalid_arg "seconds should be between 0 to pow(2, 53) nanoseconds"

(* [cancel_after computation ~seconds exn bt] has the select thread of the
   current domain cancel [computation] with [exn] and [bt] after [seconds].
   If [computation] completes earlier, the timeout entry is removed.
   @raise Invalid_argument if [seconds] is out of range (see [to_deadline]). *)
let[@alert "-handler"] cancel_after computation ~seconds exn bt =
  let entry =
    Cancel_at { time = to_deadline ~seconds; exn; bt; computation }
  in
  let s = get () in
  let id = next_id s in
  add_timeout s id entry;
  let remover = Trigger.from_action s id remove_action in
  if Computation.try_attach computation remover then ()
  else Trigger.signal remover

(* [timeout ~seconds] is an event that becomes ready after [seconds].  Its
   request creates an inner computation; when that computation completes, it
   completes [outer] with [to_result].  The inner computation is canceled
   with [Exit] after [seconds] via [cancel_after].  If [outer] completes
   first, the inner computation is canceled immediately instead, which also
   drops its pending timeout. *)
let[@alert "-handler"] timeout ~seconds =
  let request outer to_result =
    let complete_outer _trigger result target =
      Computation.return target result
    in
    let inner = Computation.with_action to_result outer complete_outer in
    let cancel_inner _trigger () target =
      Computation.cancel target Exit empty_bt
    in
    let canceler = Trigger.from_action () inner cancel_inner in
    if Computation.try_attach outer canceler then
      cancel_after inner ~seconds Exit empty_bt
  in
  Event.from_request { request }

(* *)

(* Trigger action attached to a pending fd request.  When the request's
   computation completes, it wakes the select thread so the thread drops the
   request and releases its fd reference.  This is skipped if the select
   thread already completed the request itself. *)
let wakeup_action _trigger s (Return_on r) = if r.alive then wakeup s `Alive

(* Pushes [op] onto the list [fds] of new requests and wakes the select
   thread.  If the request's computation is no longer running, the fd
   reference taken by the caller is released instead. *)
let[@alert "-handler"] rec insert_fd s fds (Return_on r as op) =
  if not (Computation.is_running r.computation) then
    Picos_io_fd.decr r.file_descr
  else
    let pending = !fds in
    if not (Thread_atomic.compare_and_set fds pending (op :: pending)) then
      insert_fd s fds op
    else begin
      let (_ : bool) =
        Computation.try_attach r.computation
          (Trigger.from_action s op wakeup_action)
      in
      wakeup s `Alive
    end

(* [return_on computation file_descr op value] arranges for [computation] to
   be completed with [value] once the select thread of the current domain
   reports [file_descr] ready.  [op] is [`R] (read), [`W] (write) or [`E]
   (exceptional condition).

   A reference on [file_descr] is taken here.  It is released either by the
   select thread or by [insert_fd] when [computation] is no longer running.

   @raise Invalid_argument if the select thread of this domain cannot be
   started, e.g. because the domain has been terminated. *)
let return_on computation file_descr op value =
  (* Obtain the select thread state before taking the reference: [get] may
     raise, and a reference taken first would then never be released. *)
  let s = get () in
  Picos_io_fd.incr file_descr;
  let fds = match op with `R -> s.new_rd | `W -> s.new_wr | `E -> s.new_ex in
  insert_fd s fds (Return_on { computation; file_descr; value; alive = true })

(* [await_on file_descr op] blocks the current fiber until [file_descr] is
   ready for [op] and returns [file_descr].

   If the wait ends with an exception (e.g. the fiber is canceled), the
   internal computation is canceled so the select thread drops the request.
   The exception is then re-raised with its original backtrace. *)
let await_on file_descr op =
  let computation = Computation.create ~mode:`LIFO () in
  return_on computation file_descr op file_descr;
  match Computation.await computation with
  | ready -> ready
  | exception exn ->
      (* Capture the backtrace before calling [Computation.cancel], which may
         otherwise overwrite it. *)
      let bt = Printexc.get_raw_backtrace () in
      Computation.cancel computation Exit empty_bt;
      Printexc.raise_with_backtrace exn bt

(* [on file_descr op] is an event whose request is registered with
   [return_on], waiting for [file_descr] to become ready for [op]. *)
let on file_descr op =
  Event.from_request
    {
      request =
        (fun computation to_result ->
          return_on computation file_descr op to_result);
    }

(* *)

(** Interrupting blocking operations of the current thread with a signal.
    [req ~seconds] records a request for the current thread, schedules it to
    time out after [seconds], and unblocks the interrupt signals in the
    thread.  [clr] blocks them again and clears the request.  While
    [intr_pending] is positive, the select thread keeps sending
    [config.intr_sig] to the process. *)
module Intr = struct
  type t = req

  (* Clears [unused] on the request currently recorded in [intr_pending], if
     any.  This is done before that record is replaced, so that [incr_once]
     can tell the request has already been counted. *)
  let[@inline] use = function R Nothing -> () | R (Req r) -> r.unused <- false

  (** This is used to ensure that the [intr_pending] counter is incremented
      exactly once before the counter is decremented.  Returns [true] iff
      this call moved the counter from 0 to 1. *)
  let rec incr_once (Req r as req : [ `Req ] tdt) backoff =
    let before = Atomic.get intr_pending in
    (* [intr_pending] must be read before [r.unused]! *)
    r.unused && before.req != R req
    && begin
         use before.req;
         let after = { value = before.value + 1; req = R req } in
         if Atomic.compare_and_set intr_pending before after then
           after.value = 1
         else incr_once req (Backoff.once backoff)
       end

  (* Action run when the computation of a request completes.  [id] is the id
     of the timeout entry of the request in the queue of [r.state]. *)
  let intr_action trigger (Req r as req : [ `Req ] tdt) id =
    match Computation.await r.computation with
    | Cleared ->
        (* No signal needs to be delivered. *)
        remove_action trigger r.state id
    | Signaled ->
        (* Signal was delivered before timeout. *)
        remove_action trigger r.state id;
        if incr_once req Backoff.default then
          (* We need to make sure at least one select thread will keep on
             triggering interrupts. *)
          wakeup r.state `Alive
    | exception Exit ->
        (* The timeout was triggered.  This must have been called from the
           select thread, which will soon trigger an interrupt. *)
        let _ : bool = incr_once req Backoff.default in
        ()

  (* A request for which [clr] does nothing. *)
  let nothing = R Nothing

  (* [req ~seconds] creates an interrupt request for the current thread.  The
     select thread cancels its computation with [Exit] after [seconds], which
     makes [intr_action] count the request in [intr_pending].  The request is
     stored in [intr_key] for [handle_signal], and the interrupt signals are
     unblocked in the calling thread.
     @raise Invalid_argument on Windows or if [seconds] is out of range. *)
  let[@alert "-handler"] req ~seconds =
    if Sys.win32 then invalid_arg "not supported on Windows"
    else begin
      let time = to_deadline ~seconds in
      (* assert (not (Computation.is_running r.computation)); *)
      let state = get () in
      let id = next_id state in
      let (Req r as req : [ `Req ] tdt) =
        Req { state; unused = true; computation = cleared }
      in
      (* [intr_action] runs once this computation completes. *)
      let computation = Computation.with_action req id intr_action in
      r.computation <- computation;
      Picos_thread.TLS.set intr_key req;
      let entry = Cancel_at { time; exn = Exit; bt = empty_bt; computation } in
      add_timeout state id entry;
      (* The interrupt signal is expected to have been blocked until now. *)
      let was_blocked : int list =
        Thread.sigmask SIG_UNBLOCK config.intr_sigs
      in
      assert (List.exists is_intr_sig was_blocked);
      R req
    end

  (* Decrements [intr_pending] and resets its [req] to [R Nothing], after
     marking the replaced request as used. *)
  let rec decr backoff =
    let before = Atomic.get intr_pending in
    use before.req;
    let after = { value = before.value - 1; req = R Nothing } in
    assert (0 <= after.value);
    if not (Atomic.compare_and_set intr_pending before after) then
      decr (Backoff.once backoff)

  (* [clr req] blocks the interrupt signals in the calling thread again and
     completes the request with [Cleared].  If the request had already been
     completed (signaled or timed out), the pending interrupt it accounts for
     is removed from [intr_pending] instead. *)
  let clr = function
    | R Nothing -> ()
    | R (Req r as req) ->
        let was_blocked : int list =
          Thread.sigmask SIG_BLOCK config.intr_sigs
        in
        assert (not (List.exists is_intr_sig was_blocked));
        if not (Computation.try_return r.computation Cleared) then begin
          let _ : bool = incr_once req Backoff.default in
          (* We ensure that the associated increment has been done before we
             decrement so that the [intr_pending] counter is never too low. *)
          decr Backoff.default
        end
end

(* *)

(* Adds [return] to [chld_awaiters] under a fresh random key and returns that
   key.  New keys are drawn until one is not already in use. *)
let insert return =
  let rec attempt () =
    let id = Random.bits () in
    if Htbl.try_add chld_awaiters id return then id else attempt ()
  in
  attempt ()

(* [return_on_sigchld computation value] arranges for [computation] to be
   completed with [value] when [handle_signal] next handles [Sys.sigchld].
   Suppose sigchld handling is configured but no select thread runs on the
   main domain.  Then the select thread of the current domain is started
   first, because a select thread unblocks [Sys.sigchld] in itself when
   handling is enabled.  If [computation] completes first (e.g. it is
   canceled), the awaiter is removed again. *)
let[@alert "-handler"] return_on_sigchld computation value =
  if
    config.bits
    land (select_thread_running_on_main_domain_bit lor handle_sigchld_bit)
    = handle_sigchld_bit
  then
    (* Ensure there is at least one thread handling [Sys.sigchld] signals. *)
    get () |> ignore;
  let return = Return { value; computation; alive = true } in
  let id = insert return in
  let remover =
    Trigger.from_action id return @@ fun _trigger id (Return this_r as this) ->
    if this_r.alive then begin
      this_r.alive <- false;
      (* It should be extremely rare, but possible, that the return was already
         removed and another added just at this point and so we must account for
         the possibility and make sure that whatever we remove is completed. *)
      match Htbl.remove_exn chld_awaiters id with
      | Return that_r as that ->
          if this != that then
            Computation.return that_r.computation that_r.value
      | exception Not_found -> ()
    end
  in
  if not (Computation.try_attach computation remover) then
    Trigger.signal remover

(* Event that becomes ready when [Sys.sigchld] is next handled. *)
let on_sigchld = Event.from_request { request = return_on_sigchld }
OCaml

Innovation. Community. Security.