package caqti

  1. Overview
  2. Docs

Source file caqti_connect.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
(* Copyright (C) 2014--2022  Petter A. Urkedal <paurkedal@gmail.com>
 *
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version, with the LGPL-3.0 Linking Exception.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * and the LGPL-3.0 Linking Exception along with this library.  If not, see
 * <http://www.gnu.org/licenses/> and <https://spdx.org>, respectively.
 *)

open Printf

let default_tweaks_version = (1, 7)

let dynload_library = ref @@ fun lib ->
  Error (sprintf "Neither %s nor the dynamic linker is linked into the \
                  application." lib)

let define_loader f = dynload_library := f

let drivers = Hashtbl.create 11
let define_unix_driver scheme p = Hashtbl.add drivers scheme p

let scheme_driver_name = function
  | "postgres" | "postgresql" -> "caqti-driver-postgresql"
  | s -> "caqti-driver-" ^ s

let load_driver_functor ~uri scheme =
  (try Ok (Hashtbl.find drivers scheme) with
   | Not_found ->
      (match !dynload_library (scheme_driver_name scheme) with
       | Ok () ->
          (try Ok (Hashtbl.find drivers scheme) with
           | Not_found ->
              let msg = sprintf "The driver for %s did not register itself \
                                 after apparently loading." scheme in
              Error (Caqti_error.load_failed ~uri (Caqti_error.Msg msg)))
       | Error msg ->
          Error (Caqti_error.load_failed ~uri (Caqti_error.Msg msg))))

module Make_unix (System : Caqti_driver_sig.System_unix) = struct
  open System

  let (>>=?) m f = m >>= function Ok x -> f x | Error _ as r -> return r
  let (>|=?) m f = m >|= function Ok x -> (Ok (f x)) | Error _ as r -> r

  module type DRIVER = Caqti_driver_sig.S
    with type 'a future := 'a System.future
     and type ('a, 'err) stream := ('a, 'err) System.Stream.t

  let drivers : (string, (module DRIVER)) Hashtbl.t = Hashtbl.create 11

  let load_driver uri =
    (match Uri.scheme uri with
     | None ->
        let msg = "Missing URI scheme." in
        Error (Caqti_error.load_rejected ~uri (Caqti_error.Msg msg))
     | Some scheme ->
        (try Ok (Hashtbl.find drivers scheme) with
         | Not_found ->
            (match load_driver_functor ~uri scheme with
             | Ok make_driver ->
                let module Make_driver =
                  (val make_driver : Caqti_driver_sig.Of_system_unix) in
                let module Driver = Make_driver (System) in
                let driver = (module Driver : DRIVER) in
                Hashtbl.add drivers scheme driver;
                Ok driver
             | Error _ as r -> r)))

  module type CONNECTION_BASE = Caqti_connection_sig.Base
    with type 'a future := 'a System.future
     and type ('a, 'err) stream := ('a, 'err) System.Stream.t

  module type CONNECTION = Caqti_connection_sig.S
    with type 'a future := 'a System.future
     and type ('a, 'err) stream := ('a, 'err) System.Stream.t

  type connection = (module CONNECTION)

  let connect ?env ?(tweaks_version = default_tweaks_version) uri
      : ((module CONNECTION), _) result future =
    (match load_driver uri with
     | Ok driver ->
        let module Driver = (val driver) in
        Driver.connect ?env ~tweaks_version uri
     | Error err ->
        return (Error err))

  let with_connection ?env ?(tweaks_version = default_tweaks_version) uri f =
    connect ?env ~tweaks_version uri >>=? fun ((module Db) as conn) ->
    try
      f conn >>= fun result -> Db.disconnect () >|= fun () -> result
    with exn ->
      Db.disconnect () >|= fun () -> raise exn

  module Pool = Caqti_pool.Make (System)
  module Stream = System.Stream

  let connect_pool
        ?max_size ?max_idle_size ?(max_use_count = Some 100) ?post_connect
        ?env ?(tweaks_version = default_tweaks_version) uri =
    let check_arg cond =
      if not cond then invalid_arg "Caqti_connect.Make_unix.connect_pool"
    in
    (match max_size, max_idle_size with
     | None, None -> ()
     | Some max_size, None -> check_arg (max_size >= 0)
     | None, Some _ -> check_arg false
     | Some max_size, Some max_idle_size ->
        check_arg (max_size >= 0);
        check_arg (0 <= max_idle_size && max_idle_size <= max_size));
    (match load_driver uri with
     | Ok driver ->
        let module Driver = (val driver) in
        let connect =
          (match post_connect with
           | None ->
              fun () ->
                (Driver.connect ?env ~tweaks_version uri
                    :> (connection, _) result future)
           | Some post_connect ->
              fun () ->
                (Driver.connect ?env ~tweaks_version uri
                    :> (connection, _) result future)
                  >>=? fun conn -> post_connect conn
                  >|=? fun () -> conn)
        in
        let disconnect (module Db : CONNECTION) = Db.disconnect () in
        let validate (module Db : CONNECTION) = Db.validate () in
        let check (module Db : CONNECTION) = Db.check in
        let di = Driver.driver_info in
        let max_size, max_idle_size =
          (match Caqti_driver_info.can_concur di, Caqti_driver_info.can_pool di,
                 max_idle_size with
           | true, true, _ ->     max_size, max_idle_size
           | true, false, _ ->    max_size, Some 0
           | false, true, Some 0 -> Some 1, Some 0
           | false, true, _ ->      Some 1, Some 1
           | false, false, _ ->     Some 1, Some 0)
        in
        let pool =
          Pool.create ?max_size ?max_idle_size ~max_use_count ~validate ~check
            connect disconnect
        in
        Ok pool
     | Error err ->
        Error err)
end
OCaml

Innovation. Community. Security.