aboutsummaryrefslogtreecommitdiffhomepage
path: root/stm/tQueue.ml
diff options
context:
space:
mode:
authorGravatar Enrico Tassi <Enrico.Tassi@inria.fr>2014-11-26 18:45:29 +0100
committerGravatar Enrico Tassi <Enrico.Tassi@inria.fr>2014-11-27 16:06:54 +0100
commitd86dbc9b9f4fbabd922b07e7c695f03cf6c03c43 (patch)
tree525b0552cf09f6ddfa347d5f4394f7c201306567 /stm/tQueue.ml
parent61209f27eb26dcb04a7d5428b9f2ab7f65f73ac8 (diff)
TQueue: let reader be picky when popping an item
E.g. let a worker pick up only jobs he is able to deal with.
Diffstat (limited to 'stm/tQueue.ml')
-rw-r--r--stm/tQueue.ml30
1 files changed, 18 insertions, 12 deletions
diff --git a/stm/tQueue.ml b/stm/tQueue.ml
index 04e4e81f9..00213b8a2 100644
--- a/stm/tQueue.ml
+++ b/stm/tQueue.ml
@@ -11,7 +11,8 @@ module PriorityQueue : sig
val create : unit -> 'a t
val set_rel : ('a -> 'a -> int) -> 'a t -> unit
val is_empty : 'a t -> bool
- val pop : 'a t -> 'a
+ val exists : ('a -> bool) -> 'a t -> bool
+ val pop : ?picky:('a -> bool) -> 'a t -> 'a
val push : 'a t -> 'a -> unit
val clear : 'a t -> unit
end = struct
@@ -22,10 +23,13 @@ end = struct
let age = ref 0
let create () = ref ([],sort_timestamp)
let is_empty t = fst !t = []
- let pop ({ contents = (l, rel) } as t) =
- match l with
- | [] -> raise Queue.Empty
- | (_,x) :: xs -> t := (xs, rel); x
+ let exists p t = List.exists (fun (_,x) -> p x) (fst !t)
+ let pop ?(picky=(fun _ -> true)) ({ contents = (l, rel) } as t) =
+ let rec aux acc = function
+ | [] -> raise Queue.Empty
+ | (_,x) :: xs when picky x -> t := (List.rev acc @ xs, rel); x
+ | (_,x) as hd :: xs -> aux (hd :: acc) xs in
+ aux [] l
let push ({ contents = (xs, rel) } as t) x =
incr age;
(* re-roting the whole list is not the most efficient way... *)
@@ -56,17 +60,19 @@ let create () = {
release = false;
}
-let pop ({ queue = q; lock = m; cond = c; cond_waiting = cn } as tq) =
+let pop ?(picky=(fun _ -> true))
+ ({ queue = q; lock = m; cond = c; cond_waiting = cn } as tq)
+=
if tq.release then raise BeingDestroyed;
Mutex.lock m;
- while PriorityQueue.is_empty q do
+ while not (PriorityQueue.exists picky q) do
tq.nwaiting <- tq.nwaiting + 1;
- Condition.signal cn;
+ Condition.broadcast cn;
Condition.wait c m;
tq.nwaiting <- tq.nwaiting - 1;
if tq.release then (Mutex.unlock m; raise BeingDestroyed)
done;
- let x = PriorityQueue.pop q in
+ let x = PriorityQueue.pop ~picky q in
Condition.signal c;
Condition.signal cn;
Mutex.unlock m;
@@ -77,7 +83,7 @@ let push { queue = q; lock = m; cond = c; release } x =
"TQueue.push while being destroyed! Only 1 producer/destroyer allowed");
Mutex.lock m;
PriorityQueue.push q x;
- Condition.signal c;
+ Condition.broadcast c;
Mutex.unlock m
let clear { queue = q; lock = m; cond = c } =
@@ -91,7 +97,7 @@ let destroy tq =
tq.release <- true;
while tq.nwaiting > 0 do
Mutex.lock tq.lock;
- Condition.signal tq.cond;
+ Condition.broadcast tq.cond;
Mutex.unlock tq.lock;
done;
tq.release <- false
@@ -111,7 +117,7 @@ let wait_until_n_are_waiting_then_snapshot j tq =
done;
while tq.nwaiting < j do Condition.wait tq.cond_waiting tq.lock done;
List.iter (PriorityQueue.push tq.queue) (List.rev !l);
- if !l <> [] then Condition.signal tq.cond;
+ if !l <> [] then Condition.broadcast tq.cond;
Mutex.unlock tq.lock;
List.rev !l