aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--ide/coqOps.ml8
-rw-r--r--ide/coqOps.mli4
-rw-r--r--ide/interface.mli2
-rw-r--r--ide/session.ml12
-rw-r--r--ide/session.mli2
-rw-r--r--ide/xmlprotocol.ml2
-rw-r--r--lib/control.ml2
-rw-r--r--lib/feedback.ml6
-rw-r--r--lib/feedback.mli2
-rw-r--r--lib/flags.ml9
-rw-r--r--lib/flags.mli4
-rw-r--r--lib/remoteCounter.ml12
-rw-r--r--lib/system.ml7
-rw-r--r--lib/system.mli3
-rw-r--r--stm/asyncTaskQueue.ml296
-rw-r--r--stm/asyncTaskQueue.mli61
-rw-r--r--stm/stm.ml678
-rw-r--r--stm/stm.mli2
-rw-r--r--stm/stm.mllib1
-rw-r--r--stm/workerPool.ml17
-rw-r--r--stm/workerPool.mli5
-rw-r--r--toplevel/coqtop.ml17
22 files changed, 663 insertions, 489 deletions
diff --git a/ide/coqOps.ml b/ide/coqOps.ml
index cefe18092..b242bd092 100644
--- a/ide/coqOps.ml
+++ b/ide/coqOps.ml
@@ -122,11 +122,11 @@ object
method backtrack_last_phrase : unit task
method initialize : unit task
method join_document : unit task
- method stop_worker : int -> unit task
+ method stop_worker : string -> unit task
method get_n_errors : int
method get_errors : (int * string) list
- method get_slaves_status : int * int * string Int.Map.t
+ method get_slaves_status : int * int * string CString.Map.t
method handle_failure : handle_exn_rty -> unit task
@@ -154,7 +154,7 @@ object(self)
(* proofs being processed by the slaves *)
val mutable to_process = 0
val mutable processed = 0
- val mutable slaves_status = Int.Map.empty
+ val mutable slaves_status = CString.Map.empty
val feedbacks : feedback Queue.t = Queue.create ()
val feedback_timer = Ideutils.mktimer ()
@@ -377,7 +377,7 @@ object(self)
else to_process <- to_process + n
| SlaveStatus(id,status), _ ->
log "SlaveStatus" None;
- slaves_status <- Int.Map.add id status slaves_status
+ slaves_status <- CString.Map.add id status slaves_status
| _ ->
if sentence <> None then Minilib.log "Unsupported feedback message"
diff --git a/ide/coqOps.mli b/ide/coqOps.mli
index 4669f016d..d6854a3c0 100644
--- a/ide/coqOps.mli
+++ b/ide/coqOps.mli
@@ -21,11 +21,11 @@ object
method backtrack_last_phrase : unit task
method initialize : unit task
method join_document : unit task
- method stop_worker : int -> unit task
+ method stop_worker : string -> unit task
method get_n_errors : int
method get_errors : (int * string) list
- method get_slaves_status : int * int * string Int.Map.t
+ method get_slaves_status : int * int * string CString.Map.t
method handle_failure : Interface.handle_exn_rty -> unit task
diff --git a/ide/interface.mli b/ide/interface.mli
index 4e5e4a9cd..4dca41a88 100644
--- a/ide/interface.mli
+++ b/ide/interface.mli
@@ -206,7 +206,7 @@ type interp_sty = (raw * verbose) * string
(* spiwack: [Inl] for safe and [Inr] for unsafe. *)
type interp_rty = state_id * (string,string) Util.union
-type stop_worker_sty = int
+type stop_worker_sty = string
type stop_worker_rty = unit
diff --git a/ide/session.ml b/ide/session.ml
index f42feae0c..254c53cd6 100644
--- a/ide/session.ml
+++ b/ide/session.ml
@@ -26,7 +26,7 @@ class type control =
end
type errpage = (int * string) list page
-type jobpage = string Int.Map.t page
+type jobpage = string CString.Map.t page
type session = {
buffer : GText.buffer;
@@ -303,10 +303,10 @@ let create_errpage (script : Wg_ScriptView.script_view) : errpage =
let create_jobpage coqtop coqops : jobpage =
let table, access =
make_table_widget
- [`Int,"Worker",true; `String,"Job name",true]
+ [`String,"Worker",true; `String,"Job name",true]
(fun columns store tp vc ->
let row = store#get_iter tp in
- let w = store#get ~row ~column:(find_int_col "Worker" columns) in
+ let w = store#get ~row ~column:(find_string_col "Worker" columns) in
let info () = Minilib.log ("Coq busy, discarding query") in
Coq.try_grab coqtop (coqops#stop_worker w) info
) in
@@ -314,7 +314,7 @@ let create_jobpage coqtop coqops : jobpage =
let box = GPack.vbox ~homogeneous:false () in
let () = box#pack ~expand:true table#coerce in
let () = box#pack ~expand:false ~padding:2 tip#coerce in
- let last_update = ref Int.Map.empty in
+ let last_update = ref CString.Map.empty in
let callback = ref (fun _ -> ()) in
object (self)
inherit GObj.widget box#as_widget
@@ -324,9 +324,9 @@ let create_jobpage coqtop coqops : jobpage =
last_update := jobs;
access (fun _ store -> store#clear ());
!callback jobs;
- Int.Map.iter (fun id job -> access (fun columns store ->
+ CString.Map.iter (fun id job -> access (fun columns store ->
let line = store#append () in
- store#set line (find_int_col "Worker" columns) id;
+ store#set line (find_string_col "Worker" columns) id;
store#set line (find_string_col "Job name" columns) job))
jobs
end
diff --git a/ide/session.mli b/ide/session.mli
index 925b13567..90138a0c9 100644
--- a/ide/session.mli
+++ b/ide/session.mli
@@ -22,7 +22,7 @@ class type control =
end
type errpage = (int * string) list page
-type jobpage = string Int.Map.t page
+type jobpage = string CString.Map.t page
type session = {
buffer : GText.buffer;
diff --git a/ide/xmlprotocol.ml b/ide/xmlprotocol.ml
index 09626172d..3c312d374 100644
--- a/ide/xmlprotocol.ml
+++ b/ide/xmlprotocol.ml
@@ -434,7 +434,7 @@ let quit_sty_t : quit_sty val_t = unit_t
let about_sty_t : about_sty val_t = unit_t
let init_sty_t : init_sty val_t = option_t string_t
let interp_sty_t : interp_sty val_t = pair_t (pair_t bool_t bool_t) string_t
-let stop_worker_sty_t : stop_worker_sty val_t = int_t
+let stop_worker_sty_t : stop_worker_sty val_t = string_t
let add_rty_t : add_rty val_t =
pair_t state_id_t (pair_t (union_t unit_t state_id_t) string_t)
diff --git a/lib/control.ml b/lib/control.ml
index 18176b843..cce9d3a9f 100644
--- a/lib/control.ml
+++ b/lib/control.ml
@@ -14,7 +14,7 @@ let steps = ref 0
let are_we_threading = lazy (
match !Flags.async_proofs_mode with
- | Flags.APonParallel _ -> true
+ | Flags.APon -> true
| _ -> false)
let check_for_interrupt () =
diff --git a/lib/feedback.ml b/lib/feedback.ml
index 27ed44724..e01844a48 100644
--- a/lib/feedback.ml
+++ b/lib/feedback.ml
@@ -65,7 +65,7 @@ type feedback_content =
| GlobDef of Loc.t * string * string * string
| ErrorMsg of Loc.t * string
| InProgress of int
- | SlaveStatus of int * string
+ | SlaveStatus of string * string
| ProcessingInMaster
| Goals of Loc.t * string
| FileLoaded of string * string
@@ -90,7 +90,7 @@ let to_feedback_content = do_match "feedback_content" (fun s a -> match s,a with
| "errormsg", [loc; s] -> ErrorMsg (to_loc loc, to_string s)
| "inprogress", [n] -> InProgress (to_int n)
| "slavestatus", [ns] ->
- let n, s = to_pair to_int to_string ns in
+ let n, s = to_pair to_string to_string ns in
SlaveStatus(n,s)
| "goals", [loc;s] -> Goals (to_loc loc, to_string s)
| "fileloaded", [dirpath; filename] ->
@@ -121,7 +121,7 @@ let of_feedback_content = function
| InProgress n -> constructor "feedback_content" "inprogress" [of_int n]
| SlaveStatus(n,s) ->
constructor "feedback_content" "slavestatus"
- [of_pair of_int of_string (n,s)]
+ [of_pair of_string of_string (n,s)]
| Goals (loc,s) ->
constructor "feedback_content" "goals" [of_loc loc;of_string s]
| FileLoaded(dirpath, filename) ->
diff --git a/lib/feedback.mli b/lib/feedback.mli
index b3c0c8896..7e7b57625 100644
--- a/lib/feedback.mli
+++ b/lib/feedback.mli
@@ -40,7 +40,7 @@ type feedback_content =
| GlobDef of Loc.t * string * string * string
| ErrorMsg of Loc.t * string
| InProgress of int
- | SlaveStatus of int * string
+ | SlaveStatus of string * string
| ProcessingInMaster
| Goals of Loc.t * string
| FileLoaded of string * string
diff --git a/lib/flags.ml b/lib/flags.ml
index 92824c9f5..4b323f611 100644
--- a/lib/flags.ml
+++ b/lib/flags.ml
@@ -48,18 +48,19 @@ let batch_mode = ref false
type compilation_mode = BuildVo | BuildVi | Vi2Vo
let compilation_mode = ref BuildVo
-type async_proofs = APoff | APonLazy | APonParallel of int
+type async_proofs = APoff | APonLazy | APon
let async_proofs_mode = ref APoff
let async_proofs_n_workers = ref 1
let async_proofs_private_flags = ref None
let async_proofs_always_delegate = ref false
let async_proofs_never_reopen_branch = ref false
let async_proofs_flags_for_workers = ref []
+let async_proofs_worker_id = ref "master"
let async_proofs_is_worker () =
- match !async_proofs_mode with
- | APonParallel n -> n > 0
- | _ -> false
+ !async_proofs_worker_id <> "master"
+let async_proofs_is_master () =
+ !async_proofs_mode = APon && !async_proofs_worker_id = "master"
let debug = ref false
diff --git a/lib/flags.mli b/lib/flags.mli
index 7f2b749b7..db1d8bcc4 100644
--- a/lib/flags.mli
+++ b/lib/flags.mli
@@ -15,14 +15,16 @@ val batch_mode : bool ref
type compilation_mode = BuildVo | BuildVi | Vi2Vo
val compilation_mode : compilation_mode ref
-type async_proofs = APoff | APonLazy | APonParallel of int
+type async_proofs = APoff | APonLazy | APon
val async_proofs_mode : async_proofs ref
val async_proofs_n_workers : int ref
val async_proofs_private_flags : string option ref
val async_proofs_is_worker : unit -> bool
+val async_proofs_is_master : unit -> bool
val async_proofs_always_delegate : bool ref
val async_proofs_never_reopen_branch : bool ref
val async_proofs_flags_for_workers : string list ref
+val async_proofs_worker_id : string ref
val debug : bool ref
diff --git a/lib/remoteCounter.ml b/lib/remoteCounter.ml
index bec46901f..72887b21a 100644
--- a/lib/remoteCounter.ml
+++ b/lib/remoteCounter.ml
@@ -24,18 +24,14 @@ let new_counter ~name a ~incr ~build =
(* - slaves must use a remote counter getter, not this one! *)
(* - in the main process there is a race condition between slave
managers (that are threads) and the main thread, hence the mutex *)
- (match !Flags.async_proofs_mode with
- | Flags.APonParallel n when n > 0 ->
- Errors.anomaly(Pp.str"Slave processes must install remote counters");
- | _ -> ());
+ if Flags.async_proofs_is_worker () then
+ Errors.anomaly(Pp.str"Slave processes must install remote counters");
Mutex.lock m; let x = f () in Mutex.unlock m;
build x in
let getter = ref(mk_thsafe_getter (fun () -> !data := incr !!data; !!data)) in
let installer f =
- (match !Flags.async_proofs_mode with
- | Flags.APoff | Flags.APonLazy | Flags.APonParallel 0 ->
- Errors.anomaly(Pp.str"Only slave processes can install a remote counter")
- | _ -> ());
+ if not (Flags.async_proofs_is_worker ()) then
+ Errors.anomaly(Pp.str"Only slave processes can install a remote counter");
getter := f in
(fun () -> !getter ()), installer
diff --git a/lib/system.ml b/lib/system.ml
index 4188eb2b4..59260fbf6 100644
--- a/lib/system.ml
+++ b/lib/system.ml
@@ -295,3 +295,10 @@ let with_time time f x =
let msg2 = if time then "" else " (failure)" in
msg_info (str msg ++ fmt_time_difference tstart tend ++ str msg2);
raise e
+
+(* Name of the current process, as used to tag log output: the worker id
+   in a worker, "master:<thread id>" in a master running with async proofs
+   on, and plain "master" otherwise. *)
+let process_id () =
+  if Flags.async_proofs_is_worker () then !Flags.async_proofs_worker_id
+  else begin
+    match Flags.async_proofs_is_master () with
+    | true -> Printf.sprintf "master:%d" (Thread.id (Thread.self ()))
+    | false -> "master"
+  end
+
diff --git a/lib/system.mli b/lib/system.mli
index 9cfb1456f..0569c7889 100644
--- a/lib/system.mli
+++ b/lib/system.mli
@@ -69,3 +69,6 @@ val time_difference : time -> time -> float (** in seconds *)
val fmt_time_difference : time -> time -> Pp.std_ppcmds
val with_time : bool -> ('a -> 'b) -> 'a -> 'b
+
+(** {6 Name of current process.} *)
+val process_id : unit -> string
diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml
new file mode 100644
index 000000000..65219724e
--- /dev/null
+++ b/stm/asyncTaskQueue.ml
@@ -0,0 +1,296 @@
+(************************************************************************)
+(* v * The Coq Proof Assistant / The Coq Development Team *)
+(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2014 *)
+(* \VV/ **************************************************************)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(************************************************************************)
+
+open Errors
+open Pp
+open Util
+
+(* Print [s] on stderr, prefixed by the name of the current process, and
+   flush right away. *)
+let pr_err s =
+  Printf.eprintf "%s] %s\n" (System.process_id ()) s;
+  flush stderr
+
+(* Debug trace: behaves like [pr_err] when [Flags.debug] is set, and does
+   nothing otherwise. Shadows the standard [prerr_endline] in this file. *)
+let prerr_endline s = if !Flags.debug then pr_err s
+
+(* What a kind of task must provide in order to be scheduled by a queue
+   built with [Make] below. *)
+module type Task = sig
+
+  (* A unit of work, as stored in the queue on the master side. *)
+  type task
+
+  (* Marshallable *)
+  type request
+  type response
+
+  val name : string (* UID of the task kind, for -toploop *)
+  (* Extra environment entries; they are put in front of
+     [Unix.environment ()] when a worker process is spawned. *)
+  val extra_env : unit -> string array
+
+  (* run by the master, on a thread *)
+  (* [None] means the task expired: it is logged and dropped without
+     anything being sent to the worker. *)
+  val request_of_task : task -> request option
+  (* Consume the final answer of the worker.  [`Stay] keeps the worker for
+     the next task, [`StayReset] kills it and spawns a fresh one, [`Die]
+     kills it and ends its manager. *)
+  val use_response : task -> response -> [ `Die | `Stay | `StayReset ]
+  (* Called with the error message when (un)marshalling fails while the
+     task is being processed; the manager then goes on with the next task. *)
+  val on_marshal_error : string -> task -> unit
+  (* Called when the channel to the worker breaks for a reason other than
+     expiration or cancellation.  [`Stay] respawns the worker, [`Exit n]
+     terminates the master with exit code [n]. *)
+  val on_slave_death : task option -> [ `Exit of int | `Stay ]
+  (* Called when the channel to the worker breaks after the task's cancel
+     switch was set or the user cancelled the worker. *)
+  val on_task_cancellation_or_expiration : task option -> unit
+  (* Called for every feedback message, attached to a state id, that the
+     worker sends back while working. *)
+  val forward_feedback : Stateid.t -> Feedback.feedback_content -> unit
+
+  (* run by the worker *)
+  val perform : request -> response
+
+  (* debugging *)
+  val name_of_task : task -> string
+  val name_of_request : request -> string
+
+end
+
+type cancel_switch = bool ref
+
+module Make(T : Task) = struct
+
+  (* Process-spawning primitives, instantiated with a thread based timer:
+     [add_timeout ~sec f] starts a thread that calls [f] every [sec] seconds
+     and terminates as soon as [f] returns [false]. *)
+  module Worker = Spawn.Sync(struct
+    let add_timeout ~sec f =
+      ignore(Thread.create (fun () ->
+        while true do
+          Unix.sleep sec;
+          if not (f ()) then Thread.exit ()
+        done)
+      ())
+  end)
+
+  module WorkersPool = WorkerPool.Make(Worker)
+
+  (* Tasks waiting for a worker, each paired with its cancel switch. *)
+  let queue : (T.task * cancel_switch) TQueue.t = TQueue.create ()
+
+  (* Append task [t], with cancel switch [c], to the queue. *)
+  let enqueue_task t c =
+    prerr_endline ("Enqueue task "^T.name_of_task t);
+    TQueue.push queue (t,c)
+
+  let cancel_worker = WorkersPool.cancel
+
+  (* Message sent by the master to a worker. *)
+  type request = Request of T.request
+
+  let name_of_request (Request r) = T.name_of_request r
+
+  (* Messages sent by a worker to the master: the final answer to a request,
+     a feedback message to be forwarded, or a query for fresh universe
+     levels (answered with a [more_data] message). *)
+  type response =
+    | Response of T.response
+    | RespFeedback of Feedback.feedback
+    | RespGetCounterNewUnivLevel
+
+  (* Answer of the master to a [RespGetCounterNewUnivLevel] query. *)
+  type more_data =
+    | MoreDataUnivLevel of Univ.universe_level list
+
+  (* NOTE(review): no use of this function is visible in this file. *)
+  let request_cswitch_of_task (t, c) = T.request_of_task t, c
+
+  (* Worker side: run the task specific code on a request. *)
+  let slave_respond msg = match msg with Request r -> Response (T.perform r)
+
+  (* Raised, with a message naming the failing operation, whenever sending
+     or receiving a value on a channel fails. *)
+  exception MarshalError of string
+
+  let marshal_err s = raise (MarshalError s)
+
+  (* Write [data] on [oc] and flush, so that the peer can read it at once. *)
+  let marshal_to_channel oc data =
+    Marshal.to_channel oc data [];
+    flush oc
+
+  (* Run [io ()]; a [Failure], [Invalid_argument] or [Sys_error] it raises is
+     turned into [MarshalError], its message being appended to [prefix]. *)
+  let guard_marshal prefix io =
+    try io ()
+    with Failure s | Invalid_argument s | Sys_error s ->
+      marshal_err (prefix^s)
+
+  let marshal_request oc (req : request) =
+    guard_marshal "marshal_request: " (fun () -> marshal_to_channel oc req)
+
+  let unmarshal_request ic =
+    guard_marshal "unmarshal_request: "
+      (fun () -> (Marshal.from_channel ic : request))
+
+  let marshal_response oc (res : response) =
+    guard_marshal "marshal_response: " (fun () -> marshal_to_channel oc res)
+
+  (* Unlike the other readers, this one goes through [CThread]'s
+     thread friendly input function. *)
+  let unmarshal_response ic =
+    guard_marshal "unmarshal_response: "
+      (fun () -> (CThread.thread_friendly_input_value ic : response))
+
+  let marshal_more_data oc (res : more_data) =
+    guard_marshal "marshal_more_data: " (fun () -> marshal_to_channel oc res)
+
+  let unmarshal_more_data ic =
+    guard_marshal "unmarshal_more_data: "
+      (fun () -> (Marshal.from_channel ic : more_data))
+
+  (* Reorder the pending tasks according to [cmp]; cancel switches are
+     ignored by the comparison. *)
+  let reorder_tasks cmp = TQueue.reorder queue (fun (t1,_) (t2,_) -> cmp t1 t2)
+
+  (* Block until the queue is empty and as many consumers as there are
+     workers are waiting on it.  Returns immediately when the pool has no
+     workers. *)
+  let join () =
+    if not (WorkersPool.is_empty ()) then
+      TQueue.wait_until_n_are_waiting_and_queue_empty
+        (WorkersPool.n_workers ()) queue
+
+  (* Control flow of [manage_slave]: kill the worker and start a new one /
+     kill the worker and stop / the task at hand is no longer relevant. *)
+  exception KillRespawn
+  exception Die
+  exception Expired
+
+  (* Emit a [SlaveStatus] feedback, attached to the initial state, saying
+     that worker [id] (by default the current process) is doing [s]. *)
+  let report_status ?(id = !Flags.async_proofs_worker_id) s =
+    Pp.feedback ~state_id:Stateid.initial (Feedback.SlaveStatus(id, s))
+
+  (* Manager of the worker named [id].  Spawns the worker process with
+     [respawn], then loops: pop a task from [queue], send the corresponding
+     request, and serve the messages of the worker until its final
+     [Response] arrives.  [cancel_user_req] is the flag through which the
+     user asks for the worker to be cancelled; it is reset once seen.
+     When the channel to the worker breaks, the outer handler decides,
+     depending on the cause, to start over with a fresh worker (recursive
+     call), to return, or to exit the master. *)
+  let rec manage_slave ~cancel:cancel_user_req id respawn =
+    let ic, oc, proc =
+      (* Command line of the worker: the one of the master minus the flags
+         listed below (the second group is dropped together with its
+         argument), followed by the worker specific flags. *)
+      let rec set_slave_opt = function
+        | [] -> !Flags.async_proofs_flags_for_workers @
+                ["-toploop"; T.name^"top";
+                 "-worker-id"; id]
+        | ("-ideslave"|"-emacs"|"-emacs-U")::tl -> set_slave_opt tl
+        | ("-async-proofs" |"-toploop" |"-vi2vo" |"-compile"
+          |"-compile-verbose")::_::tl -> set_slave_opt tl
+        | x::tl -> x :: set_slave_opt tl in
+      let args =
+        Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in
+      let env = Array.append (T.extra_env ()) (Unix.environment ()) in
+      respawn ~args ~env () in
+    (* Task being processed, if any; handed to the [T.on_*] callbacks. *)
+    let last_task = ref None in
+    (* Set by the periodic check below; they tell the outer handler why the
+       channel to the worker broke. *)
+    let task_expired = ref false in
+    let task_cancelled = ref false in
+    CThread.prepare_in_channel_for_thread_friendly_io ic;
+    try
+      while true do
+        prerr_endline "waiting for a task";
+        report_status ~id "Idle";
+        let task, cancel_switch = TQueue.pop queue in
+        prerr_endline ("got task: "^T.name_of_task task);
+        last_task := Some task;
+        try
+          let req = T.request_of_task task in
+          if req = None then raise Expired;
+          marshal_request oc (Request (Option.get req));
+          (* Every second, sample the switch of the task and the request of
+             the user; presumably [kill_if] kills [proc] once the function
+             returns [true] -- confirm against [Spawn]. *)
+          Worker.kill_if proc ~sec:1 (fun () ->
+            task_expired := !cancel_switch;
+            task_cancelled := !cancel_user_req;
+            if !cancel_user_req then cancel_user_req := false;
+            !task_expired || !task_cancelled);
+          (* Serve the worker until it sends its final [Response]. *)
+          let rec loop () =
+            let response = unmarshal_response ic in
+            match response with
+            | Response resp ->
+                (match T.use_response task resp with
+                | `Die -> raise Die
+                | `Stay -> last_task := None; ()
+                | `StayReset -> last_task := None; raise KillRespawn)
+            | RespGetCounterNewUnivLevel ->
+                (* Universe levels are handed out in batches of 10. *)
+                marshal_more_data oc (MoreDataUnivLevel
+                  (CList.init 10 (fun _ ->
+                    Universes.new_univ_level (Global.current_dirpath ()))));
+                loop ()
+            | RespFeedback ({ Feedback.id = Feedback.State state_id } as fbk) ->
+                T.forward_feedback state_id fbk.Feedback.content;
+                loop ()
+            | RespFeedback _ -> Errors.anomaly (str"Parsing in master process")
+          in
+            loop ()
+        with
+        | Expired -> prerr_endline ("Task expired: " ^ T.name_of_task task)
+        | (Sys_error _ | Invalid_argument _ | End_of_file | KillRespawn) as e ->
+            raise e (* we pass the exception to the external handler *)
+        | MarshalError s -> T.on_marshal_error s task
+        | e ->
+            pr_err ("Uncaught exception in worker manager: "^
+              string_of_ppcmds (print e));
+            flush_all ()
+      done
+    with
+    | KillRespawn ->
+        Worker.kill proc; ignore(Worker.wait proc);
+        manage_slave ~cancel:cancel_user_req id respawn
+    | Die -> Worker.kill proc; ignore(Worker.wait proc)
+    (* Unlike the other cases, no explicit [Worker.kill] here. *)
+    | Sys_error _ | Invalid_argument _ | End_of_file when !task_expired ->
+        T.on_task_cancellation_or_expiration !last_task;
+        ignore(Worker.wait proc);
+        manage_slave ~cancel:cancel_user_req id respawn
+    | Sys_error _ | Invalid_argument _ | End_of_file when !task_cancelled ->
+        msg_warning(strbrk "The worker was cancelled.");
+        T.on_task_cancellation_or_expiration !last_task;
+        Worker.kill proc; ignore(Worker.wait proc);
+        manage_slave ~cancel:cancel_user_req id respawn
+    (* The channel broke for no known reason: let the task kind decide. *)
+    | Sys_error _ | Invalid_argument _ | End_of_file ->
+        match T.on_slave_death !last_task with
+        | `Stay ->
+            msg_warning(strbrk "The worker process died badly.");
+            Worker.kill proc; ignore(Worker.wait proc);
+            manage_slave ~cancel:cancel_user_req id respawn
+        | `Exit exit_code ->
+            Worker.kill proc;
+            let exit_status proc = match Worker.wait proc with
+              | Unix.WEXITED 0x400 -> "exit code unavailable"
+              | Unix.WEXITED i -> Printf.sprintf "exit(%d)" i
+              | Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno
+              | Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno in
+            pr_err ("Fatal worker error: " ^ (exit_status proc));
+            flush_all (); exit exit_code
+
+  (* Channels used by a worker to talk to the master; stdin/stdout until
+     [slave_init_stdout] is called. *)
+  let slave_ic = ref stdin
+  let slave_oc = ref stdout
+
+  (* Replace the channels above by the ones given by [Spawned.get_channels]. *)
+  let slave_init_stdout () =
+    let ic, oc = Spawned.get_channels () in
+    slave_oc := oc; slave_ic := ic
+
+  (* [bufferize f] is a getter handing out, one at a time and in order, the
+     elements of the batches computed by [f]; [f] is called again only when
+     the current batch is exhausted.
+     Raises [Failure] with an explicit message if [f] returns an empty
+     batch, in which case the buffer is left empty. *)
+  let bufferize f =
+    let l = ref [] in
+    fun () ->
+      match !l with
+      | x::tl -> l := tl; x
+      | [] ->
+          match f () with
+          | x::tl -> l := tl; x
+          | [] -> failwith "bufferize: the producer returned an empty batch"
+
+ let slave_handshake () = WorkersPool.worker_handshake !slave_ic !slave_oc
+
+  (* Main loop of a worker process; it never returns.  After installing the
+     feedback and universe level hooks and doing the handshake with the
+     master, it repeatedly reads a request, performs it, sends back the
+     response and calls [reset ()].
+     Any exception terminates the process: exit code 2 for a marshalling
+     error or a lost connection, 1 for anything else. *)
+  let slave_main_loop reset =
+    (* [Goals] feedback is held back until the request has been performed;
+       any other feedback is sent to the master at once. *)
+    let feedback_queue = ref [] in
+    let slave_feeder oc fb =
+      match fb.Feedback.content with
+      | Feedback.Goals _ -> feedback_queue := fb :: !feedback_queue
+      | _ -> Marshal.to_channel oc (RespFeedback fb) []; flush oc in
+    (* NOTE(review): the queue is built by consing, so the messages held
+       back are sent most recent first -- confirm this is intended. *)
+    let flush_feeder oc =
+      List.iter (fun fb -> Marshal.to_channel oc (RespFeedback fb) [])
+        !feedback_queue;
+      feedback_queue := [];
+      flush oc in
+    Pp.set_feeder (slave_feeder !slave_oc);
+    Pp.log_via_feedback ();
+    (* Fresh universe levels are requested from the master, and consumed one
+       at a time from the batch it sends back. *)
+    Universes.set_remote_new_univ_level (bufferize (fun () ->
+      marshal_response !slave_oc RespGetCounterNewUnivLevel;
+      match unmarshal_more_data !slave_ic with
+      | MoreDataUnivLevel l -> l));
+    slave_handshake ();
+    while true do
+      try
+        let request = unmarshal_request !slave_ic in
+        report_status (name_of_request request);
+        let response = slave_respond request in
+        flush_feeder !slave_oc;
+        report_status "Idle";
+        marshal_response !slave_oc response;
+        reset ()
+      with
+      | MarshalError s ->
+        pr_err ("Fatal marshal error: " ^ s); flush_all (); exit 2
+      | End_of_file ->
+        prerr_endline "connection lost"; flush_all (); exit 2
+      | e ->
+        pr_err ("Slave: critical exception: " ^ Pp.string_of_ppcmds (print e));
+        flush_all (); exit 1
+    done
+
+  (* Tasks currently in the queue, without their cancel switches.  Fails
+     with an assertion if the pool has workers. *)
+  let dump () =
+    assert(WorkersPool.is_empty ()); (* ATM, we allow that only if no slaves *)
+    List.map fst (TQueue.dump queue)
+
+  (* Initialize the pool for [n] workers managed by [manage_slave]; the
+     worker numbered [n] is named "<T.name>:<n>". *)
+  let init n =
+    WorkersPool.init n manage_slave
+      (fun n -> Printf.sprintf "%s:%d" T.name n)
+
+  let n_workers = WorkersPool.n_workers
+
+end
diff --git a/stm/asyncTaskQueue.mli b/stm/asyncTaskQueue.mli
new file mode 100644
index 000000000..e01479d30
--- /dev/null
+++ b/stm/asyncTaskQueue.mli
@@ -0,0 +1,61 @@
+(************************************************************************)
+(* v * The Coq Proof Assistant / The Coq Development Team *)
+(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2014 *)
+(* \VV/ **************************************************************)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(************************************************************************)
+
+(* What a kind of task must provide in order to be scheduled by a queue
+   built with [Make]. *)
+module type Task = sig
+
+  (* A unit of work, as stored in the queue on the master side. *)
+  type task
+
+  (* Marshallable *)
+  type request
+  type response
+
+  val name : string (* UID of the task kind, for -toploop *)
+  (* Extra environment entries for the worker processes. *)
+  val extra_env : unit -> string array
+
+  (* run by the master, on a thread *)
+  (* [None] means that the task expired and must be dropped. *)
+  val request_of_task : task -> request option
+  (* Consume the final answer of the worker and tell what to do with the
+     worker: keep it ([`Stay]), replace it by a fresh one ([`StayReset]) or
+     get rid of it ([`Die]). *)
+  val use_response : task -> response -> [ `Die | `Stay | `StayReset ]
+  (* Called with the error message when marshalling fails for a task. *)
+  val on_marshal_error : string -> task -> unit
+  (* Called when the worker dies unexpectedly; [`Exit n] makes the master
+     exit with code [n], [`Stay] lets it go on. *)
+  val on_slave_death : task option -> [ `Exit of int | `Stay ]
+  val on_task_cancellation_or_expiration : task option -> unit
+  (* Called on the feedback messages sent back by the worker. *)
+  val forward_feedback : Stateid.t -> Feedback.feedback_content -> unit
+
+  (* run by the worker *)
+  val perform : request -> response
+
+  (* debugging *)
+  val name_of_task : task -> string
+  val name_of_request : request -> string
+
+end
+
+type cancel_switch = bool ref
+
+(* Queue of tasks of kind [T], together with the pool of workers that
+   consume it. *)
+module Make(T : Task) : sig
+
+  (* Number of workers, 0 = lazy local *)
+  val init : int -> unit
+
+  (* Number of workers in the pool. *)
+  val n_workers : unit -> int
+
+  (* Add a task to the queue.  The switch is polled while the task is being
+     processed; once set, the task is treated as expired. *)
+  val enqueue_task : T.task -> cancel_switch -> unit
+
+  (* blocking function that waits for the task queue to be empty *)
+  val join : unit -> unit
+
+  (* slave process main loop *)
+  (* The argument is called after each request has been answered. *)
+  val slave_main_loop : (unit -> unit) -> unit
+  (* To be called in a worker, before [slave_main_loop], to set up the
+     channels used to talk to the master. *)
+  val slave_init_stdout : unit -> unit
+
+  (* Cancel the worker with the given name. *)
+  val cancel_worker : string -> unit
+
+  (* Reorder the pending tasks according to the given comparison. *)
+  val reorder_tasks : (T.task -> T.task -> int) -> unit
+
+  (* Pending tasks; only allowed when the pool has no workers. *)
+  val dump : unit -> T.task list
+
+end
diff --git a/stm/stm.ml b/stm/stm.ml
index a342bb6f1..0827e0bfa 100644
--- a/stm/stm.ml
+++ b/stm/stm.ml
@@ -6,13 +6,7 @@
(* * GNU Lesser General Public License Version 2.1 *)
(************************************************************************)
-let process_id () =
- match !Flags.async_proofs_mode with
- | Flags.APoff | Flags.APonLazy | Flags.APonParallel 0 ->
- "master" ^ string_of_int (Thread.id (Thread.self ()))
- | Flags.APonParallel n -> "worker" ^ string_of_int n
-
-let pr_err s = Printf.eprintf "%s] %s\n" (process_id ()) s; flush stderr
+let pr_err s = Printf.eprintf "%s] %s\n" (System.process_id ()) s; flush stderr
let prerr_endline s = if !Flags.debug then begin pr_err s end else ()
@@ -252,7 +246,8 @@ end = struct
open Printf
let print_dag vcs () =
- let fname = "stm_" ^ process_id () in
+ let fname =
+ "stm_" ^ Str.global_replace (Str.regexp " ") "_" (System.process_id ()) in
let string_of_transaction = function
| Cmd (t, _) | Fork (t, _,_,_) ->
(try string_of_ppcmds (pr_ast t) with _ -> "ERR")
@@ -650,18 +645,6 @@ let get_hint_bp_time proof_name =
try float_of_string (Aux_file.get !hints Loc.ghost proof_name)
with Not_found -> 1.0
-module Worker = Spawn.Sync(struct
- let add_timeout ~sec f =
- ignore(Thread.create (fun () ->
- while true do
- Unix.sleep sec;
- if not (f ()) then Thread.exit ()
- done)
- ())
-end)
-
-module WorkersPool = WorkerPool.Make(Worker)
-
let record_pb_time proof_name loc time =
let proof_build_time = Printf.sprintf "%.3f" time in
Aux_file.record_in_aux_at loc "proof_build_time" proof_build_time;
@@ -669,6 +652,170 @@ let record_pb_time proof_name loc time =
Aux_file.record_in_aux_at Loc.ghost proof_name proof_build_time;
hints := Aux_file.set !hints Loc.ghost proof_name proof_build_time
end
+
+(* Request to build a proof, as sent to a worker.  ['a] is the type of the
+   identifier carried in [r_uuid]. *)
+type 'a a_request = {
+  r_exn_info : Stateid.t * Stateid.t; (* passed on to [State.exn_on] *)
+  r_stop     : Stateid.t;             (* state to be reached *)
+  r_document : VCS.vcs;               (* document restored by the worker *)
+  r_loc      : Loc.t;                 (* where the build time is recorded *)
+  r_uuid     : 'a;
+  r_name     : string }               (* name shown for the request *)
+
+(* Error whose message was computed in another process; the registered
+   handler prints the carried message as it is. *)
+exception RemoteException of std_ppcmds
+let _ = Errors.register_handler (function
+  | RemoteException ppcmd -> ppcmd
+  | _ -> raise Unhandled)
+
+module Task = struct
+
+  (* Forward reference to the function reaching a given state; it does
+     nothing until [set_reach_known_state] is called. *)
+  let reach_known_state = ref (fun ?redefine_qed ~cache id -> ())
+  let set_reach_known_state f = reach_known_state := f
+
+  (* Re-export of the [forward_feedback] in scope, under the name expected
+     of a task kind. *)
+  let forward_feedback = forward_feedback
+
+  (* Master side description of a proof to be built. *)
+  type task = {
+    t_exn_info : Stateid.t * Stateid.t;
+    t_start    : Stateid.t;
+    t_stop     : Stateid.t;
+    (* Continuation receiving the outcome: a value, an exception or a
+       computation. *)
+    t_assign   : Proof_global.closed_proof_output Future.assignement -> unit;
+    t_loc      : Loc.t;
+    t_uuid     : Future.UUID.t;
+    t_name     : string }
+
+  type request = Future.UUID.t a_request
+
+  (* Failure report sent back by a worker. *)
+  type error = {
+    e_error_at    : Stateid.t;  (* state at which the error occurred *)
+    e_safe_id     : Stateid.t;  (* last state known to be fine *)
+    e_msg         : std_ppcmds;
+    e_safe_states : (Stateid.t * State.frozen_state) list }
+
+  (* Answer of a worker: the proof together with the time it took to build
+     it, or an error. *)
+  type response =
+    | RespBuiltProof of Proof_global.closed_proof_output * float
+    | RespError of error
+
+  let name = "stmworker"
+  let extra_env () = !async_proofs_workers_extra_env
+
+  let name_of_task t = t.t_name
+  let name_of_request r = r.r_name
+
+  (* Request to be sent to a worker for the given task, or [None] if slicing
+     the document raises [VCS.Expired].
+     NOTE(review): the slice is taken with [~start:t_stop ~stop:t_start];
+     confirm this matches the convention of [VCS.slice]. *)
+  let request_of_task { t_exn_info; t_start; t_stop; t_loc; t_uuid; t_name } =
+    try Some {
+      r_exn_info = t_exn_info;
+      r_stop = t_stop;
+      r_document = VCS.slice ~start:t_stop ~stop:t_start;
+      r_loc = t_loc;
+      r_uuid = t_uuid;
+      r_name = t_name }
+    with VCS.Expired -> None
+
+  (* Deliver the answer of a worker to the task: signal that one job is no
+     longer in progress, then assign either the proof (recording its build
+     time) or the remote error (installing the safe states sent along). *)
+  let use_response { t_assign; t_loc; t_name } response =
+    Pp.feedback (Feedback.InProgress ~-1);
+    begin match response with
+    | RespBuiltProof (proof, build_time) ->
+        t_assign (`Val proof);
+        record_pb_time t_name t_loc build_time
+    | RespError { e_error_at; e_safe_id; e_msg; e_safe_states } ->
+        let exn =
+          Stateid.add ~valid:e_safe_id (RemoteException e_msg) e_error_at in
+        t_assign (`Exn exn);
+        List.iter (fun (id, frozen) -> State.assign id frozen) e_safe_states
+    end;
+    (* We restart the slave, to avoid memory leaks. We could just
+       Pp.feedback (Feedback.InProgress ~-1) *)
+    `StayReset
+
+  (* If a task was being processed, make it fail at its start state with a
+     "cancelled" error, report that error as feedback on that state, and
+     signal that one job is no longer in progress.
+     NOTE(review): the same message is used when the task expired. *)
+  let on_task_cancellation_or_expiration = function
+    | None -> ()
+    | Some { t_start = start; t_assign } ->
+        let s = "Worker cancelled by the user" in
+        let e = Stateid.add ~valid:start (RemoteException (strbrk s)) start in
+        t_assign (`Exn e);
+        Pp.feedback ~state_id:start (Feedback.ErrorMsg (Loc.ghost, s));
+        Pp.feedback (Feedback.InProgress ~-1)
+
+  (* Build the proof in the current process: reach state [eop] (without
+     caching in batch mode, with shallow caching otherwise), record the wall
+     clock time spent at [loc] in the aux file, and return the proof. *)
+  let build_proof_here_core loc eop () =
+    let wall_clock1 = Unix.gettimeofday () in
+    if !Flags.batch_mode then !reach_known_state ~cache:`No eop
+    else !reach_known_state ~cache:`Shallow eop;
+    let wall_clock2 = Unix.gettimeofday () in
+    Aux_file.record_in_aux_at loc "proof_build_time"
+      (Printf.sprintf "%.3f" (wall_clock2 -. wall_clock1));
+    Proof_global.return_proof ()
+  (* Same as above, wrapped in a future created with [State.exn_on id ~valid]. *)
+  let build_proof_here (id,valid) loc eop =
+    Future.create (State.exn_on id ~valid) (build_proof_here_core loc eop)
+  (* Worker side: restore the document carried by the request, build the
+     proof ending at [eop] and return it with the time it took.  On a non
+     critical error, return instead a [RespError] carrying the message, the
+     failing and last safe state ids, and a selection of cached states. *)
+  let perform { r_exn_info; r_stop = eop; r_document = vcs; r_loc } =
+    try
+      VCS.restore vcs;
+      VCS.print ();
+      let rc, time =
+        let wall_clock = Unix.gettimeofday () in
+        let l = Future.force (build_proof_here r_exn_info r_loc eop) in
+        (* Also join the opaque bodies of the subproofs found in the side
+           effects of the proof. *)
+        List.iter (fun (_,se) -> Declareops.iter_side_effects (function
+          | Declarations.SEsubproof(_,
+             { Declarations.const_body = Declarations.OpaqueDef f;
+               const_universes = univs } ) ->
+                Opaqueproof.join_opaque f
+          | _ -> ())
+        se) (fst l);
+        l, Unix.gettimeofday () -. wall_clock in
+      VCS.print ();
+      RespBuiltProof(rc,time)
+    with
+    |e when Errors.noncritical e ->
+        (* This can happen if the proof is broken.  The error has also been
+         * signalled as a feedback, hence we can silently recover *)
+        let e_error_at, e_safe_id = match Stateid.get e with
+          | Some (safe, err) -> err, safe
+          | None -> Stateid.dummy, Stateid.dummy in
+        prerr_endline "failed with the following exception:";
+        prerr_endline (string_of_ppcmds (print e));
+        prerr_endline ("last safe id is: " ^ Stateid.to_string e_safe_id);
+        prerr_endline ("cached? " ^ string_of_bool (State.is_cached e_safe_id));
+        (* Next position to sample after position [n]: every position below
+           3, then steps growing by a third of the current position. *)
+        let prog old_v n =
+          if n < 3 then n else old_v + n/3 + if n mod 3 > 0 then 1 else 0 in
+        (* Walk the states linked from the safe one through [next]; [n] is
+           the current position, [m] the next one to sample.  Sampled states
+           are sent back only if cached. *)
+        let e_safe_states =
+          let open State in
+          let rec aux n m prev_id =
+            let next =
+              try Some (VCS.visit prev_id).next
+              with VCS.Expired -> None in
+            match next with
+            | None -> []
+            | Some id when n = m ->
+                prerr_endline ("sending back state " ^ string_of_int m);
+                let tail = aux (n+1) (prog m (n+1)) id in
+                if is_cached id then (id, get_cached id) :: tail else tail
+            | Some id -> aux (n+1) m id in
+          (if is_cached e_safe_id then [e_safe_id,get_cached e_safe_id] else [])
+          @ aux 1 (prog 1 1) e_safe_id in
+        RespError { e_error_at; e_safe_id; e_msg = print e; e_safe_states }
+
+  (* Policy when a worker dies: ask for the master to exit with code 1,
+     unless [fallback_to_lazy_if_slave_dies] is set; in that case the task at
+     hand, if any, is assigned a local computation of the proof and the
+     master goes on. *)
+  let on_slave_death task =
+    if not !fallback_to_lazy_if_slave_dies then `Exit 1
+    else match task with
+    | None -> `Stay
+    | Some { t_exn_info; t_loc; t_stop; t_assign } ->
+        msg_warning(strbrk "Falling back to local, lazy, evaluation.");
+        t_assign (`Comp(build_proof_here t_exn_info t_loc t_stop));
+        Pp.feedback (Feedback.InProgress ~-1);
+        `Stay
+
+  (* Policy when marshalling fails with message [s]: if
+     [fallback_to_lazy_if_marshal_error] is set, warn and assign the task a
+     local computation of the proof; otherwise print the error and exit with
+     code 1. *)
+  let on_marshal_error s { t_exn_info; t_stop; t_assign; t_loc } =
+    if !fallback_to_lazy_if_marshal_error then begin
+      msg_warning(strbrk("Marshalling error: "^s^". "^
+        "The system state could not be sent to the worker process. "^
+        "Falling back to local, lazy, evaluation."));
+      t_assign(`Comp(build_proof_here t_exn_info t_loc t_stop));
+      Pp.feedback (Feedback.InProgress ~-1)
+    end else begin
+      pr_err ("Fatal marshal error: " ^ s);
+      flush_all (); exit 1
+    end
+
+end
(* Slave processes (if initialized, otherwise local lazy evaluation) *)
module Slaves : sig
@@ -688,10 +835,6 @@ module Slaves : sig
val slave_main_loop : (unit -> unit) -> unit
val slave_init_stdout : unit -> unit
- (* to disentangle modules *)
- val set_reach_known_state :
- (?redefine_qed:bool -> cache:Summary.marshallable -> Stateid.t -> unit) -> unit
-
type tasks
val dump : (Future.UUID.t * int) list -> tasks
val check_task : string -> tasks -> int -> bool
@@ -701,129 +844,58 @@ module Slaves : sig
Library.seg_univ -> Library.seg_discharge -> Library.seg_proofs ->
tasks -> int -> Library.seg_univ
- val cancel_worker : int -> unit
+ val cancel_worker : string -> unit
val set_perspective : Stateid.t list -> unit
end = struct
-
- let cancel_worker n = WorkersPool.cancel (n-1)
-
- let reach_known_state = ref (fun ?redefine_qed ~cache id -> ())
- let set_reach_known_state f = reach_known_state := f
-
- type 'a request =
- ReqBuildProof of
- (Stateid.t * Stateid.t) * Stateid.t * VCS.vcs * Loc.t * 'a * string
-
- let name_of_request (ReqBuildProof (_,_,_,_,_,s)) = s
-
- type response =
- | RespBuiltProof of Proof_global.closed_proof_output * float
- | RespError of (* err, safe, msg, safe_states *)
- Stateid.t * Stateid.t * std_ppcmds *
- (Stateid.t * State.frozen_state) list
- | RespFeedback of Feedback.feedback
- | RespGetCounterNewUnivLevel
-
- type more_data =
- | MoreDataUnivLevel of Univ.universe_level list
-
- type task =
- | TaskBuildProof of (Stateid.t * Stateid.t) * Stateid.t * Stateid.t *
- (Proof_global.closed_proof_output Future.assignement -> unit) * cancel_switch
- * Loc.t * Future.UUID.t * string
-
- let pr_task = function
- | TaskBuildProof(_,bop,eop,_,_,_,_,s) ->
- "TaskBuilProof("^Stateid.to_string bop^","^Stateid.to_string eop^
- ","^s^")"
-
- let request_of_task task : Future.UUID.t request =
- match task with
- | TaskBuildProof (exn_info,bop,eop,_,_,loc,uuid,s) ->
- ReqBuildProof(exn_info,eop,VCS.slice ~start:eop ~stop:bop,loc,uuid,s)
-
- let cancel_switch_of_task = function
- | TaskBuildProof (_,_,_,_,c,_,_,_) -> c
-
- let build_proof_here_core loc eop () =
- let wall_clock1 = Unix.gettimeofday () in
- if !Flags.batch_mode then !reach_known_state ~cache:`No eop
- else !reach_known_state ~cache:`Shallow eop;
- let wall_clock2 = Unix.gettimeofday () in
- Aux_file.record_in_aux_at loc "proof_build_time"
- (Printf.sprintf "%.3f" (wall_clock2 -. wall_clock1));
- Proof_global.return_proof ()
- let build_proof_here (id,valid) loc eop =
- Future.create (State.exn_on id ~valid) (build_proof_here_core loc eop)
-
- let slave_respond msg =
- match msg with
- | ReqBuildProof(exn_info,eop,vcs,loc,_,_) ->
- VCS.restore vcs;
- VCS.print ();
- let rc, time =
- let wall_clock = Unix.gettimeofday () in
- let l = Future.force (build_proof_here exn_info loc eop) in
- List.iter (fun (_,se) -> Declareops.iter_side_effects (function
- | Declarations.SEsubproof(_,
- { Declarations.const_body = Declarations.OpaqueDef f;
- const_universes = univs } ) ->
- Opaqueproof.join_opaque f
- | _ -> ())
- se) (fst l);
- l, Unix.gettimeofday () -. wall_clock in
- VCS.print ();
- RespBuiltProof(rc,time)
+ module TaskQueue = AsyncTaskQueue.Make(Task)
let check_task_aux extra name l i =
- match List.nth l i with
- | ReqBuildProof ((id,valid),eop,vcs,loc,_,s) ->
- Pp.msg_info(
- str(Printf.sprintf "Checking task %d (%s%s) of %s" i s extra name));
- VCS.restore vcs;
- try
- !reach_known_state ~cache:`No eop;
- (* The original terminator, a hook, has not been saved in the .vi*)
- Proof_global.set_terminator
- (Lemmas.standard_proof_terminator []
- (Lemmas.mk_hook (fun _ _ -> ())));
- let proof = Proof_global.close_proof (fun x -> x) in
- vernac_interp eop ~proof
- { verbose = false; loc;
- expr = (VernacEndProof (Proved (true,None))) };
- Some proof
- with e ->
- let e = Errors.push e in
- (try match Stateid.get e with
- | None ->
+ let { r_stop; r_document; r_loc; r_name } = List.nth l i in
+ Pp.msg_info(
+ str(Printf.sprintf "Checking task %d (%s%s) of %s" i r_name extra name));
+ VCS.restore r_document;
+ try
+ !Task.reach_known_state ~cache:`No r_stop;
+ (* The original terminator, a hook, has not been saved in the .vi*)
+ Proof_global.set_terminator
+ (Lemmas.standard_proof_terminator []
+ (Lemmas.mk_hook (fun _ _ -> ())));
+ let proof = Proof_global.close_proof (fun x -> x) in
+ vernac_interp r_stop ~proof
+ { verbose = false; loc = r_loc;
+ expr = (VernacEndProof (Proved (true,None))) };
+ Some proof
+ with e ->
+ let e = Errors.push e in
+ (try match Stateid.get e with
+ | None ->
+ Pp.pperrnl Pp.(
+ str"File " ++ str name ++ str ": proof of " ++ str r_name ++
+ spc () ++ print e)
+ | Some (_, cur) ->
+ match VCS.visit cur with
+ | { step = `Cmd ( { loc }, _) }
+ | { step = `Fork ( { loc }, _, _, _) }
+ | { step = `Qed ( { qast = { loc } }, _) }
+ | { step = `Sideff (`Ast ( { loc }, _)) } ->
+ let start, stop = Loc.unloc loc in
+ Pp.pperrnl Pp.(
+ str"File " ++ str name ++ str ": proof of " ++ str r_name ++
+ str ": chars " ++ int start ++ str "-" ++ int stop ++
+ spc () ++ print e)
+ | _ ->
Pp.pperrnl Pp.(
- str"File " ++ str name ++ str ": proof of " ++ str s ++
+ str"File " ++ str name ++ str ": proof of " ++ str r_name ++
spc () ++ print e)
- | Some (_, cur) ->
- match VCS.visit cur with
- | { step = `Cmd ( { loc }, _) }
- | { step = `Fork ( { loc }, _, _, _) }
- | { step = `Qed ( { qast = { loc } }, _) }
- | { step = `Sideff (`Ast ( { loc }, _)) } ->
- let start, stop = Loc.unloc loc in
- Pp.pperrnl Pp.(
- str"File " ++ str name ++ str ": proof of " ++ str s ++
- str ": chars " ++ int start ++ str "-" ++ int stop ++
- spc () ++ print e)
- | _ ->
- Pp.pperrnl Pp.(
- str"File " ++ str name ++ str ": proof of " ++ str s ++
- spc () ++ print e)
- with e ->
- Pp.msg_error (str"unable to print error message: " ++
- str (Printexc.to_string e))); None
+ with e ->
+ Pp.msg_error (str"unable to print error message: " ++
+ str (Printexc.to_string e))); None
let finish_task name (u,cst,_) d p l i =
- let bucket =
- match List.nth l i with ReqBuildProof (_,_,_,_,bucket,_) -> bucket in
+ let bucket = (List.nth l i).r_uuid in
match check_task_aux (Printf.sprintf ", bucket %d" bucket) name l i with
| None -> exit 1
| Some (po,pt) ->
@@ -854,54 +926,14 @@ end = struct
| None -> false
let info_tasks l =
- CList.map_i (fun i (ReqBuildProof(_,_,_,loc,_,s)) ->
+ CList.map_i (fun i { r_loc; r_name } ->
let time1 =
- try float_of_string (Aux_file.get !hints loc "proof_build_time")
+ try float_of_string (Aux_file.get !hints r_loc "proof_build_time")
with Not_found -> 0.0 in
let time2 =
- try float_of_string (Aux_file.get !hints loc "proof_check_time")
+ try float_of_string (Aux_file.get !hints r_loc "proof_check_time")
with Not_found -> 0.0 in
- s,max (time1 +. time2) 0.0001,i) 0 l
-
- exception MarshalError of string
-
- let marshal_to_channel oc data =
- Marshal.to_channel oc data [];
- flush oc
-
- let marshal_err s = raise (MarshalError s)
-
- let marshal_request oc (req : Future.UUID.t request) =
- try marshal_to_channel oc req
- with Failure s | Invalid_argument s | Sys_error s ->
- marshal_err ("marshal_request: "^s)
-
- let unmarshal_request ic =
- try (Marshal.from_channel ic : Future.UUID.t request)
- with Failure s | Invalid_argument s | Sys_error s ->
- marshal_err ("unmarshal_request: "^s)
-
- let marshal_response oc (res : response) =
- try marshal_to_channel oc res
- with Failure s | Invalid_argument s | Sys_error s ->
- marshal_err ("marshal_response: "^s)
-
- let unmarshal_response ic =
- try (CThread.thread_friendly_input_value ic : response)
- with Failure s | Invalid_argument s | Sys_error s ->
- marshal_err ("unmarshal_response: "^s)
-
- let marshal_more_data oc (res : more_data) =
- try marshal_to_channel oc res
- with Failure s | Invalid_argument s | Sys_error s ->
- marshal_err ("marshal_more_data: "^s)
-
- let unmarshal_more_data ic =
- try (Marshal.from_channel ic : more_data)
- with Failure s | Invalid_argument s | Sys_error s ->
- marshal_err ("unmarshal_more_data: "^s)
-
- let queue : task TQueue.t = TQueue.create ()
+ r_name, max (time1 +. time2) 0.0001,i) 0 l
let set_perspective idl =
let open Stateid in
@@ -914,275 +946,44 @@ end = struct
| true, false -> -1
| false, true -> 1)
- let wait_all_done () =
- if not (WorkersPool.is_empty ()) then
- TQueue.wait_until_n_are_waiting_and_queue_empty
- (WorkersPool.n_workers ()) queue
-
- let build_proof ~loc ~exn_info:(id,valid as exn_info) ~start ~stop ~name =
+ let build_proof ~loc ~exn_info:(id,valid as t_exn_info) ~start ~stop ~name =
let cancel_switch = ref false in
- if WorkersPool.is_empty () then
+ if TaskQueue.n_workers () = 0 then
if !Flags.compilation_mode = Flags.BuildVi then begin
let force () : Proof_global.closed_proof_output Future.assignement =
- try `Val (build_proof_here_core loc stop ())
+ try `Val (Task.build_proof_here_core loc stop ())
with e -> let e = Errors.push e in `Exn e in
let f,assign = Future.create_delegate ~force (State.exn_on id ~valid) in
- let uuid = Future.uuid f in
- TQueue.push queue (TaskBuildProof
- (exn_info,start,stop,assign,cancel_switch,loc,uuid,name));
+ let t_uuid = Future.uuid f in
+ TaskQueue.enqueue_task {
+ Task.t_exn_info; t_start = start; t_stop = stop; t_assign = assign;
+ t_loc = loc; t_uuid; t_name = name } cancel_switch;
f, cancel_switch
end else
- build_proof_here exn_info loc stop, cancel_switch
+ Task.build_proof_here t_exn_info loc stop, cancel_switch
else
- let f, assign = Future.create_delegate (State.exn_on id ~valid) in
- let uuid = Future.uuid f in
+ let f, t_assign = Future.create_delegate (State.exn_on id ~valid) in
+ let t_uuid = Future.uuid f in
Pp.feedback (Feedback.InProgress 1);
- TQueue.push queue (TaskBuildProof
- (exn_info,start,stop,assign,cancel_switch,loc,uuid,name));
+ TaskQueue.enqueue_task {
+ Task.t_exn_info; t_start = start; t_stop = stop; t_assign; t_loc = loc;
+ t_uuid; t_name = name } cancel_switch;
f, cancel_switch
- exception RemoteException of std_ppcmds
- let _ = Errors.register_handler (function
- | RemoteException ppcmd -> ppcmd
- | _ -> raise Unhandled)
+ let init () = TaskQueue.init !Flags.async_proofs_n_workers
+ let slave_main_loop = TaskQueue.slave_main_loop
+ let slave_init_stdout = TaskQueue.slave_init_stdout
+ let wait_all_done = TaskQueue.join
- exception KillRespawn
-
- let report_status ?id s =
- let id =
- match id with
- | Some n -> n
- | None ->
- match !Flags.async_proofs_mode with
- | Flags.APonParallel n -> n
- | _ -> assert false in
- Pp.feedback ~state_id:Stateid.initial (Feedback.SlaveStatus(id, s))
-
- let rec manage_slave ~cancel:cancel_user_req id_slave respawn =
- let ic, oc, proc =
- let rec set_slave_opt = function
- | [] -> !Flags.async_proofs_flags_for_workers @
- ["-async-proofs"; "worker"; string_of_int id_slave]
- | ("-ideslave"|"-emacs"|"-emacs-U")::tl -> set_slave_opt tl
- | ("-async-proofs" |"-toploop" |"-vi2vo" |"-compile"
- |"-compile-verbose")::_::tl -> set_slave_opt tl
- | x::tl -> x :: set_slave_opt tl in
- let args =
- Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in
- let env =
- Array.append !async_proofs_workers_extra_env (Unix.environment ()) in
- respawn ~args ~env () in
- let last_task = ref None in
- let task_expired = ref false in
- let task_cancelled = ref false in
- CThread.prepare_in_channel_for_thread_friendly_io ic;
- try
- while true do
- prerr_endline "waiting for a task";
- report_status ~id:id_slave "Idle";
- let task = TQueue.pop queue in
- prerr_endline ("got task: "^pr_task task);
- last_task := Some task;
- try
- marshal_request oc (request_of_task task);
- let cancel_switch = cancel_switch_of_task task in
- Worker.kill_if proc ~sec:1 (fun () ->
- task_expired := !cancel_switch;
- task_cancelled := !cancel_user_req;
- if !cancel_user_req then cancel_user_req := false;
- !task_expired || !task_cancelled);
- let rec loop () =
- let response = unmarshal_response ic in
- match task, response with
- | TaskBuildProof(_,_,_, assign,_,loc,_,s),
- RespBuiltProof(pl, time)->
- assign (`Val pl);
- (* We restart the slave, to avoid memory leaks. We could just
- Pp.feedback (Feedback.InProgress ~-1) *)
- record_pb_time s loc time;
- last_task := None;
- raise KillRespawn
- | TaskBuildProof(_,_,_, assign,_,_,_,_),
- RespError(err_id,valid,e,valid_states) ->
- let e = Stateid.add ~valid (RemoteException e) err_id in
- assign (`Exn e);
- List.iter (fun (id,s) -> State.assign id s) valid_states;
- (* We restart the slave, to avoid memory leaks. We could just
- Pp.feedback (Feedback.InProgress ~-1) *)
- last_task := None;
- raise KillRespawn
- (* marshal_more_data oc (MoreDataLocalUniv *)
- (* (CList.init 10 (fun _ -> Universes.fresh_local_univ ()))); *)
- (* loop () *)
- | _, RespGetCounterNewUnivLevel ->
- marshal_more_data oc (MoreDataUnivLevel
- (CList.init 10 (fun _ -> Universes.new_univ_level (Global.current_dirpath ()))));
- loop ()
- | _, RespFeedback {Feedback.id = Feedback.State state_id; content = msg} ->
- forward_feedback state_id msg;
- loop ()
- | _, RespFeedback _ -> assert false (* Parsing in master process *)
- in
- loop ()
- with
- | VCS.Expired -> (* task cancelled: e.g. the user did backtrack *)
- Pp.feedback (Feedback.InProgress ~-1);
- prerr_endline ("Task expired: " ^ pr_task task)
- | (Sys_error _ | Invalid_argument _ | End_of_file | KillRespawn) as e ->
- raise e (* we pass the exception to the external handler *)
- | MarshalError s when !fallback_to_lazy_if_marshal_error ->
- msg_warning(strbrk("Marshalling error: "^s^". "^
- "The system state could not be sent to the worker process. "^
- "Falling back to local, lazy, evaluation."));
- let TaskBuildProof (exn_info, _, stop, assign,_,loc,_,_) = task in
- assign(`Comp(build_proof_here exn_info loc stop));
- Pp.feedback (Feedback.InProgress ~-1)
- | MarshalError s ->
- pr_err ("Fatal marshal error: " ^ s);
- flush_all (); exit 1
- | e ->
- pr_err ("Uncaught exception in worker manager: "^
- string_of_ppcmds (print e));
- flush_all ()
- done
- with
- | KillRespawn ->
- Pp.feedback (Feedback.InProgress ~-1);
- Worker.kill proc; ignore(Worker.wait proc);
- manage_slave ~cancel:cancel_user_req id_slave respawn
- | Sys_error _ | Invalid_argument _ | End_of_file when !task_expired ->
- Pp.feedback (Feedback.InProgress ~-1);
- ignore(Worker.wait proc);
- manage_slave ~cancel:cancel_user_req id_slave respawn
- | Sys_error _ | Invalid_argument _ | End_of_file when !task_cancelled ->
- msg_warning(strbrk "The worker was cancelled.");
- Option.iter (fun task ->
- let TaskBuildProof (_, start, _, assign, _,_,_,_) = task in
- let s = "Worker cancelled by the user" in
- let e = Stateid.add ~valid:start (RemoteException (strbrk s)) start in
- assign (`Exn e);
- Pp.feedback ~state_id:start (Feedback.ErrorMsg (Loc.ghost, s));
- Pp.feedback (Feedback.InProgress ~-1);
- ) !last_task;
- Worker.kill proc; ignore(Worker.wait proc);
- manage_slave ~cancel:cancel_user_req id_slave respawn
- | Sys_error _ | Invalid_argument _ | End_of_file
- when !fallback_to_lazy_if_slave_dies ->
- msg_warning(strbrk "The worker process died badly.");
- Option.iter (fun task ->
- msg_warning(strbrk "Falling back to local, lazy, evaluation.");
- let TaskBuildProof (exn_info, _, stop, assign,_,loc,_,_) = task in
- assign(`Comp(build_proof_here exn_info loc stop));
- Pp.feedback (Feedback.InProgress ~-1);
- ) !last_task;
- Worker.kill proc; ignore(Worker.wait proc);
- manage_slave ~cancel:cancel_user_req id_slave respawn
- | Sys_error _ | Invalid_argument _ | End_of_file ->
- Worker.kill proc;
- let exit_status proc = match Worker.wait proc with
- | Unix.WEXITED 0x400 -> "exit code unavailable"
- | Unix.WEXITED i -> Printf.sprintf "exit(%d)" i
- | Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno
- | Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno in
- pr_err ("Fatal worker error: " ^ (exit_status proc));
- flush_all (); exit 1
-
- let init () = WorkersPool.init !Flags.async_proofs_n_workers manage_slave
-
- let slave_ic = ref stdin
- let slave_oc = ref stdout
-
- let slave_init_stdout () =
- let ic, oc = Spawned.get_channels () in
- slave_oc := oc; slave_ic := ic
-
- let bufferize f =
- let l = ref [] in
- fun () ->
- match !l with
- | [] -> let data = f () in l := List.tl data; List.hd data
- | x::tl -> l := tl; x
-
- let slave_handshake () = WorkersPool.worker_handshake !slave_ic !slave_oc
-
- let slave_main_loop reset =
- let feedback_queue = ref [] in
- let slave_feeder oc fb =
- match fb.Feedback.content with
- | Feedback.Goals _ -> feedback_queue := fb :: !feedback_queue
- | _ -> Marshal.to_channel oc (RespFeedback fb) []; flush oc in
- let flush_feeder oc =
- List.iter (fun fb -> Marshal.to_channel oc (RespFeedback fb) [])
- !feedback_queue;
- feedback_queue := [];
- flush oc in
- Pp.set_feeder (slave_feeder !slave_oc);
- Pp.log_via_feedback ();
- Universes.set_remote_new_univ_level (bufferize (fun () ->
- marshal_response !slave_oc RespGetCounterNewUnivLevel;
- match unmarshal_more_data !slave_ic with
- | MoreDataUnivLevel l -> l));
- let working = ref false in
- slave_handshake ();
- while true do
- try
- working := false;
- let request = unmarshal_request !slave_ic in
- working := true;
- report_status (name_of_request request);
- let response = slave_respond request in
- flush_feeder !slave_oc;
- report_status "Idle";
- marshal_response !slave_oc response;
- reset ()
- with
- | MarshalError s ->
- pr_err ("Fatal marshal error: " ^ s); flush_all (); exit 2
- | End_of_file ->
- prerr_endline "connection lost"; flush_all (); exit 2
- | e when Errors.noncritical e ->
- (* This can happen if the proof is broken. The error has also been
- * signalled as a feedback, hence we can silently recover *)
- let err_id, safe_id = match Stateid.get e with
- | Some (safe, err) -> err, safe
- | None -> Stateid.dummy, Stateid.dummy in
- prerr_endline "failed with the following exception:";
- prerr_endline (string_of_ppcmds (print e));
- prerr_endline ("last safe id is: " ^ Stateid.to_string safe_id);
- prerr_endline ("cached? " ^ string_of_bool (State.is_cached safe_id));
- let prog old_v n =
- if n < 3 then n else old_v + n/3 + if n mod 3 > 0 then 1 else 0 in
- let states =
- let open State in
- let rec aux n m prev_id =
- let next =
- try Some (VCS.visit prev_id).next
- with VCS.Expired -> None in
- match next with
- | None -> []
- | Some id when n = m ->
- prerr_endline ("sending back state " ^ string_of_int m);
- let tail = aux (n+1) (prog m (n+1)) id in
- if is_cached id then (id, get_cached id) :: tail else tail
- | Some id -> aux (n+1) m id in
- (if is_cached safe_id then [safe_id,get_cached safe_id] else [])
- @ aux 1 (prog 1 1) safe_id in
- flush_feeder !slave_oc;
- marshal_response !slave_oc (RespError(err_id, safe_id, print e, states))
- | e ->
- pr_err ("Slave: critical exception: " ^ Pp.string_of_ppcmds (print e));
- flush_all (); exit 1
- done
+ let cancel_worker = TaskQueue.cancel_worker
(* For external users this name is nicer than request *)
- type tasks = int request list
+ type tasks = int a_request list
let dump f2t_map =
- assert(WorkersPool.is_empty ()); (* ATM, we allow that only if no slaves *)
- let tasks = TQueue.dump queue in
- prerr_endline (Printf.sprintf "dumping %d\n" (List.length tasks));
- let tasks = List.map request_of_task tasks in
- List.map (function ReqBuildProof(a,b,c,d,x,e) ->
- ReqBuildProof(a,b,c,d,List.assoc x f2t_map,e)) tasks
+ let tasks = TaskQueue.dump () in
+ prerr_endline (Printf.sprintf "dumping %d tasks\n" (List.length tasks));
+ List.map (function r -> { r with r_uuid = List.assoc r.r_uuid f2t_map })
+ (CList.map_filter Task.request_of_task tasks)
end
@@ -1198,7 +999,7 @@ let pstate = ["meta counter"; "evar counter"; "program-tcc-table"]
let delegate_policy_check time =
if interactive () = `Yes then
- (!Flags.async_proofs_mode = Flags.APonParallel 0 ||
+ (Flags.async_proofs_is_master () ||
!Flags.async_proofs_mode = Flags.APonLazy) &&
(time >= 1.0 || !Flags.async_proofs_always_delegate)
else if !Flags.compilation_mode = Flags.BuildVi then true
@@ -1394,7 +1195,7 @@ let known_state ?(redefine_qed=false) ~cache id =
inject_non_pstate s
), cache
in
- if !Flags.async_proofs_mode = Flags.APonParallel 0 then
+ if Flags.async_proofs_is_master () then
Pp.feedback ~state_id:id Feedback.ProcessingInMaster;
State.define ~cache:cache_step ~redefine:redefine_qed step id;
if !Flags.feedback_goals then print_goals_of_state id;
@@ -1403,7 +1204,7 @@ let known_state ?(redefine_qed=false) ~cache id =
end
-let _ = Slaves.set_reach_known_state Reach.known_state
+let _ = Task.set_reach_known_state Reach.known_state
(* The backtrack module simulates the classic behavior of a linear document *)
module Backtrack : sig
@@ -1540,7 +1341,8 @@ let init () =
set_undo_classifier Backtrack.undo_vernac_classifier;
State.define ~cache:`Yes (fun () -> ()) Stateid.initial;
Backtrack.record ();
- if !Flags.async_proofs_mode = Flags.APonParallel 0 then begin
+ if Flags.async_proofs_is_master () then begin
+ prerr_endline "Initialising workers";
Slaves.init ();
let opts = match !Flags.async_proofs_private_flags with
| None -> []
diff --git a/stm/stm.mli b/stm/stm.mli
index 3bc458f57..b6450e6ac 100644
--- a/stm/stm.mli
+++ b/stm/stm.mli
@@ -42,7 +42,7 @@ val finish : unit -> unit
val observe : Stateid.t -> unit
-val stop_worker : int -> unit
+val stop_worker : string -> unit
(* Joins the entire document. Implies finish, but also checks proofs *)
val join : unit -> unit
diff --git a/stm/stm.mllib b/stm/stm.mllib
index 347416099..308b2ac4c 100644
--- a/stm/stm.mllib
+++ b/stm/stm.mllib
@@ -5,5 +5,6 @@ TQueue
WorkerPool
Vernac_classifier
Lemmas
+AsyncTaskQueue
Stm
Vi_checking
diff --git a/stm/workerPool.ml b/stm/workerPool.ml
index fcae4f20d..593240ad4 100644
--- a/stm/workerPool.ml
+++ b/stm/workerPool.ml
@@ -13,7 +13,7 @@ module Make(Worker : sig
process * in_channel * out_channel
end) = struct
-type worker_id = int
+type worker_id = string
type spawn =
args:string array -> env:string array -> unit ->
in_channel * out_channel * Worker.process
@@ -32,11 +32,11 @@ let master_handshake worker_id ic oc =
Marshal.to_channel oc magic_no []; flush oc;
let n = (Marshal.from_channel ic : int) in
if n <> magic_no then begin
- Printf.eprintf "Handshake with %d failed: protocol mismatch\n" worker_id;
+ Printf.eprintf "Handshake with %s failed: protocol mismatch\n" worker_id;
exit 1;
end
with e when Errors.noncritical e ->
- Printf.eprintf "Handshake with %d failed: %s\n"
+ Printf.eprintf "Handshake with %s failed: %s\n"
worker_id (Printexc.to_string e);
exit 1
@@ -45,18 +45,21 @@ let respawn n ~args ~env () =
master_handshake n ic oc;
ic, oc, proc
-let init ~size:n ~manager:manage_slave =
+let init ~size:n ~manager:manage_slave mk_name =
slave_managers := Some
(Array.init n (fun x ->
+ let name = mk_name x in
let cancel = ref false in
- cancel, Thread.create (manage_slave ~cancel (x+1)) (respawn (x+1))))
+ name, cancel, Thread.create (manage_slave ~cancel name) (respawn name)))
let cancel n =
match !slave_managers with
| None -> ()
| Some a ->
- let switch, _ = a.(n) in
- switch := true
+ for i = 0 to Array.length a - 1 do
+ let name, switch, _ = a.(i) in
+ if n = name then switch := true
+ done
let worker_handshake slave_ic slave_oc =
try
diff --git a/stm/workerPool.mli b/stm/workerPool.mli
index d7a546929..d55b35c28 100644
--- a/stm/workerPool.mli
+++ b/stm/workerPool.mli
@@ -13,13 +13,14 @@ module Make(Worker : sig
process * in_channel * out_channel
end) : sig
-type worker_id = int
+type worker_id = string
type spawn =
args:string array -> env:string array -> unit ->
in_channel * out_channel * Worker.process
val init :
- size:int -> manager:(cancel:bool ref -> worker_id -> spawn -> unit) -> unit
+ size:int -> manager:(cancel:bool ref -> worker_id -> spawn -> unit) ->
+ (int -> worker_id) -> unit
val is_empty : unit -> bool
val n_workers : unit -> int
val cancel : worker_id -> unit
diff --git a/toplevel/coqtop.ml b/toplevel/coqtop.ml
index 2b763b390..8eab1d7bf 100644
--- a/toplevel/coqtop.ml
+++ b/toplevel/coqtop.ml
@@ -224,15 +224,15 @@ let no_compat_ntn = ref false
let print_where = ref false
let print_config = ref false
-let get_async_proofs_mode opt next = function
+let get_async_proofs_mode opt = function
| "off" -> Flags.APoff
- | "on" -> Flags.APonParallel 0
- | "worker" ->
- let n = int_of_string (next()) in assert (n > 0);
- toploop := Some "stmworkertop";
- Flags.APonParallel n
+ | "on" -> Flags.APon
| "lazy" -> Flags.APonLazy
- | _ -> prerr_endline ("Error: on/off/lazy/worker expected after "^opt); exit 1
+ | _ -> prerr_endline ("Error: on/off/lazy expected after "^opt); exit 1
+
+let set_worker_id opt s =
+ assert (s <> "master");
+ Flags.async_proofs_worker_id := s
let get_bool opt = function
| "yes" -> true
@@ -350,11 +350,12 @@ let parse_args arglist =
(* Options with one arg *)
|"-coqlib" -> Flags.coqlib_spec:=true; Flags.coqlib:=(next ())
|"-async-proofs" ->
- Flags.async_proofs_mode := get_async_proofs_mode opt next (next())
+ Flags.async_proofs_mode := get_async_proofs_mode opt (next())
|"-async-proofs-j" ->
Flags.async_proofs_n_workers := (get_int opt (next ()))
|"-async-proofs-private-flags" ->
Flags.async_proofs_private_flags := Some (next ());
+ |"-worker-id" -> set_worker_id opt (next ())
|"-compat" -> Flags.compat_version := get_compat_version (next ())
|"-compile" -> add_compile false (next ())
|"-compile-verbose" -> add_compile true (next ())