diff options
Diffstat (limited to 'stm/asyncTaskQueue.mli')
-rw-r--r-- | stm/asyncTaskQueue.mli | 44 |
1 files changed, 27 insertions, 17 deletions
diff --git a/stm/asyncTaskQueue.mli b/stm/asyncTaskQueue.mli index 96239d145..172e1035e 100644 --- a/stm/asyncTaskQueue.mli +++ b/stm/asyncTaskQueue.mli @@ -6,24 +6,31 @@ (* * GNU Lesser General Public License Version 2.1 *) (************************************************************************) +type 'a worker_status = [ `Fresh | `Old | `Parked of 'a ] + module type Task = sig type task + type competence (* Marshallable *) type request type response - val name : string (* UID of the task kind, for -toploop *) + val name : string ref (* UID of the task kind, for -toploop *) val extra_env : unit -> string array (* run by the master, on a thread *) - val request_of_task : [ `Fresh | `Old ] -> task -> request option - val use_response : task -> response -> [ `Stay | `StayReset ] + val request_of_task : competence worker_status -> task -> request option + val task_match : competence worker_status -> task -> bool + val use_response : + competence worker_status -> task -> response -> + [ `Stay | `Park of competence | `Reset ] val on_marshal_error : string -> task -> unit val on_slave_death : task option -> [ `Exit of int | `Stay ] val on_task_cancellation_or_expiration : task option -> unit - val forward_feedback : Stateid.t -> Feedback.feedback_content -> unit + val forward_feedback : + Stateid.t -> Feedback.route_id -> Feedback.feedback_content -> unit (* run by the worker *) val perform : request -> response @@ -38,31 +45,34 @@ type cancel_switch = bool ref module MakeQueue(T : Task) : sig - (* Number of workers, 0 = lazy local *) - val init : int -> unit - val destroy : unit -> unit + type queue - val with_n_workers : - int -> (join:(unit -> unit) -> cancel_all:(unit -> unit) -> 'a) -> 'a + (* Number of workers, 0 = lazy local *) + val create : int -> queue + val destroy : queue -> unit - val n_workers : unit -> int + val n_workers : queue -> int * int (* active, parked *) - val enqueue_task : T.task -> cancel_switch -> unit + val enqueue_task : queue -> T.task -> cancel_switch -> unit (* blocking function that waits for the task queue to be empty *) - val join : unit -> unit - val cancel_all : unit -> unit + val join : queue -> unit + val cancel_all : queue -> unit - val cancel_worker : string -> unit + val cancel_worker : queue -> string -> unit - val set_order : (T.task -> T.task -> int) -> unit + val set_order : queue -> (T.task -> T.task -> int) -> unit (* Take a snapshot (non destructive but waits until all workers are * enqueued) *) - val snapshot : unit -> T.task list + val snapshot : queue -> T.task list (* Clears the queue, only if the worker prool is empty *) - val clear : unit -> unit + val clear : queue -> unit + + (* create a queue, run the function, destroy the queue. + * the user should call join *) + val with_n_workers : int -> (queue -> 'a) -> 'a end |