Source file fuseau_lwt.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
include Fuseau
let ( let@ ) = ( @@ )
open struct
let[@inline] conv_handle (ev : Lwt_engine.event) : Cancel_handle.t =
let cancel () = Lwt_engine.stop_event ev in
{ Cancel_handle.cancel }
let _pp_pending out engine =
Printf.fprintf out "readc=%d writec=%d timerc=%d" engine#readable_count
engine#writable_count engine#timer_count
end
(** Action scheduled from outside the loop *)
module Action = struct
type event = Lwt_engine.event
type cb = event -> unit
(** Action that we ask the lwt loop to perform, from the outside *)
type t =
| Wakeup_loop (** The only point of this is to wakeup the event loop *)
| Wait_readable of Unix.file_descr * Lwt_engine.event option ref * cb
| Wait_writable of Unix.file_descr * Lwt_engine.event option ref * cb
(** Perform the action from within the Lwt thread *)
let perform (self : t) : unit =
match self with
| Wakeup_loop -> ()
| Wait_readable (fd, r, cb) -> r := Some (Lwt_engine.on_readable fd cb)
| Wait_writable (fd, r, cb) -> r := Some (Lwt_engine.on_writable fd cb)
end
module Action_queue = struct
type t = { q: Action.t list Atomic.t } [@@unboxed]
let create () : t = { q = Atomic.make [] }
let pop_all (self : t) : _ list = Atomic.exchange self.q []
(** Push the action and return whether the queue was previously empty *)
let push (self : t) (a : Action.t) : bool =
let is_first = ref true in
while
let old = Atomic.get self.q in
if Atomic.compare_and_set self.q old (a :: old) then (
is_first := old = [];
false
) else
true
do
()
done;
!is_first
end
module Perform_action_in_lwt = struct
open struct
let actions_ : Action_queue.t = Action_queue.create ()
(** Gets the current set of notifications and perform them from inside the
Lwt thread *)
let perform_pending_actions () : unit =
let l = Action_queue.pop_all actions_ in
List.iter Action.perform l
let notification : int =
Lwt_unix.make_notification ~once:false perform_pending_actions
end
let schedule (a : Action.t) : unit =
let is_first = Action_queue.push actions_ a in
if is_first then Lwt_unix.send_notification notification
end
let _in_blocking_section = Atomic.make false
let ev_loop : Event_loop.t =
object
method on_timer time ~repeat f =
(Lwt_engine.get ())#on_timer time repeat (fun ev -> f (conv_handle ev))
|> conv_handle
method one_step ~block () =
let@ _sp =
Trace_core.with_span ~__FILE__ ~__LINE__ "fuseau-lwt.one-step"
~data:(fun () -> [ "block", `Bool block ])
in
Lwt.wakeup_paused ();
Atomic.set _in_blocking_section true;
(Lwt_engine.get ())#iter block;
Atomic.set _in_blocking_section false
method interrupt_if_in_blocking_section =
if Atomic.get _in_blocking_section then
Perform_action_in_lwt.schedule Action.Wakeup_loop
end
let main (f : unit -> 'a) : 'a =
let engine : Lwt_engine.t = Lwt_engine.get () in
let loop = ev_loop in
try
let x = Fuseau.main ~loop f in
engine#destroy;
x
with e ->
let bt = Printexc.get_raw_backtrace () in
engine#destroy;
Printexc.raise_with_backtrace e bt
let await_lwt (fut : _ Lwt.t) =
match Lwt.poll fut with
| Some x -> x
| None ->
Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup ->
Lwt.on_termination fut wakeup);
(match Lwt.poll fut with
| Some x -> x
| None -> assert false)
let spawn_as_lwt ?parent ?name ?propagate_cancel_to_parent (f : unit -> 'a) :
'a Lwt.t =
let fut, promise = Lwt.wait () in
let run () =
try
let x = f () in
Lwt.wakeup promise x
with exn -> Lwt.wakeup_exn promise exn
in
let _fib =
match parent with
| None -> Fuseau.spawn ?name ?propagate_cancel_to_parent run
| Some p ->
let scheduler = Fuseau.Scheduler.get_for_current_thread () in
Fuseau.spawn_as_child_of ?name ?propagate_cancel_to_parent scheduler p run
in
fut
let spawn_as_lwt_from_anywhere ?name sched f : 'a Lwt.t =
let fut, promise = Lwt.wait () in
let _fib =
Fuseau.spawn_from_anywhere ?name sched (fun () ->
try
let x = f () in
Lwt.wakeup promise x
with exn -> Lwt.wakeup_exn promise exn)
in
fut
open struct
let _default_buf_size = 16 * 1024
end
module IO_lwt = struct
type file_descr = Unix.file_descr
let rec read fd buf i len : int =
if len = 0 then
0
else (
match Unix.read fd buf i len with
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup ->
Perform_action_in_lwt.schedule
@@ Action.Wait_readable
( fd,
ref None,
fun _ev ->
wakeup ();
Lwt_engine.stop_event _ev ));
read fd buf i len
| exception Unix.Unix_error (Unix.ECONNRESET, _, _) -> 0
| n -> n
)
let rec write_once fd buf i len : int =
if len = 0 then
0
else (
match Unix.write fd buf i len with
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup ->
Perform_action_in_lwt.schedule
@@ Action.Wait_writable
( fd,
ref None,
fun ev ->
wakeup ();
Lwt_engine.stop_event ev ));
write_once fd buf i len
| n -> n
)
let write fd buf i len : unit =
let i = ref i in
let len = ref len in
while !len > 0 do
let n = write_once fd buf !i !len in
i := !i + n;
len := !len - n
done
end
let ev_read fd buf i len : int Event.t =
let poll () =
if len = 0 then
Some (Ok 0)
else (
match Unix.read fd buf i len with
| n -> Some (Ok n)
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
None
| exception Unix.Unix_error (Unix.ECONNRESET, _, _) -> Some (Ok 0)
| exception e ->
let ebt = Exn_bt.get e in
Some (Error ebt)
)
in
let wait cb =
let r = ref None in
let cancel =
Cancel_handle.make
~cancel:(fun () -> Option.iter Lwt_engine.stop_event !r)
()
in
Perform_action_in_lwt.schedule
@@ Action.Wait_readable
( fd,
r,
fun _ev ->
cb ();
Lwt_engine.stop_event _ev );
cancel
in
{ poll; wait }
let ev_write fd buf i len : int Event.t =
let poll () =
if len = 0 then
Some (Ok 0)
else (
match Unix.write fd buf i len with
| n -> Some (Ok n)
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
None
| exception Unix.Unix_error (Unix.ECONNRESET, _, _) -> Some (Ok 0)
| exception e ->
let ebt = Exn_bt.get e in
Some (Error ebt)
)
in
let wait cb =
let r = ref None in
let cancel =
Cancel_handle.make
~cancel:(fun () -> Option.iter Lwt_engine.stop_event !r)
()
in
Perform_action_in_lwt.schedule
@@ Action.Wait_writable
( fd,
r,
fun _ev ->
cb ();
Lwt_engine.stop_event _ev );
cancel
in
{ poll; wait }
module Iostream = struct
module Out = struct
include Iostream.Out
let of_unix_fd ?(close_noerr = false)
?(buf = Bytes.create _default_buf_size) fd : t =
let buf_off = ref 0 in
let[@inline] is_full () = !buf_off = Bytes.length buf in
let flush () =
if !buf_off > 0 then (
IO_lwt.write fd buf 0 !buf_off;
buf_off := 0
)
in
object
method output_char c =
if is_full () then flush ();
Bytes.set buf !buf_off c;
incr buf_off
method output bs i len : unit =
let i = ref i in
let len = ref len in
while !len > 0 do
if is_full () then flush ();
let n = min !len (Bytes.length buf - !buf_off) in
Bytes.blit bs !i buf !buf_off n;
buf_off := !buf_off + n;
i := !i + n;
len := !len - n
done;
if is_full () then flush ()
method close () =
if close_noerr then (
try
flush ();
Unix.close fd
with _ -> ()
) else (
flush ();
Unix.close fd
)
method flush = flush
end
end
module In = struct
include Iostream.In
let of_unix_fd ?(close_noerr = false)
?(buf = Bytes.create _default_buf_size) (fd : Unix.file_descr) : t =
let buf_len = ref 0 in
let buf_off = ref 0 in
let refill () =
buf_off := 0;
buf_len := IO_lwt.read fd buf 0 (Bytes.length buf)
in
object
method input b i len : int =
if !buf_len = 0 then refill ();
let n = min len !buf_len in
if n > 0 then (
Bytes.blit buf !buf_off b i n;
buf_off := !buf_off + n;
buf_len := !buf_len - n
);
n
method close () =
if close_noerr then (
try Unix.close fd with _ -> ()
) else
Unix.close fd
end
end
end
module Net = struct
module TCP_server = struct
type t = Lwt_io.server
let establish ?backlog ?no_close addr handler : t =
let (Any_fiber parent_fiber) = Fuseau.Fiber.get_current () in
let server =
Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr
(fun client_addr client_sock ->
let ic =
Iostream.In.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock
in
let oc =
Iostream.Out.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock
in
spawn_as_lwt ~parent:parent_fiber ~name:"tcp.server.handler"
(fun () -> handler client_addr ic oc))
in
await_lwt server
let shutdown self = await_lwt @@ Lwt_io.shutdown_server self
end
module TCP_client = struct
let with_connect addr (f : Iostream.In.t -> Iostream.Out.t -> 'a) : 'a =
let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
Unix.set_nonblock sock;
Unix.setsockopt sock Unix.TCP_NODELAY true;
while
try
Unix.connect sock addr;
false
with
| Unix.Unix_error
((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _)
->
Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup ->
Perform_action_in_lwt.schedule
@@ Action.Wait_writable
( sock,
ref None,
fun ev ->
wakeup ();
Lwt_engine.stop_event ev ));
true
do
()
done;
let ic = Iostream.In.of_unix_fd sock in
let oc = Iostream.Out.of_unix_fd sock in
let finally () = try Unix.close sock with _ -> () in
let@ () = Fun.protect ~finally in
f ic oc
end
end