diff options
Diffstat (limited to 'stm/tQueue.ml')
-rw-r--r-- | stm/tQueue.ml | 133 |
1 files changed, 133 insertions, 0 deletions
diff --git a/stm/tQueue.ml b/stm/tQueue.ml new file mode 100644 index 00000000..8a62fe79 --- /dev/null +++ b/stm/tQueue.ml @@ -0,0 +1,133 @@ +(************************************************************************) +(* v * The Coq Proof Assistant / The Coq Development Team *) +(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2015 *) +(* \VV/ **************************************************************) +(* // * This file is distributed under the terms of the *) +(* * GNU Lesser General Public License Version 2.1 *) +(************************************************************************) + +module PriorityQueue : sig + type 'a t + val create : unit -> 'a t + val set_rel : ('a -> 'a -> int) -> 'a t -> unit + val is_empty : 'a t -> bool + val exists : ('a -> bool) -> 'a t -> bool + val pop : ?picky:('a -> bool) -> 'a t -> 'a + val push : 'a t -> 'a -> unit + val clear : 'a t -> unit +end = struct + type 'a item = int * 'a + type 'a rel = 'a item -> 'a item -> int + type 'a t = ('a item list * 'a rel) ref + let sort_timestamp (i,_) (j,_) = i - j + let age = ref 0 + let create () = ref ([],sort_timestamp) + let is_empty t = fst !t = [] + let exists p t = List.exists (fun (_,x) -> p x) (fst !t) + let pop ?(picky=(fun _ -> true)) ({ contents = (l, rel) } as t) = + let rec aux acc = function + | [] -> raise Queue.Empty + | (_,x) :: xs when picky x -> t := (List.rev acc @ xs, rel); x + | (_,x) as hd :: xs -> aux (hd :: acc) xs in + aux [] l + let push ({ contents = (xs, rel) } as t) x = + incr age; + (* re-roting the whole list is not the most efficient way... *) + t := (List.sort rel (xs @ [!age,x]), rel) + let clear ({ contents = (l, rel) } as t) = t := ([], rel) + let set_rel rel ({ contents = (xs, _) } as t) = + let rel (_,x) (_,y) = rel x y in + t := (List.sort rel xs, rel) +end + +type 'a t = { + queue: 'a PriorityQueue.t; + lock : Mutex.t; + cond : Condition.t; + mutable nwaiting : int; + cond_waiting : Condition.t; + mutable release : bool; +} + +exception BeingDestroyed + +let create () = { + queue = PriorityQueue.create (); + lock = Mutex.create (); + cond = Condition.create (); + nwaiting = 0; + cond_waiting = Condition.create (); + release = false; +} + +let pop ?(picky=(fun _ -> true)) ?(destroy=ref false) + ({ queue = q; lock = m; cond = c; cond_waiting = cn } as tq) += + Mutex.lock m; + if tq.release then (Mutex.unlock m; raise BeingDestroyed); + while not (PriorityQueue.exists picky q || !destroy) do + tq.nwaiting <- tq.nwaiting + 1; + Condition.broadcast cn; + Condition.wait c m; + tq.nwaiting <- tq.nwaiting - 1; + if tq.release || !destroy then (Mutex.unlock m; raise BeingDestroyed) + done; + if !destroy then (Mutex.unlock m; raise BeingDestroyed); + let x = PriorityQueue.pop ~picky q in + Condition.signal c; + Condition.signal cn; + Mutex.unlock m; + x + +let signal_destruction { lock = m; cond = c } = + Mutex.lock m; + Condition.broadcast c; + Mutex.unlock m + +let push { queue = q; lock = m; cond = c; release } x = + if release then Errors.anomaly(Pp.str + "TQueue.push while being destroyed! Only 1 producer/destroyer allowed"); + Mutex.lock m; + PriorityQueue.push q x; + Condition.broadcast c; + Mutex.unlock m + +let clear { queue = q; lock = m; cond = c } = + Mutex.lock m; + PriorityQueue.clear q; + Mutex.unlock m + +let is_empty { queue = q } = PriorityQueue.is_empty q + +let destroy tq = + tq.release <- true; + while tq.nwaiting > 0 do + Mutex.lock tq.lock; + Condition.broadcast tq.cond; + Mutex.unlock tq.lock; + done; + tq.release <- false + +let wait_until_n_are_waiting_and_queue_empty j tq = + Mutex.lock tq.lock; + while not (PriorityQueue.is_empty tq.queue) || tq.nwaiting < j do + Condition.wait tq.cond_waiting tq.lock + done; + Mutex.unlock tq.lock + +let wait_until_n_are_waiting_then_snapshot j tq = + let l = ref [] in + Mutex.lock tq.lock; + while not (PriorityQueue.is_empty tq.queue) do + l := PriorityQueue.pop tq.queue :: !l + done; + while tq.nwaiting < j do Condition.wait tq.cond_waiting tq.lock done; + List.iter (PriorityQueue.push tq.queue) (List.rev !l); + if !l <> [] then Condition.broadcast tq.cond; + Mutex.unlock tq.lock; + List.rev !l + +let set_order tq rel = + Mutex.lock tq.lock; + PriorityQueue.set_rel rel tq.queue; + Mutex.unlock tq.lock |