From 5f7234edcf0a6bef995c8d1dc31f679799a98557 Mon Sep 17 00:00:00 2001 From: Enrico Tassi Date: Mon, 24 Nov 2014 14:56:51 +0100 Subject: WorkerPool: API to move a worker from an active pool to a parking one This lets me have a pool of active workers of a fixed size, plus a parking area where workers that completed their job can stay holding their state (and eventually one can ask them to query such state afterwards). --- stm/workerPool.ml | 133 ++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 90 insertions(+), 43 deletions(-) (limited to 'stm/workerPool.ml') diff --git a/stm/workerPool.ml b/stm/workerPool.ml index 2e192cdec..65fb755be 100644 --- a/stm/workerPool.ml +++ b/stm/workerPool.ml @@ -6,29 +6,47 @@ (* * GNU Lesser General Public License Version 2.1 *) (************************************************************************) -module Make(Worker : sig +type worker_id = string +type 'a spawn = + args:string array -> env:string array -> unit -> in_channel * out_channel * 'a +type active +type parking + +module type WorkerModel = sig type process val spawn : - ?prefer_sock:bool -> ?env:string array -> string -> string array -> + ?prefer_sock:bool -> ?env:string array -> string -> string array -> process * in_channel * out_channel -end) = struct +end -type worker_id = string -type spawn = - args:string array -> env:string array -> unit -> - in_channel * out_channel * Worker.process +module type ManagerModel = sig + type process + type extra (* extra stuff to pass to the manager *) + val manager : + extra -> cancel:bool ref -> die:bool ref -> worker_id -> process spawn -> + unit + val naming : int -> worker_id +end + + +module Make(Worker : WorkerModel) + (Manager : ManagerModel with type process = Worker.process) = struct type worker = { name : worker_id; cancel : bool ref; die : bool ref; manager : Thread.t } -let slave_managers : worker array option ref = ref None -let n_workers () = match !slave_managers with - | None -> 0 - | Some managers -> Array.length managers -let is_empty () = !slave_managers = None +type 'a pool = { + workers : worker list ref; + count : int ref; + extra : Manager.extra option; + lock : Mutex.t +} + +let n_workers { workers } = List.length !workers +let is_empty { workers } = !workers = [] let magic_no = 17 @@ -45,37 +63,6 @@ let master_handshake worker_id ic oc = worker_id (Printexc.to_string e); exit 1 -let respawn n ~args ~env () = - let proc, ic, oc = Worker.spawn ~env Sys.argv.(0) args in - master_handshake n ic oc; - ic, oc, proc - -let init ~size:n ~manager:manage_slave mk_name = - slave_managers := Some - (Array.init n (fun x -> - let name = mk_name x in - let cancel = ref false in - let die = ref false in - let manager = - Thread.create (manage_slave ~cancel ~die name) (respawn name) in - { name; cancel; die; manager })) - -let foreach f = - match !slave_managers with - | None -> () - | Some a -> - for i = 0 to Array.length a - 1 do f a.(i) done - -let cancel n = foreach (fun { name; cancel } -> if n = name then cancel := true) - -let cancel_all () = foreach (fun { cancel } -> cancel := true) - -let kill_all () = foreach (fun { die } -> die := true) - -let destroy () = - kill_all (); - slave_managers := None - let worker_handshake slave_ic slave_oc = try let v = (Marshal.from_channel slave_ic : int) in @@ -88,4 +75,64 @@ let worker_handshake slave_ic slave_oc = prerr_endline ("Handshake failed: " ^ Printexc.to_string e); exit 1 +let respawn n ~args ~env () = + let proc, ic, oc = Worker.spawn ~env Sys.argv.(0) args in + master_handshake n ic oc; + ic, oc, proc + +let create1 e x = + let name = Manager.naming x in + let cancel = ref false in + let die = ref false in + let manager = + Thread.create (Manager.manager e ~cancel ~die name) (respawn name) in + { name; cancel; die; manager } + +let create_active e n = { + lock = Mutex.create (); + extra = Some e; + workers = ref (CList.init n (create1 e)); + count = ref n +} + +let create_parking () = { + lock = Mutex.create (); + extra = None; + workers = ref []; + count = ref 0 +} + +let cancel { lock; workers } n = + Mutex.lock lock; + List.iter (fun { name; cancel } -> if n = name then cancel := true) !workers; + Mutex.unlock lock + +let cancel_all { lock; workers } = + Mutex.lock lock; + List.iter (fun { cancel } -> cancel := true) !workers; + Mutex.unlock lock + +let kill_all { lock; workers } = + Mutex.lock lock; + List.iter (fun { die } -> die := true) !workers; + Mutex.unlock lock + +let destroy { lock; workers } = + Mutex.lock lock; + List.iter (fun { die } -> die := true) !workers; + workers := []; + Mutex.unlock lock + +let move oldq newq n = + Mutex.lock oldq.lock; Mutex.lock newq.lock; + let rec find acc = function + | [] -> Mutex.unlock oldq.lock; Mutex.unlock newq.lock; assert false + | { name } as w :: rest when name = n -> w, List.rev acc @ rest + | x :: xs -> find (x :: acc) xs in + let w, rest = find [] !(oldq.workers) in + oldq.workers := create1 (Option.get oldq.extra) !(oldq.count) :: rest; + oldq.count := !(oldq.count) + 1; + newq.workers := w :: !(newq.workers); + Mutex.unlock oldq.lock; Mutex.unlock newq.lock + end -- cgit v1.2.3