diff options
author | Enrico Tassi <Enrico.Tassi@inria.fr> | 2014-11-26 18:45:29 +0100 |
---|---|---|
committer | Enrico Tassi <Enrico.Tassi@inria.fr> | 2014-11-27 16:06:54 +0100 |
commit | d86dbc9b9f4fbabd922b07e7c695f03cf6c03c43 (patch) | |
tree | 525b0552cf09f6ddfa347d5f4394f7c201306567 /stm/tQueue.ml | |
parent | 61209f27eb26dcb04a7d5428b9f2ab7f65f73ac8 (diff) |
TQueue: let reader be picky when popping an item
E.g. let a worker pick up only jobs he is able to deal with.
Diffstat (limited to 'stm/tQueue.ml')
-rw-r--r-- | stm/tQueue.ml | 30 |
1 files changed, 18 insertions, 12 deletions
diff --git a/stm/tQueue.ml b/stm/tQueue.ml index 04e4e81f9..00213b8a2 100644 --- a/stm/tQueue.ml +++ b/stm/tQueue.ml @@ -11,7 +11,8 @@ module PriorityQueue : sig val create : unit -> 'a t val set_rel : ('a -> 'a -> int) -> 'a t -> unit val is_empty : 'a t -> bool - val pop : 'a t -> 'a + val exists : ('a -> bool) -> 'a t -> bool + val pop : ?picky:('a -> bool) -> 'a t -> 'a val push : 'a t -> 'a -> unit val clear : 'a t -> unit end = struct @@ -22,10 +23,13 @@ end = struct let age = ref 0 let create () = ref ([],sort_timestamp) let is_empty t = fst !t = [] - let pop ({ contents = (l, rel) } as t) = - match l with - | [] -> raise Queue.Empty - | (_,x) :: xs -> t := (xs, rel); x + let exists p t = List.exists (fun (_,x) -> p x) (fst !t) + let pop ?(picky=(fun _ -> true)) ({ contents = (l, rel) } as t) = + let rec aux acc = function + | [] -> raise Queue.Empty + | (_,x) :: xs when picky x -> t := (List.rev acc @ xs, rel); x + | (_,x) as hd :: xs -> aux (hd :: acc) xs in + aux [] l let push ({ contents = (xs, rel) } as t) x = incr age; (* re-roting the whole list is not the most efficient way... *) @@ -56,17 +60,19 @@ let create () = { release = false; } -let pop ({ queue = q; lock = m; cond = c; cond_waiting = cn } as tq) = +let pop ?(picky=(fun _ -> true)) + ({ queue = q; lock = m; cond = c; cond_waiting = cn } as tq) += if tq.release then raise BeingDestroyed; Mutex.lock m; - while PriorityQueue.is_empty q do + while not (PriorityQueue.exists picky q) do tq.nwaiting <- tq.nwaiting + 1; - Condition.signal cn; + Condition.broadcast cn; Condition.wait c m; tq.nwaiting <- tq.nwaiting - 1; if tq.release then (Mutex.unlock m; raise BeingDestroyed) done; - let x = PriorityQueue.pop q in + let x = PriorityQueue.pop ~picky q in Condition.signal c; Condition.signal cn; Mutex.unlock m; @@ -77,7 +83,7 @@ let push { queue = q; lock = m; cond = c; release } x = "TQueue.push while being destroyed! Only 1 producer/destroyer allowed"); Mutex.lock m; PriorityQueue.push q x; - Condition.signal c; + Condition.broadcast c; Mutex.unlock m let clear { queue = q; lock = m; cond = c } = @@ -91,7 +97,7 @@ let destroy tq = tq.release <- true; while tq.nwaiting > 0 do Mutex.lock tq.lock; - Condition.signal tq.cond; + Condition.broadcast tq.cond; Mutex.unlock tq.lock; done; tq.release <- false @@ -111,7 +117,7 @@ let wait_until_n_are_waiting_then_snapshot j tq = done; while tq.nwaiting < j do Condition.wait tq.cond_waiting tq.lock done; List.iter (PriorityQueue.push tq.queue) (List.rev !l); - if !l <> [] then Condition.signal tq.cond; + if !l <> [] then Condition.broadcast tq.cond; Mutex.unlock tq.lock; List.rev !l |