package guardian

  1. Overview
  2. Docs

Source file database_pools.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
exception Exception of string

let src = Logs.Src.create "guardian.pools"
let find_pool_name = CCList.assoc_opt ~eq:CCString.equal "pool"

module LogTag = struct
  let add_label : string Logs.Tag.def =
    Logs.Tag.def "database_label" ~doc:"Database Label" CCString.pp
  ;;

  let create database = Logs.Tag.(empty |> add add_label database)

  let ctx_opt ?ctx () =
    let open CCOption.Infix in
    ctx >>= find_pool_name >|= fun db -> Logs.Tag.(empty |> add add_label db)
  ;;
end

type connection_type =
  | SinglePool of string
  | MultiPools of (string * string) list

let with_log ?tags ?(log_level = Logs.Error) ?(msg_prefix = "Error") err =
  let msg = Caqti_error.show err in
  Logs.msg ~src log_level (fun m -> m ?tags "%s: %s" msg_prefix msg);
  msg
;;

let get_or_raise ?tags ?log_level ?msg_prefix () = function
  | Ok result -> result
  | Error error -> failwith (with_log ?tags ?log_level ?msg_prefix error)
;;

let map_or_raise ?tags ?log_level ?msg_prefix fcn result =
  result |> CCResult.map fcn |> get_or_raise ?tags ?log_level ?msg_prefix ()
;;

module type ConfigSig = sig
  val database : connection_type
  val database_pool_size : int
end

module DefaultConfig : ConfigSig = struct
  let database = SinglePool "mariadb://root@database:3306/test"
  let database_pool_size = 5
end

