package async_extra

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file async_bus.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
open! Core_kernel
open! Async_kernel
open! Import
open! Bus

let pipe1_exn (t : ('a -> unit, _) t) here =
  let r, w = Pipe.create () in
  let subscription =
    subscribe_exn
      t
      here
      ~f:(function
        | v -> Pipe.write_without_pushback_if_open w v)
      ~on_close:(fun () -> Pipe.close w)
  in
  upon (Pipe.closed w) (fun () -> unsubscribe t subscription);
  r
;;

module First_arity = struct
  type (_, _, _) t =
    | Arity1 : ('a -> unit, 'a -> 'r option, 'r) t
    | Arity2 : ('a -> 'b -> unit, 'a -> 'b -> 'r option, 'r) t
    | Arity3 : ('a -> 'b -> 'c -> unit, 'a -> 'b -> 'c -> 'r option, 'r) t
    | Arity4 : ('a -> 'b -> 'c -> 'd -> unit, 'a -> 'b -> 'c -> 'd -> 'r option, 'r) t
    | Arity5
      : ( 'a -> 'b -> 'c -> 'd -> 'e -> unit
        , 'a -> 'b -> 'c -> 'd -> 'e -> 'r option
        , 'r )
          t
  [@@deriving sexp_of]
end

let first_exn (type c f r) ?stop t here (first_arity : (c, f, r) First_arity.t) ~(f : f) =
  let module A = First_arity in
  Deferred.create (fun ivar ->
    let subscriber : c Bus.Subscriber.t option ref = ref None in
    let finish : r option -> unit = function
      | None -> ()
      | Some r ->
        Ivar.fill ivar r;
        Bus.unsubscribe t (Option.value_exn !subscriber)
    in
    (* We define [can_finish] separately from [finish] because we must call [can_finish]
       before we call [f], so that we do not call [f] if [stop] is determined. *)
    let can_finish =
      match stop with
      | None -> fun () -> true
      | Some stop ->
        upon stop (fun () -> Bus.unsubscribe t (Option.value_exn !subscriber));
        fun () -> not (Deferred.is_determined stop)
    in
    let callback : c =
      match first_arity with
      | Arity1 -> fun a -> if can_finish () then finish (f a)
      | Arity2 -> fun a1 a2 -> if can_finish () then finish (f a1 a2)
      | Arity3 -> fun a1 a2 a3 -> if can_finish () then finish (f a1 a2 a3)
      | Arity4 -> fun a1 a2 a3 a4 -> if can_finish () then finish (f a1 a2 a3 a4)
      | Arity5 -> fun a1 a2 a3 a4 a5 -> if can_finish () then finish (f a1 a2 a3 a4 a5)
    in
    subscriber
    := Some
         (Bus.subscribe_exn
            t
            here
            ~on_callback_raise:
              (let monitor = Monitor.current () in
               fun error -> Monitor.send_exn monitor (Error.to_exn error))
            ~f:callback))
;;
OCaml

Innovation. Community. Security.