diff options
-rw-r--r-- | lib/lib.mllib | 2 | ||||
-rw-r--r-- | lib/tQueue.ml | 64 | ||||
-rw-r--r-- | lib/tQueue.mli | 17 | ||||
-rw-r--r-- | lib/workerPool.ml | 73 | ||||
-rw-r--r-- | lib/workerPool.mli | 30 | ||||
-rw-r--r-- | toplevel/stm.ml | 161 |
6 files changed, 213 insertions, 134 deletions
diff --git a/lib/lib.mllib b/lib/lib.mllib index 9ba1e15b9..50621df20 100644 --- a/lib/lib.mllib +++ b/lib/lib.mllib @@ -25,3 +25,5 @@ Future RemoteCounter Dag Vcs +TQueue +WorkerPool diff --git a/lib/tQueue.ml b/lib/tQueue.ml new file mode 100644 index 000000000..783c545fd --- /dev/null +++ b/lib/tQueue.ml @@ -0,0 +1,64 @@ +(************************************************************************) +(* v * The Coq Proof Assistant / The Coq Development Team *) +(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *) +(* \VV/ **************************************************************) +(* // * This file is distributed under the terms of the *) +(* * GNU Lesser General Public License Version 2.1 *) +(************************************************************************) + +type 'a t = { + queue: 'a Queue.t; + lock : Mutex.t; + cond : Condition.t; + mutable nwaiting : int; + cond_waiting : Condition.t; +} + +let create () = { + queue = Queue.create (); + lock = Mutex.create (); + cond = Condition.create (); + nwaiting = 0; + cond_waiting = Condition.create (); +} + +let pop ({ queue = q; lock = m; cond = c; cond_waiting = cn } as tq) = + Mutex.lock m; + while Queue.is_empty q do + tq.nwaiting <- tq.nwaiting + 1; + Condition.signal cn; + Condition.wait c m; + tq.nwaiting <- tq.nwaiting - 1; + done; + let x = Queue.pop q in + Condition.signal c; + Condition.signal cn; + Mutex.unlock m; + x + +let push { queue = q; lock = m; cond = c } x = + Mutex.lock m; + Queue.push x q; + Condition.signal c; + Mutex.unlock m + +let wait_until_n_are_waiting_and_queue_empty j tq = + Mutex.lock tq.lock; + while not (Queue.is_empty tq.queue) || tq.nwaiting < j do + Condition.wait tq.cond_waiting tq.lock + done; + Mutex.unlock tq.lock + +let dump { queue; lock } = + let l = ref [] in + Mutex.lock lock; + while not (Queue.is_empty queue) do l := Queue.pop queue :: !l done; + Mutex.unlock lock; + List.rev !l + +let reorder tq rel = + Mutex.lock tq.lock; + let l = ref [] in + while not (Queue.is_empty tq.queue) do l := Queue.pop tq.queue :: !l done; + List.iter (fun x -> Queue.push x tq.queue) (List.sort rel !l); + Mutex.unlock tq.lock diff --git a/lib/tQueue.mli b/lib/tQueue.mli new file mode 100644 index 000000000..a3ea5532f --- /dev/null +++ b/lib/tQueue.mli @@ -0,0 +1,17 @@ +(************************************************************************) +(* v * The Coq Proof Assistant / The Coq Development Team *) +(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *) +(* \VV/ **************************************************************) +(* // * This file is distributed under the terms of the *) +(* * GNU Lesser General Public License Version 2.1 *) +(************************************************************************) + +(* Thread safe queue with some extras *) + +type 'a t +val create : unit -> 'a t +val pop : 'a t -> 'a +val push : 'a t -> 'a -> unit +val reorder : 'a t -> ('a -> 'a -> int) -> unit +val wait_until_n_are_waiting_and_queue_empty : int -> 'a t -> unit +val dump : 'a t -> 'a list diff --git a/lib/workerPool.ml b/lib/workerPool.ml new file mode 100644 index 000000000..fcae4f20d --- /dev/null +++ b/lib/workerPool.ml @@ -0,0 +1,73 @@ +(************************************************************************) +(* v * The Coq Proof Assistant / The Coq Development Team *) +(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *) +(* \VV/ **************************************************************) +(* // * This file is distributed under the terms of the *) +(* * GNU Lesser General Public License Version 2.1 *) +(************************************************************************) + +module Make(Worker : sig + type process + val spawn : + ?prefer_sock:bool -> ?env:string array -> string -> string array -> + process * in_channel * out_channel +end) = struct + +type worker_id = int +type spawn = + args:string array -> env:string array -> unit -> + in_channel * out_channel * Worker.process + +let slave_managers = ref None + +let n_workers () = match !slave_managers with + | None -> 0 + | Some managers -> Array.length managers +let is_empty () = !slave_managers = None + +let magic_no = 17 + +let master_handshake worker_id ic oc = + try + Marshal.to_channel oc magic_no []; flush oc; + let n = (Marshal.from_channel ic : int) in + if n <> magic_no then begin + Printf.eprintf "Handshake with %d failed: protocol mismatch\n" worker_id; + exit 1; + end + with e when Errors.noncritical e -> + Printf.eprintf "Handshake with %d failed: %s\n" + worker_id (Printexc.to_string e); + exit 1 + +let respawn n ~args ~env () = + let proc, ic, oc = Worker.spawn ~env Sys.argv.(0) args in + master_handshake n ic oc; + ic, oc, proc + +let init ~size:n ~manager:manage_slave = + slave_managers := Some + (Array.init n (fun x -> + let cancel = ref false in + cancel, Thread.create (manage_slave ~cancel (x+1)) (respawn (x+1)))) + +let cancel n = + match !slave_managers with + | None -> () + | Some a -> + let switch, _ = a.(n) in + switch := true + +let worker_handshake slave_ic slave_oc = + try + let v = (Marshal.from_channel slave_ic : int) in + if v <> magic_no then begin + prerr_endline "Handshake failed: protocol mismatch\n"; + exit 1; + end; + Marshal.to_channel slave_oc v []; flush slave_oc; + with e when Errors.noncritical e -> + prerr_endline ("Handshake failed: " ^ Printexc.to_string e); + exit 1 + +end diff --git a/lib/workerPool.mli b/lib/workerPool.mli new file mode 100644 index 000000000..d7a546929 --- /dev/null +++ b/lib/workerPool.mli @@ -0,0 +1,30 @@ +(************************************************************************) +(* v * The Coq Proof Assistant / The Coq Development Team *) +(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *) +(* \VV/ **************************************************************) +(* // * This file is distributed under the terms of the *) +(* * GNU Lesser General Public License Version 2.1 *) +(************************************************************************) + +module Make(Worker : sig + type process + val spawn : + ?prefer_sock:bool -> ?env:string array -> string -> string array -> + process * in_channel * out_channel +end) : sig + +type worker_id = int +type spawn = + args:string array -> env:string array -> unit -> + in_channel * out_channel * Worker.process + +val init : + size:int -> manager:(cancel:bool ref -> worker_id -> spawn -> unit) -> unit +val is_empty : unit -> bool +val n_workers : unit -> int +val cancel : worker_id -> unit + +(* The worker should call this function *) +val worker_handshake : in_channel -> out_channel -> unit + +end diff --git a/toplevel/stm.ml b/toplevel/stm.ml index c61258655..f6564e892 100644 --- a/toplevel/stm.ml +++ b/toplevel/stm.ml @@ -617,6 +617,8 @@ module Worker = Spawn.Sync(struct ()) end) +module WorkersPool = WorkerPool.Make(Worker) + let record_pb_time proof_name loc time = let proof_build_time = Printf.sprintf "%.3f" time in Aux_file.record_in_aux_at loc "proof_build_time" proof_build_time; @@ -660,121 +662,8 @@ module Slaves : sig end = struct (* {{{ *) - module TQueue : sig - - type 'a t - val create : unit -> 'a t - val pop : 'a t -> 'a - val push : 'a t -> 'a -> unit - val wait_until_n_are_waiting_and_queue_empty : int -> 'a t -> unit - val dump : 'a t -> 'a list - - end = struct (* {{{ *) - - (* queue, lock, cond_q, n_waiting, cond_n_waiting *) - type 'a t = 'a Queue.t * Mutex.t * Condition.t * int ref * Condition.t - - let create () = - Queue.create (), Mutex.create (), Condition.create (), - ref 0, Condition.create () - - let pop (q,m,c,n,cn) = - Mutex.lock m; - while Queue.is_empty q do - n := !n+1; - Condition.signal cn; - Condition.wait c m; - n := !n-1 - done; - let x = Queue.pop q in - Condition.signal c; - Condition.signal cn; - Mutex.unlock m; - x - let push (q,m,c,n,cn) x = - Mutex.lock m; - Queue.push x q; - Condition.signal c; - Mutex.unlock m - - let wait_until_n_are_waiting_and_queue_empty (j : int) (q,m,c,n,cn) = - Mutex.lock m; - while not (Queue.is_empty q) || !n < j do Condition.wait cn m done; - Mutex.unlock m - - let dump (q,m,_,_,_) = - let l = ref [] in - Mutex.lock m; - while not (Queue.is_empty q) do l := Queue.pop q :: !l done; - Mutex.unlock m; - List.rev !l - - end (* }}} *) - - module SlavesPool : sig - - val init : int -> (bool ref -> (unit -> in_channel * out_channel * Worker.process * int) -> unit) -> unit - val is_empty : unit -> bool - val n_slaves : unit -> int - - val cancel : int -> unit - - end = struct (* {{{ *) - - let slave_managers = ref None - - let n_slaves () = match !slave_managers with - | None -> 0 - | Some managers -> Array.length managers - let is_empty () = !slave_managers = None - - let master_handshake worker_id ic oc = - try - Marshal.to_channel oc 17 []; flush oc; - let n = (Marshal.from_channel ic : int) in - assert(n = 17); - prerr_endline (Printf.sprintf "Handshake with %d OK" worker_id) - with e -> - prerr_endline - (Printf.sprintf "Handshake with %d failed: %s" - worker_id (Printexc.to_string e)); - exit 1 - - let respawn n () = - let prog = Sys.argv.(0) in - let rec set_slave_opt = function - | [] -> ["-async-proofs"; "worker"; string_of_int n] - | ("-ideslave"|"-emacs"|"-emacs-U")::tl -> set_slave_opt tl - | ("-async-proofs" - |"-compile" - |"-compile-verbose")::_::tl -> set_slave_opt tl - | x::tl -> x :: set_slave_opt tl in - let args = - Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in - let env = - Array.append !async_proofs_workers_extra_env (Unix.environment ()) in - let proc, ic, oc = Worker.spawn ~env prog args in - master_handshake n ic oc; - ic, oc, proc, n - - let init n manage_slave = - slave_managers := Some - (Array.init n (fun x -> - let calcel_req = ref false in - calcel_req, - Thread.create (manage_slave calcel_req) (respawn (x+1)))) - - let cancel n = - match !slave_managers with - | None -> () - | Some a -> - let switch, _ = a.(n) in - switch := true - - end (* }}} *) - - let cancel_worker n = SlavesPool.cancel (n-1) + let cancel_worker n = WorkersPool.cancel (n-1) let reach_known_state = ref (fun ?redefine_qed ~cache id -> ()) let set_reach_known_state f = reach_known_state := f @@ -976,13 +865,13 @@ end = struct (* {{{ *) let queue : task TQueue.t = TQueue.create () let wait_all_done () = - if not (SlavesPool.is_empty ()) then + if not (WorkersPool.is_empty ()) then TQueue.wait_until_n_are_waiting_and_queue_empty - (SlavesPool.n_slaves ()) queue + (WorkersPool.n_workers ()) queue let build_proof ~loc ~exn_info:(id,valid as exn_info) ~start ~stop ~name = let cancel_switch = ref false in - if SlavesPool.is_empty () then + if WorkersPool.is_empty () then if !Flags.compilation_mode = Flags.BuildVi then begin let force () : Entries.proof_output list Future.assignement = try `Val (build_proof_here_core loc stop ()) with e -> `Exn e in @@ -1018,8 +907,20 @@ end = struct (* {{{ *) | _ -> assert false in Pp.feedback ~state_id:Stateid.initial (Interface.SlaveStatus(id, s)) - let rec manage_slave cancel_user_req respawn = - let ic, oc, proc, id_slave = respawn () in + let rec manage_slave ~cancel:cancel_user_req id_slave respawn = + let ic, oc, proc = + let rec set_slave_opt = function + | [] -> ["-async-proofs"; "worker"; string_of_int id_slave] + | ("-ideslave"|"-emacs"|"-emacs-U")::tl -> set_slave_opt tl + | ("-async-proofs" + |"-compile" + |"-compile-verbose")::_::tl -> set_slave_opt tl + | x::tl -> x :: set_slave_opt tl in + let args = + Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in + let env = + Array.append !async_proofs_workers_extra_env (Unix.environment ()) in + respawn ~args ~env () in let last_task = ref None in let task_expired = ref false in let task_cancelled = ref false in @@ -1098,11 +999,11 @@ end = struct (* {{{ *) | KillRespawn -> Pp.feedback (Interface.InProgress ~-1); Worker.kill proc; ignore(Worker.wait proc); - manage_slave cancel_user_req respawn + manage_slave cancel_user_req id_slave respawn | Sys_error _ | Invalid_argument _ | End_of_file when !task_expired -> Pp.feedback (Interface.InProgress ~-1); ignore(Worker.wait proc); - manage_slave cancel_user_req respawn + manage_slave cancel_user_req id_slave respawn | Sys_error _ | Invalid_argument _ | End_of_file when !task_cancelled -> msg_warning(strbrk "The worker was cancelled."); Option.iter (fun task -> @@ -1114,7 +1015,7 @@ end = struct (* {{{ *) Pp.feedback (Interface.InProgress ~-1); ) !last_task; Worker.kill proc; ignore(Worker.wait proc); - manage_slave cancel_user_req respawn + manage_slave cancel_user_req id_slave respawn | Sys_error _ | Invalid_argument _ | End_of_file when !fallback_to_lazy_if_slave_dies -> msg_warning(strbrk "The worker process died badly."); @@ -1125,7 +1026,7 @@ end = struct (* {{{ *) Pp.feedback (Interface.InProgress ~-1); ) !last_task; Worker.kill proc; ignore(Worker.wait proc); - manage_slave cancel_user_req respawn + manage_slave cancel_user_req id_slave respawn | Sys_error _ | Invalid_argument _ | End_of_file -> Worker.kill proc; let exit_status proc = match Worker.wait proc with @@ -1136,7 +1037,7 @@ end = struct (* {{{ *) pr_err ("Fatal worker error: " ^ (exit_status proc)); flush_all (); exit 1 - let init () = SlavesPool.init !Flags.async_proofs_n_workers manage_slave + let init () = WorkersPool.init !Flags.async_proofs_n_workers manage_slave let slave_ic = ref stdin let slave_oc = ref stdout @@ -1152,15 +1053,7 @@ end = struct (* {{{ *) | [] -> let data = f () in l := List.tl data; List.hd data | x::tl -> l := tl; x - let slave_handshake () = - try - let v = (Marshal.from_channel !slave_ic : int) in - assert(v = 17); - Marshal.to_channel !slave_oc v []; flush !slave_oc; - prerr_endline "Handshake OK" - with e -> - prerr_endline ("Handshake failed: " ^ Printexc.to_string e); - exit 1 + let slave_handshake () = WorkersPool.worker_handshake !slave_ic !slave_oc let slave_main_loop reset = let slave_feeder oc fb = @@ -1214,7 +1107,7 @@ end = struct (* {{{ *) (* For external users this name is nicer than request *) type tasks = int request list let dump f2t_map = - assert(SlavesPool.is_empty ()); (* ATM, we allow that only if no slaves *) + assert(WorkersPool.is_empty ()); (* ATM, we allow that only if no slaves *) let tasks = TQueue.dump queue in prerr_endline (Printf.sprintf "dumping %d\n" (List.length tasks)); let tasks = List.map request_of_task tasks in |