package async_unix

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file raw_fd.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
open Core
open Import

let debug = Debug.fd

module File_descr = Unix.File_descr

module Kind = struct
  type t =
    | Char
    | Fifo
    | File
    | Socket of [ `Unconnected | `Bound | `Passive | `Active | `Unknown ]
  [@@deriving sexp_of]
end

module State = struct
  (* [State] is is used to keep track of when the file descriptor is in use or being
     closed.  Here are the allowed transitions.

     Open --> Close_requested --> Closed *)
  type t =
    (* [Close_requested (execution_context, do_close_syscall)] indicates that [Fd.close t]
       has been called, but that we haven't yet started the close() syscall, because there
       are still active syscalls using the file descriptor.  Once there are no active
       syscalls, we enqueue a job to [do_close_syscall] in [execution_context]. *)
    | Close_requested of Execution_context.t * (unit -> unit)
    (* [Closed] indicates that there are no more active syscalls and we have started the
       close() syscall. *)
    | Closed
    (* [Open] is the initial state of a file descriptor, and the normal state when it is
       in use.  It indicates that it has not not yet been closed.  The argument is an ivar
       to be filled when [close] is called. *)
    | Open of unit Ivar.t
  [@@deriving sexp_of]

  let transition_is_allowed t t' =
    match t, t' with
    | Open _, Close_requested _ | Close_requested _, Closed -> true
    | _ -> false
  ;;

  let is_open = function
    | Open _ -> true
    | Close_requested _ | Closed -> false
  ;;
end

type ready_to_result =
  [ `Ready
  | `Bad_fd
  | `Closed
  | `Interrupted
  | `Unsupported
  ]
[@@deriving sexp_of]

module Watching = struct
  (* Every fd can be monitored by a file_descr_watcher for read, for write, for both, or
     for neither.  Each fd also has its own notion, independent of the file_descr_watcher,
     of a [Watching.t], for both read and write that indicates the desired state of the
     file_descr_watcher for this fd.  That desired state is maintained only in the fd
     while async jobs are running, and is then synchronized with the file_descr_watcher's
     notion, via calls to [File_descr_watcher.set], just prior to asking the
     file_descr_watcher to check fds for ready I/O.

     Initially, watching state starts as [Not_watching].  When one initially requests that
     the fd be monitored via [request_start_watching], the state transitions to
     [Watch_once] or [Watch_repeatedly].  After the file_descr_watcher detects I/O is
     available, the job in [Watch_repeatedly] is enqueued, or the ivar in [Watch_once] is
     filled and the state transitions to [Stop_requested].  Or, if one calls
     [request_stop_watching], the state transitions to [Stop_requested].  Finally,
     [Stop_requested] will transition to [Not_watching] when the desired state is
     synchronized with the file_descr_watcher. *)
  type t =
    | Not_watching
    | Watch_once of ready_to_result Ivar.t
    | Watch_repeatedly of
        Job.t * [ `Bad_fd | `Closed | `Interrupted | `Unsupported ] Ivar.t
    | Stop_requested
  [@@deriving sexp_of]

  let invariant t : unit =
    try
      match t with
      | Not_watching | Stop_requested -> ()
      | Watch_once ivar -> assert (Ivar.is_empty ivar)
      | Watch_repeatedly (_, ivar) -> assert (Ivar.is_empty ivar)
    with
    | exn -> raise_s [%message "Watching.invariant failed" (exn : exn) ~watching:(t : t)]
  ;;
end

module Nonblock_status = struct
  (* Encodes the knowledge of the O_NONBLOCK flag of an fd. *)
  type t =
    | Blocking
    | Nonblocking
    | Unknown
  [@@deriving sexp_of]
end

