aboutsummaryrefslogtreecommitdiffhomepage
path: root/stm
diff options
context:
space:
mode:
Diffstat (limited to 'stm')
-rw-r--r--stm/asyncTaskQueue.ml296
-rw-r--r--stm/asyncTaskQueue.mli61
-rw-r--r--stm/stm.ml678
-rw-r--r--stm/stm.mli2
-rw-r--r--stm/stm.mllib1
-rw-r--r--stm/workerPool.ml17
-rw-r--r--stm/workerPool.mli5
7 files changed, 612 insertions, 448 deletions
diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml
new file mode 100644
index 000000000..65219724e
--- /dev/null
+++ b/stm/asyncTaskQueue.ml
@@ -0,0 +1,296 @@
+(************************************************************************)
+(* v * The Coq Proof Assistant / The Coq Development Team *)
+(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2014 *)
+(* \VV/ **************************************************************)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(************************************************************************)
+
+open Errors
+open Pp
+open Util
+
+let pr_err s = Printf.eprintf "%s] %s\n" (System.process_id ()) s; flush stderr
+
+let prerr_endline s = if !Flags.debug then begin pr_err s end else ()
+
+module type Task = sig
+
+ type task
+
+ (* Marshallable *)
+ type request
+ type response
+
+ val name : string (* UID of the task kind, for -toploop *)
+ val extra_env : unit -> string array
+
+ (* run by the master, on a thread *)
+ val request_of_task : task -> request option
+ val use_response : task -> response -> [ `Die | `Stay | `StayReset ]
+ val on_marshal_error : string -> task -> unit
+ val on_slave_death : task option -> [ `Exit of int | `Stay ]
+ val on_task_cancellation_or_expiration : task option -> unit
+ val forward_feedback : Stateid.t -> Feedback.feedback_content -> unit
+
+ (* run by the worker *)
+ val perform : request -> response
+
+ (* debugging *)
+ val name_of_task : task -> string
+ val name_of_request : request -> string
+
+end
+
+type cancel_switch = bool ref
+
+module Make(T : Task) = struct
+
+ module Worker = Spawn.Sync(struct
+ let add_timeout ~sec f =
+ ignore(Thread.create (fun () ->
+ while true do
+ Unix.sleep sec;
+ if not (f ()) then Thread.exit ()
+ done)
+ ())
+ end)
+
+ module WorkersPool = WorkerPool.Make(Worker)
+
+ let queue : (T.task * cancel_switch) TQueue.t = TQueue.create ()
+
+ let enqueue_task t c =
+ prerr_endline ("Enqueue task "^T.name_of_task t);
+ TQueue.push queue (t,c)
+
+ let cancel_worker = WorkersPool.cancel
+
+ type request = Request of T.request
+
+ let name_of_request (Request r) = T.name_of_request r
+
+ type response =
+ | Response of T.response
+ | RespFeedback of Feedback.feedback
+ | RespGetCounterNewUnivLevel
+
+ type more_data =
+ | MoreDataUnivLevel of Univ.universe_level list
+
+ let request_cswitch_of_task (t, c) = T.request_of_task t, c
+
+ let slave_respond msg = match msg with Request r -> Response (T.perform r)
+
+ exception MarshalError of string
+
+ let marshal_to_channel oc data =
+ Marshal.to_channel oc data [];
+ flush oc
+
+ let marshal_err s = raise (MarshalError s)
+
+ let marshal_request oc (req : request) =
+ try marshal_to_channel oc req
+ with Failure s | Invalid_argument s | Sys_error s ->
+ marshal_err ("marshal_request: "^s)
+
+ let unmarshal_request ic =
+ try (Marshal.from_channel ic : request)
+ with Failure s | Invalid_argument s | Sys_error s ->
+ marshal_err ("unmarshal_request: "^s)
+
+ let marshal_response oc (res : response) =
+ try marshal_to_channel oc res
+ with Failure s | Invalid_argument s | Sys_error s ->
+ marshal_err ("marshal_response: "^s)
+
+ let unmarshal_response ic =
+ try (CThread.thread_friendly_input_value ic : response)
+ with Failure s | Invalid_argument s | Sys_error s ->
+ marshal_err ("unmarshal_response: "^s)
+
+ let marshal_more_data oc (res : more_data) =
+ try marshal_to_channel oc res
+ with Failure s | Invalid_argument s | Sys_error s ->
+ marshal_err ("marshal_more_data: "^s)
+
+ let unmarshal_more_data ic =
+ try (Marshal.from_channel ic : more_data)
+ with Failure s | Invalid_argument s | Sys_error s ->
+ marshal_err ("unmarshal_more_data: "^s)
+
+ let reorder_tasks cmp = TQueue.reorder queue (fun (t1,_) (t2,_) -> cmp t1 t2)
+
+ let join () =
+ if not (WorkersPool.is_empty ()) then
+ TQueue.wait_until_n_are_waiting_and_queue_empty
+ (WorkersPool.n_workers ()) queue
+
+ exception KillRespawn
+ exception Die
+ exception Expired
+
+ let report_status ?(id = !Flags.async_proofs_worker_id) s =
+ Pp.feedback ~state_id:Stateid.initial (Feedback.SlaveStatus(id, s))
+
+ let rec manage_slave ~cancel:cancel_user_req id respawn =
+ let ic, oc, proc =
+ let rec set_slave_opt = function
+ | [] -> !Flags.async_proofs_flags_for_workers @
+ ["-toploop"; T.name^"top";
+ "-worker-id"; id]
+ | ("-ideslave"|"-emacs"|"-emacs-U")::tl -> set_slave_opt tl
+ | ("-async-proofs" |"-toploop" |"-vi2vo" |"-compile"
+ |"-compile-verbose")::_::tl -> set_slave_opt tl
+ | x::tl -> x :: set_slave_opt tl in
+ let args =
+ Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in
+ let env = Array.append (T.extra_env ()) (Unix.environment ()) in
+ respawn ~args ~env () in
+ let last_task = ref None in
+ let task_expired = ref false in
+ let task_cancelled = ref false in
+ CThread.prepare_in_channel_for_thread_friendly_io ic;
+ try
+ while true do
+ prerr_endline "waiting for a task";
+ report_status ~id "Idle";
+ let task, cancel_switch = TQueue.pop queue in
+ prerr_endline ("got task: "^T.name_of_task task);
+ last_task := Some task;
+ try
+ let req = T.request_of_task task in
+ if req = None then raise Expired;
+ marshal_request oc (Request (Option.get req));
+ Worker.kill_if proc ~sec:1 (fun () ->
+ task_expired := !cancel_switch;
+ task_cancelled := !cancel_user_req;
+ if !cancel_user_req then cancel_user_req := false;
+ !task_expired || !task_cancelled);
+ let rec loop () =
+ let response = unmarshal_response ic in
+ match response with
+ | Response resp ->
+ (match T.use_response task resp with
+ | `Die -> raise Die
+ | `Stay -> last_task := None; ()
+ | `StayReset -> last_task := None; raise KillRespawn)
+ | RespGetCounterNewUnivLevel ->
+ marshal_more_data oc (MoreDataUnivLevel
+ (CList.init 10 (fun _ ->
+ Universes.new_univ_level (Global.current_dirpath ()))));
+ loop ()
+ | RespFeedback ({ Feedback.id = Feedback.State state_id } as fbk) ->
+ T.forward_feedback state_id fbk.Feedback.content;
+ loop ()
+ | RespFeedback _ -> Errors.anomaly (str"Parsing in master process")
+ in
+ loop ()
+ with
+ | Expired -> prerr_endline ("Task expired: " ^ T.name_of_task task)
+ | (Sys_error _ | Invalid_argument _ | End_of_file | KillRespawn) as e ->
+ raise e (* we pass the exception to the external handler *)
+ | MarshalError s -> T.on_marshal_error s task
+ | e ->
+ pr_err ("Uncaught exception in worker manager: "^
+ string_of_ppcmds (print e));
+ flush_all ()
+ done
+ with
+ | KillRespawn ->
+ Worker.kill proc; ignore(Worker.wait proc);
+ manage_slave ~cancel:cancel_user_req id respawn
+ | Die -> Worker.kill proc; ignore(Worker.wait proc)
+ | Sys_error _ | Invalid_argument _ | End_of_file when !task_expired ->
+ T.on_task_cancellation_or_expiration !last_task;
+ ignore(Worker.wait proc);
+ manage_slave ~cancel:cancel_user_req id respawn
+ | Sys_error _ | Invalid_argument _ | End_of_file when !task_cancelled ->
+ msg_warning(strbrk "The worker was cancelled.");
+ T.on_task_cancellation_or_expiration !last_task;
+ Worker.kill proc; ignore(Worker.wait proc);
+ manage_slave ~cancel:cancel_user_req id respawn
+ | Sys_error _ | Invalid_argument _ | End_of_file ->
+ match T.on_slave_death !last_task with
+ | `Stay ->
+ msg_warning(strbrk "The worker process died badly.");
+ Worker.kill proc; ignore(Worker.wait proc);
+ manage_slave ~cancel:cancel_user_req id respawn
+ | `Exit exit_code ->
+ Worker.kill proc;
+ let exit_status proc = match Worker.wait proc with
+ | Unix.WEXITED 0x400 -> "exit code unavailable"
+ | Unix.WEXITED i -> Printf.sprintf "exit(%d)" i
+ | Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno
+ | Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno in
+ pr_err ("Fatal worker error: " ^ (exit_status proc));
+ flush_all (); exit exit_code
+
+ let slave_ic = ref stdin
+ let slave_oc = ref stdout
+
+ let slave_init_stdout () =
+ let ic, oc = Spawned.get_channels () in
+ slave_oc := oc; slave_ic := ic
+
+ let bufferize f =
+ let l = ref [] in
+ fun () ->
+ match !l with
+ | [] -> let data = f () in l := List.tl data; List.hd data
+ | x::tl -> l := tl; x
+
+ let slave_handshake () = WorkersPool.worker_handshake !slave_ic !slave_oc
+
+ let slave_main_loop reset =
+ let feedback_queue = ref [] in
+ let slave_feeder oc fb =
+ match fb.Feedback.content with
+ | Feedback.Goals _ -> feedback_queue := fb :: !feedback_queue
+ | _ -> Marshal.to_channel oc (RespFeedback fb) []; flush oc in
+ let flush_feeder oc =
+ List.iter (fun fb -> Marshal.to_channel oc (RespFeedback fb) [])
+ !feedback_queue;
+ feedback_queue := [];
+ flush oc in
+ Pp.set_feeder (slave_feeder !slave_oc);
+ Pp.log_via_feedback ();
+ Universes.set_remote_new_univ_level (bufferize (fun () ->
+ marshal_response !slave_oc RespGetCounterNewUnivLevel;
+ match unmarshal_more_data !slave_ic with
+ | MoreDataUnivLevel l -> l));
+ let working = ref false in
+ slave_handshake ();
+ while true do
+ try
+ working := false;
+ let request = unmarshal_request !slave_ic in
+ working := true;
+ report_status (name_of_request request);
+ let response = slave_respond request in
+ flush_feeder !slave_oc;
+ report_status "Idle";
+ marshal_response !slave_oc response;
+ reset ()
+ with
+ | MarshalError s ->
+ pr_err ("Fatal marshal error: " ^ s); flush_all (); exit 2
+ | End_of_file ->
+ prerr_endline "connection lost"; flush_all (); exit 2
+ | e ->
+ pr_err ("Slave: critical exception: " ^ Pp.string_of_ppcmds (print e));
+ flush_all (); exit 1
+ done
+
+ let dump () =
+ assert(WorkersPool.is_empty ()); (* ATM, we allow that only if no slaves *)
+ List.map fst (TQueue.dump queue)
+
+ let init n =
+ WorkersPool.init n manage_slave
+ (fun n -> Printf.sprintf "%s:%d" T.name n)
+
+ let n_workers = WorkersPool.n_workers
+
+end
diff --git a/stm/asyncTaskQueue.mli b/stm/asyncTaskQueue.mli
new file mode 100644
index 000000000..e01479d30
--- /dev/null
+++ b/stm/asyncTaskQueue.mli
@@ -0,0 +1,61 @@
+(************************************************************************)
+(* v * The Coq Proof Assistant / The Coq Development Team *)
+(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2014 *)
+(* \VV/ **************************************************************)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(************************************************************************)
+
+module type Task = sig
+
+ type task
+
+ (* Marshallable *)
+ type request
+ type response
+
+ val name : string (* UID of the task kind, for -toploop *)
+ val extra_env : unit -> string array
+
+ (* run by the master, on a thread *)
+ val request_of_task : task -> request option
+ val use_response : task -> response -> [ `Die | `Stay | `StayReset ]
+ val on_marshal_error : string -> task -> unit
+ val on_slave_death : task option -> [ `Exit of int | `Stay ]
+ val on_task_cancellation_or_expiration : task option -> unit
+ val forward_feedback : Stateid.t -> Feedback.feedback_content -> unit
+
+ (* run by the worker *)
+ val perform : request -> response
+
+ (* debugging *)
+ val name_of_task : task -> string
+ val name_of_request : request -> string
+
+end
+
+type cancel_switch = bool ref
+
+module Make(T : Task) : sig
+
+ (* Number of workers, 0 = lazy local *)
+ val init : int -> unit
+
+ val n_workers : unit -> int
+
+ val enqueue_task : T.task -> cancel_switch -> unit
+
+ (* blocking function that waits for the task queue to be empty *)
+ val join : unit -> unit
+
+ (* slave process main loop *)
+ val slave_main_loop : (unit -> unit) -> unit
+ val slave_init_stdout : unit -> unit
+
+ val cancel_worker : string -> unit
+
+ val reorder_tasks : (T.task -> T.task -> int) -> unit
+
+ val dump : unit -> T.task list
+
+end
diff --git a/stm/stm.ml b/stm/stm.ml
index a342bb6f1..0827e0bfa 100644
--- a/stm/stm.ml
+++ b/stm/stm.ml
@@ -6,13 +6,7 @@
(* * GNU Lesser General Public License Version 2.1 *)
(************************************************************************)
-let process_id () =
- match !Flags.async_proofs_mode with
- | Flags.APoff | Flags.APonLazy | Flags.APonParallel 0 ->
- "master" ^ string_of_int (Thread.id (Thread.self ()))
- | Flags.APonParallel n -> "worker" ^ string_of_int n
-
-let pr_err s = Printf.eprintf "%s] %s\n" (process_id ()) s; flush stderr
+let pr_err s = Printf.eprintf "%s] %s\n" (System.process_id ()) s; flush stderr
let prerr_endline s = if !Flags.debug then begin pr_err s end else ()
@@ -252,7 +246,8 @@ end = struct
open Printf
let print_dag vcs () =
- let fname = "stm_" ^ process_id () in
+ let fname =
+ "stm_" ^ Str.global_replace (Str.regexp " ") "_" (System.process_id ()) in
let string_of_transaction = function
| Cmd (t, _) | Fork (t, _,_,_) ->
(try string_of_ppcmds (pr_ast t) with _ -> "ERR")
@@ -650,18 +645,6 @@ let get_hint_bp_time proof_name =
try float_of_string (Aux_file.get !hints Loc.ghost proof_name)
with Not_found -> 1.0
-module Worker = Spawn.Sync(struct
- let add_timeout ~sec f =
- ignore(Thread.create (fun () ->
- while true do
- Unix.sleep sec;
- if not (f ()) then Thread.exit ()
- done)
- ())
-end)
-
-module WorkersPool = WorkerPool.Make(Worker)
-
let record_pb_time proof_name loc time =
let proof_build_time = Printf.sprintf "%.3f" time in
Aux_file.record_in_aux_at loc "proof_build_time" proof_build_time;
@@ -669,6 +652,170 @@ let record_pb_time proof_name loc time =
Aux_file.record_in_aux_at Loc.ghost proof_name proof_build_time;
hints := Aux_file.set !hints Loc.ghost proof_name proof_build_time
end
+
+type 'a a_request = {
+ r_exn_info : Stateid.t * Stateid.t;
+ r_stop : Stateid.t;
+ r_document : VCS.vcs;
+ r_loc : Loc.t;
+ r_uuid : 'a;
+ r_name : string }
+
+exception RemoteException of std_ppcmds
+let _ = Errors.register_handler (function
+ | RemoteException ppcmd -> ppcmd
+ | _ -> raise Unhandled)
+
+module Task = struct
+
+ let reach_known_state = ref (fun ?redefine_qed ~cache id -> ())
+ let set_reach_known_state f = reach_known_state := f
+
+ let forward_feedback = forward_feedback
+
+ type task = {
+ t_exn_info : Stateid.t * Stateid.t;
+ t_start : Stateid.t;
+ t_stop : Stateid.t;
+ t_assign : Proof_global.closed_proof_output Future.assignement -> unit;
+ t_loc : Loc.t;
+ t_uuid : Future.UUID.t;
+ t_name : string }
+
+ type request = Future.UUID.t a_request
+
+ type error = {
+ e_error_at : Stateid.t;
+ e_safe_id : Stateid.t;
+ e_msg : std_ppcmds;
+ e_safe_states : (Stateid.t * State.frozen_state) list }
+
+ type response =
+ | RespBuiltProof of Proof_global.closed_proof_output * float
+ | RespError of error
+
+ let name = "stmworker"
+ let extra_env () = !async_proofs_workers_extra_env
+
+ let name_of_task t = t.t_name
+ let name_of_request r = r.r_name
+
+ let request_of_task { t_exn_info; t_start; t_stop; t_loc; t_uuid; t_name } =
+ try Some {
+ r_exn_info = t_exn_info;
+ r_stop = t_stop;
+ r_document = VCS.slice ~start:t_stop ~stop:t_start;
+ r_loc = t_loc;
+ r_uuid = t_uuid;
+ r_name = t_name }
+ with VCS.Expired -> None
+
+ let use_response { t_assign; t_loc; t_name } = function
+ | RespBuiltProof (pl, time) ->
+ Pp.feedback (Feedback.InProgress ~-1);
+ t_assign (`Val pl);
+ record_pb_time t_name t_loc time;
+ (* We restart the slave, to avoid memory leaks. We could just
+ Pp.feedback (Feedback.InProgress ~-1) *)
+ `StayReset
+ | RespError { e_error_at; e_safe_id = valid; e_msg; e_safe_states } ->
+ Pp.feedback (Feedback.InProgress ~-1);
+ let e = Stateid.add ~valid (RemoteException e_msg) e_error_at in
+ t_assign (`Exn e);
+ List.iter (fun (id,s) -> State.assign id s) e_safe_states;
+ (* We restart the slave, to avoid memory leaks. We could just
+ Pp.feedback (Feedback.InProgress ~-1) *)
+ `StayReset
+
+ let on_task_cancellation_or_expiration = function
+ | None -> ()
+ | Some { t_start = start; t_assign } ->
+ let s = "Worker cancelled by the user" in
+ let e = Stateid.add ~valid:start (RemoteException (strbrk s)) start in
+ t_assign (`Exn e);
+ Pp.feedback ~state_id:start (Feedback.ErrorMsg (Loc.ghost, s));
+ Pp.feedback (Feedback.InProgress ~-1)
+
+ let build_proof_here_core loc eop () =
+ let wall_clock1 = Unix.gettimeofday () in
+ if !Flags.batch_mode then !reach_known_state ~cache:`No eop
+ else !reach_known_state ~cache:`Shallow eop;
+ let wall_clock2 = Unix.gettimeofday () in
+ Aux_file.record_in_aux_at loc "proof_build_time"
+ (Printf.sprintf "%.3f" (wall_clock2 -. wall_clock1));
+ Proof_global.return_proof ()
+ let build_proof_here (id,valid) loc eop =
+ Future.create (State.exn_on id ~valid) (build_proof_here_core loc eop)
+ let perform { r_exn_info; r_stop = eop; r_document = vcs; r_loc } =
+ try
+ VCS.restore vcs;
+ VCS.print ();
+ let rc, time =
+ let wall_clock = Unix.gettimeofday () in
+ let l = Future.force (build_proof_here r_exn_info r_loc eop) in
+ List.iter (fun (_,se) -> Declareops.iter_side_effects (function
+ | Declarations.SEsubproof(_,
+ { Declarations.const_body = Declarations.OpaqueDef f;
+ const_universes = univs } ) ->
+ Opaqueproof.join_opaque f
+ | _ -> ())
+ se) (fst l);
+ l, Unix.gettimeofday () -. wall_clock in
+ VCS.print ();
+ RespBuiltProof(rc,time)
+ with
+ |e when Errors.noncritical e ->
+ (* This can happen if the proof is broken. The error has also been
+ * signalled as a feedback, hence we can silently recover *)
+ let e_error_at, e_safe_id = match Stateid.get e with
+ | Some (safe, err) -> err, safe
+ | None -> Stateid.dummy, Stateid.dummy in
+ prerr_endline "failed with the following exception:";
+ prerr_endline (string_of_ppcmds (print e));
+ prerr_endline ("last safe id is: " ^ Stateid.to_string e_safe_id);
+ prerr_endline ("cached? " ^ string_of_bool (State.is_cached e_safe_id));
+ let prog old_v n =
+ if n < 3 then n else old_v + n/3 + if n mod 3 > 0 then 1 else 0 in
+ let e_safe_states =
+ let open State in
+ let rec aux n m prev_id =
+ let next =
+ try Some (VCS.visit prev_id).next
+ with VCS.Expired -> None in
+ match next with
+ | None -> []
+ | Some id when n = m ->
+ prerr_endline ("sending back state " ^ string_of_int m);
+ let tail = aux (n+1) (prog m (n+1)) id in
+ if is_cached id then (id, get_cached id) :: tail else tail
+ | Some id -> aux (n+1) m id in
+ (if is_cached e_safe_id then [e_safe_id,get_cached e_safe_id] else [])
+ @ aux 1 (prog 1 1) e_safe_id in
+ RespError { e_error_at; e_safe_id; e_msg = print e; e_safe_states }
+
+ let on_slave_death task =
+ if not !fallback_to_lazy_if_slave_dies then `Exit 1
+ else match task with
+ | None -> `Stay
+ | Some { t_exn_info; t_loc; t_stop; t_assign } ->
+ msg_warning(strbrk "Falling back to local, lazy, evaluation.");
+ t_assign (`Comp(build_proof_here t_exn_info t_loc t_stop));
+ Pp.feedback (Feedback.InProgress ~-1);
+ `Stay
+
+ let on_marshal_error s { t_exn_info; t_stop; t_assign; t_loc } =
+ if !fallback_to_lazy_if_marshal_error then begin
+ msg_warning(strbrk("Marshalling error: "^s^". "^
+ "The system state could not be sent to the worker process. "^
+ "Falling back to local, lazy, evaluation."));
+ t_assign(`Comp(build_proof_here t_exn_info t_loc t_stop));
+ Pp.feedback (Feedback.InProgress ~-1)
+ end else begin
+ pr_err ("Fatal marshal error: " ^ s);
+ flush_all (); exit 1
+ end
+
+end
(* Slave processes (if initialized, otherwise local lazy evaluation) *)
module Slaves : sig
@@ -688,10 +835,6 @@ module Slaves : sig
val slave_main_loop : (unit -> unit) -> unit
val slave_init_stdout : unit -> unit
- (* to disentangle modules *)
- val set_reach_known_state :
- (?redefine_qed:bool -> cache:Summary.marshallable -> Stateid.t -> unit) -> unit
-
type tasks
val dump : (Future.UUID.t * int) list -> tasks
val check_task : string -> tasks -> int -> bool
@@ -701,129 +844,58 @@ module Slaves : sig
Library.seg_univ -> Library.seg_discharge -> Library.seg_proofs ->
tasks -> int -> Library.seg_univ
- val cancel_worker : int -> unit
+ val cancel_worker : string -> unit
val set_perspective : Stateid.t list -> unit
end = struct
-
- let cancel_worker n = WorkersPool.cancel (n-1)
-
- let reach_known_state = ref (fun ?redefine_qed ~cache id -> ())
- let set_reach_known_state f = reach_known_state := f
-
- type 'a request =
- ReqBuildProof of
- (Stateid.t * Stateid.t) * Stateid.t * VCS.vcs * Loc.t * 'a * string
-
- let name_of_request (ReqBuildProof (_,_,_,_,_,s)) = s
-
- type response =
- | RespBuiltProof of Proof_global.closed_proof_output * float
- | RespError of (* err, safe, msg, safe_states *)
- Stateid.t * Stateid.t * std_ppcmds *
- (Stateid.t * State.frozen_state) list
- | RespFeedback of Feedback.feedback
- | RespGetCounterNewUnivLevel
-
- type more_data =
- | MoreDataUnivLevel of Univ.universe_level list
-
- type task =
- | TaskBuildProof of (Stateid.t * Stateid.t) * Stateid.t * Stateid.t *
- (Proof_global.closed_proof_output Future.assignement -> unit) * cancel_switch
- * Loc.t * Future.UUID.t * string
-
- let pr_task = function
- | TaskBuildProof(_,bop,eop,_,_,_,_,s) ->
- "TaskBuilProof("^Stateid.to_string bop^","^Stateid.to_string eop^
- ","^s^")"
-
- let request_of_task task : Future.UUID.t request =
- match task with
- | TaskBuildProof (exn_info,bop,eop,_,_,loc,uuid,s) ->
- ReqBuildProof(exn_info,eop,VCS.slice ~start:eop ~stop:bop,loc,uuid,s)
-
- let cancel_switch_of_task = function
- | TaskBuildProof (_,_,_,_,c,_,_,_) -> c
-
- let build_proof_here_core loc eop () =
- let wall_clock1 = Unix.gettimeofday () in
- if !Flags.batch_mode then !reach_known_state ~cache:`No eop
- else !reach_known_state ~cache:`Shallow eop;
- let wall_clock2 = Unix.gettimeofday () in
- Aux_file.record_in_aux_at loc "proof_build_time"
- (Printf.sprintf "%.3f" (wall_clock2 -. wall_clock1));
- Proof_global.return_proof ()
- let build_proof_here (id,valid) loc eop =
- Future.create (State.exn_on id ~valid) (build_proof_here_core loc eop)
-
- let slave_respond msg =
- match msg with
- | ReqBuildProof(exn_info,eop,vcs,loc,_,_) ->
- VCS.restore vcs;
- VCS.print ();
- let rc, time =
- let wall_clock = Unix.gettimeofday () in
- let l = Future.force (build_proof_here exn_info loc eop) in
- List.iter (fun (_,se) -> Declareops.iter_side_effects (function
- | Declarations.SEsubproof(_,
- { Declarations.const_body = Declarations.OpaqueDef f;
- const_universes = univs } ) ->
- Opaqueproof.join_opaque f
- | _ -> ())
- se) (fst l);
- l, Unix.gettimeofday () -. wall_clock in
- VCS.print ();
- RespBuiltProof(rc,time)
+ module TaskQueue = AsyncTaskQueue.Make(Task)
let check_task_aux extra name l i =
- match List.nth l i with
- | ReqBuildProof ((id,valid),eop,vcs,loc,_,s) ->
- Pp.msg_info(
- str(Printf.sprintf "Checking task %d (%s%s) of %s" i s extra name));
- VCS.restore vcs;
- try
- !reach_known_state ~cache:`No eop;
- (* The original terminator, a hook, has not been saved in the .vi*)
- Proof_global.set_terminator
- (Lemmas.standard_proof_terminator []
- (Lemmas.mk_hook (fun _ _ -> ())));
- let proof = Proof_global.close_proof (fun x -> x) in
- vernac_interp eop ~proof
- { verbose = false; loc;
- expr = (VernacEndProof (Proved (true,None))) };
- Some proof
- with e ->
- let e = Errors.push e in
- (try match Stateid.get e with
- | None ->
+ let { r_stop; r_document; r_loc; r_name } = List.nth l i in
+ Pp.msg_info(
+ str(Printf.sprintf "Checking task %d (%s%s) of %s" i r_name extra name));
+ VCS.restore r_document;
+ try
+ !Task.reach_known_state ~cache:`No r_stop;
+ (* The original terminator, a hook, has not been saved in the .vi*)
+ Proof_global.set_terminator
+ (Lemmas.standard_proof_terminator []
+ (Lemmas.mk_hook (fun _ _ -> ())));
+ let proof = Proof_global.close_proof (fun x -> x) in
+ vernac_interp r_stop ~proof
+ { verbose = false; loc = r_loc;
+ expr = (VernacEndProof (Proved (true,None))) };
+ Some proof
+ with e ->
+ let e = Errors.push e in
+ (try match Stateid.get e with
+ | None ->
+ Pp.pperrnl Pp.(
+ str"File " ++ str name ++ str ": proof of " ++ str r_name ++
+ spc () ++ print e)
+ | Some (_, cur) ->
+ match VCS.visit cur with
+ | { step = `Cmd ( { loc }, _) }
+ | { step = `Fork ( { loc }, _, _, _) }
+ | { step = `Qed ( { qast = { loc } }, _) }
+ | { step = `Sideff (`Ast ( { loc }, _)) } ->
+ let start, stop = Loc.unloc loc in
+ Pp.pperrnl Pp.(
+ str"File " ++ str name ++ str ": proof of " ++ str r_name ++
+ str ": chars " ++ int start ++ str "-" ++ int stop ++
+ spc () ++ print e)
+ | _ ->
Pp.pperrnl Pp.(
- str"File " ++ str name ++ str ": proof of " ++ str s ++
+ str"File " ++ str name ++ str ": proof of " ++ str r_name ++
spc () ++ print e)
- | Some (_, cur) ->
- match VCS.visit cur with
- | { step = `Cmd ( { loc }, _) }
- | { step = `Fork ( { loc }, _, _, _) }
- | { step = `Qed ( { qast = { loc } }, _) }
- | { step = `Sideff (`Ast ( { loc }, _)) } ->
- let start, stop = Loc.unloc loc in
- Pp.pperrnl Pp.(
- str"File " ++ str name ++ str ": proof of " ++ str s ++
- str ": chars " ++ int start ++ str "-" ++ int stop ++
- spc () ++ print e)
- | _ ->
- Pp.pperrnl Pp.(
- str"File " ++ str name ++ str ": proof of " ++ str s ++
- spc () ++ print e)
- with e ->
- Pp.msg_error (str"unable to print error message: " ++
- str (Printexc.to_string e))); None
+ with e ->
+ Pp.msg_error (str"unable to print error message: " ++
+ str (Printexc.to_string e))); None
let finish_task name (u,cst,_) d p l i =
- let bucket =
- match List.nth l i with ReqBuildProof (_,_,_,_,bucket,_) -> bucket in
+ let bucket = (List.nth l i).r_uuid in
match check_task_aux (Printf.sprintf ", bucket %d" bucket) name l i with
| None -> exit 1
| Some (po,pt) ->
@@ -854,54 +926,14 @@ end = struct
| None -> false
let info_tasks l =
- CList.map_i (fun i (ReqBuildProof(_,_,_,loc,_,s)) ->
+ CList.map_i (fun i { r_loc; r_name } ->
let time1 =
- try float_of_string (Aux_file.get !hints loc "proof_build_time")
+ try float_of_string (Aux_file.get !hints r_loc "proof_build_time")
with Not_found -> 0.0 in
let time2 =
- try float_of_string (Aux_file.get !hints loc "proof_check_time")
+ try float_of_string (Aux_file.get !hints r_loc "proof_check_time")
with Not_found -> 0.0 in
- s,max (time1 +. time2) 0.0001,i) 0 l
-
- exception MarshalError of string
-
- let marshal_to_channel oc data =
- Marshal.to_channel oc data [];
- flush oc
-
- let marshal_err s = raise (MarshalError s)
-
- let marshal_request oc (req : Future.UUID.t request) =
- try marshal_to_channel oc req
- with Failure s | Invalid_argument s | Sys_error s ->
- marshal_err ("marshal_request: "^s)
-
- let unmarshal_request ic =
- try (Marshal.from_channel ic : Future.UUID.t request)
- with Failure s | Invalid_argument s | Sys_error s ->
- marshal_err ("unmarshal_request: "^s)
-
- let marshal_response oc (res : response) =
- try marshal_to_channel oc res
- with Failure s | Invalid_argument s | Sys_error s ->
- marshal_err ("marshal_response: "^s)
-
- let unmarshal_response ic =
- try (CThread.thread_friendly_input_value ic : response)
- with Failure s | Invalid_argument s | Sys_error s ->
- marshal_err ("unmarshal_response: "^s)
-
- let marshal_more_data oc (res : more_data) =
- try marshal_to_channel oc res
- with Failure s | Invalid_argument s | Sys_error s ->
- marshal_err ("marshal_more_data: "^s)
-
- let unmarshal_more_data ic =
- try (Marshal.from_channel ic : more_data)
- with Failure s | Invalid_argument s | Sys_error s ->
- marshal_err ("unmarshal_more_data: "^s)
-
- let queue : task TQueue.t = TQueue.create ()
+ r_name, max (time1 +. time2) 0.0001,i) 0 l
let set_perspective idl =
let open Stateid in
@@ -914,275 +946,44 @@ end = struct
| true, false -> -1
| false, true -> 1)
- let wait_all_done () =
- if not (WorkersPool.is_empty ()) then
- TQueue.wait_until_n_are_waiting_and_queue_empty
- (WorkersPool.n_workers ()) queue
-
- let build_proof ~loc ~exn_info:(id,valid as exn_info) ~start ~stop ~name =
+ let build_proof ~loc ~exn_info:(id,valid as t_exn_info) ~start ~stop ~name =
let cancel_switch = ref false in
- if WorkersPool.is_empty () then
+ if TaskQueue.n_workers () = 0 then
if !Flags.compilation_mode = Flags.BuildVi then begin
let force () : Proof_global.closed_proof_output Future.assignement =
- try `Val (build_proof_here_core loc stop ())
+ try `Val (Task.build_proof_here_core loc stop ())
with e -> let e = Errors.push e in `Exn e in
let f,assign = Future.create_delegate ~force (State.exn_on id ~valid) in
- let uuid = Future.uuid f in
- TQueue.push queue (TaskBuildProof
- (exn_info,start,stop,assign,cancel_switch,loc,uuid,name));
+ let t_uuid = Future.uuid f in
+ TaskQueue.enqueue_task {
+ Task.t_exn_info; t_start = start; t_stop = stop; t_assign = assign;
+ t_loc = loc; t_uuid; t_name = name } cancel_switch;
f, cancel_switch
end else
- build_proof_here exn_info loc stop, cancel_switch
+ Task.build_proof_here t_exn_info loc stop, cancel_switch
else
- let f, assign = Future.create_delegate (State.exn_on id ~valid) in
- let uuid = Future.uuid f in
+ let f, t_assign = Future.create_delegate (State.exn_on id ~valid) in
+ let t_uuid = Future.uuid f in
Pp.feedback (Feedback.InProgress 1);
- TQueue.push queue (TaskBuildProof
- (exn_info,start,stop,assign,cancel_switch,loc,uuid,name));
+ TaskQueue.enqueue_task {
+ Task.t_exn_info; t_start = start; t_stop = stop; t_assign; t_loc = loc;
+ t_uuid; t_name = name } cancel_switch;
f, cancel_switch
- exception RemoteException of std_ppcmds
- let _ = Errors.register_handler (function
- | RemoteException ppcmd -> ppcmd
- | _ -> raise Unhandled)
+ let init () = TaskQueue.init !Flags.async_proofs_n_workers
+ let slave_main_loop = TaskQueue.slave_main_loop
+ let slave_init_stdout = TaskQueue.slave_init_stdout
+ let wait_all_done = TaskQueue.join
- exception KillRespawn
-
- let report_status ?id s =
- let id =
- match id with
- | Some n -> n
- | None ->
- match !Flags.async_proofs_mode with
- | Flags.APonParallel n -> n
- | _ -> assert false in
- Pp.feedback ~state_id:Stateid.initial (Feedback.SlaveStatus(id, s))
-
- let rec manage_slave ~cancel:cancel_user_req id_slave respawn =
- let ic, oc, proc =
- let rec set_slave_opt = function
- | [] -> !Flags.async_proofs_flags_for_workers @
- ["-async-proofs"; "worker"; string_of_int id_slave]
- | ("-ideslave"|"-emacs"|"-emacs-U")::tl -> set_slave_opt tl
- | ("-async-proofs" |"-toploop" |"-vi2vo" |"-compile"
- |"-compile-verbose")::_::tl -> set_slave_opt tl
- | x::tl -> x :: set_slave_opt tl in
- let args =
- Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in
- let env =
- Array.append !async_proofs_workers_extra_env (Unix.environment ()) in
- respawn ~args ~env () in
- let last_task = ref None in
- let task_expired = ref false in
- let task_cancelled = ref false in
- CThread.prepare_in_channel_for_thread_friendly_io ic;
- try
- while true do
- prerr_endline "waiting for a task";
- report_status ~id:id_slave "Idle";
- let task = TQueue.pop queue in
- prerr_endline ("got task: "^pr_task task);
- last_task := Some task;
- try
- marshal_request oc (request_of_task task);
- let cancel_switch = cancel_switch_of_task task in
- Worker.kill_if proc ~sec:1 (fun () ->
- task_expired := !cancel_switch;
- task_cancelled := !cancel_user_req;
- if !cancel_user_req then cancel_user_req := false;
- !task_expired || !task_cancelled);
- let rec loop () =
- let response = unmarshal_response ic in
- match task, response with
- | TaskBuildProof(_,_,_, assign,_,loc,_,s),
- RespBuiltProof(pl, time)->
- assign (`Val pl);
- (* We restart the slave, to avoid memory leaks. We could just
- Pp.feedback (Feedback.InProgress ~-1) *)
- record_pb_time s loc time;
- last_task := None;
- raise KillRespawn
- | TaskBuildProof(_,_,_, assign,_,_,_,_),
- RespError(err_id,valid,e,valid_states) ->
- let e = Stateid.add ~valid (RemoteException e) err_id in
- assign (`Exn e);
- List.iter (fun (id,s) -> State.assign id s) valid_states;
- (* We restart the slave, to avoid memory leaks. We could just
- Pp.feedback (Feedback.InProgress ~-1) *)
- last_task := None;
- raise KillRespawn
- (* marshal_more_data oc (MoreDataLocalUniv *)
- (* (CList.init 10 (fun _ -> Universes.fresh_local_univ ()))); *)
- (* loop () *)
- | _, RespGetCounterNewUnivLevel ->
- marshal_more_data oc (MoreDataUnivLevel
- (CList.init 10 (fun _ -> Universes.new_univ_level (Global.current_dirpath ()))));
- loop ()
- | _, RespFeedback {Feedback.id = Feedback.State state_id; content = msg} ->
- forward_feedback state_id msg;
- loop ()
- | _, RespFeedback _ -> assert false (* Parsing in master process *)
- in
- loop ()
- with
- | VCS.Expired -> (* task cancelled: e.g. the user did backtrack *)
- Pp.feedback (Feedback.InProgress ~-1);
- prerr_endline ("Task expired: " ^ pr_task task)
- | (Sys_error _ | Invalid_argument _ | End_of_file | KillRespawn) as e ->
- raise e (* we pass the exception to the external handler *)
- | MarshalError s when !fallback_to_lazy_if_marshal_error ->
- msg_warning(strbrk("Marshalling error: "^s^". "^
- "The system state could not be sent to the worker process. "^
- "Falling back to local, lazy, evaluation."));
- let TaskBuildProof (exn_info, _, stop, assign,_,loc,_,_) = task in
- assign(`Comp(build_proof_here exn_info loc stop));
- Pp.feedback (Feedback.InProgress ~-1)
- | MarshalError s ->
- pr_err ("Fatal marshal error: " ^ s);
- flush_all (); exit 1
- | e ->
- pr_err ("Uncaught exception in worker manager: "^
- string_of_ppcmds (print e));
- flush_all ()
- done
- with
- | KillRespawn ->
- Pp.feedback (Feedback.InProgress ~-1);
- Worker.kill proc; ignore(Worker.wait proc);
- manage_slave ~cancel:cancel_user_req id_slave respawn
- | Sys_error _ | Invalid_argument _ | End_of_file when !task_expired ->
- Pp.feedback (Feedback.InProgress ~-1);
- ignore(Worker.wait proc);
- manage_slave ~cancel:cancel_user_req id_slave respawn
- | Sys_error _ | Invalid_argument _ | End_of_file when !task_cancelled ->
- msg_warning(strbrk "The worker was cancelled.");
- Option.iter (fun task ->
- let TaskBuildProof (_, start, _, assign, _,_,_,_) = task in
- let s = "Worker cancelled by the user" in
- let e = Stateid.add ~valid:start (RemoteException (strbrk s)) start in
- assign (`Exn e);
- Pp.feedback ~state_id:start (Feedback.ErrorMsg (Loc.ghost, s));
- Pp.feedback (Feedback.InProgress ~-1);
- ) !last_task;
- Worker.kill proc; ignore(Worker.wait proc);
- manage_slave ~cancel:cancel_user_req id_slave respawn
- | Sys_error _ | Invalid_argument _ | End_of_file
- when !fallback_to_lazy_if_slave_dies ->
- msg_warning(strbrk "The worker process died badly.");
- Option.iter (fun task ->
- msg_warning(strbrk "Falling back to local, lazy, evaluation.");
- let TaskBuildProof (exn_info, _, stop, assign,_,loc,_,_) = task in
- assign(`Comp(build_proof_here exn_info loc stop));
- Pp.feedback (Feedback.InProgress ~-1);
- ) !last_task;
- Worker.kill proc; ignore(Worker.wait proc);
- manage_slave ~cancel:cancel_user_req id_slave respawn
- | Sys_error _ | Invalid_argument _ | End_of_file ->
- Worker.kill proc;
- let exit_status proc = match Worker.wait proc with
- | Unix.WEXITED 0x400 -> "exit code unavailable"
- | Unix.WEXITED i -> Printf.sprintf "exit(%d)" i
- | Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno
- | Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno in
- pr_err ("Fatal worker error: " ^ (exit_status proc));
- flush_all (); exit 1
-
- let init () = WorkersPool.init !Flags.async_proofs_n_workers manage_slave
-
- let slave_ic = ref stdin
- let slave_oc = ref stdout
-
- let slave_init_stdout () =
- let ic, oc = Spawned.get_channels () in
- slave_oc := oc; slave_ic := ic
-
- let bufferize f =
- let l = ref [] in
- fun () ->
- match !l with
- | [] -> let data = f () in l := List.tl data; List.hd data
- | x::tl -> l := tl; x
-
- let slave_handshake () = WorkersPool.worker_handshake !slave_ic !slave_oc
-
- let slave_main_loop reset =
- let feedback_queue = ref [] in
- let slave_feeder oc fb =
- match fb.Feedback.content with
- | Feedback.Goals _ -> feedback_queue := fb :: !feedback_queue
- | _ -> Marshal.to_channel oc (RespFeedback fb) []; flush oc in
- let flush_feeder oc =
- List.iter (fun fb -> Marshal.to_channel oc (RespFeedback fb) [])
- !feedback_queue;
- feedback_queue := [];
- flush oc in
- Pp.set_feeder (slave_feeder !slave_oc);
- Pp.log_via_feedback ();
- Universes.set_remote_new_univ_level (bufferize (fun () ->
- marshal_response !slave_oc RespGetCounterNewUnivLevel;
- match unmarshal_more_data !slave_ic with
- | MoreDataUnivLevel l -> l));
- let working = ref false in
- slave_handshake ();
- while true do
- try
- working := false;
- let request = unmarshal_request !slave_ic in
- working := true;
- report_status (name_of_request request);
- let response = slave_respond request in
- flush_feeder !slave_oc;
- report_status "Idle";
- marshal_response !slave_oc response;
- reset ()
- with
- | MarshalError s ->
- pr_err ("Fatal marshal error: " ^ s); flush_all (); exit 2
- | End_of_file ->
- prerr_endline "connection lost"; flush_all (); exit 2
- | e when Errors.noncritical e ->
- (* This can happen if the proof is broken. The error has also been
- * signalled as a feedback, hence we can silently recover *)
- let err_id, safe_id = match Stateid.get e with
- | Some (safe, err) -> err, safe
- | None -> Stateid.dummy, Stateid.dummy in
- prerr_endline "failed with the following exception:";
- prerr_endline (string_of_ppcmds (print e));
- prerr_endline ("last safe id is: " ^ Stateid.to_string safe_id);
- prerr_endline ("cached? " ^ string_of_bool (State.is_cached safe_id));
- let prog old_v n =
- if n < 3 then n else old_v + n/3 + if n mod 3 > 0 then 1 else 0 in
- let states =
- let open State in
- let rec aux n m prev_id =
- let next =
- try Some (VCS.visit prev_id).next
- with VCS.Expired -> None in
- match next with
- | None -> []
- | Some id when n = m ->
- prerr_endline ("sending back state " ^ string_of_int m);
- let tail = aux (n+1) (prog m (n+1)) id in
- if is_cached id then (id, get_cached id) :: tail else tail
- | Some id -> aux (n+1) m id in
- (if is_cached safe_id then [safe_id,get_cached safe_id] else [])
- @ aux 1 (prog 1 1) safe_id in
- flush_feeder !slave_oc;
- marshal_response !slave_oc (RespError(err_id, safe_id, print e, states))
- | e ->
- pr_err ("Slave: critical exception: " ^ Pp.string_of_ppcmds (print e));
- flush_all (); exit 1
- done
+ let cancel_worker = TaskQueue.cancel_worker
(* For external users this name is nicer than request *)
- type tasks = int request list
+ type tasks = int a_request list
let dump f2t_map =
- assert(WorkersPool.is_empty ()); (* ATM, we allow that only if no slaves *)
- let tasks = TQueue.dump queue in
- prerr_endline (Printf.sprintf "dumping %d\n" (List.length tasks));
- let tasks = List.map request_of_task tasks in
- List.map (function ReqBuildProof(a,b,c,d,x,e) ->
- ReqBuildProof(a,b,c,d,List.assoc x f2t_map,e)) tasks
+ let tasks = TaskQueue.dump () in
+ prerr_endline (Printf.sprintf "dumping %d tasks\n" (List.length tasks));
+ List.map (function r -> { r with r_uuid = List.assoc r.r_uuid f2t_map })
+ (CList.map_filter Task.request_of_task tasks)
end
@@ -1198,7 +999,7 @@ let pstate = ["meta counter"; "evar counter"; "program-tcc-table"]
let delegate_policy_check time =
if interactive () = `Yes then
- (!Flags.async_proofs_mode = Flags.APonParallel 0 ||
+ (Flags.async_proofs_is_master () ||
!Flags.async_proofs_mode = Flags.APonLazy) &&
(time >= 1.0 || !Flags.async_proofs_always_delegate)
else if !Flags.compilation_mode = Flags.BuildVi then true
@@ -1394,7 +1195,7 @@ let known_state ?(redefine_qed=false) ~cache id =
inject_non_pstate s
), cache
in
- if !Flags.async_proofs_mode = Flags.APonParallel 0 then
+ if Flags.async_proofs_is_master () then
Pp.feedback ~state_id:id Feedback.ProcessingInMaster;
State.define ~cache:cache_step ~redefine:redefine_qed step id;
if !Flags.feedback_goals then print_goals_of_state id;
@@ -1403,7 +1204,7 @@ let known_state ?(redefine_qed=false) ~cache id =
end
-let _ = Slaves.set_reach_known_state Reach.known_state
+let _ = Task.set_reach_known_state Reach.known_state
(* The backtrack module simulates the classic behavior of a linear document *)
module Backtrack : sig
@@ -1540,7 +1341,8 @@ let init () =
set_undo_classifier Backtrack.undo_vernac_classifier;
State.define ~cache:`Yes (fun () -> ()) Stateid.initial;
Backtrack.record ();
- if !Flags.async_proofs_mode = Flags.APonParallel 0 then begin
+ if Flags.async_proofs_is_master () then begin
+ prerr_endline "Initialising workers";
Slaves.init ();
let opts = match !Flags.async_proofs_private_flags with
| None -> []
diff --git a/stm/stm.mli b/stm/stm.mli
index 3bc458f57..b6450e6ac 100644
--- a/stm/stm.mli
+++ b/stm/stm.mli
@@ -42,7 +42,7 @@ val finish : unit -> unit
val observe : Stateid.t -> unit
-val stop_worker : int -> unit
+val stop_worker : string -> unit
(* Joins the entire document. Implies finish, but also checks proofs *)
val join : unit -> unit
diff --git a/stm/stm.mllib b/stm/stm.mllib
index 347416099..308b2ac4c 100644
--- a/stm/stm.mllib
+++ b/stm/stm.mllib
@@ -5,5 +5,6 @@ TQueue
WorkerPool
Vernac_classifier
Lemmas
+AsyncTaskQueue
Stm
Vi_checking
diff --git a/stm/workerPool.ml b/stm/workerPool.ml
index fcae4f20d..593240ad4 100644
--- a/stm/workerPool.ml
+++ b/stm/workerPool.ml
@@ -13,7 +13,7 @@ module Make(Worker : sig
process * in_channel * out_channel
end) = struct
-type worker_id = int
+type worker_id = string
type spawn =
args:string array -> env:string array -> unit ->
in_channel * out_channel * Worker.process
@@ -32,11 +32,11 @@ let master_handshake worker_id ic oc =
Marshal.to_channel oc magic_no []; flush oc;
let n = (Marshal.from_channel ic : int) in
if n <> magic_no then begin
- Printf.eprintf "Handshake with %d failed: protocol mismatch\n" worker_id;
+ Printf.eprintf "Handshake with %s failed: protocol mismatch\n" worker_id;
exit 1;
end
with e when Errors.noncritical e ->
- Printf.eprintf "Handshake with %d failed: %s\n"
+ Printf.eprintf "Handshake with %s failed: %s\n"
worker_id (Printexc.to_string e);
exit 1
@@ -45,18 +45,21 @@ let respawn n ~args ~env () =
master_handshake n ic oc;
ic, oc, proc
-let init ~size:n ~manager:manage_slave =
+let init ~size:n ~manager:manage_slave mk_name =
slave_managers := Some
(Array.init n (fun x ->
+ let name = mk_name x in
let cancel = ref false in
- cancel, Thread.create (manage_slave ~cancel (x+1)) (respawn (x+1))))
+ name, cancel, Thread.create (manage_slave ~cancel name) (respawn name)))
let cancel n =
match !slave_managers with
| None -> ()
| Some a ->
- let switch, _ = a.(n) in
- switch := true
+ for i = 0 to Array.length a - 1 do
+ let name, switch, _ = a.(i) in
+ if n = name then switch := true
+ done
let worker_handshake slave_ic slave_oc =
try
diff --git a/stm/workerPool.mli b/stm/workerPool.mli
index d7a546929..d55b35c28 100644
--- a/stm/workerPool.mli
+++ b/stm/workerPool.mli
@@ -13,13 +13,14 @@ module Make(Worker : sig
process * in_channel * out_channel
end) : sig
-type worker_id = int
+type worker_id = string
type spawn =
args:string array -> env:string array -> unit ->
in_channel * out_channel * Worker.process
val init :
- size:int -> manager:(cancel:bool ref -> worker_id -> spawn -> unit) -> unit
+ size:int -> manager:(cancel:bool ref -> worker_id -> spawn -> unit) ->
+ (int -> worker_id) -> unit
val is_empty : unit -> bool
val n_workers : unit -> int
val cancel : worker_id -> unit