package async_kernel

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file mvar.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
open! Core
open! Import
open! Deferred_std

type ('a, 'phantom) t =
  { current_value : 'a Moption.t
  ; taken : (unit, read_write) Bvar.t
  ; mutable value_available : unit Ivar.t
  }
[@@deriving fields, sexp_of]

let value_available t = Ivar.read t.value_available
let is_empty t = Moption.is_none t.current_value

let invariant invariant_a _ (t : _ t) =
  Invariant.invariant [%here] t [%sexp_of: (_, _) t] (fun () ->
    let check f = Invariant.check_field t f in
    Fields.iter
      ~current_value:(check (Moption.invariant invariant_a))
      ~taken:(check (Bvar.invariant Unit.invariant ignore))
      ~value_available:
        (check (fun value_available ->
           [%test_result: bool]
             (Ivar.is_full value_available)
             ~expect:(Moption.is_some t.current_value))))
;;

let peek t = Moption.get t.current_value

let peek_exn t =
  if is_empty t then raise_s [%message "Mvar.peek_exn called on empty mvar"];
  Moption.get_some_exn t.current_value
;;

let sexp_of_t sexp_of_a _ t = [%sexp (peek t : a option)]

module Read_write = struct
  type nonrec 'a t = ('a, read_write) t [@@deriving sexp_of]

  let invariant invariant_a t = invariant invariant_a ignore t
end

module Read_only = struct
  type nonrec 'a t = ('a, read) t [@@deriving sexp_of]

  let invariant invariant_a t = invariant invariant_a ignore t
end

let read_only (t : ('a, [> read ]) t) = (t :> ('a, read) t)
let write_only (t : ('a, [> write ]) t) = (t :> ('a, write) t)

let create () =
  { current_value = Moption.create ()
  ; taken = Bvar.create ()
  ; value_available = Ivar.create ()
  }
;;

let take_nonempty t =
  assert (not (is_empty t));
  let r = Moption.get_some_exn t.current_value in
  Moption.set_none t.current_value;
  Bvar.broadcast t.taken ();
  t.value_available <- Ivar.create ();
  r
;;

let take_now_exn t =
  if is_empty t then raise_s [%message "Mvar.take_exn called on empty mvar"];
  take_nonempty t
;;

let take_now t = if not (is_empty t) then Some (take_nonempty t) else None

let rec take t =
  if not (is_empty t)
  then return (take_nonempty t)
  else (
    let%bind () = value_available t in
    take t)
;;

let set t v =
  Moption.set_some t.current_value v;
  Ivar.fill_if_empty t.value_available ()
;;

let update t ~f = set t (f (peek t))
let update_exn t ~f = set t (f (peek_exn t))
let taken t = Bvar.wait t.taken

let rec put t v =
  if is_empty t
  then (
    set t v;
    return ())
  else (
    let%bind () = taken t in
    put t v)
;;

let pipe_when_ready t =
  let r, w = Pipe.create () in
  let rec loop () =
    let%bind () = value_available t in
    if not (Pipe.is_closed w)
    then (
      match take_now t with
      | None -> loop ()
      | Some x ->
        let%bind () = Pipe.write w x in
        loop ())
    else return ()
  in
  don't_wait_for (loop ());
  r
;;
OCaml

Innovation. Community. Security.