From 6c2d8c3026c1baeb0ff731907747a9c216d60400 Mon Sep 17 00:00:00 2001 From: Enrico Tassi Date: Wed, 8 Oct 2014 13:58:14 +0200 Subject: TQueue: new primitive to take a snapshot of the queue --- stm/tQueue.ml | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) (limited to 'stm/tQueue.ml') diff --git a/stm/tQueue.ml b/stm/tQueue.ml index 9d3553c36..fe9cf015a 100644 --- a/stm/tQueue.ml +++ b/stm/tQueue.ml @@ -12,7 +12,7 @@ module PriorityQueue : sig val set_rel : ('a -> 'a -> int) -> 'a t -> unit val is_empty : 'a t -> bool val pop : 'a t -> 'a - val push : 'a -> 'a t -> unit + val push : 'a t -> 'a -> unit val clear : 'a t -> unit end = struct type 'a item = int * 'a @@ -26,7 +26,7 @@ end = struct match l with | [] -> raise Queue.Empty | (_,x) :: xs -> t := (xs, rel); x - let push x ({ contents = (xs, rel) } as t) = + let push ({ contents = (xs, rel) } as t) x = incr age; (* re-roting the whole list is not the most efficient way... *) t := (List.sort rel (xs @ [!age,x]), rel) @@ -76,7 +76,7 @@ let push { queue = q; lock = m; cond = c; release } x = if release then Errors.anomaly(Pp.str "TQueue.push while being destroyed! Only 1 producer/destroyer allowed"); Mutex.lock m; - PriorityQueue.push x q; + PriorityQueue.push q x; Condition.signal c; Mutex.unlock m @@ -112,6 +112,18 @@ let dump { queue; lock } = Mutex.unlock lock; List.rev !l +let wait_until_n_are_waiting_then_snapshot j tq = + let l = ref [] in + Mutex.lock tq.lock; + while not (PriorityQueue.is_empty tq.queue) do + l := PriorityQueue.pop tq.queue :: !l + done; + while tq.nwaiting < j do Condition.wait tq.cond_waiting tq.lock done; + List.iter (PriorityQueue.push tq.queue) (List.rev !l); + if !l <> [] then Condition.signal tq.cond; + Mutex.unlock tq.lock; + List.rev !l + let set_order tq rel = Mutex.lock tq.lock; PriorityQueue.set_rel rel tq.queue; -- cgit v1.2.3