package picos_lwt

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file picos_lwt_unix.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
open Picos

let[@inline never] not_main_thread () =
  invalid_arg "not called from the main thread"

module Mpscq = Picos_aux_mpscq

let ready = Mpscq.create ~padded:true ()

type notification = { mutable ref_count : int; mutable id : int }

let notification = { ref_count = 0; id = 0 }
let state = Atomic.make `Not_running

let notify_callback () =
  Atomic.set state `Running;
  let rec loop () =
    match Mpscq.pop_exn ready with
    | resolver ->
        Lwt.wakeup resolver ();
        loop ()
    | exception Mpscq.Empty -> begin
        match Atomic.get state with
        | `Not_running | `Notified ->
            Atomic.set state `Running;
            loop ()
        | `Running ->
            if not (Atomic.compare_and_set state `Running `Not_running) then
              loop ()
      end
  in
  loop ()

let rec notify () =
  match Atomic.get state with
  | `Notified -> ()
  | (`Running | `Not_running) as before ->
      if Atomic.compare_and_set state before `Notified then begin
        if before == `Not_running then
          Lwt_unix.send_notification notification.id
      end
      else notify ()

module System = struct
  let sleep = Lwt_unix.sleep

  type trigger = unit Lwt.t * unit Lwt.u

  let trigger = Lwt.wait

  let signal (_, resolver) =
    if Picos_thread.is_main_thread () then Lwt.wakeup resolver ()
    else begin
      Mpscq.push ready resolver;
      notify ()
    end

  let await (promise, _) = promise
end

let system = (module System : Picos_lwt.System)

let notification_decr _ =
  let ref_count = notification.ref_count - 1 in
  notification.ref_count <- ref_count;
  if ref_count = 0 then Lwt_unix.stop_notification notification.id

let run_fiber fiber main =
  if not (Picos_thread.is_main_thread ()) then not_main_thread ();
  begin
    let ref_count = notification.ref_count + 1 in
    notification.ref_count <- ref_count;
    if ref_count = 1 then
      notification.id <- Lwt_unix.make_notification notify_callback
  end;
  let promise = Picos_lwt.run_fiber system fiber main in
  Lwt.on_any promise notification_decr notification_decr;
  promise

let run ?(forbid = false) main =
  let computation = Computation.create ~mode:`LIFO () in
  let fiber = Fiber.create ~forbid computation in
  let main _ = Computation.capture computation main () in
  run_fiber fiber main |> Lwt.map @@ fun () -> Computation.peek_exn computation

let run_main ?forbid main = Lwt_main.run (run ?forbid main)
OCaml

Innovation. Community. Security.