Source file caqti_connect.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
open Printf
(** Tweaks version handed to the driver by [connect], [with_connection] and
    [connect_pool] in [Make_unix] when the caller omits [?tweaks_version]. *)
let default_tweaks_version : int * int = 1, 7
(** Hook that [load_driver_functor] calls with a driver library name when no
    driver is registered for the requested scheme.

    The initial hook loads nothing: it always returns an [Error] saying that
    neither the library nor the dynamic linker is linked into the application.
    [define_loader] replaces the hook. *)
let dynload_library : (string -> (unit, string) result) ref =
  let no_loader lib =
    Error
      (sprintf "Neither %s nor the dynamic linker is linked into the \
                application." lib)
  in
  ref no_loader
(** [define_loader loader] stores [loader] in [dynload_library], replacing the
    hook that was installed before. *)
let define_loader loader = dynload_library := loader
(** Registry of driver implementations keyed by URI scheme.  It is filled by
    [define_unix_driver] and consulted by [load_driver_functor]. *)
let drivers : (string, _) Hashtbl.t = Hashtbl.create 11
(** [define_unix_driver scheme p] registers [p] as the driver for the URI
    scheme [scheme] in the top-level [drivers] table.

    Registering the same scheme again overwrites the earlier entry.
    [Hashtbl.replace] is used instead of [Hashtbl.add] so that repeated
    registrations do not pile up hidden bindings in the table; a lookup with
    [Hashtbl.find] returns the most recent registration either way. *)
let define_unix_driver scheme p = Hashtbl.replace drivers scheme p
(** [scheme_driver_name scheme] is the name of the library expected to provide
    the driver for [scheme]: ["caqti-driver-"] followed by the scheme, where
    the alias ["postgres"] is first normalised to ["postgresql"]. *)
let scheme_driver_name scheme =
  let suffix =
    match scheme with
    | "postgres" | "postgresql" -> "postgresql"
    | other -> other
  in
  "caqti-driver-" ^ suffix
(** [load_driver_functor ~uri scheme] looks up the driver registered for
    [scheme] in the top-level [drivers] table.

    If none is registered, the hook in [dynload_library] is called with
    [scheme_driver_name scheme] and the table is consulted once more.

    Returns [Ok] with the registered value, or [Error] built with
    [Caqti_error.load_failed ~uri] when either the hook reports an error or
    the driver is still missing from the table after the hook succeeded. *)
let load_driver_functor ~uri scheme =
  let load_failed msg =
    Error (Caqti_error.load_failed ~uri (Caqti_error.Msg msg))
  in
  match Hashtbl.find drivers scheme with
  | make_driver -> Ok make_driver
  | exception Not_found ->
    (match !dynload_library (scheme_driver_name scheme) with
     | Ok () ->
       (* The library is expected to have registered itself while loading. *)
       (match Hashtbl.find drivers scheme with
        | make_driver -> Ok make_driver
        | exception Not_found ->
          load_failed
            (sprintf "The driver for %s did not register itself \
                      after apparently loading." scheme))
     | Error msg -> load_failed msg)
(** [Make_unix (System)] instantiates driver loading, [connect],
    [with_connection] and [connect_pool] on top of the future and stream
    types supplied by [System]. *)
