diff options
Diffstat (limited to 'stm/asyncTaskQueue.ml')
-rw-r--r-- | stm/asyncTaskQueue.ml | 207 |
1 files changed, 129 insertions, 78 deletions
diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml index 8b4e0844c..d870ff92b 100644 --- a/stm/asyncTaskQueue.ml +++ b/stm/asyncTaskQueue.ml @@ -14,28 +14,35 @@ let pr_err s = Printf.eprintf "%s] %s\n" (System.process_id ()) s; flush stderr let prerr_endline s = if !Flags.debug then begin pr_err s end else () +type 'a worker_status = [ `Fresh | `Old | `Parked of 'a ] + module type Task = sig type task + type competence (* Marshallable *) type request type response - val name : string (* UID of the task kind, for -toploop *) + val name : string ref (* UID of the task kind, for -toploop *) val extra_env : unit -> string array (* run by the master, on a thread *) - val request_of_task : [ `Fresh | `Old ] -> task -> request option - val use_response : task -> response -> [ `Stay | `StayReset ] + val request_of_task : competence worker_status -> task -> request option + val task_match : competence worker_status -> task -> bool + val use_response : + competence worker_status -> task -> response -> + [ `Stay | `Park of competence | `Reset ] val on_marshal_error : string -> task -> unit val on_slave_death : task option -> [ `Exit of int | `Stay ] val on_task_cancellation_or_expiration : task option -> unit - val forward_feedback : Stateid.t -> Feedback.feedback_content -> unit + val forward_feedback : + Stateid.t -> Feedback.route_id -> Feedback.feedback_content -> unit (* run by the worker *) val perform : request -> response - + (* debugging *) val name_of_task : task -> string val name_of_request : request -> string @@ -46,34 +53,14 @@ type cancel_switch = bool ref module Make(T : Task) = struct - module Worker = Spawn.Sync(struct - let add_timeout ~sec f = - ignore(Thread.create (fun () -> - while true do - Unix.sleep sec; - if not (f ()) then Thread.exit () - done) - ()) - end) - - module WorkersPool = WorkerPool.Make(Worker) - - let queue : (T.task * cancel_switch) TQueue.t = TQueue.create () - - let enqueue_task t c = - prerr_endline ("Enqueue task "^T.name_of_task t); - TQueue.push queue (t,c) - - let cancel_worker = WorkersPool.cancel - - type request = Request of T.request - - let name_of_request (Request r) = T.name_of_request r - + exception KillRespawn + exception Die + exception Expired type response = | Response of T.response | RespFeedback of Feedback.feedback | RespGetCounterNewUnivLevel + type request = Request of T.request type more_data = | MoreDataUnivLevel of Univ.universe_level list @@ -83,23 +70,23 @@ module Make(T : Task) = struct let slave_respond msg = match msg with Request r -> Response (T.perform r) exception MarshalError of string - + let marshal_to_channel oc data = Marshal.to_channel oc data []; flush oc - + let marshal_err s = raise (MarshalError s) - + let marshal_request oc (req : request) = try marshal_to_channel oc req with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("marshal_request: "^s) - + let unmarshal_request ic = try (Marshal.from_channel ic : request) with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("unmarshal_request: "^s) - + let marshal_response oc (res : response) = try marshal_to_channel oc res with Failure s | Invalid_argument s | Sys_error s -> @@ -109,7 +96,7 @@ module Make(T : Task) = struct try (CThread.thread_friendly_input_value ic : response) with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("unmarshal_response: "^s) - + let marshal_more_data oc (res : more_data) = try marshal_to_channel oc res with Failure s | Invalid_argument s | Sys_error s -> @@ -120,28 +107,32 @@ module Make(T : Task) = struct with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("unmarshal_more_data: "^s) - let set_order cmp = TQueue.set_order queue (fun (t1,_) (t2,_) -> cmp t1 t2) - - let join () = - if not (WorkersPool.is_empty ()) then - TQueue.wait_until_n_are_waiting_and_queue_empty - (WorkersPool.n_workers ()) queue - let cancel_all () = - TQueue.clear queue; - WorkersPool.cancel_all () - - exception KillRespawn - exception Die - exception Expired - let report_status ?(id = !Flags.async_proofs_worker_id) s = Pp.feedback ~state_id:Stateid.initial (Feedback.WorkerStatus(id, s)) - let rec manage_slave ~cancel:cancel_user_req ~die id respawn = + + module Worker = Spawn.Sync(struct + let add_timeout ~sec f = + ignore(Thread.create (fun () -> + while true do + Unix.sleep sec; + if not (f ()) then Thread.exit () + done) + ()) + end) + + module Manager = struct + type process = Worker.process + type extra = (T.task * cancel_switch) TQueue.t * (string -> unit) + + let naming n = Printf.sprintf "%s:%d" !T.name n + + let rec manager extra ~cancel:cancel_user_req ~die id respawn = + let queue, park = extra in let ic, oc, proc = let rec set_slave_opt = function | [] -> !Flags.async_proofs_flags_for_workers @ - ["-toploop"; T.name^"top"; + ["-toploop"; !T.name^"top"; "-worker-id"; id; "-async-proofs-worker-priority"; Flags.string_of_priority !Flags.async_proofs_worker_priority] @@ -162,13 +153,15 @@ module Make(T : Task) = struct let got_token = ref false in let giveback_token () = if !got_token then (CoqworkmgrApi.giveback 1; got_token := false) in + let pick age (t, c) = not !c && T.task_match age t in CThread.prepare_in_channel_for_thread_friendly_io ic; try while not !die do prerr_endline "waiting for a task"; report_status ~id "Idle"; - let task, cancel_switch = TQueue.pop queue in + let task, cancel_switch = TQueue.pop ~picky:(pick !worker_age) queue in prerr_endline ("got task: "^T.name_of_task task); + prerr_endline ("I'm : "^ match !worker_age with `Parked _ -> "parked" | _ -> "fresh/old"); last_task := Some task; begin try let req = T.request_of_task !worker_age task in @@ -185,17 +178,24 @@ module Make(T : Task) = struct let response = unmarshal_response ic in match response with | Response resp -> - (match T.use_response task resp with + (match T.use_response !worker_age task resp with | `Stay -> - last_task := None; worker_age := `Old; giveback_token () - | `StayReset -> last_task := None; raise KillRespawn) + last_task := None; + if !worker_age = `Fresh then worker_age := `Old; + giveback_token () + | `Park competence -> + prerr_endline "parking"; + last_task := None; worker_age := `Parked competence; + park id; giveback_token () + | `Reset -> last_task := None; raise KillRespawn) | RespGetCounterNewUnivLevel -> marshal_more_data oc (MoreDataUnivLevel (CList.init 10 (fun _ -> Universes.new_univ_level (Global.current_dirpath ())))); loop () - | RespFeedback ({ Feedback.id = Feedback.State state_id } as fbk) -> - T.forward_feedback state_id fbk.Feedback.content; + | RespFeedback ({ Feedback.id = Feedback.State state_id; + Feedback.route = rid } as fbk) -> + T.forward_feedback state_id rid fbk.Feedback.content; loop () | RespFeedback _ -> Errors.anomaly (str"Parsing in master process") in @@ -216,28 +216,28 @@ module Make(T : Task) = struct | KillRespawn -> giveback_token (); Worker.kill proc; ignore(Worker.wait proc); - manage_slave ~cancel:cancel_user_req ~die id respawn + manager extra ~cancel:cancel_user_req ~die id respawn | (Die | TQueue.BeingDestroyed) -> giveback_token (); - Worker.kill proc;ignore(Worker.wait proc) + Worker.kill proc;ignore(Worker.wait proc); Thread.exit () | Sys_error _ | Invalid_argument _ | End_of_file when !task_expired -> giveback_token (); T.on_task_cancellation_or_expiration !last_task; ignore(Worker.wait proc); - manage_slave ~cancel:cancel_user_req ~die id respawn + manager extra ~cancel:cancel_user_req ~die id respawn | Sys_error _ | Invalid_argument _ | End_of_file when !task_cancelled -> giveback_token (); msg_warning(strbrk "The worker was cancelled."); T.on_task_cancellation_or_expiration !last_task; Worker.kill proc; ignore(Worker.wait proc); - manage_slave ~cancel:cancel_user_req ~die id respawn + manager extra ~cancel:cancel_user_req ~die id respawn | Sys_error _ | Invalid_argument _ | End_of_file -> giveback_token (); match T.on_slave_death !last_task with | `Stay -> msg_warning(strbrk "The worker process died badly."); Worker.kill proc; ignore(Worker.wait proc); - manage_slave ~cancel:cancel_user_req ~die id respawn + manager extra ~cancel:cancel_user_req ~die id respawn | `Exit exit_code -> Worker.kill proc; let exit_status proc = match Worker.wait proc with @@ -247,6 +247,63 @@ module Make(T : Task) = struct | Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno in pr_err ("Fatal worker error: " ^ (exit_status proc)); flush_all (); exit exit_code + end + + module Pool = WorkerPool.Make(Worker)(Manager) + + type queue = { + active : WorkerPool.active Pool.pool; + parking : WorkerPool.parking Pool.pool; + queue : (T.task * cancel_switch) TQueue.t; + cleaner : Thread.t; + } + + let create n = + let queue = TQueue.create () in + let cleaner queue = + while true do + try ignore(TQueue.pop ~picky:(fun (_,cancelled) -> !cancelled) queue) + with TQueue.BeingDestroyed -> Thread.exit () + done in + let atq = ref None in + let park id = + let atq = Option.get !atq in Pool.move atq.active atq.parking id in + atq := Some { + active = Pool.create_active (queue,park) n; + parking = Pool.create_parking (); + queue; + cleaner = Thread.create cleaner queue; + }; + Option.get !atq + + let destroy { active; parking; queue } = + Pool.destroy active; + Pool.destroy parking; + TQueue.destroy queue + + let enqueue_task { queue } t c = + prerr_endline ("Enqueue task "^T.name_of_task t); + TQueue.push queue (t,c) + + let cancel_worker { active; parking } n = + Pool.cancel active n; + Pool.cancel parking n + + let name_of_request (Request r) = T.name_of_request r + + let set_order { queue } cmp = + TQueue.set_order queue (fun (t1,_) (t2,_) -> cmp t1 t2) + + let join { queue; active; parking } = + if not (Pool.is_empty active) then + TQueue.wait_until_n_are_waiting_and_queue_empty + (Pool.n_workers active + Pool.n_workers parking + 1(*cleaner*)) + queue + + let cancel_all { queue; active; parking } = + TQueue.clear queue; + Pool.cancel_all active; + Pool.cancel_all parking let slave_ic = ref stdin let slave_oc = ref stdout @@ -262,7 +319,7 @@ module Make(T : Task) = struct | [] -> let data = f () in l := List.tl data; List.hd data | x::tl -> l := tl; x - let slave_handshake () = WorkersPool.worker_handshake !slave_ic !slave_oc + let slave_handshake () = Pool.worker_handshake !slave_ic !slave_oc let main_loop () = let slave_feeder oc fb = @@ -295,28 +352,22 @@ module Make(T : Task) = struct flush_all (); exit 1 done - let clear () = - assert(WorkersPool.is_empty ()); (* We allow that only if no slaves *) + let clear { queue; active; parking } = + assert(Pool.is_empty active); (* We allow that only if no slaves *) TQueue.clear queue - let snapshot () = + let snapshot { queue; active; parking } = List.map fst (TQueue.wait_until_n_are_waiting_then_snapshot - (WorkersPool.n_workers ()) queue) - - let init n = - WorkersPool.init n manage_slave - (fun n -> Printf.sprintf "%s:%d" T.name n) - - let destroy () = - WorkersPool.destroy (); - TQueue.destroy queue + (Pool.n_workers active + Pool.n_workers parking) queue) let with_n_workers n f = - try init n; let rc = f ~join ~cancel_all in destroy (); rc - with e -> let e = Errors.push e in destroy (); raise e + let q = create n in + try let rc = f q in destroy q; rc + with e -> let e = Errors.push e in destroy q; raise e - let n_workers = WorkersPool.n_workers + let n_workers { active; parking } = + Pool.n_workers active, Pool.n_workers parking end |