package lwt_eio

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file lwt_eio.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
open Eio.Std

(* Private cancellation reason. [fork_with_cancel] cancels its fiber with this exception
   and silently absorbs the resulting [Eio.Cancel.Cancelled Cancel]. *)
exception Cancel

(* [Token.v] is the value that [with_event_loop] passes to its callback.
   NOTE(review): presumably it serves as evidence that the event loop is running;
   confirm against the interface file. *)
module Token = struct
  (* Warning 65 is silenced because [t] below declares its own [()] constructor. *)
  [@@@warning "-65"]
  type t = ()
  let v : t = ()
end

(* Force the lazy stored here to cause the current [Lwt_engine.iter] to return (see [notify]).
   The initial value does nothing. The engine's [iter] method stores a fresh lazy each time it
   is about to block; being lazy, forcing the same value more than once only resolves once. *)
let ready = ref (lazy ())

(* Indicates that [Lwt_unix.fork] has been called and we're the child process.
   Lwt tries to reinitialise the Lwt engine in the child in case the user wants
   to continue using Lwt there (rather than execing), but we don't support that.
   Make sure the reinitialisation doesn't break things (e.g. by adding bogus
   cancellation requests to the io_uring).
   Set by the engine's [fork] method and checked by [fork_with_cancel]. *)
let is_forked = ref false

(* While the Lwt event loop is running, this is the switch that contains any fibers handling Lwt operations.
   Lwt does not use structured concurrency, so it can spawn background threads without explicitly taking a
   switch argument, which is why we need to use a global variable here.
   [main] sets it to [Some sw] and resets it to [None] when that switch is released;
   read it through [get_loop_switch]. *)
let loop_switch = ref None

(* Which kind of code the program is currently executing, for the optional debug checks
   performed by [with_mode]. *)
