package riot

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file socket.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
open Runtime
open Net
include Socket

type listen_opts = {
  reuse_addr : bool;
  reuse_port : bool;
  backlog : int;
  addr : Addr.tcp_addr;
}

type timeout = Infinity | Bounded of float
type unix_error = [ `Unix_error of Unix.error ]
type ('ok, 'err) result = ('ok, ([> unix_error ] as 'err)) Stdlib.result

let default_listen_opts =
  { reuse_addr = true; reuse_port = true; backlog = 128; addr = Addr.loopback }

let close socket =
  let sch = Scheduler.get_current_scheduler () in
  let this = self () in
  Logger.trace (fun f ->
      f "Process %a: Closing socket fd=%a" Pid.pp this Fd.pp socket);
  Io.close sch.io_tbl socket

let listen ?(opts = default_listen_opts) ~port () =
  let sch = Scheduler.get_current_scheduler () in
  let { reuse_addr; reuse_port; backlog; addr } = opts in
  let addr = Addr.tcp addr port in
  Logger.trace (fun f -> f "Listening on 0.0.0.0:%d" port);
  Io.listen sch.io_tbl ~reuse_port ~reuse_addr ~backlog addr

let rec connect addr =
  let sch = Scheduler.get_current_scheduler () in
  Logger.error (fun f -> f "Connecting to %a" Addr.pp addr);
  match Io.connect sch.io_tbl addr with
  | `Connected fd -> Ok fd
  | `In_progress fd ->
      let this = _get_proc (self ()) in
      Io.register sch.io_tbl this `w fd;
      syscall "connect" `w fd @@ fun socket -> Ok socket
  | `Abort reason -> Error (`Unix_error reason)
  | `Retry ->
      yield ();
      connect addr

let rec accept ?(timeout = Infinity) (socket : Socket.listen_socket) =
  let sch = Scheduler.get_current_scheduler () in
  match Io.accept sch.io_tbl socket with
  | exception Fd.(Already_closed _) -> Error `Closed
  | `Abort reason -> Error (`Unix_error reason)
  | `Retry -> syscall "accept" `r socket @@ accept ~timeout
  | `Connected (socket, addr) -> Ok (socket, addr)

let controlling_process _socket ~new_owner:_ = Ok ()

let rec receive ?(timeout = Infinity) ~len socket =
  let bytes = Bytes.create len in
  match Io.read socket bytes 0 len with
  | exception Fd.(Already_closed _) -> Error `Closed
  | `Abort reason -> Error (`Unix_error reason)
  | `Retry -> syscall "read" `r socket @@ receive ~timeout ~len
  | `Read 0 -> Error `Closed
  | `Read len ->
      let data = Bigstringaf.create len in
      Bigstringaf.blit_from_bytes bytes ~src_off:0 data ~dst_off:0 ~len;
      Ok data

let rec send data socket =
  Logger.debug (fun f -> f "sending: %S" (Bigstringaf.to_string data));
  let off = 0 in
  let len = Bigstringaf.length data in
  let bytes = Bytes.create len in
  Bigstringaf.blit_to_bytes data ~src_off:off bytes ~dst_off:0 ~len;
  match Io.write socket bytes off len with
  | exception Fd.(Already_closed _) -> Error `Closed
  | `Abort reason -> Error (`Unix_error reason)
  | `Retry ->
      Logger.debug (fun f -> f "retrying");
      syscall "write" `w socket @@ send data
  | `Wrote bytes ->
      Logger.debug (fun f -> f "sent: %S" (Bigstringaf.to_string data));
      Ok bytes

let pp_err fmt = function
  | `Timeout -> Format.fprintf fmt "Timeout"
  | `System_limit -> Format.fprintf fmt "System_limit"
  | `Closed -> Format.fprintf fmt "Closed"
  | `Unix_error err ->
      Format.fprintf fmt "Unix_error(%s)" (Unix.error_message err)
OCaml

Innovation. Community. Security.