module T = struct
  type t =
    { file_descr : File_descr.t
    ; (* [info] is for debugging info. It is mutable because it changes after [bind],
         [listen], or[connect]. *)
      mutable info : Info.t
    ; (* [kind] is mutable because it changes after [bind], [listen], or [connect]. *)
      mutable kind : Kind.t
    ; (* if [can_set_nonblock] is true, async will switch the underlying file
         descriptor into nonblocking mode any time a non-blocking operation is attempted.
         It can be [false] if the user explicitly tells async to avoid modifying that
         flag on the underlying fd, or if Async detects that the file descriptor
         doesn't support nonblocking I/O. *)
      mutable can_set_nonblock : bool
    ; mutable nonblock_status : Nonblock_status.t
    ; mutable state : State.t
    ; watching : Watching.t Read_write_pair.Mutable.t
    ; (* [watching_has_changed] is true if [watching] has changed since the last time
         [watching] was synchronized with the file_descr_watcher.  In this case, the
         fd appears in the scheduler's [fds_whose_watching_has_changed] list so that
         it can be synchronized later. *)
      mutable watching_has_changed : bool
    ; (* [num_active_syscalls] is used to ensure that we don't call [close] on a file
         descriptor until there are no active system calls involving that file descriptor.
         This prevents races in which the OS assigns that file descriptor to a new
         open file, and thus a system call deals with the wrong open file.   If the
         state of an fd is [Close_requested], then once [num_active_syscalls] drops to
         zero, the close() syscall will start and the state will transition to [Closed],
         thus preventing further system calls from using the file descriptor.

         [num_active_syscalls] is abused slightly to include the syscall to the
         file_descr_watcher to check for ready I/O.  Watching for read and for write
         each potentially count for one active syscall. *)
      mutable num_active_syscalls : int
    ; (* [close_finished] becomes determined after the file descriptor has been closed
         and the underlying close() system call has finished. *)
      close_finished : unit Ivar.t
    }
  [@@deriving fields ~iterators:iter, sexp_of]

  type t_hum = t

  let sexp_of_t_hum { file_descr; info; kind; _ } =
    [%sexp { file_descr : File_descr.t; info : Info.t; kind : Kind.t }]
  ;;
end

include T

let equal (t : t) t' = phys_equal t t'

let invariant t : unit =
  try
    let check f field = f (Field.get field t) in
    Fields.iter
      ~info:ignore
      ~file_descr:ignore
      ~kind:ignore
      ~can_set_nonblock:ignore
      ~nonblock_status:ignore
      ~state:ignore
      ~watching:
        (check (fun watching -> Read_write_pair.iter watching ~f:Watching.invariant))
      ~watching_has_changed:ignore
      ~num_active_syscalls:
        (check (fun num_active_syscalls ->
           assert (t.num_active_syscalls >= 0);
           let watching read_or_write =
             match Read_write_pair.get t.watching read_or_write with
             | Not_watching -> 0
             | Stop_requested | Watch_once _ | Watch_repeatedly _ -> 1
           in
           assert (t.num_active_syscalls >= watching `Read + watching `Write);
           match t.state with
           | Closed -> assert (num_active_syscalls = 0)
           | Close_requested _ | Open _ -> ()))
      ~close_finished:
        (check (fun close_finished ->
           match t.state with
           | Closed -> ()
           | Close_requested _ -> assert (Ivar.is_empty close_finished)
           | Open close_started ->
             assert (Ivar.is_empty close_finished);
             assert (Ivar.is_empty close_started)))
  with
  | exn -> raise_s [%message "Fd.invariant failed" (exn : exn) ~fd:(t : t)]
;;

let to_int t = File_descr.to_int t.file_descr

let create ?(avoid_setting_nonblock = false) (kind : Kind.t) file_descr info =
  let can_set_nonblock =
    if avoid_setting_nonblock
    then false
    else (
      match kind with
      (* No point in setting nonblocking for files.  Unix doesn't care. *)
      | File -> false
      (* We don't use nonblocking I/O for char devices because we don't want to change the
         blocking status of TTYs, which would affect all processes currently attached to
         that TTY and even persist after this process terminates.

         Also, /dev/null is a char device not supported by epoll.

         We don't really care about doing nonblocking I/O on other character devices,
         e.g. /dev/random. *)
      | Char -> false
      | Fifo -> true
      | Socket _ ->
        (* All one can do on a `Bound socket is listen() to it, and we don't use listen()
           in a nonblocking way.
           `Unconnected sockets support nonblocking so we can connect() them.
           `Passive     sockets support nonblocking so we can accept() them.
           `Active      sockets support nonblocking so we can read() and write() them.

           We need the sockets to be in nonblocking mode for [`Unconnected] and [`Passive]
           sockets, because [accept_interruptible] and [connect_interruptible] in
           unix_syscalls.ml assume that such sockets are nonblocking.  On the other hand,
           there is no such assumption about [`Active] sockets.

           In any case, returning [true] here makes sense. Not only is it harmless
           in case we don't end up using it, but this field is also not updated by
           socket state transitions ([Fd.Private.replace]), so its value needs to be the
           same across the different socket states.
        *)
        true)
  in
  let t =
    { info
    ; file_descr
    ; kind
    ; can_set_nonblock
    ; nonblock_status = Nonblock_status.Unknown
    ; state = State.Open (Ivar.create ())
    ; watching = Read_write_pair.create_both Watching.Not_watching
    ; watching_has_changed = false
    ; num_active_syscalls = 0
    ; close_finished = Ivar.create ()
    }
  in
  if debug then Debug.log "Fd.create" t [%sexp_of: t];
  t
;;

let inc_num_active_syscalls t =
  match t.state with
  | Close_requested _ | Closed -> `Already_closed
  | Open _ ->
    t.num_active_syscalls <- t.num_active_syscalls + 1;
    `Ok
;;

let set_state t new_state =
  if debug then Debug.log "Fd.set_state" (new_state, t) [%sexp_of: State.t * t];
  if State.transition_is_allowed t.state new_state
  then t.state <- new_state
  else
    raise_s
      [%message
        "Fd.set_state attempted disallowed state transition"
          ~fd:(t : t)
          (new_state : State.t)]
;;

let is_open t = State.is_open t.state
let is_closed t = not (is_open t)

let determine_nonblock_status t =
  match t.nonblock_status with
  | Unknown ->
    let flags = Core_unix.fcntl_getfl t.file_descr in
    let nonblock =
      Core_unix.Open_flags.is_subset Core_unix.Open_flags.nonblock ~of_:flags
    in
    t.nonblock_status
      <- (match nonblock with
          | false -> Blocking
          | true -> Nonblocking);
    nonblock
  | Nonblocking -> true
  | Blocking -> false
;;

let supports_nonblock t = if t.can_set_nonblock then true else determine_nonblock_status t

let set_nonblock_if_necessary ?(nonblocking = false) t =
  if nonblocking
  then (
    match determine_nonblock_status t with
    | true -> ()
    | false ->
      if t.can_set_nonblock
      then (
        Unix.set_nonblock t.file_descr;
        t.nonblock_status <- Nonblocking)
      else
        raise_s
          [%message
            "Fd.set_nonblock_if_necessary called on fd that does not support nonblock"
              ~fd:(t : t)])
;;

let with_file_descr_exn ?nonblocking t f =
  if is_closed t
  then raise_s [%message "Fd.with_file_descr_exn got closed fd" ~_:(t : t)]
  else (
    set_nonblock_if_necessary t ?nonblocking;
    f t.file_descr)
;;

let with_file_descr ?nonblocking t f =
  if is_closed t
  then `Already_closed
  else (
    try
      set_nonblock_if_necessary t ?nonblocking;
      `Ok (f t.file_descr)
    with
    | exn -> `Error exn)
;;

let syscall ?nonblocking t f =
  with_file_descr t ?nonblocking (fun file_descr ->
    Result.ok_exn (Syscall.syscall (fun () -> f file_descr)))
;;

let syscall_exn ?nonblocking t f =
  match syscall t f ?nonblocking with
  | `Ok a -> a
  | `Already_closed -> raise_s [%message "Fd.syscall_exn got closed fd" ~_:(t : t)]
  | `Error exn -> raise exn
;;

let syscall_result_exn ?nonblocking t a f =
  if is_closed t
  then raise_s [%message "Fd.syscall_result_exn got closed fd" ~_:(t : t)]
  else (
    set_nonblock_if_necessary t ?nonblocking;
    Syscall.syscall_result2 t.file_descr a f)
;;
OCaml

Innovation. Community. Security.