path: root/stm
diff options
authorGravatar Enrico Tassi <Enrico.Tassi@inria.fr>2014-11-24 14:56:51 +0100
committerGravatar Enrico Tassi <Enrico.Tassi@inria.fr>2014-11-27 16:06:54 +0100
commit5f7234edcf0a6bef995c8d1dc31f679799a98557 (patch)
tree4d2b3f8c20f2f552bef4b60696e385837bf67918 /stm
parentd86dbc9b9f4fbabd922b07e7c695f03cf6c03c43 (diff)
WorkerPool: API to move a worker from an active pool to a parking one
This lets me have a pool of active workers of a fixed size, plus a parking area where workers that completed their job can stay holding their state (and eventually one can ask them to query such state afterwards).
Diffstat (limited to 'stm')
2 files changed, 131 insertions, 64 deletions
diff --git a/stm/workerPool.ml b/stm/workerPool.ml
index 2e192cdec..65fb755be 100644
--- a/stm/workerPool.ml
+++ b/stm/workerPool.ml
@@ -6,29 +6,47 @@
(* * GNU Lesser General Public License Version 2.1 *)
-module Make(Worker : sig
+type worker_id = string
+type 'a spawn =
+ args:string array -> env:string array -> unit -> in_channel * out_channel * 'a
+type active
+type parking
+module type WorkerModel = sig
type process
val spawn :
- ?prefer_sock:bool -> ?env:string array -> string -> string array ->
+ ?prefer_sock:bool -> ?env:string array -> string -> string array ->
process * in_channel * out_channel
-end) = struct
-type worker_id = string
-type spawn =
- args:string array -> env:string array -> unit ->
- in_channel * out_channel * Worker.process
+module type ManagerModel = sig
+ type process
+ type extra (* extra stuff to pass to the manager *)
+ val manager :
+ extra -> cancel:bool ref -> die:bool ref -> worker_id -> process spawn ->
+ unit
+ val naming : int -> worker_id
+module Make(Worker : WorkerModel)
+ (Manager : ManagerModel with type process = Worker.process) = struct
type worker = {
name : worker_id;
cancel : bool ref;
die : bool ref;
manager : Thread.t }
-let slave_managers : worker array option ref = ref None
-let n_workers () = match !slave_managers with
- | None -> 0
- | Some managers -> Array.length managers
-let is_empty () = !slave_managers = None
+type 'a pool = {
+ workers : worker list ref;
+ count : int ref;
+ extra : Manager.extra option;
+ lock : Mutex.t
+let n_workers { workers } = List.length !workers
+let is_empty { workers } = !workers = []
let magic_no = 17
@@ -45,37 +63,6 @@ let master_handshake worker_id ic oc =
worker_id (Printexc.to_string e);
exit 1
-let respawn n ~args ~env () =
- let proc, ic, oc = Worker.spawn ~env Sys.argv.(0) args in
- master_handshake n ic oc;
- ic, oc, proc
-let init ~size:n ~manager:manage_slave mk_name =
- slave_managers := Some
- (Array.init n (fun x ->
- let name = mk_name x in
- let cancel = ref false in
- let die = ref false in
- let manager =
- Thread.create (manage_slave ~cancel ~die name) (respawn name) in
- { name; cancel; die; manager }))
-let foreach f =
- match !slave_managers with
- | None -> ()
- | Some a ->
- for i = 0 to Array.length a - 1 do f a.(i) done
-let cancel n = foreach (fun { name; cancel } -> if n = name then cancel := true)
-let cancel_all () = foreach (fun { cancel } -> cancel := true)
-let kill_all () = foreach (fun { die } -> die := true)
-let destroy () =
- kill_all ();
- slave_managers := None
let worker_handshake slave_ic slave_oc =
let v = (Marshal.from_channel slave_ic : int) in
@@ -88,4 +75,64 @@ let worker_handshake slave_ic slave_oc =
prerr_endline ("Handshake failed: " ^ Printexc.to_string e);
exit 1
+let respawn n ~args ~env () =
+ let proc, ic, oc = Worker.spawn ~env Sys.argv.(0) args in
+ master_handshake n ic oc;
+ ic, oc, proc
+let create1 e x =
+ let name = Manager.naming x in
+ let cancel = ref false in
+ let die = ref false in
+ let manager =
+ Thread.create (Manager.manager e ~cancel ~die name) (respawn name) in
+ { name; cancel; die; manager }
+let create_active e n = {
+ lock = Mutex.create ();
+ extra = Some e;
+ workers = ref (CList.init n (create1 e));
+ count = ref n
+let create_parking () = {
+ lock = Mutex.create ();
+ extra = None;
+ workers = ref [];
+ count = ref 0
+let cancel { lock; workers } n =
+ Mutex.lock lock;
+ List.iter (fun { name; cancel } -> if n = name then cancel := true) !workers;
+ Mutex.unlock lock
+let cancel_all { lock; workers } =
+ Mutex.lock lock;
+ List.iter (fun { cancel } -> cancel := true) !workers;
+ Mutex.unlock lock
+let kill_all { lock; workers } =
+ Mutex.lock lock;
+ List.iter (fun { die } -> die := true) !workers;
+ Mutex.unlock lock
+let destroy { lock; workers } =
+ Mutex.lock lock;
+ List.iter (fun { die } -> die := true) !workers;
+ workers := [];
+ Mutex.unlock lock
+let move oldq newq n =
+ Mutex.lock oldq.lock; Mutex.lock newq.lock;
+ let rec find acc = function
+ | [] -> Mutex.unlock oldq.lock; Mutex.unlock newq.lock; assert false
+ | { name } as w :: rest when name = n -> w, List.rev acc @ rest
+ | x :: xs -> find (x :: acc) xs in
+ let w, rest = find [] !(oldq.workers) in
+ oldq.workers := create1 (Option.get oldq.extra) !(oldq.count) :: rest;
+ oldq.count := !(oldq.count) + 1;
+ newq.workers := w :: !(newq.workers);
+ Mutex.unlock oldq.lock; Mutex.unlock newq.lock
diff --git a/stm/workerPool.mli b/stm/workerPool.mli
index 4e5512a4b..29398cc06 100644
--- a/stm/workerPool.mli
+++ b/stm/workerPool.mli
@@ -6,30 +6,50 @@
(* * GNU Lesser General Public License Version 2.1 *)
-module Make(Worker : sig
+type worker_id = string
+type 'a spawn =
+ args:string array -> env:string array -> unit -> in_channel * out_channel * 'a
+type active
+type parking
+(* this shall come from a Spawn.* model *)
+module type WorkerModel = sig
type process
val spawn :
- ?prefer_sock:bool -> ?env:string array -> string -> string array ->
+ ?prefer_sock:bool -> ?env:string array -> string -> string array ->
process * in_channel * out_channel
-end) : sig
-type worker_id = string
-type spawn =
- args:string array -> env:string array -> unit ->
- in_channel * out_channel * Worker.process
-val init :
- size:int ->
- manager:(cancel:bool ref -> die:bool ref -> worker_id -> spawn -> unit) ->
- (int -> worker_id) -> unit
-val destroy : unit -> unit
-val is_empty : unit -> bool
-val n_workers : unit -> int
-val cancel : worker_id -> unit
-val cancel_all : unit -> unit
-(* The worker should call this function *)
-val worker_handshake : in_channel -> out_channel -> unit
+(* this defines the main loop of the manager *)
+module type ManagerModel = sig
+ type process
+ type extra (* extra stuff to pass to the manager *)
+ val manager :
+ extra -> cancel:bool ref -> die:bool ref -> worker_id -> process spawn ->
+ unit
+ val naming : int -> worker_id
+module Make(Worker : WorkerModel)
+ (Manager : ManagerModel with type process = Worker.process) : sig
+ type 'a pool
+ val create_active : Manager.extra -> int -> active pool
+ val create_parking : unit -> parking pool
+ val is_empty : 'a pool -> bool
+ val n_workers : 'a pool -> int
+ (* cancel signal *)
+ val cancel : 'a pool -> worker_id -> unit
+ val cancel_all : 'a pool -> unit
+ (* die signal + true removal, the pool is empty afterward *)
+ val destroy : 'a pool -> unit
+ val move : active pool -> parking pool -> worker_id -> unit
+ (* The worker should call this function *)
+ val worker_handshake : in_channel -> out_channel -> unit