module Make_unix (System : Caqti_driver_sig.System_unix) = struct
  open System

  (* Bind for futures carrying a [result]: continue with [f] on [Ok], pass an
     [Error] through unchanged. *)
  let (>>=?) fut f =
    fut >>= fun res ->
    (match res with
     | Ok v -> f v
     | Error _ as failure -> return failure)

  (* Map for futures carrying a [result]: apply [f] to the [Ok] payload, pass
     an [Error] through unchanged. *)
  let (>|=?) fut f =
    fut >|= fun res ->
    (match res with
     | Ok v -> Ok (f v)
     | Error _ as failure -> failure)

  module type DRIVER = Caqti_driver_sig.S
    with type 'a future := 'a System.future
     and type ('a, 'err) stream := ('a, 'err) System.Stream.t

  (* Drivers already instantiated for this [System], keyed by URI scheme.
     This shadows the top-level [drivers] table of driver functors. *)
  let drivers : (string, (module DRIVER)) Hashtbl.t = Hashtbl.create 11

  (** [load_driver uri] returns the driver for the scheme of [uri].

      An already instantiated driver is reused; otherwise the functor obtained
      from [load_driver_functor] is applied to [System] and the result is
      cached in [drivers].  A URI without a scheme yields an [Error] built
      with [Caqti_error.load_rejected]; errors from [load_driver_functor] are
      returned as they are. *)
  let load_driver uri =
    match Uri.scheme uri with
    | None ->
      Error
        (Caqti_error.load_rejected ~uri
           (Caqti_error.Msg "Missing URI scheme."))
    | Some scheme ->
      (match Hashtbl.find drivers scheme with
       | cached -> Ok cached
       | exception Not_found ->
         (match load_driver_functor ~uri scheme with
          | Ok make_driver ->
            let module Make_driver =
              (val make_driver : Caqti_driver_sig.Of_system_unix) in
            let module Driver = Make_driver (System) in
            let driver = (module Driver : DRIVER) in
            Hashtbl.add drivers scheme driver;
            Ok driver
          | Error _ as failure -> failure))

  module type CONNECTION_BASE = Caqti_connection_sig.Base
    with type 'a future := 'a System.future
     and type ('a, 'err) stream := ('a, 'err) System.Stream.t

  module type CONNECTION = Caqti_connection_sig.S
    with type 'a future := 'a System.future
     and type ('a, 'err) stream := ('a, 'err) System.Stream.t

  type connection = (module CONNECTION)

  (** [connect ?env ?tweaks_version uri] loads the driver for [uri] and asks
      it to open a connection.  [tweaks_version] defaults to
      [default_tweaks_version].  A driver loading error is returned as an
      [Error] inside the future. *)
  let connect ?env ?(tweaks_version = default_tweaks_version) uri
      : ((module CONNECTION), _) result future =
    match load_driver uri with
    | Error load_error -> return (Error load_error)
    | Ok driver ->
      let module Driver = (val driver) in
      Driver.connect ?env ~tweaks_version uri

  (** [with_connection ?env ?tweaks_version uri f] connects to [uri], runs
      [f] on the connection, disconnects, and yields the result of [f].  If
      connecting fails, [f] is not called and the error is returned.

      An exception raised while the body of the [try] is being evaluated
      triggers a disconnect and is then raised again.

      NOTE(review): the [try] can only intercept exceptions raised
      synchronously.  If [System.future] can carry a failure asynchronously,
      such a failure would skip the disconnect -- confirm against the
      [System] implementations in use. *)
  let with_connection ?env ?(tweaks_version = default_tweaks_version) uri f =
    connect ?env ~tweaks_version uri >>=? fun conn ->
    let module Db = (val conn : CONNECTION) in
    let close () = Db.disconnect () in
    try
      f conn >>= fun outcome ->
      close () >|= fun () -> outcome
    with exn ->
      close () >|= fun () -> raise exn

  module Pool = Caqti_pool.Make (System)
  module Stream = System.Stream

  (** [connect_pool ?max_size ?max_idle_size ?max_use_count ?post_connect ?env
      ?tweaks_version uri] creates a pool of connections to [uri].

      - [max_use_count] defaults to [Some 100] and [tweaks_version] to
        [default_tweaks_version].
      - When [post_connect] is given, it is run on every freshly opened
        connection, and its error, if any, becomes the result of connecting.
      - The pool sizes actually used are adjusted to what the driver reports
        through [Caqti_driver_info.can_concur] and
        [Caqti_driver_info.can_pool].

      Returns [Ok pool], or the [Error] from loading the driver.

      @raise Invalid_argument if [max_size] is negative, if [max_idle_size]
      is given without [max_size], or if [max_idle_size] is not within
      [0 .. max_size]. *)
  let connect_pool
      ?max_size ?max_idle_size ?(max_use_count = Some 100) ?post_connect
      ?env ?(tweaks_version = default_tweaks_version) uri =
    let sizes_ok =
      match max_size, max_idle_size with
      | None, None -> true
      | Some size, None -> size >= 0
      | None, Some _ -> false
      | Some size, Some idle -> size >= 0 && 0 <= idle && idle <= size
    in
    if not sizes_ok then invalid_arg "Caqti_connect.Make_unix.connect_pool";
    match load_driver uri with
    | Error err -> Error err
    | Ok driver ->
      let module Driver = (val driver) in
      let open_connection () =
        (Driver.connect ?env ~tweaks_version uri
          :> (connection, _) result future)
      in
      let connect =
        match post_connect with
        | None -> open_connection
        | Some post_connect ->
          fun () ->
            open_connection () >>=? fun conn ->
            post_connect conn >|=? fun () -> conn
      in
      let disconnect conn =
        let module Db = (val conn : CONNECTION) in
        Db.disconnect ()
      in
      let validate conn =
        let module Db = (val conn : CONNECTION) in
        Db.validate ()
      in
      let check conn =
        let module Db = (val conn : CONNECTION) in
        Db.check
      in
      let info = Driver.driver_info in
      let can_concur = Caqti_driver_info.can_concur info in
      let can_pool = Caqti_driver_info.can_pool info in
      (* A driver that cannot run connections concurrently is limited to a
         single connection. *)
      let max_size = if can_concur then max_size else Some 1 in
      (* A driver that cannot pool keeps no idle connection.  A non-concurrent
         driver that can pool keeps one idle connection unless the caller
         asked for none. *)
      let max_idle_size =
        match can_concur, can_pool, max_idle_size with
        | true, true, requested -> requested
        | false, true, Some 0 -> Some 0
        | false, true, _ -> Some 1
        | _, false, _ -> Some 0
      in
      Ok
        (Pool.create ?max_size ?max_idle_size ~max_use_count ~validate ~check
           connect disconnect)
end