From 012e35b2d05b52dbf43dc1e26c8db550c859af03 Mon Sep 17 00:00:00 2001 From: Emilio Jesus Gallego Arias Date: Mon, 25 Sep 2017 12:40:33 +0200 Subject: [stm] [doc] Add some documentation to AsyncTaskQueue API As a bonus we remove some trailing whitespace, and implement a couple of hints suggested in the discussion. --- stm/asyncTaskQueue.ml | 51 +++++++------- stm/asyncTaskQueue.mli | 182 ++++++++++++++++++++++++++++++++++++++++++------- stm/stm.ml | 150 ++++++++++++++++++++-------------------- 3 files changed, 259 insertions(+), 124 deletions(-) (limited to 'stm') diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml index e2bce1a96..4662c5543 100644 --- a/stm/asyncTaskQueue.ml +++ b/stm/asyncTaskQueue.ml @@ -14,13 +14,15 @@ let stm_pr_err pp = Format.eprintf "%s] @[%a@]\n%!" (System.process_id ()) Pp.pp let stm_prerr_endline s = if !Flags.debug then begin stm_pr_err (str s) end else () -type 'a worker_status = [ `Fresh | `Old of 'a ] +type cancel_switch = bool ref module type Task = sig type task type competence + type worker_status = Fresh | Old of competence + (* Marshallable *) type request type response @@ -29,15 +31,14 @@ module type Task = sig val extra_env : unit -> string array (* run by the master, on a thread *) - val request_of_task : competence worker_status -> task -> request option - val task_match : competence worker_status -> task -> bool - val use_response : - competence worker_status -> task -> response -> - [ `Stay of competence * task list | `End ] + val request_of_task : worker_status -> task -> request option + val task_match : worker_status -> task -> bool + val use_response : worker_status -> task -> response -> + [ `Stay of competence * task list | `End ] val on_marshal_error : string -> task -> unit val on_task_cancellation_or_expiration_or_slave_death : task option -> unit val forward_feedback : Feedback.feedback -> unit - + (* run by the worker *) val perform : request -> response @@ -47,8 +48,6 @@ module type Task = sig end -type expiration = bool ref - module Make(T : Task) () = struct exception Die @@ -66,38 +65,38 @@ module Make(T : Task) () = struct Response res exception MarshalError of string - + let marshal_to_channel oc data = Marshal.to_channel oc data []; flush oc - + let marshal_err s = raise (MarshalError s) - + let marshal_request oc (req : request) = try marshal_to_channel oc req with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("marshal_request: "^s) - + let unmarshal_request ic = try (CThread.thread_friendly_input_value ic : request) with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("unmarshal_request: "^s) - + let marshal_response oc (res : response) = try marshal_to_channel oc res with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("marshal_response: "^s) - + let unmarshal_response ic = try (CThread.thread_friendly_input_value ic : response) with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("unmarshal_response: "^s) - + let marshal_more_data oc (res : more_data) = try marshal_to_channel oc res with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("marshal_more_data: "^s) - + let unmarshal_more_data ic = try (CThread.thread_friendly_input_value ic : more_data) with Failure s | Invalid_argument s | Sys_error s -> @@ -112,7 +111,7 @@ module Make(T : Task) () = struct module Model = struct type process = Worker.process - type extra = (T.task * expiration) TQueue.t + type extra = (T.task * cancel_switch) TQueue.t let spawn id = let name = Printf.sprintf "%s:%d" !T.name id in @@ -140,7 +139,7 @@ module Make(T : Task) () = struct let { WorkerPool.extra = queue; exit; cancelled } = cpanel in let exit () = report_status ~id "Dead"; exit () in let last_task = ref None in - let worker_age = ref `Fresh in + let worker_age = ref T.Fresh in let got_token = ref false in let giveback_exec_token () = if !got_token then (CoqworkmgrApi.giveback 1; got_token := false) in @@ -213,7 +212,7 @@ module Make(T : Task) () = struct | `Stay(competence, new_tasks) -> last_task := None; giveback_exec_token (); - worker_age := `Old competence; + worker_age := T.Old competence; add_tasks new_tasks in continue () @@ -236,7 +235,7 @@ module Make(T : Task) () = struct type queue = { active : Pool.pool; - queue : (T.task * expiration) TQueue.t; + queue : (T.task * cancel_switch) TQueue.t; cleaner : Thread.t option; } @@ -252,16 +251,16 @@ module Make(T : Task) () = struct queue; cleaner = if size > 0 then Some (Thread.create cleaner queue) else None; } - + let destroy { active; queue } = Pool.destroy active; TQueue.destroy queue let broadcast { queue } = TQueue.broadcast queue - let enqueue_task { queue; active } (t, _ as item) = + let enqueue_task { queue; active } t ~cancel_switch = stm_prerr_endline ("Enqueue task "^T.name_of_task t); - TQueue.push queue item + TQueue.push queue (t, cancel_switch) let cancel_worker { active } n = Pool.cancel n active @@ -339,14 +338,14 @@ module Make(T : Task) () = struct let clear { queue; active } = assert(Pool.is_empty active); (* We allow that only if no slaves *) TQueue.clear queue - + let snapshot { queue; active } = List.map fst (TQueue.wait_until_n_are_waiting_then_snapshot (Pool.n_workers active) queue) let with_n_workers n f = - let q = create n in + let q = create n in try let rc = f q in destroy q; rc with e -> let e = CErrors.push e in destroy q; iraise e diff --git a/stm/asyncTaskQueue.mli b/stm/asyncTaskQueue.mli index 1044e668b..ccd643deb 100644 --- a/stm/asyncTaskQueue.mli +++ b/stm/asyncTaskQueue.mli @@ -6,79 +6,211 @@ (* * GNU Lesser General Public License Version 2.1 *) (************************************************************************) -type 'a worker_status = [ `Fresh | `Old of 'a ] +(** This file provides an API for defining and managing a queue of + tasks to be done by external workers. + A queue of items of type [task] is maintained, then for each task, + a request is generated, then sent to a worker using marshalling. + + The workers will then eventually return a result, using marshalling + again: + ____ ____ ____ ________ + | T1 | T2 | T3 | => [request ] => | Worker | + |____|____|____| <= [response] <= |________| + | Master Proc. | + \--------------/ + + Thus [request] and [response] must be safely marshallable. + + Operations for managing the task queue are provide, see below + for more details. + + *) + +(** The [Task] module type defines an abstract message-processing + queue. *) module type Task = sig + (** Main description of a task. Elements are stored in the "master" + process, and then converted into a request. + *) type task + + (** [competence] stores the information about what kind of work a + worker has completed / has available. *) type competence - (* Marshallable *) + (** A worker_status is: + + - [`Fresh] when a worker is born. + + - [`Old of competence]: When a worker ends a job it can either die + (and be replaced by a fresh new worker) or hang there as an [`Old] + worker. In such case some data can be carried by the [`Old] + constructor, typically used to implement [request_of_task]. + + This allows to implement both one-shot workers and "persistent" + ones. E.g. par: is implement using workers that don't + "reboot". Proof workers do reboot mainly because the vm has some + C state that cannot be cleared, so you have a real memory leak if + you don't reboot the worker. *) + type worker_status = Fresh | Old of competence + + (** Type of input and output data for workers. + + The data must be marshallable as it send through the network + using [Marshal] . *) type request type response - val name : string ref (* UID of the task kind, for -toploop *) + (** UID of the task kind, for -toploop *) + val name : string ref + (** Extra arguments of the task kind, for -toploop *) val extra_env : unit -> string array - (* run by the master, on a thread *) - val request_of_task : competence worker_status -> task -> request option - val task_match : competence worker_status -> task -> bool - val use_response : - competence worker_status -> task -> response -> - [ `Stay of competence * task list | `End ] + (** {5} Master API, it is run by the master, on a thread *) + + (** [request_of_task status t] takes the [status] of the worker + and a task [t] and creates the corresponding [Some request] to be + sent to the worker or it is not valid anymore [None]. *) + val request_of_task : worker_status -> task -> request option + + (** [task_match status tid] Allows to discard tasks based on the + worker status. *) + val task_match : worker_status -> task -> bool + + (** [use_response status t out] + + For a response [out] to a task [t] with [status] we can choose + to end the worker of to keep it alive with some data and + immediately inject extra tasks in the queue. + + For example, the proof worker runs a proof and finds an error, + the response signals that, e.g. + + [ReponseError {state = 34; msg = "oops"}] + + When the manager uses such a response he can tell the worker to + stay there and inject into the queue an extra task requesting + state 33 (to implement efficient proof repair). *) + val use_response : worker_status -> task -> response -> + [ `Stay of competence * task list | `End ] + + (** [on_marshal_error err_msg tid] notifies of marshaling failure. *) val on_marshal_error : string -> task -> unit + + (** [on_task_cancellation_or_expiration_or_slave_death tid] + + These functions are meant to parametrize the worker manager on + the actions to be taken when things go wrong or are cancelled + (you can kill a worker in CoqIDE, or using kill -9...) + + E.g. master can decide to inhabit the (delegate) Future.t with a + closure (to be run in master), i.e. make the document still + checkable. This is what I do for marshaling errors. *) val on_task_cancellation_or_expiration_or_slave_death : task option -> unit + + (** [forward_feedback fb] sends fb to all the workers. *) val forward_feedback : Feedback.feedback -> unit - - (* run by the worker *) + + (** {5} Worker API, it is run by worker, on a different fresh + process *) + + (** [perform in] synchronously processes a request [in] *) val perform : request -> response - (* debugging *) + (** debugging *) val name_of_task : task -> string val name_of_request : request -> string end -type expiration = bool ref +(** [cancel_switch] to be flipped to true by anyone to signal the task + is not relevant anymore. When the STM performs an undo/edit-at, it + crawls the document and flips these flags (the Qed node carries a + pointer to the flag IIRC). +*) +type cancel_switch = bool ref +(** Client-side functor. [MakeQueue T] creates a task queue for task [T] *) module MakeQueue(T : Task) () : sig + (** [queue] is the abstract queue type. *) type queue - (* Number of workers, 0 = lazy local *) + (** [create n] will initialize the queue with [n] workers. If [n] is + 0, the queue won't spawn any process, working in a lazy local + manner. [not imposed by the this API] *) val create : int -> queue + + (** [destroy q] Deallocates [q], cancelling all pending tasks. *) val destroy : queue -> unit + (** [n_workers q] returns the number of workers of [q] *) val n_workers : queue -> int - val enqueue_task : queue -> T.task * expiration -> unit + (** [enqueue_task q t ~cancel_switch] schedules [t] for execution in + [q]. [cancel_switch] can be flipped to true to cancel the task. *) + val enqueue_task : queue -> T.task -> cancel_switch:cancel_switch -> unit - (* blocking function that waits for the task queue to be empty *) + (** [join q] blocks until the task queue is empty *) val join : queue -> unit + + (** [cancel_all q] Cancels all tasks *) val cancel_all : queue -> unit + (** [cancel_worker q wid] cancels a particular worker [wid] *) val cancel_worker : queue -> WorkerPool.worker_id -> unit + (** [set_order q cmp] reorders [q] using ordering [cmp] *) val set_order : queue -> (T.task -> T.task -> int) -> unit + (** [broadcast q] + + This is nasty. Workers can be picky, e.g. pick tasks only when + they are "on screen". Of course the screen is scrolled, and that + changes the potential choice of workers to pick up a task or + not. + + This function wakes up the workers (the managers) that give a + look (again) to the tasks in the queue. + + The STM calls it when the perspective (as in PIDE) changes. + + A problem here is that why task_match has access to the + competence data in order to decide if the task is palatable to + the worker or not... such data is local to the worker (manager). + The perspective is global, so it does not quite fit this + picture. This API to make all managers reconsider the tasks in + the queue is the best I could came up with. + + This API is crucial to Coqoon (or any other UI that invokes + Stm.finish eagerly but wants the workers to "focus" on the visible + part of the document). + *) val broadcast : queue -> unit - (* Take a snapshot (non destructive but waits until all workers are - * enqueued) *) + (** [snapshot q] Takes a snapshot (non destructive but waits until + all workers are enqueued) *) val snapshot : queue -> T.task list - (* Clears the queue, only if the worker prool is empty *) - val clear : queue -> unit - - (* create a queue, run the function, destroy the queue. - * the user should call join *) + (** [clear q] Clears [q], only if the worker prool is empty *) + val clear : queue -> unit + + (** [with_n_workers n f] create a queue, run the function, destroy + the queue. The user should call join *) val with_n_workers : int -> (queue -> 'a) -> 'a end +(** Server-side functor. [MakeWorker T] creates the server task + dispatcher. *) module MakeWorker(T : Task) () : sig - val main_loop : unit -> unit + (** [init_stdout ()] is called at [Coqtop.toploop_init] time. *) val init_stdout : unit -> unit - + + (** [main_loop ()] is called at [Coqtop.toploop_run] time. *) + val main_loop : unit -> unit + end diff --git a/stm/stm.ml b/stm/stm.ml index 864fff9e0..a9183ce78 100644 --- a/stm/stm.ml +++ b/stm/stm.ml @@ -48,7 +48,7 @@ let state_computed, state_computed_hook = Hook.make let state_ready, state_ready_hook = Hook.make ~default:(fun state_id -> ()) () -let forward_feedback, forward_feedback_hook = +let forward_feedback, forward_feedback_hook = let m = Mutex.create () in Hook.make ~default:(function | { doc_id = did; span_id = id; route; contents } -> @@ -108,7 +108,6 @@ module Vcs_ = Vcs.Make(Stateid.Self) type future_proof = Proof_global.closed_proof_output Future.computation type proof_mode = string type depth = int -type cancel_switch = bool ref type branch_type = [ `Master | `Proof of proof_mode * depth @@ -122,14 +121,14 @@ type cmd_t = { cids : Names.Id.t list; cblock : proof_block_name option; cqueue : [ `MainQueue - | `TacQueue of solving_tac * anon_abstracting_tac * cancel_switch - | `QueryQueue of cancel_switch + | `TacQueue of solving_tac * anon_abstracting_tac * AsyncTaskQueue.cancel_switch + | `QueryQueue of AsyncTaskQueue.cancel_switch | `SkipQueue ] } type fork_t = aast * Vcs_.Branch.t * Vernacexpr.opacity_guarantee * Names.Id.t list type qed_t = { qast : aast; keep : vernac_qed_type; - mutable fproof : (future_proof * cancel_switch) option; + mutable fproof : (future_proof * AsyncTaskQueue.cancel_switch) option; brname : Vcs_.Branch.t; brinfo : branch_type Vcs_.branch_info } @@ -318,7 +317,7 @@ module VCS : sig (* cuts from start -> stop, raising Expired if some nodes are not there *) val slice : block_start:id -> block_stop:id -> vcs val nodes_in_slice : block_start:id -> block_stop:id -> Stateid.t list - + val create_proof_task_box : id list -> qed:id -> block_start:id -> unit val create_proof_block : static_block_declaration -> string -> unit val box_of : id -> box list @@ -367,7 +366,7 @@ end = struct (* {{{ *) | Noop -> " " | Alias (id,_) -> sprintf "Alias(%s)" (Stateid.to_string id) | Qed { qast } -> Pp.string_of_ppcmds (pr_ast qast) in - let is_green id = + let is_green id = match get_info vcs id with | Some { state = Valid _ } -> true | _ -> false in @@ -435,7 +434,7 @@ end = struct (* {{{ *) let outerboxes boxes = List.filter (fun b -> not (List.exists (fun b1 -> - not (same_box b1 b) && contains b1 b) boxes) + not (same_box b1 b) && contains b1 b) boxes) ) boxes in let rec rec_print b = boxes := CList.remove same_box b !boxes; @@ -565,7 +564,7 @@ end = struct (* {{{ *) let id = new_node () in merge id ~ours:(Sideff action) ~into:b Branch.master) (List.filter (fun b -> not (Branch.equal b Branch.master)) (branches ())) - + let visit id = Vcs_aux.visit !vcs id let nodes_in_slice ~block_start ~block_stop = @@ -664,7 +663,7 @@ end = struct (* {{{ *) val command : now:bool -> (unit -> unit) -> unit end = struct - + let m = Mutex.create () let c = Condition.create () let job = ref None @@ -972,7 +971,7 @@ let get_script prf = find ((x.expr, (VCS.get_info id).n_goals)::acc) view.next | `Sideff (CherryPickEnv, id) -> find acc id | `Cmd {cast = x; ctac} when ctac -> (* skip non-tactics *) - find ((x.expr, (VCS.get_info id).n_goals)::acc) view.next + find ((x.expr, (VCS.get_info id).n_goals)::acc) view.next | `Cmd _ -> find acc view.next | `Alias (id,_) -> find acc id | `Fork _ -> find acc view.next @@ -1138,7 +1137,7 @@ end = struct (* {{{ *) let m = match e with VernacUndoTo m -> m | _ -> 0 in let id = VCS.get_branch_pos (VCS.current_branch ()) in let vcs = - match (VCS.get_info id).vcs_backup with + match (VCS.get_info id).vcs_backup with | None, _ -> anomaly Pp.(str"Backtrack: tip with no vcs_backup.") | Some vcs, _ -> vcs in let cb, _ = @@ -1191,7 +1190,7 @@ let record_pb_time ?loc proof_name time = Aux_file.record_in_aux_at proof_name proof_build_time; hints := Aux_file.set !hints proof_name proof_build_time end - + exception RemoteException of Pp.t let _ = CErrors.register_handler (function | RemoteException ppcmd -> ppcmd @@ -1248,7 +1247,7 @@ let is_block_name_enabled name = | `Only l -> List.mem name l let detect_proof_block id name = - let name = match name with None -> "indent" | Some x -> x in + let name = match name with None -> "indent" | Some x -> x in if is_block_name_enabled name && (Flags.async_proofs_is_master () || Flags.async_proofs_is_worker ()) then ( @@ -1271,7 +1270,7 @@ let detect_proof_block id name = (* Unused module warning doesn't understand [module rec] *) [@@@ocaml.warning "-60"] module rec ProofTask : sig - + type competence = Stateid.t list type task_build_proof = { t_exn_info : Stateid.t * Stateid.t; @@ -1294,8 +1293,8 @@ module rec ProofTask : sig include AsyncTaskQueue.Task with type task := task - and type competence := competence - and type request := request + and type competence := competence + and type request := request val build_proof_here : ?loc:Loc.t -> @@ -1304,7 +1303,7 @@ module rec ProofTask : sig Proof_global.closed_proof_output Future.computation (* If set, only tasks overlapping with this list are processed *) - val set_perspective : Stateid.t list -> unit + val set_perspective : Stateid.t list -> unit end = struct (* {{{ *) @@ -1326,10 +1325,12 @@ end = struct (* {{{ *) | BuildProof of task_build_proof | States of Stateid.t list + type worker_status = Fresh | Old of competence + type request = | ReqBuildProof of (Future.UUID.t,VCS.vcs) Stateid.request * bool * competence | ReqStates of Stateid.t list - + type error = { e_error_at : Stateid.t; e_safe_id : Stateid.t; @@ -1349,10 +1350,10 @@ end = struct (* {{{ *) let task_match age t = match age, t with - | `Fresh, BuildProof { t_states } -> + | Fresh, BuildProof { t_states } -> not !Flags.async_proofs_full || List.exists (fun x -> CList.mem_f Stateid.equal x !perspective) t_states - | `Old my_states, States l -> + | Old my_states, States l -> List.for_all (fun x -> CList.mem_f Stateid.equal x my_states) l | _ -> false @@ -1368,7 +1369,7 @@ end = struct (* {{{ *) | BuildProof { t_exn_info;t_start;t_stop;t_loc;t_uuid;t_name;t_states;t_drop } -> - assert(age = `Fresh); + assert(age = Fresh); try Some (ReqBuildProof ({ Stateid.exn_info = t_exn_info; stop = t_stop; @@ -1378,11 +1379,11 @@ end = struct (* {{{ *) name = t_name }, t_drop, t_states)) with VCS.Expired -> None - let use_response (s : competence AsyncTaskQueue.worker_status) t r = + let use_response (s : worker_status) t r = match s, t, r with - | `Old c, States _, RespStates l -> + | Old c, States _, RespStates l -> List.iter (fun (id,s) -> State.assign id s) l; `End - | `Fresh, BuildProof { t_assign; t_loc; t_name; t_states; t_drop }, + | Fresh, BuildProof { t_assign; t_loc; t_name; t_states; t_drop }, RespBuiltProof (pl, time) -> feedback (InProgress ~-1); t_assign (`Val pl); @@ -1390,7 +1391,7 @@ end = struct (* {{{ *) if !Flags.async_proofs_full || t_drop then `Stay(t_states,[States t_states]) else `End - | `Fresh, BuildProof { t_assign; t_loc; t_name; t_states }, + | Fresh, BuildProof { t_assign; t_loc; t_name; t_states }, RespError { e_error_at; e_safe_id = valid; e_msg; e_safe_states } -> feedback (InProgress ~-1); let info = Stateid.add ~valid Exninfo.null e_error_at in @@ -1477,7 +1478,7 @@ end = struct (* {{{ *) | VtProofStep _, _ -> true | _ -> false in - let initial = + let initial = let rec aux id = try match VCS.visit id with { next } -> aux next with VCS.Expired -> id in @@ -1490,7 +1491,7 @@ end = struct (* {{{ *) then Some (prev, State.get_cached prev, step) else None with VCS.Expired -> None in - let this = + let this = if State.is_cached_and_valid id then Some (State.get_cached id) else None in match prev, this with | _, None -> None @@ -1532,11 +1533,11 @@ and Slaves : sig val build_proof : ?loc:Loc.t -> drop_pt:bool -> exn_info:(Stateid.t * Stateid.t) -> block_start:Stateid.t -> block_stop:Stateid.t -> - name:string -> future_proof * cancel_switch + name:string -> future_proof * AsyncTaskQueue.cancel_switch (* blocking function that waits for the task queue to be empty *) val wait_all_done : unit -> unit - + (* initialize the whole machinery (optional) *) val init : unit -> unit @@ -1558,7 +1559,7 @@ and Slaves : sig end = struct (* {{{ *) module TaskQueue = AsyncTaskQueue.MakeQueue(ProofTask) () - + let queue = ref None let init () = if Flags.async_proofs_is_master () then @@ -1613,8 +1614,8 @@ end = struct (* {{{ *) | Some (_, cur) -> match VCS.visit cur with | { step = `Cmd { cast = { loc } } } - | { step = `Fork (( { loc }, _, _, _), _) } - | { step = `Qed ( { qast = { loc } }, _) } + | { step = `Fork (( { loc }, _, _, _), _) } + | { step = `Qed ( { qast = { loc } }, _) } | { step = `Sideff (ReplayCommand { loc }, _) } -> let start, stop = Option.cata Loc.unloc (0,0) loc in msg_error Pp.( @@ -1664,7 +1665,7 @@ end = struct (* {{{ *) u.(bucket) <- uc; p.(bucket) <- pr; u, Univ.ContextSet.union cst extra, false - + let check_task name l i = match check_task_aux "" name l i with | `OK _ | `OK_ADMITTED -> true @@ -1709,11 +1710,11 @@ end = struct (* {{{ *) t_exn_info; t_start = block_start; t_stop = block_stop; t_drop = drop_pt; t_assign = assign; t_loc = loc; t_uuid; t_name = pname; t_states = VCS.nodes_in_slice ~block_start ~block_stop }) in - TaskQueue.enqueue_task (Option.get !queue) (task,cancel_switch); + TaskQueue.enqueue_task (Option.get !queue) task ~cancel_switch; f, cancel_switch end else ProofTask.build_proof_here ?loc ~drop_pt t_exn_info block_stop, cancel_switch - else + else let f, t_assign = Future.create_delegate ~name:pname (State.exn_on id ~valid) in let t_uuid = Future.uuid f in feedback (InProgress 1); @@ -1721,7 +1722,7 @@ end = struct (* {{{ *) t_exn_info; t_start = block_start; t_stop = block_stop; t_assign; t_drop = drop_pt; t_loc = loc; t_uuid; t_name = pname; t_states = VCS.nodes_in_slice ~block_start ~block_stop }) in - TaskQueue.enqueue_task (Option.get !queue) (task,cancel_switch); + TaskQueue.enqueue_task (Option.get !queue) task ~cancel_switch; f, cancel_switch let wait_all_done () = TaskQueue.join (Option.get !queue) @@ -1735,7 +1736,7 @@ end = struct (* {{{ *) let reqs = CList.map_filter ProofTask.(fun x -> - match request_of_task `Fresh x with + match request_of_task Fresh x with | Some (ReqBuildProof (r, b, _)) -> Some(r, b) | _ -> None) tasks in @@ -1756,14 +1757,14 @@ and TacTask : sig t_ast : int * aast; t_goal : Goal.goal; t_kill : unit -> unit; - t_name : string } + t_name : string } include AsyncTaskQueue.Task with type task := task end = struct (* {{{ *) type output = (Constr.constr * UState.t) option - + let forward_feedback msg = Hooks.(call forward_feedback msg) type task = { @@ -1773,7 +1774,7 @@ end = struct (* {{{ *) t_ast : int * aast; t_goal : Goal.goal; t_kill : unit -> unit; - t_name : string } + t_name : string } type request = { r_state : Stateid.t; @@ -1791,6 +1792,8 @@ end = struct (* {{{ *) let name = ref "tacworker" let extra_env () = [||] type competence = unit + type worker_status = Fresh | Old of competence + let task_match _ _ = true (* run by the master, on a thread *) @@ -1799,13 +1802,13 @@ end = struct (* {{{ *) r_state = t_state; r_state_fb = t_state_fb; r_document = - if age <> `Fresh then None + if age <> Fresh then None else Some (VCS.slice ~block_start:t_state ~block_stop:t_state); r_ast = t_ast; r_goal = t_goal; r_name = t_name } with VCS.Expired -> None - + let use_response _ { t_assign; t_state; t_state_fb; t_kill } resp = match resp with | RespBuiltSubProof o -> t_assign (`Val (Some o)); `Stay ((),[]) @@ -1818,7 +1821,7 @@ end = struct (* {{{ *) t_assign (`Exn e); t_kill (); `Stay ((),[]) - + let on_marshal_error err { t_name } = stm_pr_err ("Fatal marshal error: " ^ t_name ); flush_all (); exit 1 @@ -1826,7 +1829,7 @@ end = struct (* {{{ *) let on_task_cancellation_or_expiration_or_slave_death = function | Some { t_kill } -> t_kill () | _ -> () - + let command_focus = Proof.new_focus_kind () let focus_cond = Proof.no_cond command_focus @@ -1871,21 +1874,20 @@ end = struct (* {{{ *) let name_of_task { t_name } = t_name let name_of_request { r_name } = r_name - + end (* }}} *) and Partac : sig val vernac_interp : - solve:bool -> abstract:bool -> cancel_switch -> - int -> Stateid.t -> Stateid.t -> aast -> - unit + solve:bool -> abstract:bool -> cancel_switch:AsyncTaskQueue.cancel_switch -> + int -> Stateid.t -> Stateid.t -> aast -> unit end = struct (* {{{ *) - + module TaskQueue = AsyncTaskQueue.MakeQueue(TacTask) () - let vernac_interp ~solve ~abstract cancel nworkers safe_id id + let vernac_interp ~solve ~abstract ~cancel_switch nworkers safe_id id { indentation; verbose; loc; expr = e; strlen } = let e, time, fail = @@ -1909,10 +1911,10 @@ end = struct (* {{{ *) let t_ast = (i, { indentation; verbose; loc; expr = e; strlen }) in let t_name = Goal.uid g in TaskQueue.enqueue_task queue - ({ t_state = safe_id; t_state_fb = id; + { t_state = safe_id; t_state_fb = id; t_assign = assign; t_ast; t_goal = g; t_name; - t_kill = (fun () -> if solve then TaskQueue.cancel_all queue) }, - cancel); + t_kill = (fun () -> if solve then TaskQueue.cancel_all queue) } + ~cancel_switch; g,f) 1 goals in TaskQueue.join queue; @@ -1943,7 +1945,7 @@ end = struct (* {{{ *) end) in Proof.run_tactic (Global.env()) assign_tac p)))) ()) - + end (* }}} *) and QueryTask : sig @@ -1952,10 +1954,10 @@ and QueryTask : sig include AsyncTaskQueue.Task with type task := task end = struct (* {{{ *) - + type task = { t_where : Stateid.t; t_for : Stateid.t ; t_what : aast } - + type request = { r_where : Stateid.t ; r_for : Stateid.t ; r_what : aast; r_doc : VCS.vcs } type response = unit @@ -1963,6 +1965,8 @@ end = struct (* {{{ *) let name = ref "queryworker" let extra_env _ = [||] type competence = unit + type worker_status = Fresh | Old of competence + let task_match _ _ = true let request_of_task _ { t_where; t_what; t_for } = @@ -1972,7 +1976,7 @@ end = struct (* {{{ *) r_doc = VCS.slice ~block_start:t_where ~block_stop:t_where; r_what = t_what } with VCS.Expired -> None - + let use_response _ _ _ = `End let on_marshal_error _ _ = @@ -1980,7 +1984,7 @@ end = struct (* {{{ *) flush_all (); exit 1 let on_task_cancellation_or_expiration_or_slave_death _ = () - + let forward_feedback msg = Hooks.(call forward_feedback msg) let perform { r_where; r_doc; r_what; r_for } = @@ -2000,16 +2004,16 @@ end = struct (* {{{ *) let e = CErrors.push e in let msg = iprint e in feedback ~id:r_for (Message (Error, None, msg)) - + let name_of_task { t_what } = string_of_ppcmds (pr_ast t_what) let name_of_request { r_what } = string_of_ppcmds (pr_ast r_what) end (* }}} *) -and Query : sig +and Query : sig val init : unit -> unit - val vernac_interp : cancel_switch -> Stateid.t -> Stateid.t -> aast -> unit + val vernac_interp : cancel_switch:AsyncTaskQueue.cancel_switch -> Stateid.t -> Stateid.t -> aast -> unit end = struct (* {{{ *) @@ -2017,10 +2021,10 @@ end = struct (* {{{ *) let queue = ref None - let vernac_interp switch prev id q = + let vernac_interp ~cancel_switch prev id q = assert(TaskQueue.n_workers (Option.get !queue) > 0); TaskQueue.enqueue_task (Option.get !queue) - QueryTask.({ t_where = prev; t_for = id; t_what = q }, switch) + QueryTask.({ t_where = prev; t_for = id; t_what = q }) ~cancel_switch let init () = queue := Some (TaskQueue.create (if !Flags.async_proofs_full then 1 else 0)) @@ -2049,7 +2053,7 @@ let delegate name = get_hint_bp_time name >= !Flags.async_proofs_delegation_threshold || VCS.is_vio_doc () || !Flags.async_proofs_full - + let warn_deprecated_nested_proofs = CWarnings.create ~name:"deprecated-nested-proofs" ~category:"deprecated" (fun () -> @@ -2175,7 +2179,7 @@ let log_processing_sync id name reason = log_string Printf.(sprintf let wall_clock_last_fork = ref 0.0 let known_state ?(redefine_qed=false) ~cache id = - + let error_absorbing_tactic id blockname exn = (* We keep the static/dynamic part of block detection separate, since the static part could be performed earlier. As of today there is @@ -2277,17 +2281,17 @@ let known_state ?(redefine_qed=false) ~cache id = ), cache, true | `Cmd { cast = x; cqueue = `SkipQueue } -> (fun () -> reach view.next), cache, true - | `Cmd { cast = x; cqueue = `TacQueue (solve,abstract,cancel); cblock } -> + | `Cmd { cast = x; cqueue = `TacQueue (solve,abstract,cancel_switch); cblock } -> (fun () -> resilient_tactic id cblock (fun () -> reach ~cache:`Shallow view.next; - Partac.vernac_interp ~solve ~abstract - cancel !Flags.async_proofs_n_tacworkers view.next id x) + Partac.vernac_interp ~solve ~abstract ~cancel_switch + !Flags.async_proofs_n_tacworkers view.next id x) ), cache, true - | `Cmd { cast = x; cqueue = `QueryQueue cancel } + | `Cmd { cast = x; cqueue = `QueryQueue cancel_switch } when Flags.async_proofs_is_master () -> (fun () -> reach view.next; - Query.vernac_interp cancel view.next id x + Query.vernac_interp ~cancel_switch view.next id x ), cache, false | `Cmd { cast = x; ceff = eff; ctac = true; cblock } -> (fun () -> resilient_tactic id cblock (fun () -> @@ -2376,7 +2380,7 @@ let known_state ?(redefine_qed=false) ~cache id = end; Proof_global.discard_all () ), (if redefine_qed then `No else `Yes), true - | `Sync (name, `Immediate) -> (fun () -> + | `Sync (name, `Immediate) -> (fun () -> reach eop; let st = Vernacstate.freeze_interp_state `No in ignore(stm_vernac_interp id st x); @@ -2831,7 +2835,7 @@ let process_transaction ?(newtip=Stateid.fresh ()) ?(part_of_script=true) let get_ast ~doc id = match VCS.visit id with | { step = `Cmd { cast = { loc; expr } } } - | { step = `Fork (({ loc; expr }, _, _, _), _) } + | { step = `Fork (({ loc; expr }, _, _, _), _) } | { step = `Qed ({ qast = { loc; expr } }, _) } -> Some (Loc.tag ?loc expr) | _ -> None -- cgit v1.2.3