package http_async

  1. Overview
  2. Docs

Source file server.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
open! Core
open! Async
open! Shuttle

(** Callback that builds the response (and its body writer) sent to the client
    when request processing fails.

    Within this file it is called with [`Bad_request] when the request cannot
    be parsed or when [Body.Reader.Private.create] returns an error, and with
    [`Internal_server_error] when the per-connection monitor reports an
    exception. [exn] carries the triggering error; [request] is only supplied
    when a request had already been parsed successfully. *)
type error_handler =
  ?exn:Exn.t -> ?request:Request.t -> Status.t -> (Response.t * Body.Writer.t) Deferred.t

(** [keep_alive headers] is [false] when the [connection] header of [headers]
    contains the [close] option, and [true] otherwise (including when the
    header is absent).

    The header value is treated as a comma-separated list of connection
    options, as the HTTP specification defines it: each option is stripped of
    surrounding whitespace and compared case-insensitively, so values such as
    [close, TE] or [ Close ] are recognised as a request to close.

    NOTE(review): only the value returned by [Headers.find] is inspected, and
    the HTTP version is not taken into account here. *)
let keep_alive headers =
  match Headers.find headers "connection" with
  | None -> true
  | Some value ->
    let requests_close =
      String.split value ~on:','
      |> List.exists ~f:(fun option ->
        String.Caseless.equal (String.strip option) "close")
    in
    not requests_close
;;

(** [write_response writer encoding res] writes the head of [res] to [writer]:
    the status line, every header, and the blank line that ends the header
    section. The body itself is not written here.

    A framing header is added unless the response already carries one:
    [Content-Length] for [`Fixed len], [Transfer-Encoding: chunked] for
    [`Chunked]. *)
let write_response writer encoding res =
  let emit text = Output_channel.write writer text in
  let emit_space () = Output_channel.write_char writer ' ' in
  (* Status line: version, status, then a trailing space before the CRLF. *)
  emit (Version.to_string (Response.version res));
  emit_space ();
  emit (Status.to_string (Response.status res));
  emit_space ();
  emit "\r\n";
  (* Pick the framing header that matches how the body will be encoded. *)
  let framing_key, framing_value =
    match encoding with
    | `Fixed len -> "Content-Length", Int.to_string len
    | `Chunked -> "Transfer-Encoding", "chunked"
  in
  let all_headers =
    Headers.add_unless_exists (Response.headers res) ~key:framing_key ~data:framing_value
  in
  Headers.iter all_headers ~f:(fun ~key:name ~data:value ->
    emit name;
    emit ": ";
    emit value;
    emit "\r\n");
  (* Empty line terminating the header section. *)
  emit "\r\n"
;;

(** Error handler used when the caller does not supply one. It ignores the
    exception and the request, and answers with [status], an empty body, and
    the headers [Connection: close] and [Content-Length: 0]. *)
let default_error_handler ?exn:_ ?request:_ status =
  let headers = Headers.of_rev_list [ "Connection", "close"; "Content-Length", "0" ] in
  Deferred.return (Response.create ~headers status, Body.Writer.empty)
;;

(** [run_server_loop ?error_handler handle_request reader writer] serves HTTP
    requests read from [reader], writing responses to [writer], and returns a
    deferred that is determined once the connection is done.

    For every request that parses successfully, [handle_request] is called
    with the request and its body reader; its response is written, the rest of
    the request body is drained, and the loop continues only while both the
    request and response headers allow keep-alive (see [keep_alive]).

    [error_handler] (default [default_error_handler]) produces the response
    for a request that fails to parse ([`Bad_request]) and for an exception
    reported by the monitor the loop runs in ([`Internal_server_error]). After
    an error response is written the connection is finished.

    The completion ivar is filled with [Ivar.fill_if_empty], because both the
    request loop and the monitor error callback may try to finish the
    connection; filling an already full ivar would raise. *)
