package async_extra
Monadic concurrency library
Install
Dune Dependency
Authors
Maintainers
Sources
async_extra-v0.14.0.tar.gz
sha256=782a955450c0cbd3e978fef4d4b77422b1f15dac29ddd5298642cedb48520264
md5=e645c6df8dff3da2e56205b3c95f920c
doc/src/async_extra.async_bus/async_bus.ml.html
Source file async_bus.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
open! Core_kernel open! Async_kernel open! Import open! Bus let pipe1_exn (t : ('a -> unit, _) t) here = let r, w = Pipe.create () in let subscription = subscribe_exn t here ~f:(function | v -> Pipe.write_without_pushback_if_open w v) ~on_close:(fun () -> Pipe.close w) in upon (Pipe.closed w) (fun () -> unsubscribe t subscription); r ;; module First_arity = struct type (_, _, _) t = | Arity1 : ('a -> unit, 'a -> 'r option, 'r) t | Arity2 : ('a -> 'b -> unit, 'a -> 'b -> 'r option, 'r) t | Arity3 : ('a -> 'b -> 'c -> unit, 'a -> 'b -> 'c -> 'r option, 'r) t | Arity4 : ('a -> 'b -> 'c -> 'd -> unit, 'a -> 'b -> 'c -> 'd -> 'r option, 'r) t | Arity5 : ( 'a -> 'b -> 'c -> 'd -> 'e -> unit , 'a -> 'b -> 'c -> 'd -> 'e -> 'r option , 'r ) t [@@deriving sexp_of] end let first_exn (type c f r) ?stop t here (first_arity : (c, f, r) First_arity.t) ~(f : f) = let module A = First_arity in Deferred.create (fun ivar -> let subscriber : c Bus.Subscriber.t option ref = ref None in let finish : r option -> unit = function | None -> () | Some r -> Ivar.fill ivar r; Bus.unsubscribe t (Option.value_exn !subscriber) in (* We define [can_finish] separately from [finish] because we must call [can_finish] before we call [f], so that we do not call [f] if [stop] is determined. *) let can_finish = match stop with | None -> fun () -> true | Some stop -> upon stop (fun () -> Bus.unsubscribe t (Option.value_exn !subscriber)); fun () -> not (Deferred.is_determined stop) in let callback : c = match first_arity with | Arity1 -> fun a -> if can_finish () then finish (f a) | Arity2 -> fun a1 a2 -> if can_finish () then finish (f a1 a2) | Arity3 -> fun a1 a2 a3 -> if can_finish () then finish (f a1 a2 a3) | Arity4 -> fun a1 a2 a3 a4 -> if can_finish () then finish (f a1 a2 a3 a4) | Arity5 -> fun a1 a2 a3 a4 a5 -> if can_finish () then finish (f a1 a2 a3 a4 a5) in subscriber := Some (Bus.subscribe_exn t here ~on_callback_raise: (let monitor = Monitor.current () in fun error -> Monitor.send_exn monitor (Error.to_exn error)) ~f:callback)) ;;
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>