package luv

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file stream.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
(* This file is part of Luv, released under the MIT license. See LICENSE.md for
   details, or visit https://github.com/aantron/luv/blob/master/LICENSE.md. *)



type 'kind t = [ `Stream of 'kind ] Handle.t

let allocate kind =
  Handle.allocate ~reference_count:C.Types.Stream.reference_count kind

let coerce : type kind. kind t -> [ `Base ] t =
  Obj.magic

let shutdown_trampoline =
  C.Functions.Stream.Shutdown_request.get_trampoline ()

let shutdown stream callback =
  let wrapped_callback result =
    Error.catch_exceptions callback (Error.to_result () result)
  in
  let request = Request.allocate C.Types.Stream.Shutdown_request.t in
  Request.set_callback request wrapped_callback;
  let immediate_result =
    C.Functions.Stream.shutdown request (coerce stream) shutdown_trampoline in
  if immediate_result < 0 then begin
    Request.release request;
    callback (Error.result_from_c immediate_result)
  end

let connection_trampoline =
  C.Functions.Stream.get_connection_trampoline ()

let listen ?(backlog = C.Types.Stream.somaxconn) server callback =
  let wrapped_callback result =
    Error.catch_exceptions callback (Error.to_result () result)
  in
  Handle.set_reference
    ~index:C.Types.Stream.connection_callback_index server wrapped_callback;
  let immediate_result =
    C.Functions.Stream.listen (coerce server) backlog connection_trampoline in
  if immediate_result < 0 then
    callback (Error.result_from_c immediate_result)

let accept ~server ~client =
  C.Functions.Stream.accept (coerce server) (coerce client)
  |> Error.to_result ()

let alloc_trampoline =
  C.Functions.Handle.get_alloc_trampoline ()

let read_trampoline =
  C.Functions.Stream.get_read_trampoline ()

let read_start ?(allocate = Buffer.create) stream callback =
  let last_allocated_buffer = ref None in

  let wrapped_callback = Error.catch_exceptions callback in
  Handle.set_reference stream begin fun nread_or_error ->
    let result =
      if nread_or_error > 0 then begin
        let length = nread_or_error in
        let buffer =
          match !last_allocated_buffer with
          | Some buffer -> buffer
          | None -> assert false
        in
        last_allocated_buffer := None;
        let buffer =
          if Buffer.size buffer <= length then
            buffer
          else
            Buffer.sub buffer ~offset:0 ~length
        in
        Result.Ok buffer
      end
      else begin
        last_allocated_buffer := None;
        Error.result_from_c nread_or_error
      end
    in
    wrapped_callback result
  end;

  Handle.set_reference stream ~index:C.Types.Stream.allocate_callback_index
      begin fun suggested_size ->

    let buffer = allocate suggested_size in
    last_allocated_buffer := Some buffer;
    buffer
  end;

  let immediate_result =
    C.Functions.Stream.read_start
      (coerce stream) alloc_trampoline read_trampoline
  in
  if immediate_result < 0 then
    callback (Error.result_from_c immediate_result)

let read_stop stream =
  C.Functions.Stream.read_stop (coerce stream)
  |> Error.to_result ()

let write_trampoline =
  C.Functions.Stream.Write_request.get_trampoline ()

let write_general ?send_handle stream buffers callback =
  let count = List.length buffers in
  let bytes = Buffer.total_size buffers in
  let iovecs = Helpers.Buf.bigstrings_to_iovecs buffers count in

  let request = Request.allocate C.Types.Stream.Write_request.t in

  let wrapped_callback result =
    let module Sys = Compatibility.Sys in
    ignore (Sys.opaque_identity buffers);
    ignore (Sys.opaque_identity iovecs);
    let bytes_unwritten =
      C.Functions.Stream.get_write_queue_size (coerce stream)
      |> Unsigned.Size_t.to_int
    in
    callback (Error.to_result () result) (bytes - bytes_unwritten)
  in
  let wrapped_callback = Error.catch_exceptions wrapped_callback in
  Request.set_callback request wrapped_callback;

  let send_handle =
    match send_handle with
    | None -> Ctypes.from_voidp C.Types.Stream.t Ctypes.null
    | Some handle -> coerce handle
  in

  let immediate_result =
    C.Functions.Stream.write2
      request
      (coerce stream)
      (Ctypes.CArray.start iovecs)
      (Unsigned.UInt.of_int count)
      send_handle
      write_trampoline
  in

  if immediate_result < 0 then begin
    Request.release request;
    callback (Error.result_from_c immediate_result) 0
  end

let write stream buffers callback =
  write_general ?send_handle:None stream buffers callback

let write2 pipe buffers ~send_handle callback =
  write_general ~send_handle pipe buffers callback

let try_write stream buffers =
  let count = List.length buffers in
  let iovecs = Helpers.Buf.bigstrings_to_iovecs buffers count in

  let result =
    C.Functions.Stream.try_write
      (coerce stream)
      (Ctypes.CArray.start iovecs)
      (Unsigned.UInt.of_int count)
  in

  let module Sys = Compatibility.Sys in
  ignore (Sys.opaque_identity buffers);
  ignore (Sys.opaque_identity iovecs);

  Error.to_result result result

let is_readable stream =
  C.Functions.Stream.is_readable (coerce stream)

let is_writable stream =
  C.Functions.Stream.is_writable (coerce stream)

let set_blocking stream blocking =
  C.Functions.Stream.set_blocking (coerce stream) blocking
  |> Error.to_result ()

module Connect_request =
struct
  type t = [ `Connect ] Request.t

  let make () =
    Request.allocate C.Types.Stream.Connect_request.t

  let trampoline =
    C.Functions.Stream.Connect_request.get_trampoline ()
end
OCaml

Innovation. Community. Security.