aboutsummaryrefslogtreecommitdiffhomepage
path: root/stm/asyncTaskQueue.ml
diff options
context:
space:
mode:
Diffstat (limited to 'stm/asyncTaskQueue.ml')
-rw-r--r--stm/asyncTaskQueue.ml207
1 files changed, 129 insertions, 78 deletions
diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml
index 8b4e0844c..d870ff92b 100644
--- a/stm/asyncTaskQueue.ml
+++ b/stm/asyncTaskQueue.ml
@@ -14,28 +14,35 @@ let pr_err s = Printf.eprintf "%s] %s\n" (System.process_id ()) s; flush stderr
let prerr_endline s = if !Flags.debug then begin pr_err s end else ()
+type 'a worker_status = [ `Fresh | `Old | `Parked of 'a ]
+
module type Task = sig
type task
+ type competence
(* Marshallable *)
type request
type response
- val name : string (* UID of the task kind, for -toploop *)
+ val name : string ref (* UID of the task kind, for -toploop *)
val extra_env : unit -> string array
(* run by the master, on a thread *)
- val request_of_task : [ `Fresh | `Old ] -> task -> request option
- val use_response : task -> response -> [ `Stay | `StayReset ]
+ val request_of_task : competence worker_status -> task -> request option
+ val task_match : competence worker_status -> task -> bool
+ val use_response :
+ competence worker_status -> task -> response ->
+ [ `Stay | `Park of competence | `Reset ]
val on_marshal_error : string -> task -> unit
val on_slave_death : task option -> [ `Exit of int | `Stay ]
val on_task_cancellation_or_expiration : task option -> unit
- val forward_feedback : Stateid.t -> Feedback.feedback_content -> unit
+ val forward_feedback :
+ Stateid.t -> Feedback.route_id -> Feedback.feedback_content -> unit
(* run by the worker *)
val perform : request -> response
-
+
(* debugging *)
val name_of_task : task -> string
val name_of_request : request -> string
@@ -46,34 +53,14 @@ type cancel_switch = bool ref
module Make(T : Task) = struct
- module Worker = Spawn.Sync(struct
- let add_timeout ~sec f =
- ignore(Thread.create (fun () ->
- while true do
- Unix.sleep sec;
- if not (f ()) then Thread.exit ()
- done)
- ())
- end)
-
- module WorkersPool = WorkerPool.Make(Worker)
-
- let queue : (T.task * cancel_switch) TQueue.t = TQueue.create ()
-
- let enqueue_task t c =
- prerr_endline ("Enqueue task "^T.name_of_task t);
- TQueue.push queue (t,c)
-
- let cancel_worker = WorkersPool.cancel
-
- type request = Request of T.request
-
- let name_of_request (Request r) = T.name_of_request r
-
+ exception KillRespawn
+ exception Die
+ exception Expired
type response =
| Response of T.response
| RespFeedback of Feedback.feedback
| RespGetCounterNewUnivLevel
+ type request = Request of T.request
type more_data =
| MoreDataUnivLevel of Univ.universe_level list
@@ -83,23 +70,23 @@ module Make(T : Task) = struct
let slave_respond msg = match msg with Request r -> Response (T.perform r)
exception MarshalError of string
-
+
let marshal_to_channel oc data =
Marshal.to_channel oc data [];
flush oc
-
+
let marshal_err s = raise (MarshalError s)
-
+
let marshal_request oc (req : request) =
try marshal_to_channel oc req
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("marshal_request: "^s)
-
+
let unmarshal_request ic =
try (Marshal.from_channel ic : request)
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("unmarshal_request: "^s)
-
+
let marshal_response oc (res : response) =
try marshal_to_channel oc res
with Failure s | Invalid_argument s | Sys_error s ->
@@ -109,7 +96,7 @@ module Make(T : Task) = struct
try (CThread.thread_friendly_input_value ic : response)
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("unmarshal_response: "^s)
-
+
let marshal_more_data oc (res : more_data) =
try marshal_to_channel oc res
with Failure s | Invalid_argument s | Sys_error s ->
@@ -120,28 +107,32 @@ module Make(T : Task) = struct
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("unmarshal_more_data: "^s)
- let set_order cmp = TQueue.set_order queue (fun (t1,_) (t2,_) -> cmp t1 t2)
-
- let join () =
- if not (WorkersPool.is_empty ()) then
- TQueue.wait_until_n_are_waiting_and_queue_empty
- (WorkersPool.n_workers ()) queue
- let cancel_all () =
- TQueue.clear queue;
- WorkersPool.cancel_all ()
-
- exception KillRespawn
- exception Die
- exception Expired
-
let report_status ?(id = !Flags.async_proofs_worker_id) s =
Pp.feedback ~state_id:Stateid.initial (Feedback.WorkerStatus(id, s))
- let rec manage_slave ~cancel:cancel_user_req ~die id respawn =
+
+ module Worker = Spawn.Sync(struct
+ let add_timeout ~sec f =
+ ignore(Thread.create (fun () ->
+ while true do
+ Unix.sleep sec;
+ if not (f ()) then Thread.exit ()
+ done)
+ ())
+ end)
+
+ module Manager = struct
+ type process = Worker.process
+ type extra = (T.task * cancel_switch) TQueue.t * (string -> unit)
+
+ let naming n = Printf.sprintf "%s:%d" !T.name n
+
+ let rec manager extra ~cancel:cancel_user_req ~die id respawn =
+ let queue, park = extra in
let ic, oc, proc =
let rec set_slave_opt = function
| [] -> !Flags.async_proofs_flags_for_workers @
- ["-toploop"; T.name^"top";
+ ["-toploop"; !T.name^"top";
"-worker-id"; id;
"-async-proofs-worker-priority";
Flags.string_of_priority !Flags.async_proofs_worker_priority]
@@ -162,13 +153,15 @@ module Make(T : Task) = struct
let got_token = ref false in
let giveback_token () =
if !got_token then (CoqworkmgrApi.giveback 1; got_token := false) in
+ let pick age (t, c) = not !c && T.task_match age t in
CThread.prepare_in_channel_for_thread_friendly_io ic;
try
while not !die do
prerr_endline "waiting for a task";
report_status ~id "Idle";
- let task, cancel_switch = TQueue.pop queue in
+ let task, cancel_switch = TQueue.pop ~picky:(pick !worker_age) queue in
prerr_endline ("got task: "^T.name_of_task task);
+ prerr_endline ("I'm : "^ match !worker_age with `Parked _ -> "parked" | _ -> "fresh/old");
last_task := Some task;
begin try
let req = T.request_of_task !worker_age task in
@@ -185,17 +178,24 @@ module Make(T : Task) = struct
let response = unmarshal_response ic in
match response with
| Response resp ->
- (match T.use_response task resp with
+ (match T.use_response !worker_age task resp with
| `Stay ->
- last_task := None; worker_age := `Old; giveback_token ()
- | `StayReset -> last_task := None; raise KillRespawn)
+ last_task := None;
+ if !worker_age = `Fresh then worker_age := `Old;
+ giveback_token ()
+ | `Park competence ->
+ prerr_endline "parking";
+ last_task := None; worker_age := `Parked competence;
+ park id; giveback_token ()
+ | `Reset -> last_task := None; raise KillRespawn)
| RespGetCounterNewUnivLevel ->
marshal_more_data oc (MoreDataUnivLevel
(CList.init 10 (fun _ ->
Universes.new_univ_level (Global.current_dirpath ()))));
loop ()
- | RespFeedback ({ Feedback.id = Feedback.State state_id } as fbk) ->
- T.forward_feedback state_id fbk.Feedback.content;
+ | RespFeedback ({ Feedback.id = Feedback.State state_id;
+ Feedback.route = rid } as fbk) ->
+ T.forward_feedback state_id rid fbk.Feedback.content;
loop ()
| RespFeedback _ -> Errors.anomaly (str"Parsing in master process")
in
@@ -216,28 +216,28 @@ module Make(T : Task) = struct
| KillRespawn ->
giveback_token ();
Worker.kill proc; ignore(Worker.wait proc);
- manage_slave ~cancel:cancel_user_req ~die id respawn
+ manager extra ~cancel:cancel_user_req ~die id respawn
| (Die | TQueue.BeingDestroyed) ->
giveback_token ();
- Worker.kill proc;ignore(Worker.wait proc)
+ Worker.kill proc;ignore(Worker.wait proc); Thread.exit ()
| Sys_error _ | Invalid_argument _ | End_of_file when !task_expired ->
giveback_token ();
T.on_task_cancellation_or_expiration !last_task;
ignore(Worker.wait proc);
- manage_slave ~cancel:cancel_user_req ~die id respawn
+ manager extra ~cancel:cancel_user_req ~die id respawn
| Sys_error _ | Invalid_argument _ | End_of_file when !task_cancelled ->
giveback_token ();
msg_warning(strbrk "The worker was cancelled.");
T.on_task_cancellation_or_expiration !last_task;
Worker.kill proc; ignore(Worker.wait proc);
- manage_slave ~cancel:cancel_user_req ~die id respawn
+ manager extra ~cancel:cancel_user_req ~die id respawn
| Sys_error _ | Invalid_argument _ | End_of_file ->
giveback_token ();
match T.on_slave_death !last_task with
| `Stay ->
msg_warning(strbrk "The worker process died badly.");
Worker.kill proc; ignore(Worker.wait proc);
- manage_slave ~cancel:cancel_user_req ~die id respawn
+ manager extra ~cancel:cancel_user_req ~die id respawn
| `Exit exit_code ->
Worker.kill proc;
let exit_status proc = match Worker.wait proc with
@@ -247,6 +247,63 @@ module Make(T : Task) = struct
| Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno in
pr_err ("Fatal worker error: " ^ (exit_status proc));
flush_all (); exit exit_code
+ end
+
+ module Pool = WorkerPool.Make(Worker)(Manager)
+
+ type queue = {
+ active : WorkerPool.active Pool.pool;
+ parking : WorkerPool.parking Pool.pool;
+ queue : (T.task * cancel_switch) TQueue.t;
+ cleaner : Thread.t;
+ }
+
+ let create n =
+ let queue = TQueue.create () in
+ let cleaner queue =
+ while true do
+ try ignore(TQueue.pop ~picky:(fun (_,cancelled) -> !cancelled) queue)
+ with TQueue.BeingDestroyed -> Thread.exit ()
+ done in
+ let atq = ref None in
+ let park id =
+ let atq = Option.get !atq in Pool.move atq.active atq.parking id in
+ atq := Some {
+ active = Pool.create_active (queue,park) n;
+ parking = Pool.create_parking ();
+ queue;
+ cleaner = Thread.create cleaner queue;
+ };
+ Option.get !atq
+
+ let destroy { active; parking; queue } =
+ Pool.destroy active;
+ Pool.destroy parking;
+ TQueue.destroy queue
+
+ let enqueue_task { queue } t c =
+ prerr_endline ("Enqueue task "^T.name_of_task t);
+ TQueue.push queue (t,c)
+
+ let cancel_worker { active; parking } n =
+ Pool.cancel active n;
+ Pool.cancel parking n
+
+ let name_of_request (Request r) = T.name_of_request r
+
+ let set_order { queue } cmp =
+ TQueue.set_order queue (fun (t1,_) (t2,_) -> cmp t1 t2)
+
+ let join { queue; active; parking } =
+ if not (Pool.is_empty active) then
+ TQueue.wait_until_n_are_waiting_and_queue_empty
+ (Pool.n_workers active + Pool.n_workers parking + 1(*cleaner*))
+ queue
+
+ let cancel_all { queue; active; parking } =
+ TQueue.clear queue;
+ Pool.cancel_all active;
+ Pool.cancel_all parking
let slave_ic = ref stdin
let slave_oc = ref stdout
@@ -262,7 +319,7 @@ module Make(T : Task) = struct
| [] -> let data = f () in l := List.tl data; List.hd data
| x::tl -> l := tl; x
- let slave_handshake () = WorkersPool.worker_handshake !slave_ic !slave_oc
+ let slave_handshake () = Pool.worker_handshake !slave_ic !slave_oc
let main_loop () =
let slave_feeder oc fb =
@@ -295,28 +352,22 @@ module Make(T : Task) = struct
flush_all (); exit 1
done
- let clear () =
- assert(WorkersPool.is_empty ()); (* We allow that only if no slaves *)
+ let clear { queue; active; parking } =
+ assert(Pool.is_empty active); (* We allow that only if no slaves *)
TQueue.clear queue
- let snapshot () =
+ let snapshot { queue; active; parking } =
List.map fst
(TQueue.wait_until_n_are_waiting_then_snapshot
- (WorkersPool.n_workers ()) queue)
-
- let init n =
- WorkersPool.init n manage_slave
- (fun n -> Printf.sprintf "%s:%d" T.name n)
-
- let destroy () =
- WorkersPool.destroy ();
- TQueue.destroy queue
+ (Pool.n_workers active + Pool.n_workers parking) queue)
let with_n_workers n f =
- try init n; let rc = f ~join ~cancel_all in destroy (); rc
- with e -> let e = Errors.push e in destroy (); raise e
+ let q = create n in
+ try let rc = f q in destroy q; rc
+ with e -> let e = Errors.push e in destroy q; raise e
- let n_workers = WorkersPool.n_workers
+ let n_workers { active; parking } =
+ Pool.n_workers active, Pool.n_workers parking
end