1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
|
(************************************************************************)
(* v * The Coq Proof Assistant / The Coq Development Team *)
(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2017 *)
(* \VV/ **************************************************************)
(* // * This file is distributed under the terms of the *)
(* * GNU Lesser General Public License Version 2.1 *)
(************************************************************************)
open CErrors
open Pp
open Util
(* [stm_pr_err pp] prints [pp] on stderr, prefixed by the current process id
   followed by "] ", and terminated by a newline.
   The flush directive [%!] comes last so that the trailing newline is pushed
   out together with the message instead of staying buffered until the next
   message is printed. *)
let stm_pr_err pp =
  Format.eprintf "%s] @[%a@]\n%!" (System.process_id ()) Pp.pp_with pp

(* [stm_prerr_endline s] prints [s] through [stm_pr_err], but only when
   [Flags.debug] is set; otherwise it does nothing. *)
let stm_prerr_endline s =
  if !Flags.debug then stm_pr_err (str s)
(* Age of a worker as seen by its manager: [`Fresh] until it has completed a
   task, then [`Old c] where [c] is the value returned by the task's
   [use_response] in its [`Stay] answer. *)
type 'a worker_status = [ `Fresh | `Old of 'a ]
(* Interface a task kind must implement to be scheduled by [Make]. *)
module type Task = sig
(* A unit of work queued on the master side. *)
type task
(* Value returned by [use_response] in its [`Stay] answer and remembered as
   the worker's [`Old] status. *)
type competence
(* Marshallable *)
type request
type response
val name : string ref (* UID of the task kind, for -toploop *)
(* Extra environment entries; [Make] prepends them to the current process
   environment when spawning a worker. *)
val extra_env : unit -> string array
(* run by the master, on a thread *)
(* Builds the request to send for a task; on [None] the manager skips the
   task and logs it as expired. *)
val request_of_task : competence worker_status -> task -> request option
(* Predicate used by the manager when picking a task from the queue for a
   worker of the given status. *)
val task_match : competence worker_status -> task -> bool
(* Consumes the worker's response. [`End] makes the manager kill the worker;
   [`Stay (c, l)] keeps it, records [c] as its competence and enqueues the
   tasks in [l]. *)
val use_response :
competence worker_status -> task -> response ->
[ `Stay of competence * task list | `End ]
(* Called by the manager with the error message and the current task when a
   marshalling error occurs; the worker is killed afterwards. *)
val on_marshal_error : string -> task -> unit
(* Called by the manager with the last picked task (if still pending) when
   communication with the worker fails with [Sys_error], [Invalid_argument]
   or [End_of_file]. *)
val on_task_cancellation_or_expiration_or_slave_death : task option -> unit
(* Called by the manager for each feedback message received from the worker. *)
val forward_feedback : Feedback.feedback -> unit
(* run by the worker *)
val perform : request -> response
(* debugging *)
val name_of_task : task -> string
val name_of_request : request -> string
end
(* Flag attached to each queued task: once set to [true] the task is no longer
   picked by managers, the cleaner thread removes it from the queue, and a
   manager currently bound to it kills its worker. *)
type expiration = bool ref
module Make(T : Task) = struct
(* Raised inside a manager to leave its loop and kill the worker. *)
exception Die
(* Messages sent by a worker to its manager. *)
type response =
(* Final answer to the current request. *)
| Response of T.response
(* A feedback message to be forwarded on the master side. *)
| RespFeedback of Feedback.feedback
(* Asks the master for fresh universe levels; answered with [more_data]. *)
| RespGetCounterNewUnivLevel
(* Message sent by a manager to its worker: a job to perform. *)
type request = Request of T.request
(* Additional data sent by a manager to its worker while a request is being
   processed. *)
type more_data =
| MoreDataUnivLevel of Univ.universe_level list
(* Worker side: unwrap the request, let the task implementation perform it,
   and wrap the outcome as a final [Response]. *)
let slave_respond request =
  match request with
  | Request payload -> Response (T.perform payload)
(* Raised by the (un)marshalling wrappers; carries a description of the
   failure. *)
exception MarshalError of string

(* [marshal_to_channel oc data] serialises [data] on [oc] with no marshalling
   flags and flushes the channel right away. *)
let marshal_to_channel oc data =
  let no_flags = [] in
  Marshal.to_channel oc data no_flags;
  flush oc

(* [marshal_err msg] raises [MarshalError msg]; it never returns. *)
let marshal_err msg = raise (MarshalError msg)
(* [with_marshal_errors prefix action] runs [action ()]; a [Failure],
   [Invalid_argument] or [Sys_error] raised by it is turned into a
   [MarshalError] whose message is [prefix] followed by the original one. *)
let with_marshal_errors prefix action =
  try action ()
  with Failure s | Invalid_argument s | Sys_error s ->
    marshal_err (prefix ^ s)

(* Master -> worker: send a request. *)
let marshal_request oc (req : request) =
  with_marshal_errors "marshal_request: "
    (fun () -> marshal_to_channel oc req)

(* Worker side: read the next request. *)
let unmarshal_request ic =
  with_marshal_errors "unmarshal_request: "
    (fun () -> (CThread.thread_friendly_input_value ic : request))

(* Worker -> master: send a response. *)
let marshal_response oc (res : response) =
  with_marshal_errors "marshal_response: "
    (fun () -> marshal_to_channel oc res)

(* Master side: read the next response. *)
let unmarshal_response ic =
  with_marshal_errors "unmarshal_response: "
    (fun () -> (CThread.thread_friendly_input_value ic : response))

(* Master -> worker: send additional data. *)
let marshal_more_data oc (res : more_data) =
  with_marshal_errors "marshal_more_data: "
    (fun () -> marshal_to_channel oc res)

(* Worker side: read additional data sent by the master. *)
let unmarshal_more_data ic =
  with_marshal_errors "unmarshal_more_data: "
    (fun () -> (CThread.thread_friendly_input_value ic : more_data))
(* [report_status ?id s] emits a [WorkerStatus (id, s)] feedback attached to
   [Stateid.initial]. [id] defaults to the current value of
   [Flags.async_proofs_worker_id], read at call time. *)
let report_status ?(id = !Flags.async_proofs_worker_id) s =
  Feedback.(feedback ~id:Stateid.initial (WorkerStatus (id, s)))
(* Process-handling primitives (spawn, kill, wait, is_alive) used by the
   managers below, obtained from [Spawn.Sync]. *)
module Worker = Spawn.Sync(struct end)
(* Argument given to [WorkerPool.Make]: describes how a worker process is
   spawned and how its manager thread drives it. *)
module Model = struct
type process = Worker.process
(* Data shared with every manager: the queue of tasks paired with their
   expiration flag. *)
type extra = (T.task * expiration) TQueue.t
(* [spawn id] starts a worker process named "<T.name>:<id>" by re-running
   [Sys.argv.(0)] with a filtered command line. Returns the name, the process
   handle, the input channel (prepared for thread-friendly reads) and the
   output channel. *)
let spawn id =
let name = Printf.sprintf "%s:%d" !T.name id in
let proc, ic, oc =
(* Rewrites the master's command line for the worker. *)
let rec set_slave_opt = function
(* End of the original arguments: append the worker-specific options. *)
| [] -> !Flags.async_proofs_flags_for_workers @
["-toploop"; !T.name^"top";
"-worker-id"; name;
"-async-proofs-worker-priority";
Flags.string_of_priority !Flags.async_proofs_worker_priority]
(* Options without argument that are dropped. *)
| ("-ideslave"|"-emacs"|"-batch")::tl -> set_slave_opt tl
(* Options dropped together with the argument that follows them. *)
| ("-async-proofs" |"-toploop" |"-vio2vo"
|"-load-vernac-source" |"-l" |"-load-vernac-source-verbose" |"-lv"
|"-compile" |"-compile-verbose"
|"-async-proofs-worker-priority" |"-worker-id") :: _ :: tl ->
set_slave_opt tl
(* Anything else is passed through unchanged. *)
| x::tl -> x :: set_slave_opt tl in
let args =
(* [List.tl] skips the program name stored in [Sys.argv.(0)]. *)
Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in
(* Task-specific entries come first, followed by the current environment. *)
let env = Array.append (T.extra_env ()) (Unix.environment ()) in
Worker.spawn ~env Sys.argv.(0) args in
name, proc, CThread.prepare_in_channel_for_thread_friendly_io ic, oc
(* [manager cpanel (id, proc, ic, oc)] is the loop driving one worker: it
   picks tasks from the queue, sends them to the worker, handles the replies,
   and kills the worker when the loop terminates. *)
let manager cpanel (id, proc, ic, oc) =
let { WorkerPool.extra = queue; exit; cancelled } = cpanel in
(* Report the worker as dead before calling the pool-provided [exit]. *)
let exit () = report_status ~id "Dead"; exit () in
(* Task currently sent to the worker, reset to [None] once answered. *)
let last_task = ref None in
let worker_age = ref `Fresh in
(* Whether an execution token from [CoqworkmgrApi] is currently held. *)
let got_token = ref false in
let giveback_exec_token () =
if !got_token then (CoqworkmgrApi.giveback 1; got_token := false) in
(* Passed to [TQueue.pop] as [~destroy]; set by the watchdog thread below. *)
let stop_waiting = ref false in
(* Expiration flag of the task being processed. *)
let expiration_date = ref (ref false) in
(* Blocks until a non-expired task matching the worker's status is
   available, and records it as the current task. *)
let pick_task () =
stm_prerr_endline "waiting for a task";
let pick age (t, c) = not !c && T.task_match age t in
let task, task_expiration =
TQueue.pop ~picky:(pick !worker_age) ~destroy:stop_waiting queue in
expiration_date := task_expiration;
last_task := Some task;
stm_prerr_endline ("got task: " ^ T.name_of_task task);
task in
(* New tasks share the expiration flag of the task that produced them. *)
let add_tasks l =
List.iter (fun t -> TQueue.push queue (t,!expiration_date)) l in
let get_exec_token () =
ignore(CoqworkmgrApi.get 1);
got_token := true;
stm_prerr_endline ("got execution token") in
(* Kills the worker process, waits for it and logs its exit status. *)
let kill proc =
Worker.kill proc;
stm_prerr_endline ("Worker exited: " ^
match Worker.wait proc with
| Unix.WEXITED 0x400 -> "exit code unavailable"
| Unix.WEXITED i -> Printf.sprintf "exit(%d)" i
| Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno
| Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno) in
(* Allocates [n] fresh universe levels on behalf of the worker. *)
let more_univs n =
CList.init n (fun _ ->
Universes.new_univ_level (Global.current_dirpath ())) in
(* Watchdog: checks once per second whether the worker was cancelled or the
   current task expired; if so it wakes up the manager and kills the worker.
   Stops as soon as the worker is no longer alive. *)
let rec kill_if () =
if not (Worker.is_alive proc) then ()
else if cancelled () || !(!expiration_date) then
let () = stop_waiting := true in
let () = TQueue.broadcast queue in
Worker.kill proc
else
let () = Unix.sleep 1 in
kill_if ()
in
(* Same watchdog, but an interruption ([Sys.Break]) also kills the worker. *)
let kill_if () =
try kill_if ()
with Sys.Break ->
let () = stop_waiting := true in
let () = TQueue.broadcast queue in
Worker.kill proc
in
let _ = Thread.create kill_if () in
(* Main loop: one iteration per task; only left through an exception. *)
try while true do
report_status ~id "Idle";
let task = pick_task () in
match T.request_of_task !worker_age task with
| None -> stm_prerr_endline ("Task expired: " ^ T.name_of_task task)
| Some req ->
try
get_exec_token ();
marshal_request oc (Request req);
(* Reads messages from the worker until the final [Response] arrives. *)
let rec continue () =
match unmarshal_response ic with
| RespGetCounterNewUnivLevel ->
(* Universe levels are handed out in batches of 10. *)
marshal_more_data oc (MoreDataUnivLevel (more_univs 10));
continue ()
| RespFeedback fbk -> T.forward_feedback fbk; continue ()
| Response resp ->
match T.use_response !worker_age task resp with
| `End -> raise Die
| `Stay(competence, new_tasks) ->
last_task := None;
giveback_exec_token ();
worker_age := `Old competence;
add_tasks new_tasks
in
continue ()
with
| (Sys_error _|Invalid_argument _|End_of_file|Die) as e ->
raise e (* we pass the exception to the external handler *)
| MarshalError s -> T.on_marshal_error s task; raise Die
| e ->
stm_pr_err Pp.(seq [str "Uncaught exception in worker manager: "; print e]);
flush_all (); raise Die
done with
(* Deliberate termination, or the queue is being destroyed. *)
| (Die | TQueue.BeingDestroyed) ->
giveback_exec_token (); kill proc; exit ()
(* Communication with the worker failed: notify the task implementation. *)
| Sys_error _ | Invalid_argument _ | End_of_file ->
T.on_task_cancellation_or_expiration_or_slave_death !last_task;
giveback_exec_token (); kill proc; exit ()
end
(* Pool of workers managed according to [Model]. *)
module Pool = WorkerPool.Make(Model)
(* A task queue together with the workers serving it. *)
type queue = {
(* The pool of workers. *)
active : Pool.pool;
(* Pending tasks, each paired with its expiration flag. *)
queue : (T.task * expiration) TQueue.t;
(* Thread that removes expired tasks from [queue]. *)
cleaner : Thread.t;
}
(* [create size] builds a fresh task queue served by a worker pool created
   with [~size], plus a cleaner thread that keeps popping tasks whose
   expiration flag is set until the queue is destroyed. *)
let create size =
  let is_cancelled (_, flag) = !flag in
  let cleaner tq =
    while true do
      try ignore (TQueue.pop ~picky:is_cancelled tq)
      with TQueue.BeingDestroyed -> Thread.exit ()
    done in
  let queue = TQueue.create () in
  {
    active = Pool.create queue ~size;
    queue;
    cleaner = Thread.create cleaner queue;
  }
(* [destroy q] destroys the worker pool first, then the task queue. *)
let destroy q =
  Pool.destroy q.active;
  TQueue.destroy q.queue

(* [broadcast q] calls [TQueue.broadcast] on the underlying task queue. *)
let broadcast q = TQueue.broadcast q.queue
(* [enqueue_task q (task, expiration)] logs the task name (in debug mode) and
   pushes the pair on the task queue. *)
let enqueue_task q ((task, _) as item) =
  stm_prerr_endline ("Enqueue task "^T.name_of_task task);
  TQueue.push q.queue item
(* [cancel_worker q n] asks the pool to cancel the worker designated by [n]. *)
let cancel_worker q n = Pool.cancel n q.active

(* [name_of_request r] is the task-specific name of the wrapped request. *)
let name_of_request = function
  | Request inner -> T.name_of_request inner
(* [set_order q cmp] orders the task queue by comparing the tasks with [cmp],
   ignoring their expiration flags. *)
let set_order q cmp =
  let by_task (a, _) (b, _) = cmp a b in
  TQueue.set_order q.queue by_task
(* [join q] does nothing when the pool is empty; otherwise it blocks until
   every worker plus the cleaner thread is waiting on the queue and the queue
   is empty. *)
let join q =
  if Pool.is_empty q.active then ()
  else
    let expected_waiters = Pool.n_workers q.active + 1 (* cleaner *) in
    TQueue.wait_until_n_are_waiting_and_queue_empty expected_waiters q.queue
(* [cancel_all q] empties the task queue, then cancels every worker of the
   pool. *)
let cancel_all q =
  TQueue.clear q.queue;
  Pool.cancel_all q.active
(* Channels connecting a worker to its master; unset until [init_stdout]
   runs. *)
let slave_ic = ref None
let slave_oc = ref None

(* [init_stdout ()] fetches the channels from [Spawned.get_channels] and
   stores them in [slave_oc] and [slave_ic]. *)
let init_stdout () =
  let (input, output) = Spawned.get_channels () in
  slave_oc := Some output;
  slave_ic := Some input
(* [bufferize f] turns the batch producer [f] into a supplier delivering one
   element per call. Elements of the last batch are handed out in order; [f]
   is called again only when that batch is exhausted.
   Raises [Failure] with an explicit message if [f] returns an empty batch
   (the exception used to be the uninformative [Failure "tl"]). *)
let bufferize f =
  let pending = ref [] in
  fun () ->
    match !pending with
    | x :: rest -> pending := rest; x
    | [] ->
      (* Buffer exhausted: fetch a new batch and serve its first element. *)
      (match f () with
       | x :: rest -> pending := rest; x
       | [] -> failwith "bufferize: producer returned an empty batch")
(* [slave_handshake ()] runs [Pool.worker_handshake] over the worker's
   channels, which must have been set (see [init_stdout]). *)
let slave_handshake () =
  let oc = Option.get !slave_oc in
  let ic = Option.get !slave_ic in
  Pool.worker_handshake ic oc
(* [pp_pid pp] is [pp] prefixed by the current process id and a space. *)
let pp_pid pp =
  let pid_prefix = Pp.str (System.process_id () ^ " ") in
  Pp.(pid_prefix ++ pp)

(* [debug_with_pid fb] tags debug messages with the process id; any other
   feedback is returned unchanged. *)
let debug_with_pid fb =
  let open Feedback in
  match fb with
  | { contents = Message (Debug, loc, pp) } ->
    { fb with contents = Message (Debug, loc, pp_pid pp) }
  | _ -> fb
(* Worker-side main loop: installs the feedback and universe-level hooks,
   performs the handshake, then serves requests until an error terminates the
   process. It is only left through [exit]. *)
let main_loop () =
(* We pass feedback to master *)
let slave_feeder oc fb =
Marshal.to_channel oc (RespFeedback (debug_with_pid fb)) []; flush oc in
ignore (Feedback.add_feeder (fun x -> slave_feeder (Option.get !slave_oc) x));
(* We ask master to allocate universe identifiers *)
(* [bufferize] hands out the received levels one at a time, so the master is
   contacted only when the previous batch is exhausted. *)
Universes.set_remote_new_univ_level (bufferize (fun () ->
marshal_response (Option.get !slave_oc) RespGetCounterNewUnivLevel;
match unmarshal_more_data (Option.get !slave_ic) with
| MoreDataUnivLevel l -> l));
(* NOTE(review): [working] is written below but never read in this block. *)
let working = ref false in
slave_handshake ();
while true do
try
working := false;
(* Blocks until the master sends the next request. *)
let request = unmarshal_request (Option.get !slave_ic) in
working := true;
report_status (name_of_request request);
let response = slave_respond request in
report_status "Idle";
marshal_response (Option.get !slave_oc) response;
CEphemeron.clear ()
with
(* Any exception is fatal for the worker: marshalling errors and a lost
   connection exit with code 2, everything else with code 1. *)
| MarshalError s ->
stm_pr_err Pp.(prlist str ["Fatal marshal error: "; s]); flush_all (); exit 2
| End_of_file ->
stm_prerr_endline "connection lost"; flush_all (); exit 2
| e ->
stm_pr_err Pp.(seq [str "Slave: critical exception: "; print e]);
flush_all (); exit 1
done
(* [clear q] empties the task queue. Only allowed when the pool has no
   workers, which is checked by an assertion. *)
let clear q =
  assert (Pool.is_empty q.active); (* We allow that only if no slaves *)
  TQueue.clear q.queue
(* [snapshot q] waits until as many threads as there are workers are waiting
   on the queue, then returns the tasks it contains, without their expiration
   flags. *)
let snapshot q =
  let entries =
    TQueue.wait_until_n_are_waiting_then_snapshot
      (Pool.n_workers q.active) q.queue in
  List.map fst entries
(* [with_n_workers n f] creates a queue with [create n], applies [f] to it and
   destroys the queue afterwards, whether [f] returns or raises. An exception
   raised by [f] is re-raised with its backtrace information preserved.
   Only [f] is run under the handler, so a failure of [destroy] on the
   success path is not followed by a second [destroy]. *)
let with_n_workers n f =
  let q = create n in
  let rc =
    try f q
    with e -> let e = CErrors.push e in destroy q; iraise e in
  destroy q;
  rc
(* [n_workers q] is the number of workers reported by the pool. *)
let n_workers q = Pool.n_workers q.active
end
(* Two entry points that both re-export [Make(T)] unchanged.
   NOTE(review): presumably [MakeQueue] is meant for the master side and
   [MakeWorker] for the worker side — confirm against the interface file. *)
module MakeQueue(T : Task) = struct include Make(T) end
module MakeWorker(T : Task) = struct include Make(T) end
|