package vscoq-language-server
VSCoq language server
Install
Dune Dependency
Authors
Maintainers
Sources
vscoq-language-server-2.2.0.tar.gz
md5=1a5e8a51bc5c10d50055364c2d58ef24
sha512=f044284a187a11161740ea93be2d4ffe6a52db3e9e84fbd45b561dc1edd760ecf76a3792609f8e339aba637e1e6a417d47ecaee6a1d9a54d8352a38e9363ec8c
doc/src/vscoq-language-server.dm/delegationManager.ml.html
Source file delegationManager.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343
(**************************************************************************) (* *) (* VSCoq *) (* *) (* Copyright INRIA and contributors *) (* (see version control and README file for authors & dates) *) (* *) (**************************************************************************) (* *) (* This file is distributed under the terms of the MIT License. *) (* See LICENSE file. *) (* *) (**************************************************************************) open Types let Log log = Log.mk_log "delegationManager" type sentence_id = Stateid.t type link = { write_to : Unix.file_descr; read_from: Unix.file_descr; } let write_value { write_to; _ } x = (* alert: calling log from write value causes a loop, since log (from the worker) writes the value to a channel. Hence we mask [log] *) let [@warning "-26"] log _ = () in let data = Marshal.to_bytes x [] in let datalength = Bytes.length data in let writeno = Unix.write write_to data 0 datalength in assert(writeno = datalength); flush_all () let abort_on_unix_error f x = try f x with Unix.Unix_error(e,f,p) -> Printf.eprintf "Error: %s: %s: %s\n%!" f p (Unix.error_message e); exit 3 type job_handle = (Feedback.route_id * sentence_id) * int option ref module type Job = sig type t val name : string val binary_name : string val initial_pool_size : int type update_request val appendFeedback : Feedback.route_id * sentence_id -> (Feedback.level * Loc.t option * Quickfix.t list * Pp.t) -> update_request end (* One typically created a job id way before the worker is spawned, so we allocate a slot for the PID, but set it later. The sentence_id is used for error reporting (e.g. fail to spawn) *) let mk_job_handle (rid, sid) : job_handle = (rid, sid), ref None let cancel_job (_,id) = match !id with | None -> () | Some pid -> Unix.kill pid 9 (* TODO: this queue should not be here, it should be "per URI" but we want to keep here the conversion (STM) feedback -> (LSP) feedback *) let install_feedback send = Log.feedback_add_feeder_on_Message (fun route span _ lvl loc qf msg -> send (route,span,(lvl,loc, qf, msg))) module type Worker = sig type job_t type job_update_request val resize_pool : int -> unit (** Event for the main loop *) type delegation val pr_event : delegation -> Pp.t type events = delegation Sel.Event.t list (** handling an event may require an update to a sentence in the exec state, e.g. when a feedback is received *) val handle_event : delegation -> (job_update_request option * events) (* When a worker is available and the [jobs] queue can be popped the event becomes ready; in turn the event triggers the action: - if we can fork, job is passed to fork_action - otherwise Job.binary_name is spawn and the job sent to it *) val worker_available : jobs:((job_handle * Sel.Event.cancellation_handle * job_t) Queue.t) -> fork_action:(job_t -> send_back:(job_update_request -> unit) -> unit) -> feedback_cleanup:(unit -> unit) -> delegation Sel.Event.t (* for worker toplevels *) type options val parse_options : string list -> options * string list (* the sentence ids of the remote_mapping being delegated *) val setup_plumbing : options -> ((job_update_request -> unit) * job_t) (* CDebug aware print *) val log : string -> unit end module MakeWorker (Job : Job) = struct type job_t = Job.t type job_update_request = Job.update_request type worker_message = | Job_update of Job.update_request | DebugMessage of Log.event let Log log_worker = Log.mk_log ("worker." ^ Job.name) let install_feedback_worker ~feedback_cleanup link = feedback_cleanup (); ignore(install_feedback (fun (rid,id,fb) -> write_value link (Job.appendFeedback (rid, id) fb))) type feedback_cleanup = unit -> unit (* This is the lifetime of a delegation, there is one start event, many progress evants, then one ending event. *) type delegation = | WorkerStart : feedback_cleanup * job_handle * 'job * ('job -> send_back:(Job.update_request -> unit) -> unit) * string -> delegation | WorkerProgress of { link : link; update_request : worker_message } (* TODO: use a recurring event (+cancel) and remove link *) | WorkerEnd of (int * Unix.process_status) | WorkerIOError of exn let pr_event = function | WorkerEnd _ -> Pp.str "WorkerEnd" | WorkerIOError _ -> Pp.str "WorkerIOError" | WorkerProgress _ -> Pp.str "WorkerProgress" | WorkerStart _ -> Pp.str "WorkerStart" let install_debug_worker link = Log.worker_initialization_done ~fwd_event:(fun e -> write_value link (DebugMessage e)) type events = delegation Sel.Event.t list type role = Master | Worker of link (* The pool is just a queue of tokens *) let pool = Queue.create () let () = assert(Job.initial_pool_size >= 1); for _i = 0 to Job.initial_pool_size do Queue.push () pool done let current_pool_size = ref Job.initial_pool_size let resize_pool new_pool_size = assert(new_pool_size >= 1); let delta = !current_pool_size - new_pool_size in current_pool_size := new_pool_size; (* We add tokens if needed *) if delta < 0 then for _i = 1 to abs(delta) do Queue.push () pool done; (* We remove tokens if needed, the ones currently in use are not added back. See handling of WorkerEnd and WorkerIOError *) if delta > 0 then for _i = 1 to abs(delta) do ignore(Queue.take_opt pool) done ;; (* In order to create a job we enqueue this event *) let worker_available ~jobs ~fork_action ~feedback_cleanup : delegation Sel.Event.t = Sel.On.queues jobs pool (fun (job_handle, _, job) () -> WorkerStart (feedback_cleanup,job_handle,job,fork_action,Job.binary_name)) (* When a worker is spawn, we enqueue this event, since eventually it will die *) let worker_ends pid : delegation Sel.Event.t = Sel.On.death_of ~pid (fun reason -> WorkerEnd(pid,reason)) (* When a worker is spawn, we enqueue this event, since eventually will make progress *) let worker_progress link : delegation Sel.Event.t = Sel.On.ocaml_value link.read_from (function | Error e -> WorkerIOError e | Ok update_request -> WorkerProgress { link; update_request; }) (* ************ spawning *************************************************** *) let accept_timeout ?(timeout=2.0) sr = let r, _, _ = Unix.select [sr] [] [] timeout in if r = [] then None else Some (Unix.accept sr) let fork_worker : feedback_cleanup:feedback_cleanup -> int option ref -> (role * events, string * events) result = fun ~feedback_cleanup cancellation_handle -> let open Unix in try let chan = socket PF_INET SOCK_STREAM 0 in bind chan (ADDR_INET (Unix.inet_addr_loopback,0)); listen chan 1; let address = getsockname chan in log @@ "forking..."; flush_all (); let null = openfile "/dev/null" [O_RDWR] 0o640 in let pid = fork () in if pid = 0 then begin (* Children process *) dup2 null stdin; dup2 null stdout; close chan; Log.worker_initialization_begins (); let chan = socket PF_INET SOCK_STREAM 0 in connect chan address; let read_from = chan in let write_to = chan in let link = { write_to; read_from } in install_feedback_worker ~feedback_cleanup link; install_debug_worker link; log_worker @@ "borning..."; Ok (Worker link, []) end else (* Parent process *) let () = cancellation_handle := Some pid in match accept_timeout chan with | None -> close chan; log @@ Printf.sprintf "forked pid %d did not connect back" pid; Unix.kill pid 9; Error ("worker did not connect back", [worker_ends pid]) | Some (worker, _worker_addr) -> close chan; log @@ Printf.sprintf "forked pid %d called back" pid; let read_from = worker in let write_to = worker in let link = { write_to; read_from } in Ok (Master, [worker_progress link; worker_ends pid]) with Unix_error(e,f,p) -> Error (f ^": "^ p^": " ^error_message e,[]) ;; let option_name = "-" ^ Str.global_replace (Str.regexp_string " ") "." Job.name ^ "_master_address" let create_process_worker procname cancellation_handle job = let open Unix in try let chan = socket PF_INET SOCK_STREAM 0 in bind chan (ADDR_INET (Unix.inet_addr_loopback,0)); listen chan 1; let port = match getsockname chan with | ADDR_INET(_,port) -> port | _ -> assert false in let null = openfile "/dev/null" [O_RDWR] 0o640 in let extra_flags = if CDebug.get_flags () = "all" then [|"-debug"|] else [||] in let args = Array.append [|procname;option_name;string_of_int port|] extra_flags in let pid = create_process procname args null stdout stderr in close null; let () = cancellation_handle := Some pid in log @@ Printf.sprintf "created worker %d, waiting on port %d" pid port; match accept_timeout chan with | Some(worker, _worker_addr) -> close chan; let read_from = worker in let write_to = worker in let link = { write_to; read_from } in install_feedback_worker ~feedback_cleanup:(fun _ -> ()) link; install_debug_worker link; log @@ "sending job"; write_value link job; flush_all (); log @@ "sent"; Ok [worker_progress link; worker_ends pid] | None -> log @@ Printf.sprintf "child process %d did not connect back" pid; Unix.kill pid 9; Error ("worker did not connect back", [worker_ends pid]) with Unix_error(e,f,p) -> Error (f ^": "^ p^": " ^error_message e,[]) (* **************** /spawning ********************************************** *) let handle_event = function | WorkerIOError e -> log @@ "worker IO Error: " ^ Printexc.to_string e; if Queue.length pool < !current_pool_size then Queue.push () pool; (None, []) | WorkerEnd (pid, _status) -> log @@ Printf.sprintf "worker %d went on holidays" pid; if Queue.length pool < !current_pool_size then Queue.push () pool; (None,[]) | WorkerProgress { link; update_request = DebugMessage d } -> Log.handle_event d; (None, [worker_progress link]) | WorkerProgress { link; update_request = Job_update u } -> log "worker progress"; (Some u, [worker_progress link]) | WorkerStart (feedback_cleanup, (feedback_route,cancellation_handle),job,action,procname) -> log "worker starts"; if Sys.os_type = "Unix" then match fork_worker ~feedback_cleanup cancellation_handle with | Ok(Master, events) -> log "worker spawned (fork)"; (None, events) | Ok(Worker link, _) -> action job ~send_back:(fun j -> abort_on_unix_error write_value link (Job_update j)); exit 0 | Error(msg, cleanup_events) -> log @@ "worker did not spawn: " ^ msg; (Some(Job.appendFeedback feedback_route (Feedback.Error,None,[],Pp.str msg)), cleanup_events) else match create_process_worker procname cancellation_handle job with | Ok events -> log "worker spawned (create_process)"; (None, events) | Error(msg, cleanup_events) -> log @@ "worker did not spawn: " ^ msg; (Some(Job.appendFeedback feedback_route (Feedback.Error,None,[],Pp.str msg)), cleanup_events) (* the only option is the socket port *) type options = int let setup_plumbing port = try let open Unix in let chan = socket PF_INET SOCK_STREAM 0 in let address = ADDR_INET (inet_addr_loopback,port) in log_worker @@ "connecting to " ^ string_of_int port; connect chan address; let read_from = chan in let write_to = chan in let link = { read_from; write_to } in (* Unix.read_value does not exist, we use Sel *) match Sel.(pop Todo.(add empty [Sel.On.ocaml_value read_from (fun x -> x)])) with | Ok (job : Job.t), _ -> (write_value link, job) | Error exn, _ -> log_worker @@ "error receiving job: " ^ Printexc.to_string exn; exit 1 with Unix.Unix_error(code,syscall,param) -> log_worker @@ Printf.sprintf "error starting: %s: %s: %s" syscall param (Unix.error_message code); exit 1 let parse_options extra_args = match extra_args with | [ o ; port ] when o = option_name -> int_of_string port, [] | _ -> Printf.eprintf "unknown arguments: %s" (String.concat " " extra_args); exit 2 let log = log_worker end
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>