aboutsummaryrefslogtreecommitdiffhomepage
path: root/stm/asyncTaskQueue.mli
diff options
context:
space:
mode:
Diffstat (limited to 'stm/asyncTaskQueue.mli')
-rw-r--r--stm/asyncTaskQueue.mli44
1 files changed, 27 insertions, 17 deletions
diff --git a/stm/asyncTaskQueue.mli b/stm/asyncTaskQueue.mli
index 96239d145..172e1035e 100644
--- a/stm/asyncTaskQueue.mli
+++ b/stm/asyncTaskQueue.mli
@@ -6,24 +6,31 @@
(* * GNU Lesser General Public License Version 2.1 *)
(************************************************************************)
+type 'a worker_status = [ `Fresh | `Old | `Parked of 'a ]
+
module type Task = sig
type task
+ type competence
(* Marshallable *)
type request
type response
- val name : string (* UID of the task kind, for -toploop *)
+ val name : string ref (* UID of the task kind, for -toploop *)
val extra_env : unit -> string array
(* run by the master, on a thread *)
- val request_of_task : [ `Fresh | `Old ] -> task -> request option
- val use_response : task -> response -> [ `Stay | `StayReset ]
+ val request_of_task : competence worker_status -> task -> request option
+ val task_match : competence worker_status -> task -> bool
+ val use_response :
+ competence worker_status -> task -> response ->
+ [ `Stay | `Park of competence | `Reset ]
val on_marshal_error : string -> task -> unit
val on_slave_death : task option -> [ `Exit of int | `Stay ]
val on_task_cancellation_or_expiration : task option -> unit
- val forward_feedback : Stateid.t -> Feedback.feedback_content -> unit
+ val forward_feedback :
+ Stateid.t -> Feedback.route_id -> Feedback.feedback_content -> unit
(* run by the worker *)
val perform : request -> response
@@ -38,31 +45,34 @@ type cancel_switch = bool ref
module MakeQueue(T : Task) : sig
- (* Number of workers, 0 = lazy local *)
- val init : int -> unit
- val destroy : unit -> unit
+ type queue
- val with_n_workers :
- int -> (join:(unit -> unit) -> cancel_all:(unit -> unit) -> 'a) -> 'a
+ (* Number of workers, 0 = lazy local *)
+ val create : int -> queue
+ val destroy : queue -> unit
- val n_workers : unit -> int
+ val n_workers : queue -> int * int (* active, parked *)
- val enqueue_task : T.task -> cancel_switch -> unit
+ val enqueue_task : queue -> T.task -> cancel_switch -> unit
(* blocking function that waits for the task queue to be empty *)
- val join : unit -> unit
- val cancel_all : unit -> unit
+ val join : queue -> unit
+ val cancel_all : queue -> unit
- val cancel_worker : string -> unit
+ val cancel_worker : queue -> string -> unit
- val set_order : (T.task -> T.task -> int) -> unit
+ val set_order : queue -> (T.task -> T.task -> int) -> unit
(* Take a snapshot (non destructive but waits until all workers are
* enqueued) *)
- val snapshot : unit -> T.task list
+ val snapshot : queue -> T.task list
(* Clears the queue, only if the worker prool is empty *)
- val clear : unit -> unit
+ val clear : queue -> unit
+
+ (* create a queue, run the function, destroy the queue.
+ * the user should call join *)
+ val with_n_workers : int -> (queue -> 'a) -> 'a
end