summaryrefslogtreecommitdiff
path: root/stm/asyncTaskQueue.ml
diff options
context:
space:
mode:
Diffstat (limited to 'stm/asyncTaskQueue.ml')
-rw-r--r--stm/asyncTaskQueue.ml127
1 files changed, 59 insertions, 68 deletions
diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml
index fa6422cd..b3e1500a 100644
--- a/stm/asyncTaskQueue.ml
+++ b/stm/asyncTaskQueue.ml
@@ -1,26 +1,30 @@
(************************************************************************)
-(* v * The Coq Proof Assistant / The Coq Development Team *)
-(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2016 *)
+(* * The Coq Proof Assistant / The Coq Development Team *)
+(* v * INRIA, CNRS and contributors - Copyright 1999-2018 *)
+(* <O___,, * (see CREDITS file for the list of authors) *)
(* \VV/ **************************************************************)
-(* // * This file is distributed under the terms of the *)
-(* * GNU Lesser General Public License Version 2.1 *)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(* * (see LICENSE file for the text of the license) *)
(************************************************************************)
open CErrors
open Pp
open Util
-let pr_err s = Printf.eprintf "%s] %s\n" (System.process_id ()) s; flush stderr
+let stm_pr_err pp = Format.eprintf "%s] @[%a@]\n%!" (Spawned.process_id ()) Pp.pp_with pp
+let stm_prerr_endline s = if !Flags.debug then begin stm_pr_err (str s) end else ()
-let prerr_endline s = if !Flags.debug then begin pr_err s end else ()
-
-type 'a worker_status = [ `Fresh | `Old of 'a ]
+type cancel_switch = bool ref
+let async_proofs_flags_for_workers = ref []
module type Task = sig
type task
type competence
+ type worker_status = Fresh | Old of competence
+
(* Marshallable *)
type request
type response
@@ -29,15 +33,14 @@ module type Task = sig
val extra_env : unit -> string array
(* run by the master, on a thread *)
- val request_of_task : competence worker_status -> task -> request option
- val task_match : competence worker_status -> task -> bool
- val use_response :
- competence worker_status -> task -> response ->
- [ `Stay of competence * task list | `End ]
+ val request_of_task : worker_status -> task -> request option
+ val task_match : worker_status -> task -> bool
+ val use_response : worker_status -> task -> response ->
+ [ `Stay of competence * task list | `End ]
val on_marshal_error : string -> task -> unit
val on_task_cancellation_or_expiration_or_slave_death : task option -> unit
val forward_feedback : Feedback.feedback -> unit
-
+
(* run by the worker *)
val perform : request -> response
@@ -47,9 +50,7 @@ module type Task = sig
end
-type expiration = bool ref
-
-module Make(T : Task) = struct
+module Make(T : Task) () = struct
exception Die
type response =
@@ -59,45 +60,45 @@ module Make(T : Task) = struct
type request = Request of T.request
type more_data =
- | MoreDataUnivLevel of Univ.universe_level list
+ | MoreDataUnivLevel of Universes.universe_id list
let slave_respond (Request r) =
let res = T.perform r in
Response res
exception MarshalError of string
-
+
let marshal_to_channel oc data =
Marshal.to_channel oc data [];
flush oc
-
+
let marshal_err s = raise (MarshalError s)
-
+
let marshal_request oc (req : request) =
try marshal_to_channel oc req
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("marshal_request: "^s)
-
+
let unmarshal_request ic =
try (CThread.thread_friendly_input_value ic : request)
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("unmarshal_request: "^s)
-
+
let marshal_response oc (res : response) =
try marshal_to_channel oc res
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("marshal_response: "^s)
-
+
let unmarshal_response ic =
try (CThread.thread_friendly_input_value ic : response)
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("unmarshal_response: "^s)
-
+
let marshal_more_data oc (res : more_data) =
try marshal_to_channel oc res
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("marshal_more_data: "^s)
-
+
let unmarshal_more_data ic =
try (CThread.thread_friendly_input_value ic : more_data)
with Failure s | Invalid_argument s | Sys_error s ->
@@ -105,24 +106,24 @@ module Make(T : Task) = struct
let report_status ?(id = !Flags.async_proofs_worker_id) s =
let open Feedback in
- feedback ~id:(State Stateid.initial) (WorkerStatus(id, s))
+ feedback ~id:Stateid.initial (WorkerStatus(id, s))
- module Worker = Spawn.Sync(struct end)
+ module Worker = Spawn.Sync ()
module Model = struct
type process = Worker.process
- type extra = (T.task * expiration) TQueue.t
+ type extra = (T.task * cancel_switch) TQueue.t
let spawn id =
let name = Printf.sprintf "%s:%d" !T.name id in
let proc, ic, oc =
let rec set_slave_opt = function
- | [] -> !Flags.async_proofs_flags_for_workers @
+ | [] -> !async_proofs_flags_for_workers @
["-toploop"; !T.name^"top";
"-worker-id"; name;
"-async-proofs-worker-priority";
- Flags.string_of_priority !Flags.async_proofs_worker_priority]
+ CoqworkmgrApi.(string_of_priority !WorkerLoop.async_proofs_worker_priority)]
| ("-ideslave"|"-emacs"|"-emacs-U"|"-batch")::tl -> set_slave_opt tl
| ("-async-proofs" |"-toploop" |"-vio2vo"
|"-load-vernac-source" |"-l" |"-load-vernac-source-verbose" |"-lv"
@@ -140,38 +141,37 @@ module Make(T : Task) = struct
let { WorkerPool.extra = queue; exit; cancelled } = cpanel in
let exit () = report_status ~id "Dead"; exit () in
let last_task = ref None in
- let worker_age = ref `Fresh in
+ let worker_age = ref T.Fresh in
let got_token = ref false in
let giveback_exec_token () =
if !got_token then (CoqworkmgrApi.giveback 1; got_token := false) in
let stop_waiting = ref false in
let expiration_date = ref (ref false) in
let pick_task () =
- prerr_endline "waiting for a task";
+ stm_prerr_endline "waiting for a task";
let pick age (t, c) = not !c && T.task_match age t in
let task, task_expiration =
TQueue.pop ~picky:(pick !worker_age) ~destroy:stop_waiting queue in
expiration_date := task_expiration;
last_task := Some task;
- prerr_endline ("got task: "^T.name_of_task task);
+ stm_prerr_endline ("got task: " ^ T.name_of_task task);
task in
let add_tasks l =
List.iter (fun t -> TQueue.push queue (t,!expiration_date)) l in
let get_exec_token () =
ignore(CoqworkmgrApi.get 1);
got_token := true;
- prerr_endline ("got execution token") in
+ stm_prerr_endline ("got execution token") in
let kill proc =
Worker.kill proc;
- prerr_endline ("Worker exited: " ^
+ stm_prerr_endline ("Worker exited: " ^
match Worker.wait proc with
| Unix.WEXITED 0x400 -> "exit code unavailable"
| Unix.WEXITED i -> Printf.sprintf "exit(%d)" i
| Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno
| Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno) in
let more_univs n =
- CList.init 10 (fun _ ->
- Universes.new_univ_level (Global.current_dirpath ())) in
+ CList.init n (fun _ -> Universes.new_univ_id ()) in
let rec kill_if () =
if not (Worker.is_alive proc) then ()
@@ -196,7 +196,7 @@ module Make(T : Task) = struct
report_status ~id "Idle";
let task = pick_task () in
match T.request_of_task !worker_age task with
- | None -> prerr_endline ("Task expired: " ^ T.name_of_task task)
+ | None -> stm_prerr_endline ("Task expired: " ^ T.name_of_task task)
| Some req ->
try
get_exec_token ();
@@ -213,7 +213,7 @@ module Make(T : Task) = struct
| `Stay(competence, new_tasks) ->
last_task := None;
giveback_exec_token ();
- worker_age := `Old competence;
+ worker_age := T.Old competence;
add_tasks new_tasks
in
continue ()
@@ -222,8 +222,7 @@ module Make(T : Task) = struct
raise e (* we pass the exception to the external handler *)
| MarshalError s -> T.on_marshal_error s task; raise Die
| e ->
- pr_err ("Uncaught exception in worker manager: "^
- string_of_ppcmds (print e));
+ stm_pr_err Pp.(seq [str "Uncaught exception in worker manager: "; print e]);
flush_all (); raise Die
done with
| (Die | TQueue.BeingDestroyed) ->
@@ -237,8 +236,8 @@ module Make(T : Task) = struct
type queue = {
active : Pool.pool;
- queue : (T.task * expiration) TQueue.t;
- cleaner : Thread.t;
+ queue : (T.task * cancel_switch) TQueue.t;
+ cleaner : Thread.t option;
}
let create size =
@@ -251,18 +250,18 @@ module Make(T : Task) = struct
{
active = Pool.create queue ~size;
queue;
- cleaner = Thread.create cleaner queue;
+ cleaner = if size > 0 then Some (Thread.create cleaner queue) else None;
}
-
+
let destroy { active; queue } =
Pool.destroy active;
TQueue.destroy queue
let broadcast { queue } = TQueue.broadcast queue
- let enqueue_task { queue; active } (t, _ as item) =
- prerr_endline ("Enqueue task "^T.name_of_task t);
- TQueue.push queue item
+ let enqueue_task { queue; active } t ~cancel_switch =
+ stm_prerr_endline ("Enqueue task "^T.name_of_task t);
+ TQueue.push queue (t, cancel_switch)
let cancel_worker { active } n = Pool.cancel n active
@@ -298,28 +297,20 @@ module Make(T : Task) = struct
let slave_handshake () =
Pool.worker_handshake (Option.get !slave_ic) (Option.get !slave_oc)
- let pp_pid pp =
- (* Breaking all abstraction barriers... very nice *)
- let get_xml pp = match Richpp.repr pp with
- | Xml_datatype.Element("_", [], xml) -> xml
- | _ -> assert false in
- Richpp.richpp_of_xml (Xml_datatype.Element("_", [],
- get_xml (Richpp.richpp_of_pp Pp.(str (System.process_id ()^ " "))) @
- get_xml pp))
+ let pp_pid pp = Pp.(str (Spawned.process_id () ^ " ") ++ pp)
let debug_with_pid = Feedback.(function
| { contents = Message(Debug, loc, pp) } as fb ->
- { fb with contents = Message(Debug,loc,pp_pid pp) }
+ { fb with contents = Message(Debug,loc, pp_pid pp) }
| x -> x)
let main_loop () =
(* We pass feedback to master *)
let slave_feeder oc fb =
Marshal.to_channel oc (RespFeedback (debug_with_pid fb)) []; flush oc in
- Feedback.add_feeder (fun x -> slave_feeder (Option.get !slave_oc) x);
- Feedback.set_logger Feedback.feedback_logger;
+ ignore (Feedback.add_feeder (fun x -> slave_feeder (Option.get !slave_oc) x));
(* We ask master to allocate universe identifiers *)
- Universes.set_remote_new_univ_level (bufferize (fun () ->
+ Universes.set_remote_new_univ_id (bufferize (fun () ->
marshal_response (Option.get !slave_oc) RespGetCounterNewUnivLevel;
match unmarshal_more_data (Option.get !slave_ic) with
| MoreDataUnivLevel l -> l));
@@ -337,25 +328,25 @@ module Make(T : Task) = struct
CEphemeron.clear ()
with
| MarshalError s ->
- pr_err ("Fatal marshal error: " ^ s); flush_all (); exit 2
+ stm_pr_err Pp.(prlist str ["Fatal marshal error: "; s]); flush_all (); exit 2
| End_of_file ->
- prerr_endline "connection lost"; flush_all (); exit 2
+ stm_prerr_endline "connection lost"; flush_all (); exit 2
| e ->
- pr_err ("Slave: critical exception: " ^ Pp.string_of_ppcmds (print e));
+ stm_pr_err Pp.(seq [str "Slave: critical exception: "; print e]);
flush_all (); exit 1
done
let clear { queue; active } =
assert(Pool.is_empty active); (* We allow that only if no slaves *)
TQueue.clear queue
-
+
let snapshot { queue; active } =
List.map fst
(TQueue.wait_until_n_are_waiting_then_snapshot
(Pool.n_workers active) queue)
let with_n_workers n f =
- let q = create n in
+ let q = create n in
try let rc = f q in destroy q; rc
with e -> let e = CErrors.push e in destroy q; iraise e
@@ -363,5 +354,5 @@ module Make(T : Task) = struct
end
-module MakeQueue(T : Task) = struct include Make(T) end
-module MakeWorker(T : Task) = struct include Make(T) end
+module MakeQueue(T : Task) () = struct include Make(T) () end
+module MakeWorker(T : Task) () = struct include Make(T) () end