module Make (Config : ConfigSig) = struct
  let main_pool_ref
    : (Caqti_lwt.connection, Caqti_error.t) Caqti_lwt.Pool.t option ref
    =
    ref None
  ;;

  let pools
    : (string, (Caqti_lwt.connection, Caqti_error.t) Caqti_lwt.Pool.t) Hashtbl.t
    =
    let spare_for_pools = 5 in
    Hashtbl.create
      (match Config.database with
       | SinglePool _ -> 1
       | MultiPools pools -> CCList.length pools + spare_for_pools)
  ;;

  let print_pool_usage ?tags pool =
    let n_connections = Caqti_lwt.Pool.size pool in
    let max_connections = Config.database_pool_size in
    Logs.debug ~src (fun m ->
      m ?tags "Pool usage: %i/%i" n_connections max_connections)
  ;;

  let connect_or_failwith
    ?(pool_size = Config.database_pool_size)
    ok_fun
    database_url
    =
    database_url
    |> Uri.of_string
    |> Caqti_lwt.connect_pool ~max_size:pool_size
    |> map_or_raise ~msg_prefix:"Failed to connect to DB pool" ok_fun
  ;;

  let add_pool ?pool_size name database_url =
    let tags = LogTag.create name in
    match Config.database, Hashtbl.find_opt pools name with
    | SinglePool _, _ ->
      failwith "SinglePool is selected: Switch to 'MultiPools' first"
    | MultiPools _, Some _ ->
      let msg =
        Format.asprintf
          "Failed to create pool: Connection pool with name '%s' exists already"
          name
      in
      Logs.err ~src (fun m -> m ~tags "%s" msg);
      failwith msg
    | MultiPools _, None ->
      database_url |> connect_or_failwith ?pool_size (Hashtbl.add pools name)
  ;;

  let initialize () =
    match Config.database with
    | SinglePool database_url when CCOption.is_none !main_pool_ref ->
      database_url
      |> connect_or_failwith (fun pool ->
           main_pool_ref := Some pool;
           ())
    | SinglePool _ -> ()
    | MultiPools pools' ->
      pools'
      |> CCList.filter (fun (name, _) ->
           CCOption.is_none (Hashtbl.find_opt pools name))
      |> CCList.iter (fun (name, url) ->
           url |> connect_or_failwith (Hashtbl.add pools name))
  ;;

  let fetch_pool ?(ctx = []) () =
    let open CCOption in
    let () = initialize () in
    match Config.database with
    | SinglePool _ ->
      !main_pool_ref |> get_exn_or "Initialization missed: run 'initialize'"
    | MultiPools _ ->
      find_pool_name ctx
      >>= Hashtbl.find_opt pools
      |> (function
      | Some pool -> pool
      | None -> failwith "Unknown Pool: Please 'add_pool' first!")
  ;;

  let transaction ?ctx f =
    let open Lwt.Infix in
    let pool = fetch_pool ?ctx () in
    print_pool_usage pool;
    Caqti_lwt.Pool.use
      (fun connection ->
        Logs.debug ~src (fun m ->
          m ?tags:(LogTag.ctx_opt ?ctx ()) "Fetched connection from pool");
        let (module Connection : Caqti_lwt.CONNECTION) = connection in
        let open Caqti_error in
        match%lwt Connection.start () with
        | Error msg ->
          Logs.debug ~src (fun m ->
            m
              ?tags:(LogTag.ctx_opt ?ctx ())
              "Failed to start transaction: %s"
              (show msg));
          Lwt.return_error msg
        | Ok () ->
          Logs.debug ~src (fun m ->
            m ?tags:(LogTag.ctx_opt ?ctx ()) "Started transaction");
          Lwt.catch
            (fun () ->
              match%lwt Connection.commit () with
              | Ok () ->
                Logs.debug ~src (fun m ->
                  m
                    ?tags:(LogTag.ctx_opt ?ctx ())
                    "Successfully committed transaction");
                f connection |> Lwt_result.return
              | Error error ->
                Exception
                  (with_log
                     ?tags:(LogTag.ctx_opt ?ctx ())
                     ~msg_prefix:"Failed to commit transaction"
                     error)
                |> Lwt.fail)
            (fun e ->
              match%lwt Connection.rollback () with
              | Ok () ->
                Logs.debug ~src (fun m ->
                  m
                    ?tags:(LogTag.ctx_opt ?ctx ())
                    "Successfully rolled back transaction");
                Lwt.fail e
              | Error error ->
                Exception
                  (with_log
                     ?tags:(LogTag.ctx_opt ?ctx ())
                     ~msg_prefix:"Failed to rollback transaction"
                     error)
                |> Lwt.fail))
      pool
    >|= get_or_raise ?tags:(LogTag.ctx_opt ?ctx ()) ()
  ;;

  let transaction' ?ctx f =
    transaction ?ctx f
    |> Lwt.map (get_or_raise ?tags:(LogTag.ctx_opt ?ctx ()) ())
  ;;

  let query ?ctx f =
    let open Lwt.Infix in
    let pool = fetch_pool ?ctx () in
    print_pool_usage pool;
    Caqti_lwt.Pool.use (fun connection -> f connection >|= CCResult.return) pool
    >|= get_or_raise ?tags:(LogTag.ctx_opt ?ctx ()) ()
  ;;

  let query' ?ctx f =
    query ?ctx f |> Lwt.map (get_or_raise ?tags:(LogTag.ctx_opt ?ctx ()) ())
  ;;

  let find_opt ?ctx request input =
    query' ?ctx (fun connection ->
      let module Connection = (val connection : Caqti_lwt.CONNECTION) in
      Connection.find_opt request input)
  ;;

  let find ?ctx request input =
    query' ?ctx (fun connection ->
      let module Connection = (val connection : Caqti_lwt.CONNECTION) in
      Connection.find request input)
  ;;

  let collect ?ctx request input =
    query' ?ctx (fun connection ->
      let module Connection = (val connection : Caqti_lwt.CONNECTION) in
      Connection.collect_list request input)
  ;;

  let exec ?ctx request input =
    query' ?ctx (fun connection ->
      let module Connection = (val connection : Caqti_lwt.CONNECTION) in
      Connection.exec request input)
  ;;
end

module type Sig = sig
  val initialize : unit -> unit

  val fetch_pool
    :  ?ctx:(string * string) list
    -> unit
    -> (Caqti_lwt.connection, Caqti_error.t) Caqti_lwt.Pool.t

  val add_pool : ?pool_size:int -> string -> string -> unit

  val find
    :  ?ctx:(string * string) list
    -> ('a, 'b, [< `One ]) Caqti_request.t
    -> 'a
    -> 'b Lwt.t

  val find_opt
    :  ?ctx:(string * string) list
    -> ('a, 'b, [< `One | `Zero ]) Caqti_request.t
    -> 'a
    -> 'b option Lwt.t

  val collect
    :  ?ctx:(string * string) list
    -> ('a, 'b, [< `Many | `One | `Zero ]) Caqti_request.t
    -> 'a
    -> 'b list Lwt.t

  val exec
    :  ?ctx:(string * string) list
    -> ('a, unit, [< `Zero ]) Caqti_request.t
    -> 'a
    -> unit Lwt.t
end
OCaml

Innovation. Community. Security.