aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Enrico Tassi <Enrico.Tassi@inria.fr>2014-03-13 15:41:44 +0100
committerGravatar Enrico Tassi <Enrico.Tassi@inria.fr>2014-03-13 16:04:13 +0100
commitc9b1caaa5516d616e400faa7a7c0278c8677c51c (patch)
tree2dc6f7870a9824f8b9c8357774287c5c44b332a2
parent8ee720fef8e21595827d18e1e28777c1d061a9e5 (diff)
STM: move out a couple of submodules
These modules are not as reusable as one may want them to be, but moving them out simplifies a little STM.
-rw-r--r--lib/lib.mllib2
-rw-r--r--lib/tQueue.ml64
-rw-r--r--lib/tQueue.mli17
-rw-r--r--lib/workerPool.ml73
-rw-r--r--lib/workerPool.mli30
-rw-r--r--toplevel/stm.ml161
6 files changed, 213 insertions, 134 deletions
diff --git a/lib/lib.mllib b/lib/lib.mllib
index 9ba1e15b9..50621df20 100644
--- a/lib/lib.mllib
+++ b/lib/lib.mllib
@@ -25,3 +25,5 @@ Future
RemoteCounter
Dag
Vcs
+TQueue
+WorkerPool
diff --git a/lib/tQueue.ml b/lib/tQueue.ml
new file mode 100644
index 000000000..783c545fd
--- /dev/null
+++ b/lib/tQueue.ml
@@ -0,0 +1,64 @@
+(************************************************************************)
+(* v * The Coq Proof Assistant / The Coq Development Team *)
+(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *)
+(* \VV/ **************************************************************)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(************************************************************************)
+
+type 'a t = {
+ queue: 'a Queue.t;
+ lock : Mutex.t;
+ cond : Condition.t;
+ mutable nwaiting : int;
+ cond_waiting : Condition.t;
+}
+
+let create () = {
+ queue = Queue.create ();
+ lock = Mutex.create ();
+ cond = Condition.create ();
+ nwaiting = 0;
+ cond_waiting = Condition.create ();
+}
+
+let pop ({ queue = q; lock = m; cond = c; cond_waiting = cn } as tq) =
+ Mutex.lock m;
+ while Queue.is_empty q do
+ tq.nwaiting <- tq.nwaiting + 1;
+ Condition.signal cn;
+ Condition.wait c m;
+ tq.nwaiting <- tq.nwaiting - 1;
+ done;
+ let x = Queue.pop q in
+ Condition.signal c;
+ Condition.signal cn;
+ Mutex.unlock m;
+ x
+
+let push { queue = q; lock = m; cond = c } x =
+ Mutex.lock m;
+ Queue.push x q;
+ Condition.signal c;
+ Mutex.unlock m
+
+let wait_until_n_are_waiting_and_queue_empty j tq =
+ Mutex.lock tq.lock;
+ while not (Queue.is_empty tq.queue) || tq.nwaiting < j do
+ Condition.wait tq.cond_waiting tq.lock
+ done;
+ Mutex.unlock tq.lock
+
+let dump { queue; lock } =
+ let l = ref [] in
+ Mutex.lock lock;
+ while not (Queue.is_empty queue) do l := Queue.pop queue :: !l done;
+ Mutex.unlock lock;
+ List.rev !l
+
+let reorder tq rel =
+ Mutex.lock tq.lock;
+ let l = ref [] in
+ while not (Queue.is_empty tq.queue) do l := Queue.pop tq.queue :: !l done;
+ List.iter (fun x -> Queue.push x tq.queue) (List.sort rel !l);
+ Mutex.unlock tq.lock
diff --git a/lib/tQueue.mli b/lib/tQueue.mli
new file mode 100644
index 000000000..a3ea5532f
--- /dev/null
+++ b/lib/tQueue.mli
@@ -0,0 +1,17 @@
+(************************************************************************)
+(* v * The Coq Proof Assistant / The Coq Development Team *)
+(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *)
+(* \VV/ **************************************************************)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(************************************************************************)
+
+(* Thread safe queue with some extras *)
+
+type 'a t
+val create : unit -> 'a t
+val pop : 'a t -> 'a
+val push : 'a t -> 'a -> unit
+val reorder : 'a t -> ('a -> 'a -> int) -> unit
+val wait_until_n_are_waiting_and_queue_empty : int -> 'a t -> unit
+val dump : 'a t -> 'a list
diff --git a/lib/workerPool.ml b/lib/workerPool.ml
new file mode 100644
index 000000000..fcae4f20d
--- /dev/null
+++ b/lib/workerPool.ml
@@ -0,0 +1,73 @@
+(************************************************************************)
+(* v * The Coq Proof Assistant / The Coq Development Team *)
+(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *)
+(* \VV/ **************************************************************)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(************************************************************************)
+
+module Make(Worker : sig
+ type process
+ val spawn :
+ ?prefer_sock:bool -> ?env:string array -> string -> string array ->
+ process * in_channel * out_channel
+end) = struct
+
+type worker_id = int
+type spawn =
+ args:string array -> env:string array -> unit ->
+ in_channel * out_channel * Worker.process
+
+let slave_managers = ref None
+
+let n_workers () = match !slave_managers with
+ | None -> 0
+ | Some managers -> Array.length managers
+let is_empty () = !slave_managers = None
+
+let magic_no = 17
+
+let master_handshake worker_id ic oc =
+ try
+ Marshal.to_channel oc magic_no []; flush oc;
+ let n = (Marshal.from_channel ic : int) in
+ if n <> magic_no then begin
+ Printf.eprintf "Handshake with %d failed: protocol mismatch\n" worker_id;
+ exit 1;
+ end
+ with e when Errors.noncritical e ->
+ Printf.eprintf "Handshake with %d failed: %s\n"
+ worker_id (Printexc.to_string e);
+ exit 1
+
+let respawn n ~args ~env () =
+ let proc, ic, oc = Worker.spawn ~env Sys.argv.(0) args in
+ master_handshake n ic oc;
+ ic, oc, proc
+
+let init ~size:n ~manager:manage_slave =
+ slave_managers := Some
+ (Array.init n (fun x ->
+ let cancel = ref false in
+ cancel, Thread.create (manage_slave ~cancel (x+1)) (respawn (x+1))))
+
+let cancel n =
+ match !slave_managers with
+ | None -> ()
+ | Some a ->
+ let switch, _ = a.(n) in
+ switch := true
+
+let worker_handshake slave_ic slave_oc =
+ try
+ let v = (Marshal.from_channel slave_ic : int) in
+ if v <> magic_no then begin
+ prerr_endline "Handshake failed: protocol mismatch\n";
+ exit 1;
+ end;
+ Marshal.to_channel slave_oc v []; flush slave_oc;
+ with e when Errors.noncritical e ->
+ prerr_endline ("Handshake failed: " ^ Printexc.to_string e);
+ exit 1
+
+end
diff --git a/lib/workerPool.mli b/lib/workerPool.mli
new file mode 100644
index 000000000..d7a546929
--- /dev/null
+++ b/lib/workerPool.mli
@@ -0,0 +1,30 @@
+(************************************************************************)
+(* v * The Coq Proof Assistant / The Coq Development Team *)
+(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *)
+(* \VV/ **************************************************************)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(************************************************************************)
+
+module Make(Worker : sig
+ type process
+ val spawn :
+ ?prefer_sock:bool -> ?env:string array -> string -> string array ->
+ process * in_channel * out_channel
+end) : sig
+
+type worker_id = int
+type spawn =
+ args:string array -> env:string array -> unit ->
+ in_channel * out_channel * Worker.process
+
+val init :
+ size:int -> manager:(cancel:bool ref -> worker_id -> spawn -> unit) -> unit
+val is_empty : unit -> bool
+val n_workers : unit -> int
+val cancel : worker_id -> unit
+
+(* The worker should call this function *)
+val worker_handshake : in_channel -> out_channel -> unit
+
+end
diff --git a/toplevel/stm.ml b/toplevel/stm.ml
index c61258655..f6564e892 100644
--- a/toplevel/stm.ml
+++ b/toplevel/stm.ml
@@ -617,6 +617,8 @@ module Worker = Spawn.Sync(struct
())
end)
+module WorkersPool = WorkerPool.Make(Worker)
+
let record_pb_time proof_name loc time =
let proof_build_time = Printf.sprintf "%.3f" time in
Aux_file.record_in_aux_at loc "proof_build_time" proof_build_time;
@@ -660,121 +662,8 @@ module Slaves : sig
end = struct (* {{{ *)
- module TQueue : sig
-
- type 'a t
- val create : unit -> 'a t
- val pop : 'a t -> 'a
- val push : 'a t -> 'a -> unit
- val wait_until_n_are_waiting_and_queue_empty : int -> 'a t -> unit
- val dump : 'a t -> 'a list
-
- end = struct (* {{{ *)
-
- (* queue, lock, cond_q, n_waiting, cond_n_waiting *)
- type 'a t = 'a Queue.t * Mutex.t * Condition.t * int ref * Condition.t
-
- let create () =
- Queue.create (), Mutex.create (), Condition.create (),
- ref 0, Condition.create ()
-
- let pop (q,m,c,n,cn) =
- Mutex.lock m;
- while Queue.is_empty q do
- n := !n+1;
- Condition.signal cn;
- Condition.wait c m;
- n := !n-1
- done;
- let x = Queue.pop q in
- Condition.signal c;
- Condition.signal cn;
- Mutex.unlock m;
- x
- let push (q,m,c,n,cn) x =
- Mutex.lock m;
- Queue.push x q;
- Condition.signal c;
- Mutex.unlock m
-
- let wait_until_n_are_waiting_and_queue_empty (j : int) (q,m,c,n,cn) =
- Mutex.lock m;
- while not (Queue.is_empty q) || !n < j do Condition.wait cn m done;
- Mutex.unlock m
-
- let dump (q,m,_,_,_) =
- let l = ref [] in
- Mutex.lock m;
- while not (Queue.is_empty q) do l := Queue.pop q :: !l done;
- Mutex.unlock m;
- List.rev !l
-
- end (* }}} *)
-
- module SlavesPool : sig
-
- val init : int -> (bool ref -> (unit -> in_channel * out_channel * Worker.process * int) -> unit) -> unit
- val is_empty : unit -> bool
- val n_slaves : unit -> int
-
- val cancel : int -> unit
-
- end = struct (* {{{ *)
-
- let slave_managers = ref None
-
- let n_slaves () = match !slave_managers with
- | None -> 0
- | Some managers -> Array.length managers
- let is_empty () = !slave_managers = None
-
- let master_handshake worker_id ic oc =
- try
- Marshal.to_channel oc 17 []; flush oc;
- let n = (Marshal.from_channel ic : int) in
- assert(n = 17);
- prerr_endline (Printf.sprintf "Handshake with %d OK" worker_id)
- with e ->
- prerr_endline
- (Printf.sprintf "Handshake with %d failed: %s"
- worker_id (Printexc.to_string e));
- exit 1
-
- let respawn n () =
- let prog = Sys.argv.(0) in
- let rec set_slave_opt = function
- | [] -> ["-async-proofs"; "worker"; string_of_int n]
- | ("-ideslave"|"-emacs"|"-emacs-U")::tl -> set_slave_opt tl
- | ("-async-proofs"
- |"-compile"
- |"-compile-verbose")::_::tl -> set_slave_opt tl
- | x::tl -> x :: set_slave_opt tl in
- let args =
- Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in
- let env =
- Array.append !async_proofs_workers_extra_env (Unix.environment ()) in
- let proc, ic, oc = Worker.spawn ~env prog args in
- master_handshake n ic oc;
- ic, oc, proc, n
-
- let init n manage_slave =
- slave_managers := Some
- (Array.init n (fun x ->
- let calcel_req = ref false in
- calcel_req,
- Thread.create (manage_slave calcel_req) (respawn (x+1))))
-
- let cancel n =
- match !slave_managers with
- | None -> ()
- | Some a ->
- let switch, _ = a.(n) in
- switch := true
-
- end (* }}} *)
-
- let cancel_worker n = SlavesPool.cancel (n-1)
+ let cancel_worker n = WorkersPool.cancel (n-1)
let reach_known_state = ref (fun ?redefine_qed ~cache id -> ())
let set_reach_known_state f = reach_known_state := f
@@ -976,13 +865,13 @@ end = struct (* {{{ *)
let queue : task TQueue.t = TQueue.create ()
let wait_all_done () =
- if not (SlavesPool.is_empty ()) then
+ if not (WorkersPool.is_empty ()) then
TQueue.wait_until_n_are_waiting_and_queue_empty
- (SlavesPool.n_slaves ()) queue
+ (WorkersPool.n_workers ()) queue
let build_proof ~loc ~exn_info:(id,valid as exn_info) ~start ~stop ~name =
let cancel_switch = ref false in
- if SlavesPool.is_empty () then
+ if WorkersPool.is_empty () then
if !Flags.compilation_mode = Flags.BuildVi then begin
let force () : Entries.proof_output list Future.assignement =
try `Val (build_proof_here_core loc stop ()) with e -> `Exn e in
@@ -1018,8 +907,20 @@ end = struct (* {{{ *)
| _ -> assert false in
Pp.feedback ~state_id:Stateid.initial (Interface.SlaveStatus(id, s))
- let rec manage_slave cancel_user_req respawn =
- let ic, oc, proc, id_slave = respawn () in
+ let rec manage_slave ~cancel:cancel_user_req id_slave respawn =
+ let ic, oc, proc =
+ let rec set_slave_opt = function
+ | [] -> ["-async-proofs"; "worker"; string_of_int id_slave]
+ | ("-ideslave"|"-emacs"|"-emacs-U")::tl -> set_slave_opt tl
+ | ("-async-proofs"
+ |"-compile"
+ |"-compile-verbose")::_::tl -> set_slave_opt tl
+ | x::tl -> x :: set_slave_opt tl in
+ let args =
+ Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in
+ let env =
+ Array.append !async_proofs_workers_extra_env (Unix.environment ()) in
+ respawn ~args ~env () in
let last_task = ref None in
let task_expired = ref false in
let task_cancelled = ref false in
@@ -1098,11 +999,11 @@ end = struct (* {{{ *)
| KillRespawn ->
Pp.feedback (Interface.InProgress ~-1);
Worker.kill proc; ignore(Worker.wait proc);
- manage_slave cancel_user_req respawn
+ manage_slave cancel_user_req id_slave respawn
| Sys_error _ | Invalid_argument _ | End_of_file when !task_expired ->
Pp.feedback (Interface.InProgress ~-1);
ignore(Worker.wait proc);
- manage_slave cancel_user_req respawn
+ manage_slave cancel_user_req id_slave respawn
| Sys_error _ | Invalid_argument _ | End_of_file when !task_cancelled ->
msg_warning(strbrk "The worker was cancelled.");
Option.iter (fun task ->
@@ -1114,7 +1015,7 @@ end = struct (* {{{ *)
Pp.feedback (Interface.InProgress ~-1);
) !last_task;
Worker.kill proc; ignore(Worker.wait proc);
- manage_slave cancel_user_req respawn
+ manage_slave cancel_user_req id_slave respawn
| Sys_error _ | Invalid_argument _ | End_of_file
when !fallback_to_lazy_if_slave_dies ->
msg_warning(strbrk "The worker process died badly.");
@@ -1125,7 +1026,7 @@ end = struct (* {{{ *)
Pp.feedback (Interface.InProgress ~-1);
) !last_task;
Worker.kill proc; ignore(Worker.wait proc);
- manage_slave cancel_user_req respawn
+ manage_slave cancel_user_req id_slave respawn
| Sys_error _ | Invalid_argument _ | End_of_file ->
Worker.kill proc;
let exit_status proc = match Worker.wait proc with
@@ -1136,7 +1037,7 @@ end = struct (* {{{ *)
pr_err ("Fatal worker error: " ^ (exit_status proc));
flush_all (); exit 1
- let init () = SlavesPool.init !Flags.async_proofs_n_workers manage_slave
+ let init () = WorkersPool.init !Flags.async_proofs_n_workers manage_slave
let slave_ic = ref stdin
let slave_oc = ref stdout
@@ -1152,15 +1053,7 @@ end = struct (* {{{ *)
| [] -> let data = f () in l := List.tl data; List.hd data
| x::tl -> l := tl; x
- let slave_handshake () =
- try
- let v = (Marshal.from_channel !slave_ic : int) in
- assert(v = 17);
- Marshal.to_channel !slave_oc v []; flush !slave_oc;
- prerr_endline "Handshake OK"
- with e ->
- prerr_endline ("Handshake failed: " ^ Printexc.to_string e);
- exit 1
+ let slave_handshake () = WorkersPool.worker_handshake !slave_ic !slave_oc
let slave_main_loop reset =
let slave_feeder oc fb =
@@ -1214,7 +1107,7 @@ end = struct (* {{{ *)
(* For external users this name is nicer than request *)
type tasks = int request list
let dump f2t_map =
- assert(SlavesPool.is_empty ()); (* ATM, we allow that only if no slaves *)
+ assert(WorkersPool.is_empty ()); (* ATM, we allow that only if no slaves *)
let tasks = TQueue.dump queue in
prerr_endline (Printf.sprintf "dumping %d\n" (List.length tasks));
let tasks = List.map request_of_task tasks in