aboutsummaryrefslogtreecommitdiffhomepage
path: root/stm/asyncTaskQueue.ml
diff options
context:
space:
mode:
Diffstat (limited to 'stm/asyncTaskQueue.ml')
-rw-r--r--stm/asyncTaskQueue.ml296
1 files changed, 296 insertions, 0 deletions
diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml
new file mode 100644
index 000000000..65219724e
--- /dev/null
+++ b/stm/asyncTaskQueue.ml
@@ -0,0 +1,296 @@
+(************************************************************************)
+(* v * The Coq Proof Assistant / The Coq Development Team *)
+(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2014 *)
+(* \VV/ **************************************************************)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(************************************************************************)
+
+open Errors
+open Pp
+open Util
+
+let pr_err s = Printf.eprintf "%s] %s\n" (System.process_id ()) s; flush stderr
+
+let prerr_endline s = if !Flags.debug then begin pr_err s end else ()
+
+module type Task = sig
+
+ type task
+
+ (* Marshallable *)
+ type request
+ type response
+
+ val name : string (* UID of the task kind, for -toploop *)
+ val extra_env : unit -> string array
+
+ (* run by the master, on a thread *)
+ val request_of_task : task -> request option
+ val use_response : task -> response -> [ `Die | `Stay | `StayReset ]
+ val on_marshal_error : string -> task -> unit
+ val on_slave_death : task option -> [ `Exit of int | `Stay ]
+ val on_task_cancellation_or_expiration : task option -> unit
+ val forward_feedback : Stateid.t -> Feedback.feedback_content -> unit
+
+ (* run by the worker *)
+ val perform : request -> response
+
+ (* debugging *)
+ val name_of_task : task -> string
+ val name_of_request : request -> string
+
+end
+
+type cancel_switch = bool ref
+
+module Make(T : Task) = struct
+
+ module Worker = Spawn.Sync(struct
+ let add_timeout ~sec f =
+ ignore(Thread.create (fun () ->
+ while true do
+ Unix.sleep sec;
+ if not (f ()) then Thread.exit ()
+ done)
+ ())
+ end)
+
+ module WorkersPool = WorkerPool.Make(Worker)
+
+ let queue : (T.task * cancel_switch) TQueue.t = TQueue.create ()
+
+ let enqueue_task t c =
+ prerr_endline ("Enqueue task "^T.name_of_task t);
+ TQueue.push queue (t,c)
+
+ let cancel_worker = WorkersPool.cancel
+
+ type request = Request of T.request
+
+ let name_of_request (Request r) = T.name_of_request r
+
+ type response =
+ | Response of T.response
+ | RespFeedback of Feedback.feedback
+ | RespGetCounterNewUnivLevel
+
+ type more_data =
+ | MoreDataUnivLevel of Univ.universe_level list
+
+ let request_cswitch_of_task (t, c) = T.request_of_task t, c
+
+ let slave_respond msg = match msg with Request r -> Response (T.perform r)
+
+ exception MarshalError of string
+
+ let marshal_to_channel oc data =
+ Marshal.to_channel oc data [];
+ flush oc
+
+ let marshal_err s = raise (MarshalError s)
+
+ let marshal_request oc (req : request) =
+ try marshal_to_channel oc req
+ with Failure s | Invalid_argument s | Sys_error s ->
+ marshal_err ("marshal_request: "^s)
+
+ let unmarshal_request ic =
+ try (Marshal.from_channel ic : request)
+ with Failure s | Invalid_argument s | Sys_error s ->
+ marshal_err ("unmarshal_request: "^s)
+
+ let marshal_response oc (res : response) =
+ try marshal_to_channel oc res
+ with Failure s | Invalid_argument s | Sys_error s ->
+ marshal_err ("marshal_response: "^s)
+
+ let unmarshal_response ic =
+ try (CThread.thread_friendly_input_value ic : response)
+ with Failure s | Invalid_argument s | Sys_error s ->
+ marshal_err ("unmarshal_response: "^s)
+
+ let marshal_more_data oc (res : more_data) =
+ try marshal_to_channel oc res
+ with Failure s | Invalid_argument s | Sys_error s ->
+ marshal_err ("marshal_more_data: "^s)
+
+ let unmarshal_more_data ic =
+ try (Marshal.from_channel ic : more_data)
+ with Failure s | Invalid_argument s | Sys_error s ->
+ marshal_err ("unmarshal_more_data: "^s)
+
+ let reorder_tasks cmp = TQueue.reorder queue (fun (t1,_) (t2,_) -> cmp t1 t2)
+
+ let join () =
+ if not (WorkersPool.is_empty ()) then
+ TQueue.wait_until_n_are_waiting_and_queue_empty
+ (WorkersPool.n_workers ()) queue
+
+ exception KillRespawn
+ exception Die
+ exception Expired
+
+ let report_status ?(id = !Flags.async_proofs_worker_id) s =
+ Pp.feedback ~state_id:Stateid.initial (Feedback.SlaveStatus(id, s))
+
+ let rec manage_slave ~cancel:cancel_user_req id respawn =
+ let ic, oc, proc =
+ let rec set_slave_opt = function
+ | [] -> !Flags.async_proofs_flags_for_workers @
+ ["-toploop"; T.name^"top";
+ "-worker-id"; id]
+ | ("-ideslave"|"-emacs"|"-emacs-U")::tl -> set_slave_opt tl
+ | ("-async-proofs" |"-toploop" |"-vi2vo" |"-compile"
+ |"-compile-verbose")::_::tl -> set_slave_opt tl
+ | x::tl -> x :: set_slave_opt tl in
+ let args =
+ Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in
+ let env = Array.append (T.extra_env ()) (Unix.environment ()) in
+ respawn ~args ~env () in
+ let last_task = ref None in
+ let task_expired = ref false in
+ let task_cancelled = ref false in
+ CThread.prepare_in_channel_for_thread_friendly_io ic;
+ try
+ while true do
+ prerr_endline "waiting for a task";
+ report_status ~id "Idle";
+ let task, cancel_switch = TQueue.pop queue in
+ prerr_endline ("got task: "^T.name_of_task task);
+ last_task := Some task;
+ try
+ let req = T.request_of_task task in
+ if req = None then raise Expired;
+ marshal_request oc (Request (Option.get req));
+ Worker.kill_if proc ~sec:1 (fun () ->
+ task_expired := !cancel_switch;
+ task_cancelled := !cancel_user_req;
+ if !cancel_user_req then cancel_user_req := false;
+ !task_expired || !task_cancelled);
+ let rec loop () =
+ let response = unmarshal_response ic in
+ match response with
+ | Response resp ->
+ (match T.use_response task resp with
+ | `Die -> raise Die
+ | `Stay -> last_task := None; ()
+ | `StayReset -> last_task := None; raise KillRespawn)
+ | RespGetCounterNewUnivLevel ->
+ marshal_more_data oc (MoreDataUnivLevel
+ (CList.init 10 (fun _ ->
+ Universes.new_univ_level (Global.current_dirpath ()))));
+ loop ()
+ | RespFeedback ({ Feedback.id = Feedback.State state_id } as fbk) ->
+ T.forward_feedback state_id fbk.Feedback.content;
+ loop ()
+ | RespFeedback _ -> Errors.anomaly (str"Parsing in master process")
+ in
+ loop ()
+ with
+ | Expired -> prerr_endline ("Task expired: " ^ T.name_of_task task)
+ | (Sys_error _ | Invalid_argument _ | End_of_file | KillRespawn) as e ->
+ raise e (* we pass the exception to the external handler *)
+ | MarshalError s -> T.on_marshal_error s task
+ | e ->
+ pr_err ("Uncaught exception in worker manager: "^
+ string_of_ppcmds (print e));
+ flush_all ()
+ done
+ with
+ | KillRespawn ->
+ Worker.kill proc; ignore(Worker.wait proc);
+ manage_slave ~cancel:cancel_user_req id respawn
+ | Die -> Worker.kill proc; ignore(Worker.wait proc)
+ | Sys_error _ | Invalid_argument _ | End_of_file when !task_expired ->
+ T.on_task_cancellation_or_expiration !last_task;
+ ignore(Worker.wait proc);
+ manage_slave ~cancel:cancel_user_req id respawn
+ | Sys_error _ | Invalid_argument _ | End_of_file when !task_cancelled ->
+ msg_warning(strbrk "The worker was cancelled.");
+ T.on_task_cancellation_or_expiration !last_task;
+ Worker.kill proc; ignore(Worker.wait proc);
+ manage_slave ~cancel:cancel_user_req id respawn
+ | Sys_error _ | Invalid_argument _ | End_of_file ->
+ match T.on_slave_death !last_task with
+ | `Stay ->
+ msg_warning(strbrk "The worker process died badly.");
+ Worker.kill proc; ignore(Worker.wait proc);
+ manage_slave ~cancel:cancel_user_req id respawn
+ | `Exit exit_code ->
+ Worker.kill proc;
+ let exit_status proc = match Worker.wait proc with
+ | Unix.WEXITED 0x400 -> "exit code unavailable"
+ | Unix.WEXITED i -> Printf.sprintf "exit(%d)" i
+ | Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno
+ | Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno in
+ pr_err ("Fatal worker error: " ^ (exit_status proc));
+ flush_all (); exit exit_code
+
+ let slave_ic = ref stdin
+ let slave_oc = ref stdout
+
+ let slave_init_stdout () =
+ let ic, oc = Spawned.get_channels () in
+ slave_oc := oc; slave_ic := ic
+
+ let bufferize f =
+ let l = ref [] in
+ fun () ->
+ match !l with
+ | [] -> let data = f () in l := List.tl data; List.hd data
+ | x::tl -> l := tl; x
+
+ let slave_handshake () = WorkersPool.worker_handshake !slave_ic !slave_oc
+
+ let slave_main_loop reset =
+ let feedback_queue = ref [] in
+ let slave_feeder oc fb =
+ match fb.Feedback.content with
+ | Feedback.Goals _ -> feedback_queue := fb :: !feedback_queue
+ | _ -> Marshal.to_channel oc (RespFeedback fb) []; flush oc in
+ let flush_feeder oc =
+ List.iter (fun fb -> Marshal.to_channel oc (RespFeedback fb) [])
+ !feedback_queue;
+ feedback_queue := [];
+ flush oc in
+ Pp.set_feeder (slave_feeder !slave_oc);
+ Pp.log_via_feedback ();
+ Universes.set_remote_new_univ_level (bufferize (fun () ->
+ marshal_response !slave_oc RespGetCounterNewUnivLevel;
+ match unmarshal_more_data !slave_ic with
+ | MoreDataUnivLevel l -> l));
+ let working = ref false in
+ slave_handshake ();
+ while true do
+ try
+ working := false;
+ let request = unmarshal_request !slave_ic in
+ working := true;
+ report_status (name_of_request request);
+ let response = slave_respond request in
+ flush_feeder !slave_oc;
+ report_status "Idle";
+ marshal_response !slave_oc response;
+ reset ()
+ with
+ | MarshalError s ->
+ pr_err ("Fatal marshal error: " ^ s); flush_all (); exit 2
+ | End_of_file ->
+ prerr_endline "connection lost"; flush_all (); exit 2
+ | e ->
+ pr_err ("Slave: critical exception: " ^ Pp.string_of_ppcmds (print e));
+ flush_all (); exit 1
+ done
+
+ let dump () =
+ assert(WorkersPool.is_empty ()); (* ATM, we allow that only if no slaves *)
+ List.map fst (TQueue.dump queue)
+
+ let init n =
+ WorkersPool.init n manage_slave
+ (fun n -> Printf.sprintf "%s:%d" T.name n)
+
+ let n_workers = WorkersPool.n_workers
+
+end