Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
tcp_channel.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
open! Core
open! Async

(** [close_channels reader writer] closes [writer] first and, once that close
    has completed, closes [reader]. The returned deferred is the one produced
    by [Input_channel.close]. *)
let close_channels reader writer =
  let%bind () = Output_channel.close writer in
  Input_channel.close reader
;;

(** [collect_errors writer fn] runs [fn ()] and reports how it ended.

    The result is [Ok v] when [fn] produces [v], and [Error exn] when either
    [fn] raises or an error is delivered to [writer]'s monitor; [choose] picks
    whichever of the two outcomes is available first.

    [writer]'s monitor is detached before waiting on it, so errors sent to it
    are observed here via [Monitor.get_next_error]. [fn] is started
    immediately ([~run:`Now]) and [try_with] is asked to log anything raised
    after its result is known ([~rest:`Log]). *)
let collect_errors writer fn =
  let monitor = Output_channel.monitor writer in
  Monitor.detach monitor;
  choose
    [ choice (Monitor.get_next_error monitor) (fun e -> Error e)
    ; choice (Monitor.try_with ~run:`Now ~rest:`Log fn) Fn.id
    ]
;;

(* Per-connection driver shared by [listen] and [listen_inet].

   Wraps the accepted [socket]'s file descriptor in an input and an output
   channel, runs [handler addr input_channel output_channel] under
   [collect_errors], and stops waiting as soon as either the handler outcome is
   known or [Output_channel.remote_closed] becomes determined (presumably when
   the peer goes away -- confirm against [Output_channel]). Both channels are
   then closed, and only after that is a collected exception re-raised, so the
   channels are closed on the error path too.

   [write_timeout] is only meaningful for the output channel; the remaining
   optional arguments are forwarded to both channel constructors. *)
let serve_client ?max_buffer_size ?buf_len ?write_timeout ?time_source handler addr socket
  =
  let fd = Socket.fd socket in
  let input_channel = Input_channel.create ?max_buffer_size ?buf_len ?time_source fd in
  let output_channel =
    Output_channel.create ?max_buffer_size ?buf_len ?write_timeout ?time_source fd
  in
  let%bind res =
    Deferred.any
      [ collect_errors output_channel (fun () ->
          handler addr input_channel output_channel)
      ; Output_channel.remote_closed output_channel |> Deferred.ok
      ]
  in
  let%bind () = close_channels input_channel output_channel in
  match res with
  | Ok () -> Deferred.unit
  | Error exn -> raise exn
;;

(** [listen ~on_handler_error where_to_listen handler] starts a server with
    [Tcp.Server.create_sock] and, for every accepted connection, calls
    [handler addr input_channel output_channel] with channels built on the
    connection's file descriptor.

    [max_connections], [max_accepts_per_batch], [backlog], [socket],
    [time_source] and [on_handler_error] are forwarded to
    [Tcp.Server.create_sock]. [max_buffer_size], [buf_len] and [time_source]
    are forwarded to both channels; [write_timeout] goes to the output channel
    only.

    Both channels are closed when the handler finishes, fails, or the output
    channel reports the remote end closed. An exception collected from the
    handler or from the output channel's monitor is re-raised from the
    per-connection callback after the channels are closed. *)
let listen
  ?max_connections
  ?max_accepts_per_batch
  ?backlog
  ?socket
  ?max_buffer_size
  ?buf_len
  ?write_timeout
  ?time_source
  ~on_handler_error
  where_to_listen
  handler
  =
  Tcp.Server.create_sock
    ?max_connections
    ?max_accepts_per_batch
    ?backlog
    ?socket
    ?time_source
    ~on_handler_error
    where_to_listen
    (fun addr socket ->
      serve_client
        ?max_buffer_size
        ?buf_len
        ?write_timeout
        ?time_source
        handler
        addr
        socket)
;;

(** [listen_inet] is [listen] built on [Tcp.Server.create_sock_inet] instead of
    [Tcp.Server.create_sock]. All arguments are forwarded the same way and each
    accepted connection is served by the same per-connection logic as
    [listen]. *)
let listen_inet
  ?max_connections
  ?max_accepts_per_batch
  ?backlog
  ?socket
  ?max_buffer_size
  ?buf_len
  ?write_timeout
  ?time_source
  ~on_handler_error
  where_to_listen
  handler
  =
  Tcp.Server.create_sock_inet
    ?max_connections
    ?max_accepts_per_batch
    ?backlog
    ?socket
    ?time_source
    ~on_handler_error
    where_to_listen
    (fun addr socket ->
      serve_client
        ?max_buffer_size
        ?buf_len
        ?write_timeout
        ?time_source
        handler
        addr
        socket)
;;

(** [with_connection where_to_connect f] connects with [Tcp.connect_sock]
    (using [connect_timeout] as its [timeout]), builds an input and an output
    channel on the socket's file descriptor, and runs
    [f input_channel output_channel].

    Both channels are closed as soon as any of the following happens: [f]
    finishes (normally or with an error), the output channel finishes closing,
    or the input channel is closed. The returned deferred still waits for [f]'s
    own outcome and yields its value.

    @raise exn whatever was collected from [f] or from the output channel's
    monitor, re-raised through [Exn.reraise] with an explanatory message. *)
let with_connection
  ?interrupt
  ?connect_timeout
  ?max_buffer_size
  ?buf_len
  ?write_timeout
  ?time_source
  where_to_connect
  f
  =
  let%bind socket =
    Tcp.connect_sock ?interrupt ?timeout:connect_timeout ?time_source where_to_connect
  in
  let fd = Socket.fd socket in
  let input_channel = Input_channel.create ?max_buffer_size ?buf_len ?time_source fd in
  let output_channel =
    Output_channel.create ?max_buffer_size ?buf_len ?time_source ?write_timeout fd
  in
  (* [res] is deliberately not awaited here: it is raced against the channels
     closing below, and only awaited once the channels have been closed. *)
  let res = collect_errors output_channel (fun () -> f input_channel output_channel) in
  let%bind () =
    Deferred.any_unit
      [ (res >>| fun _ -> ())
      ; Output_channel.close_finished output_channel
      ; Input_channel.closed input_channel
      ]
  in
  let%bind () = close_channels input_channel output_channel in
  match%map res with
  | Ok v -> v
  | Error exn ->
    Exn.reraise exn "Shuttle.Connection: Unhandled exception in TCP client connection"
;;

(** [connect where_to_connect] connects with [Tcp.connect_sock] (using
    [connect_timeout] as its [timeout]) and returns [(reader, writer)], an
    input and an output channel built on the socket's file descriptor.

    Nothing in this function closes the channels; that is left to the
    caller. *)
let connect
  ?interrupt
  ?connect_timeout
  ?max_buffer_size
  ?buf_len
  ?write_timeout
  ?time_source
  where_to_connect
  =
  let%map socket =
    Tcp.connect_sock ?interrupt ?timeout:connect_timeout ?time_source where_to_connect
  in
  let fd = Socket.fd socket in
  let reader = Input_channel.create ?max_buffer_size ?buf_len ?time_source fd in
  let writer =
    Output_channel.create ?max_buffer_size ?buf_len ?time_source ?write_timeout fd
  in
  reader, writer
;;