package dream

  1. Overview
  2. Docs
Tidy, feature-complete Web framework

Install

Dune Dependency

Authors

Maintainers

Sources

dream-1.0.0-alpha1.tar.gz
sha256=c8d988568fbbeffb151abdb4d6b903fbd3897842d3eb9b2c28fb350f0f02bbd4
md5=b8ad7f3e30f3e88e5451d92e42b49ce4

doc/src/httpaf/reqd.ml.html

Source file reqd.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
(*----------------------------------------------------------------------------
    Copyright (c) 2017 Inhabited Type LLC.

    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in the
       documentation and/or other materials provided with the distribution.

    3. Neither the name of the author nor the names of his contributors
       may be used to endorse or promote products derived from this software
       without specific prior written permission.

    THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
    OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
    WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR
    ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
    OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
    HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
    STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
  ----------------------------------------------------------------------------*)

(* Failures that can be reported against a request. [`Exn] carries the
 * exception that was raised; [report_error] answers it with an
 * [`Internal_server_error] status, while the other tags are used directly as
 * the response status. *)
type error =
  [ `Bad_request
  | `Bad_gateway
  | `Internal_server_error
  | `Exn of exn
  ]

(* Callback invoked by [report_error]. It receives the request (optional
 * argument), the error, and a function that, given response headers, starts
 * the error response and returns the body to write it to. *)
type error_handler =
  ?request:Request.t
  -> error
  -> (Headers.t -> [`write] Body.t)
  -> unit

(* Local aliases: [Reader] stands for [Parse.Reader] and [Writer] for
 * [Serialize.Writer] throughout this file. *)
module Reader = Parse.Reader
module Writer = Serialize.Writer

(* XXX(seliopou): The current design assumes that a new [Reqd.t] will be
 * allocated for each new request/response on a connection. This is wasteful,
 * as it creates garbage on persistent connections. A better approach would be
 * to allocate a single [Reqd.t] per connection and reuse it across
 * request/responses. This would allow a single [Faraday.t] to be allocated for
 * the body and reused. The [response_state] type could then be inlined into
 * the [Reqd.t] record, with dummy values occupying the fields for [response].
 * Something like this:
 *
 * {[
 *   type 'handle t =
 *     { mutable request        : Request.t
 *     ; mutable request_body   : Response.Body.t
 *     ; mutable response       : Response.t (* Starts off as a dummy value,
 *                                            * using [(==)] to identify it when
 *                                            * necessary *)
 *     ; mutable response_body  : Response.Body.t
 *     ; mutable persistent     : bool
 *     ; mutable response_state : [ `Waiting | `Started | `Streaming ]
 *     }
 *  ]}
 *
 * *)
(* State of one request/response exchange. The three mutable fields are
 * updated as the response is started or an error is reported. *)
type t =
  { request : Request.t
    (* The request being served; returned by [request]. *)
  ; request_body : [`read] Body.t
    (* Readable body of the request; returned by [request_body]. *)
  ; reader : Reader.request
    (* Woken up by [report_error] when a streaming response is cut short. *)
  ; writer : Writer.t
    (* Receives the response head and body. *)
  ; response_body_buffer : Bigstringaf.t
    (* Buffer handed to [Body.create] for a streaming response body. *)
  ; error_handler : error_handler
    (* Invoked by [report_error] when no response has started yet. *)
  ; mutable persistent : bool
    (* Starts as [Request.persistent_connection request]; combined with
     * [Response.persistent_connection] when a response starts; forced to
     * [false] by [report_error]. *)
  ; mutable response_state : Response_state.t
    (* Starts as [Waiting]; set to [Fixed], [Streaming] or [Upgrade] by the
     * respond functions. *)
  ; mutable error_code : [ `Ok | error ]
    (* [`Ok] until [report_error] records an error. *)
  }

(* [create error_handler request request_body reader writer
 * response_body_buffer] builds the descriptor for a new exchange: no response
 * has started ([Waiting]), no error has been recorded ([`Ok]), and the
 * persistence flag is taken from the request. *)
