about | summary | refs | log | tree | commit | diff | homepage
path: root/stm/asyncTaskQueue.ml
diff options
context:
space:
mode:
author: Enrico Tassi <Enrico.Tassi@inria.fr>, 2014-12-16 18:23:44 +0100
committer: Enrico Tassi <Enrico.Tassi@inria.fr>, 2014-12-17 15:05:05 +0100
commit: 4618ab9961fc196a1f1912ed1b6b140eb8b4d407 (patch)
tree: 2c04f9ea402d8b65a072e0e6d1b6fc86f5fa0dbe /stm/asyncTaskQueue.ml
parent: 77472779450b218711aa7024feafc19054371eaa (diff)
AsyncTaskQueue: simpler model (no parking area, continuation tasks)
Diffstat (limited to 'stm/asyncTaskQueue.ml')
-rw-r--r-- stm/asyncTaskQueue.ml 257
1 files changed, 115 insertions, 142 deletions
diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml
index 662152e45..3cb6e7022 100644
--- a/stm/asyncTaskQueue.ml
+++ b/stm/asyncTaskQueue.ml
@@ -14,7 +14,7 @@ let pr_err s = Printf.eprintf "%s] %s\n" (System.process_id ()) s; flush stderr
let prerr_endline s = if !Flags.debug then begin pr_err s end else ()
-type 'a worker_status = [ `Fresh | `Old | `Parked of 'a ]
+type 'a worker_status = [ `Fresh | `Old of 'a ]
module type Task = sig
@@ -33,28 +33,25 @@ module type Task = sig
val task_match : competence worker_status -> task -> bool
val use_response :
competence worker_status -> task -> response ->
- [ `Stay | `Park of competence | `Reset ]
+ [ `Stay of competence * task list | `End ]
val on_marshal_error : string -> task -> unit
- val on_slave_death : task option -> [ `Exit of int | `Stay ]
- val on_task_cancellation_or_expiration : task option -> unit
+ val on_task_cancellation_or_expiration_or_slave_death : task option -> unit
val forward_feedback : Feedback.feedback -> unit
(* run by the worker *)
val perform : request -> response
-
+
(* debugging *)
val name_of_task : task -> string
val name_of_request : request -> string
end
-type cancel_switch = bool ref
+type expiration = bool ref
module Make(T : Task) = struct
- exception KillRespawn
exception Die
- exception Expired
type response =
| Response of T.response
| RespFeedback of Feedback.feedback
@@ -64,9 +61,11 @@ module Make(T : Task) = struct
type more_data =
| MoreDataUnivLevel of Univ.universe_level list
- let request_cswitch_of_task (t, c) = T.request_of_task t, c
+ let request_expiry_of_task (t, c) = T.request_of_task t, c
- let slave_respond msg = match msg with Request r -> Response (T.perform r)
+ let slave_respond = function (* run one request through T.perform and wrap the result *)
+   | Request r ->
+     Response (T.perform r)
exception MarshalError of string
@@ -109,7 +108,6 @@ module Make(T : Task) = struct
let report_status ?(id = !Flags.async_proofs_worker_id) s =
Pp.feedback ~state_id:Stateid.initial (Feedback.WorkerStatus(id, s))
-
module Worker = Spawn.Sync(struct
let add_timeout ~sec f =
ignore(Thread.create (fun () ->
@@ -120,19 +118,18 @@ module Make(T : Task) = struct
())
end)
- module Manager = struct
- type process = Worker.process
- type extra = (T.task * cancel_switch) TQueue.t * (string -> unit)
+ module Model = struct
- let naming n = Printf.sprintf "%s:%d" !T.name n
+ type process = Worker.process
+ type extra = (T.task * expiration) TQueue.t
- let rec manager extra ~cancel:cancel_user_req ~die id respawn =
- let queue, park = extra in
- let ic, oc, proc =
+ let spawn id =
+ let name = Printf.sprintf "%s:%d" !T.name id in
+ let proc, ic, oc =
let rec set_slave_opt = function
| [] -> !Flags.async_proofs_flags_for_workers @
["-toploop"; !T.name^"top";
- "-worker-id"; id;
+ "-worker-id"; name;
"-async-proofs-worker-priority";
Flags.string_of_priority !Flags.async_proofs_worker_priority]
| ("-ideslave"|"-emacs"|"-emacs-U"|"-batch")::tl -> set_slave_opt tl
@@ -144,161 +141,138 @@ module Make(T : Task) = struct
let args =
Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in
let env = Array.append (T.extra_env ()) (Unix.environment ()) in
- respawn ~args ~env () in
+ Worker.spawn ~env Sys.argv.(0) args in
+ name, proc, CThread.prepare_in_channel_for_thread_friendly_io ic, oc
+
+ let manager cpanel (id, proc, ic, oc) =
+ let { WorkerPool.extra = queue; exit; cancelled } = cpanel in
let last_task = ref None in
- let task_expired = ref false in
- let task_cancelled = ref false in
let worker_age = ref `Fresh in
let got_token = ref false in
- let giveback_token () =
+ let giveback_exec_token () =
if !got_token then (CoqworkmgrApi.giveback 1; got_token := false) in
- let pick age (t, c) = not !c && T.task_match age t in
- CThread.prepare_in_channel_for_thread_friendly_io ic;
- try
- while not !die do
- prerr_endline "waiting for a task";
- report_status ~id "Idle";
- let task, cancel_switch = TQueue.pop ~picky:(pick !worker_age) queue in
- prerr_endline ("got task: "^T.name_of_task task);
- prerr_endline ("I'm : "^ match !worker_age with `Parked _ -> "parked" | _ -> "fresh/old");
- last_task := Some task;
- begin try
- let req = T.request_of_task !worker_age task in
- if req = None then raise Expired;
- ignore(CoqworkmgrApi.get 1); got_token := true;
- prerr_endline ("got execution token");
- marshal_request oc (Request (Option.get req));
- Worker.kill_if proc ~sec:1 (fun () ->
- task_expired := !cancel_switch;
- task_cancelled := !cancel_user_req;
- if !cancel_user_req then cancel_user_req := false;
- !task_expired || !task_cancelled || !die);
- let rec loop () =
- let response = unmarshal_response ic in
- match response with
- | Response resp ->
- (match T.use_response !worker_age task resp with
- | `Stay ->
- last_task := None;
- if !worker_age = `Fresh then worker_age := `Old;
- giveback_token ()
- | `Park competence ->
- prerr_endline "parking";
- last_task := None; worker_age := `Parked competence;
- park id; giveback_token ()
- | `Reset -> last_task := None; raise KillRespawn)
- | RespGetCounterNewUnivLevel ->
- marshal_more_data oc (MoreDataUnivLevel
- (CList.init 10 (fun _ ->
- Universes.new_univ_level (Global.current_dirpath ()))));
- loop ()
- | RespFeedback fbk -> T.forward_feedback fbk; loop ()
- in
- loop ()
- with
- | Expired -> prerr_endline ("Task expired: " ^ T.name_of_task task)
- | (Sys_error _|Invalid_argument _|End_of_file|KillRespawn) as e ->
- raise e (* we pass the exception to the external handler *)
- | MarshalError s -> T.on_marshal_error s task
- | e ->
- pr_err ("Uncaught exception in worker manager: "^
- string_of_ppcmds (print e));
- flush_all ()
- end;
- done;
- raise Die
- with
- | KillRespawn ->
- giveback_token ();
- Worker.kill proc; ignore(Worker.wait proc);
- manager extra ~cancel:cancel_user_req ~die id respawn
+ let stop_waiting = ref false in
+ let expiration_date = ref (ref false) in
+ let pick_task () = (* pop the next acceptable task; record it and its expiration flag *)
+   prerr_endline "waiting for a task";
+   let age = !worker_age in (* worker status as of this call, handed to T.task_match *)
+   let suitable (t, expired) = not !expired && T.task_match age t in
+   let task, expiry = TQueue.pop ~picky:suitable ~destroy:stop_waiting queue in
+   expiration_date := expiry; (* the kill_if callback reads this flag *)
+   last_task := Some task;
+   prerr_endline ("got task: "^T.name_of_task task);
+   task in
+ let add_tasks new_tasks = (* push each task paired with the flag currently held in expiration_date *)
+   let enqueue t = TQueue.push queue (t, !expiration_date) in List.iter enqueue new_tasks in
+ let get_exec_token () = (* ask CoqworkmgrApi for one token and note that we hold it *)
+   let _ = CoqworkmgrApi.get 1 in (* the returned value is not used *)
+   got_token := true; (* giveback_exec_token only gives back when this is set *)
+   prerr_endline "got execution token" in
+ let kill proc = (* kill the worker, wait for it, and log how it terminated *)
+   Worker.kill proc;
+   let status = match Worker.wait proc with (* always waited on, even when the log is silent *)
+     | Unix.WEXITED 0x400 -> "exit code unavailable"
+     | Unix.WEXITED i -> Printf.sprintf "exit(%d)" i
+     | Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno
+     | Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno in
+   prerr_endline ("Worker exited: " ^ status) in
+ let more_univs n = (* build a list of n universe levels for the current dirpath *)
+   CList.init n (fun _ -> (* honour n: the count was hard-coded to 10 and the argument ignored *)
+     Universes.new_univ_level (Global.current_dirpath ())) in
+
+ Worker.kill_if proc ~sec:1 (fun () ->
+ let stop = cancelled () || !(!expiration_date) in
+ if stop then (stop_waiting := true; TQueue.signal_destruction queue);
+ stop);
+
+ try while true do
+ report_status ~id "Idle";
+ let task = pick_task () in
+ match T.request_of_task !worker_age task with
+ | None -> prerr_endline ("Task expired: " ^ T.name_of_task task)
+ | Some req ->
+ try
+ get_exec_token ();
+ marshal_request oc (Request req);
+ let rec continue () =
+ match unmarshal_response ic with
+ | RespGetCounterNewUnivLevel ->
+ marshal_more_data oc (MoreDataUnivLevel (more_univs 10));
+ continue ()
+ | RespFeedback fbk -> T.forward_feedback fbk; continue ()
+ | Response resp ->
+ match T.use_response !worker_age task resp with
+ | `End -> raise Die
+ | `Stay(competence, new_tasks) ->
+ last_task := None;
+ giveback_exec_token ();
+ worker_age := `Old competence;
+ add_tasks new_tasks
+ in
+ continue ()
+ with
+ | (Sys_error _|Invalid_argument _|End_of_file|Die) as e ->
+ raise e (* we pass the exception to the external handler *)
+ | MarshalError s -> T.on_marshal_error s task; raise Die
+ | e ->
+ pr_err ("Uncaught exception in worker manager: "^
+ string_of_ppcmds (print e));
+ flush_all (); raise Die
+ done with
| (Die | TQueue.BeingDestroyed) ->
- giveback_token ();
- Worker.kill proc;ignore(Worker.wait proc); Thread.exit ()
- | Sys_error _ | Invalid_argument _ | End_of_file when !task_expired ->
- giveback_token ();
- T.on_task_cancellation_or_expiration !last_task;
- ignore(Worker.wait proc);
- manager extra ~cancel:cancel_user_req ~die id respawn
- | Sys_error _ | Invalid_argument _ | End_of_file when !task_cancelled ->
- giveback_token ();
- msg_warning(strbrk "The worker was cancelled.");
- T.on_task_cancellation_or_expiration !last_task;
- Worker.kill proc; ignore(Worker.wait proc);
- manager extra ~cancel:cancel_user_req ~die id respawn
+ giveback_exec_token (); kill proc; exit ()
| Sys_error _ | Invalid_argument _ | End_of_file ->
- giveback_token ();
- match T.on_slave_death !last_task with
- | `Stay ->
- msg_warning(strbrk "The worker process died badly.");
- Worker.kill proc; ignore(Worker.wait proc);
- manager extra ~cancel:cancel_user_req ~die id respawn
- | `Exit exit_code ->
- Worker.kill proc;
- let exit_status proc = match Worker.wait proc with
- | Unix.WEXITED 0x400 -> "exit code unavailable"
- | Unix.WEXITED i -> Printf.sprintf "exit(%d)" i
- | Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno
- | Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno in
- pr_err ("Fatal worker error: " ^ (exit_status proc));
- flush_all (); exit exit_code
+ giveback_exec_token ();
+ T.on_task_cancellation_or_expiration_or_slave_death !last_task;
+ kill proc;
+ exit ()
end
- module Pool = WorkerPool.Make(Worker)(Manager)
+ module Pool = WorkerPool.Make(Model)
type queue = {
- active : WorkerPool.active Pool.pool;
- parking : WorkerPool.parking Pool.pool;
- queue : (T.task * cancel_switch) TQueue.t;
+ active : Pool.pool;
+ queue : (T.task * expiration) TQueue.t;
cleaner : Thread.t;
}
- let create n =
- let queue = TQueue.create () in
+ let create size =
let cleaner queue =
while true do
try ignore(TQueue.pop ~picky:(fun (_,cancelled) -> !cancelled) queue)
with TQueue.BeingDestroyed -> Thread.exit ()
done in
- let atq = ref None in
- let park id =
- let atq = Option.get !atq in Pool.move atq.active atq.parking id in
- atq := Some {
- active = Pool.create_active (queue,park) n;
- parking = Pool.create_parking ();
+ let queue = TQueue.create () in
+ {
+ active = Pool.create queue ~size;
queue;
cleaner = Thread.create cleaner queue;
- };
- Option.get !atq
+ }
- let destroy { active; parking; queue } =
+ let destroy { active; queue } =
Pool.destroy active;
- Pool.destroy parking;
TQueue.destroy queue
- let enqueue_task { queue } t c =
+ let enqueue_task { queue; active } (t, _ as item) =
prerr_endline ("Enqueue task "^T.name_of_task t);
- TQueue.push queue (t,c)
+ TQueue.push queue item
- let cancel_worker { active; parking } n =
- Pool.cancel n active;
- Pool.cancel n parking
+ let cancel_worker { active } n = Pool.cancel n active
let name_of_request (Request r) = T.name_of_request r
let set_order { queue } cmp =
TQueue.set_order queue (fun (t1,_) (t2,_) -> cmp t1 t2)
- let join { queue; active; parking } =
+ let join { queue; active } =
if not (Pool.is_empty active) then
TQueue.wait_until_n_are_waiting_and_queue_empty
- (Pool.n_workers active + Pool.n_workers parking + 1(*cleaner*))
+ (Pool.n_workers active + 1(*cleaner*))
queue
- let cancel_all { queue; active; parking } =
+ let cancel_all { queue; active } =
TQueue.clear queue;
- Pool.cancel_all active;
- Pool.cancel_all parking
+ Pool.cancel_all active
let slave_ic = ref None
let slave_oc = ref None
@@ -348,22 +322,21 @@ module Make(T : Task) = struct
flush_all (); exit 1
done
- let clear { queue; active; parking } =
+ let clear { queue; active } =
assert(Pool.is_empty active); (* We allow that only if no slaves *)
TQueue.clear queue
- let snapshot { queue; active; parking } =
+ let snapshot { queue; active } =
List.map fst
(TQueue.wait_until_n_are_waiting_then_snapshot
- (Pool.n_workers active + Pool.n_workers parking) queue)
+ (Pool.n_workers active) queue)
let with_n_workers n f =
let q = create n in
try let rc = f q in destroy q; rc
with e -> let e = Errors.push e in destroy q; iraise e
- let n_workers { active; parking } =
- Pool.n_workers active, Pool.n_workers parking
+ let n_workers { active } = Pool.n_workers active
end