diff options
author | Enrico Tassi <Enrico.Tassi@inria.fr> | 2014-12-12 18:19:29 +0100 |
---|---|---|
committer | Enrico Tassi <Enrico.Tassi@inria.fr> | 2014-12-17 15:05:05 +0100 |
commit | 77472779450b218711aa7024feafc19054371eaa (patch) | |
tree | 2c6fe9bb2d36e244024be3d23c2eb117b53f65f2 /stm/workerPool.ml | |
parent | f9a6efbb2647e7856c34966fd7bcc00a1d8fbc4d (diff) |
WorkerPool: simpler fuctor and no more parking area
Diffstat (limited to 'stm/workerPool.ml')
-rw-r--r-- | stm/workerPool.ml | 148 |
1 files changed, 69 insertions, 79 deletions
diff --git a/stm/workerPool.ml b/stm/workerPool.ml index 510611f05..4757760c7 100644 --- a/stm/workerPool.ml +++ b/stm/workerPool.ml @@ -7,53 +7,47 @@ (************************************************************************) type worker_id = string -type 'a spawn = - args:string array -> env:string array -> unit -> in_channel * out_channel * 'a -type active -type parking -module type WorkerModel = sig - type process - val spawn : - ?prefer_sock:bool -> ?env:string array -> string -> string array -> - process * in_channel * out_channel -end +type 'a cpanel = { + exit : unit -> unit; (* called by manager to exit instead of Thread.exit *) + cancelled : unit -> bool; (* manager checks for a request of termination *) + extra : 'a; (* extra stuff to pass to the manager *) +} -module type ManagerModel = sig +module type PoolModel = sig + (* this shall come from a Spawn.* model *) type process - type extra (* extra stuff to pass to the manager *) + val spawn : int -> worker_id * process * CThread.thread_ic * out_channel + + (* this defines the main loop of the manager *) + type extra val manager : - extra -> cancel:bool ref -> die:bool ref -> worker_id -> process spawn -> - unit - val naming : int -> worker_id + extra cpanel -> worker_id * process * CThread.thread_ic * out_channel -> unit end - -module Make(Worker : WorkerModel) - (Manager : ManagerModel with type process = Worker.process) = struct +module Make(Model : PoolModel) = struct type worker = { name : worker_id; cancel : bool ref; - die : bool ref; - manager : Thread.t } + manager : Thread.t; + process : Model.process; +} -type 'a pool = { +type pre_pool = { workers : worker list ref; count : int ref; - extra : Manager.extra option; - lock : Mutex.t + extra : Model.extra; } -let n_workers { workers } = List.length !workers -let is_empty { workers } = !workers = [] +type pool = { lock : Mutex.t; pool : pre_pool } let magic_no = 17 let master_handshake worker_id ic oc = try Marshal.to_channel oc magic_no []; flush oc; - let n = (Marshal.from_channel ic : int) in + let n = (CThread.thread_friendly_input_value ic : int) in if n <> magic_no then begin Printf.eprintf "Handshake with %s failed: protocol mismatch\n" worker_id; exit 1; @@ -75,64 +69,60 @@ let worker_handshake slave_ic slave_oc = prerr_endline ("Handshake failed: " ^ Printexc.to_string e); exit 1 -let respawn n ~args ~env () = - let proc, ic, oc = Worker.spawn ~env Sys.argv.(0) args in - master_handshake n ic oc; - ic, oc, proc +let locking { lock; pool = p } f = + try + Mutex.lock lock; + let x = f p in + Mutex.unlock lock; + x + with e -> Mutex.unlock lock; raise e -let create1 e x = - let name = Manager.naming x in +let rec create_worker extra pool id = let cancel = ref false in - let die = ref false in - let manager = - Thread.create (Manager.manager e ~cancel ~die name) (respawn name) in - { name; cancel; die; manager } - -let create_active e n = { - lock = Mutex.create (); - extra = Some e; - workers = ref (CList.init n (create1 e)); - count = ref n -} + let name, process, ic, oc as worker = Model.spawn id in + master_handshake name ic oc; + let exit () = cancel := true; cleanup pool; Thread.exit () in + let cancelled () = !cancel in + let cpanel = { exit; cancelled; extra } in + let manager = Thread.create (Model.manager cpanel) worker in + { name; cancel; manager; process } + +and cleanup x = locking x begin fun { workers; count; extra } -> + workers := List.map (function + | { cancel } as w when !cancel = false -> w + | _ -> let n = !count in incr count; create_worker extra x n) + !workers +end -let create_parking () = { - lock = Mutex.create (); - extra = None; - workers = ref []; - count = ref 0 -} +let n_workers x = locking x begin fun { workers } -> + List.length !workers +end -let cancel { lock; workers } n = - Mutex.lock lock; - List.iter (fun { name; cancel } -> if n = name then cancel := true) !workers; - Mutex.unlock lock +let is_empty x = locking x begin fun { workers } -> !workers = [] end + +let create extra ~size = let x = { + lock = Mutex.create (); + pool = { + extra; + workers = ref []; + count = ref size; + }} in + locking x begin fun { workers } -> + workers := CList.init size (create_worker extra x) + end; + x + +let cancel n x = locking x begin fun { workers } -> + List.iter (fun { name; cancel } -> if n = name then cancel := true) !workers +end -let cancel_all { lock; workers } = - Mutex.lock lock; +let cancel_all x = locking x begin fun { workers } -> + List.iter (fun { cancel } -> cancel := true) !workers +end + +let destroy x = locking x begin fun { workers } -> List.iter (fun { cancel } -> cancel := true) !workers; - Mutex.unlock lock - -let kill_all { lock; workers } = - Mutex.lock lock; - List.iter (fun { die } -> die := true) !workers; - Mutex.unlock lock - -let destroy { lock; workers } = - Mutex.lock lock; - List.iter (fun { die } -> die := true) !workers; - workers := []; - Mutex.unlock lock - -let move oldq newq n = - Mutex.lock oldq.lock; Mutex.lock newq.lock; - let rec find acc = function - | [] -> Mutex.unlock oldq.lock; Mutex.unlock newq.lock; assert false - | { name } as w :: rest when name = n -> w, List.rev acc @ rest - | x :: xs -> find (x :: acc) xs in - let w, rest = find [] !(oldq.workers) in - oldq.workers := create1 (Option.get oldq.extra) !(oldq.count) :: rest; - oldq.count := !(oldq.count) + 1; - newq.workers := w :: !(newq.workers); - Mutex.unlock oldq.lock; Mutex.unlock newq.lock + workers := [] +end end |