package rock

  1. Overview
  2. Docs

Source file server_connection.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
open Lwt.Syntax

(** Raised to abort request processing and answer with the carried response.
    The request handler built by [run] catches it around the service call and
    sends the carried response as if the service had returned it. *)
exception Halt of Response.t

(** [halt response] raises {!Halt} carrying [response]; it never returns
    normally.
    @raise Halt always. *)
let halt response = raise (Halt response)

(** [default_error_handler ?request error start_response] is the fallback
    Httpaf error handler. It ignores [request], answers with the default
    reason phrase of the error status as the body (exceptions are reported as
    [`Internal_server_error`]), sets [Content-Length] accordingly and closes
    the response body. *)
let default_error_handler ?request:_ error start_response =
  let reason =
    match error with
    | (#Httpaf.Status.server_error | #Httpaf.Status.client_error) as status ->
      Httpaf.Status.default_reason_phrase status
    | `Exn _exn ->
      (* TODO: log error *)
      Httpaf.Status.default_reason_phrase `Internal_server_error
  in
  let content_length = reason |> String.length |> Int.to_string in
  let writer =
    start_response (Httpaf.Headers.of_list [ "Content-Length", content_length ])
  in
  Httpaf.Body.write_string writer reason;
  Httpaf.Body.close_writer writer
;;

(** [create_error_handler handler] adapts [handler] into an Httpaf error
    handler.

    [handler] receives the headers of the offending request
    ([Httpaf.Headers.empty] when no request is available) together with the
    error, and resolves to the headers and body of the error response. A
    [Content-Length] header is added when the body length is known and the
    header is not already present. The body is then streamed to the client and
    the writer closed. The work is started with [Lwt.async]. *)
let create_error_handler handler =
  let error_handler ?request error start_response =
    let req_headers =
      Option.fold
        ~none:Httpaf.Headers.empty
        ~some:(fun req -> req.Httpaf.Request.headers)
        request
    in
    let respond () =
      let* response_headers, payload = handler req_headers error in
      let response_headers =
        match Body.length payload with
        | Some size ->
          Httpaf.Headers.add_unless_exists
            response_headers
            "Content-Length"
            (Int64.to_string size)
        | None -> response_headers
      in
      let writer = start_response response_headers in
      let+ () =
        Body.to_stream payload
        |> Lwt_stream.iter (fun chunk -> Httpaf.Body.write_string writer chunk)
      in
      Httpaf.Body.close_writer writer
    in
    Lwt.async respond
  in
  error_handler
;;

(** A user-level error handler: given request headers and the error reported
    by Httpaf, it resolves to the headers and body of the error response.
    This has the shape of the [handler] argument that [create_error_handler]
    calls with the request's headers, or [Httpaf.Headers.empty] when no
    request is available. *)
type error_handler =
  Httpaf.Headers.t -> Httpaf.Server_connection.error -> (Httpaf.Headers.t * Body.t) Lwt.t

(** [read_httpaf_body body] exposes an Httpaf request body as an
    [Lwt_stream.t] of strings.

    Each time the stream needs an element, one read is scheduled on [body]:
    the bytes handed to [on_read] are copied out of the bigstring into a fresh
    string, and end of input produces [None]. *)
let read_httpaf_body body =
  let next_chunk () =
    let chunk, resolver = Lwt.wait () in
    let on_read bigstring ~off ~len =
      (* Copy the slice out of the bigstring before resolving the promise. *)
      let copy = Bytes.create len in
      Bigstringaf.blit_to_bytes bigstring ~src_off:off ~dst_off:0 ~len copy;
      (* [copy] is never mutated after this point, so the unsafe cast is sound. *)
      Lwt.wakeup_later resolver (Some (Bytes.unsafe_to_string copy))
    in
    let on_eof () = Lwt.wakeup_later resolver None in
    Httpaf.Body.schedule_read body ~on_eof ~on_read;
    chunk
  in
  Lwt_stream.from next_chunk
;;

(** [httpaf_request_to_request ?body req] builds a [Request.t] from the Httpaf
    request [req], reusing its target, method and headers, with the optional
    [body].

    The headers are passed through unchanged so they keep their transmission
    order. Feeding [Httpaf.Headers.to_list] (transmission order) into
    [Httpaf.Headers.of_rev_list] (which expects reverse transmission order)
    would flip the relative order of repeated header fields. *)
let httpaf_request_to_request ?body req =
  let headers = req.Httpaf.Request.headers in
  Request.make ~headers ?body req.target req.meth
;;

(** [run server_handler ?error_handler app] wires [app] into an Httpaf server.

    The app's middleware filters are applied to its handler to obtain a
    service. For each request, the Httpaf request and body are converted to a
    [Request.t], the service is run, and the resulting response is written
    back: fixed bodies in one shot, streamed bodies chunk by chunk. A {!Halt}
    raised by the service is treated as its response.

    Any other failure, including a request whose body length cannot be
    determined, is reported on the request descriptor with
    [Httpaf.Reqd.report_exn].

    @param server_handler receives the [~request_handler] and [~error_handler]
      built here; its result is the result of [run].
    @param error_handler optional handler adapted with [create_error_handler];
      [default_error_handler] is used when absent. *)
let run server_handler ?error_handler app =
  let { App.middlewares; handler } = app in
  let filters = ListLabels.map ~f:(fun m -> m.Middleware.filter) middlewares in
  let service = Filter.apply_all filters handler in
  let request_handler reqd =
    Lwt.async (fun () ->
        (* Request decoding lives inside [Lwt.catch] as well, so that a failure
           there is reported on [reqd] instead of escaping to
           [Lwt.async_exception_hook]. *)
        Lwt.catch
          (fun () ->
            let req = Httpaf.Reqd.request reqd in
            let req_body = Httpaf.Reqd.request_body reqd in
            let length =
              match Httpaf.Request.body_length req with
              | `Chunked -> None
              | `Fixed l -> Some l
              | `Error _ -> failwith "Bad request"
            in
            let body =
              let stream = read_httpaf_body req_body in
              (* Release the Httpaf reader once the stream has been closed. *)
              Lwt.on_termination (Lwt_stream.closed stream) (fun () ->
                  Httpaf.Body.close_reader req_body);
              Body.of_stream ?length stream
            in
            let write_fixed_response ~headers f status body =
              f reqd (Httpaf.Response.create ~headers status) body;
              Lwt.return_unit
            in
            let request = httpaf_request_to_request ~body req in
            let* { Response.body; headers; status; _ } =
              Lwt.catch
                (fun () -> service request)
                (function
                  | Halt response -> Lwt.return response
                  | exn -> Lwt.fail exn)
            in
            let { Body.length; _ } = body in
            (* Advertise the framing unless the application already did. *)
            let headers =
              match length with
              | None ->
                Httpaf.Headers.add_unless_exists headers "Transfer-Encoding" "chunked"
              | Some l ->
                Httpaf.Headers.add_unless_exists
                  headers
                  "Content-Length"
                  (Int64.to_string l)
            in
            match body.content with
            | `Empty ->
              write_fixed_response ~headers Httpaf.Reqd.respond_with_string status ""
            | `String s ->
              write_fixed_response ~headers Httpaf.Reqd.respond_with_string status s
            | `Bigstring b ->
              write_fixed_response ~headers Httpaf.Reqd.respond_with_bigstring status b
            | `Stream s ->
              let rb =
                Httpaf.Reqd.respond_with_streaming
                  reqd
                  (Httpaf.Response.create ~headers status)
              in
              let+ () = Lwt_stream.iter (fun s -> Httpaf.Body.write_string rb s) s in
              (* Close the writer only after the pending output is flushed. *)
              Httpaf.Body.flush rb (fun () -> Httpaf.Body.close_writer rb))
          (fun exn ->
            Httpaf.Reqd.report_exn reqd exn;
            Lwt.return_unit))
  in
  let error_handler =
    match error_handler with
    | None -> default_error_handler
    | Some h -> create_error_handler h
  in
  server_handler ~request_handler ~error_handler
;;
OCaml

Innovation. Community. Security.