type debug_mode =
  | Eio                 (* Effects are permitted *)
  | Lwt                 (* Effects are not permitted *)
  | Normal              (* We're not checking *)

(* The current debug mode. [with_event_loop] sets it to [Eio] when debugging is requested and
   to [Normal] otherwise; [with_mode] flips it between [Eio] and [Lwt]. *)
let mode = ref Normal

(* [with_mode m fn] runs [fn ()] with the debug [mode] switched to [m] and switches it back
   once [fn] returns or raises.

   - If checking is off ([Normal]), [fn] is simply called.
   - Switching Lwt -> Eio just updates [mode] around the call.
   - Switching Eio -> Lwt additionally installs an effect handler that rejects effects
     performed while the mode is still [Lwt].
   - Asking for the mode that is already active logs a warning and raises [Failure].
   - Callers never ask for [Normal]; that case is an assertion failure. *)
let with_mode m fn =
  match !mode, m with
  | Normal, _ -> fn ()
  | Lwt, Eio ->
    mode := Eio;
    begin
      (* Restore [Lwt] on both the normal and the exceptional path. *)
      match fn () with
      | x -> mode := Lwt; x
      | exception ex -> mode := Lwt; raise ex
    end
  | Eio, Lwt ->
    mode := Lwt;
    Effect.Deep.match_with fn ()
      { retc = (fun x -> mode := Eio; x);
        exnc = (fun ex -> mode := Eio; raise ex);
        effc = fun (type a) (e : a Effect.t) : ((a, _) Effect.Deep.continuation -> _) option ->
            match e with
            (* [Get_context] is exempt from the check and is left to an outer handler. *)
            | Eio.Private.Effects.Get_context -> None
            | _ ->
              match !mode with
              (* Not expected: this handler is only installed when the mode was [Eio]. *)
              | Normal -> assert false
              (* A nested [with_mode Eio] is active, so the effect is legitimate: pass it on. *)
              | Eio -> None
              | Lwt ->
                (* Effect performed from Lwt code: warn, print the performer's call stack
                   if backtraces are enabled, and fail the performer with [Invalid_argument]. *)
                Printf.eprintf "WARNING: Attempt to perform effect in Lwt context\n";
                Some (fun k ->
                    if Printexc.backtrace_status () then
                      Printexc.print_raw_backtrace stderr (Effect.Deep.get_callstack k 10);
                    flush stderr;
                    Effect.Deep.discontinue k (Invalid_argument "Attempt to perform effect in Lwt context")
                  )
      }
  | Eio, Eio ->
    (* Capture up to 20 frames of the current call stack, but only when backtraces are enabled. *)
    let bt = Printexc.get_callstack (if Printexc.backtrace_status () then 20 else 0) in
    let ex = Failure "Already in Eio context!" in
    traceln "WARNING: %a" Fmt.exn_backtrace (ex, bt);
    raise ex
  | Lwt, Lwt ->
    let bt = Printexc.get_callstack (if Printexc.backtrace_status () then 20 else 0) in
    let ex = Failure "Already in Lwt context!" in
    traceln "WARNING: %a" Fmt.exn_backtrace (ex, bt);
    raise ex
  | _, Normal -> assert false

(* Wake the Lwt engine: force whatever wake-up action is currently stored in [ready]. *)
let notify () =
  let wake_up = !ready in
  Lazy.force wake_up

(* Run [fn] in a new fiber and return a lazy value that can be forced to cancel it.

   In a forked child process ([is_forked] set) no fiber is started; the returned lazy raises
   [Failure] when forced instead.

   Otherwise the fiber is attached to [sw] and [fn] runs inside its own cancellation
   sub-context. Forcing the returned lazy cancels that context with [Cancel]; the resulting
   [Eio.Cancel.Cancelled Cancel] is caught inside the fiber. *)
let fork_with_cancel ~sw fn =
  if !is_forked then lazy (failwith "Can't use Eio in a forked child process")
  else (
    with_mode Eio @@ fun () ->
    let cancel = ref None in
    Fiber.fork ~sw (fun () ->
        try
          Eio.Cancel.sub @@ fun cc ->
          cancel := Some (lazy (
              (* Do nothing if we have become a forked child since the fiber was started. *)
              if not !is_forked then (
                (* NOTE(review): [Invalid_argument] presumably means the context has already
                   finished; confirm against the Eio.Cancel documentation. *)
                try Eio.Cancel.cancel cc Cancel with Invalid_argument _ -> ()
              )
            ));
          fn ()
        with Eio.Cancel.Cancelled Cancel -> ()
      );
    (* The forked fiber runs first, so [cancel] must be set by now. *)
    Option.get !cancel
  )

(* Lwt wants to set SIGCHLD to its own handler, but some Eio backends also install a handler for it.
   Both Lwt and Eio need to be notified, as they may each have child processes to monitor.

   There are two cases:

   1. Eio installs its handler first, then Lwt tries to replace it while Eio is running.
      We intercept that attempt and prevent the handler from changing.

   2. Lwt installs its handler first (e.g. because someone ran a Lwt event loop for a bit before using Eio).
      In that case, Eio will already have replaced Lwt's handler by the time we get called.

   Either way, Eio ends up owning the installed handler. We also want things to continue working if the Eio
   event loop finishes and then the application runs a plain Lwt loop. That's why we use [register_immediate],
   rather than running an Eio fiber.

   We also send an extra notification initially, in case we missed one during the hand-over.

   This is a lazy value so that the installation happens at most once, when first forced
   (by [with_event_loop]). It does nothing on Windows. *)
let install_sigchld_handler = lazy (
  if not Sys.win32 then (
    Eio_unix.Process.install_sigchld_handler ();
    let rec register () =
      (* Re-arm first, then tell Lwt about the signal.
         NOTE(review): re-registering on every call suggests a registration fires only once;
         confirm against the Eio.Condition documentation. *)
      ignore (Eio.Condition.register_immediate Eio_unix.Process.sigchld register : Eio.Condition.request);
      Lwt_unix.handle_signal Sys.sigchld
    in
    (* The first call both arms the callback and delivers the extra initial notification. *)
    register ()
  )
)

(* Build the [Lwt_engine] object used while the Lwt event loop runs on top of Eio.
   Each registered event source is served by a fiber attached to [sw]; timers use [clock]. *)
let make_engine ~sw ~clock =
  (* Hand one event to Lwt: run [callback] in Lwt mode, then wake the engine's [iter].
     The whole delivery runs under [Eio.Cancel.protect]. *)
  let deliver callback =
    Eio.Cancel.protect (fun () ->
        with_mode Lwt callback;
        notify ())
  in
  object
    inherit Lwt_engine.abstract

    (* Fail the switch with [Exit]; if the switch was already destroyed there is nothing to do. *)
    method private cleanup =
      match Switch.fail sw Exit with
      | () -> ()
      | exception Invalid_argument _ -> ()

    (* Call [callback] every time [fd] becomes readable, until cancelled. *)
    method private register_readable fd callback =
      fork_with_cancel ~sw (fun () ->
          while true do
            Eio_unix.await_readable fd;
            deliver callback
          done)

    (* Call [callback] every time [fd] becomes writable, until cancelled. *)
    method private register_writable fd callback =
      fork_with_cancel ~sw (fun () ->
          while true do
            Eio_unix.await_writable fd;
            deliver callback
          done)

    (* Call [callback] after [delay], once or (if [repeat]) over and over until cancelled. *)
    method private register_timer delay repeat callback =
      fork_with_cancel ~sw (fun () ->
          let tick () =
            Eio.Time.sleep clock delay;
            deliver callback
          in
          if repeat then (
            while true do
              tick ()
            done
          ) else tick ())

    (* Only SIGCHLD is reported as forwarded. *)
    method! forwards_signal signum =
      Int.equal signum Sys.sigchld

    (* One iteration of the engine, executed in Eio mode. When [block] is set, wait until
       somebody forces [ready]; otherwise just yield to other fibers. *)
    method iter block =
      with_mode Eio (fun () ->
          if not block then Fiber.yield ()
          else begin
            let woken, wake = Promise.create () in
            ready := lazy (Promise.resolve wake ());
            Promise.await woken
          end)

    (* Record that we are now running in a forked child process. *)
    method! fork =
      is_forked := true
  end

(* Run an Lwt event loop until [user_promise] resolves.

   The loop runs inside its own switch. When [Lwt_main.run] returns, [Exit] is raised inside
   that switch to stop any event fibers still attached to it; the [Exit] is caught here and
   the engine that was installed before the call is put back. Any other exception escapes
   without the previous engine being restored.

   Raises [Invalid_argument] if an Lwt_eio event loop is already running. *)
let main ~clock user_promise =
  let old_engine = Lwt_engine.get () in
  try
    Switch.run @@ fun sw ->
    if Option.is_some !loop_switch then invalid_arg "Lwt_eio event loop already running";
    (* Publish the switch for [get_loop_switch], and withdraw it when the switch is released. *)
    Switch.on_release sw (fun () -> loop_switch := None);
    loop_switch := Some sw;
    with_mode Lwt @@ fun () ->
    (* NOTE(review): [~destroy:false] presumably keeps [old_engine] usable so that it can be
       reinstated below; confirm against the Lwt_engine documentation. *)
    Lwt_engine.set ~destroy:false (make_engine ~sw ~clock);
    (* An Eio fiber may resume an Lwt thread while in [Lwt_engine.iter] and forget to call [notify].
       If that called [Lwt.pause] then it wouldn't wake up, so handle this common case here. *)
    Lwt.register_pause_notifier (fun _ -> notify ());
    Lwt_main.run user_promise;
    (* Stop any event fibers still running: *)
    raise Exit
  with Exit ->
    Lwt_engine.set old_engine

(* [with_event_loop ~clock fn] runs [fn Token.v] with an Lwt event loop running alongside it
   in a background fiber. When [fn] returns or raises, the loop is told to finish and woken
   up, and the enclosing switch waits for it before [with_event_loop] itself returns.

   [debug] (default [false]) turns on the mode checking done by [with_mode]. *)
let with_event_loop ?(debug=false) ~clock fn =
  Lazy.force install_sigchld_handler;
  let finished, set_finished = Lwt.wait () in
  mode := (if debug then Eio else Normal);
  Switch.run (fun sw ->
      (* The Lwt loop keeps running until [finished] is resolved. *)
      Fiber.fork ~sw (fun () -> main ~clock finished);
      let stop_loop () =
        Lwt.wakeup set_finished ();
        notify ()
      in
      Fun.protect ~finally:stop_loop (fun () -> fn Token.v))

(* Return the switch stored in [loop_switch].
   Raises [Failure] when no switch is currently stored there. *)
let get_loop_switch () =
  match !loop_switch with
  | None -> Fmt.failwith "Must be called from within Lwt_eio.with_event_loop!"
  | Some active -> active

(* Conversions between Lwt promises and Eio promises.
   Inside this module, [Promise] still refers to the Eio promise module opened at the top of
   the file, because the module being defined here is not yet in scope. *)
module Promise = struct
  (* Wait (as an Eio fiber) for [lwt_promise] to resolve. Returns its value, or raises the
     exception it was rejected with. *)
  let await_lwt lwt_promise =
    let outcome, set_outcome = Promise.create () in
    Lwt.on_any lwt_promise
      (fun value -> Promise.resolve_ok set_outcome value)
      (fun ex -> Promise.resolve_error set_outcome ex);
    Promise.await_exn outcome

  (* Return an Lwt promise that is woken with the value of [eio_promise].
     A fiber on the event loop's switch does the waiting. *)
  let await_eio eio_promise =
    with_mode Eio (fun () ->
        let sw = get_loop_switch () in
        let lwt_promise, resolver = Lwt.wait () in
        let forward () =
          let value = Promise.await eio_promise in
          with_mode Lwt (fun () ->
              Lwt.wakeup resolver value;
              notify ())
        in
        Fiber.fork ~sw forward;
        lwt_promise)

  (* Like [await_eio], for an Eio promise holding a result: [Ok x] wakes the Lwt promise
     with [x], and [Error ex] wakes it with the exception [ex]. *)
  let await_eio_result eio_promise =
    with_mode Eio (fun () ->
        let sw = get_loop_switch () in
        let lwt_promise, resolver = Lwt.wait () in
        let forward () =
          let outcome = Promise.await eio_promise in
          with_mode Lwt (fun () ->
              (match outcome with
               | Ok value -> Lwt.wakeup resolver value
               | Error ex -> Lwt.wakeup_exn resolver ex);
              notify ())
        in
        Fiber.fork ~sw forward;
        lwt_promise)
end

(* [run_eio fn] runs [fn ()] in a new fiber on the event loop's switch and returns an Lwt
   promise for its outcome: the value [fn] returns, or the exception it raises.
   Cancelling the returned promise cancels the fiber's cancellation context with
   [Lwt.Canceled]. *)
let run_eio fn =
  with_mode Eio (fun () ->
      let sw = get_loop_switch () in
      let lwt_promise, resolver = Lwt.task () in
      (* Cancellation context of the fiber, once it has started. *)
      let context = ref None in
      let body cancel_ctx =
        context := Some cancel_ctx;
        let outcome =
          match fn () with
          | value -> Ok value
          | exception ex -> Error ex
        in
        (* Report back to Lwt, then wake the engine. *)
        with_mode Lwt (fun () ->
            (match outcome with
             | Ok value -> Lwt.wakeup resolver value
             | Error ex -> Lwt.wakeup_exn resolver ex);
            notify ())
      in
      Fiber.fork ~sw (fun () -> Eio.Cancel.sub body);
      let on_lwt_cancel () =
        match !context with
        | None -> ()
        | Some cancel_ctx -> Eio.Cancel.cancel cancel_ctx Lwt.Canceled
      in
      Lwt.on_cancel lwt_promise on_lwt_cancel;
      lwt_promise)

(* [run_lwt fn] calls [fn ()] in Lwt mode to obtain an Lwt promise, then waits for that
   promise from the calling Eio fiber and returns its value (or raises its exception).
   The fiber's cancellation state is checked before [fn] is called and again before waiting.
   If the fiber is cancelled at the second check or while waiting, the Lwt promise is
   cancelled and the cancellation exception is re-raised. *)
let run_lwt fn =
  Fiber.check ();
  let lwt_promise = with_mode Lwt fn in
  match
    Fiber.check ();
    Promise.await_lwt lwt_promise
  with
  | value -> value
  | exception (Eio.Cancel.Cancelled _ as ex) ->
    Lwt.cancel lwt_promise;
    raise ex

(* Queue implementation used for [jobs] below.
   NOTE(review): presumably a lock-free queue that is safe to push to from other domains;
   confirm against the Eio_utils documentation. *)
module Lf_queue = Eio_utils.Lf_queue

(* Jobs to be run in the main Lwt domain.
   [run_in_main_dont_wait] pushes onto this queue and the [job_notification] handler pops
   one job per notification. *)
let jobs : (unit -> unit) Lf_queue.t = Lf_queue.create ()

(* Lwt notification whose handler runs exactly one queued job, in Lwt mode. *)
let job_notification =
  let run_next_job () =
    (* A notification is only sent after a job has been pushed, so the queue is never
       empty at this point. *)
    let thunk = Option.get (Lf_queue.pop jobs) in
    with_mode Lwt thunk
  in
  Lwt_unix.make_notification run_next_job

(* Schedule [f] to run in the main Lwt thread without waiting for it: the job is queued
   first, and only then is the main thread notified, so the handler always finds it. *)
let run_in_main_dont_wait f =
  let () = Lf_queue.push jobs f in
  Lwt_unix.send_notification job_notification

(* [run_lwt_in_main f] schedules [f ()] as a job for the main Lwt thread and waits, from the
   calling Eio fiber, for the Lwt thread it returns. Returns that thread's value or raises
   the exception it failed with.

   If the waiting fiber is cancelled, a second job is queued that cancels the Lwt thread;
   the fiber waits (protected from further cancellation) until that job has run and then
   re-raises the cancellation exception.

   [Eio.Promise] is written out in full here because [Promise] now refers to the module
   defined earlier in this file. *)
let run_lwt_in_main f =
  (* Placeholder; replaced by the first job before the cancel job can possibly run. *)
  let cancel = ref (fun () -> assert false) in
  let p, r = Eio.Promise.create () in
  run_in_main_dont_wait (fun () ->
      let thread = f () in
      cancel := (fun () -> Lwt.cancel thread);
      (* Forward the outcome of the Lwt thread to the waiting fiber. *)
      Lwt.on_any thread
        (Eio.Promise.resolve_ok r)
        (Eio.Promise.resolve_error r)
    );
  match
    Fiber.check ();
    Eio.Promise.await p
  with
  | Ok x -> x
  | Error ex -> raise ex
  | exception (Eio.Cancel.Cancelled _ as ex) ->
    let cancelled, set_cancelled = Eio.Promise.create () in
    run_in_main_dont_wait (fun () ->
        (* By the time this runs, [cancel] must have been set. *)
        !cancel ();
        Eio.Promise.resolve set_cancelled ()
      );
    (* Wait for the cancel job to finish even though this fiber is already cancelled. *)
    Eio.Cancel.protect (fun () -> Eio.Promise.await cancelled);
    raise ex
OCaml

Innovation. Community. Security.