Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
connection.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
open Riot

module Logger = Logger.Make (struct
  let namespace = [ "trail"; "conn" ]
end)

exception Connection_should_be_closed

(** IP address and port of a connection's peer, as reported by
    [Atacama.Connection.peer]. *)
type peer = { ip : Net.Addr.tcp_addr; port : int }

(** A connection value threaded through request handling. It is immutable:
    every function below that "changes" it returns an updated copy. *)
type t = {
  adapter : Adapter.t;  (** first-class module used for all socket I/O *)
  before_send_cbs : (t -> unit) list;
      (** callbacks run by [send] before the response is written; stored
          most-recently-registered first *)
  after_send_cbs : (t -> unit) list;
      (** callbacks run by [send] after the response is written; stored
          most-recently-registered first *)
  conn : Atacama.Connection.t;
  halted : bool;
      (** set to [true] by [send], [send_file], [close] and [upgrade] *)
  chunked : bool;  (** set to [true] by [send_chunked]; read by [close] *)
  headers : (string * string) list;
      (** response headers; [with_header] prepends to this list *)
  meth : Http.Method.t;  (** copied from the request by [make] *)
  path : string;  (** [Uri.to_string] of the request URI, set by [make] *)
  params : (string * string) list;  (** replaced wholesale by [set_params] *)
  peer : peer;
  req : Request.t;
  resp_body : Bytestring.t;  (** response body that [send] will write *)
  status : Http.Status.t;  (** response status; [make] starts it at [`OK] *)
  switch : [ `websocket of Sock.upgrade_opts * Sock.t | `h2c ] option;
      (** protocol switch recorded by [upgrade], if any *)
}

type status
type body

(** [make adapter conn req] creates the initial connection value for [req]:
    no callbacks, no headers, no params, an empty body, status [`OK], not
    halted, not chunked and no protocol switch. The peer is read from
    [conn]. *)
let make adapter conn (req : Request.t) =
  let remote = Atacama.Connection.peer conn in
  let peer = { ip = Net.Addr.ip remote; port = Net.Addr.port remote } in
  {
    adapter;
    conn;
    req;
    peer;
    meth = req.meth;
    path = Uri.to_string req.uri;
    params = [];
    headers = [];
    resp_body = Bytestring.empty;
    status = `OK;
    before_send_cbs = [];
    after_send_cbs = [];
    halted = false;
    chunked = false;
    switch = None;
  }

(** [halted t] is the [halted] flag of [t]. *)
let halted { halted; _ } = halted

(** [run_callbacks fns t] applies every callback in [fns] to [t]. The list is
    reversed first, so callbacks that were consed on one at a time run in
    registration order. *)
let run_callbacks fns t = List.iter (fun cb -> cb t) (List.rev fns)

(** [register_before_send fn t] adds [fn] to the callbacks that [send] runs
    before writing the response. *)
let register_before_send fn ({ before_send_cbs; _ } as t) =
  { t with before_send_cbs = fn :: before_send_cbs }

(** [register_after_send fn t] adds [fn] to the callbacks that [send] runs
    after writing the response. *)
let register_after_send fn ({ after_send_cbs; _ } as t) =
  { t with after_send_cbs = fn :: after_send_cbs }

(** [with_header header value t] prepends [(header, value)] to the response
    headers. Existing entries with the same name are left in place. *)
let with_header header value ({ headers; _ } as t) =
  { t with headers = (header, value) :: headers }

(** [with_body resp_body t] replaces the response body. *)
let with_body resp_body t = { t with resp_body }

(** [with_status status t] replaces the response status. *)
let with_status status t = { t with status }

(** [respond ~status ?body t] sets the response status and body on [t]
    without sending anything. [body] defaults to the empty bytestring. *)
let respond ~status ?(body = {%b||}) t =
  with_body body (with_status status t)

(** [send t] writes the response described by [t] (status, headers, body)
    through the adapter, running the before-send callbacks first and the
    after-send callbacks afterwards. The result of the adapter call is
    discarded. Returns [t] marked as halted. *)
let send
    ({ adapter = (module A); conn; req; status; headers; resp_body = body; _ }
     as t) =
  run_callbacks t.before_send_cbs t;
  let res = Response.(make status ~version:req.version ~body ~headers ()) in
  let _ = A.send conn req res in
  run_callbacks t.after_send_cbs t;
  { t with halted = true }

(** [send_status status t] sends a response with [status] and the default
    (empty) body. *)
let send_status status t = send (respond t ~status)

(** [send_response status body t] sends a response with [status] and [body]. *)
let send_response status body t = send (respond t ~status ~body)

(** [inform status headers t] writes a body-less response made of [status] and
    [headers] through the adapter. It neither runs the send callbacks nor
    changes [t], which is returned as is. *)
let inform status headers ({ adapter = (module A); conn; req; _ } as t) =
  let res = Response.(make status ~version:req.version ~headers ()) in
  let _ = A.send conn req res in
  t

(** [send_file status ?off ?len ~path t] asks the adapter to send the file at
    [path] (optionally restricted by [off] and [len]) together with a response
    carrying [status] and the headers stored on [t]. The send callbacks are
    not run. Returns [t] marked as halted. *)
let send_file status ?off ?len ~path
    ({ adapter = (module A); conn; req; _ } as t) =
  let res =
    Response.(make status ~version:req.version ~headers:t.headers ())
  in
  let _ = A.send_file conn req res ?off ?len ~path () in
  { t with halted = true }

(** [send_chunked status t] adds a ["transfer-encoding": "chunked"] header,
    sets [status], writes the resulting body-less response through the
    adapter and returns the connection flagged as chunked. Data is then sent
    with [chunk]. *)
let send_chunked status ({ adapter = (module A); conn; req; _ } as t) =
  let t = with_status status (with_header "transfer-encoding" "chunked" t) in
  let res =
    Response.(make t.status ~version:req.version ~headers:t.headers ())
  in
  let _ = A.send conn req res in
  { t with chunked = true }

(** [chunk chunk t] hands [chunk] to the adapter's [send_chunk] and returns
    [t] unchanged. *)
let chunk chunk ({ adapter = (module A); conn; req; _ } as t) =
  let _ = A.send_chunk conn req chunk in
  t

(** [set_params params t] replaces the connection's params. *)
let set_params params t = { t with params }

(** Outcome of [read_body]. Each case carries the connection updated with the
    request value returned by the adapter. Note that [Ok] and [Error] here
    shadow the [result] constructors for the rest of this module. *)
type read_result =
  | Ok of t * Bytestring.t
  | More of t * Bytestring.t
  | Error of
      t
      * [ `Excess_body_read
        | `Closed
        | `Process_down
        | `Timeout
        | IO.io_error ]

(** [close t] calls the adapter's [close_chunk] when [t] is flagged as
    chunked, then returns [t] marked as halted. *)
let close ({ adapter = (module A); conn; chunked; _ } as t) =
  let () = if chunked then A.close_chunk conn in
  { t with halted = true }

(** [upgrade switch t] records [switch] as the requested protocol switch and
    marks [t] as halted. *)
let upgrade switch t = { t with switch = Some switch; halted = true }

(** [switch t] is the protocol switch recorded on [t], if any. *)
let switch { switch; _ } = switch

(** [read_body ?limit t] logs a trace message and reads from the request body
    through the adapter, forwarding [limit]. The adapter's result is
    translated case by case into a {!read_result}, with the request it
    returns stored back into the connection. *)
let read_body ?limit ({ adapter = (module A); conn; req; _ } as t) =
  Logger.trace (fun f -> f "reading body");
  let outcome = A.read_body ?limit conn req in
  match outcome with
  | Adapter.Ok (req, body) -> Ok ({ t with req }, body)
  | Adapter.More (req, body) -> More ({ t with req }, body)
  | Adapter.Error (req, reason) -> Error ({ t with req }, reason)