diff options
-rw-r--r-- | Makefile.common | 2 | ||||
-rw-r--r-- | lib/lib.mllib | 2 | ||||
-rw-r--r-- | lib/spawn.ml | 280 | ||||
-rw-r--r-- | lib/spawn.mli | 88 | ||||
-rw-r--r-- | lib/spawned.ml | 107 | ||||
-rw-r--r-- | lib/spawned.mli | 26 |
6 files changed, 504 insertions, 1 deletions
diff --git a/Makefile.common b/Makefile.common index e25aed09e..06d97567b 100644 --- a/Makefile.common +++ b/Makefile.common @@ -210,7 +210,7 @@ endif LINKCMO:=$(CORECMA) $(STATICPLUGINS) LINKCMX:=$(CORECMA:.cma=.cmxa) $(STATICPLUGINS:.cma=.cmxa) -IDEDEPS:=lib/clib.cma lib/xml_lexer.cmo lib/xml_parser.cmo lib/xml_printer.cmo +IDEDEPS:=lib/clib.cma lib/xml_lexer.cmo lib/xml_parser.cmo lib/xml_printer.cmo lib/errors.cmo lib/spawn.cmo IDECMA:=ide/ide.cma LINKIDE:=$(IDEDEPS) $(IDECMA) ide/coqide_main.ml diff --git a/lib/lib.mllib b/lib/lib.mllib index f641e68f0..be15eca60 100644 --- a/lib/lib.mllib +++ b/lib/lib.mllib @@ -8,6 +8,8 @@ Segmenttree Unicodetable Unicode System +Spawn +Spawned Trie Profile Explore diff --git a/lib/spawn.ml b/lib/spawn.ml new file mode 100644 index 000000000..e350926b7 --- /dev/null +++ b/lib/spawn.ml @@ -0,0 +1,280 @@ +(************************************************************************) +(* v * The Coq Proof Assistant / The Coq Development Team *) +(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *) +(* \VV/ **************************************************************) +(* // * This file is distributed under the terms of the *) +(* * GNU Lesser General Public License Version 2.1 *) +(************************************************************************) + +let proto_version = 0 +let prefer_sock = Sys.os_type = "Win32" +let accept_timeout = 2.0 + +let pr_err s = Printf.eprintf "(Spawn ,%d) %s\n%!" (Unix.getpid ()) s +let prerr_endline s = if !Flags.debug then begin pr_err s end else () + +type req = ReqDie | ReqStats | Hello of int * int +type resp = RespStats of Gc.stat + +module type Control = sig + type handle + + val kill : handle -> unit + val stats : handle -> Gc.stat + val wait : handle -> Unix.process_status + val unixpid : handle -> int + val uid : handle -> string + + val kill_if : handle -> sec:int -> (unit -> bool) -> unit +end + +module type Timer = sig + + val add_timeout : sec:int -> (unit -> bool) -> unit +end + +module type MainLoopModel = sig + type async_chan + type condition + type watch_id + + val add_watch : callback:(condition list -> bool) -> async_chan -> watch_id + val remove_watch : watch_id -> unit + val read_all : async_chan -> string + val async_chan_of_file : Unix.file_descr -> async_chan + val async_chan_of_socket : Unix.file_descr -> async_chan + + include Timer +end + +(* Common code *) +let assert_ b s = if not b then Errors.anomaly (Pp.str s) + +let mk_socket_channel () = + let open Unix in + let s = socket PF_INET SOCK_STREAM 0 in + bind s (ADDR_INET (inet_addr_loopback,0)); + listen s 1; + match getsockname s with + | ADDR_INET(host, port) -> + s, string_of_inet_addr host ^":"^ string_of_int port + | _ -> assert false + +let accept s = + let r, _, _ = Unix.select [s] [] [] accept_timeout in + if r = [] then raise (Failure (Printf.sprintf + "The spawned process did not connect back in %2.1fs" accept_timeout)); + let cs, _ = Unix.accept s in + Unix.close s; + let cin, cout = Unix.in_channel_of_descr cs, Unix.out_channel_of_descr cs in + set_binary_mode_in cin true; + set_binary_mode_out cout true; + cs, cin, cout + +let handshake cin cout = + try + output_value cout (Hello (proto_version,Unix.getpid ())); flush cout; + match input_value cin with + | Hello(v, pid) when v = proto_version -> + prerr_endline (Printf.sprintf "Handshake with %d OK" pid); + pid + | _ -> raise (Failure "handshake protocol") + with + | Failure s | Invalid_argument s | Sys_error s -> + pr_err ("Handshake failed: " ^ s); raise (Failure "handshake") + | End_of_file -> + pr_err "Handshake failed: End_of_file"; raise (Failure "handshake") + +let spawn_sock env prog args = + let main_sock, main_sock_name = mk_socket_channel () in + let extra = [| prog; "-main-channel"; main_sock_name |] in + let args = Array.append extra args in + prerr_endline ("EXEC: " ^ String.concat " " (Array.to_list args)); + let pid = + Unix.create_process_env prog args env Unix.stdin Unix.stdout Unix.stderr in + if pid = 0 then begin + Unix.sleep 1; (* to avoid respawning like crazy *) + raise (Failure "create_process failed") + end; + let cs, cin, cout = accept main_sock in + let winpid = handshake cin cout in + pid, winpid, cin, cout, cs + +let spawn_pipe env prog args = + let master2worker_r,master2worker_w = Unix.pipe () in + let worker2master_r,worker2master_w = Unix.pipe () in + let extra = [| prog; "-main-channel"; "stdfds" |] in + let args = Array.append extra args in + Unix.set_close_on_exec master2worker_w; + Unix.set_close_on_exec worker2master_r; + prerr_endline ("EXEC: " ^ String.concat " " (Array.to_list args)); + let pid = + Unix.create_process_env + prog args env master2worker_r worker2master_w Unix.stderr in + if pid = 0 then begin + Unix.sleep 1; (* to avoid respawning like crazy *) + raise (Failure "create_process failed") + end; + prerr_endline ("PID " ^ string_of_int pid); + Unix.close master2worker_r; + Unix.close worker2master_w; + let cin = Unix.in_channel_of_descr worker2master_r in + let cout = Unix.out_channel_of_descr master2worker_w in + set_binary_mode_in cin true; + set_binary_mode_out cout true; + let winpid = handshake cin cout in + pid, winpid, cin, cout, worker2master_r + +let filter_args args = + let rec aux = function + | "-control-channel" :: _ :: rest -> aux rest + | "-main-channel" :: _ :: rest -> aux rest + | x :: rest -> x :: aux rest + | [] -> [] in + Array.of_list (aux (Array.to_list args)) + +let spawn_with_control prefer_sock env prog args = + let control_sock, control_sock_name = mk_socket_channel () in + let extra = [| "-control-channel"; control_sock_name |] in + let args = Array.append extra (filter_args args) in + let (pid, winpid, cin, cout, s), is_sock = + if Sys.os_type <> "Unix" || prefer_sock + then spawn_sock env prog args, true + else spawn_pipe env prog args, false in + let _, oob_resp, oob_req = accept control_sock in + let _ = handshake oob_resp oob_req in + (pid, winpid), oob_resp, oob_req, cin, cout, s, is_sock + +let output_death_sentence pid oob_req = + prerr_endline ("death sentence for " ^ pid); + try output_value oob_req ReqDie; flush oob_req + with e -> prerr_endline ("death sentence: " ^ Printexc.to_string e) + +(* spawn a process and read its output asynchronously *) +module Async(ML : MainLoopModel) = struct + +type process = { + cin : in_channel; + cout : out_channel; + oob_resp : in_channel; + oob_req : out_channel; + gchan : ML.async_chan; + pid : int * int; + mutable watch : ML.watch_id option; + mutable alive : bool; +} + +type callback = ML.condition list -> read_all:(unit -> string) -> bool +type handle = process + +let uid { pid = (_, winpid) } = string_of_int winpid +let unixpid { pid = (unixpid, _) } = unixpid + +let kill ({ pid = (unixpid, _winpid); oob_req; cin; cout; alive; watch } as p) = + p.alive <- false; + if not alive then prerr_endline "This process is already dead" + else begin try + Option.iter ML.remove_watch watch; + output_death_sentence (uid p) oob_req; + close_in_noerr cin; + close_out_noerr cout; + if Sys.os_type = "Unix" then Unix.kill unixpid 9; + p.watch <- None + with e -> prerr_endline ("kill: "^Printexc.to_string e) end + +let spawn ?(prefer_sock=prefer_sock) ?(env=Unix.environment ()) + prog args callback += + let pid, oob_resp, oob_req, cin, cout, main, is_sock = + spawn_with_control prefer_sock env prog args in + Unix.set_nonblock main; + let gchan = + if is_sock then ML.async_chan_of_socket main + else ML.async_chan_of_file main in + let alive, watch = true, None in + let p = { cin; cout; gchan; pid; oob_resp; oob_req; alive; watch } in + p.watch <- Some ( + ML.add_watch ~callback:(fun cl -> + try + let live = callback cl ~read_all:(fun () -> ML.read_all gchan) in + if not live then kill p; + live + with e when Errors.noncritical e -> + pr_err ("Async reader raised: " ^ (Printexc.to_string e)); + kill p; + false) gchan + ); + p, cout + +let stats { oob_req; oob_resp; alive } = + assert_ alive "This process is dead"; + output_value oob_req ReqStats; + flush oob_req; + input_value oob_resp + +let kill_if p ~sec test = + ML.add_timeout ~sec (fun () -> + if not p.alive then false + else if test () then begin + prerr_endline ("death condition for " ^ uid p ^ " is true"); + kill p; + false + end else true) + +let wait { pid = (unixpid, _) } = + try snd (Unix.waitpid [] unixpid) + with Unix.Unix_error _ -> Unix.WEXITED 0o400 + +end + +module Sync(T : Timer) = struct + +type process = { + cin : in_channel; + cout : out_channel; + oob_resp : in_channel; + oob_req : out_channel; + pid : int * int; + mutable alive : bool; +} + +type handle = process + +let spawn ?(prefer_sock=prefer_sock) ?(env=Unix.environment ()) prog args = + let pid, oob_resp, oob_req, cin, cout, _, _ = + spawn_with_control prefer_sock env prog args in + { cin; cout; pid; oob_resp; oob_req; alive = true }, cin, cout + +let uid { pid = (_, winpid) } = string_of_int winpid +let unixpid { pid = (unixpid, _) } = unixpid + +let kill ({ pid = (unixpid, _winpid); oob_req; cin; cout; alive } as p) = + p.alive <- false; + if not alive then prerr_endline "This process is already dead" + else begin try + output_death_sentence (uid p) oob_req; + close_in_noerr cin; + close_out_noerr cout; + if Sys.os_type = "Unix" then Unix.kill unixpid 9; + with e -> prerr_endline ("kill: "^Printexc.to_string e) end + +let stats { oob_req; oob_resp; alive } = + assert_ alive "This process is dead"; + output_value oob_req ReqStats; + flush oob_req; + input_value oob_resp + +let kill_if p ~sec test = + T.add_timeout ~sec (fun () -> + if not p.alive then false + else if test () then begin + prerr_endline ("death condition for " ^ uid p ^ " is true"); + kill p; + false + end else true) + +let wait { pid = (unixpid, _) } = + try snd (Unix.waitpid [] unixpid) + with Unix.Unix_error _ -> Unix.WEXITED 0o400 + +end diff --git a/lib/spawn.mli b/lib/spawn.mli new file mode 100644 index 000000000..1554e0a1e --- /dev/null +++ b/lib/spawn.mli @@ -0,0 +1,88 @@ +(************************************************************************) +(* v * The Coq Proof Assistant / The Coq Development Team *) +(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *) +(* \VV/ **************************************************************) +(* // * This file is distributed under the terms of the *) +(* * GNU Lesser General Public License Version 2.1 *) +(************************************************************************) + +(* This module implements spawning/killing managed processes with a + * synchronous or asynchronous comunication channel that works with + * threads or with a glib like main loop model. + * + * This module requires no threads and no main loop model. It takes care + * of using the fastest communication channel given the underlying OS and + * the requested kind of communication. + * + * The spawned process must use the Spawned module to init its communication + * channels. + *) + +(* This is the control panel for managed processes *) +module type Control = sig + type handle + + val kill : handle -> unit + val stats : handle -> Gc.stat + val wait : handle -> Unix.process_status + val unixpid : handle -> int + + (* What is used in debug messages *) + val uid : handle -> string + + (* Installs a callback, called every [sec] seconds. If the returned value + * is true the process is killed *) + val kill_if : handle -> sec:int -> (unit -> bool) -> unit +end + +(* Abstraction to work with both threads and main loop models *) +module type Timer = sig + + val add_timeout : sec:int -> (unit -> bool) -> unit +end + +module type MainLoopModel = sig + type async_chan + type condition + type watch_id + + val add_watch : callback:(condition list -> bool) -> async_chan -> watch_id + val remove_watch : watch_id -> unit + val read_all : async_chan -> string + val async_chan_of_file : Unix.file_descr -> async_chan + val async_chan_of_socket : Unix.file_descr -> async_chan + + include Timer +end + +(* spawn a process and read its output asynchronously *) +module Async(ML : MainLoopModel) : sig + type process + + (* If the returned value is false the callback is never called again and + * the process is killed *) + type callback = ML.condition list -> read_all:(unit -> string) -> bool + + val spawn : + ?prefer_sock:bool -> ?env:string array -> string -> string array -> + callback -> process * out_channel + + include Control with type handle = process +end + +(* spawn a process and read its output synchronously *) +module Sync(T : Timer) : sig + type process + + val spawn : + ?prefer_sock:bool -> ?env:string array -> string -> string array -> + process * in_channel * out_channel + + include Control with type handle = process +end + +(* This is exported to separate the Spawned module, that for simplicity assumes + * Threads so it is in a separate file *) +type req = ReqDie | ReqStats | Hello of int * int +val proto_version : int +type resp = RespStats of Gc.stat diff --git a/lib/spawned.ml b/lib/spawned.ml new file mode 100644 index 000000000..29cf51769 --- /dev/null +++ b/lib/spawned.ml @@ -0,0 +1,107 @@ +(************************************************************************) +(* v * The Coq Proof Assistant / The Coq Development Team *) +(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *) +(* \VV/ **************************************************************) +(* // * This file is distributed under the terms of the *) +(* * GNU Lesser General Public License Version 2.1 *) +(************************************************************************) + +open Spawn + +let pr_err s = Printf.eprintf "(Spawned,%d) %s\n%!" (Unix.getpid ()) s +let prerr_endline s = if !Flags.debug then begin pr_err s end else () + +type chandescr = AnonPipe | Socket of string * int + +let handshake cin cout = + try + match input_value cin with + | Hello(v, pid) when v = proto_version -> + prerr_endline (Printf.sprintf "Handshake with %d OK" pid); + output_value cout (Hello (proto_version,Unix.getpid ())); flush cout + | _ -> raise (Failure "handshake protocol") + with + | Failure s | Invalid_argument s | Sys_error s -> + pr_err ("Handshake failed: " ^ s); raise (Failure "handshake") + | End_of_file -> + pr_err "Handshake failed: End_of_file"; raise (Failure "handshake") + +let open_bin_connection h p = + let open Unix in + let cin, cout = open_connection (ADDR_INET (inet_addr_of_string h,p)) in + set_binary_mode_in cin true; + set_binary_mode_out cout true; + handshake cin cout; + cin, cout + +let controller h p = + prerr_endline "starting controller thread"; + let main () = + let ic, oc = open_bin_connection h p in + let rec loop () = + try + match input_value ic with + | Hello _ -> prerr_endline "internal protocol error"; exit 1 + | ReqDie -> prerr_endline "death sentence received"; exit 0 + | ReqStats -> + output_value oc (RespStats (Gc.stat ())); flush oc; loop () + with + | e -> + prerr_endline ("control channel broken: " ^ Printexc.to_string e); + exit 1 in + loop () in + ignore(Thread.create main ()) + +let main_channel = ref None +let control_channel = ref None + +let channels = ref None + +let init_channels () = + if !channels <> None then Errors.anomaly(Pp.str "init_channels called twice"); + match !main_channel, !control_channel with + | None, None -> () + | None, Some _ | Some _, None -> + Errors.anomaly (Pp.str "incomplete channels options") + | _, Some AnonPipe -> + Errors.anomaly (Pp.str "control channel cannot be a pipe") + | Some (Socket(mh,mp)), Some (Socket(ch,cp)) -> + channels := Some (open_bin_connection mh mp); + controller ch cp + | Some AnonPipe, Some (Socket (ch,cp)) -> + let stdin = Unix.in_channel_of_descr (Unix.dup Unix.stdin) in + let stdout = Unix.out_channel_of_descr (Unix.dup Unix.stdout) in + Unix.dup2 Unix.stderr Unix.stdout; + set_binary_mode_in stdin true; + set_binary_mode_out stdout true; + channels := Some (stdin, stdout); + handshake stdin stdout; + controller ch cp + +let get_channels () = + match !channels with + | None -> Errors.anomaly(Pp.str "init_channels not called") + | Some(ic, oc) -> ic, oc + +let prepare_in_channel_for_thread_friendly_blocking_input ic = + if Sys.os_type = "Win32" then Unix.set_nonblock (Unix.descr_of_in_channel ic) + else () + +let thread_friendly_blocking_input ic = + if Sys.os_type = "Win32" then + let open Unix in + let open Thread in + let fd = descr_of_in_channel ic in + let rec loop buf n = + try read fd buf 0 n + with + | Unix.Unix_error((Unix.EWOULDBLOCK|Unix.EAGAIN),_,_) -> + (* We wait for some data explicitly yielding each second *) + while not (wait_timed_read fd 1.0) do yield () done; + loop buf n + | Unix.Unix_error _ -> 0 + in + loop + else + (fun buf n -> Pervasives.input ic buf 0 n) + diff --git a/lib/spawned.mli b/lib/spawned.mli new file mode 100644 index 000000000..bc5649a2f --- /dev/null +++ b/lib/spawned.mli @@ -0,0 +1,26 @@ +(************************************************************************) +(* v * The Coq Proof Assistant / The Coq Development Team *) +(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *) +(* \VV/ **************************************************************) +(* // * This file is distributed under the terms of the *) +(* * GNU Lesser General Public License Version 2.1 *) +(************************************************************************) + +(* To link this file, threads are needed *) + +type chandescr = AnonPipe | Socket of string * int + +(* Argument parsing should set these *) +val main_channel : chandescr option ref +val control_channel : chandescr option ref + +(* Immediately after argument parsing one *must* call this *) +val init_channels : unit -> unit + +(* Once initialized, these are the channels to talk with our master *) +val get_channels : unit -> in_channel * out_channel + +(* In multi threaded environments on windows blocking calls do + prevent context switch. This seems a huge BUG, here a work around *) +val prepare_in_channel_for_thread_friendly_blocking_input : in_channel -> unit +val thread_friendly_blocking_input : in_channel -> string -> int -> int |