diff options
Diffstat (limited to 'toplevel')
-rw-r--r-- | toplevel/stm.ml | 157 |
1 files changed, 69 insertions, 88 deletions
diff --git a/toplevel/stm.ml b/toplevel/stm.ml index a0c5bcf2d..ce1d6b1b9 100644 --- a/toplevel/stm.ml +++ b/toplevel/stm.ml @@ -8,14 +8,13 @@ let process_id () = match !Flags.async_proofs_mode with - | Flags.APoff | Flags.APonLazy | Flags.APonParallel 0 -> "master" + | Flags.APoff | Flags.APonLazy | Flags.APonParallel 0 -> + "master" ^ string_of_int (Thread.id (Thread.self ())) | Flags.APonParallel n -> "worker" ^ string_of_int n -let prerr_endline s = - if !Flags.debug then begin - prerr_endline (Printf.sprintf "%s] %s" (process_id ()) s); - flush stderr - end else () +let pr_err s = Printf.eprintf "%s] %s\n" (process_id ()) s; flush stderr + +let prerr_endline s = if !Flags.debug then begin pr_err s end else () open Vernacexpr open Errors @@ -435,7 +434,8 @@ end = struct (* {{{ *) let new_vcs, erased_nodes = gc old_vcs in Vcs_.NodeSet.iter (fun id -> match (Vcs_aux.visit old_vcs id).step with - | `Qed ({ fproof = Some (_, cancel_switch) }, _) -> cancel_switch := true + | `Qed ({ fproof = Some (_, cancel_switch) }, _) -> + cancel_switch := true | _ -> ()) erased_nodes; vcs := new_vcs @@ -583,6 +583,16 @@ let get_hint_ctx loc = let ids = List.map (fun id -> Loc.ghost, id) ids in SsExpr (SsSet ids) + module Worker = Spawn.Sync(struct + let add_timeout ~sec f = + ignore(Thread.create (fun () -> + while true do + Unix.sleep sec; + if not (f ()) then Thread.exit () + done) + ()) + end) + (* Slave processes (if initialized, otherwise local lazy evaluation) *) module Slaves : sig @@ -666,7 +676,7 @@ end = struct (* {{{ *) module SlavesPool : sig - val init : int -> ((unit -> in_channel * out_channel * int * int) -> unit) -> unit + val init : int -> ((unit -> in_channel * out_channel * Worker.process * int) -> unit) -> unit val is_empty : unit -> bool val n_slaves : unit -> int @@ -679,21 +689,19 @@ end = struct (* {{{ *) | Some managers -> Array.length managers let is_empty () = !slave_managers = None - let master_handshake ic oc = + let master_handshake worker_id ic oc = try Marshal.to_channel oc 17 []; flush oc; let n = (Marshal.from_channel ic : int) in assert(n = 17); - prerr_endline "Handshake OK" + prerr_endline (Printf.sprintf "Handshake with %d OK" worker_id) with e -> - prerr_endline ("Handshake failed: "^Printexc.to_string e); + prerr_endline + (Printf.sprintf "Handshake with %d failed: %s" + worker_id (Printexc.to_string e)); exit 1 let respawn n () = - let c2s_r, c2s_w = Unix.pipe () in - let s2c_r, s2c_w = Unix.pipe () in - Unix.set_close_on_exec c2s_w; - Unix.set_close_on_exec s2c_r; let prog = Sys.argv.(0) in let rec set_slave_opt = function | [] -> ["-async-proofs"; "worker"; string_of_int n] @@ -702,19 +710,13 @@ end = struct (* {{{ *) |"-compile" |"-compile-verbose")::_::tl -> set_slave_opt tl | x::tl -> x :: set_slave_opt tl in - let args = Array.of_list (set_slave_opt (Array.to_list Sys.argv)) in - prerr_endline ("EXEC: " ^ prog ^ " " ^ - (String.concat " " (Array.to_list args))); - let env = Array.append !async_proofs_workers_extra_env (Unix.environment ()) in - let pid = Unix.create_process_env prog args env c2s_r s2c_w Unix.stderr in - Unix.close c2s_r; - Unix.close s2c_w; - let s2c_r = Unix.in_channel_of_descr s2c_r in - let c2s_w = Unix.out_channel_of_descr c2s_w in - set_binary_mode_out c2s_w true; - set_binary_mode_in s2c_r true; - master_handshake s2c_r c2s_w; - s2c_r, c2s_w, pid, n + let args = + Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in + let env = + Array.append !async_proofs_workers_extra_env (Unix.environment ()) in + let proc, ic, oc = Worker.spawn ~env prog args in + master_handshake n ic oc; + ic, oc, proc, n let init n manage_slave = slave_managers := Some @@ -737,7 +739,6 @@ end = struct (* {{{ *) | RespFeedback of Interface.feedback | RespGetCounterFreshLocalUniv | RespGetCounterNewUnivLevel - | RespTick let pr_response = function | RespBuiltProof _ -> "Sucess" | RespError _ -> "Error" @@ -746,7 +747,6 @@ end = struct (* {{{ *) | RespFeedback _ -> assert false | RespGetCounterFreshLocalUniv -> "GetCounterFreshLocalUniv" | RespGetCounterNewUnivLevel -> "GetCounterNewUnivLevel" - | RespTick -> "Tick" type more_data = | MoreDataLocalUniv of Univ.universe list @@ -841,17 +841,10 @@ end = struct (* {{{ *) with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("unmarshal_request: "^s) - (* Since cancelling is still cooperative, the slave runs a thread that - periodically sends a RespTick message on the same channel used by the - main slave thread to send back feedbacks and responses. We need mutual - exclusion. *) - let marshal_response = - let m = Mutex.create () in - fun oc (res : response) -> - Mutex.lock m; - try marshal_to_channel oc res; Mutex.unlock m - with Failure s | Invalid_argument s | Sys_error s -> - Mutex.unlock m; marshal_err ("marshal_response: "^s) + let marshal_response oc (res : response) = + try marshal_to_channel oc res + with Failure s | Invalid_argument s | Sys_error s -> + marshal_err ("marshal_response: "^s) let unmarshal_response ic = try (Marshal.from_channel ic : response) @@ -912,12 +905,9 @@ end = struct (* {{{ *) Pp.feedback ~state_id:Stateid.initial (Interface.SlaveStatus(id, s)) let rec manage_slave respawn = - let ic, oc, pid, id_slave = respawn () in - let kill_pid = - ref (fun () -> try Unix.kill pid 9 - with Unix.Unix_error _ | Invalid_argument _ -> ()) in - at_exit (fun () -> !kill_pid ()); + let ic, oc, proc, id_slave = respawn () in let last_task = ref None in + let cancelled = ref false in try while true do prerr_endline "waiting for a task"; @@ -925,9 +915,11 @@ end = struct (* {{{ *) let task = TQueue.pop queue in prerr_endline ("got task: "^pr_task task); last_task := Some task; - let cancel_switch = cancel_switch_of_task task in try marshal_request oc (request_of_task task); + let cancel_switch = cancel_switch_of_task task in + Worker.kill_if proc ~sec:1 (fun () -> + cancelled := !cancel_switch; !cancelled); let rec loop () = let response = unmarshal_response ic in match task, response with @@ -949,15 +941,12 @@ end = struct (* {{{ *) (CList.init 10 (fun _ -> Univ.fresh_local_univ ()))); if !cancel_switch then raise KillRespawn else loop () | _, RespGetCounterNewUnivLevel -> - prerr_endline "-> MoreDataUnivLevel"; marshal_more_data oc (MoreDataUnivLevel (CList.init 10 (fun _ -> Termops.new_univ_level ()))); - if !cancel_switch then raise KillRespawn else loop () + loop () | _, RespFeedback {id = State state_id; content = msg} -> Pp.feedback ~state_id msg; - if !cancel_switch then raise KillRespawn else loop () - | _, RespTick -> - if !cancel_switch then raise KillRespawn else loop () + loop () | _, RespFeedback _ -> assert false (* Parsing in master process *) in loop () @@ -965,6 +954,8 @@ end = struct (* {{{ *) | VCS.Expired -> (* task cancelled: e.g. the user did backtrack *) Pp.feedback (Interface.InProgress ~-1); prerr_endline ("Task expired: " ^ pr_task task) + | (Sys_error _ | Invalid_argument _ | End_of_file | KillRespawn) as e -> + raise e (* we pass the exception to the external handler *) | MarshalError s when !fallback_to_lazy_if_marshal_error -> msg_warning(strbrk("Marshalling error: "^s^". "^ "The system state could not be sent to the worker process. "^ @@ -972,10 +963,8 @@ end = struct (* {{{ *) let TaskBuildProof (exn_info, _, stop, assign,_,loc,_) = task in assign(`Comp(build_proof_here exn_info loc stop)); Pp.feedback (Interface.InProgress ~-1) - | (Sys_error _ | Invalid_argument _ | End_of_file | KillRespawn) as e -> - raise e (* we pass the exception to the external handler *) | MarshalError s -> - Printf.eprintf "Fatal marshal error: %s\n" s; + pr_err ("Fatal marshal error: " ^ s); flush_all (); exit 1 | e -> prerr_endline ("Uncaught exception in worker manager: "^ @@ -985,15 +974,15 @@ end = struct (* {{{ *) with | KillRespawn -> Pp.feedback (Interface.InProgress ~-1); - !kill_pid (); (* FIXME: This does not work on Windows *) - kill_pid := (fun () -> ()); - close_in ic; - close_out oc; - ignore(Unix.waitpid [] pid); + Worker.kill proc; + ignore(Worker.wait proc); + manage_slave respawn + | Sys_error _ | Invalid_argument _ | End_of_file when !cancelled -> + Pp.feedback (Interface.InProgress ~-1); + ignore(Worker.wait proc); manage_slave respawn | Sys_error _ | Invalid_argument _ | End_of_file when !fallback_to_lazy_if_slave_dies -> - kill_pid := (fun () -> ()); msg_warning(strbrk "The worker process died badly."); (match !last_task with | Some task -> @@ -1002,18 +991,18 @@ end = struct (* {{{ *) assign(`Comp(build_proof_here exn_info loc stop)); Pp.feedback (Interface.InProgress ~-1); | None -> ()); - close_in ic; - close_out oc; - ignore(Unix.waitpid [] pid); + Worker.kill proc; + ignore(Worker.wait proc); manage_slave respawn | Sys_error _ | Invalid_argument _ | End_of_file -> - let exit_status pid = match Unix.waitpid [] pid with - | _, Unix.WEXITED i -> Printf.sprintf "exit(%d)" i - | _, Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno - | _, Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno in - Printf.eprintf "Fatal worker error: %s\n" (exit_status pid); + Worker.kill proc; + let exit_status proc = match Worker.wait proc with + | Unix.WEXITED 0x400 -> "exit code unavailable" + | Unix.WEXITED i -> Printf.sprintf "exit(%d)" i + | Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno + | Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno in + pr_err ("Fatal worker error: " ^ (exit_status proc)); flush_all (); exit 1 - let init () = SlavesPool.init !Flags.async_proofs_n_workers manage_slave @@ -1021,11 +1010,8 @@ end = struct (* {{{ *) let slave_oc = ref stdout let slave_init_stdout () = - slave_oc := Unix.out_channel_of_descr (Unix.dup Unix.stdout); - slave_ic := Unix.in_channel_of_descr (Unix.dup Unix.stdin); - set_binary_mode_out !slave_oc true; - set_binary_mode_in !slave_ic true; - Unix.dup2 Unix.stderr Unix.stdout + let ic, oc = Spawned.get_channels () in + slave_oc := oc; slave_ic := ic let bufferize f = let l = ref [] in @@ -1038,7 +1024,7 @@ end = struct (* {{{ *) try let v = (Marshal.from_channel !slave_ic : int) in assert(v = 17); - Marshal.to_channel !slave_oc v []; flush !slave_oc; + Marshal.to_channel !slave_oc v []; flush !slave_oc; prerr_endline "Handshake OK" with e -> prerr_endline ("Handshake failed: " ^ Printexc.to_string e); @@ -1058,11 +1044,6 @@ end = struct (* {{{ *) match unmarshal_more_data !slave_ic with | MoreDataLocalUniv l -> l | _ -> assert false)); let working = ref false in - let _tick = Thread.create (fun n -> - while true do - Unix.sleep n; - if !working then marshal_response !slave_oc RespTick - done) 1 in slave_handshake (); while true do try @@ -1076,20 +1057,20 @@ end = struct (* {{{ *) reset () with | MarshalError s -> - Printf.eprintf "Slave: fatal marshal error: %s\n" s; - flush_all (); exit 2 + pr_err ("Fatal marshal error: " ^ s); flush_all (); exit 2 + | End_of_file -> + prerr_endline "connection lost"; flush_all (); exit 2 | e when Errors.noncritical e -> (* This can happen if the proof is broken. The error has also been * signalled as a feedback, hence we can silently recover *) let err_id, safe_id = match Stateid.get e with | Some (safe, err) -> err, safe | None -> Stateid.dummy, Stateid.dummy in - marshal_response !slave_oc (RespError (err_id, safe_id, print e)); - prerr_endline "Slave: failed with the following exception:"; - prerr_endline (string_of_ppcmds (print e)) + prerr_endline "failed with the following exception:"; + prerr_endline (string_of_ppcmds (print e)); + marshal_response !slave_oc (RespError (err_id, safe_id, print e)) | e -> - Printf.eprintf "Slave: critical exception: %s\n" - (Pp.string_of_ppcmds (print e)); + pr_err ("Slave: critical exception: " ^ Pp.string_of_ppcmds (print e)); flush_all (); exit 1 done |