let create error_handler request request_body reader writer response_body_buffer =
  let persistent = Request.persistent_connection request in
  { request = request
  ; request_body = request_body
  ; reader = reader
  ; writer = writer
  ; response_body_buffer = response_body_buffer
  ; error_handler = error_handler
  ; persistent = persistent
  ; response_state = Waiting
  ; error_code = `Ok
  }

(* [request t] is the request this descriptor was created for. *)
let request t = t.request
(* [request_body t] is the readable body of the request. *)
let request_body t = t.request_body

(* [response t] is [Some r] once a response [r] has been started on [t]
 * (fixed, streaming or upgrade) and [None] while [t] is still waiting. *)
let response t =
  match t.response_state with
  | Waiting -> None
  | Fixed started -> Some started
  | Streaming (started, _) | Upgrade (started, _) -> Some started

(* [response_exn t] is the response started on [t].
 * Raises [Failure] if no response has been started yet. *)
let response_exn t =
  match response t with
  | Some started -> started
  | None -> failwith "httpaf.Reqd.response_exn: response has not started"

(* [respond_with_string t response str] writes [response] followed by [str] to
 * the writer, records the response as [Fixed] and wakes the writer up.
 * Raises [Failure] if an error is currently being handled or if a response
 * has already been started or completed on [t]. *)
let respond_with_string t response str =
  match t.error_code, t.response_state with
  | #error, _ ->
    failwith "httpaf.Reqd.respond_with_string: invalid state, currently handling error"
  | `Ok, Fixed _ ->
    failwith "httpaf.Reqd.respond_with_string: response already complete"
  | `Ok, (Streaming _ | Upgrade _) ->
    failwith "httpaf.Reqd.respond_with_string: response already started"
  | `Ok, Waiting ->
    (* XXX(seliopou): check response body length *)
    Writer.write_response t.writer response;
    Writer.write_string t.writer str;
    (* The connection stays persistent only if both sides allow it; the
     * response is not consulted once the flag is already false. *)
    t.persistent <- (t.persistent && Response.persistent_connection response);
    t.response_state <- Fixed response;
    Writer.wakeup t.writer

(* [respond_with_bigstring t response bstr] writes [response] to the writer,
 * schedules [bstr] as its body, records the response as [Fixed] and wakes the
 * writer up.
 * Raises [Failure] if an error is currently being handled or if a response
 * has already been started or completed on [t]. *)
let respond_with_bigstring t response (bstr:Bigstringaf.t) =
  match t.error_code, t.response_state with
  | #error, _ ->
    failwith "httpaf.Reqd.respond_with_bigstring: invalid state, currently handling error"
  | `Ok, Fixed _ ->
    failwith "httpaf.Reqd.respond_with_bigstring: response already complete"
  | `Ok, (Streaming _ | Upgrade _) ->
    failwith "httpaf.Reqd.respond_with_bigstring: response already started"
  | `Ok, Waiting ->
    (* XXX(seliopou): check response body length *)
    Writer.write_response t.writer response;
    Writer.schedule_bigstring t.writer bstr;
    (* Persistence can only be downgraded here, never re-enabled. *)
    t.persistent <- (t.persistent && Response.persistent_connection response);
    t.response_state <- Fixed response;
    Writer.wakeup t.writer

(* [unsafe_respond_with_streaming ~flush_headers_immediately t response]
 * starts a streaming response and returns the body to write it to. It does
 * not look at [t.error_code], which is what lets [report_error] use it to
 * send the error response.
 * When [flush_headers_immediately] is true the writer is woken up right away,
 * otherwise it is asked to yield.
 * Raises [Failure] if a response has already been started or completed. *)
let unsafe_respond_with_streaming ~flush_headers_immediately t response =
  match t.response_state with
  | Fixed _ ->
    failwith "httpaf.Reqd.respond_with_streaming: response already complete"
  | Streaming _ | Upgrade _ ->
    failwith "httpaf.Reqd.respond_with_streaming: response already started"
  | Waiting ->
    (* Callback given to the new body so that it can wake the writer up. *)
    let wakeup_writer () = Writer.wakeup t.writer in
    let body =
      Body.create t.response_body_buffer (Optional_thunk.some wakeup_writer)
    in
    Writer.write_response t.writer response;
    t.persistent <- (t.persistent && Response.persistent_connection response);
    t.response_state <- Streaming (response, body);
    (if flush_headers_immediately
     then Writer.wakeup t.writer
     else Writer.yield t.writer);
    body

(* [respond_with_streaming ?flush_headers_immediately t response] starts a
 * streaming response and returns its writable body.
 * [flush_headers_immediately] defaults to [false].
 * Raises [Failure] if an error is currently being handled, or if a response
 * has already been started or completed on [t]. *)
let respond_with_streaming ?(flush_headers_immediately=false) t response =
  match t.error_code with
  | `Ok ->
    unsafe_respond_with_streaming ~flush_headers_immediately t response
  | #error ->
    failwith "httpaf.Reqd.respond_with_streaming: invalid state, currently handling error"

(* [unsafe_respond_with_upgrade t headers upgrade_handler] answers the request
 * with a [`Switching_protocols] response built from [headers]. It records the
 * [Upgrade] state, registers [upgrade_handler] with [Writer.flush], closes
 * the request body reader and wakes the writer up. It does not look at
 * [t.error_code].
 * Raises [Failure] if a response has already been started or completed. *)
let unsafe_respond_with_upgrade t headers upgrade_handler =
  match t.response_state with
  | Fixed _ ->
    failwith "httpaf.Reqd.unsafe_respond_with_upgrade: response already complete"
  | Streaming _ | Upgrade _ ->
    failwith "httpaf.Reqd.unsafe_respond_with_upgrade: response already started"
  | Waiting ->
    let switching = Response.create ~headers `Switching_protocols in
    Writer.write_response t.writer switching;
    t.persistent <- (t.persistent && Response.persistent_connection switching);
    t.response_state <- Upgrade (switching, upgrade_handler);
    Writer.flush t.writer upgrade_handler;
    Body.close_reader t.request_body;
    Writer.wakeup t.writer

(* [respond_with_upgrade t headers upgrade_handler] answers the request with a
 * [`Switching_protocols] response carrying [headers]; see
 * [unsafe_respond_with_upgrade] for what is written and registered.
 * Raises [Failure] if an error is currently being handled, or if a response
 * has already been started or completed on [t]. *)
let respond_with_upgrade t headers upgrade_handler =
  if t.error_code <> `Ok then
    (* The message names this function so the failure can be traced to the
     * right entry point. *)
    failwith "httpaf.Reqd.respond_with_upgrade: invalid state, currently handling error";
  unsafe_respond_with_upgrade t headers upgrade_handler

(* [report_error t error] records [error] against [t] and reacts according to
 * how far the response has progressed.
 *
 * The connection is always marked non-persistent. If no response has started
 * and no error was recorded before, [error] is stored in [t.error_code] and
 * [t.error_handler] is called with a function that starts the error response.
 * In every other situation the request body reader is closed first, and then:
 * - a second report while still waiting with an [`Exn] recorded raises
 *   [Failure];
 * - a response that is being streamed has its body writer closed and the
 *   reader woken up;
 * - anything else is ignored. *)
let report_error t error =
  t.persistent <- false;
  match t.response_state, t.error_code with
  | Waiting, `Ok ->
    t.error_code <- (error :> [`Ok | error]);
    (* An exception is reported to the client as a 500; the other error tags
     * are themselves standard statuses and are used unchanged. *)
    let status =
      match (error :> [error | Status.standard]) with
      | `Exn _                     -> `Internal_server_error
      | #Status.standard as status -> status
    in
    t.error_handler ~request:t.request error (fun headers ->
      (* The [unsafe_] variant is required here: [t.error_code] is no longer
       * [`Ok], so [respond_with_streaming] would raise. *)
      let response_body =
        unsafe_respond_with_streaming
          t
          ~flush_headers_immediately:true
          (Response.create ~headers status)
      in
      (* NOTE(anmonteiro): When reporting an error that calls the error
         handler, we can only deliver an EOF to the request body once the error
         response has started. Otherwise, the request body `on_eof` handler
         could erroneously send a successful response instead of letting us
         handle the error. *)
      Body.close_reader t.request_body;
      response_body)
  | other ->
    Body.close_reader t.request_body;
    match other with
    | Waiting, `Exn _ ->
      (* XXX(seliopou): Decide what to do in this unlikely case. There is an
       * outstanding call to the [error_handler], but an intervening exception
       * has been reported as well. *)
      (* NOTE(review): the message below names [report_exn] although this is
       * [report_error] - confirm whether that is intentional. *)
      failwith "httpaf.Reqd.report_exn: NYI"
    | Streaming (_response, response_body), `Ok ->
      (* First error while a regular response is being streamed: end its body
       * and wake the reader up. *)
      Body.set_non_chunked response_body;
      Body.close_writer response_body;
      Reader.wakeup t.reader;
    | Streaming (_response, response_body), `Exn _ ->
      (* An exception was already recorded and a response is being streamed:
       * end its body, close and drain the writer, and wake the reader up. *)
      Body.close_writer response_body;
      Writer.close_and_drain t.writer;
      Reader.wakeup t.reader;
    | (Fixed _ | Streaming _ | Upgrade _ | Waiting) , _ ->
      (* XXX(seliopou): Once additional logging support is added, log the error
       * in case it is not spurious. *)
      ()

(* [report_exn t exn] reports the exception [exn] against [t] by wrapping it
 * in [`Exn] and handing it to [report_error]. *)
let report_exn t exn =
  let error = `Exn exn in
  report_error t error

(* [try_with t f] runs [f ()]. It returns [Ok ()] when [f] returns normally;
 * when [f] raises [exn], the exception is reported with [report_exn] and
 * [Error exn] is returned. *)
let try_with t f : (unit, exn) Result.result =
  match f () with
  | () -> Ok ()
  | exception exn ->
    report_exn t exn;
    Error exn

(* Private API, not exposed to the user through httpaf.mli *)

(* [close_request_body t] closes the reader side of the request body. *)
let close_request_body t =
  Body.close_reader t.request_body

(* [error_code t] is [Some e] when an error [e] has been recorded on [t] and
 * [None] while [t.error_code] is still [`Ok]. *)
let error_code { error_code = recorded; _ } =
  match recorded with
  | `Ok -> None
  | #error as error -> Some error

(* [persistent_connection t] is the current value of the persistence flag. *)
let persistent_connection { persistent; _ } = persistent

(* [input_state t] reports the state of the input side of the exchange.
 * After an upgrade the input is always [Ready]. Otherwise it is [Complete]
 * once the request body is closed, [Ready] when a read is scheduled on the
 * request body, and [Wait] in the remaining case. *)
let input_state t : Input_state.t =
  match t.response_state with
  | Upgrade _ -> Ready
  | Waiting | Fixed _ | Streaming _ ->
    let body = t.request_body in
    if Body.is_closed body then Complete
    else if Body.is_read_scheduled body then Ready
    else Wait

(* [output_state t] is [Response_state.output_state] applied to the current
 * response state of [t]. *)
let output_state { response_state; _ } =
  Response_state.output_state response_state

(* [flush_request_body t] runs [Body.execute_read] on the request body when
 * [Body.has_pending_output] says there is something to deliver. An exception
 * raised while doing so is reported through [report_exn] and not re-raised. *)
let flush_request_body t =
  let body = t.request_body in
  if Body.has_pending_output body then begin
    match Body.execute_read body with
    | () -> ()
    | exception exn -> report_exn t exn
  end

(* [flush_response_body t] delegates to [Response_state.flush_response_body],
 * passing the current response state, the method of the request and the
 * writer. *)
let flush_response_body { request; response_state; writer; _ } =
  Response_state.flush_response_body
    response_state
    ~request_method:request.Request.meth
    writer
OCaml

Innovation. Community. Security.