From 012e35b2d05b52dbf43dc1e26c8db550c859af03 Mon Sep 17 00:00:00 2001 From: Emilio Jesus Gallego Arias Date: Mon, 25 Sep 2017 12:40:33 +0200 Subject: [stm] [doc] Add some documentation to AsyncTaskQueue API As a bonus we remove some trailing whitespace, and implement a couple of hints suggested in the discussion. --- stm/stm.ml | 150 +++++++++++++++++++++++++++++++------------------------------ 1 file changed, 77 insertions(+), 73 deletions(-) (limited to 'stm/stm.ml') diff --git a/stm/stm.ml b/stm/stm.ml index 864fff9e0..a9183ce78 100644 --- a/stm/stm.ml +++ b/stm/stm.ml @@ -48,7 +48,7 @@ let state_computed, state_computed_hook = Hook.make let state_ready, state_ready_hook = Hook.make ~default:(fun state_id -> ()) () -let forward_feedback, forward_feedback_hook = +let forward_feedback, forward_feedback_hook = let m = Mutex.create () in Hook.make ~default:(function | { doc_id = did; span_id = id; route; contents } -> @@ -108,7 +108,6 @@ module Vcs_ = Vcs.Make(Stateid.Self) type future_proof = Proof_global.closed_proof_output Future.computation type proof_mode = string type depth = int -type cancel_switch = bool ref type branch_type = [ `Master | `Proof of proof_mode * depth @@ -122,14 +121,14 @@ type cmd_t = { cids : Names.Id.t list; cblock : proof_block_name option; cqueue : [ `MainQueue - | `TacQueue of solving_tac * anon_abstracting_tac * cancel_switch - | `QueryQueue of cancel_switch + | `TacQueue of solving_tac * anon_abstracting_tac * AsyncTaskQueue.cancel_switch + | `QueryQueue of AsyncTaskQueue.cancel_switch | `SkipQueue ] } type fork_t = aast * Vcs_.Branch.t * Vernacexpr.opacity_guarantee * Names.Id.t list type qed_t = { qast : aast; keep : vernac_qed_type; - mutable fproof : (future_proof * cancel_switch) option; + mutable fproof : (future_proof * AsyncTaskQueue.cancel_switch) option; brname : Vcs_.Branch.t; brinfo : branch_type Vcs_.branch_info } @@ -318,7 +317,7 @@ module VCS : sig (* cuts from start -> stop, raising Expired if some nodes are not there *) val slice : block_start:id -> block_stop:id -> vcs val nodes_in_slice : block_start:id -> block_stop:id -> Stateid.t list - + val create_proof_task_box : id list -> qed:id -> block_start:id -> unit val create_proof_block : static_block_declaration -> string -> unit val box_of : id -> box list @@ -367,7 +366,7 @@ end = struct (* {{{ *) | Noop -> " " | Alias (id,_) -> sprintf "Alias(%s)" (Stateid.to_string id) | Qed { qast } -> Pp.string_of_ppcmds (pr_ast qast) in - let is_green id = + let is_green id = match get_info vcs id with | Some { state = Valid _ } -> true | _ -> false in @@ -435,7 +434,7 @@ end = struct (* {{{ *) let outerboxes boxes = List.filter (fun b -> not (List.exists (fun b1 -> - not (same_box b1 b) && contains b1 b) boxes) + not (same_box b1 b) && contains b1 b) boxes) ) boxes in let rec rec_print b = boxes := CList.remove same_box b !boxes; @@ -565,7 +564,7 @@ end = struct (* {{{ *) let id = new_node () in merge id ~ours:(Sideff action) ~into:b Branch.master) (List.filter (fun b -> not (Branch.equal b Branch.master)) (branches ())) - + let visit id = Vcs_aux.visit !vcs id let nodes_in_slice ~block_start ~block_stop = @@ -664,7 +663,7 @@ end = struct (* {{{ *) val command : now:bool -> (unit -> unit) -> unit end = struct - + let m = Mutex.create () let c = Condition.create () let job = ref None @@ -972,7 +971,7 @@ let get_script prf = find ((x.expr, (VCS.get_info id).n_goals)::acc) view.next | `Sideff (CherryPickEnv, id) -> find acc id | `Cmd {cast = x; ctac} when ctac -> (* skip non-tactics *) - find ((x.expr, (VCS.get_info id).n_goals)::acc) view.next + find ((x.expr, (VCS.get_info id).n_goals)::acc) view.next | `Cmd _ -> find acc view.next | `Alias (id,_) -> find acc id | `Fork _ -> find acc view.next @@ -1138,7 +1137,7 @@ end = struct (* {{{ *) let m = match e with VernacUndoTo m -> m | _ -> 0 in let id = VCS.get_branch_pos (VCS.current_branch ()) in let vcs = - match (VCS.get_info id).vcs_backup with + match (VCS.get_info id).vcs_backup with | None, _ -> anomaly Pp.(str"Backtrack: tip with no vcs_backup.") | Some vcs, _ -> vcs in let cb, _ = @@ -1191,7 +1190,7 @@ let record_pb_time ?loc proof_name time = Aux_file.record_in_aux_at proof_name proof_build_time; hints := Aux_file.set !hints proof_name proof_build_time end - + exception RemoteException of Pp.t let _ = CErrors.register_handler (function | RemoteException ppcmd -> ppcmd @@ -1248,7 +1247,7 @@ let is_block_name_enabled name = | `Only l -> List.mem name l let detect_proof_block id name = - let name = match name with None -> "indent" | Some x -> x in + let name = match name with None -> "indent" | Some x -> x in if is_block_name_enabled name && (Flags.async_proofs_is_master () || Flags.async_proofs_is_worker ()) then ( @@ -1271,7 +1270,7 @@ let detect_proof_block id name = (* Unused module warning doesn't understand [module rec] *) [@@@ocaml.warning "-60"] module rec ProofTask : sig - + type competence = Stateid.t list type task_build_proof = { t_exn_info : Stateid.t * Stateid.t; @@ -1294,8 +1293,8 @@ module rec ProofTask : sig include AsyncTaskQueue.Task with type task := task - and type competence := competence - and type request := request + and type competence := competence + and type request := request val build_proof_here : ?loc:Loc.t -> @@ -1304,7 +1303,7 @@ module rec ProofTask : sig Proof_global.closed_proof_output Future.computation (* If set, only tasks overlapping with this list are processed *) - val set_perspective : Stateid.t list -> unit + val set_perspective : Stateid.t list -> unit end = struct (* {{{ *) @@ -1326,10 +1325,12 @@ end = struct (* {{{ *) | BuildProof of task_build_proof | States of Stateid.t list + type worker_status = Fresh | Old of competence + type request = | ReqBuildProof of (Future.UUID.t,VCS.vcs) Stateid.request * bool * competence | ReqStates of Stateid.t list - + type error = { e_error_at : Stateid.t; e_safe_id : Stateid.t; @@ -1349,10 +1350,10 @@ end = struct (* {{{ *) let task_match age t = match age, t with - | `Fresh, BuildProof { t_states } -> + | Fresh, BuildProof { t_states } -> not !Flags.async_proofs_full || List.exists (fun x -> CList.mem_f Stateid.equal x !perspective) t_states - | `Old my_states, States l -> + | Old my_states, States l -> List.for_all (fun x -> CList.mem_f Stateid.equal x my_states) l | _ -> false @@ -1368,7 +1369,7 @@ end = struct (* {{{ *) | BuildProof { t_exn_info;t_start;t_stop;t_loc;t_uuid;t_name;t_states;t_drop } -> - assert(age = `Fresh); + assert(age = Fresh); try Some (ReqBuildProof ({ Stateid.exn_info = t_exn_info; stop = t_stop; @@ -1378,11 +1379,11 @@ end = struct (* {{{ *) name = t_name }, t_drop, t_states)) with VCS.Expired -> None - let use_response (s : competence AsyncTaskQueue.worker_status) t r = + let use_response (s : worker_status) t r = match s, t, r with - | `Old c, States _, RespStates l -> + | Old c, States _, RespStates l -> List.iter (fun (id,s) -> State.assign id s) l; `End - | `Fresh, BuildProof { t_assign; t_loc; t_name; t_states; t_drop }, + | Fresh, BuildProof { t_assign; t_loc; t_name; t_states; t_drop }, RespBuiltProof (pl, time) -> feedback (InProgress ~-1); t_assign (`Val pl); @@ -1390,7 +1391,7 @@ end = struct (* {{{ *) if !Flags.async_proofs_full || t_drop then `Stay(t_states,[States t_states]) else `End - | `Fresh, BuildProof { t_assign; t_loc; t_name; t_states }, + | Fresh, BuildProof { t_assign; t_loc; t_name; t_states }, RespError { e_error_at; e_safe_id = valid; e_msg; e_safe_states } -> feedback (InProgress ~-1); let info = Stateid.add ~valid Exninfo.null e_error_at in @@ -1477,7 +1478,7 @@ end = struct (* {{{ *) | VtProofStep _, _ -> true | _ -> false in - let initial = + let initial = let rec aux id = try match VCS.visit id with { next } -> aux next with VCS.Expired -> id in @@ -1490,7 +1491,7 @@ end = struct (* {{{ *) then Some (prev, State.get_cached prev, step) else None with VCS.Expired -> None in - let this = + let this = if State.is_cached_and_valid id then Some (State.get_cached id) else None in match prev, this with | _, None -> None @@ -1532,11 +1533,11 @@ and Slaves : sig val build_proof : ?loc:Loc.t -> drop_pt:bool -> exn_info:(Stateid.t * Stateid.t) -> block_start:Stateid.t -> block_stop:Stateid.t -> - name:string -> future_proof * cancel_switch + name:string -> future_proof * AsyncTaskQueue.cancel_switch (* blocking function that waits for the task queue to be empty *) val wait_all_done : unit -> unit - + (* initialize the whole machinery (optional) *) val init : unit -> unit @@ -1558,7 +1559,7 @@ and Slaves : sig end = struct (* {{{ *) module TaskQueue = AsyncTaskQueue.MakeQueue(ProofTask) () - + let queue = ref None let init () = if Flags.async_proofs_is_master () then @@ -1613,8 +1614,8 @@ end = struct (* {{{ *) | Some (_, cur) -> match VCS.visit cur with | { step = `Cmd { cast = { loc } } } - | { step = `Fork (( { loc }, _, _, _), _) } - | { step = `Qed ( { qast = { loc } }, _) } + | { step = `Fork (( { loc }, _, _, _), _) } + | { step = `Qed ( { qast = { loc } }, _) } | { step = `Sideff (ReplayCommand { loc }, _) } -> let start, stop = Option.cata Loc.unloc (0,0) loc in msg_error Pp.( @@ -1664,7 +1665,7 @@ end = struct (* {{{ *) u.(bucket) <- uc; p.(bucket) <- pr; u, Univ.ContextSet.union cst extra, false - + let check_task name l i = match check_task_aux "" name l i with | `OK _ | `OK_ADMITTED -> true @@ -1709,11 +1710,11 @@ end = struct (* {{{ *) t_exn_info; t_start = block_start; t_stop = block_stop; t_drop = drop_pt; t_assign = assign; t_loc = loc; t_uuid; t_name = pname; t_states = VCS.nodes_in_slice ~block_start ~block_stop }) in - TaskQueue.enqueue_task (Option.get !queue) (task,cancel_switch); + TaskQueue.enqueue_task (Option.get !queue) task ~cancel_switch; f, cancel_switch end else ProofTask.build_proof_here ?loc ~drop_pt t_exn_info block_stop, cancel_switch - else + else let f, t_assign = Future.create_delegate ~name:pname (State.exn_on id ~valid) in let t_uuid = Future.uuid f in feedback (InProgress 1); @@ -1721,7 +1722,7 @@ end = struct (* {{{ *) t_exn_info; t_start = block_start; t_stop = block_stop; t_assign; t_drop = drop_pt; t_loc = loc; t_uuid; t_name = pname; t_states = VCS.nodes_in_slice ~block_start ~block_stop }) in - TaskQueue.enqueue_task (Option.get !queue) (task,cancel_switch); + TaskQueue.enqueue_task (Option.get !queue) task ~cancel_switch; f, cancel_switch let wait_all_done () = TaskQueue.join (Option.get !queue) @@ -1735,7 +1736,7 @@ end = struct (* {{{ *) let reqs = CList.map_filter ProofTask.(fun x -> - match request_of_task `Fresh x with + match request_of_task Fresh x with | Some (ReqBuildProof (r, b, _)) -> Some(r, b) | _ -> None) tasks in @@ -1756,14 +1757,14 @@ and TacTask : sig t_ast : int * aast; t_goal : Goal.goal; t_kill : unit -> unit; - t_name : string } + t_name : string } include AsyncTaskQueue.Task with type task := task end = struct (* {{{ *) type output = (Constr.constr * UState.t) option - + let forward_feedback msg = Hooks.(call forward_feedback msg) type task = { @@ -1773,7 +1774,7 @@ end = struct (* {{{ *) t_ast : int * aast; t_goal : Goal.goal; t_kill : unit -> unit; - t_name : string } + t_name : string } type request = { r_state : Stateid.t; @@ -1791,6 +1792,8 @@ end = struct (* {{{ *) let name = ref "tacworker" let extra_env () = [||] type competence = unit + type worker_status = Fresh | Old of competence + let task_match _ _ = true (* run by the master, on a thread *) @@ -1799,13 +1802,13 @@ end = struct (* {{{ *) r_state = t_state; r_state_fb = t_state_fb; r_document = - if age <> `Fresh then None + if age <> Fresh then None else Some (VCS.slice ~block_start:t_state ~block_stop:t_state); r_ast = t_ast; r_goal = t_goal; r_name = t_name } with VCS.Expired -> None - + let use_response _ { t_assign; t_state; t_state_fb; t_kill } resp = match resp with | RespBuiltSubProof o -> t_assign (`Val (Some o)); `Stay ((),[]) @@ -1818,7 +1821,7 @@ end = struct (* {{{ *) t_assign (`Exn e); t_kill (); `Stay ((),[]) - + let on_marshal_error err { t_name } = stm_pr_err ("Fatal marshal error: " ^ t_name ); flush_all (); exit 1 @@ -1826,7 +1829,7 @@ end = struct (* {{{ *) let on_task_cancellation_or_expiration_or_slave_death = function | Some { t_kill } -> t_kill () | _ -> () - + let command_focus = Proof.new_focus_kind () let focus_cond = Proof.no_cond command_focus @@ -1871,21 +1874,20 @@ end = struct (* {{{ *) let name_of_task { t_name } = t_name let name_of_request { r_name } = r_name - + end (* }}} *) and Partac : sig val vernac_interp : - solve:bool -> abstract:bool -> cancel_switch -> - int -> Stateid.t -> Stateid.t -> aast -> - unit + solve:bool -> abstract:bool -> cancel_switch:AsyncTaskQueue.cancel_switch -> + int -> Stateid.t -> Stateid.t -> aast -> unit end = struct (* {{{ *) - + module TaskQueue = AsyncTaskQueue.MakeQueue(TacTask) () - let vernac_interp ~solve ~abstract cancel nworkers safe_id id + let vernac_interp ~solve ~abstract ~cancel_switch nworkers safe_id id { indentation; verbose; loc; expr = e; strlen } = let e, time, fail = @@ -1909,10 +1911,10 @@ end = struct (* {{{ *) let t_ast = (i, { indentation; verbose; loc; expr = e; strlen }) in let t_name = Goal.uid g in TaskQueue.enqueue_task queue - ({ t_state = safe_id; t_state_fb = id; + { t_state = safe_id; t_state_fb = id; t_assign = assign; t_ast; t_goal = g; t_name; - t_kill = (fun () -> if solve then TaskQueue.cancel_all queue) }, - cancel); + t_kill = (fun () -> if solve then TaskQueue.cancel_all queue) } + ~cancel_switch; g,f) 1 goals in TaskQueue.join queue; @@ -1943,7 +1945,7 @@ end = struct (* {{{ *) end) in Proof.run_tactic (Global.env()) assign_tac p)))) ()) - + end (* }}} *) and QueryTask : sig @@ -1952,10 +1954,10 @@ and QueryTask : sig include AsyncTaskQueue.Task with type task := task end = struct (* {{{ *) - + type task = { t_where : Stateid.t; t_for : Stateid.t ; t_what : aast } - + type request = { r_where : Stateid.t ; r_for : Stateid.t ; r_what : aast; r_doc : VCS.vcs } type response = unit @@ -1963,6 +1965,8 @@ end = struct (* {{{ *) let name = ref "queryworker" let extra_env _ = [||] type competence = unit + type worker_status = Fresh | Old of competence + let task_match _ _ = true let request_of_task _ { t_where; t_what; t_for } = @@ -1972,7 +1976,7 @@ end = struct (* {{{ *) r_doc = VCS.slice ~block_start:t_where ~block_stop:t_where; r_what = t_what } with VCS.Expired -> None - + let use_response _ _ _ = `End let on_marshal_error _ _ = @@ -1980,7 +1984,7 @@ end = struct (* {{{ *) flush_all (); exit 1 let on_task_cancellation_or_expiration_or_slave_death _ = () - + let forward_feedback msg = Hooks.(call forward_feedback msg) let perform { r_where; r_doc; r_what; r_for } = @@ -2000,16 +2004,16 @@ end = struct (* {{{ *) let e = CErrors.push e in let msg = iprint e in feedback ~id:r_for (Message (Error, None, msg)) - + let name_of_task { t_what } = string_of_ppcmds (pr_ast t_what) let name_of_request { r_what } = string_of_ppcmds (pr_ast r_what) end (* }}} *) -and Query : sig +and Query : sig val init : unit -> unit - val vernac_interp : cancel_switch -> Stateid.t -> Stateid.t -> aast -> unit + val vernac_interp : cancel_switch:AsyncTaskQueue.cancel_switch -> Stateid.t -> Stateid.t -> aast -> unit end = struct (* {{{ *) @@ -2017,10 +2021,10 @@ end = struct (* {{{ *) let queue = ref None - let vernac_interp switch prev id q = + let vernac_interp ~cancel_switch prev id q = assert(TaskQueue.n_workers (Option.get !queue) > 0); TaskQueue.enqueue_task (Option.get !queue) - QueryTask.({ t_where = prev; t_for = id; t_what = q }, switch) + QueryTask.({ t_where = prev; t_for = id; t_what = q }) ~cancel_switch let init () = queue := Some (TaskQueue.create (if !Flags.async_proofs_full then 1 else 0)) @@ -2049,7 +2053,7 @@ let delegate name = get_hint_bp_time name >= !Flags.async_proofs_delegation_threshold || VCS.is_vio_doc () || !Flags.async_proofs_full - + let warn_deprecated_nested_proofs = CWarnings.create ~name:"deprecated-nested-proofs" ~category:"deprecated" (fun () -> @@ -2175,7 +2179,7 @@ let log_processing_sync id name reason = log_string Printf.(sprintf let wall_clock_last_fork = ref 0.0 let known_state ?(redefine_qed=false) ~cache id = - + let error_absorbing_tactic id blockname exn = (* We keep the static/dynamic part of block detection separate, since the static part could be performed earlier. As of today there is @@ -2277,17 +2281,17 @@ let known_state ?(redefine_qed=false) ~cache id = ), cache, true | `Cmd { cast = x; cqueue = `SkipQueue } -> (fun () -> reach view.next), cache, true - | `Cmd { cast = x; cqueue = `TacQueue (solve,abstract,cancel); cblock } -> + | `Cmd { cast = x; cqueue = `TacQueue (solve,abstract,cancel_switch); cblock } -> (fun () -> resilient_tactic id cblock (fun () -> reach ~cache:`Shallow view.next; - Partac.vernac_interp ~solve ~abstract - cancel !Flags.async_proofs_n_tacworkers view.next id x) + Partac.vernac_interp ~solve ~abstract ~cancel_switch + !Flags.async_proofs_n_tacworkers view.next id x) ), cache, true - | `Cmd { cast = x; cqueue = `QueryQueue cancel } + | `Cmd { cast = x; cqueue = `QueryQueue cancel_switch } when Flags.async_proofs_is_master () -> (fun () -> reach view.next; - Query.vernac_interp cancel view.next id x + Query.vernac_interp ~cancel_switch view.next id x ), cache, false | `Cmd { cast = x; ceff = eff; ctac = true; cblock } -> (fun () -> resilient_tactic id cblock (fun () -> @@ -2376,7 +2380,7 @@ let known_state ?(redefine_qed=false) ~cache id = end; Proof_global.discard_all () ), (if redefine_qed then `No else `Yes), true - | `Sync (name, `Immediate) -> (fun () -> + | `Sync (name, `Immediate) -> (fun () -> reach eop; let st = Vernacstate.freeze_interp_state `No in ignore(stm_vernac_interp id st x); @@ -2831,7 +2835,7 @@ let process_transaction ?(newtip=Stateid.fresh ()) ?(part_of_script=true) let get_ast ~doc id = match VCS.visit id with | { step = `Cmd { cast = { loc; expr } } } - | { step = `Fork (({ loc; expr }, _, _, _), _) } + | { step = `Fork (({ loc; expr }, _, _, _), _) } | { step = `Qed ({ qast = { loc; expr } }, _) } -> Some (Loc.tag ?loc expr) | _ -> None -- cgit v1.2.3