let run_server_loop ?(error_handler = default_error_handler) handle_request reader writer =
  let monitor = Monitor.create () in
  let finished = Ivar.create () in
  (* Idempotent: the loop and the monitor callback can both end up here. *)
  let finish () = Ivar.fill_if_empty finished () in
  (* Writes the response head followed by its body. *)
  let send (res, res_body) =
    write_response writer (Body.Writer.encoding res_body) res;
    Body.Writer.Private.write res_body writer
  in
  (* Asks [error_handler] for a response, sends it, then ends the connection. *)
  let fail_and_finish ?request exn status =
    error_handler ~exn ?request status >>> fun response -> send response >>> finish
  in
  let rec loop () =
    let view = Input_channel.view reader in
    match Parser.parse_request view.buf ~pos:view.pos ~len:view.len with
    | Error Partial ->
      (* Not enough buffered input for a full request head: read more. *)
      Input_channel.refill reader
      >>> (function
      | `Ok -> loop ()
      | `Eof -> finish ())
    | Error (Fail error) -> fail_and_finish (Error.to_exn error) `Bad_request
    | Ok (req, consumed) ->
      Input_channel.consume reader consumed;
      (match Body.Reader.Private.create req reader with
       | Error error -> fail_and_finish ~request:req (Error.to_exn error) `Bad_request
       | Ok req_body ->
         handle_request (req, req_body)
         >>> fun ((res, _) as response) ->
         let persistent =
           keep_alive (Request.headers req) && keep_alive (Response.headers res)
         in
         send response
         >>> fun () ->
         (* Discard any unread request body before handling the next request. *)
         Body.Reader.drain req_body
         >>> fun () -> if persistent then loop () else finish ())
  in
  (Monitor.detach_and_get_next_error monitor
  >>> fun exn ->
  (* If the connection has already been finished there is nothing left to
     answer, so do not write an error response in that case. *)
  if not (Ivar.is_full finished) then fail_and_finish exn `Internal_server_error);
  Scheduler.within ~priority:Priority.Normal ~monitor (fun () -> loop ());
  Ivar.read finished
;;

(** [run service] starts listening through [Shuttle.Connection.listen] and
    serves every accepted connection with [run_server_loop], passing
    [error_handler] and [service] along.

    Defaults: [where_to_listen] is port 8080, [max_accepts_per_batch] is 64 and
    [buffer_config] is [Buffer_config.create ()]. The initial and maximum sizes
    of [buffer_config] are applied to both the input and the output buffer.
    Handler errors are configured with [`Raise]. *)
let run
  ?(where_to_listen = Tcp.Where_to_listen.of_port 8080)
  ?max_connections
  ?(max_accepts_per_batch = 64)
  ?backlog
  ?socket
  ?(buffer_config = Buffer_config.create ())
  ?error_handler
  service
  =
  (* Input and output channels share the same sizing. *)
  let initial_size = Buffer_config.initial_size buffer_config in
  let max_size = Buffer_config.max_buffer_size buffer_config in
  let serve_connection _addr reader writer =
    run_server_loop ?error_handler service reader writer
  in
  Shuttle.Connection.listen
    ~input_buffer_size:initial_size
    ~max_input_buffer_size:max_size
    ~output_buffer_size:initial_size
    ~max_output_buffer_size:max_size
    ?max_connections
    ?backlog
    ?socket
    ~max_accepts_per_batch
    where_to_listen
    ~on_handler_error:`Raise
    ~f:serve_connection
;;

(** [run_command ~summary service] builds a [Command.async] command that starts
    the server with [run], configured from command-line flags (port, connection
    limits, accept batch size, backlog and buffer sizes).

    Once the server is started the command waits for whichever happens first:
    - [interrupt] (default [Deferred.never ()]) becomes determined, in which
      case the server is closed together with its existing connections;
    - the server reports that it is closed and its handlers are determined, in
      which case the command simply completes.

    [readme] and [summary] are passed to [Command.async]; [error_handler] is
    passed to [run]. *)
let run_command ?(interrupt = Deferred.never ()) ?readme ?error_handler ~summary service =
  Command.async
    ~summary
    ?readme
    Command.Let_syntax.(
      (* Port defaults to 8080 when the flag is not given. *)
      let%map_open port =
        flag "-port" ~doc:"int Source port to listen on" (optional_with_default 8080 int)
      and max_connections =
        flag
          "-max-connections"
          ~doc:"int Maximum number of active connections"
          (optional int)
      (* Accept batch size defaults to 64 when the flag is not given. *)
      and max_accepts_per_batch =
        flag
          "-max-accepts-per-batch"
          ~doc:"int Maximum number of connections to accept per Unix.accept call."
          (optional_with_default 64 int)
      and backlog =
        flag
          "-backlog"
          ~doc:"int Number of clients that can have a pending connection."
          (optional int)
      and initial_buffer_size =
        flag
          "-initial-buffer-size"
          ~doc:"int Initial size of the Read and Write buffers used by the server."
          (optional int)
      and max_buffer_size =
        flag
          "-max-buffer-size"
          ~doc:"int Maximum size of the Read and Write buffers used by the server."
          (optional int)
      in
      fun () ->
        (* Flags left unset stay [None] and are forwarded as omitted optional
           arguments to [run] and [Buffer_config.create]. *)
        let%bind.Deferred server =
          run
            ?error_handler
            ~where_to_listen:(Tcp.Where_to_listen.of_port port)
            ~max_accepts_per_batch
            ?max_connections
            ?backlog
            ~buffer_config:
              (Buffer_config.create ?initial_size:initial_buffer_size ?max_buffer_size ())
            service
        in
        (* Wait for either an external interrupt or the server finishing. *)
        choose
          [ choice interrupt (fun () -> `Shutdown)
          ; choice (Tcp.Server.close_finished_and_handlers_determined server) (fun () ->
              `Closed)
          ]
        >>= function
        (* On interrupt, close the listener and the existing connections. *)
        | `Shutdown -> Tcp.Server.close ~close_existing_connections:true server
        | `Closed -> Deferred.unit)
;;
OCaml

Innovation. Community. Security.