aboutsummaryrefslogtreecommitdiffhomepage
path: root/toplevel
diff options
context:
space:
mode:
authorGravatar Enrico Tassi <Enrico.Tassi@inria.fr>2014-01-22 10:33:47 +0100
committerGravatar Enrico Tassi <Enrico.Tassi@inria.fr>2014-01-26 14:20:52 +0100
commit7def8d76bf50dc14d198052cae56de567c349640 (patch)
treef85339f7ec061d83eea4cd78a9e4b2ae579fcf5f /toplevel
parentb0da879dc6abfca6b4e233b7469265a5cf52ce15 (diff)
STM: ported to spawn
Diffstat (limited to 'toplevel')
-rw-r--r--toplevel/stm.ml157
1 files changed, 69 insertions, 88 deletions
diff --git a/toplevel/stm.ml b/toplevel/stm.ml
index a0c5bcf2d..ce1d6b1b9 100644
--- a/toplevel/stm.ml
+++ b/toplevel/stm.ml
@@ -8,14 +8,13 @@
let process_id () =
match !Flags.async_proofs_mode with
- | Flags.APoff | Flags.APonLazy | Flags.APonParallel 0 -> "master"
+ | Flags.APoff | Flags.APonLazy | Flags.APonParallel 0 ->
+ "master" ^ string_of_int (Thread.id (Thread.self ()))
| Flags.APonParallel n -> "worker" ^ string_of_int n
-let prerr_endline s =
- if !Flags.debug then begin
- prerr_endline (Printf.sprintf "%s] %s" (process_id ()) s);
- flush stderr
- end else ()
+let pr_err s = Printf.eprintf "%s] %s\n" (process_id ()) s; flush stderr
+
+let prerr_endline s = if !Flags.debug then begin pr_err s end else ()
open Vernacexpr
open Errors
@@ -435,7 +434,8 @@ end = struct (* {{{ *)
let new_vcs, erased_nodes = gc old_vcs in
Vcs_.NodeSet.iter (fun id ->
match (Vcs_aux.visit old_vcs id).step with
- | `Qed ({ fproof = Some (_, cancel_switch) }, _) -> cancel_switch := true
+ | `Qed ({ fproof = Some (_, cancel_switch) }, _) ->
+ cancel_switch := true
| _ -> ())
erased_nodes;
vcs := new_vcs
@@ -583,6 +583,16 @@ let get_hint_ctx loc =
let ids = List.map (fun id -> Loc.ghost, id) ids in
SsExpr (SsSet ids)
+ module Worker = Spawn.Sync(struct
+ let add_timeout ~sec f =
+ ignore(Thread.create (fun () ->
+ while true do
+ Unix.sleep sec;
+ if not (f ()) then Thread.exit ()
+ done)
+ ())
+ end)
+
(* Slave processes (if initialized, otherwise local lazy evaluation) *)
module Slaves : sig
@@ -666,7 +676,7 @@ end = struct (* {{{ *)
module SlavesPool : sig
- val init : int -> ((unit -> in_channel * out_channel * int * int) -> unit) -> unit
+ val init : int -> ((unit -> in_channel * out_channel * Worker.process * int) -> unit) -> unit
val is_empty : unit -> bool
val n_slaves : unit -> int
@@ -679,21 +689,19 @@ end = struct (* {{{ *)
| Some managers -> Array.length managers
let is_empty () = !slave_managers = None
- let master_handshake ic oc =
+ let master_handshake worker_id ic oc =
try
Marshal.to_channel oc 17 []; flush oc;
let n = (Marshal.from_channel ic : int) in
assert(n = 17);
- prerr_endline "Handshake OK"
+ prerr_endline (Printf.sprintf "Handshake with %d OK" worker_id)
with e ->
- prerr_endline ("Handshake failed: "^Printexc.to_string e);
+ prerr_endline
+ (Printf.sprintf "Handshake with %d failed: %s"
+ worker_id (Printexc.to_string e));
exit 1
let respawn n () =
- let c2s_r, c2s_w = Unix.pipe () in
- let s2c_r, s2c_w = Unix.pipe () in
- Unix.set_close_on_exec c2s_w;
- Unix.set_close_on_exec s2c_r;
let prog = Sys.argv.(0) in
let rec set_slave_opt = function
| [] -> ["-async-proofs"; "worker"; string_of_int n]
@@ -702,19 +710,13 @@ end = struct (* {{{ *)
|"-compile"
|"-compile-verbose")::_::tl -> set_slave_opt tl
| x::tl -> x :: set_slave_opt tl in
- let args = Array.of_list (set_slave_opt (Array.to_list Sys.argv)) in
- prerr_endline ("EXEC: " ^ prog ^ " " ^
- (String.concat " " (Array.to_list args)));
- let env = Array.append !async_proofs_workers_extra_env (Unix.environment ()) in
- let pid = Unix.create_process_env prog args env c2s_r s2c_w Unix.stderr in
- Unix.close c2s_r;
- Unix.close s2c_w;
- let s2c_r = Unix.in_channel_of_descr s2c_r in
- let c2s_w = Unix.out_channel_of_descr c2s_w in
- set_binary_mode_out c2s_w true;
- set_binary_mode_in s2c_r true;
- master_handshake s2c_r c2s_w;
- s2c_r, c2s_w, pid, n
+ let args =
+ Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in
+ let env =
+ Array.append !async_proofs_workers_extra_env (Unix.environment ()) in
+ let proc, ic, oc = Worker.spawn ~env prog args in
+ master_handshake n ic oc;
+ ic, oc, proc, n
let init n manage_slave =
slave_managers := Some
@@ -737,7 +739,6 @@ end = struct (* {{{ *)
| RespFeedback of Interface.feedback
| RespGetCounterFreshLocalUniv
| RespGetCounterNewUnivLevel
- | RespTick
let pr_response = function
| RespBuiltProof _ -> "Sucess"
| RespError _ -> "Error"
@@ -746,7 +747,6 @@ end = struct (* {{{ *)
| RespFeedback _ -> assert false
| RespGetCounterFreshLocalUniv -> "GetCounterFreshLocalUniv"
| RespGetCounterNewUnivLevel -> "GetCounterNewUnivLevel"
- | RespTick -> "Tick"
type more_data =
| MoreDataLocalUniv of Univ.universe list
@@ -841,17 +841,10 @@ end = struct (* {{{ *)
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("unmarshal_request: "^s)
- (* Since cancelling is still cooperative, the slave runs a thread that
- periodically sends a RespTick message on the same channel used by the
- main slave thread to send back feedbacks and responses. We need mutual
- exclusion. *)
- let marshal_response =
- let m = Mutex.create () in
- fun oc (res : response) ->
- Mutex.lock m;
- try marshal_to_channel oc res; Mutex.unlock m
- with Failure s | Invalid_argument s | Sys_error s ->
- Mutex.unlock m; marshal_err ("marshal_response: "^s)
+ let marshal_response oc (res : response) =
+ try marshal_to_channel oc res
+ with Failure s | Invalid_argument s | Sys_error s ->
+ marshal_err ("marshal_response: "^s)
let unmarshal_response ic =
try (Marshal.from_channel ic : response)
@@ -912,12 +905,9 @@ end = struct (* {{{ *)
Pp.feedback ~state_id:Stateid.initial (Interface.SlaveStatus(id, s))
let rec manage_slave respawn =
- let ic, oc, pid, id_slave = respawn () in
- let kill_pid =
- ref (fun () -> try Unix.kill pid 9
- with Unix.Unix_error _ | Invalid_argument _ -> ()) in
- at_exit (fun () -> !kill_pid ());
+ let ic, oc, proc, id_slave = respawn () in
let last_task = ref None in
+ let cancelled = ref false in
try
while true do
prerr_endline "waiting for a task";
@@ -925,9 +915,11 @@ end = struct (* {{{ *)
let task = TQueue.pop queue in
prerr_endline ("got task: "^pr_task task);
last_task := Some task;
- let cancel_switch = cancel_switch_of_task task in
try
marshal_request oc (request_of_task task);
+ let cancel_switch = cancel_switch_of_task task in
+ Worker.kill_if proc ~sec:1 (fun () ->
+ cancelled := !cancel_switch; !cancelled);
let rec loop () =
let response = unmarshal_response ic in
match task, response with
@@ -949,15 +941,12 @@ end = struct (* {{{ *)
(CList.init 10 (fun _ -> Univ.fresh_local_univ ())));
if !cancel_switch then raise KillRespawn else loop ()
| _, RespGetCounterNewUnivLevel ->
- prerr_endline "-> MoreDataUnivLevel";
marshal_more_data oc (MoreDataUnivLevel
(CList.init 10 (fun _ -> Termops.new_univ_level ())));
- if !cancel_switch then raise KillRespawn else loop ()
+ loop ()
| _, RespFeedback {id = State state_id; content = msg} ->
Pp.feedback ~state_id msg;
- if !cancel_switch then raise KillRespawn else loop ()
- | _, RespTick ->
- if !cancel_switch then raise KillRespawn else loop ()
+ loop ()
| _, RespFeedback _ -> assert false (* Parsing in master process *)
in
loop ()
@@ -965,6 +954,8 @@ end = struct (* {{{ *)
| VCS.Expired -> (* task cancelled: e.g. the user did backtrack *)
Pp.feedback (Interface.InProgress ~-1);
prerr_endline ("Task expired: " ^ pr_task task)
+ | (Sys_error _ | Invalid_argument _ | End_of_file | KillRespawn) as e ->
+ raise e (* we pass the exception to the external handler *)
| MarshalError s when !fallback_to_lazy_if_marshal_error ->
msg_warning(strbrk("Marshalling error: "^s^". "^
"The system state could not be sent to the worker process. "^
@@ -972,10 +963,8 @@ end = struct (* {{{ *)
let TaskBuildProof (exn_info, _, stop, assign,_,loc,_) = task in
assign(`Comp(build_proof_here exn_info loc stop));
Pp.feedback (Interface.InProgress ~-1)
- | (Sys_error _ | Invalid_argument _ | End_of_file | KillRespawn) as e ->
- raise e (* we pass the exception to the external handler *)
| MarshalError s ->
- Printf.eprintf "Fatal marshal error: %s\n" s;
+ pr_err ("Fatal marshal error: " ^ s);
flush_all (); exit 1
| e ->
prerr_endline ("Uncaught exception in worker manager: "^
@@ -985,15 +974,15 @@ end = struct (* {{{ *)
with
| KillRespawn ->
Pp.feedback (Interface.InProgress ~-1);
- !kill_pid (); (* FIXME: This does not work on Windows *)
- kill_pid := (fun () -> ());
- close_in ic;
- close_out oc;
- ignore(Unix.waitpid [] pid);
+ Worker.kill proc;
+ ignore(Worker.wait proc);
+ manage_slave respawn
+ | Sys_error _ | Invalid_argument _ | End_of_file when !cancelled ->
+ Pp.feedback (Interface.InProgress ~-1);
+ ignore(Worker.wait proc);
manage_slave respawn
| Sys_error _ | Invalid_argument _ | End_of_file
when !fallback_to_lazy_if_slave_dies ->
- kill_pid := (fun () -> ());
msg_warning(strbrk "The worker process died badly.");
(match !last_task with
| Some task ->
@@ -1002,18 +991,18 @@ end = struct (* {{{ *)
assign(`Comp(build_proof_here exn_info loc stop));
Pp.feedback (Interface.InProgress ~-1);
| None -> ());
- close_in ic;
- close_out oc;
- ignore(Unix.waitpid [] pid);
+ Worker.kill proc;
+ ignore(Worker.wait proc);
manage_slave respawn
| Sys_error _ | Invalid_argument _ | End_of_file ->
- let exit_status pid = match Unix.waitpid [] pid with
- | _, Unix.WEXITED i -> Printf.sprintf "exit(%d)" i
- | _, Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno
- | _, Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno in
- Printf.eprintf "Fatal worker error: %s\n" (exit_status pid);
+ Worker.kill proc;
+ let exit_status proc = match Worker.wait proc with
+ | Unix.WEXITED 0x400 -> "exit code unavailable"
+ | Unix.WEXITED i -> Printf.sprintf "exit(%d)" i
+ | Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno
+ | Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno in
+ pr_err ("Fatal worker error: " ^ (exit_status proc));
flush_all (); exit 1
-
let init () = SlavesPool.init !Flags.async_proofs_n_workers manage_slave
@@ -1021,11 +1010,8 @@ end = struct (* {{{ *)
let slave_oc = ref stdout
let slave_init_stdout () =
- slave_oc := Unix.out_channel_of_descr (Unix.dup Unix.stdout);
- slave_ic := Unix.in_channel_of_descr (Unix.dup Unix.stdin);
- set_binary_mode_out !slave_oc true;
- set_binary_mode_in !slave_ic true;
- Unix.dup2 Unix.stderr Unix.stdout
+ let ic, oc = Spawned.get_channels () in
+ slave_oc := oc; slave_ic := ic
let bufferize f =
let l = ref [] in
@@ -1038,7 +1024,7 @@ end = struct (* {{{ *)
try
let v = (Marshal.from_channel !slave_ic : int) in
assert(v = 17);
- Marshal.to_channel !slave_oc v []; flush !slave_oc;
+ Marshal.to_channel !slave_oc v []; flush !slave_oc;
prerr_endline "Handshake OK"
with e ->
prerr_endline ("Handshake failed: " ^ Printexc.to_string e);
@@ -1058,11 +1044,6 @@ end = struct (* {{{ *)
match unmarshal_more_data !slave_ic with
| MoreDataLocalUniv l -> l | _ -> assert false));
let working = ref false in
- let _tick = Thread.create (fun n ->
- while true do
- Unix.sleep n;
- if !working then marshal_response !slave_oc RespTick
- done) 1 in
slave_handshake ();
while true do
try
@@ -1076,20 +1057,20 @@ end = struct (* {{{ *)
reset ()
with
| MarshalError s ->
- Printf.eprintf "Slave: fatal marshal error: %s\n" s;
- flush_all (); exit 2
+ pr_err ("Fatal marshal error: " ^ s); flush_all (); exit 2
+ | End_of_file ->
+ prerr_endline "connection lost"; flush_all (); exit 2
| e when Errors.noncritical e ->
(* This can happen if the proof is broken. The error has also been
* signalled as a feedback, hence we can silently recover *)
let err_id, safe_id = match Stateid.get e with
| Some (safe, err) -> err, safe
| None -> Stateid.dummy, Stateid.dummy in
- marshal_response !slave_oc (RespError (err_id, safe_id, print e));
- prerr_endline "Slave: failed with the following exception:";
- prerr_endline (string_of_ppcmds (print e))
+ prerr_endline "failed with the following exception:";
+ prerr_endline (string_of_ppcmds (print e));
+ marshal_response !slave_oc (RespError (err_id, safe_id, print e))
| e ->
- Printf.eprintf "Slave: critical exception: %s\n"
- (Pp.string_of_ppcmds (print e));
+ pr_err ("Slave: critical exception: " ^ Pp.string_of_ppcmds (print e));
flush_all (); exit 1
done