diff options
author | Emilio Jesus Gallego Arias <e+git@x80.org> | 2017-09-25 12:40:33 +0200 |
---|---|---|
committer | Emilio Jesus Gallego Arias <e+git@x80.org> | 2017-11-21 18:55:55 +0100 |
commit | 012e35b2d05b52dbf43dc1e26c8db550c859af03 (patch) | |
tree | 3ae1fcb1db042f0f7581c247aaa627e0bb0c3fc1 /stm/asyncTaskQueue.ml | |
parent | b75beb248873db7d9ab8e4a078022b2ed0edcd36 (diff) |
[stm] [doc] Add some documentation to AsyncTaskQueue API
As a bonus we remove some trailing whitespace, and implement a couple
of hints suggested in the discussion.
Diffstat (limited to 'stm/asyncTaskQueue.ml')
-rw-r--r-- | stm/asyncTaskQueue.ml | 51 |
1 files changed, 25 insertions, 26 deletions
diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml index e2bce1a96..4662c5543 100644 --- a/stm/asyncTaskQueue.ml +++ b/stm/asyncTaskQueue.ml @@ -14,13 +14,15 @@ let stm_pr_err pp = Format.eprintf "%s] @[%a@]\n%!" (System.process_id ()) Pp.pp let stm_prerr_endline s = if !Flags.debug then begin stm_pr_err (str s) end else () -type 'a worker_status = [ `Fresh | `Old of 'a ] +type cancel_switch = bool ref module type Task = sig type task type competence + type worker_status = Fresh | Old of competence + (* Marshallable *) type request type response @@ -29,15 +31,14 @@ module type Task = sig val extra_env : unit -> string array (* run by the master, on a thread *) - val request_of_task : competence worker_status -> task -> request option - val task_match : competence worker_status -> task -> bool - val use_response : - competence worker_status -> task -> response -> - [ `Stay of competence * task list | `End ] + val request_of_task : worker_status -> task -> request option + val task_match : worker_status -> task -> bool + val use_response : worker_status -> task -> response -> + [ `Stay of competence * task list | `End ] val on_marshal_error : string -> task -> unit val on_task_cancellation_or_expiration_or_slave_death : task option -> unit val forward_feedback : Feedback.feedback -> unit - + (* run by the worker *) val perform : request -> response @@ -47,8 +48,6 @@ module type Task = sig end -type expiration = bool ref - module Make(T : Task) () = struct exception Die @@ -66,38 +65,38 @@ module Make(T : Task) () = struct Response res exception MarshalError of string - + let marshal_to_channel oc data = Marshal.to_channel oc data []; flush oc - + let marshal_err s = raise (MarshalError s) - + let marshal_request oc (req : request) = try marshal_to_channel oc req with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("marshal_request: "^s) - + let unmarshal_request ic = try (CThread.thread_friendly_input_value ic : request) with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("unmarshal_request: "^s) - + let marshal_response oc (res : response) = try marshal_to_channel oc res with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("marshal_response: "^s) - + let unmarshal_response ic = try (CThread.thread_friendly_input_value ic : response) with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("unmarshal_response: "^s) - + let marshal_more_data oc (res : more_data) = try marshal_to_channel oc res with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("marshal_more_data: "^s) - + let unmarshal_more_data ic = try (CThread.thread_friendly_input_value ic : more_data) with Failure s | Invalid_argument s | Sys_error s -> @@ -112,7 +111,7 @@ module Make(T : Task) () = struct module Model = struct type process = Worker.process - type extra = (T.task * expiration) TQueue.t + type extra = (T.task * cancel_switch) TQueue.t let spawn id = let name = Printf.sprintf "%s:%d" !T.name id in @@ -140,7 +139,7 @@ module Make(T : Task) () = struct let { WorkerPool.extra = queue; exit; cancelled } = cpanel in let exit () = report_status ~id "Dead"; exit () in let last_task = ref None in - let worker_age = ref `Fresh in + let worker_age = ref T.Fresh in let got_token = ref false in let giveback_exec_token () = if !got_token then (CoqworkmgrApi.giveback 1; got_token := false) in @@ -213,7 +212,7 @@ module Make(T : Task) () = struct | `Stay(competence, new_tasks) -> last_task := None; giveback_exec_token (); - worker_age := `Old competence; + worker_age := T.Old competence; add_tasks new_tasks in continue () @@ -236,7 +235,7 @@ module Make(T : Task) () = struct type queue = { active : Pool.pool; - queue : (T.task * expiration) TQueue.t; + queue : (T.task * cancel_switch) TQueue.t; cleaner : Thread.t option; } @@ -252,16 +251,16 @@ module Make(T : Task) () = struct queue; cleaner = if size > 0 then Some (Thread.create cleaner queue) else None; } - + let destroy { active; queue } = Pool.destroy active; TQueue.destroy queue let broadcast { queue } = TQueue.broadcast queue - let enqueue_task { queue; active } (t, _ as item) = + let enqueue_task { queue; active } t ~cancel_switch = stm_prerr_endline ("Enqueue task "^T.name_of_task t); - TQueue.push queue item + TQueue.push queue (t, cancel_switch) let cancel_worker { active } n = Pool.cancel n active @@ -339,14 +338,14 @@ module Make(T : Task) () = struct let clear { queue; active } = assert(Pool.is_empty active); (* We allow that only if no slaves *) TQueue.clear queue - + let snapshot { queue; active } = List.map fst (TQueue.wait_until_n_are_waiting_then_snapshot (Pool.n_workers active) queue) let with_n_workers n f = - let q = create n in + let q = create n in try let rc = f q in destroy q; rc with e -> let e = CErrors.push e in destroy q; iraise e |