aboutsummaryrefslogtreecommitdiffhomepage
path: root/stm/workerPool.ml
diff options
context:
space:
mode:
authorGravatar Enrico Tassi <Enrico.Tassi@inria.fr>2014-12-12 18:19:29 +0100
committerGravatar Enrico Tassi <Enrico.Tassi@inria.fr>2014-12-17 15:05:05 +0100
commit77472779450b218711aa7024feafc19054371eaa (patch)
tree2c6fe9bb2d36e244024be3d23c2eb117b53f65f2 /stm/workerPool.ml
parentf9a6efbb2647e7856c34966fd7bcc00a1d8fbc4d (diff)
WorkerPool: simpler fuctor and no more parking area
Diffstat (limited to 'stm/workerPool.ml')
-rw-r--r--stm/workerPool.ml148
1 files changed, 69 insertions, 79 deletions
diff --git a/stm/workerPool.ml b/stm/workerPool.ml
index 510611f05..4757760c7 100644
--- a/stm/workerPool.ml
+++ b/stm/workerPool.ml
@@ -7,53 +7,47 @@
(************************************************************************)
type worker_id = string
-type 'a spawn =
- args:string array -> env:string array -> unit -> in_channel * out_channel * 'a
-type active
-type parking
-module type WorkerModel = sig
- type process
- val spawn :
- ?prefer_sock:bool -> ?env:string array -> string -> string array ->
- process * in_channel * out_channel
-end
+type 'a cpanel = {
+ exit : unit -> unit; (* called by manager to exit instead of Thread.exit *)
+ cancelled : unit -> bool; (* manager checks for a request of termination *)
+ extra : 'a; (* extra stuff to pass to the manager *)
+}
-module type ManagerModel = sig
+module type PoolModel = sig
+ (* this shall come from a Spawn.* model *)
type process
- type extra (* extra stuff to pass to the manager *)
+ val spawn : int -> worker_id * process * CThread.thread_ic * out_channel
+
+ (* this defines the main loop of the manager *)
+ type extra
val manager :
- extra -> cancel:bool ref -> die:bool ref -> worker_id -> process spawn ->
- unit
- val naming : int -> worker_id
+ extra cpanel -> worker_id * process * CThread.thread_ic * out_channel -> unit
end
-
-module Make(Worker : WorkerModel)
- (Manager : ManagerModel with type process = Worker.process) = struct
+module Make(Model : PoolModel) = struct
type worker = {
name : worker_id;
cancel : bool ref;
- die : bool ref;
- manager : Thread.t }
+ manager : Thread.t;
+ process : Model.process;
+}
-type 'a pool = {
+type pre_pool = {
workers : worker list ref;
count : int ref;
- extra : Manager.extra option;
- lock : Mutex.t
+ extra : Model.extra;
}
-let n_workers { workers } = List.length !workers
-let is_empty { workers } = !workers = []
+type pool = { lock : Mutex.t; pool : pre_pool }
let magic_no = 17
let master_handshake worker_id ic oc =
try
Marshal.to_channel oc magic_no []; flush oc;
- let n = (Marshal.from_channel ic : int) in
+ let n = (CThread.thread_friendly_input_value ic : int) in
if n <> magic_no then begin
Printf.eprintf "Handshake with %s failed: protocol mismatch\n" worker_id;
exit 1;
@@ -75,64 +69,60 @@ let worker_handshake slave_ic slave_oc =
prerr_endline ("Handshake failed: " ^ Printexc.to_string e);
exit 1
-let respawn n ~args ~env () =
- let proc, ic, oc = Worker.spawn ~env Sys.argv.(0) args in
- master_handshake n ic oc;
- ic, oc, proc
+let locking { lock; pool = p } f =
+ try
+ Mutex.lock lock;
+ let x = f p in
+ Mutex.unlock lock;
+ x
+ with e -> Mutex.unlock lock; raise e
-let create1 e x =
- let name = Manager.naming x in
+let rec create_worker extra pool id =
let cancel = ref false in
- let die = ref false in
- let manager =
- Thread.create (Manager.manager e ~cancel ~die name) (respawn name) in
- { name; cancel; die; manager }
-
-let create_active e n = {
- lock = Mutex.create ();
- extra = Some e;
- workers = ref (CList.init n (create1 e));
- count = ref n
-}
+ let name, process, ic, oc as worker = Model.spawn id in
+ master_handshake name ic oc;
+ let exit () = cancel := true; cleanup pool; Thread.exit () in
+ let cancelled () = !cancel in
+ let cpanel = { exit; cancelled; extra } in
+ let manager = Thread.create (Model.manager cpanel) worker in
+ { name; cancel; manager; process }
+
+and cleanup x = locking x begin fun { workers; count; extra } ->
+ workers := List.map (function
+ | { cancel } as w when !cancel = false -> w
+ | _ -> let n = !count in incr count; create_worker extra x n)
+ !workers
+end
-let create_parking () = {
- lock = Mutex.create ();
- extra = None;
- workers = ref [];
- count = ref 0
-}
+let n_workers x = locking x begin fun { workers } ->
+ List.length !workers
+end
-let cancel { lock; workers } n =
- Mutex.lock lock;
- List.iter (fun { name; cancel } -> if n = name then cancel := true) !workers;
- Mutex.unlock lock
+let is_empty x = locking x begin fun { workers } -> !workers = [] end
+
+let create extra ~size = let x = {
+ lock = Mutex.create ();
+ pool = {
+ extra;
+ workers = ref [];
+ count = ref size;
+ }} in
+ locking x begin fun { workers } ->
+ workers := CList.init size (create_worker extra x)
+ end;
+ x
+
+let cancel n x = locking x begin fun { workers } ->
+ List.iter (fun { name; cancel } -> if n = name then cancel := true) !workers
+end
-let cancel_all { lock; workers } =
- Mutex.lock lock;
+let cancel_all x = locking x begin fun { workers } ->
+ List.iter (fun { cancel } -> cancel := true) !workers
+end
+
+let destroy x = locking x begin fun { workers } ->
List.iter (fun { cancel } -> cancel := true) !workers;
- Mutex.unlock lock
-
-let kill_all { lock; workers } =
- Mutex.lock lock;
- List.iter (fun { die } -> die := true) !workers;
- Mutex.unlock lock
-
-let destroy { lock; workers } =
- Mutex.lock lock;
- List.iter (fun { die } -> die := true) !workers;
- workers := [];
- Mutex.unlock lock
-
-let move oldq newq n =
- Mutex.lock oldq.lock; Mutex.lock newq.lock;
- let rec find acc = function
- | [] -> Mutex.unlock oldq.lock; Mutex.unlock newq.lock; assert false
- | { name } as w :: rest when name = n -> w, List.rev acc @ rest
- | x :: xs -> find (x :: acc) xs in
- let w, rest = find [] !(oldq.workers) in
- oldq.workers := create1 (Option.get oldq.extra) !(oldq.count) :: rest;
- oldq.count := !(oldq.count) + 1;
- newq.workers := w :: !(newq.workers);
- Mutex.unlock oldq.lock; Mutex.unlock newq.lock
+ workers := []
+end
end