diff options
author | Enrico Tassi <Enrico.Tassi@inria.fr> | 2015-02-10 08:34:00 +0100 |
---|---|---|
committer | Enrico Tassi <Enrico.Tassi@inria.fr> | 2015-02-16 17:53:06 +0100 |
commit | ffe7fc6ff44ec94544123c47b3d01bdec05b3fe0 (patch) | |
tree | d7ec22629888e0632ae0f8139b378d3e86cb8344 | |
parent | cce1b6f06f9802f4d7c977322cec654ad2582d63 (diff) |
*Queue: API to wake up all threads
-rw-r--r-- | stm/asyncTaskQueue.ml | 4 | ||||
-rw-r--r-- | stm/asyncTaskQueue.mli | 2 | ||||
-rw-r--r-- | stm/tQueue.ml | 2 | ||||
-rw-r--r-- | stm/tQueue.mli | 4 |
4 files changed, 9 insertions, 3 deletions
diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml index 672527d9b..e3fb0b607 100644 --- a/stm/asyncTaskQueue.ml +++ b/stm/asyncTaskQueue.ml @@ -177,7 +177,7 @@ module Make(T : Task) = struct if not (Worker.is_alive proc) then () else if cancelled () || !(!expiration_date) then let () = stop_waiting := true in - let () = TQueue.signal_destruction queue in + let () = TQueue.broadcast queue in Worker.kill proc else let () = Unix.sleep 1 in @@ -253,6 +253,8 @@ module Make(T : Task) = struct Pool.destroy active; TQueue.destroy queue + let broadcast { queue } = TQueue.broadcast queue + let enqueue_task { queue; active } (t, _ as item) = prerr_endline ("Enqueue task "^T.name_of_task t); TQueue.push queue item diff --git a/stm/asyncTaskQueue.mli b/stm/asyncTaskQueue.mli index 78f295d3d..a3fe4b8c0 100644 --- a/stm/asyncTaskQueue.mli +++ b/stm/asyncTaskQueue.mli @@ -61,6 +61,8 @@ module MakeQueue(T : Task) : sig val set_order : queue -> (T.task -> T.task -> int) -> unit + val broadcast : queue -> unit + (* Take a snapshot (non destructive but waits until all workers are * enqueued) *) val snapshot : queue -> T.task list diff --git a/stm/tQueue.ml b/stm/tQueue.ml index 8a62fe79e..6fef895ae 100644 --- a/stm/tQueue.ml +++ b/stm/tQueue.ml @@ -79,7 +79,7 @@ let pop ?(picky=(fun _ -> true)) ?(destroy=ref false) Mutex.unlock m; x -let signal_destruction { lock = m; cond = c } = +let broadcast { lock = m; cond = c } = Mutex.lock m; Condition.broadcast c; Mutex.unlock m diff --git a/stm/tQueue.mli b/stm/tQueue.mli index bc3922b33..7458de510 100644 --- a/stm/tQueue.mli +++ b/stm/tQueue.mli @@ -14,7 +14,9 @@ val pop : ?picky:('a -> bool) -> ?destroy:bool ref -> 'a t -> 'a val push : 'a t -> 'a -> unit val set_order : 'a t -> ('a -> 'a -> int) -> unit val wait_until_n_are_waiting_and_queue_empty : int -> 'a t -> unit -val signal_destruction : 'a t -> unit + +(* Wake up all waiting threads *) +val broadcast : 'a t -> unit (* Non destructive *) val wait_until_n_are_waiting_then_snapshot : int -> 'a t -> 'a list |