package obuilder

  1. Overview
  2. Docs

Source file build_log.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
open Lwt.Infix

let max_chunk_size = 4096

type t = {
  mutable state : [
    | `Open of Lwt_unix.file_descr * unit Lwt_condition.t  (* Fires after writing more data. *)
    | `Readonly of string
    | `Empty
    | `Finished
  ];
  mutable len : int;
}

let with_dup fd fn =
  let fd = Lwt_unix.dup ~cloexec:true fd in
  Lwt.finalize
    (fun () -> fn fd)
    (fun () -> Lwt_unix.close fd)

let catch_cancel fn =
  Lwt.catch fn
    (function
      | Lwt.Canceled -> Lwt_result.fail `Cancelled
      | ex -> Lwt.fail ex
    )

let tail ?switch t dst =
  let rec readonly_tail ch buf =
    Lwt_io.read_into ch buf 0 max_chunk_size >>= function
    | 0 -> Lwt_result.return ()
    | n -> dst (Bytes.sub_string buf 0 n); readonly_tail ch buf
  in

  let rec open_tail fd cond buf i =
    match switch with
    | Some sw when not (Lwt_switch.is_on sw) -> Lwt_result.fail `Cancelled
    | Some _ | None ->
      let avail = min (t.len - i) max_chunk_size in
      if avail > 0 then (
        Lwt_unix.pread fd ~file_offset:i buf 0 avail >>= fun n ->
        dst (Bytes.sub_string buf 0 n);
        open_tail fd cond buf (i + avail)
      ) else (
        match t.state with
        | `Open _ -> Lwt_condition.wait cond >>= fun () -> open_tail fd cond buf i
        | `Readonly _ | `Empty | `Finished -> Lwt_result.return ()
      )
  in

  let interrupt th =
    catch_cancel @@ fun () ->
    Lwt_switch.add_hook_or_exec switch (fun () -> Lwt.cancel th; Lwt.return_unit) >>= fun () ->
    th
  in

  match t.state with
  | `Finished -> invalid_arg "tail: log is finished!"
  | `Readonly path ->
    let flags = [Unix.O_RDONLY; Unix.O_NONBLOCK; Unix.O_CLOEXEC] in
    Lwt_io.(with_file ~mode:input ~flags) path @@ fun ch ->
    let buf = Bytes.create max_chunk_size in
    interrupt (readonly_tail ch buf)
  | `Empty -> Lwt_result.return ()
  | `Open (fd, cond) ->
    (* Dup [fd], which can still work after [fd] is closed. *)
    with_dup fd @@ fun fd ->
    let buf = Bytes.create max_chunk_size in
    interrupt (open_tail fd cond buf 0)

let create path =
  Lwt_unix.openfile path Lwt_unix.[O_CREAT; O_TRUNC; O_RDWR; O_CLOEXEC] 0o666 >|= fun fd ->
  let cond = Lwt_condition.create () in
  {
    state = `Open (fd, cond);
    len = 0;
  }

let finish t =
  match t.state with
  | `Finished -> invalid_arg "Log is already finished!"
  | `Open (fd, cond) ->
    t.state <- `Finished;
    Lwt_unix.close fd >|= fun () ->
    Lwt_condition.broadcast cond ()
  | `Readonly _ ->
    t.state <- `Finished;
    Lwt.return_unit
  | `Empty ->
    Lwt.return_unit (* Empty can be reused *)

let write t data =
  match t.state with
  | `Finished -> invalid_arg "write: log is finished!"
  | `Readonly _ | `Empty -> invalid_arg "Log is read-only!"
  | `Open (fd, cond) ->
    let len = String.length data in
    Os.write_all fd (Bytes.of_string data) 0 len >>= fun () ->
    t.len <- t.len + len;
    Lwt_condition.broadcast cond ();
    Lwt.return_unit

let of_saved path =
  Lwt_unix.lstat path >|= fun stat ->
  {
    state = `Readonly path;
    len = stat.st_size;
  }

let printf t fmt =
  Fmt.kstr (write t) fmt

let empty = {
  state = `Empty;
  len = 0;
}

let copy ~src ~dst =
  let buf = Bytes.create max_chunk_size in
  let rec aux () =
    Lwt_unix.read src buf 0 (Bytes.length buf) >>= function
    | 0 -> Lwt.return_unit
    | n -> write dst (Bytes.sub_string buf 0 n) >>= aux
  in
  aux ()
OCaml

Innovation. Community. Security.