package async_extra
Monadic concurrency library
Install
Dune Dependency
Authors
Maintainers
Sources
async_extra-v0.16.0.tar.gz
sha256=8084ad31437e9cede75470ac4f893c7a9d438c1c8502eea25fc3ef0af66aed00
doc/src/async_extra.async_bus/async_bus.ml.html
Source file async_bus.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
open! Core
open! Async_kernel
open! Import
open! Bus

(* Shared plumbing for the [pipe1_*] functions: subscribes to [t] and returns the
   read end of a fresh pipe.  [maybe_write_fn] is applied to the pipe's writer to
   obtain the bus callback, so it decides which values (if any) get written.

   If [t] is already closed, no subscription is made and an empty pipe is returned.
   Otherwise the writer is closed when the bus closes, and the subscription is
   removed once the writer is closed. *)
let subscribe_and_maybe_write_to_pipe1 t here ~maybe_write_fn =
  match Bus.is_closed t with
  | true -> Pipe.empty ()
  | false ->
    let reader, writer = Pipe.create () in
    let subscription =
      subscribe_exn
        t
        here
        ~f:(maybe_write_fn writer)
        ~on_close:(fun () -> Pipe.close writer)
    in
    (* Closing the writer (from either side) drops the bus subscription. *)
    upon (Pipe.closed writer) (fun () -> unsubscribe t subscription);
    reader
;;

(* [pipe1_exn t here] returns a pipe that receives every value written to [t],
   written without pushback and only while the pipe is still open. *)
let pipe1_exn t here =
  subscribe_and_maybe_write_to_pipe1
    t
    here
    ~maybe_write_fn:Pipe.write_without_pushback_if_open
;;

(* [pipe1_filter_map_exn t here ~f] is like [pipe1_exn], except that each value [v]
   written to [t] is first passed to [f]: [Some x] writes [x] to the pipe, [None]
   writes nothing. *)
let pipe1_filter_map_exn t here ~f =
  let write_if_some writer v =
    match f v with
    | Some x -> Pipe.write_without_pushback_if_open writer x
    | None -> ()
  in
  subscribe_and_maybe_write_to_pipe1 t here ~maybe_write_fn:write_if_some
;;

module First_arity = struct
  (* [(callback, f, r) t] ties together, for each supported arity, the type of the
     bus callback (returning [unit]), the type of the user function passed to
     [first_exn] (returning ['r option]), and the result type ['r]. *)
  type (_, _, _) t =
    | Arity1 : ('a -> unit, 'a -> 'r option, 'r) t
    | Arity2 : ('a -> 'b -> unit, 'a -> 'b -> 'r option, 'r) t
    | Arity3 : ('a -> 'b -> 'c -> unit, 'a -> 'b -> 'c -> 'r option, 'r) t
    | Arity4 : ('a -> 'b -> 'c -> 'd -> unit, 'a -> 'b -> 'c -> 'd -> 'r option, 'r) t
    | Arity5 :
        ( 'a -> 'b -> 'c -> 'd -> 'e -> unit
        , 'a -> 'b -> 'c -> 'd -> 'e -> 'r option
        , 'r )
        t
  [@@deriving sexp_of]
end

(* [first_exn ?stop t here first_arity ~f] subscribes to [t] and returns a deferred
   that is filled with [r] the first time [f] returns [Some r] for a written value;
   the subscription is then removed.

   If [stop] is supplied, the subscription is removed when [stop] becomes
   determined, and [f] is not called once [stop] is determined.

   Exceptions raised by the callback are forwarded to the monitor that was current
   when [first_exn] was called. *)
let first_exn (type c f r) ?stop t here (first_arity : (c, f, r) First_arity.t) ~(f : f) =
  Deferred.create (fun result ->
    let subscriber : c Bus.Subscriber.t option ref = ref None in
    let finish : r option -> unit = function
      | None -> ()
      | Some r ->
        Ivar.fill result r;
        (match !subscriber with
         | None ->
           (* A [Bus] created with
              [on_subscription_after_first_write:Allow_and_send_last_value] can
              invoke the callback, and hence [finish], before [Bus.subscribe_exn]
              below has returned.  The subscriber is not recorded yet in that case,
              so rather than raising via [Option.value_exn] we do nothing here and
              rely on the check performed right after [subscribe_exn] returns. *)
           ()
         | Some subscriber -> Bus.unsubscribe t subscriber)
    in
    (* [can_finish] is kept separate from [finish] because it has to be consulted
       before [f] is called: [f] must not run once [stop] is determined. *)
    let can_finish =
      match stop with
      | None -> fun () -> true
      | Some stop ->
        upon stop (fun () -> Bus.unsubscribe t (Option.value_exn !subscriber));
        fun () -> not (Deferred.is_determined stop)
    in
    (* Runs [compute] (a delayed application of [f]) only when still permitted, and
       hands its result to [finish]. *)
    let finish_if_allowed compute = if can_finish () then finish (compute ()) in
    let callback : c =
      match first_arity with
      | Arity1 -> fun a -> finish_if_allowed (fun () -> f a)
      | Arity2 -> fun a1 a2 -> finish_if_allowed (fun () -> f a1 a2)
      | Arity3 -> fun a1 a2 a3 -> finish_if_allowed (fun () -> f a1 a2 a3)
      | Arity4 -> fun a1 a2 a3 a4 -> finish_if_allowed (fun () -> f a1 a2 a3 a4)
      | Arity5 -> fun a1 a2 a3 a4 a5 -> finish_if_allowed (fun () -> f a1 a2 a3 a4 a5)
    in
    (* Capture the caller's monitor now so callback exceptions are reported there. *)
    let on_callback_raise =
      let monitor = Monitor.current () in
      fun error -> Monitor.send_exn monitor (Error.to_exn error)
    in
    let subscription = Bus.subscribe_exn t here ~on_callback_raise ~f:callback in
    subscriber := Some subscription;
    (* Covers the case where [finish] already ran during [subscribe_exn], before the
       subscriber had been recorded. *)
    if Ivar.is_full result then Bus.unsubscribe t subscription)
;;
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>