diff options
author | Enrico Tassi <Enrico.Tassi@inria.fr> | 2014-07-23 11:54:36 +0200 |
---|---|---|
committer | Enrico Tassi <Enrico.Tassi@inria.fr> | 2014-08-05 18:24:50 +0200 |
commit | 4e724634839726aa11534f16e4bfb95cd81232a4 (patch) | |
tree | 2114ba0a78c4df764d78ad260e30f5fa6854df95 /stm | |
parent | 95e97b68744eeb8bf20811c3938d78912eb3e918 (diff) |
STM: code restructured to reuse task queue for tactics
Diffstat (limited to 'stm')
-rw-r--r-- | stm/asyncTaskQueue.ml | 296 | ||||
-rw-r--r-- | stm/asyncTaskQueue.mli | 61 | ||||
-rw-r--r-- | stm/stm.ml | 678 | ||||
-rw-r--r-- | stm/stm.mli | 2 | ||||
-rw-r--r-- | stm/stm.mllib | 1 | ||||
-rw-r--r-- | stm/workerPool.ml | 17 | ||||
-rw-r--r-- | stm/workerPool.mli | 5 |
7 files changed, 612 insertions, 448 deletions
diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml new file mode 100644 index 000000000..65219724e --- /dev/null +++ b/stm/asyncTaskQueue.ml @@ -0,0 +1,296 @@ +(************************************************************************) +(* v * The Coq Proof Assistant / The Coq Development Team *) +(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2014 *) +(* \VV/ **************************************************************) +(* // * This file is distributed under the terms of the *) +(* * GNU Lesser General Public License Version 2.1 *) +(************************************************************************) + +open Errors +open Pp +open Util + +let pr_err s = Printf.eprintf "%s] %s\n" (System.process_id ()) s; flush stderr + +let prerr_endline s = if !Flags.debug then begin pr_err s end else () + +module type Task = sig + + type task + + (* Marshallable *) + type request + type response + + val name : string (* UID of the task kind, for -toploop *) + val extra_env : unit -> string array + + (* run by the master, on a thread *) + val request_of_task : task -> request option + val use_response : task -> response -> [ `Die | `Stay | `StayReset ] + val on_marshal_error : string -> task -> unit + val on_slave_death : task option -> [ `Exit of int | `Stay ] + val on_task_cancellation_or_expiration : task option -> unit + val forward_feedback : Stateid.t -> Feedback.feedback_content -> unit + + (* run by the worker *) + val perform : request -> response + + (* debugging *) + val name_of_task : task -> string + val name_of_request : request -> string + +end + +type cancel_switch = bool ref + +module Make(T : Task) = struct + + module Worker = Spawn.Sync(struct + let add_timeout ~sec f = + ignore(Thread.create (fun () -> + while true do + Unix.sleep sec; + if not (f ()) then Thread.exit () + done) + ()) + end) + + module WorkersPool = WorkerPool.Make(Worker) + + let queue : (T.task * cancel_switch) TQueue.t = TQueue.create () + + let enqueue_task t c = + prerr_endline ("Enqueue task "^T.name_of_task t); + TQueue.push queue (t,c) + + let cancel_worker = WorkersPool.cancel + + type request = Request of T.request + + let name_of_request (Request r) = T.name_of_request r + + type response = + | Response of T.response + | RespFeedback of Feedback.feedback + | RespGetCounterNewUnivLevel + + type more_data = + | MoreDataUnivLevel of Univ.universe_level list + + let request_cswitch_of_task (t, c) = T.request_of_task t, c + + let slave_respond msg = match msg with Request r -> Response (T.perform r) + + exception MarshalError of string + + let marshal_to_channel oc data = + Marshal.to_channel oc data []; + flush oc + + let marshal_err s = raise (MarshalError s) + + let marshal_request oc (req : request) = + try marshal_to_channel oc req + with Failure s | Invalid_argument s | Sys_error s -> + marshal_err ("marshal_request: "^s) + + let unmarshal_request ic = + try (Marshal.from_channel ic : request) + with Failure s | Invalid_argument s | Sys_error s -> + marshal_err ("unmarshal_request: "^s) + + let marshal_response oc (res : response) = + try marshal_to_channel oc res + with Failure s | Invalid_argument s | Sys_error s -> + marshal_err ("marshal_response: "^s) + + let unmarshal_response ic = + try (CThread.thread_friendly_input_value ic : response) + with Failure s | Invalid_argument s | Sys_error s -> + marshal_err ("unmarshal_response: "^s) + + let marshal_more_data oc (res : more_data) = + try marshal_to_channel oc res + with Failure s | Invalid_argument s | Sys_error s -> + marshal_err ("marshal_more_data: "^s) + + let unmarshal_more_data ic = + try (Marshal.from_channel ic : more_data) + with Failure s | Invalid_argument s | Sys_error s -> + marshal_err ("unmarshal_more_data: "^s) + + let reorder_tasks cmp = TQueue.reorder queue (fun (t1,_) (t2,_) -> cmp t1 t2) + + let join () = + if not (WorkersPool.is_empty ()) then + TQueue.wait_until_n_are_waiting_and_queue_empty + (WorkersPool.n_workers ()) queue + + exception KillRespawn + exception Die + exception Expired + + let report_status ?(id = !Flags.async_proofs_worker_id) s = + Pp.feedback ~state_id:Stateid.initial (Feedback.SlaveStatus(id, s)) + + let rec manage_slave ~cancel:cancel_user_req id respawn = + let ic, oc, proc = + let rec set_slave_opt = function + | [] -> !Flags.async_proofs_flags_for_workers @ + ["-toploop"; T.name^"top"; + "-worker-id"; id] + | ("-ideslave"|"-emacs"|"-emacs-U")::tl -> set_slave_opt tl + | ("-async-proofs" |"-toploop" |"-vi2vo" |"-compile" + |"-compile-verbose")::_::tl -> set_slave_opt tl + | x::tl -> x :: set_slave_opt tl in + let args = + Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in + let env = Array.append (T.extra_env ()) (Unix.environment ()) in + respawn ~args ~env () in + let last_task = ref None in + let task_expired = ref false in + let task_cancelled = ref false in + CThread.prepare_in_channel_for_thread_friendly_io ic; + try + while true do + prerr_endline "waiting for a task"; + report_status ~id "Idle"; + let task, cancel_switch = TQueue.pop queue in + prerr_endline ("got task: "^T.name_of_task task); + last_task := Some task; + try + let req = T.request_of_task task in + if req = None then raise Expired; + marshal_request oc (Request (Option.get req)); + Worker.kill_if proc ~sec:1 (fun () -> + task_expired := !cancel_switch; + task_cancelled := !cancel_user_req; + if !cancel_user_req then cancel_user_req := false; + !task_expired || !task_cancelled); + let rec loop () = + let response = unmarshal_response ic in + match response with + | Response resp -> + (match T.use_response task resp with + | `Die -> raise Die + | `Stay -> last_task := None; () + | `StayReset -> last_task := None; raise KillRespawn) + | RespGetCounterNewUnivLevel -> + marshal_more_data oc (MoreDataUnivLevel + (CList.init 10 (fun _ -> + Universes.new_univ_level (Global.current_dirpath ())))); + loop () + | RespFeedback ({ Feedback.id = Feedback.State state_id } as fbk) -> + T.forward_feedback state_id fbk.Feedback.content; + loop () + | RespFeedback _ -> Errors.anomaly (str"Parsing in master process") + in + loop () + with + | Expired -> prerr_endline ("Task expired: " ^ T.name_of_task task) + | (Sys_error _ | Invalid_argument _ | End_of_file | KillRespawn) as e -> + raise e (* we pass the exception to the external handler *) + | MarshalError s -> T.on_marshal_error s task + | e -> + pr_err ("Uncaught exception in worker manager: "^ + string_of_ppcmds (print e)); + flush_all () + done + with + | KillRespawn -> + Worker.kill proc; ignore(Worker.wait proc); + manage_slave ~cancel:cancel_user_req id respawn + | Die -> Worker.kill proc; ignore(Worker.wait proc) + | Sys_error _ | Invalid_argument _ | End_of_file when !task_expired -> + T.on_task_cancellation_or_expiration !last_task; + ignore(Worker.wait proc); + manage_slave ~cancel:cancel_user_req id respawn + | Sys_error _ | Invalid_argument _ | End_of_file when !task_cancelled -> + msg_warning(strbrk "The worker was cancelled."); + T.on_task_cancellation_or_expiration !last_task; + Worker.kill proc; ignore(Worker.wait proc); + manage_slave ~cancel:cancel_user_req id respawn + | Sys_error _ | Invalid_argument _ | End_of_file -> + match T.on_slave_death !last_task with + | `Stay -> + msg_warning(strbrk "The worker process died badly."); + Worker.kill proc; ignore(Worker.wait proc); + manage_slave ~cancel:cancel_user_req id respawn + | `Exit exit_code -> + Worker.kill proc; + let exit_status proc = match Worker.wait proc with + | Unix.WEXITED 0x400 -> "exit code unavailable" + | Unix.WEXITED i -> Printf.sprintf "exit(%d)" i + | Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno + | Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno in + pr_err ("Fatal worker error: " ^ (exit_status proc)); + flush_all (); exit exit_code + + let slave_ic = ref stdin + let slave_oc = ref stdout + + let slave_init_stdout () = + let ic, oc = Spawned.get_channels () in + slave_oc := oc; slave_ic := ic + + let bufferize f = + let l = ref [] in + fun () -> + match !l with + | [] -> let data = f () in l := List.tl data; List.hd data + | x::tl -> l := tl; x + + let slave_handshake () = WorkersPool.worker_handshake !slave_ic !slave_oc + + let slave_main_loop reset = + let feedback_queue = ref [] in + let slave_feeder oc fb = + match fb.Feedback.content with + | Feedback.Goals _ -> feedback_queue := fb :: !feedback_queue + | _ -> Marshal.to_channel oc (RespFeedback fb) []; flush oc in + let flush_feeder oc = + List.iter (fun fb -> Marshal.to_channel oc (RespFeedback fb) []) + !feedback_queue; + feedback_queue := []; + flush oc in + Pp.set_feeder (slave_feeder !slave_oc); + Pp.log_via_feedback (); + Universes.set_remote_new_univ_level (bufferize (fun () -> + marshal_response !slave_oc RespGetCounterNewUnivLevel; + match unmarshal_more_data !slave_ic with + | MoreDataUnivLevel l -> l)); + let working = ref false in + slave_handshake (); + while true do + try + working := false; + let request = unmarshal_request !slave_ic in + working := true; + report_status (name_of_request request); + let response = slave_respond request in + flush_feeder !slave_oc; + report_status "Idle"; + marshal_response !slave_oc response; + reset () + with + | MarshalError s -> + pr_err ("Fatal marshal error: " ^ s); flush_all (); exit 2 + | End_of_file -> + prerr_endline "connection lost"; flush_all (); exit 2 + | e -> + pr_err ("Slave: critical exception: " ^ Pp.string_of_ppcmds (print e)); + flush_all (); exit 1 + done + + let dump () = + assert(WorkersPool.is_empty ()); (* ATM, we allow that only if no slaves *) + List.map fst (TQueue.dump queue) + + let init n = + WorkersPool.init n manage_slave + (fun n -> Printf.sprintf "%s:%d" T.name n) + + let n_workers = WorkersPool.n_workers + +end diff --git a/stm/asyncTaskQueue.mli b/stm/asyncTaskQueue.mli new file mode 100644 index 000000000..e01479d30 --- /dev/null +++ b/stm/asyncTaskQueue.mli @@ -0,0 +1,61 @@ +(************************************************************************) +(* v * The Coq Proof Assistant / The Coq Development Team *) +(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2014 *) +(* \VV/ **************************************************************) +(* // * This file is distributed under the terms of the *) +(* * GNU Lesser General Public License Version 2.1 *) +(************************************************************************) + +module type Task = sig + + type task + + (* Marshallable *) + type request + type response + + val name : string (* UID of the task kind, for -toploop *) + val extra_env : unit -> string array + + (* run by the master, on a thread *) + val request_of_task : task -> request option + val use_response : task -> response -> [ `Die | `Stay | `StayReset ] + val on_marshal_error : string -> task -> unit + val on_slave_death : task option -> [ `Exit of int | `Stay ] + val on_task_cancellation_or_expiration : task option -> unit + val forward_feedback : Stateid.t -> Feedback.feedback_content -> unit + + (* run by the worker *) + val perform : request -> response + + (* debugging *) + val name_of_task : task -> string + val name_of_request : request -> string + +end + +type cancel_switch = bool ref + +module Make(T : Task) : sig + + (* Number of workers, 0 = lazy local *) + val init : int -> unit + + val n_workers : unit -> int + + val enqueue_task : T.task -> cancel_switch -> unit + + (* blocking function that waits for the task queue to be empty *) + val join : unit -> unit + + (* slave process main loop *) + val slave_main_loop : (unit -> unit) -> unit + val slave_init_stdout : unit -> unit + + val cancel_worker : string -> unit + + val reorder_tasks : (T.task -> T.task -> int) -> unit + + val dump : unit -> T.task list + +end diff --git a/stm/stm.ml b/stm/stm.ml index a342bb6f1..0827e0bfa 100644 --- a/stm/stm.ml +++ b/stm/stm.ml @@ -6,13 +6,7 @@ (* * GNU Lesser General Public License Version 2.1 *) (************************************************************************) -let process_id () = - match !Flags.async_proofs_mode with - | Flags.APoff | Flags.APonLazy | Flags.APonParallel 0 -> - "master" ^ string_of_int (Thread.id (Thread.self ())) - | Flags.APonParallel n -> "worker" ^ string_of_int n - -let pr_err s = Printf.eprintf "%s] %s\n" (process_id ()) s; flush stderr +let pr_err s = Printf.eprintf "%s] %s\n" (System.process_id ()) s; flush stderr let prerr_endline s = if !Flags.debug then begin pr_err s end else () @@ -252,7 +246,8 @@ end = struct open Printf let print_dag vcs () = - let fname = "stm_" ^ process_id () in + let fname = + "stm_" ^ Str.global_replace (Str.regexp " ") "_" (System.process_id ()) in let string_of_transaction = function | Cmd (t, _) | Fork (t, _,_,_) -> (try string_of_ppcmds (pr_ast t) with _ -> "ERR") @@ -650,18 +645,6 @@ let get_hint_bp_time proof_name = try float_of_string (Aux_file.get !hints Loc.ghost proof_name) with Not_found -> 1.0 -module Worker = Spawn.Sync(struct - let add_timeout ~sec f = - ignore(Thread.create (fun () -> - while true do - Unix.sleep sec; - if not (f ()) then Thread.exit () - done) - ()) -end) - -module WorkersPool = WorkerPool.Make(Worker) - let record_pb_time proof_name loc time = let proof_build_time = Printf.sprintf "%.3f" time in Aux_file.record_in_aux_at loc "proof_build_time" proof_build_time; @@ -669,6 +652,170 @@ let record_pb_time proof_name loc time = Aux_file.record_in_aux_at Loc.ghost proof_name proof_build_time; hints := Aux_file.set !hints Loc.ghost proof_name proof_build_time end + +type 'a a_request = { + r_exn_info : Stateid.t * Stateid.t; + r_stop : Stateid.t; + r_document : VCS.vcs; + r_loc : Loc.t; + r_uuid : 'a; + r_name : string } + +exception RemoteException of std_ppcmds +let _ = Errors.register_handler (function + | RemoteException ppcmd -> ppcmd + | _ -> raise Unhandled) + +module Task = struct + + let reach_known_state = ref (fun ?redefine_qed ~cache id -> ()) + let set_reach_known_state f = reach_known_state := f + + let forward_feedback = forward_feedback + + type task = { + t_exn_info : Stateid.t * Stateid.t; + t_start : Stateid.t; + t_stop : Stateid.t; + t_assign : Proof_global.closed_proof_output Future.assignement -> unit; + t_loc : Loc.t; + t_uuid : Future.UUID.t; + t_name : string } + + type request = Future.UUID.t a_request + + type error = { + e_error_at : Stateid.t; + e_safe_id : Stateid.t; + e_msg : std_ppcmds; + e_safe_states : (Stateid.t * State.frozen_state) list } + + type response = + | RespBuiltProof of Proof_global.closed_proof_output * float + | RespError of error + + let name = "stmworker" + let extra_env () = !async_proofs_workers_extra_env + + let name_of_task t = t.t_name + let name_of_request r = r.r_name + + let request_of_task { t_exn_info; t_start; t_stop; t_loc; t_uuid; t_name } = + try Some { + r_exn_info = t_exn_info; + r_stop = t_stop; + r_document = VCS.slice ~start:t_stop ~stop:t_start; + r_loc = t_loc; + r_uuid = t_uuid; + r_name = t_name } + with VCS.Expired -> None + + let use_response { t_assign; t_loc; t_name } = function + | RespBuiltProof (pl, time) -> + Pp.feedback (Feedback.InProgress ~-1); + t_assign (`Val pl); + record_pb_time t_name t_loc time; + (* We restart the slave, to avoid memory leaks. We could just + Pp.feedback (Feedback.InProgress ~-1) *) + `StayReset + | RespError { e_error_at; e_safe_id = valid; e_msg; e_safe_states } -> + Pp.feedback (Feedback.InProgress ~-1); + let e = Stateid.add ~valid (RemoteException e_msg) e_error_at in + t_assign (`Exn e); + List.iter (fun (id,s) -> State.assign id s) e_safe_states; + (* We restart the slave, to avoid memory leaks. We could just + Pp.feedback (Feedback.InProgress ~-1) *) + `StayReset + + let on_task_cancellation_or_expiration = function + | None -> () + | Some { t_start = start; t_assign } -> + let s = "Worker cancelled by the user" in + let e = Stateid.add ~valid:start (RemoteException (strbrk s)) start in + t_assign (`Exn e); + Pp.feedback ~state_id:start (Feedback.ErrorMsg (Loc.ghost, s)); + Pp.feedback (Feedback.InProgress ~-1) + + let build_proof_here_core loc eop () = + let wall_clock1 = Unix.gettimeofday () in + if !Flags.batch_mode then !reach_known_state ~cache:`No eop + else !reach_known_state ~cache:`Shallow eop; + let wall_clock2 = Unix.gettimeofday () in + Aux_file.record_in_aux_at loc "proof_build_time" + (Printf.sprintf "%.3f" (wall_clock2 -. wall_clock1)); + Proof_global.return_proof () + let build_proof_here (id,valid) loc eop = + Future.create (State.exn_on id ~valid) (build_proof_here_core loc eop) + let perform { r_exn_info; r_stop = eop; r_document = vcs; r_loc } = + try + VCS.restore vcs; + VCS.print (); + let rc, time = + let wall_clock = Unix.gettimeofday () in + let l = Future.force (build_proof_here r_exn_info r_loc eop) in + List.iter (fun (_,se) -> Declareops.iter_side_effects (function + | Declarations.SEsubproof(_, + { Declarations.const_body = Declarations.OpaqueDef f; + const_universes = univs } ) -> + Opaqueproof.join_opaque f + | _ -> ()) + se) (fst l); + l, Unix.gettimeofday () -. wall_clock in + VCS.print (); + RespBuiltProof(rc,time) + with + |e when Errors.noncritical e -> + (* This can happen if the proof is broken. The error has also been + * signalled as a feedback, hence we can silently recover *) + let e_error_at, e_safe_id = match Stateid.get e with + | Some (safe, err) -> err, safe + | None -> Stateid.dummy, Stateid.dummy in + prerr_endline "failed with the following exception:"; + prerr_endline (string_of_ppcmds (print e)); + prerr_endline ("last safe id is: " ^ Stateid.to_string e_safe_id); + prerr_endline ("cached? " ^ string_of_bool (State.is_cached e_safe_id)); + let prog old_v n = + if n < 3 then n else old_v + n/3 + if n mod 3 > 0 then 1 else 0 in + let e_safe_states = + let open State in + let rec aux n m prev_id = + let next = + try Some (VCS.visit prev_id).next + with VCS.Expired -> None in + match next with + | None -> [] + | Some id when n = m -> + prerr_endline ("sending back state " ^ string_of_int m); + let tail = aux (n+1) (prog m (n+1)) id in + if is_cached id then (id, get_cached id) :: tail else tail + | Some id -> aux (n+1) m id in + (if is_cached e_safe_id then [e_safe_id,get_cached e_safe_id] else []) + @ aux 1 (prog 1 1) e_safe_id in + RespError { e_error_at; e_safe_id; e_msg = print e; e_safe_states } + + let on_slave_death task = + if not !fallback_to_lazy_if_slave_dies then `Exit 1 + else match task with + | None -> `Stay + | Some { t_exn_info; t_loc; t_stop; t_assign } -> + msg_warning(strbrk "Falling back to local, lazy, evaluation."); + t_assign (`Comp(build_proof_here t_exn_info t_loc t_stop)); + Pp.feedback (Feedback.InProgress ~-1); + `Stay + + let on_marshal_error s { t_exn_info; t_stop; t_assign; t_loc } = + if !fallback_to_lazy_if_marshal_error then begin + msg_warning(strbrk("Marshalling error: "^s^". "^ + "The system state could not be sent to the worker process. "^ + "Falling back to local, lazy, evaluation.")); + t_assign(`Comp(build_proof_here t_exn_info t_loc t_stop)); + Pp.feedback (Feedback.InProgress ~-1) + end else begin + pr_err ("Fatal marshal error: " ^ s); + flush_all (); exit 1 + end + +end (* Slave processes (if initialized, otherwise local lazy evaluation) *) module Slaves : sig @@ -688,10 +835,6 @@ module Slaves : sig val slave_main_loop : (unit -> unit) -> unit val slave_init_stdout : unit -> unit - (* to disentangle modules *) - val set_reach_known_state : - (?redefine_qed:bool -> cache:Summary.marshallable -> Stateid.t -> unit) -> unit - type tasks val dump : (Future.UUID.t * int) list -> tasks val check_task : string -> tasks -> int -> bool @@ -701,129 +844,58 @@ module Slaves : sig Library.seg_univ -> Library.seg_discharge -> Library.seg_proofs -> tasks -> int -> Library.seg_univ - val cancel_worker : int -> unit + val cancel_worker : string -> unit val set_perspective : Stateid.t list -> unit end = struct - - let cancel_worker n = WorkersPool.cancel (n-1) - - let reach_known_state = ref (fun ?redefine_qed ~cache id -> ()) - let set_reach_known_state f = reach_known_state := f - - type 'a request = - ReqBuildProof of - (Stateid.t * Stateid.t) * Stateid.t * VCS.vcs * Loc.t * 'a * string - - let name_of_request (ReqBuildProof (_,_,_,_,_,s)) = s - - type response = - | RespBuiltProof of Proof_global.closed_proof_output * float - | RespError of (* err, safe, msg, safe_states *) - Stateid.t * Stateid.t * std_ppcmds * - (Stateid.t * State.frozen_state) list - | RespFeedback of Feedback.feedback - | RespGetCounterNewUnivLevel - - type more_data = - | MoreDataUnivLevel of Univ.universe_level list - - type task = - | TaskBuildProof of (Stateid.t * Stateid.t) * Stateid.t * Stateid.t * - (Proof_global.closed_proof_output Future.assignement -> unit) * cancel_switch - * Loc.t * Future.UUID.t * string - - let pr_task = function - | TaskBuildProof(_,bop,eop,_,_,_,_,s) -> - "TaskBuilProof("^Stateid.to_string bop^","^Stateid.to_string eop^ - ","^s^")" - - let request_of_task task : Future.UUID.t request = - match task with - | TaskBuildProof (exn_info,bop,eop,_,_,loc,uuid,s) -> - ReqBuildProof(exn_info,eop,VCS.slice ~start:eop ~stop:bop,loc,uuid,s) - - let cancel_switch_of_task = function - | TaskBuildProof (_,_,_,_,c,_,_,_) -> c - - let build_proof_here_core loc eop () = - let wall_clock1 = Unix.gettimeofday () in - if !Flags.batch_mode then !reach_known_state ~cache:`No eop - else !reach_known_state ~cache:`Shallow eop; - let wall_clock2 = Unix.gettimeofday () in - Aux_file.record_in_aux_at loc "proof_build_time" - (Printf.sprintf "%.3f" (wall_clock2 -. wall_clock1)); - Proof_global.return_proof () - let build_proof_here (id,valid) loc eop = - Future.create (State.exn_on id ~valid) (build_proof_here_core loc eop) - - let slave_respond msg = - match msg with - | ReqBuildProof(exn_info,eop,vcs,loc,_,_) -> - VCS.restore vcs; - VCS.print (); - let rc, time = - let wall_clock = Unix.gettimeofday () in - let l = Future.force (build_proof_here exn_info loc eop) in - List.iter (fun (_,se) -> Declareops.iter_side_effects (function - | Declarations.SEsubproof(_, - { Declarations.const_body = Declarations.OpaqueDef f; - const_universes = univs } ) -> - Opaqueproof.join_opaque f - | _ -> ()) - se) (fst l); - l, Unix.gettimeofday () -. wall_clock in - VCS.print (); - RespBuiltProof(rc,time) + module TaskQueue = AsyncTaskQueue.Make(Task) let check_task_aux extra name l i = - match List.nth l i with - | ReqBuildProof ((id,valid),eop,vcs,loc,_,s) -> - Pp.msg_info( - str(Printf.sprintf "Checking task %d (%s%s) of %s" i s extra name)); - VCS.restore vcs; - try - !reach_known_state ~cache:`No eop; - (* The original terminator, a hook, has not been saved in the .vi*) - Proof_global.set_terminator - (Lemmas.standard_proof_terminator [] - (Lemmas.mk_hook (fun _ _ -> ()))); - let proof = Proof_global.close_proof (fun x -> x) in - vernac_interp eop ~proof - { verbose = false; loc; - expr = (VernacEndProof (Proved (true,None))) }; - Some proof - with e -> - let e = Errors.push e in - (try match Stateid.get e with - | None -> + let { r_stop; r_document; r_loc; r_name } = List.nth l i in + Pp.msg_info( + str(Printf.sprintf "Checking task %d (%s%s) of %s" i r_name extra name)); + VCS.restore r_document; + try + !Task.reach_known_state ~cache:`No r_stop; + (* The original terminator, a hook, has not been saved in the .vi*) + Proof_global.set_terminator + (Lemmas.standard_proof_terminator [] + (Lemmas.mk_hook (fun _ _ -> ()))); + let proof = Proof_global.close_proof (fun x -> x) in + vernac_interp r_stop ~proof + { verbose = false; loc = r_loc; + expr = (VernacEndProof (Proved (true,None))) }; + Some proof + with e -> + let e = Errors.push e in + (try match Stateid.get e with + | None -> + Pp.pperrnl Pp.( + str"File " ++ str name ++ str ": proof of " ++ str r_name ++ + spc () ++ print e) + | Some (_, cur) -> + match VCS.visit cur with + | { step = `Cmd ( { loc }, _) } + | { step = `Fork ( { loc }, _, _, _) } + | { step = `Qed ( { qast = { loc } }, _) } + | { step = `Sideff (`Ast ( { loc }, _)) } -> + let start, stop = Loc.unloc loc in + Pp.pperrnl Pp.( + str"File " ++ str name ++ str ": proof of " ++ str r_name ++ + str ": chars " ++ int start ++ str "-" ++ int stop ++ + spc () ++ print e) + | _ -> Pp.pperrnl Pp.( - str"File " ++ str name ++ str ": proof of " ++ str s ++ + str"File " ++ str name ++ str ": proof of " ++ str r_name ++ spc () ++ print e) - | Some (_, cur) -> - match VCS.visit cur with - | { step = `Cmd ( { loc }, _) } - | { step = `Fork ( { loc }, _, _, _) } - | { step = `Qed ( { qast = { loc } }, _) } - | { step = `Sideff (`Ast ( { loc }, _)) } -> - let start, stop = Loc.unloc loc in - Pp.pperrnl Pp.( - str"File " ++ str name ++ str ": proof of " ++ str s ++ - str ": chars " ++ int start ++ str "-" ++ int stop ++ - spc () ++ print e) - | _ -> - Pp.pperrnl Pp.( - str"File " ++ str name ++ str ": proof of " ++ str s ++ - spc () ++ print e) - with e -> - Pp.msg_error (str"unable to print error message: " ++ - str (Printexc.to_string e))); None + with e -> + Pp.msg_error (str"unable to print error message: " ++ + str (Printexc.to_string e))); None let finish_task name (u,cst,_) d p l i = - let bucket = - match List.nth l i with ReqBuildProof (_,_,_,_,bucket,_) -> bucket in + let bucket = (List.nth l i).r_uuid in match check_task_aux (Printf.sprintf ", bucket %d" bucket) name l i with | None -> exit 1 | Some (po,pt) -> @@ -854,54 +926,14 @@ end = struct | None -> false let info_tasks l = - CList.map_i (fun i (ReqBuildProof(_,_,_,loc,_,s)) -> + CList.map_i (fun i { r_loc; r_name } -> let time1 = - try float_of_string (Aux_file.get !hints loc "proof_build_time") + try float_of_string (Aux_file.get !hints r_loc "proof_build_time") with Not_found -> 0.0 in let time2 = - try float_of_string (Aux_file.get !hints loc "proof_check_time") + try float_of_string (Aux_file.get !hints r_loc "proof_check_time") with Not_found -> 0.0 in - s,max (time1 +. time2) 0.0001,i) 0 l - - exception MarshalError of string - - let marshal_to_channel oc data = - Marshal.to_channel oc data []; - flush oc - - let marshal_err s = raise (MarshalError s) - - let marshal_request oc (req : Future.UUID.t request) = - try marshal_to_channel oc req - with Failure s | Invalid_argument s | Sys_error s -> - marshal_err ("marshal_request: "^s) - - let unmarshal_request ic = - try (Marshal.from_channel ic : Future.UUID.t request) - with Failure s | Invalid_argument s | Sys_error s -> - marshal_err ("unmarshal_request: "^s) - - let marshal_response oc (res : response) = - try marshal_to_channel oc res - with Failure s | Invalid_argument s | Sys_error s -> - marshal_err ("marshal_response: "^s) - - let unmarshal_response ic = - try (CThread.thread_friendly_input_value ic : response) - with Failure s | Invalid_argument s | Sys_error s -> - marshal_err ("unmarshal_response: "^s) - - let marshal_more_data oc (res : more_data) = - try marshal_to_channel oc res - with Failure s | Invalid_argument s | Sys_error s -> - marshal_err ("marshal_more_data: "^s) - - let unmarshal_more_data ic = - try (Marshal.from_channel ic : more_data) - with Failure s | Invalid_argument s | Sys_error s -> - marshal_err ("unmarshal_more_data: "^s) - - let queue : task TQueue.t = TQueue.create () + r_name, max (time1 +. time2) 0.0001,i) 0 l let set_perspective idl = let open Stateid in @@ -914,275 +946,44 @@ end = struct | true, false -> -1 | false, true -> 1) - let wait_all_done () = - if not (WorkersPool.is_empty ()) then - TQueue.wait_until_n_are_waiting_and_queue_empty - (WorkersPool.n_workers ()) queue - - let build_proof ~loc ~exn_info:(id,valid as exn_info) ~start ~stop ~name = + let build_proof ~loc ~exn_info:(id,valid as t_exn_info) ~start ~stop ~name = let cancel_switch = ref false in - if WorkersPool.is_empty () then + if TaskQueue.n_workers () = 0 then if !Flags.compilation_mode = Flags.BuildVi then begin let force () : Proof_global.closed_proof_output Future.assignement = - try `Val (build_proof_here_core loc stop ()) + try `Val (Task.build_proof_here_core loc stop ()) with e -> let e = Errors.push e in `Exn e in let f,assign = Future.create_delegate ~force (State.exn_on id ~valid) in - let uuid = Future.uuid f in - TQueue.push queue (TaskBuildProof - (exn_info,start,stop,assign,cancel_switch,loc,uuid,name)); + let t_uuid = Future.uuid f in + TaskQueue.enqueue_task { + Task.t_exn_info; t_start = start; t_stop = stop; t_assign = assign; + t_loc = loc; t_uuid; t_name = name } cancel_switch; f, cancel_switch end else - build_proof_here exn_info loc stop, cancel_switch + Task.build_proof_here t_exn_info loc stop, cancel_switch else - let f, assign = Future.create_delegate (State.exn_on id ~valid) in - let uuid = Future.uuid f in + let f, t_assign = Future.create_delegate (State.exn_on id ~valid) in + let t_uuid = Future.uuid f in Pp.feedback (Feedback.InProgress 1); - TQueue.push queue (TaskBuildProof - (exn_info,start,stop,assign,cancel_switch,loc,uuid,name)); + TaskQueue.enqueue_task { + Task.t_exn_info; t_start = start; t_stop = stop; t_assign; t_loc = loc; + t_uuid; t_name = name } cancel_switch; f, cancel_switch - exception RemoteException of std_ppcmds - let _ = Errors.register_handler (function - | RemoteException ppcmd -> ppcmd - | _ -> raise Unhandled) + let init () = TaskQueue.init !Flags.async_proofs_n_workers + let slave_main_loop = TaskQueue.slave_main_loop + let slave_init_stdout = TaskQueue.slave_init_stdout + let wait_all_done = TaskQueue.join - exception KillRespawn - - let report_status ?id s = - let id = - match id with - | Some n -> n - | None -> - match !Flags.async_proofs_mode with - | Flags.APonParallel n -> n - | _ -> assert false in - Pp.feedback ~state_id:Stateid.initial (Feedback.SlaveStatus(id, s)) - - let rec manage_slave ~cancel:cancel_user_req id_slave respawn = - let ic, oc, proc = - let rec set_slave_opt = function - | [] -> !Flags.async_proofs_flags_for_workers @ - ["-async-proofs"; "worker"; string_of_int id_slave] - | ("-ideslave"|"-emacs"|"-emacs-U")::tl -> set_slave_opt tl - | ("-async-proofs" |"-toploop" |"-vi2vo" |"-compile" - |"-compile-verbose")::_::tl -> set_slave_opt tl - | x::tl -> x :: set_slave_opt tl in - let args = - Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in - let env = - Array.append !async_proofs_workers_extra_env (Unix.environment ()) in - respawn ~args ~env () in - let last_task = ref None in - let task_expired = ref false in - let task_cancelled = ref false in - CThread.prepare_in_channel_for_thread_friendly_io ic; - try - while true do - prerr_endline "waiting for a task"; - report_status ~id:id_slave "Idle"; - let task = TQueue.pop queue in - prerr_endline ("got task: "^pr_task task); - last_task := Some task; - try - marshal_request oc (request_of_task task); - let cancel_switch = cancel_switch_of_task task in - Worker.kill_if proc ~sec:1 (fun () -> - task_expired := !cancel_switch; - task_cancelled := !cancel_user_req; - if !cancel_user_req then cancel_user_req := false; - !task_expired || !task_cancelled); - let rec loop () = - let response = unmarshal_response ic in - match task, response with - | TaskBuildProof(_,_,_, assign,_,loc,_,s), - RespBuiltProof(pl, time)-> - assign (`Val pl); - (* We restart the slave, to avoid memory leaks. We could just - Pp.feedback (Feedback.InProgress ~-1) *) - record_pb_time s loc time; - last_task := None; - raise KillRespawn - | TaskBuildProof(_,_,_, assign,_,_,_,_), - RespError(err_id,valid,e,valid_states) -> - let e = Stateid.add ~valid (RemoteException e) err_id in - assign (`Exn e); - List.iter (fun (id,s) -> State.assign id s) valid_states; - (* We restart the slave, to avoid memory leaks. We could just - Pp.feedback (Feedback.InProgress ~-1) *) - last_task := None; - raise KillRespawn - (* marshal_more_data oc (MoreDataLocalUniv *) - (* (CList.init 10 (fun _ -> Universes.fresh_local_univ ()))); *) - (* loop () *) - | _, RespGetCounterNewUnivLevel -> - marshal_more_data oc (MoreDataUnivLevel - (CList.init 10 (fun _ -> Universes.new_univ_level (Global.current_dirpath ())))); - loop () - | _, RespFeedback {Feedback.id = Feedback.State state_id; content = msg} -> - forward_feedback state_id msg; - loop () - | _, RespFeedback _ -> assert false (* Parsing in master process *) - in - loop () - with - | VCS.Expired -> (* task cancelled: e.g. the user did backtrack *) - Pp.feedback (Feedback.InProgress ~-1); - prerr_endline ("Task expired: " ^ pr_task task) - | (Sys_error _ | Invalid_argument _ | End_of_file | KillRespawn) as e -> - raise e (* we pass the exception to the external handler *) - | MarshalError s when !fallback_to_lazy_if_marshal_error -> - msg_warning(strbrk("Marshalling error: "^s^". "^ - "The system state could not be sent to the worker process. "^ - "Falling back to local, lazy, evaluation.")); - let TaskBuildProof (exn_info, _, stop, assign,_,loc,_,_) = task in - assign(`Comp(build_proof_here exn_info loc stop)); - Pp.feedback (Feedback.InProgress ~-1) - | MarshalError s -> - pr_err ("Fatal marshal error: " ^ s); - flush_all (); exit 1 - | e -> - pr_err ("Uncaught exception in worker manager: "^ - string_of_ppcmds (print e)); - flush_all () - done - with - | KillRespawn -> - Pp.feedback (Feedback.InProgress ~-1); - Worker.kill proc; ignore(Worker.wait proc); - manage_slave ~cancel:cancel_user_req id_slave respawn - | Sys_error _ | Invalid_argument _ | End_of_file when !task_expired -> - Pp.feedback (Feedback.InProgress ~-1); - ignore(Worker.wait proc); - manage_slave ~cancel:cancel_user_req id_slave respawn - | Sys_error _ | Invalid_argument _ | End_of_file when !task_cancelled -> - msg_warning(strbrk "The worker was cancelled."); - Option.iter (fun task -> - let TaskBuildProof (_, start, _, assign, _,_,_,_) = task in - let s = "Worker cancelled by the user" in - let e = Stateid.add ~valid:start (RemoteException (strbrk s)) start in - assign (`Exn e); - Pp.feedback ~state_id:start (Feedback.ErrorMsg (Loc.ghost, s)); - Pp.feedback (Feedback.InProgress ~-1); - ) !last_task; - Worker.kill proc; ignore(Worker.wait proc); - manage_slave ~cancel:cancel_user_req id_slave respawn - | Sys_error _ | Invalid_argument _ | End_of_file - when !fallback_to_lazy_if_slave_dies -> - msg_warning(strbrk "The worker process died badly."); - Option.iter (fun task -> - msg_warning(strbrk "Falling back to local, lazy, evaluation."); - let TaskBuildProof (exn_info, _, stop, assign,_,loc,_,_) = task in - assign(`Comp(build_proof_here exn_info loc stop)); - Pp.feedback (Feedback.InProgress ~-1); - ) !last_task; - Worker.kill proc; ignore(Worker.wait proc); - manage_slave ~cancel:cancel_user_req id_slave respawn - | Sys_error _ | Invalid_argument _ | End_of_file -> - Worker.kill proc; - let exit_status proc = match Worker.wait proc with - | Unix.WEXITED 0x400 -> "exit code unavailable" - | Unix.WEXITED i -> Printf.sprintf "exit(%d)" i - | Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno - | Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno in - pr_err ("Fatal worker error: " ^ (exit_status proc)); - flush_all (); exit 1 - - let init () = WorkersPool.init !Flags.async_proofs_n_workers manage_slave - - let slave_ic = ref stdin - let slave_oc = ref stdout - - let slave_init_stdout () = - let ic, oc = Spawned.get_channels () in - slave_oc := oc; slave_ic := ic - - let bufferize f = - let l = ref [] in - fun () -> - match !l with - | [] -> let data = f () in l := List.tl data; List.hd data - | x::tl -> l := tl; x - - let slave_handshake () = WorkersPool.worker_handshake !slave_ic !slave_oc - - let slave_main_loop reset = - let feedback_queue = ref [] in - let slave_feeder oc fb = - match fb.Feedback.content with - | Feedback.Goals _ -> feedback_queue := fb :: !feedback_queue - | _ -> Marshal.to_channel oc (RespFeedback fb) []; flush oc in - let flush_feeder oc = - List.iter (fun fb -> Marshal.to_channel oc (RespFeedback fb) []) - !feedback_queue; - feedback_queue := []; - flush oc in - Pp.set_feeder (slave_feeder !slave_oc); - Pp.log_via_feedback (); - Universes.set_remote_new_univ_level (bufferize (fun () -> - marshal_response !slave_oc RespGetCounterNewUnivLevel; - match unmarshal_more_data !slave_ic with - | MoreDataUnivLevel l -> l)); - let working = ref false in - slave_handshake (); - while true do - try - working := false; - let request = unmarshal_request !slave_ic in - working := true; - report_status (name_of_request request); - let response = slave_respond request in - flush_feeder !slave_oc; - report_status "Idle"; - marshal_response !slave_oc response; - reset () - with - | MarshalError s -> - pr_err ("Fatal marshal error: " ^ s); flush_all (); exit 2 - | End_of_file -> - prerr_endline "connection lost"; flush_all (); exit 2 - | e when Errors.noncritical e -> - (* This can happen if the proof is broken. The error has also been - * signalled as a feedback, hence we can silently recover *) - let err_id, safe_id = match Stateid.get e with - | Some (safe, err) -> err, safe - | None -> Stateid.dummy, Stateid.dummy in - prerr_endline "failed with the following exception:"; - prerr_endline (string_of_ppcmds (print e)); - prerr_endline ("last safe id is: " ^ Stateid.to_string safe_id); - prerr_endline ("cached? " ^ string_of_bool (State.is_cached safe_id)); - let prog old_v n = - if n < 3 then n else old_v + n/3 + if n mod 3 > 0 then 1 else 0 in - let states = - let open State in - let rec aux n m prev_id = - let next = - try Some (VCS.visit prev_id).next - with VCS.Expired -> None in - match next with - | None -> [] - | Some id when n = m -> - prerr_endline ("sending back state " ^ string_of_int m); - let tail = aux (n+1) (prog m (n+1)) id in - if is_cached id then (id, get_cached id) :: tail else tail - | Some id -> aux (n+1) m id in - (if is_cached safe_id then [safe_id,get_cached safe_id] else []) - @ aux 1 (prog 1 1) safe_id in - flush_feeder !slave_oc; - marshal_response !slave_oc (RespError(err_id, safe_id, print e, states)) - | e -> - pr_err ("Slave: critical exception: " ^ Pp.string_of_ppcmds (print e)); - flush_all (); exit 1 - done + let cancel_worker = TaskQueue.cancel_worker (* For external users this name is nicer than request *) - type tasks = int request list + type tasks = int a_request list let dump f2t_map = - assert(WorkersPool.is_empty ()); (* ATM, we allow that only if no slaves *) - let tasks = TQueue.dump queue in - prerr_endline (Printf.sprintf "dumping %d\n" (List.length tasks)); - let tasks = List.map request_of_task tasks in - List.map (function ReqBuildProof(a,b,c,d,x,e) -> - ReqBuildProof(a,b,c,d,List.assoc x f2t_map,e)) tasks + let tasks = TaskQueue.dump () in + prerr_endline (Printf.sprintf "dumping %d tasks\n" (List.length tasks)); + List.map (function r -> { r with r_uuid = List.assoc r.r_uuid f2t_map }) + (CList.map_filter Task.request_of_task tasks) end @@ -1198,7 +999,7 @@ let pstate = ["meta counter"; "evar counter"; "program-tcc-table"] let delegate_policy_check time = if interactive () = `Yes then - (!Flags.async_proofs_mode = Flags.APonParallel 0 || + (Flags.async_proofs_is_master () || !Flags.async_proofs_mode = Flags.APonLazy) && (time >= 1.0 || !Flags.async_proofs_always_delegate) else if !Flags.compilation_mode = Flags.BuildVi then true @@ -1394,7 +1195,7 @@ let known_state ?(redefine_qed=false) ~cache id = inject_non_pstate s ), cache in - if !Flags.async_proofs_mode = Flags.APonParallel 0 then + if Flags.async_proofs_is_master () then Pp.feedback ~state_id:id Feedback.ProcessingInMaster; State.define ~cache:cache_step ~redefine:redefine_qed step id; if !Flags.feedback_goals then print_goals_of_state id; @@ -1403,7 +1204,7 @@ let known_state ?(redefine_qed=false) ~cache id = end -let _ = Slaves.set_reach_known_state Reach.known_state +let _ = Task.set_reach_known_state Reach.known_state (* The backtrack module simulates the classic behavior of a linear document *) module Backtrack : sig @@ -1540,7 +1341,8 @@ let init () = set_undo_classifier Backtrack.undo_vernac_classifier; State.define ~cache:`Yes (fun () -> ()) Stateid.initial; Backtrack.record (); - if !Flags.async_proofs_mode = Flags.APonParallel 0 then begin + if Flags.async_proofs_is_master () then begin + prerr_endline "Initialising workers"; Slaves.init (); let opts = match !Flags.async_proofs_private_flags with | None -> [] diff --git a/stm/stm.mli b/stm/stm.mli index 3bc458f57..b6450e6ac 100644 --- a/stm/stm.mli +++ b/stm/stm.mli @@ -42,7 +42,7 @@ val finish : unit -> unit val observe : Stateid.t -> unit -val stop_worker : int -> unit +val stop_worker : string -> unit (* Joins the entire document. Implies finish, but also checks proofs *) val join : unit -> unit diff --git a/stm/stm.mllib b/stm/stm.mllib index 347416099..308b2ac4c 100644 --- a/stm/stm.mllib +++ b/stm/stm.mllib @@ -5,5 +5,6 @@ TQueue WorkerPool Vernac_classifier Lemmas +AsyncTaskQueue Stm Vi_checking diff --git a/stm/workerPool.ml b/stm/workerPool.ml index fcae4f20d..593240ad4 100644 --- a/stm/workerPool.ml +++ b/stm/workerPool.ml @@ -13,7 +13,7 @@ module Make(Worker : sig process * in_channel * out_channel end) = struct -type worker_id = int +type worker_id = string type spawn = args:string array -> env:string array -> unit -> in_channel * out_channel * Worker.process @@ -32,11 +32,11 @@ let master_handshake worker_id ic oc = Marshal.to_channel oc magic_no []; flush oc; let n = (Marshal.from_channel ic : int) in if n <> magic_no then begin - Printf.eprintf "Handshake with %d failed: protocol mismatch\n" worker_id; + Printf.eprintf "Handshake with %s failed: protocol mismatch\n" worker_id; exit 1; end with e when Errors.noncritical e -> - Printf.eprintf "Handshake with %d failed: %s\n" + Printf.eprintf "Handshake with %s failed: %s\n" worker_id (Printexc.to_string e); exit 1 @@ -45,18 +45,21 @@ let respawn n ~args ~env () = master_handshake n ic oc; ic, oc, proc -let init ~size:n ~manager:manage_slave = +let init ~size:n ~manager:manage_slave mk_name = slave_managers := Some (Array.init n (fun x -> + let name = mk_name x in let cancel = ref false in - cancel, Thread.create (manage_slave ~cancel (x+1)) (respawn (x+1)))) + name, cancel, Thread.create (manage_slave ~cancel name) (respawn name))) let cancel n = match !slave_managers with | None -> () | Some a -> - let switch, _ = a.(n) in - switch := true + for i = 0 to Array.length a - 1 do + let name, switch, _ = a.(i) in + if n = name then switch := true + done let worker_handshake slave_ic slave_oc = try diff --git a/stm/workerPool.mli b/stm/workerPool.mli index d7a546929..d55b35c28 100644 --- a/stm/workerPool.mli +++ b/stm/workerPool.mli @@ -13,13 +13,14 @@ module Make(Worker : sig process * in_channel * out_channel end) : sig -type worker_id = int +type worker_id = string type spawn = args:string array -> env:string array -> unit -> in_channel * out_channel * Worker.process val init : - size:int -> manager:(cancel:bool ref -> worker_id -> spawn -> unit) -> unit + size:int -> manager:(cancel:bool ref -> worker_id -> spawn -> unit) -> + (int -> worker_id) -> unit val is_empty : unit -> bool val n_workers : unit -> int val cancel : worker_id -> unit |