diff options
author | Enrico Tassi <Enrico.Tassi@inria.fr> | 2014-11-28 18:41:43 +0100 |
---|---|---|
committer | Enrico Tassi <Enrico.Tassi@inria.fr> | 2014-11-28 18:41:43 +0100 |
commit | 5d3f5c210aad8d73b007936e65694c401e66c7d0 (patch) | |
tree | 2ec57168c42d7557ba27136722415e5288c7abd9 /stm | |
parent | 9f4546a103607e0f2283897c094ce05ffa2d5c21 (diff) |
STM: if a-p-always-delegate fetch states from parked worker on edit-at
If the async-proofs-always-delegate is passed, workers are killed
only when the proof they computed is obsolete. If one jumps back
to a state that was computed by the worker (and not the master) instead
of (re)computing such state in the master ask the worker to send it
back.
Diffstat (limited to 'stm')
-rw-r--r-- | stm/stm.ml | 141 | ||||
-rw-r--r-- | stm/vcs.ml | 10 | ||||
-rw-r--r-- | stm/vcs.mli | 8 |
3 files changed, 107 insertions, 52 deletions
diff --git a/stm/stm.ml b/stm/stm.ml index 1c51c56a0..ce6ddd3c5 100644 --- a/stm/stm.ml +++ b/stm/stm.ml @@ -268,8 +268,8 @@ module VCS : sig val slice : start:id -> stop:id -> vcs val nodes_in_slice : start:id -> stop:id -> Stateid.Set.t - val create_cluster : id list -> tip:id -> unit - val cluster_of : id -> id option + val create_cluster : id list -> qed:id -> start:id -> unit + val cluster_of : id -> (id * id) option val delete_cluster_of : id -> unit val proof_nesting : unit -> int @@ -490,7 +490,7 @@ end = struct (* {{{ *) List.fold_right (fun (id,_) acc -> Stateid.Set.add id acc) (nodes_in_slice ~start ~stop) Stateid.Set.empty - let create_cluster l ~tip = vcs := create_cluster !vcs l tip + let create_cluster l ~qed ~start = vcs := create_cluster !vcs l (qed,start) let cluster_of id = Option.map Dag.Cluster.data (cluster_of !vcs id) let delete_cluster_of id = Option.iter (fun x -> vcs := delete_cluster !vcs x) (Vcs_.cluster_of !vcs id) @@ -863,11 +863,19 @@ module rec ProofTask : sig t_report_at: Stateid.t; t_route : Feedback.route_id; t_text : string } + type task_safesate = { + t_states : Stateid.t list; + t_assignstates : + (Stateid.t * State.frozen_state) list Future.assignement -> unit } - type task = BuildProof of task_build_proof | Querys of task_query list + type task = + | BuildProof of task_build_proof + | Querys of task_query list + | States of task_safesate type request = | ReqBuildProof of (Future.UUID.t,VCS.vcs) Stateid.request | ReqQuerys of task_query list + | ReqStates of Stateid.t list include AsyncTaskQueue.Task with type task := task @@ -899,12 +907,20 @@ end = struct (* {{{ *) t_report_at: Stateid.t; t_route : Feedback.route_id; t_text : string } + type task_safesate = { + t_states : Stateid.t list; + t_assignstates : + (Stateid.t * State.frozen_state) list Future.assignement -> unit } - type task = BuildProof of task_build_proof | Querys of task_query list + type task = + | BuildProof of task_build_proof + | Querys of task_query list + | States of task_safesate type request = | ReqBuildProof of (Future.UUID.t,VCS.vcs) Stateid.request | ReqQuerys of task_query list + | ReqStates of Stateid.t list type error = { e_error_at : Stateid.t; @@ -915,6 +931,7 @@ end = struct (* {{{ *) type response = | RespBuiltProof of Proof_global.closed_proof_output * float | RespError of error + | RespStates of (Stateid.t * State.frozen_state) list | RespDone let name = ref "proofworker" @@ -925,17 +942,25 @@ end = struct (* {{{ *) | `Fresh, BuildProof _ -> true | `Parked my_states, Querys qs -> List.for_all (fun { t_at } -> Stateid.Set.mem t_at my_states) qs + | `Parked my_states, States { t_states } -> + List.for_all (fun x -> Stateid.Set.mem x my_states) t_states | _ -> false let name_of_task = function - | BuildProof t -> t.t_name - | Querys l -> Printf.sprintf "querys(%d)" (List.length l) + | BuildProof t -> "proof: " ^ t.t_name + | Querys l -> + "querys: " ^ String.concat " " (List.map (fun { t_text } -> t_text ) l) + | States { t_states } -> + "states: " ^ String.concat "," (List.map Stateid.to_string t_states) let name_of_request = function - | ReqBuildProof r -> r.Stateid.name - | ReqQuerys l -> Printf.sprintf "querys(%d)" (List.length l) + | ReqBuildProof r -> "proof: " ^ r.Stateid.name + | ReqQuerys l -> + "querys: " ^ String.concat " " (List.map (fun { t_text } -> t_text ) l) + | ReqStates l -> "states: "^String.concat "," (List.map Stateid.to_string l) let request_of_task age = function - | Querys q -> Some (ReqQuerys q) + | Querys l -> Some (ReqQuerys l) + | States { t_states } -> Some (ReqStates t_states) | BuildProof { t_exn_info;t_start;t_stop;t_loc;t_uuid;t_name } -> assert(age = `Fresh); try Some (ReqBuildProof { @@ -948,29 +973,27 @@ end = struct (* {{{ *) with VCS.Expired -> None let use_response (s : competence AsyncTaskQueue.worker_status) t r = - match s, t with - | `Parked _, Querys _ -> `Stay - | `Fresh, Querys _ -> assert false - | `Old, Querys _ -> assert false - | `Old, BuildProof _ -> assert false - | `Parked _, BuildProof _ -> assert false - | `Fresh, BuildProof { t_assign; t_loc; t_name; t_states } -> - match r with - | RespBuiltProof (pl, time) -> + match s, t, r with + | `Parked _, Querys _, _ -> `Stay + | `Parked _, States { t_assignstates }, RespStates l -> + t_assignstates (`Val l); `Stay + | `Fresh, BuildProof { t_assign; t_loc; t_name; t_states }, + RespBuiltProof (pl, time) -> Pp.feedback (Feedback.InProgress ~-1); t_assign (`Val pl); record_pb_time t_name t_loc time; if !Flags.async_proofs_always_delegate then `Park t_states else `Reset - | RespError { e_error_at; e_safe_id = valid; e_msg; e_safe_states } -> + | `Fresh, BuildProof { t_assign; t_loc; t_name; t_states }, + RespError { e_error_at; e_safe_id = valid; e_msg; e_safe_states } -> Pp.feedback (Feedback.InProgress ~-1); let e = Stateid.add ~valid (RemoteException e_msg) e_error_at in t_assign (`Exn e); List.iter (fun (id,s) -> State.assign id s) e_safe_states; if !Flags.async_proofs_always_delegate then `Park t_states else `Reset - | RespDone -> assert false + | _ -> assert false let on_task_cancellation_or_expiration = function - | None | Some (Querys _) -> () + | None | Some (Querys _) | Some (States _) -> () | Some (BuildProof { t_start = start; t_assign }) -> let s = "Worker cancelled by the user" in let e = Stateid.add ~valid:start (RemoteException (strbrk s)) start in @@ -1027,26 +1050,35 @@ end = struct (* {{{ *) (if is_cached e_safe_id then [e_safe_id,get_cached e_safe_id] else []) @ aux 1 (prog 1 1) e_safe_id in RespError { e_error_at; e_safe_id; e_msg = print e; e_safe_states } + let perform_query q = - try Future.purify (fun { t_at; t_report_at; t_text; t_route } -> + try Future.purify (fun { t_at; t_report_at; t_text; t_route = route } -> Reach.known_state ~cache:`No t_at; - let loc, ast = vernac_parse ~newtip:t_report_at ~route:t_route 0 t_text in - try vernac_interp t_report_at ~route:t_route { expr = ast; loc; verbose = true } + let loc, ast = vernac_parse ~newtip:t_report_at ~route 0 t_text in + try vernac_interp t_report_at ~route { expr = ast; loc; verbose = true } with e when Errors.noncritical e -> let msg = string_of_ppcmds (print e) in - Pp.feedback ~state_id:t_report_at ~route:t_route + Pp.feedback ~state_id:t_report_at ~route (Feedback.ErrorMsg (Loc.ghost, msg))) q with e when Errors.noncritical e -> () + + let perform_states q = + CList.map_filter (fun id -> + if State.is_cached id then Some (id, State.get_cached id) else None) + q + let perform = function | ReqBuildProof bp -> perform_buildp bp | ReqQuerys qs -> List.iter perform_query qs; RespDone + | ReqStates sl -> RespStates (perform_states sl) let on_slave_death task = if not !fallback_to_lazy_if_slave_dies then `Exit 1 else match task with | None -> `Stay | Some (Querys _) -> `Stay + | Some (States _) -> `Stay | Some (BuildProof { t_exn_info; t_loc; t_stop; t_assign }) -> msg_warning(strbrk "Falling back to local, lazy, evaluation."); t_assign (`Comp(build_proof_here t_exn_info t_loc t_stop)); @@ -1055,6 +1087,8 @@ end = struct (* {{{ *) let on_marshal_error s = function | Querys _ -> () + | States _ -> msg_error(strbrk("Marshalling error: "^s^". "^ + "The system state could not be sent to the master process.")) | BuildProof { t_exn_info; t_stop; t_assign; t_loc } -> if !fallback_to_lazy_if_marshal_error then begin msg_error(strbrk("Marshalling error: "^s^". "^ @@ -1102,6 +1136,9 @@ and Slaves : sig Stateid.t -> cancel_switch -> (Stateid.t * Feedback.route_id) -> string -> unit + (* blocking *) + val fetch_states : Stateid.t list -> unit + end = struct (* {{{ *) module TaskQueue = AsyncTaskQueue.MakeQueue(ProofTask) @@ -1212,24 +1249,21 @@ end = struct (* {{{ *) | false, true -> 1 in TaskQueue.set_order (Option.get !queue) (fun task1 task2 -> match task1, task2 with - | BuildProof _, Querys _ -> 0 - | Querys _, BuildProof _ -> 0 | Querys q1, Querys q2 -> let s1 = List.fold_right (fun { t_at } -> Set.add t_at) q1 Set.empty in let s2 = List.fold_right (fun { t_at } -> Set.add t_at) q2 Set.empty in overlap_rel s1 s2 | BuildProof { t_states = s1 }, - BuildProof { t_states = s2 } -> overlap_rel s1 s2) + BuildProof { t_states = s2 } -> overlap_rel s1 s2 + | _ -> 0) let build_proof ~loc ~exn_info ~start ~stop ~name:pname = let id, valid as t_exn_info = exn_info in let cancel_switch = ref false in if fst (TaskQueue.n_workers (Option.get !queue)) = 0 then if !Flags.compilation_mode = Flags.BuildVi then begin - let force () : Proof_global.closed_proof_output Future.assignement = - try `Val (ProofTask.build_proof_here_core loc stop ()) - with e -> let e = Errors.push e in `Exn e in - let f,assign = Future.create_delegate ~force (State.exn_on id ~valid) in + let f,assign = + Future.create_delegate ~blocking:true (State.exn_on id ~valid) in let t_uuid = Future.uuid f in let task = ProofTask.(BuildProof { t_exn_info; t_start = start; t_stop = stop; @@ -1276,6 +1310,12 @@ end = struct (* {{{ *) let task = ProofTask.(Querys [ { t_at; t_report_at; t_route; t_text } ]) in TaskQueue.enqueue_task (Option.get !queue) task cancel_switch + let fetch_states t_states = + let fl, assign = Future.create_delegate ~blocking:true (fun x -> x) in + TaskQueue.enqueue_task (Option.get !queue) + ProofTask.(States { t_states; t_assignstates = assign }) (ref false); + List.iter (fun (id,s) -> State.assign id s) (Future.join fl) + end (* }}} *) and TacTask : sig @@ -1678,12 +1718,11 @@ let known_state ?(redefine_qed=false) ~cache id = | `ASync (start, pua, nodes, name) -> (fun () -> assert(keep == VtKeep); prerr_endline ("Asynchronous " ^ Stateid.to_string id); - VCS.create_cluster nodes ~tip:id; + VCS.create_cluster nodes ~qed:id ~start; begin match brinfo, qed.fproof with | { VCS.kind = `Edit _ }, None -> assert false | { VCS.kind = `Edit _ }, Some (ofp, cancel) -> assert(redefine_qed = true); - VCS.create_cluster nodes ~tip:id; let fp, cancel = Slaves.build_proof ~loc:x.loc ~exn_info:(id,eop) ~start ~stop:eop ~name in Future.replace ofp fp; @@ -2125,7 +2164,7 @@ let find_state id = try match VCS.cluster_of id with | None -> `Master false - | Some qed_id -> + | Some (qed_id,_) -> match VCS.visit qed_id with | { step = `Qed ({ fproof = Some (_,cs) }, _) } -> `Worker cs | _ -> anomaly (str "Cluster not ending with Qed") @@ -2156,6 +2195,11 @@ let async_query ~at ~report_with s = let edit_at id = if Stateid.equal id Stateid.dummy then anomaly(str"edit_at dummy") else let vcs = VCS.backup () in + let nodes_of_cluster id = + match VCS.cluster_of id with + | None -> [] + | Some (_, start) -> + Stateid.Set.elements (VCS.nodes_in_slice ~start ~stop:id) in let on_cur_branch id = let rec aux cur = if id = cur then true @@ -2177,14 +2221,24 @@ let edit_at id = | { step = `Fork _ } -> tip | { step = `Sideff (`Ast(_,id)|`Id id) } -> id | { next } -> master_for_br root next in - let reopen_branch at_id mode qed_id tip = - VCS.delete_cluster_of id; - let master_id = + let fetch_states_of_nodes_of_cluster id = + if !Flags.async_proofs_always_delegate then begin + let parked = nodes_of_cluster id in + let to_fetch = List.filter (fun id -> not (State.is_cached id)) parked in + prerr_endline ("fetch " ^ String.concat " " (List.map Stateid.to_string to_fetch)); + if to_fetch <> [] then Slaves.fetch_states to_fetch + end in + let reopen_branch start at_id mode qed_id tip = + let master_id, cancel_switch = + (* Hum, this should be the real start_id in the clusted and not next *) match VCS.visit qed_id with - | { step = `Qed _; next = master_id } -> master_id + | { step = `Qed ({ fproof = Some (_,cs)},_) } -> start, cs | _ -> anomaly (str "Cluster not ending with Qed") in VCS.branch ~root:master_id ~pos:id VCS.edit_branch (`Edit (mode, qed_id, master_id)); + fetch_states_of_nodes_of_cluster id; + VCS.delete_cluster_of id; + cancel_switch := true; Reach.known_state ~cache:(interactive ()) id; VCS.checkout_shallowest_proof_branch (); `Focus { stop = qed_id; start = master_id; tip } in @@ -2199,6 +2253,7 @@ let edit_at id = others; VCS.reset_branch VCS.Branch.master (master_for_br brinfo.VCS.root id); VCS.branch ~root:brinfo.VCS.root ~pos:brinfo.VCS.pos brname brinfo.VCS.kind; + fetch_states_of_nodes_of_cluster id; VCS.delete_cluster_of id; VCS.gc (); Reach.known_state ~cache:(interactive ()) id; @@ -2213,12 +2268,12 @@ let edit_at id = | _ -> None in match focused, VCS.cluster_of id, branch_info with | _, Some _, None -> assert false - | false, Some qed_id, Some mode -> + | false, Some (qed_id,start), Some mode -> let tip = VCS.cur_tip () in if has_failed qed_id && not !Flags.async_proofs_never_reopen_branch - then reopen_branch id mode qed_id tip + then reopen_branch start id mode qed_id tip else backto id - | true, Some qed_id, Some mode -> + | true, Some (qed_id,_), Some mode -> if on_cur_branch id then begin assert false end else if is_ancestor_of_cur_branch id then begin diff --git a/stm/vcs.ml b/stm/vcs.ml index e2513b1c1..3efe89358 100644 --- a/stm/vcs.ml +++ b/stm/vcs.ml @@ -62,11 +62,11 @@ module type S = sig val reachable : ('k,'e,'info) t -> id -> NodeSet.t module Dag : Dag.S with type node = id - val dag : ('kind,'diff,'info) t -> ('diff,'info,id) Dag.t + val dag : ('kind,'diff,'info) t -> ('diff,'info,id*id) Dag.t - val create_cluster : ('k,'e,'i) t -> id list -> id -> ('k,'e,'i) t - val cluster_of : ('k,'e,'i) t -> id -> id Dag.Cluster.t option - val delete_cluster : ('k,'e,'i) t -> id Dag.Cluster.t -> ('k,'e,'i) t + val create_cluster : ('k,'e,'i) t -> id list -> (id * id) -> ('k,'e,'i) t + val cluster_of : ('k,'e,'i) t -> id -> (id * id) Dag.Cluster.t option + val delete_cluster : ('k,'e,'i) t -> (id * id) Dag.Cluster.t -> ('k,'e,'i) t end @@ -102,7 +102,7 @@ type 'kind branch_info = { type ('kind,'edge,'info) t = { cur_branch : Branch.t; heads : 'kind branch_info BranchMap.t; - dag : ('edge,'info,id) Dag.t; + dag : ('edge,'info,id*id) Dag.t; } let empty root = { diff --git a/stm/vcs.mli b/stm/vcs.mli index 6c3571a08..7241ae461 100644 --- a/stm/vcs.mli +++ b/stm/vcs.mli @@ -79,11 +79,11 @@ module type S = sig (* read only dag *) module Dag : Dag.S with type node = id - val dag : ('kind,'diff,'info) t -> ('diff,'info,id) Dag.t + val dag : ('kind,'diff,'info) t -> ('diff,'info,id * id) Dag.t - val create_cluster : ('k,'e,'i) t -> id list -> id -> ('k,'e,'i) t - val cluster_of : ('k,'e,'i) t -> id -> id Dag.Cluster.t option - val delete_cluster : ('k,'e,'i) t -> id Dag.Cluster.t -> ('k,'e,'i) t + val create_cluster : ('k,'e,'i) t -> id list -> (id * id) -> ('k,'e,'i) t + val cluster_of : ('k,'e,'i) t -> id -> (id * id) Dag.Cluster.t option + val delete_cluster : ('k,'e,'i) t -> (id * id) Dag.Cluster.t -> ('k,'e,'i) t end |