package async_extra

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file async_bus.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
open! Core
open! Async_kernel
open! Import
open! Bus

(** [subscribe_and_maybe_write_to_pipe1 t here ~maybe_write_fn] returns the read end of a
    pipe that is fed from the bus [t].

    If [t] is already closed, the result is [Pipe.empty ()] and no subscription is made.
    Otherwise a fresh pipe is created and [maybe_write_fn writer] is subscribed to [t] as
    the callback.  The writer is closed from the subscription's [on_close] handler, and
    once the writer is closed (for whatever reason) the subscription is removed. *)
let subscribe_and_maybe_write_to_pipe1 t here ~maybe_write_fn =
  match Bus.is_closed t with
  | true -> Pipe.empty ()
  | false ->
    let reader, writer = Pipe.create () in
    let on_close () = Pipe.close writer in
    let subscriber = Bus.subscribe_exn t here ~f:(maybe_write_fn writer) ~on_close in
    (* Drop the bus subscription as soon as nobody can write to the pipe any more. *)
    upon (Pipe.closed writer) (fun () -> Bus.unsubscribe t subscriber);
    reader
;;

(** [pipe1_exn t here] returns a pipe reader fed from the bus [t], built with
    [subscribe_and_maybe_write_to_pipe1] and using
    [Pipe.write_without_pushback_if_open] to hand each value to the pipe. *)
let pipe1_exn t here =
  let maybe_write_fn = Pipe.write_without_pushback_if_open in
  subscribe_and_maybe_write_to_pipe1 t here ~maybe_write_fn
;;

(** [pipe1_filter_map_exn t here ~f] is like [pipe1_exn], except that each value from the
    bus is first passed through [f]: a [Some] result is handed to the pipe with
    [Pipe.write_without_pushback_if_open], and a [None] result is dropped. *)
let pipe1_filter_map_exn t here ~f =
  let maybe_write_fn writer value =
    Option.iter (f value) ~f:(fun mapped ->
      Pipe.write_without_pushback_if_open writer mapped)
  in
  subscribe_and_maybe_write_to_pipe1 t here ~maybe_write_fn
;;

(** Witnesses for the arities, from one to five arguments, supported by [first_exn].

    Each constructor of [t] ties together three types:
    - a function type over the given number of arguments that returns [unit];
    - a function type over the same arguments that returns an optional result;
    - the type of that result.

    In [first_exn] the first type is the type of the callback subscribed to the bus, the
    second is the type of the [~f] argument, and the third is the type of the value the
    returned deferred is filled with. *)
module First_arity = struct
  type (_, _, _) t =
    | Arity1 : ('a -> unit, 'a -> 'r option, 'r) t
    | Arity2 : ('a -> 'b -> unit, 'a -> 'b -> 'r option, 'r) t
    | Arity3 : ('a -> 'b -> 'c -> unit, 'a -> 'b -> 'c -> 'r option, 'r) t
    | Arity4 : ('a -> 'b -> 'c -> 'd -> unit, 'a -> 'b -> 'c -> 'd -> 'r option, 'r) t
    | Arity5
      : ( 'a -> 'b -> 'c -> 'd -> 'e -> unit
        , 'a -> 'b -> 'c -> 'd -> 'e -> 'r option
        , 'r )
          t
  [@@deriving sexp_of]
end

(** [first_exn ?stop t here first_arity ~f] subscribes to [t] and returns a deferred that
    is filled with [r] the first time [f], applied to the arguments delivered to the
    subscription, returns [Some r].  The subscription is removed once that happens.

    - [first_arity] selects how many arguments the subscribed callback and [f] take.
    - [stop], if supplied, ends the subscription when it becomes determined; [f] is not
      called once [stop] is determined, and nothing in this function fills the result
      after that point.
    - Errors passed to [on_callback_raise] are forwarded to the monitor that is current
      when [first_exn] subscribes.

    NOTE(review): the [_exn] suffix presumably reflects that [Bus.subscribe_exn] can
    raise -- confirm against the [Bus] interface. *)
let first_exn (type c f r) ?stop t here (first_arity : (c, f, r) First_arity.t) ~(f : f) =
  Deferred.create (fun ivar ->
    (* Holds the subscription once [Bus.subscribe_exn] below has returned; [None] until
       then. *)
    let subscriber : c Bus.Subscriber.t option ref = ref None in
    (* [finish] consumes the result of [f]: [None] means keep waiting, [Some r] fills the
       result and unsubscribes if the subscriber is already known. *)
    let finish : r option -> unit = function
      | None -> ()
      | Some r ->
        Ivar.fill ivar r;
        (match !subscriber with
         | Some subscriber -> Bus.unsubscribe t subscriber
         | None ->
           (* When a [Bus] is created with
              [on_subscription_after_first_write:Allow_and_send_last_value], then [finish]
              can be called before the [Bus.subscribe_exn] below returns.  In that case,
              we won't have captured the subscriber yet.  Instead of [Option.value_exn
              !subscriber], match here and check again after [subscribe_exn] returns. *)
           ())
    in
    (* We define [can_finish] separately from [finish] because we must call [can_finish]
       before we call [f], so that we do not call [f] if [stop] is determined. *)
    let can_finish =
      match stop with
      | None -> fun () -> true
      | Some stop ->
        (* NOTE(review): [Option.value_exn] assumes this handler only runs after
           [subscriber] has been assigned below; if [Bus.subscribe_exn] raises,
           [subscriber] stays [None] -- confirm this cannot be reached in that state. *)
        upon stop (fun () -> Bus.unsubscribe t (Option.value_exn !subscriber));
        fun () -> not (Deferred.is_determined stop)
    in
    (* Build a callback of the arity described by [first_arity]; every arity applies the
       same rule: check [can_finish] first, then pass the result of [f] to [finish]. *)
    let callback : c =
      match first_arity with
      | Arity1 -> fun a -> if can_finish () then finish (f a)
      | Arity2 -> fun a1 a2 -> if can_finish () then finish (f a1 a2)
      | Arity3 -> fun a1 a2 a3 -> if can_finish () then finish (f a1 a2 a3)
      | Arity4 -> fun a1 a2 a3 a4 -> if can_finish () then finish (f a1 a2 a3 a4)
      | Arity5 -> fun a1 a2 a3 a4 a5 -> if can_finish () then finish (f a1 a2 a3 a4 a5)
    in
    subscriber
    := Some
         (Bus.subscribe_exn
            t
            here
            ~on_callback_raise:
              (* Capture the monitor now, so that errors are sent to the monitor that was
                 current at subscription time. *)
              (let monitor = Monitor.current () in
               fun error -> Monitor.send_exn monitor (Error.to_exn error))
            ~f:callback);
    (* If [finish] already filled [ivar] while [subscribe_exn] was running, it could not
       unsubscribe because [subscriber] was still [None]; do it now. *)
    if Ivar.is_full ivar then Bus.unsubscribe t (Option.value_exn !subscriber))
;;
OCaml

Innovation. Community. Security.