From 4618ab9961fc196a1f1912ed1b6b140eb8b4d407 Mon Sep 17 00:00:00 2001 From: Enrico Tassi Date: Tue, 16 Dec 2014 18:23:44 +0100 Subject: AsyncTaskQueue: simpler model (no parking area, continuation tasks) --- stm/asyncTaskQueue.ml | 257 ++++++++++++++++++++++---------------------------- 1 file changed, 115 insertions(+), 142 deletions(-) (limited to 'stm/asyncTaskQueue.ml') diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml index 662152e45..3cb6e7022 100644 --- a/stm/asyncTaskQueue.ml +++ b/stm/asyncTaskQueue.ml @@ -14,7 +14,7 @@ let pr_err s = Printf.eprintf "%s] %s\n" (System.process_id ()) s; flush stderr let prerr_endline s = if !Flags.debug then begin pr_err s end else () -type 'a worker_status = [ `Fresh | `Old | `Parked of 'a ] +type 'a worker_status = [ `Fresh | `Old of 'a ] module type Task = sig @@ -33,28 +33,25 @@ module type Task = sig val task_match : competence worker_status -> task -> bool val use_response : competence worker_status -> task -> response -> - [ `Stay | `Park of competence | `Reset ] + [ `Stay of competence * task list | `End ] val on_marshal_error : string -> task -> unit - val on_slave_death : task option -> [ `Exit of int | `Stay ] - val on_task_cancellation_or_expiration : task option -> unit + val on_task_cancellation_or_expiration_or_slave_death : task option -> unit val forward_feedback : Feedback.feedback -> unit (* run by the worker *) val perform : request -> response - + (* debugging *) val name_of_task : task -> string val name_of_request : request -> string end -type cancel_switch = bool ref +type expiration = bool ref module Make(T : Task) = struct - exception KillRespawn exception Die - exception Expired type response = | Response of T.response | RespFeedback of Feedback.feedback @@ -64,9 +61,11 @@ module Make(T : Task) = struct type more_data = | MoreDataUnivLevel of Univ.universe_level list - let request_cswitch_of_task (t, c) = T.request_of_task t, c + let request_expiry_of_task (t, c) = T.request_of_task t, c - let slave_respond msg = match msg with Request r -> Response (T.perform r) + let slave_respond (Request r) = + let res = T.perform r in + Response res exception MarshalError of string @@ -109,7 +108,6 @@ module Make(T : Task) = struct let report_status ?(id = !Flags.async_proofs_worker_id) s = Pp.feedback ~state_id:Stateid.initial (Feedback.WorkerStatus(id, s)) - module Worker = Spawn.Sync(struct let add_timeout ~sec f = ignore(Thread.create (fun () -> @@ -120,19 +118,18 @@ module Make(T : Task) = struct ()) end) - module Manager = struct - type process = Worker.process - type extra = (T.task * cancel_switch) TQueue.t * (string -> unit) + module Model = struct - let naming n = Printf.sprintf "%s:%d" !T.name n + type process = Worker.process + type extra = (T.task * expiration) TQueue.t - let rec manager extra ~cancel:cancel_user_req ~die id respawn = - let queue, park = extra in - let ic, oc, proc = + let spawn id = + let name = Printf.sprintf "%s:%d" !T.name id in + let proc, ic, oc = let rec set_slave_opt = function | [] -> !Flags.async_proofs_flags_for_workers @ ["-toploop"; !T.name^"top"; - "-worker-id"; id; + "-worker-id"; name; "-async-proofs-worker-priority"; Flags.string_of_priority !Flags.async_proofs_worker_priority] | ("-ideslave"|"-emacs"|"-emacs-U"|"-batch")::tl -> set_slave_opt tl @@ -144,161 +141,138 @@ module Make(T : Task) = struct let args = Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in let env = Array.append (T.extra_env ()) (Unix.environment ()) in - respawn ~args ~env () in + Worker.spawn ~env Sys.argv.(0) args in + name, proc, CThread.prepare_in_channel_for_thread_friendly_io ic, oc + + let manager cpanel (id, proc, ic, oc) = + let { WorkerPool.extra = queue; exit; cancelled } = cpanel in let last_task = ref None in - let task_expired = ref false in - let task_cancelled = ref false in let worker_age = ref `Fresh in let got_token = ref false in - let giveback_token () = + let giveback_exec_token () = if !got_token then (CoqworkmgrApi.giveback 1; got_token := false) in - let pick age (t, c) = not !c && T.task_match age t in - CThread.prepare_in_channel_for_thread_friendly_io ic; - try - while not !die do - prerr_endline "waiting for a task"; - report_status ~id "Idle"; - let task, cancel_switch = TQueue.pop ~picky:(pick !worker_age) queue in - prerr_endline ("got task: "^T.name_of_task task); - prerr_endline ("I'm : "^ match !worker_age with `Parked _ -> "parked" | _ -> "fresh/old"); - last_task := Some task; - begin try - let req = T.request_of_task !worker_age task in - if req = None then raise Expired; - ignore(CoqworkmgrApi.get 1); got_token := true; - prerr_endline ("got execution token"); - marshal_request oc (Request (Option.get req)); - Worker.kill_if proc ~sec:1 (fun () -> - task_expired := !cancel_switch; - task_cancelled := !cancel_user_req; - if !cancel_user_req then cancel_user_req := false; - !task_expired || !task_cancelled || !die); - let rec loop () = - let response = unmarshal_response ic in - match response with - | Response resp -> - (match T.use_response !worker_age task resp with - | `Stay -> - last_task := None; - if !worker_age = `Fresh then worker_age := `Old; - giveback_token () - | `Park competence -> - prerr_endline "parking"; - last_task := None; worker_age := `Parked competence; - park id; giveback_token () - | `Reset -> last_task := None; raise KillRespawn) - | RespGetCounterNewUnivLevel -> - marshal_more_data oc (MoreDataUnivLevel - (CList.init 10 (fun _ -> - Universes.new_univ_level (Global.current_dirpath ())))); - loop () - | RespFeedback fbk -> T.forward_feedback fbk; loop () - in - loop () - with - | Expired -> prerr_endline ("Task expired: " ^ T.name_of_task task) - | (Sys_error _|Invalid_argument _|End_of_file|KillRespawn) as e -> - raise e (* we pass the exception to the external handler *) - | MarshalError s -> T.on_marshal_error s task - | e -> - pr_err ("Uncaught exception in worker manager: "^ - string_of_ppcmds (print e)); - flush_all () - end; - done; - raise Die - with - | KillRespawn -> - giveback_token (); - Worker.kill proc; ignore(Worker.wait proc); - manager extra ~cancel:cancel_user_req ~die id respawn + let stop_waiting = ref false in + let expiration_date = ref (ref false) in + let pick_task () = + prerr_endline "waiting for a task"; + let pick age (t, c) = not !c && T.task_match age t in + let task, task_expiration = + TQueue.pop ~picky:(pick !worker_age) ~destroy:stop_waiting queue in + expiration_date := task_expiration; + last_task := Some task; + prerr_endline ("got task: "^T.name_of_task task); + task in + let add_tasks l = + List.iter (fun t -> TQueue.push queue (t,!expiration_date)) l in + let get_exec_token () = + ignore(CoqworkmgrApi.get 1); + got_token := true; + prerr_endline ("got execution token") in + let kill proc = + Worker.kill proc; + prerr_endline ("Worker exited: " ^ + match Worker.wait proc with + | Unix.WEXITED 0x400 -> "exit code unavailable" + | Unix.WEXITED i -> Printf.sprintf "exit(%d)" i + | Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno + | Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno) in + let more_univs n = + CList.init 10 (fun _ -> + Universes.new_univ_level (Global.current_dirpath ())) in + + Worker.kill_if proc ~sec:1 (fun () -> + let stop = cancelled () || !(!expiration_date) in + if stop then (stop_waiting := true; TQueue.signal_destruction queue); + stop); + + try while true do + report_status ~id "Idle"; + let task = pick_task () in + match T.request_of_task !worker_age task with + | None -> prerr_endline ("Task expired: " ^ T.name_of_task task) + | Some req -> + try + get_exec_token (); + marshal_request oc (Request req); + let rec continue () = + match unmarshal_response ic with + | RespGetCounterNewUnivLevel -> + marshal_more_data oc (MoreDataUnivLevel (more_univs 10)); + continue () + | RespFeedback fbk -> T.forward_feedback fbk; continue () + | Response resp -> + match T.use_response !worker_age task resp with + | `End -> raise Die + | `Stay(competence, new_tasks) -> + last_task := None; + giveback_exec_token (); + worker_age := `Old competence; + add_tasks new_tasks + in + continue () + with + | (Sys_error _|Invalid_argument _|End_of_file|Die) as e -> + raise e (* we pass the exception to the external handler *) + | MarshalError s -> T.on_marshal_error s task; raise Die + | e -> + pr_err ("Uncaught exception in worker manager: "^ + string_of_ppcmds (print e)); + flush_all (); raise Die + done with | (Die | TQueue.BeingDestroyed) -> - giveback_token (); - Worker.kill proc;ignore(Worker.wait proc); Thread.exit () - | Sys_error _ | Invalid_argument _ | End_of_file when !task_expired -> - giveback_token (); - T.on_task_cancellation_or_expiration !last_task; - ignore(Worker.wait proc); - manager extra ~cancel:cancel_user_req ~die id respawn - | Sys_error _ | Invalid_argument _ | End_of_file when !task_cancelled -> - giveback_token (); - msg_warning(strbrk "The worker was cancelled."); - T.on_task_cancellation_or_expiration !last_task; - Worker.kill proc; ignore(Worker.wait proc); - manager extra ~cancel:cancel_user_req ~die id respawn + giveback_exec_token (); kill proc; exit () | Sys_error _ | Invalid_argument _ | End_of_file -> - giveback_token (); - match T.on_slave_death !last_task with - | `Stay -> - msg_warning(strbrk "The worker process died badly."); - Worker.kill proc; ignore(Worker.wait proc); - manager extra ~cancel:cancel_user_req ~die id respawn - | `Exit exit_code -> - Worker.kill proc; - let exit_status proc = match Worker.wait proc with - | Unix.WEXITED 0x400 -> "exit code unavailable" - | Unix.WEXITED i -> Printf.sprintf "exit(%d)" i - | Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno - | Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno in - pr_err ("Fatal worker error: " ^ (exit_status proc)); - flush_all (); exit exit_code + giveback_exec_token (); + T.on_task_cancellation_or_expiration_or_slave_death !last_task; + kill proc; + exit () end - module Pool = WorkerPool.Make(Worker)(Manager) + module Pool = WorkerPool.Make(Model) type queue = { - active : WorkerPool.active Pool.pool; - parking : WorkerPool.parking Pool.pool; - queue : (T.task * cancel_switch) TQueue.t; + active : Pool.pool; + queue : (T.task * expiration) TQueue.t; cleaner : Thread.t; } - let create n = - let queue = TQueue.create () in + let create size = let cleaner queue = while true do try ignore(TQueue.pop ~picky:(fun (_,cancelled) -> !cancelled) queue) with TQueue.BeingDestroyed -> Thread.exit () done in - let atq = ref None in - let park id = - let atq = Option.get !atq in Pool.move atq.active atq.parking id in - atq := Some { - active = Pool.create_active (queue,park) n; - parking = Pool.create_parking (); + let queue = TQueue.create () in + { + active = Pool.create queue ~size; queue; cleaner = Thread.create cleaner queue; - }; - Option.get !atq + } - let destroy { active; parking; queue } = + let destroy { active; queue } = Pool.destroy active; - Pool.destroy parking; TQueue.destroy queue - let enqueue_task { queue } t c = + let enqueue_task { queue; active } (t, _ as item) = prerr_endline ("Enqueue task "^T.name_of_task t); - TQueue.push queue (t,c) + TQueue.push queue item - let cancel_worker { active; parking } n = - Pool.cancel n active; - Pool.cancel n parking + let cancel_worker { active } n = Pool.cancel n active let name_of_request (Request r) = T.name_of_request r let set_order { queue } cmp = TQueue.set_order queue (fun (t1,_) (t2,_) -> cmp t1 t2) - let join { queue; active; parking } = + let join { queue; active } = if not (Pool.is_empty active) then TQueue.wait_until_n_are_waiting_and_queue_empty - (Pool.n_workers active + Pool.n_workers parking + 1(*cleaner*)) + (Pool.n_workers active + 1(*cleaner*)) queue - let cancel_all { queue; active; parking } = + let cancel_all { queue; active } = TQueue.clear queue; - Pool.cancel_all active; - Pool.cancel_all parking + Pool.cancel_all active let slave_ic = ref None let slave_oc = ref None @@ -348,22 +322,21 @@ module Make(T : Task) = struct flush_all (); exit 1 done - let clear { queue; active; parking } = + let clear { queue; active } = assert(Pool.is_empty active); (* We allow that only if no slaves *) TQueue.clear queue - let snapshot { queue; active; parking } = + let snapshot { queue; active } = List.map fst (TQueue.wait_until_n_are_waiting_then_snapshot - (Pool.n_workers active + Pool.n_workers parking) queue) + (Pool.n_workers active) queue) let with_n_workers n f = let q = create n in try let rc = f q in destroy q; rc with e -> let e = Errors.push e in destroy q; iraise e - let n_workers { active; parking } = - Pool.n_workers active, Pool.n_workers parking + let n_workers { active } = Pool.n_workers active end -- cgit v1.2.3