package activitypub_server

  1. Overview
  2. Docs

Source file qfile.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
(*********************************************************************************)
(*                OCaml-ActivityPub                                              *)
(*                                                                               *)
(*    Copyright (C) 2023-2024 INRIA All rights reserved.                         *)
(*    Author: Maxence Guesdon, INRIA Saclay                                      *)
(*                                                                               *)
(*    This program is free software; you can redistribute it and/or modify       *)
(*    it under the terms of the GNU Lesser General Public License version        *)
(*    3 as published by the Free Software Foundation.                            *)
(*                                                                               *)
(*    This program is distributed in the hope that it will be useful,            *)
(*    but WITHOUT ANY WARRANTY; without even the implied warranty of             *)
(*    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *)
(*    GNU General Public License for more details.                               *)
(*                                                                               *)
(*    You should have received a copy of the GNU General Public License          *)
(*    along with this program; if not, write to the Free Software                *)
(*    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA                   *)
(*    02111-1307  USA                                                            *)
(*                                                                               *)
(*    Contact: maxence.guesdon@inria.fr                                          *)
(*                                                                               *)
(*********************************************************************************)

module AP = Activitypub
module Log = AP.Log

(** Description of the elements stored in a file-backed queue, as required
    by {!Make}. *)
module type P = sig
    (** Type of one queued element. *)
    type t

    (** [store l oc] writes the list of elements [l] to the output channel
        [oc]. {!Make} calls it on a channel opened on the queue file, with
        the queue contents in front-to-back order. *)
    val store : t list -> Lwt_io.output_channel -> unit Lwt.t

    (** [read ic] reads a list of elements from the input channel [ic].
        {!Make} calls it on the queue file (or its backup) and pushes the
        returned elements into the queue in list order. *)
    val read : Lwt_io.input_channel -> t list Lwt.t

    (** [to_string x] returns a textual form of [x]; {!Make} uses it only
        in a debug log message. *)
    val to_string : t -> string
  end
(** Signature of the queues produced by {!Make}. *)
module type S =
  sig
    (** Type of the queued elements. *)
    type elt

    (** A queue. The type is abstract in this signature. *)
    type t

    (** [create ~dir ~prefix ~handle] returns a queue.

        As implemented by {!Make}: [dir] is passed to [AP.Utils.mkdir],
        the queue file is [Filename.concat dir prefix], the initial
        contents are read from that file (or from its [.bak] sibling when
        the first read fails), and a background loop is started with
        [Lwt.async] that applies [handle] to the element at the front of
        the queue. When [handle] succeeds the element is removed; when it
        raises, the error is logged and the element is moved to the back
        of the queue to be tried again later. *)
    val create : dir:string -> prefix:string ->
      handle:(elt -> unit Lwt.t) -> t Lwt.t

    (** [push x q] adds [x] at the back of [q]. In {!Make} the queue is
        also written to its file before the returned promise resolves. *)
    val push : elt -> t -> unit Lwt.t
  end
(** [Make (P)] builds a FIFO queue of [P.t] values that is mirrored to a
    file.

    The queue itself is an in-memory [Queue.t]. Each save first renames the
    current file to a [.bak] sibling and then rewrites the file from the
    in-memory queue, so the previous state remains available if the new
    file cannot be read back. All accesses to the queue go through an
    [Lwt_mutex]. *)
