Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
client.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365
open Core open Async open Freetds open Mssql_error type t = (* dbprocess will be set to None when closed to prevent null pointer crashes *) (* The sequencer prevents concurrent use of the DB connection, and also prevent queries during unrelated transactions. *) { mutable conn : Dblib.dbprocess Sequencer.t option (* ID used to detect deadlocks when attempting to use an outer DB handle inside of with_transaction *) ; transaction_id : Bigint.t (* Months are sometimes 0-based and sometimes 1-based. See: http://www.pymssql.org/en/stable/freetds_and_dates.html *) ; month_offset : int } let next_transaction_id = let next = ref Bigint.zero in fun () -> let current = !next in next := Bigint.(one + current); current let parent_transactions_key = Univ_map.Key.create ~name:"mssql_parent_transactions" [%sexp_of: Bigint.Set.t] let sequencer_enqueue t f = match t.conn with | None -> failwith [%here] "Attempt to use closed DB" | Some conn -> Scheduler.find_local parent_transactions_key |> function | Some parent_transactions when Set.mem parent_transactions t.transaction_id -> failwith [%here] "Attempted to use outer DB handle inside of with_transaction. \ This would have lead to a deadlock." | _ -> Throttle.enqueue conn f let run_query ~month_offset t query = Logger.debug_in_thread !"Executing query: %s" query; let colnames t = Dblib.numcols t |> List.range 0 |> List.map ~f:(fun i -> Dblib.colname t (i + 1)) in Dblib.cancel t; Dblib.sqlexec t query; let rec result_set_loop result_sets = match Dblib.results t with | true -> let colnames = colnames t in let rec loop rows colnames = Result.try_with (fun () -> Dblib.nextrow t) |> function | Ok row -> let row = Row.create_exn ~month_offset row colnames in loop (row :: rows) colnames | Error Caml.Not_found -> List.rev rows :: result_sets |> result_set_loop | Error e -> raise e in loop [] colnames | false -> result_sets in result_set_loop [] |> List.rev let format_query query params = let params_formatted = List.map params ~f:Db_field.to_string_escaped |> Array.of_list in let lexbuf = Lexing.from_string query in Query_parser.main Query_lexer.token lexbuf |> List.map ~f:( let open Query_parser_types in function | Other s -> s | Param n -> (* $1 is the first param *) let i = n - 1 in if i < 0 then failwithf [%here] ~query ~params "Query has param $%d but params should start at $1." n; let len = Array.length params_formatted in if i >= len then failwithf [%here] ~query ~params "Query has param $%d but there are only %d params." n len; Array.get params_formatted i) |> String.concat ~sep:"" let execute' ?params ~query ~formatted_query ({ month_offset ; _ } as t) = sequencer_enqueue t @@ fun conn -> In_thread.run (fun () -> Mssql_error.with_wrap ~query ?params ~formatted_query [%here] (fun () -> run_query ~month_offset conn formatted_query)) let execute_multi_result ?(params=[]) conn query = let formatted_query = format_query query params in execute' conn ~query ~params ~formatted_query let execute ?params conn query = execute_multi_result ?params conn query >>| function | [] -> [] | result_set :: [] -> result_set | results -> failwithf [%here] ~query ?params ~results "Mssql.execute expected one result set but got %d result sets" (List.length results) let execute_unit ?params conn query = let%map results = execute_multi_result ?params conn query in List.iteri results ~f:(fun i -> function | [] -> () | rows -> failwithf [%here] ~query ?params ~results "Mssql.execute_unit expected no rows but result set %d has %d rows" i (List.length rows)) let execute_single ?params conn query = execute ?params conn query >>| function | [] -> None | row :: [] -> Some row | rows -> failwithf [%here] ~query ?params ~results:[rows] "Mssql.execute_single expected 0 or 1 results but got %d rows" (List.length rows) let execute_many ~params conn query = let formatted_query = List.map params ~f:(format_query query) |> String.concat ~sep:";" in execute' conn ~query ~params:(List.concat params) ~formatted_query let begin_transaction conn = execute_unit conn "BEGIN TRANSACTION" let commit conn = execute_unit conn "COMMIT" let rollback conn = execute_unit conn "ROLLBACK" let with_transaction' t f = (* Use the sequencer to prevent any other copies of this DB handle from executing during the transaction *) sequencer_enqueue t @@ fun conn -> Scheduler.find_local parent_transactions_key |> Option.value ~default:Bigint.Set.empty |> Fn.flip Set.add t.transaction_id |> Option.some |> Scheduler.with_local parent_transactions_key ~f:(fun () -> (* Make a new sub-sequencer so our own queries can continue *) let t = { t with conn = Sequencer.create ~continue_on_error:true conn |> Option.some ; transaction_id = next_transaction_id () } in let%bind () = begin_transaction t in let%bind res = f t in let%map () = match res with | Ok _ -> commit t | Error _ -> rollback t in res) let with_transaction t f = with_transaction' t (fun t -> Monitor.try_with (fun () -> f t)) >>| function | Ok res -> res | Error exn -> raise exn let with_transaction_or_error t f = with_transaction' t (fun t -> Monitor.try_with_join_or_error (fun () -> f t)) let rec connect ?(tries=5) ~host ~db ~user ~password ~port () = try let conn = Dblib.connect ~user ~password (* We have issues with anything higher than this *) ~version:Dblib.V70 (* Clifford gives FreeTDS conversion errors if we choose anything else, eg: ("Error(CONVERSION, \"Some character(s) could not be converted into client's character set. Unconverted bytes were changed to question marks ('?')\")") *) ~charset:"CP1252" host in Dblib.use conn db; conn with exn -> if tries = 0 then raise exn else Logger.info_in_thread "Retrying Mssql.connect due to exn: %s" (Exn.to_string exn); connect ~tries:(tries-1) ~host ~db ~user ~password ~port () (* These need to be on for some reason, eg: DELETE failed because the following SET options have incorrect settings: 'ANSI_NULLS, QUOTED_IDENTIFIER, CONCAT_NULL_YIELDS_NULL, ANSI_WARNINGS, ANSI_PADDING'. Verify that SET options are correct for use with indexed views and/or indexes on computed columns and/or filtered indexes and/or query notifications and/or XML data type methods and/or spatial index operations.*) let init_conn c = execute_multi_result c "SET QUOTED_IDENTIFIER ON SET ANSI_NULLS ON SET ANSI_WARNINGS ON SET ANSI_PADDING ON SET CONCAT_NULL_YIELDS_NULL ON" |> Deferred.ignore let close ({ conn ; _ } as t) = match conn with (* already closed *) | None -> Deferred.unit | Some conn -> t.conn <- None; Throttle.enqueue conn @@ fun conn -> In_thread.run (fun () -> Dblib.close conn) let create ~host ~db ~user ~password ~port () = let%bind conn = let%map conn = In_thread.run (connect ~host ~db ~user ~password ~port) >>| Sequencer.create ~continue_on_error:true in { conn = Some conn ; transaction_id = next_transaction_id () ; month_offset = 0 } in Monitor.try_with begin fun () -> (* Since FreeTDS won't tell us if it was compiled with 0-based month or 1-based months, make a query to check when we first startup and keep track of the offset so we can correct it. *) let query = "SELECT CAST('2017-02-02' AS DATETIME) AS x" in execute_single conn query >>= function | Some row -> let month_offset = Row.datetime_exn row "x" |> Time.(to_date ~zone:Zone.utc) |> Date.month |> function | Month.Feb -> 0 | Month.Jan -> 1 | month -> failwithf [%here] ~query "Expected month index workaround query to return February as \ either Jan or Feb but got %s" (Month.to_string month) in let conn = { conn with month_offset } in init_conn conn >>| fun () -> conn | None -> failwith [%here] ~query "Expected month index workaround query to return one row but got none" end >>= function | Ok res -> return res | Error exn -> let%map () = close conn in raise exn let with_conn ~host ~db ~user ~password ~port f = let%bind conn = create ~host ~db ~user ~password ~port () in Monitor.protect (fun () -> f conn) ~finally:(fun () -> close conn) (* FIXME: There's a bunch of other stuff we should really reset, but SQL Server doesn't publically expose sp_reset_connect :( *) let cleanup_connection conn = execute_unit conn {| IF @@TRANCOUNT > 0 ROLLBACK |} module Pool = struct type p = { make : unit -> t Deferred.t ; connections : t option ref Throttle.t } let with_pool ~host ~db ~user ~password ~port ?(max_connections=10) f = let make = create ~host ~db ~user ~password ~port in let connections = List.init max_connections ~f:(fun _ -> ref None) |> Throttle.create_with ~continue_on_error:true in Throttle.at_kill connections (fun conn -> match !conn with | Some conn -> close conn | None -> return ()); let%map res = f { make ; connections } in Throttle.kill connections; res let with_conn { make ; connections } f = let pool_conn_close conn = match !conn with | Some c -> Monitor.try_with (fun () -> close c) (* Intentionally ignore any errors when closing the broken connection *) >>| fun _ -> conn := None | None -> Deferred.unit in Throttle.enqueue connections (fun conn -> (* Check if the connection is good; close it if it's not *) (match !conn with | Some c -> Monitor.try_with (fun () -> execute c "SELECT 1") >>= (function | Ok _ -> return () | Error exn -> Exn.to_string exn |> Logger.error "Closing bad MSSQL connection due to exn: %s"; pool_conn_close conn) | None -> return ()) >>= fun () -> (* Find or open a connection *) let%bind c = match !conn with | Some c -> return c | None -> let%map c = make () in conn := Some c; c in (* Run our actual target function *) Monitor.try_with (fun () -> let%bind res = f c in let%map () = cleanup_connection c in res) >>= function | Ok res -> return res | Error exn -> let backtrace = Caml.Printexc.get_raw_backtrace () in let%map () = pool_conn_close conn in Caml.Printexc.raise_with_backtrace exn backtrace) end