package fuseau

  1. Overview
  2. Docs

Source file net.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
open Common_

module Inet_addr = struct
  type t = Unix.inet_addr

  let any = Unix.inet_addr_any
  let loopback = Unix.inet_addr_loopback
  let show = Unix.string_of_inet_addr
  let of_string s = try Some (Unix.inet_addr_of_string s) with _ -> None

  let of_string_exn s =
    try Unix.inet_addr_of_string s with _ -> invalid_arg "Inet_addr.of_string"
end

module Sockaddr = struct
  type t = Unix.sockaddr

  let show = function
    | Unix.ADDR_UNIX s -> s
    | Unix.ADDR_INET (addr, port) ->
      spf "%s:%d" (Unix.string_of_inet_addr addr) port

  let unix s : t = Unix.ADDR_UNIX s
  let inet addr port : t = Unix.ADDR_INET (addr, port)

  let inet_parse addr port =
    try Some (inet (Unix.inet_addr_of_string addr) port) with _ -> None

  let inet_parse_exn addr port =
    try inet (Unix.inet_addr_of_string addr) port
    with _ -> invalid_arg "Sockadd.inet_parse"

  let inet_local port = inet Unix.inet_addr_loopback port
  let inet_any port = inet Unix.inet_addr_any port
end

module TCP_server = struct
  type t = { fiber: unit Fiber.t } [@@unboxed]

  exception Stop

  let stop_ fiber =
    let ebt = Exn_bt.get Stop in
    Fuseau.Fiber.Private_.cancel fiber ebt

  let stop self = stop_ self.fiber
  let join self = Fuseau.await self.fiber

  let with_serve' (addr : Sockaddr.t) handle_client (f : t -> 'a) : 'a =
    let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in

    Unix.bind sock addr;
    Unix.set_nonblock sock;
    Unix.setsockopt sock Unix.SO_REUSEADDR true;
    Unix.listen sock 32;

    let fiber = Fuseau.Fiber.Private_.create () in
    let self = { fiber } in

    let loop_client client_sock client_addr : unit =
      Unix.set_nonblock client_sock;
      Unix.setsockopt client_sock Unix.TCP_NODELAY true;

      let@ () =
        Fun.protect ~finally:(fun () ->
            try Unix.close client_sock with _ -> ())
      in
      handle_client client_addr client_sock
    in

    let loop () =
      while not (Fiber.is_done fiber) do
        match Unix.accept sock with
        | client_sock, client_addr ->
          ignore
            (Fuseau.spawn ~propagate_cancel_to_parent:false (fun () ->
                 loop_client client_sock client_addr)
              : _ Fiber.t)
        | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
          (* suspend *)
          let loop = U_loop.cur () in
          Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup ->
              (* FIXME: possible race condition: the socket became readable
                  in the mid-time and we won't get notified. We need to call
                  [accept] after subscribing to [on_readable]. *)
              ignore
                (loop#on_readable sock (fun _ev ->
                     wakeup ();
                     Cancel_handle.cancel _ev)
                  : Cancel_handle.t))
      done
    in

    let loop_fiber =
      let sched = Fuseau.get_scheduler () in
      Fuseau.spawn_as_child_of ~propagate_cancel_to_parent:true sched fiber loop
    in
    let finally () =
      stop_ loop_fiber;
      Unix.close sock
    in
    let@ () = Fun.protect ~finally in
    f self

  let with_serve (addr : Sockaddr.t) handle_client (f : t -> 'a) : 'a =
    with_serve' addr
      (fun client_addr client_sock ->
        let ic = IO_unix.In.of_unix_fd client_sock in
        let oc = IO_unix.Out.of_unix_fd client_sock in
        handle_client client_addr ic oc)
      f
end

module TCP_client = struct
  let with_connect' addr (f : Unix.file_descr -> 'a) : 'a =
    let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
    Unix.set_nonblock sock;
    Unix.setsockopt sock Unix.TCP_NODELAY true;

    (* connect asynchronously *)
    while
      try
        Unix.connect sock addr;
        false
      with
      | Unix.Unix_error
          ((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _)
      ->
        Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup ->
            let loop = U_loop.cur () in
            ignore
              (loop#on_writable sock (fun _ev ->
                   wakeup ();
                   Cancel_handle.cancel _ev)
                : Cancel_handle.t));
        true
    do
      ()
    done;

    let finally () = try Unix.close sock with _ -> () in
    let@ () = Fun.protect ~finally in
    f sock

  let with_connect addr (f : Iostream.In.t -> Iostream.Out.t -> 'a) : 'a =
    with_connect' addr (fun sock ->
        let ic = IO_unix.In.of_unix_fd sock in
        let oc = IO_unix.Out.of_unix_fd sock in
        f ic oc)
end
OCaml

Innovation. Community. Security.