module Make (P:P) : S with type elt = P.t = struct
    type elt = P.t

    type t = {
        file : string ; (* primary file holding the queue contents *)
        file_bak : string ; (* previous version of [file] *)
        q : P.t Queue.t ; (* in-memory queue *)
        mutex : Lwt_mutex.t ; (* protects [q] and the two files *)
      }

    (* [with_mutex f t] runs [f t] while holding [t.mutex]. *)
    let with_mutex f t = Lwt_mutex.with_lock t.mutex (fun () -> f t)

    (* Reload the in-memory queue from disk; the caller must hold the mutex.
       The primary file is tried first, then the backup; when both reads
       fail the queue becomes empty. Each failure is logged with the name
       of the file that could not be read. *)
    let refill_ t =
      let%lwt l, failed_first =
        try%lwt
          let%lwt l = Lwt_io.(with_file Input t.file P.read) in
          Lwt.return (l, false)
        with e ->
            Log.err (fun m -> m "%s: %s" t.file (Printexc.to_string e));
            try%lwt
              let%lwt l = Lwt_io.(with_file Input t.file_bak P.read) in
              Lwt.return (l, true)
            with e ->
                (* Report the backup file here: it is the one that failed. *)
                Log.err (fun m -> m "%s: %s" t.file_bak (Printexc.to_string e));
                Lwt.return ([], true)
      in
      (* The primary file was unreadable: rewrite it from what was
         recovered (possibly nothing) so that it exists and is readable. *)
      let%lwt () =
        if failed_first then
          Lwt_io.(with_file Output t.file (P.store l))
        else
          Lwt.return_unit
      in
      Queue.clear t.q ;
      List.iter (fun x -> Queue.push x t.q) l;
      Lwt.return_unit

    (* Same as [refill_], taking the mutex. *)
    let refill = with_mutex refill_

    (* Write the in-memory queue to disk; the caller must hold the mutex.
       The current primary file, if any, becomes the backup first. *)
    let save_ t =
      (* Snapshot of the queue, front element first. *)
      let l = List.of_seq (Queue.to_seq t.q) in
      (* Only rotate when there is something to rotate: if the primary file
         is missing (for instance because a previous write failed after the
         rename), renaming would raise and no later save could succeed. *)
      let%lwt () =
        match%lwt Lwt_unix.file_exists t.file with
        | true -> Lwt_unix.rename t.file t.file_bak
        | false -> Lwt.return_unit
      in
      Lwt_io.(with_file Output t.file (P.store l))

    (* Same as [save_], taking the mutex. *)
    let save = with_mutex save_

    (* Drop the front element, if any. Does not write to disk. *)
    let pop =
      with_mutex (fun t -> ignore (Queue.take_opt t.q); Lwt.return_unit)

    (* [push elt t] appends [elt] and saves the queue, under the mutex. *)
    let push elt =
      with_mutex (fun t -> Queue.push elt t.q; save_ t)

    (* Front element of the queue, if any, without removing it. *)
    let peek_opt =
      with_mutex (fun t -> Lwt.return (Queue.peek_opt t.q))

    (* Endless consumer loop: applies [f] to the front element.
       - Empty queue: wait 0.1 s and look again.
       - [f] succeeds: remove the element and save the queue.
       - [f] raises: log the exception, move the element to the back of
         the queue (which also saves it), wait 0.1 s and continue. *)
    let rec handle_peek t f =
      match%lwt peek_opt t with
      | None -> let%lwt () = Lwt_unix.sleep 0.1 in handle_peek t f
      | Some elt  ->
          Log.debug (fun m -> m "Qfile: handling %s" (P.to_string elt));
          (match%lwt f elt with
           | () ->
               let%lwt () = pop t in
               let%lwt () = save t in
               handle_peek t f
           | exception e ->
               Log.err (fun m -> m "%s" (Printexc.to_string e));
               let%lwt () = pop t in
               let%lwt () = push elt t in
               let%lwt () = Lwt_unix.sleep 0.1 in
               handle_peek t f
          )

    (* [create ~dir ~prefix ~handle] calls [AP.Utils.mkdir dir], uses
       [Filename.concat dir prefix] as queue file (backup: same name with
       the [.bak] suffix), loads the queue from disk and starts the
       consumer loop with [Lwt.async] before returning the queue. *)
    let create ~dir ~prefix ~handle =
      let%lwt () = AP.Utils.mkdir dir in
      let file = Filename.concat dir prefix in
      let file_bak = file ^ ".bak" in
      let mutex = Lwt_mutex.create () in
      let q = Queue.create () in
      let t = { file ; file_bak ; q ; mutex } in
      let%lwt () = refill t in
      Lwt.async (fun () -> handle_peek t handle) ;
      Lwt.return t
  end
OCaml

Innovation. Community. Security.