aboutsummaryrefslogtreecommitdiffhomepage
path: root/stm/asyncTaskQueue.ml
diff options
context:
space:
mode:
authorGravatar Emilio Jesus Gallego Arias <e+git@x80.org>2017-09-25 12:40:33 +0200
committerGravatar Emilio Jesus Gallego Arias <e+git@x80.org>2017-11-21 18:55:55 +0100
commit012e35b2d05b52dbf43dc1e26c8db550c859af03 (patch)
tree3ae1fcb1db042f0f7581c247aaa627e0bb0c3fc1 /stm/asyncTaskQueue.ml
parentb75beb248873db7d9ab8e4a078022b2ed0edcd36 (diff)
[stm] [doc] Add some documentation to AsyncTaskQueue API
As a bonus we remove some trailing whitespace, and implement a couple of hints suggested in the discussion.
Diffstat (limited to 'stm/asyncTaskQueue.ml')
-rw-r--r--stm/asyncTaskQueue.ml51
1 files changed, 25 insertions, 26 deletions
diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml
index e2bce1a96..4662c5543 100644
--- a/stm/asyncTaskQueue.ml
+++ b/stm/asyncTaskQueue.ml
@@ -14,13 +14,15 @@ let stm_pr_err pp = Format.eprintf "%s] @[%a@]\n%!" (System.process_id ()) Pp.pp
let stm_prerr_endline s = if !Flags.debug then begin stm_pr_err (str s) end else ()
-type 'a worker_status = [ `Fresh | `Old of 'a ]
+type cancel_switch = bool ref
module type Task = sig
type task
type competence
+ type worker_status = Fresh | Old of competence
+
(* Marshallable *)
type request
type response
@@ -29,15 +31,14 @@ module type Task = sig
val extra_env : unit -> string array
(* run by the master, on a thread *)
- val request_of_task : competence worker_status -> task -> request option
- val task_match : competence worker_status -> task -> bool
- val use_response :
- competence worker_status -> task -> response ->
- [ `Stay of competence * task list | `End ]
+ val request_of_task : worker_status -> task -> request option
+ val task_match : worker_status -> task -> bool
+ val use_response : worker_status -> task -> response ->
+ [ `Stay of competence * task list | `End ]
val on_marshal_error : string -> task -> unit
val on_task_cancellation_or_expiration_or_slave_death : task option -> unit
val forward_feedback : Feedback.feedback -> unit
-
+
(* run by the worker *)
val perform : request -> response
@@ -47,8 +48,6 @@ module type Task = sig
end
-type expiration = bool ref
-
module Make(T : Task) () = struct
exception Die
@@ -66,38 +65,38 @@ module Make(T : Task) () = struct
Response res
exception MarshalError of string
-
+
let marshal_to_channel oc data =
Marshal.to_channel oc data [];
flush oc
-
+
let marshal_err s = raise (MarshalError s)
-
+
let marshal_request oc (req : request) =
try marshal_to_channel oc req
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("marshal_request: "^s)
-
+
let unmarshal_request ic =
try (CThread.thread_friendly_input_value ic : request)
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("unmarshal_request: "^s)
-
+
let marshal_response oc (res : response) =
try marshal_to_channel oc res
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("marshal_response: "^s)
-
+
let unmarshal_response ic =
try (CThread.thread_friendly_input_value ic : response)
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("unmarshal_response: "^s)
-
+
let marshal_more_data oc (res : more_data) =
try marshal_to_channel oc res
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("marshal_more_data: "^s)
-
+
let unmarshal_more_data ic =
try (CThread.thread_friendly_input_value ic : more_data)
with Failure s | Invalid_argument s | Sys_error s ->
@@ -112,7 +111,7 @@ module Make(T : Task) () = struct
module Model = struct
type process = Worker.process
- type extra = (T.task * expiration) TQueue.t
+ type extra = (T.task * cancel_switch) TQueue.t
let spawn id =
let name = Printf.sprintf "%s:%d" !T.name id in
@@ -140,7 +139,7 @@ module Make(T : Task) () = struct
let { WorkerPool.extra = queue; exit; cancelled } = cpanel in
let exit () = report_status ~id "Dead"; exit () in
let last_task = ref None in
- let worker_age = ref `Fresh in
+ let worker_age = ref T.Fresh in
let got_token = ref false in
let giveback_exec_token () =
if !got_token then (CoqworkmgrApi.giveback 1; got_token := false) in
@@ -213,7 +212,7 @@ module Make(T : Task) () = struct
| `Stay(competence, new_tasks) ->
last_task := None;
giveback_exec_token ();
- worker_age := `Old competence;
+ worker_age := T.Old competence;
add_tasks new_tasks
in
continue ()
@@ -236,7 +235,7 @@ module Make(T : Task) () = struct
type queue = {
active : Pool.pool;
- queue : (T.task * expiration) TQueue.t;
+ queue : (T.task * cancel_switch) TQueue.t;
cleaner : Thread.t option;
}
@@ -252,16 +251,16 @@ module Make(T : Task) () = struct
queue;
cleaner = if size > 0 then Some (Thread.create cleaner queue) else None;
}
-
+
let destroy { active; queue } =
Pool.destroy active;
TQueue.destroy queue
let broadcast { queue } = TQueue.broadcast queue
- let enqueue_task { queue; active } (t, _ as item) =
+ let enqueue_task { queue; active } t ~cancel_switch =
stm_prerr_endline ("Enqueue task "^T.name_of_task t);
- TQueue.push queue item
+ TQueue.push queue (t, cancel_switch)
let cancel_worker { active } n = Pool.cancel n active
@@ -339,14 +338,14 @@ module Make(T : Task) () = struct
let clear { queue; active } =
assert(Pool.is_empty active); (* We allow that only if no slaves *)
TQueue.clear queue
-
+
let snapshot { queue; active } =
List.map fst
(TQueue.wait_until_n_are_waiting_then_snapshot
(Pool.n_workers active) queue)
let with_n_workers n f =
- let q = create n in
+ let q = create n in
try let rc = f q in destroy q; rc
with e -> let e = CErrors.push e in destroy q; iraise e