aboutsummaryrefslogtreecommitdiffhomepage
path: root/stm/asyncTaskQueue.ml
diff options
context:
space:
mode:
authorGravatar Enrico Tassi <Enrico.Tassi@inria.fr>2014-11-26 19:05:38 +0100
committerGravatar Enrico Tassi <Enrico.Tassi@inria.fr>2014-11-27 16:06:54 +0100
commitf549ddbb9fae979d4b7b260316ea4754f2c28ffa (patch)
tree12e254fbf260fc8c8844a7de74848816b00fdc82 /stm/asyncTaskQueue.ml
parent5f7234edcf0a6bef995c8d1dc31f679799a98557 (diff)
AsyncTaskQueue: API to park a worker
Generalize the old model by letting one park a worker and by letting the (parked) worker be picky about the tasks it picks up. The use of that is the following: a proof worker, while performing its "main" task (building a proof term) computes all the intermediate states but returns only its main result. One can ask the worker to hang around, and react to special tasks, like printing the goals of an intermediate state.
Diffstat (limited to 'stm/asyncTaskQueue.ml')
-rw-r--r--stm/asyncTaskQueue.ml207
1 files changed, 129 insertions, 78 deletions
diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml
index 8b4e0844c..d870ff92b 100644
--- a/stm/asyncTaskQueue.ml
+++ b/stm/asyncTaskQueue.ml
@@ -14,28 +14,35 @@ let pr_err s = Printf.eprintf "%s] %s\n" (System.process_id ()) s; flush stderr
let prerr_endline s = if !Flags.debug then begin pr_err s end else ()
+type 'a worker_status = [ `Fresh | `Old | `Parked of 'a ]
+
module type Task = sig
type task
+ type competence
(* Marshallable *)
type request
type response
- val name : string (* UID of the task kind, for -toploop *)
+ val name : string ref (* UID of the task kind, for -toploop *)
val extra_env : unit -> string array
(* run by the master, on a thread *)
- val request_of_task : [ `Fresh | `Old ] -> task -> request option
- val use_response : task -> response -> [ `Stay | `StayReset ]
+ val request_of_task : competence worker_status -> task -> request option
+ val task_match : competence worker_status -> task -> bool
+ val use_response :
+ competence worker_status -> task -> response ->
+ [ `Stay | `Park of competence | `Reset ]
val on_marshal_error : string -> task -> unit
val on_slave_death : task option -> [ `Exit of int | `Stay ]
val on_task_cancellation_or_expiration : task option -> unit
- val forward_feedback : Stateid.t -> Feedback.feedback_content -> unit
+ val forward_feedback :
+ Stateid.t -> Feedback.route_id -> Feedback.feedback_content -> unit
(* run by the worker *)
val perform : request -> response
-
+
(* debugging *)
val name_of_task : task -> string
val name_of_request : request -> string
@@ -46,34 +53,14 @@ type cancel_switch = bool ref
module Make(T : Task) = struct
- module Worker = Spawn.Sync(struct
- let add_timeout ~sec f =
- ignore(Thread.create (fun () ->
- while true do
- Unix.sleep sec;
- if not (f ()) then Thread.exit ()
- done)
- ())
- end)
-
- module WorkersPool = WorkerPool.Make(Worker)
-
- let queue : (T.task * cancel_switch) TQueue.t = TQueue.create ()
-
- let enqueue_task t c =
- prerr_endline ("Enqueue task "^T.name_of_task t);
- TQueue.push queue (t,c)
-
- let cancel_worker = WorkersPool.cancel
-
- type request = Request of T.request
-
- let name_of_request (Request r) = T.name_of_request r
-
+ exception KillRespawn
+ exception Die
+ exception Expired
type response =
| Response of T.response
| RespFeedback of Feedback.feedback
| RespGetCounterNewUnivLevel
+ type request = Request of T.request
type more_data =
| MoreDataUnivLevel of Univ.universe_level list
@@ -83,23 +70,23 @@ module Make(T : Task) = struct
let slave_respond msg = match msg with Request r -> Response (T.perform r)
exception MarshalError of string
-
+
let marshal_to_channel oc data =
Marshal.to_channel oc data [];
flush oc
-
+
let marshal_err s = raise (MarshalError s)
-
+
let marshal_request oc (req : request) =
try marshal_to_channel oc req
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("marshal_request: "^s)
-
+
let unmarshal_request ic =
try (Marshal.from_channel ic : request)
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("unmarshal_request: "^s)
-
+
let marshal_response oc (res : response) =
try marshal_to_channel oc res
with Failure s | Invalid_argument s | Sys_error s ->
@@ -109,7 +96,7 @@ module Make(T : Task) = struct
try (CThread.thread_friendly_input_value ic : response)
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("unmarshal_response: "^s)
-
+
let marshal_more_data oc (res : more_data) =
try marshal_to_channel oc res
with Failure s | Invalid_argument s | Sys_error s ->
@@ -120,28 +107,32 @@ module Make(T : Task) = struct
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("unmarshal_more_data: "^s)
- let set_order cmp = TQueue.set_order queue (fun (t1,_) (t2,_) -> cmp t1 t2)
-
- let join () =
- if not (WorkersPool.is_empty ()) then
- TQueue.wait_until_n_are_waiting_and_queue_empty
- (WorkersPool.n_workers ()) queue
- let cancel_all () =
- TQueue.clear queue;
- WorkersPool.cancel_all ()
-
- exception KillRespawn
- exception Die
- exception Expired
-
let report_status ?(id = !Flags.async_proofs_worker_id) s =
Pp.feedback ~state_id:Stateid.initial (Feedback.WorkerStatus(id, s))
- let rec manage_slave ~cancel:cancel_user_req ~die id respawn =
+
+ module Worker = Spawn.Sync(struct
+ let add_timeout ~sec f =
+ ignore(Thread.create (fun () ->
+ while true do
+ Unix.sleep sec;
+ if not (f ()) then Thread.exit ()
+ done)
+ ())
+ end)
+
+ module Manager = struct
+ type process = Worker.process
+ type extra = (T.task * cancel_switch) TQueue.t * (string -> unit)
+
+ let naming n = Printf.sprintf "%s:%d" !T.name n
+
+ let rec manager extra ~cancel:cancel_user_req ~die id respawn =
+ let queue, park = extra in
let ic, oc, proc =
let rec set_slave_opt = function
| [] -> !Flags.async_proofs_flags_for_workers @
- ["-toploop"; T.name^"top";
+ ["-toploop"; !T.name^"top";
"-worker-id"; id;
"-async-proofs-worker-priority";
Flags.string_of_priority !Flags.async_proofs_worker_priority]
@@ -162,13 +153,15 @@ module Make(T : Task) = struct
let got_token = ref false in
let giveback_token () =
if !got_token then (CoqworkmgrApi.giveback 1; got_token := false) in
+ let pick age (t, c) = not !c && T.task_match age t in
CThread.prepare_in_channel_for_thread_friendly_io ic;
try
while not !die do
prerr_endline "waiting for a task";
report_status ~id "Idle";
- let task, cancel_switch = TQueue.pop queue in
+ let task, cancel_switch = TQueue.pop ~picky:(pick !worker_age) queue in
prerr_endline ("got task: "^T.name_of_task task);
+ prerr_endline ("I'm : "^ match !worker_age with `Parked _ -> "parked" | _ -> "fresh/old");
last_task := Some task;
begin try
let req = T.request_of_task !worker_age task in
@@ -185,17 +178,24 @@ module Make(T : Task) = struct
let response = unmarshal_response ic in
match response with
| Response resp ->
- (match T.use_response task resp with
+ (match T.use_response !worker_age task resp with
| `Stay ->
- last_task := None; worker_age := `Old; giveback_token ()
- | `StayReset -> last_task := None; raise KillRespawn)
+ last_task := None;
+ if !worker_age = `Fresh then worker_age := `Old;
+ giveback_token ()
+ | `Park competence ->
+ prerr_endline "parking";
+ last_task := None; worker_age := `Parked competence;
+ park id; giveback_token ()
+ | `Reset -> last_task := None; raise KillRespawn)
| RespGetCounterNewUnivLevel ->
marshal_more_data oc (MoreDataUnivLevel
(CList.init 10 (fun _ ->
Universes.new_univ_level (Global.current_dirpath ()))));
loop ()
- | RespFeedback ({ Feedback.id = Feedback.State state_id } as fbk) ->
- T.forward_feedback state_id fbk.Feedback.content;
+ | RespFeedback ({ Feedback.id = Feedback.State state_id;
+ Feedback.route = rid } as fbk) ->
+ T.forward_feedback state_id rid fbk.Feedback.content;
loop ()
| RespFeedback _ -> Errors.anomaly (str"Parsing in master process")
in
@@ -216,28 +216,28 @@ module Make(T : Task) = struct
| KillRespawn ->
giveback_token ();
Worker.kill proc; ignore(Worker.wait proc);
- manage_slave ~cancel:cancel_user_req ~die id respawn
+ manager extra ~cancel:cancel_user_req ~die id respawn
| (Die | TQueue.BeingDestroyed) ->
giveback_token ();
- Worker.kill proc;ignore(Worker.wait proc)
+ Worker.kill proc;ignore(Worker.wait proc); Thread.exit ()
| Sys_error _ | Invalid_argument _ | End_of_file when !task_expired ->
giveback_token ();
T.on_task_cancellation_or_expiration !last_task;
ignore(Worker.wait proc);
- manage_slave ~cancel:cancel_user_req ~die id respawn
+ manager extra ~cancel:cancel_user_req ~die id respawn
| Sys_error _ | Invalid_argument _ | End_of_file when !task_cancelled ->
giveback_token ();
msg_warning(strbrk "The worker was cancelled.");
T.on_task_cancellation_or_expiration !last_task;
Worker.kill proc; ignore(Worker.wait proc);
- manage_slave ~cancel:cancel_user_req ~die id respawn
+ manager extra ~cancel:cancel_user_req ~die id respawn
| Sys_error _ | Invalid_argument _ | End_of_file ->
giveback_token ();
match T.on_slave_death !last_task with
| `Stay ->
msg_warning(strbrk "The worker process died badly.");
Worker.kill proc; ignore(Worker.wait proc);
- manage_slave ~cancel:cancel_user_req ~die id respawn
+ manager extra ~cancel:cancel_user_req ~die id respawn
| `Exit exit_code ->
Worker.kill proc;
let exit_status proc = match Worker.wait proc with
@@ -247,6 +247,63 @@ module Make(T : Task) = struct
| Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno in
pr_err ("Fatal worker error: " ^ (exit_status proc));
flush_all (); exit exit_code
+ end
+
+ module Pool = WorkerPool.Make(Worker)(Manager)
+
+ type queue = {
+ active : WorkerPool.active Pool.pool;
+ parking : WorkerPool.parking Pool.pool;
+ queue : (T.task * cancel_switch) TQueue.t;
+ cleaner : Thread.t;
+ }
+
+ let create n =
+ let queue = TQueue.create () in
+ let cleaner queue =
+ while true do
+ try ignore(TQueue.pop ~picky:(fun (_,cancelled) -> !cancelled) queue)
+ with TQueue.BeingDestroyed -> Thread.exit ()
+ done in
+ let atq = ref None in
+ let park id =
+ let atq = Option.get !atq in Pool.move atq.active atq.parking id in
+ atq := Some {
+ active = Pool.create_active (queue,park) n;
+ parking = Pool.create_parking ();
+ queue;
+ cleaner = Thread.create cleaner queue;
+ };
+ Option.get !atq
+
+ let destroy { active; parking; queue } =
+ Pool.destroy active;
+ Pool.destroy parking;
+ TQueue.destroy queue
+
+ let enqueue_task { queue } t c =
+ prerr_endline ("Enqueue task "^T.name_of_task t);
+ TQueue.push queue (t,c)
+
+ let cancel_worker { active; parking } n =
+ Pool.cancel active n;
+ Pool.cancel parking n
+
+ let name_of_request (Request r) = T.name_of_request r
+
+ let set_order { queue } cmp =
+ TQueue.set_order queue (fun (t1,_) (t2,_) -> cmp t1 t2)
+
+ let join { queue; active; parking } =
+ if not (Pool.is_empty active) then
+ TQueue.wait_until_n_are_waiting_and_queue_empty
+ (Pool.n_workers active + Pool.n_workers parking + 1(*cleaner*))
+ queue
+
+ let cancel_all { queue; active; parking } =
+ TQueue.clear queue;
+ Pool.cancel_all active;
+ Pool.cancel_all parking
let slave_ic = ref stdin
let slave_oc = ref stdout
@@ -262,7 +319,7 @@ module Make(T : Task) = struct
| [] -> let data = f () in l := List.tl data; List.hd data
| x::tl -> l := tl; x
- let slave_handshake () = WorkersPool.worker_handshake !slave_ic !slave_oc
+ let slave_handshake () = Pool.worker_handshake !slave_ic !slave_oc
let main_loop () =
let slave_feeder oc fb =
@@ -295,28 +352,22 @@ module Make(T : Task) = struct
flush_all (); exit 1
done
- let clear () =
- assert(WorkersPool.is_empty ()); (* We allow that only if no slaves *)
+ let clear { queue; active; parking } =
+ assert(Pool.is_empty active); (* We allow that only if no slaves *)
TQueue.clear queue
- let snapshot () =
+ let snapshot { queue; active; parking } =
List.map fst
(TQueue.wait_until_n_are_waiting_then_snapshot
- (WorkersPool.n_workers ()) queue)
-
- let init n =
- WorkersPool.init n manage_slave
- (fun n -> Printf.sprintf "%s:%d" T.name n)
-
- let destroy () =
- WorkersPool.destroy ();
- TQueue.destroy queue
+ (Pool.n_workers active + Pool.n_workers parking) queue)
let with_n_workers n f =
- try init n; let rc = f ~join ~cancel_all in destroy (); rc
- with e -> let e = Errors.push e in destroy (); raise e
+ let q = create n in
+ try let rc = f q in destroy q; rc
+ with e -> let e = Errors.push e in destroy q; raise e
- let n_workers = WorkersPool.n_workers
+ let n_workers { active; parking } =
+ Pool.n_workers active, Pool.n_workers parking
end