aboutsummaryrefslogtreecommitdiffhomepage
path: root/lib
diff options
context:
space:
mode:
authorGravatar Enrico Tassi <Enrico.Tassi@inria.fr>2014-03-13 15:41:44 +0100
committerGravatar Enrico Tassi <Enrico.Tassi@inria.fr>2014-03-13 16:04:13 +0100
commitc9b1caaa5516d616e400faa7a7c0278c8677c51c (patch)
tree2dc6f7870a9824f8b9c8357774287c5c44b332a2 /lib
parent8ee720fef8e21595827d18e1e28777c1d061a9e5 (diff)
STM: move out a couple of submodules
These modules are not as reusable as one may want them to be, but moving them out simplifies a little STM.
Diffstat (limited to 'lib')
-rw-r--r--lib/lib.mllib2
-rw-r--r--lib/tQueue.ml64
-rw-r--r--lib/tQueue.mli17
-rw-r--r--lib/workerPool.ml73
-rw-r--r--lib/workerPool.mli30
5 files changed, 186 insertions, 0 deletions
diff --git a/lib/lib.mllib b/lib/lib.mllib
index 9ba1e15b9..50621df20 100644
--- a/lib/lib.mllib
+++ b/lib/lib.mllib
@@ -25,3 +25,5 @@ Future
RemoteCounter
Dag
Vcs
+TQueue
+WorkerPool
diff --git a/lib/tQueue.ml b/lib/tQueue.ml
new file mode 100644
index 000000000..783c545fd
--- /dev/null
+++ b/lib/tQueue.ml
@@ -0,0 +1,64 @@
+(************************************************************************)
+(* v * The Coq Proof Assistant / The Coq Development Team *)
+(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *)
+(* \VV/ **************************************************************)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(************************************************************************)
+
+type 'a t = {
+ queue: 'a Queue.t;
+ lock : Mutex.t;
+ cond : Condition.t;
+ mutable nwaiting : int;
+ cond_waiting : Condition.t;
+}
+
+let create () = {
+ queue = Queue.create ();
+ lock = Mutex.create ();
+ cond = Condition.create ();
+ nwaiting = 0;
+ cond_waiting = Condition.create ();
+}
+
+let pop ({ queue = q; lock = m; cond = c; cond_waiting = cn } as tq) =
+ Mutex.lock m;
+ while Queue.is_empty q do
+ tq.nwaiting <- tq.nwaiting + 1;
+ Condition.signal cn;
+ Condition.wait c m;
+ tq.nwaiting <- tq.nwaiting - 1;
+ done;
+ let x = Queue.pop q in
+ Condition.signal c;
+ Condition.signal cn;
+ Mutex.unlock m;
+ x
+
+let push { queue = q; lock = m; cond = c } x =
+ Mutex.lock m;
+ Queue.push x q;
+ Condition.signal c;
+ Mutex.unlock m
+
+let wait_until_n_are_waiting_and_queue_empty j tq =
+ Mutex.lock tq.lock;
+ while not (Queue.is_empty tq.queue) || tq.nwaiting < j do
+ Condition.wait tq.cond_waiting tq.lock
+ done;
+ Mutex.unlock tq.lock
+
+let dump { queue; lock } =
+ let l = ref [] in
+ Mutex.lock lock;
+ while not (Queue.is_empty queue) do l := Queue.pop queue :: !l done;
+ Mutex.unlock lock;
+ List.rev !l
+
+let reorder tq rel =
+ Mutex.lock tq.lock;
+ let l = ref [] in
+ while not (Queue.is_empty tq.queue) do l := Queue.pop tq.queue :: !l done;
+ List.iter (fun x -> Queue.push x tq.queue) (List.sort rel !l);
+ Mutex.unlock tq.lock
diff --git a/lib/tQueue.mli b/lib/tQueue.mli
new file mode 100644
index 000000000..a3ea5532f
--- /dev/null
+++ b/lib/tQueue.mli
@@ -0,0 +1,17 @@
+(************************************************************************)
+(* v * The Coq Proof Assistant / The Coq Development Team *)
+(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *)
+(* \VV/ **************************************************************)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(************************************************************************)
+
+(* Thread safe queue with some extras *)
+
+type 'a t
+val create : unit -> 'a t
+val pop : 'a t -> 'a
+val push : 'a t -> 'a -> unit
+val reorder : 'a t -> ('a -> 'a -> int) -> unit
+val wait_until_n_are_waiting_and_queue_empty : int -> 'a t -> unit
+val dump : 'a t -> 'a list
diff --git a/lib/workerPool.ml b/lib/workerPool.ml
new file mode 100644
index 000000000..fcae4f20d
--- /dev/null
+++ b/lib/workerPool.ml
@@ -0,0 +1,73 @@
+(************************************************************************)
+(* v * The Coq Proof Assistant / The Coq Development Team *)
+(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *)
+(* \VV/ **************************************************************)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(************************************************************************)
+
+module Make(Worker : sig
+ type process
+ val spawn :
+ ?prefer_sock:bool -> ?env:string array -> string -> string array ->
+ process * in_channel * out_channel
+end) = struct
+
+type worker_id = int
+type spawn =
+ args:string array -> env:string array -> unit ->
+ in_channel * out_channel * Worker.process
+
+let slave_managers = ref None
+
+let n_workers () = match !slave_managers with
+ | None -> 0
+ | Some managers -> Array.length managers
+let is_empty () = !slave_managers = None
+
+let magic_no = 17
+
+let master_handshake worker_id ic oc =
+ try
+ Marshal.to_channel oc magic_no []; flush oc;
+ let n = (Marshal.from_channel ic : int) in
+ if n <> magic_no then begin
+ Printf.eprintf "Handshake with %d failed: protocol mismatch\n" worker_id;
+ exit 1;
+ end
+ with e when Errors.noncritical e ->
+ Printf.eprintf "Handshake with %d failed: %s\n"
+ worker_id (Printexc.to_string e);
+ exit 1
+
+let respawn n ~args ~env () =
+ let proc, ic, oc = Worker.spawn ~env Sys.argv.(0) args in
+ master_handshake n ic oc;
+ ic, oc, proc
+
+let init ~size:n ~manager:manage_slave =
+ slave_managers := Some
+ (Array.init n (fun x ->
+ let cancel = ref false in
+ cancel, Thread.create (manage_slave ~cancel (x+1)) (respawn (x+1))))
+
+let cancel n =
+ match !slave_managers with
+ | None -> ()
+ | Some a ->
+ let switch, _ = a.(n) in
+ switch := true
+
+let worker_handshake slave_ic slave_oc =
+ try
+ let v = (Marshal.from_channel slave_ic : int) in
+ if v <> magic_no then begin
+ prerr_endline "Handshake failed: protocol mismatch\n";
+ exit 1;
+ end;
+ Marshal.to_channel slave_oc v []; flush slave_oc;
+ with e when Errors.noncritical e ->
+ prerr_endline ("Handshake failed: " ^ Printexc.to_string e);
+ exit 1
+
+end
diff --git a/lib/workerPool.mli b/lib/workerPool.mli
new file mode 100644
index 000000000..d7a546929
--- /dev/null
+++ b/lib/workerPool.mli
@@ -0,0 +1,30 @@
+(************************************************************************)
+(* v * The Coq Proof Assistant / The Coq Development Team *)
+(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012 *)
+(* \VV/ **************************************************************)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(************************************************************************)
+
+module Make(Worker : sig
+ type process
+ val spawn :
+ ?prefer_sock:bool -> ?env:string array -> string -> string array ->
+ process * in_channel * out_channel
+end) : sig
+
+type worker_id = int
+type spawn =
+ args:string array -> env:string array -> unit ->
+ in_channel * out_channel * Worker.process
+
+val init :
+ size:int -> manager:(cancel:bool ref -> worker_id -> spawn -> unit) -> unit
+val is_empty : unit -> bool
+val n_workers : unit -> int
+val cancel : worker_id -> unit
+
+(* The worker should call this function *)
+val worker_handshake : in_channel -> out_channel -> unit
+
+end