package riot
An actor-model multi-core scheduler for OCaml 5
Install
Dune Dependency
Authors
Maintainers
Sources
riot-0.0.5.tbz
sha256=01b7b82ccc656b12b7315960d9df17eb4682b8f1af68e9fee33171fee1f9cf88
sha512=d8831d8a75fe43a7e8d16d2c0bb7d27f6d975133e17c5dd89ef7e575039c59d27c1ab74fbadcca81ddfbc0c74d1e46c35baba35ef825b36ac6c4e49d7a41d0c2
doc/src/lib_net/socket.ml.html
Source file socket.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
open Runtime open Net include Socket type listen_opts = { reuse_addr : bool; reuse_port : bool; backlog : int; addr : Addr.tcp_addr; } type timeout = Infinity | Bounded of float type unix_error = [ `Unix_error of Unix.error ] type ('ok, 'err) result = ('ok, ([> unix_error ] as 'err)) Stdlib.result let default_listen_opts = { reuse_addr = true; reuse_port = true; backlog = 128; addr = Addr.loopback } let close socket = let sch = Scheduler.get_current_scheduler () in let this = self () in Logger.trace (fun f -> f "Process %a: Closing socket fd=%a" Pid.pp this Fd.pp socket); Io.close sch.io_tbl socket let listen ?(opts = default_listen_opts) ~port () = let sch = Scheduler.get_current_scheduler () in let { reuse_addr; reuse_port; backlog; addr } = opts in let addr = Addr.tcp addr port in Logger.trace (fun f -> f "Listening on 0.0.0.0:%d" port); Io.listen sch.io_tbl ~reuse_port ~reuse_addr ~backlog addr let rec connect addr = let sch = Scheduler.get_current_scheduler () in Logger.error (fun f -> f "Connecting to %a" Addr.pp addr); match Io.connect sch.io_tbl addr with | `Connected fd -> Ok fd | `In_progress fd -> let this = _get_proc (self ()) in Io.register sch.io_tbl this `w fd; syscall "connect" `w fd @@ fun socket -> Ok socket | `Abort reason -> Error (`Unix_error reason) | `Retry -> yield (); connect addr let rec accept ?(timeout = Infinity) (socket : Socket.listen_socket) = let sch = Scheduler.get_current_scheduler () in match Io.accept sch.io_tbl socket with | exception Fd.(Already_closed _) -> Error `Closed | `Abort reason -> Error (`Unix_error reason) | `Retry -> syscall "accept" `r socket @@ accept ~timeout | `Connected (socket, addr) -> Ok (socket, addr) let controlling_process _socket ~new_owner:_ = Ok () let rec receive ?(timeout = Infinity) ~len socket = let bytes = Bytes.create len in match Io.read socket bytes 0 len with | exception Fd.(Already_closed _) -> Error `Closed | `Abort reason -> Error (`Unix_error reason) | `Retry -> syscall "read" `r socket @@ receive ~timeout ~len | `Read 0 -> Error `Closed | `Read len -> let data = Bigstringaf.create len in Bigstringaf.blit_from_bytes bytes ~src_off:0 data ~dst_off:0 ~len; Ok data let rec send data socket = Logger.debug (fun f -> f "sending: %S" (Bigstringaf.to_string data)); let off = 0 in let len = Bigstringaf.length data in let bytes = Bytes.create len in Bigstringaf.blit_to_bytes data ~src_off:off bytes ~dst_off:0 ~len; match Io.write socket bytes off len with | exception Fd.(Already_closed _) -> Error `Closed | `Abort reason -> Error (`Unix_error reason) | `Retry -> Logger.debug (fun f -> f "retrying"); syscall "write" `w socket @@ send data | `Wrote bytes -> Logger.debug (fun f -> f "sent: %S" (Bigstringaf.to_string data)); Ok bytes let pp_err fmt = function | `Timeout -> Format.fprintf fmt "Timeout" | `System_limit -> Format.fprintf fmt "System_limit" | `Closed -> Format.fprintf fmt "Closed" | `Unix_error err -> Format.fprintf fmt "Unix_error(%s)" (Unix.error_message err)
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>