package luv

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file thread_pool.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
(* This file is part of Luv, released under the MIT license. See LICENSE.md for
   details, or visit https://github.com/aantron/luv/blob/master/LICENSE.md. *)



(* Request type used for thread pool work. It is defined under the temporary
   name [Request_] so that the rest of this file can keep referring to the
   general-purpose [Request] module by its usual name. *)
module Request_ =
struct
  type t = [ `Thread_pool ] Request.t

  (* Allocates a fresh work request through [Request.allocate], using the
     reference count and C type taken from [C.Types.Work]. *)
  let make () =
    let reference_count = C.Types.Work.reference_count in
    Request.allocate ~reference_count C.Types.Work.t
end

(* Trampolines handed to [C.Functions.Work.queue] by [queue_work]. Each one is
   fetched from the C side a single time, when this module is initialized.
   NOTE(review): presumably these are C function pointers that call back into
   the OCaml function and callback stored on the request — confirm against
   the C stubs. *)
let work_trampoline = C.Functions.Work.get_work_trampoline ()

let after_work_trampoline = C.Functions.Work.get_after_work_trampoline ()

(* [queue_work ?loop ?request f callback] submits the OCaml function [f] to
   the thread pool through [C.Functions.Work.queue].

   - [loop] is resolved with [Loop.or_default].
   - [request] defaults to a freshly allocated [Request_.make ()].
   - [f] is wrapped with [Error.catch_exceptions] and stored on the request at
     [C.Types.Work.function_index].
   - [callback] is installed on the request behind a wrapper that converts the
     status it is given with [Error.to_result ()] and runs [callback] under
     [Error.catch_exceptions].

   If queuing fails immediately (negative return value), the request is
   released and [callback] is called directly with the converted error. On
   that path [callback] is not run under [Error.catch_exceptions], so an
   exception it raises reaches the caller of [queue_work]. *)
let queue_work ?loop ?(request = Request_.make ()) f callback =
  let guarded_work = Error.catch_exceptions f in
  Request.set_reference
    ~index:C.Types.Work.function_index request guarded_work;
  Request.set_callback request (fun status ->
    let outcome = Error.to_result () status in
    Error.catch_exceptions callback outcome);

  let queue_status =
    C.Functions.Work.queue
      (Loop.or_default loop) request work_trampoline after_work_trampoline
  in

  (* A negative status means the work was never queued: drop the request
     first, then report the failure synchronously. *)
  if queue_status < 0 then begin
    Request.release request;
    callback (Error.result_from_c queue_status)
  end

(* Trampolines handed to [C.Functions.Work.queue] by [queue_c_work]. Each one
   is fetched from the C side a single time, when this module is initialized.
   NOTE(review): presumably these invoke the C function and argument attached
   to the request by [add_c_function_and_argument] — confirm against the C
   stubs. *)
let c_work_trampoline = C.Functions.Work.get_c_work_trampoline ()

let after_c_work_trampoline = C.Functions.Work.get_after_c_work_trampoline ()

(* [queue_c_work ?loop ?request ?argument f callback] submits the C function
   [f] to the thread pool through [C.Functions.Work.queue].

   - [loop] is resolved with [Loop.or_default].
   - [request] defaults to a freshly allocated [Request_.make ()].
   - [argument] defaults to [Nativeint.zero]; it is attached to the request
     together with [f] by [C.Functions.Work.add_c_function_and_argument].
   - [callback] is installed on the request behind a wrapper that converts the
     status it is given with [Error.to_result ()] and runs [callback] under
     [Error.catch_exceptions].

   Two failures are reported synchronously, each after releasing the request
   and each by calling [callback] directly (not under
   [Error.catch_exceptions]):
   - attaching [f] and [argument] fails: [callback] receives
     [Result.Error `ENOMEM];
   - queuing returns a negative value: [callback] receives that value
     converted with [Error.result_from_c]. *)
let queue_c_work
    ?loop
    ?(request = Request_.make ())
    ?(argument = Nativeint.zero)
    f
    callback =

  Request.set_callback request (fun status ->
    let outcome = Error.to_result () status in
    Error.catch_exceptions callback outcome);

  let attached =
    C.Functions.Work.add_c_function_and_argument request f argument in

  if attached then begin
    let queue_status =
      C.Functions.Work.queue
        (Loop.or_default loop) request c_work_trampoline after_c_work_trampoline
    in

    (* A negative status means the work was never queued. *)
    if queue_status < 0 then begin
      Request.release request;
      callback (Error.result_from_c queue_status)
    end
  end

  else begin
    (* The function and argument could not be attached to the request. *)
    Request.release request;
    callback (Result.Error `ENOMEM)
  end

(* Expose the thread pool request module under the name [Request]. The alias
   is introduced only here, after the functions above, because those functions
   use [Request] to refer to the general request module (for example,
   [Request.release]); from this line on, [Request] means [Request_]. *)
module Request = Request_

(* [set_size ?if_not_already_set thread_count] writes [thread_count], in
   decimal, to the environment variable [UV_THREADPOOL_SIZE].

   When [if_not_already_set] is [true] (the default is [false]) and the
   variable can already be read with [Env.getenv], nothing is written.

   The result of [Env.setenv] is deliberately discarded: this function returns
   [unit] and does not report a failure to set the variable. *)
let set_size ?(if_not_already_set = false) thread_count =
  let variable = "UV_THREADPOOL_SIZE" in
  match Env.getenv variable, if_not_already_set with
  | Result.Ok _, true ->
    (* Already set, and the caller asked us to respect the existing value. *)
    ()
  | Result.Ok _, false
  | Result.Error _, _ ->
    ignore (Env.setenv variable ~value:(string_of_int thread_count))
OCaml

Innovation. Community. Security.