aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--Makefile.common2
-rw-r--r--lib/lib.mllib2
-rw-r--r--lib/spawn.ml280
-rw-r--r--lib/spawn.mli88
-rw-r--r--lib/spawned.ml107
-rw-r--r--lib/spawned.mli26
6 files changed, 504 insertions, 1 deletions
diff --git a/Makefile.common b/Makefile.common
index e25aed09e..06d97567b 100644
--- a/Makefile.common
+++ b/Makefile.common
@@ -210,7 +210,7 @@ endif
LINKCMO:=$(CORECMA) $(STATICPLUGINS)
LINKCMX:=$(CORECMA:.cma=.cmxa) $(STATICPLUGINS:.cma=.cmxa)
-IDEDEPS:=lib/clib.cma lib/xml_lexer.cmo lib/xml_parser.cmo lib/xml_printer.cmo
+IDEDEPS:=lib/clib.cma lib/xml_lexer.cmo lib/xml_parser.cmo lib/xml_printer.cmo lib/errors.cmo lib/spawn.cmo
IDECMA:=ide/ide.cma
LINKIDE:=$(IDEDEPS) $(IDECMA) ide/coqide_main.ml
diff --git a/lib/lib.mllib b/lib/lib.mllib
index f641e68f0..be15eca60 100644
--- a/lib/lib.mllib
+++ b/lib/lib.mllib
@@ -8,6 +8,8 @@ Segmenttree
Unicodetable
Unicode
System
+Spawn
+Spawned
Trie
Profile
Explore
diff --git a/lib/spawn.ml b/lib/spawn.ml
new file mode 100644
index 000000000..e350926b7
--- /dev/null
+++ b/lib/spawn.ml
@@ -0,0 +1,280 @@
+(************************************************************************)
+(* v * The Coq Proof Assistant / The Coq Development Team *)
+(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *)
+(* \VV/ **************************************************************)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(************************************************************************)
+
+let proto_version = 0
+let prefer_sock = Sys.os_type = "Win32"
+let accept_timeout = 2.0
+
+let pr_err s = Printf.eprintf "(Spawn ,%d) %s\n%!" (Unix.getpid ()) s
+let prerr_endline s = if !Flags.debug then begin pr_err s end else ()
+
+type req = ReqDie | ReqStats | Hello of int * int
+type resp = RespStats of Gc.stat
+
+module type Control = sig
+ type handle
+
+ val kill : handle -> unit
+ val stats : handle -> Gc.stat
+ val wait : handle -> Unix.process_status
+ val unixpid : handle -> int
+ val uid : handle -> string
+
+ val kill_if : handle -> sec:int -> (unit -> bool) -> unit
+end
+
+module type Timer = sig
+
+ val add_timeout : sec:int -> (unit -> bool) -> unit
+end
+
+module type MainLoopModel = sig
+ type async_chan
+ type condition
+ type watch_id
+
+ val add_watch : callback:(condition list -> bool) -> async_chan -> watch_id
+ val remove_watch : watch_id -> unit
+ val read_all : async_chan -> string
+ val async_chan_of_file : Unix.file_descr -> async_chan
+ val async_chan_of_socket : Unix.file_descr -> async_chan
+
+ include Timer
+end
+
+(* Common code *)
+let assert_ b s = if not b then Errors.anomaly (Pp.str s)
+
+let mk_socket_channel () =
+ let open Unix in
+ let s = socket PF_INET SOCK_STREAM 0 in
+ bind s (ADDR_INET (inet_addr_loopback,0));
+ listen s 1;
+ match getsockname s with
+ | ADDR_INET(host, port) ->
+ s, string_of_inet_addr host ^":"^ string_of_int port
+ | _ -> assert false
+
+let accept s =
+ let r, _, _ = Unix.select [s] [] [] accept_timeout in
+ if r = [] then raise (Failure (Printf.sprintf
+ "The spawned process did not connect back in %2.1fs" accept_timeout));
+ let cs, _ = Unix.accept s in
+ Unix.close s;
+ let cin, cout = Unix.in_channel_of_descr cs, Unix.out_channel_of_descr cs in
+ set_binary_mode_in cin true;
+ set_binary_mode_out cout true;
+ cs, cin, cout
+
+let handshake cin cout =
+ try
+ output_value cout (Hello (proto_version,Unix.getpid ())); flush cout;
+ match input_value cin with
+ | Hello(v, pid) when v = proto_version ->
+ prerr_endline (Printf.sprintf "Handshake with %d OK" pid);
+ pid
+ | _ -> raise (Failure "handshake protocol")
+ with
+ | Failure s | Invalid_argument s | Sys_error s ->
+ pr_err ("Handshake failed: " ^ s); raise (Failure "handshake")
+ | End_of_file ->
+ pr_err "Handshake failed: End_of_file"; raise (Failure "handshake")
+
+let spawn_sock env prog args =
+ let main_sock, main_sock_name = mk_socket_channel () in
+ let extra = [| prog; "-main-channel"; main_sock_name |] in
+ let args = Array.append extra args in
+ prerr_endline ("EXEC: " ^ String.concat " " (Array.to_list args));
+ let pid =
+ Unix.create_process_env prog args env Unix.stdin Unix.stdout Unix.stderr in
+ if pid = 0 then begin
+ Unix.sleep 1; (* to avoid respawning like crazy *)
+ raise (Failure "create_process failed")
+ end;
+ let cs, cin, cout = accept main_sock in
+ let winpid = handshake cin cout in
+ pid, winpid, cin, cout, cs
+
+let spawn_pipe env prog args =
+ let master2worker_r,master2worker_w = Unix.pipe () in
+ let worker2master_r,worker2master_w = Unix.pipe () in
+ let extra = [| prog; "-main-channel"; "stdfds" |] in
+ let args = Array.append extra args in
+ Unix.set_close_on_exec master2worker_w;
+ Unix.set_close_on_exec worker2master_r;
+ prerr_endline ("EXEC: " ^ String.concat " " (Array.to_list args));
+ let pid =
+ Unix.create_process_env
+ prog args env master2worker_r worker2master_w Unix.stderr in
+ if pid = 0 then begin
+ Unix.sleep 1; (* to avoid respawning like crazy *)
+ raise (Failure "create_process failed")
+ end;
+ prerr_endline ("PID " ^ string_of_int pid);
+ Unix.close master2worker_r;
+ Unix.close worker2master_w;
+ let cin = Unix.in_channel_of_descr worker2master_r in
+ let cout = Unix.out_channel_of_descr master2worker_w in
+ set_binary_mode_in cin true;
+ set_binary_mode_out cout true;
+ let winpid = handshake cin cout in
+ pid, winpid, cin, cout, worker2master_r
+
+let filter_args args =
+ let rec aux = function
+ | "-control-channel" :: _ :: rest -> aux rest
+ | "-main-channel" :: _ :: rest -> aux rest
+ | x :: rest -> x :: aux rest
+ | [] -> [] in
+ Array.of_list (aux (Array.to_list args))
+
+let spawn_with_control prefer_sock env prog args =
+ let control_sock, control_sock_name = mk_socket_channel () in
+ let extra = [| "-control-channel"; control_sock_name |] in
+ let args = Array.append extra (filter_args args) in
+ let (pid, winpid, cin, cout, s), is_sock =
+ if Sys.os_type <> "Unix" || prefer_sock
+ then spawn_sock env prog args, true
+ else spawn_pipe env prog args, false in
+ let _, oob_resp, oob_req = accept control_sock in
+ let _ = handshake oob_resp oob_req in
+ (pid, winpid), oob_resp, oob_req, cin, cout, s, is_sock
+
+let output_death_sentence pid oob_req =
+ prerr_endline ("death sentence for " ^ pid);
+ try output_value oob_req ReqDie; flush oob_req
+ with e -> prerr_endline ("death sentence: " ^ Printexc.to_string e)
+
+(* spawn a process and read its output asynchronously *)
+module Async(ML : MainLoopModel) = struct
+
+type process = {
+ cin : in_channel;
+ cout : out_channel;
+ oob_resp : in_channel;
+ oob_req : out_channel;
+ gchan : ML.async_chan;
+ pid : int * int;
+ mutable watch : ML.watch_id option;
+ mutable alive : bool;
+}
+
+type callback = ML.condition list -> read_all:(unit -> string) -> bool
+type handle = process
+
+let uid { pid = (_, winpid) } = string_of_int winpid
+let unixpid { pid = (unixpid, _) } = unixpid
+
+let kill ({ pid = (unixpid, _winpid); oob_req; cin; cout; alive; watch } as p) =
+ p.alive <- false;
+ if not alive then prerr_endline "This process is already dead"
+ else begin try
+ Option.iter ML.remove_watch watch;
+ output_death_sentence (uid p) oob_req;
+ close_in_noerr cin;
+ close_out_noerr cout;
+ if Sys.os_type = "Unix" then Unix.kill unixpid 9;
+ p.watch <- None
+ with e -> prerr_endline ("kill: "^Printexc.to_string e) end
+
+let spawn ?(prefer_sock=prefer_sock) ?(env=Unix.environment ())
+ prog args callback
+=
+ let pid, oob_resp, oob_req, cin, cout, main, is_sock =
+ spawn_with_control prefer_sock env prog args in
+ Unix.set_nonblock main;
+ let gchan =
+ if is_sock then ML.async_chan_of_socket main
+ else ML.async_chan_of_file main in
+ let alive, watch = true, None in
+ let p = { cin; cout; gchan; pid; oob_resp; oob_req; alive; watch } in
+ p.watch <- Some (
+ ML.add_watch ~callback:(fun cl ->
+ try
+ let live = callback cl ~read_all:(fun () -> ML.read_all gchan) in
+ if not live then kill p;
+ live
+ with e when Errors.noncritical e ->
+ pr_err ("Async reader raised: " ^ (Printexc.to_string e));
+ kill p;
+ false) gchan
+ );
+ p, cout
+
+let stats { oob_req; oob_resp; alive } =
+ assert_ alive "This process is dead";
+ output_value oob_req ReqStats;
+ flush oob_req;
+ input_value oob_resp
+
+let kill_if p ~sec test =
+ ML.add_timeout ~sec (fun () ->
+ if not p.alive then false
+ else if test () then begin
+ prerr_endline ("death condition for " ^ uid p ^ " is true");
+ kill p;
+ false
+ end else true)
+
+let wait { pid = (unixpid, _) } =
+ try snd (Unix.waitpid [] unixpid)
+ with Unix.Unix_error _ -> Unix.WEXITED 0o400
+
+end
+
+module Sync(T : Timer) = struct
+
+type process = {
+ cin : in_channel;
+ cout : out_channel;
+ oob_resp : in_channel;
+ oob_req : out_channel;
+ pid : int * int;
+ mutable alive : bool;
+}
+
+type handle = process
+
+let spawn ?(prefer_sock=prefer_sock) ?(env=Unix.environment ()) prog args =
+ let pid, oob_resp, oob_req, cin, cout, _, _ =
+ spawn_with_control prefer_sock env prog args in
+ { cin; cout; pid; oob_resp; oob_req; alive = true }, cin, cout
+
+let uid { pid = (_, winpid) } = string_of_int winpid
+let unixpid { pid = (unixpid, _) } = unixpid
+
+let kill ({ pid = (unixpid, _winpid); oob_req; cin; cout; alive } as p) =
+ p.alive <- false;
+ if not alive then prerr_endline "This process is already dead"
+ else begin try
+ output_death_sentence (uid p) oob_req;
+ close_in_noerr cin;
+ close_out_noerr cout;
+ if Sys.os_type = "Unix" then Unix.kill unixpid 9;
+ with e -> prerr_endline ("kill: "^Printexc.to_string e) end
+
+let stats { oob_req; oob_resp; alive } =
+ assert_ alive "This process is dead";
+ output_value oob_req ReqStats;
+ flush oob_req;
+ input_value oob_resp
+
+let kill_if p ~sec test =
+ T.add_timeout ~sec (fun () ->
+ if not p.alive then false
+ else if test () then begin
+ prerr_endline ("death condition for " ^ uid p ^ " is true");
+ kill p;
+ false
+ end else true)
+
+let wait { pid = (unixpid, _) } =
+ try snd (Unix.waitpid [] unixpid)
+ with Unix.Unix_error _ -> Unix.WEXITED 0o400
+
+end
diff --git a/lib/spawn.mli b/lib/spawn.mli
new file mode 100644
index 000000000..1554e0a1e
--- /dev/null
+++ b/lib/spawn.mli
@@ -0,0 +1,88 @@
+(************************************************************************)
+(* v * The Coq Proof Assistant / The Coq Development Team *)
+(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *)
+(* \VV/ **************************************************************)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(************************************************************************)
+
+(* This module implements spawning/killing managed processes with a
+ * synchronous or asynchronous comunication channel that works with
+ * threads or with a glib like main loop model.
+ *
+ * This module requires no threads and no main loop model. It takes care
+ * of using the fastest communication channel given the underlying OS and
+ * the requested kind of communication.
+ *
+ * The spawned process must use the Spawned module to init its communication
+ * channels.
+ *)
+
+(* This is the control panel for managed processes *)
+module type Control = sig
+ type handle
+
+ val kill : handle -> unit
+ val stats : handle -> Gc.stat
+ val wait : handle -> Unix.process_status
+ val unixpid : handle -> int
+
+ (* What is used in debug messages *)
+ val uid : handle -> string
+
+ (* Installs a callback, called every [sec] seconds. If the returned value
+ * is true the process is killed *)
+ val kill_if : handle -> sec:int -> (unit -> bool) -> unit
+end
+
+(* Abstraction to work with both threads and main loop models *)
+module type Timer = sig
+
+ val add_timeout : sec:int -> (unit -> bool) -> unit
+end
+
+module type MainLoopModel = sig
+ type async_chan
+ type condition
+ type watch_id
+
+ val add_watch : callback:(condition list -> bool) -> async_chan -> watch_id
+ val remove_watch : watch_id -> unit
+ val read_all : async_chan -> string
+ val async_chan_of_file : Unix.file_descr -> async_chan
+ val async_chan_of_socket : Unix.file_descr -> async_chan
+
+ include Timer
+end
+
+(* spawn a process and read its output asynchronously *)
+module Async(ML : MainLoopModel) : sig
+ type process
+
+ (* If the returned value is false the callback is never called again and
+ * the process is killed *)
+ type callback = ML.condition list -> read_all:(unit -> string) -> bool
+
+ val spawn :
+ ?prefer_sock:bool -> ?env:string array -> string -> string array ->
+ callback -> process * out_channel
+
+ include Control with type handle = process
+end
+
+(* spawn a process and read its output synchronously *)
+module Sync(T : Timer) : sig
+ type process
+
+ val spawn :
+ ?prefer_sock:bool -> ?env:string array -> string -> string array ->
+ process * in_channel * out_channel
+
+ include Control with type handle = process
+end
+
+(* This is exported to separate the Spawned module, that for simplicity assumes
+ * Threads so it is in a separate file *)
+type req = ReqDie | ReqStats | Hello of int * int
+val proto_version : int
+type resp = RespStats of Gc.stat
diff --git a/lib/spawned.ml b/lib/spawned.ml
new file mode 100644
index 000000000..29cf51769
--- /dev/null
+++ b/lib/spawned.ml
@@ -0,0 +1,107 @@
+(************************************************************************)
+(* v * The Coq Proof Assistant / The Coq Development Team *)
+(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *)
+(* \VV/ **************************************************************)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(************************************************************************)
+
+open Spawn
+
+let pr_err s = Printf.eprintf "(Spawned,%d) %s\n%!" (Unix.getpid ()) s
+let prerr_endline s = if !Flags.debug then begin pr_err s end else ()
+
+type chandescr = AnonPipe | Socket of string * int
+
+let handshake cin cout =
+ try
+ match input_value cin with
+ | Hello(v, pid) when v = proto_version ->
+ prerr_endline (Printf.sprintf "Handshake with %d OK" pid);
+ output_value cout (Hello (proto_version,Unix.getpid ())); flush cout
+ | _ -> raise (Failure "handshake protocol")
+ with
+ | Failure s | Invalid_argument s | Sys_error s ->
+ pr_err ("Handshake failed: " ^ s); raise (Failure "handshake")
+ | End_of_file ->
+ pr_err "Handshake failed: End_of_file"; raise (Failure "handshake")
+
+let open_bin_connection h p =
+ let open Unix in
+ let cin, cout = open_connection (ADDR_INET (inet_addr_of_string h,p)) in
+ set_binary_mode_in cin true;
+ set_binary_mode_out cout true;
+ handshake cin cout;
+ cin, cout
+
+let controller h p =
+ prerr_endline "starting controller thread";
+ let main () =
+ let ic, oc = open_bin_connection h p in
+ let rec loop () =
+ try
+ match input_value ic with
+ | Hello _ -> prerr_endline "internal protocol error"; exit 1
+ | ReqDie -> prerr_endline "death sentence received"; exit 0
+ | ReqStats ->
+ output_value oc (RespStats (Gc.stat ())); flush oc; loop ()
+ with
+ | e ->
+ prerr_endline ("control channel broken: " ^ Printexc.to_string e);
+ exit 1 in
+ loop () in
+ ignore(Thread.create main ())
+
+let main_channel = ref None
+let control_channel = ref None
+
+let channels = ref None
+
+let init_channels () =
+ if !channels <> None then Errors.anomaly(Pp.str "init_channels called twice");
+ match !main_channel, !control_channel with
+ | None, None -> ()
+ | None, Some _ | Some _, None ->
+ Errors.anomaly (Pp.str "incomplete channels options")
+ | _, Some AnonPipe ->
+ Errors.anomaly (Pp.str "control channel cannot be a pipe")
+ | Some (Socket(mh,mp)), Some (Socket(ch,cp)) ->
+ channels := Some (open_bin_connection mh mp);
+ controller ch cp
+ | Some AnonPipe, Some (Socket (ch,cp)) ->
+ let stdin = Unix.in_channel_of_descr (Unix.dup Unix.stdin) in
+ let stdout = Unix.out_channel_of_descr (Unix.dup Unix.stdout) in
+ Unix.dup2 Unix.stderr Unix.stdout;
+ set_binary_mode_in stdin true;
+ set_binary_mode_out stdout true;
+ channels := Some (stdin, stdout);
+ handshake stdin stdout;
+ controller ch cp
+
+let get_channels () =
+ match !channels with
+ | None -> Errors.anomaly(Pp.str "init_channels not called")
+ | Some(ic, oc) -> ic, oc
+
+let prepare_in_channel_for_thread_friendly_blocking_input ic =
+ if Sys.os_type = "Win32" then Unix.set_nonblock (Unix.descr_of_in_channel ic)
+ else ()
+
+let thread_friendly_blocking_input ic =
+ if Sys.os_type = "Win32" then
+ let open Unix in
+ let open Thread in
+ let fd = descr_of_in_channel ic in
+ let rec loop buf n =
+ try read fd buf 0 n
+ with
+ | Unix.Unix_error((Unix.EWOULDBLOCK|Unix.EAGAIN),_,_) ->
+ (* We wait for some data explicitly yielding each second *)
+ while not (wait_timed_read fd 1.0) do yield () done;
+ loop buf n
+ | Unix.Unix_error _ -> 0
+ in
+ loop
+ else
+ (fun buf n -> Pervasives.input ic buf 0 n)
+
diff --git a/lib/spawned.mli b/lib/spawned.mli
new file mode 100644
index 000000000..bc5649a2f
--- /dev/null
+++ b/lib/spawned.mli
@@ -0,0 +1,26 @@
+(************************************************************************)
+(* v * The Coq Proof Assistant / The Coq Development Team *)
+(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *)
+(* \VV/ **************************************************************)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(************************************************************************)
+
+(* To link this file, threads are needed *)
+
+type chandescr = AnonPipe | Socket of string * int
+
+(* Argument parsing should set these *)
+val main_channel : chandescr option ref
+val control_channel : chandescr option ref
+
+(* Immediately after argument parsing one *must* call this *)
+val init_channels : unit -> unit
+
+(* Once initialized, these are the channels to talk with our master *)
+val get_channels : unit -> in_channel * out_channel
+
+(* In multi threaded environments on windows blocking calls do
+ prevent context switch. This seems a huge BUG, here a work around *)
+val prepare_in_channel_for_thread_friendly_blocking_input : in_channel -> unit
+val thread_friendly_blocking_input : in_channel -> string -> int -> int