package async_sendfile

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file async_sendfile.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
open Core
open Async_kernel
open Async_unix

let default_delivery_unit = Byte_units.of_megabytes 2.

module File = struct
  type t =
    { path : string
    ; fd : Fd.t
    ; raw_fd : Core_unix.File_descr.t
    ; bytes_sent : int
    ; bytes_pending : int
    }
  [@@deriving fields]

  let with_file file ~f =
    let%bind stat = Unix.stat file in
    Unix.with_file ~mode:[ `Rdonly ] file ~f:(fun fd ->
      let t =
        { path = file
        ; fd
        ; raw_fd = Fd.file_descr_exn fd
        ; bytes_sent = 0
        ; bytes_pending = Int64.to_int_exn (Unix.Stats.size stat)
        }
      in
      f t)
  ;;

  let update t ~bytes_sent_now =
    { t with
      bytes_sent = t.bytes_sent + bytes_sent_now
    ; bytes_pending = t.bytes_pending - bytes_sent_now
    }
  ;;

  let sendfile = Or_error.ok_exn Linux_ext.sendfile

  let sendfile t ~fd ~delivery_unit =
    let delivery_unit = Byte_units.bytes_int_exn delivery_unit in
    if Int.( = ) 0 t.bytes_pending
    then Ok `Fully_sent
    else (
      try
        let bytes_sent_now =
          sendfile
            ~pos:t.bytes_sent
            ~len:(Int.min t.bytes_pending delivery_unit)
            ~fd:t.raw_fd
            fd
        in
        if bytes_sent_now < 0
        then
          Error
            (Error.create_s
               [%message
                 "Negative return value from [sendfile]"
                   ~return_value:(bytes_sent_now : int)])
        else Ok (`Sent (bytes_sent_now, update t ~bytes_sent_now))
      with
      | exn -> Error (Error.of_exn exn))
  ;;
end

module Limiter = struct
  module Limiter = Limiter_async.Token_bucket

  type t = bytes_sent:int -> unit Deferred.t

  let create ~rate_per_sec =
    let rate = Byte_units.bytes_float rate_per_sec in
    let limiter =
      Limiter.create_exn
        ~burst_size:(Float.to_int rate)
        ~sustained_rate_per_sec:rate
        ~continue_on_error:true
        ()
    in
    fun ~bytes_sent ->
      let sent = Ivar.create () in
      Limiter.enqueue_exn limiter ~allow_immediate_run:true bytes_sent (Ivar.fill sent) ();
      Ivar.read sent
  ;;

  let no_pushback ~bytes_sent:_ = Deferred.unit
end

let optimization_to_achieve_the_limiter_limits deferred f =
  match Deferred.peek deferred with
  | None -> deferred >>= f
  | Some v -> f v
;;

let ( >>== ) = optimization_to_achieve_the_limiter_limits
let error_fd_closed = Error.of_string "Destination fd is closed."

let failed_to_send ~file ~error =
  Error
    (Error.create_s
       [%message
         "Failed to fully send file"
           ~file:(File.path file : string)
           ~bytes_sent:(File.bytes_sent file : int)
           ~bytes_pending:(File.bytes_pending file : int)
           (error : Error.t)])
;;

let feed_file ~file ~fd ~delivery_unit ~limiter =
  if Fd.is_closed fd
  then return (failed_to_send ~file ~error:error_fd_closed)
  else (
    let raw_client_fd = Fd.file_descr_exn fd in
    let rec loop file =
      match File.sendfile ~fd:raw_client_fd file ~delivery_unit with
      | Error error -> return (failed_to_send ~file ~error)
      | Ok `Fully_sent -> Deferred.Or_error.ok_unit
      | Ok (`Sent (bytes_sent, file)) ->
        let ready_to_send = limiter ~bytes_sent in
        let ready_to_write = Fd.ready_to fd `Write in
        ready_to_send
        >>== fun () ->
        ready_to_write
        >>== (function
          | `Ready -> loop file
          | `Closed | `Bad_fd -> return (failed_to_send ~file ~error:error_fd_closed))
    in
    loop file)
;;

let sendfile
      ?(limiter = Limiter.no_pushback)
      ?(delivery_unit = default_delivery_unit)
      ~fd
      ~file
      ()
  =
  File.with_file file ~f:(fun file -> feed_file ~file ~fd ~delivery_unit ~limiter)
;;
OCaml

Innovation. Community. Security.