aboutsummaryrefslogtreecommitdiffhomepage
path: root/stm/asyncTaskQueue.ml
blob: 74e83a39fdd724c1e5fc05a91d0da19fb384fe8c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
(************************************************************************)
(*  v      *   The Coq Proof Assistant  /  The Coq Development Team     *)
(* <O___,, *   INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2014     *)
(*   \VV/  **************************************************************)
(*    //   *      This file is distributed under the terms of the       *)
(*         *       GNU Lesser General Public License Version 2.1        *)
(************************************************************************)

open Errors
open Pp
open Util

(* Print [s] on stderr, prefixed by the string returned by
   [System.process_id ()] and "] ", then flush stderr (the [%!] directive). *)
let pr_err s = Printf.eprintf "%s] %s\n%!" (System.process_id ()) s

(* Debug trace: forwards [s] to [pr_err] only when [Flags.debug] is set.
   From here on this definition shadows the standard [prerr_endline]. *)
let prerr_endline s = if !Flags.debug then pr_err s

(* Description of one kind of asynchronous task.  [Make] below uses it to
   build a queue of tasks that are turned into requests, sent to worker
   processes and whose responses are handed back to the master. *)
module type Task = sig

  (* A unit of work as queued on the master side (see [Make.enqueue_task]). *)
  type task

  (* Marshallable *)
  (* Values of both types travel between master and worker through the
     marshalling wrappers of [Make]. *)
  type request
  type response

  val name : string (* UID of the task kind, for -toploop *)
  (* Environment entries placed in front of [Unix.environment ()] when a
     worker process is spawned. *)
  val extra_env : unit -> string array

  (* run by the master, on a thread *)
  (* Builds the request for a task.  The first argument starts as [`Fresh]
     for each spawned worker and becomes [`Old] once a response was handled
     with [`Stay].  [None] makes the manager skip the task as expired. *)
  val request_of_task : [ `Fresh | `Old ] -> task -> request option
  (* Consumes the worker's response.  [`Stay] keeps the worker for the next
     task; [`StayReset] makes the manager kill and respawn it. *)
  val use_response : task -> response -> [ `Stay | `StayReset ]
  (* Called with the error message when (un)marshalling failed while the
     given task was being processed. *)
  val on_marshal_error : string -> task -> unit
  (* Called when communication with the worker failed and the task was
     neither expired nor cancelled.  [`Exit n] terminates the master with
     exit code [n]; [`Stay] makes the manager respawn the worker. *)
  val on_slave_death : task option -> [ `Exit of int | `Stay ]
  (* Called, with the task in progress if any, when communication with the
     worker failed after the task expired or was cancelled by the user. *)
  val on_task_cancellation_or_expiration : task option -> unit
  (* Receives the content of each state-tagged feedback message sent by the
     worker. *)
  val forward_feedback : Stateid.t -> Feedback.feedback_content -> unit
 
  (* run by the worker *)
  (* Computes the response to a request (called by [Make.slave_respond]). *)
  val perform : request -> response

  (* debugging *)
  (* Printable names; used in debug traces and in worker status reports. *)
  val name_of_task : task -> string
  val name_of_request : request -> string

end

(* Mutable flag attached to each queued task.  While the task is running the
   worker manager copies its value into its [task_expired] flag, so setting
   it to [true] marks the task as expired. *)
type cancel_switch = bool ref

module Make(T : Task) = struct

  (* Instance of [Spawn.Sync] whose timer is a plain thread: it sleeps [sec]
     seconds, calls [f], and repeats for as long as [f] returns [true].  The
     thread ends as soon as [f] returns [false]. *)
  module Worker = Spawn.Sync(struct
    let add_timeout ~sec f =
      let rec tick () =
        Unix.sleep sec;
        if f () then tick () in
      ignore (Thread.create tick ())
  end)
  
  (* Pool of worker managers built on top of [Worker]. *)
  module WorkersPool = WorkerPool.Make(Worker)

  (* Pending tasks, each paired with its cancellation switch.  Filled by
     [enqueue_task] and consumed by the [manage_slave] loops. *)
  let queue : (T.task * cancel_switch) TQueue.t = TQueue.create ()

  (* Push task [t], paired with its cancellation switch [c], on [queue].
     A debug trace naming the task is emitted first. *)
  let enqueue_task t c =
    let trace = Printf.sprintf "Enqueue task %s" (T.name_of_task t) in
    prerr_endline trace;
    TQueue.push queue (t, c)

  (* Alias of [WorkersPool.cancel]. *)
  let cancel_worker = WorkersPool.cancel

  (* Message sent by the master to a worker: a wrapped [T.request]. *)
  type request = Request of T.request

  (* Printable name of a request, as given by [T.name_of_request]. *)
  let name_of_request = function
    | Request payload -> T.name_of_request payload

  (* Messages sent by a worker to the master. *)
  type response =
    (* Final answer to the current request. *)
    | Response of T.response
    (* A feedback message emitted by the worker while running. *)
    | RespFeedback of Feedback.feedback
    (* Asks the master for fresh universe levels; the master answers with a
       [MoreDataUnivLevel] message. *)
    | RespGetCounterNewUnivLevel

  (* Extra data sent by the master in the middle of a request. *)
  type more_data =
    (* Reply to [RespGetCounterNewUnivLevel]. *)
    | MoreDataUnivLevel of Univ.universe_level list
 
  (* NOTE(review): [T.request_of_task] takes the worker age first, so the
     first component of the argument is an age ([`Fresh] or [`Old]), not a
     task, and the result pairs the partially applied function with the
     switch.  No use of this helper is visible in this file; confirm whether
     it is still needed. *)
  let request_cswitch_of_task (age, switch) = (T.request_of_task age, switch)
  
  (* Worker side: run [T.perform] on the payload of a request and wrap the
     result as a [Response]. *)
  let slave_respond (Request payload) = Response (T.perform payload)

  (* Raised by the (un)marshalling wrappers of this module; the payload names
     the failing wrapper followed by the original error message. *)
  exception MarshalError of string

  (* Write [data] on [oc] in marshalled form with no marshalling flag, then
     flush the channel. *)
  let marshal_to_channel oc data =
    output_value oc data;
    flush oc

  (* Raise [MarshalError] with the given message. *)
  let marshal_err msg = raise (MarshalError msg)

  (* Send a request on [oc].  [Failure], [Invalid_argument] and [Sys_error]
     are turned into [MarshalError]; other exceptions pass through. *)
  let marshal_request oc (req : request) =
    try marshal_to_channel oc req
    with Failure msg | Invalid_argument msg | Sys_error msg ->
      raise (MarshalError ("marshal_request: " ^ msg))

  (* Read a request from [ic].  [Failure], [Invalid_argument] and [Sys_error]
     are turned into [MarshalError]; other exceptions (for instance
     [End_of_file]) pass through. *)
  let unmarshal_request ic =
    try
      let (req : request) = Marshal.from_channel ic in
      req
    with Failure msg | Invalid_argument msg | Sys_error msg ->
      raise (MarshalError ("unmarshal_request: " ^ msg))

  (* Send a response on [oc].  [Failure], [Invalid_argument] and [Sys_error]
     are turned into [MarshalError]; other exceptions pass through. *)
  let marshal_response oc (res : response) =
    try marshal_to_channel oc res
    with Failure msg | Invalid_argument msg | Sys_error msg ->
      raise (MarshalError ("marshal_response: " ^ msg))
  
  (* Read a response from [ic] through [CThread.thread_friendly_input_value].
     [Failure], [Invalid_argument] and [Sys_error] are turned into
     [MarshalError]; other exceptions pass through. *)
  let unmarshal_response ic =
    try
      let (res : response) = CThread.thread_friendly_input_value ic in
      res
    with Failure msg | Invalid_argument msg | Sys_error msg ->
      raise (MarshalError ("unmarshal_response: " ^ msg))

  (* Send a [more_data] message on [oc].  [Failure], [Invalid_argument] and
     [Sys_error] are turned into [MarshalError]; other exceptions pass
     through. *)
  let marshal_more_data oc (res : more_data) =
    try marshal_to_channel oc res
    with Failure msg | Invalid_argument msg | Sys_error msg ->
      raise (MarshalError ("marshal_more_data: " ^ msg))
  
  (* Read a [more_data] message from [ic].  [Failure], [Invalid_argument] and
     [Sys_error] are turned into [MarshalError]; other exceptions pass
     through. *)
  let unmarshal_more_data ic =
    try
      let (data : more_data) = Marshal.from_channel ic in
      data
    with Failure msg | Invalid_argument msg | Sys_error msg ->
      raise (MarshalError ("unmarshal_more_data: " ^ msg))

  (* Install [cmp] as the ordering of [queue]; only the task component of
     each queued pair is compared. *)
  let set_order cmp =
    let by_task (a, _) (b, _) = cmp a b in
    TQueue.set_order queue by_task

  (* Does nothing when the pool has no worker.  Otherwise delegates to
     [TQueue.wait_until_n_are_waiting_and_queue_empty] with the current
     number of workers. *)
  let join () =
    if WorkersPool.is_empty () then ()
    else
      let n = WorkersPool.n_workers () in
      TQueue.wait_until_n_are_waiting_and_queue_empty n queue
  (* Drop every pending task, then ask the pool to cancel all workers. *)
  let cancel_all () =
    let () = TQueue.clear queue in
    WorkersPool.cancel_all ()

  (* Control-flow exceptions local to [manage_slave]. *)
  (* Raised when [T.use_response] answers [`StayReset]: the worker is killed
     and a new one is spawned. *)
  exception KillRespawn
  (* Raised when the manager loop ends because its [die] flag is set: the
     worker is killed and the manager returns. *)
  exception Die
  (* Raised when [T.request_of_task] returns [None]: the task is skipped. *)
  exception Expired
    
  (* Emit a [Feedback.SlaveStatus] feedback, attached to [Stateid.initial],
     saying that worker [id] is in state [s].  [id] defaults to the current
     value of [Flags.async_proofs_worker_id]. *)
  let report_status ?(id = !Flags.async_proofs_worker_id) s =
    let status = Feedback.SlaveStatus (id, s) in
    Pp.feedback ~state_id:Stateid.initial status

  (* Master-side manager of one worker process (the [Task] signature says the
     master callbacks run on a thread).

     It spawns the worker through [respawn], then, until [!die] holds, pops a
     task from [queue], builds its request, sends it and handles the worker's
     messages until the final [Response] arrives.

     - [~cancel]: flag set to [true] to cancel the running task; it is reset
       to [false] as soon as it has been observed.
     - [~die]: when it becomes [true] the loop stops and the worker is killed.
     - [id]: worker identifier, passed as "-worker-id" and used in status
       reports.
     - [respawn]: called as [respawn ~args ~env ()]; returns the input
       channel, output channel and process handle of a new worker.

     Whenever the worker is lost or reset the function calls itself to start
     over with a fresh worker, except when [T.on_slave_death] asks to exit. *)
  let rec manage_slave ~cancel:cancel_user_req ~die id respawn =
    let ic, oc, proc =
      (* Worker command line: the master's own arguments minus the options
         that only make sense for the master (some of them with their
         argument), followed by the worker-specific options. *)
      let rec set_slave_opt = function
        | [] -> !Flags.async_proofs_flags_for_workers @
                ["-toploop"; T.name^"top";
                 "-worker-id"; id;
                 "-async-proofs-worker-priority";
                   Flags.string_of_priority !Flags.async_proofs_worker_priority]
        | ("-ideslave"|"-emacs"|"-emacs-U"|"-batch")::tl -> set_slave_opt tl
        | ("-async-proofs" |"-toploop" |"-vi2vo" |"-compile"
          |"-load-vernac-source" |"-compile-verbose"
          |"-async-proofs-worker-priority" |"-worker-id") :: _ :: tl ->
          set_slave_opt tl
        | x::tl -> x :: set_slave_opt tl in
      let args =
        Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in
      let env = Array.append (T.extra_env ()) (Unix.environment ()) in
      respawn ~args ~env () in
    (* Task being processed, if any; reported to the [T.on_*] callbacks. *)
    let last_task = ref None in
    (* Why the worker was lost; both are refreshed by the [kill_if] callback. *)
    let task_expired = ref false in
    let task_cancelled = ref false in
    let worker_age = ref `Fresh in
    (* [got_token] records that one execution token is held, so that it is
       given back at most once. *)
    let got_token = ref false in
    let giveback_token () =
      if !got_token then (CoqworkmgrApi.giveback 1; got_token := false) in
    CThread.prepare_in_channel_for_thread_friendly_io ic;
    try
      while not !die do
        prerr_endline "waiting for a task";
        report_status ~id "Idle";
        let task, cancel_switch = TQueue.pop queue in
        prerr_endline ("got task: "^T.name_of_task task);
        last_task := Some task;
        begin try
          let req = T.request_of_task !worker_age task in
          if req = None then raise Expired;
          ignore(CoqworkmgrApi.get 1); got_token := true;
          prerr_endline ("got execution token");
          marshal_request oc (Request (Option.get req));
          (* The callback records why the worker should be killed; presumably
             [Worker.kill_if] evaluates it every [sec] seconds and kills
             [proc] once it returns [true] -- confirm against [Spawn]. *)
          Worker.kill_if proc ~sec:1 (fun () ->
            task_expired := !cancel_switch;
            task_cancelled := !cancel_user_req;
            if !cancel_user_req then cancel_user_req := false;
            !task_expired || !task_cancelled || !die);
          (* Serve the worker's intermediate messages until the final
             [Response] arrives. *)
          let rec loop () =
            let response = unmarshal_response ic in
            match response with
            | Response resp ->
                (match T.use_response task resp with
                | `Stay ->
                     last_task := None; worker_age := `Old; giveback_token ()
                | `StayReset -> last_task := None; raise KillRespawn)
            | RespGetCounterNewUnivLevel ->
                (* Hand out a batch of 10 fresh universe levels. *)
                marshal_more_data oc (MoreDataUnivLevel
                  (CList.init 10 (fun _ ->
                     Universes.new_univ_level (Global.current_dirpath ()))));
                loop ()
            | RespFeedback ({ Feedback.id = Feedback.State state_id } as fbk) ->
                T.forward_feedback state_id fbk.Feedback.content;
                loop ()
            | RespFeedback _ -> Errors.anomaly (str"Parsing in master process")
          in
            loop ()
        with
        | Expired -> prerr_endline ("Task expired: " ^ T.name_of_task task)
        | (Sys_error _|Invalid_argument _|End_of_file|KillRespawn) as e ->
            raise e (* we pass the exception to the external handler *)
        | MarshalError s ->
            (* The loop goes on with the same worker and takes a new token
               for the next task, so the one held for this task must be
               returned here or it is lost. *)
            giveback_token ();
            T.on_marshal_error s task
        | e ->
            (* Same token accounting as for [MarshalError]. *)
            giveback_token ();
            pr_err ("Uncaught exception in worker manager: "^
              string_of_ppcmds (print e));
            flush_all ()
        end;
      done;
      raise Die
    with
    | KillRespawn ->
        giveback_token ();
        Worker.kill proc; ignore(Worker.wait proc);
        manage_slave ~cancel:cancel_user_req ~die id respawn
    | (Die | TQueue.BeingDestroyed) ->
        giveback_token ();
        Worker.kill proc;ignore(Worker.wait proc)
    | Sys_error _ | Invalid_argument _ | End_of_file when !task_expired ->
        giveback_token ();
        T.on_task_cancellation_or_expiration !last_task;
        ignore(Worker.wait proc);
        manage_slave ~cancel:cancel_user_req ~die id respawn
    | Sys_error _ | Invalid_argument _ | End_of_file when !task_cancelled ->
        giveback_token ();
        msg_warning(strbrk "The worker was cancelled.");
        T.on_task_cancellation_or_expiration !last_task;
        Worker.kill proc; ignore(Worker.wait proc);
        manage_slave ~cancel:cancel_user_req ~die id respawn
    | Sys_error _ | Invalid_argument _ | End_of_file ->
        (* The worker was lost for no reason known to the manager: let the
           task decide between respawning and exiting the master. *)
        giveback_token ();
        begin match T.on_slave_death !last_task with
        | `Stay -> 
            msg_warning(strbrk "The worker process died badly.");
            Worker.kill proc; ignore(Worker.wait proc);
            manage_slave ~cancel:cancel_user_req ~die id respawn
        | `Exit exit_code ->
            Worker.kill proc;
            let exit_status proc = match Worker.wait proc with
              | Unix.WEXITED 0x400 -> "exit code unavailable"
              | Unix.WEXITED i -> Printf.sprintf "exit(%d)" i
              | Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno
              | Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno in
            pr_err ("Fatal worker error: " ^ (exit_status proc));
            flush_all (); exit exit_code
        end

  (* Channels used by the worker-side functions below to talk to the master.
     They start as [stdin]/[stdout] and are replaced by
     [slave_init_stdout]. *)
  let slave_ic = ref stdin
  let slave_oc = ref stdout

  (* Point [slave_ic] and [slave_oc] at the channels returned by
     [Spawned.get_channels ()]. *)
  let slave_init_stdout () =
    let from_master, to_master = Spawned.get_channels () in
    slave_ic := from_master;
    slave_oc := to_master

  (* [bufferize f] turns the batch producer [f] into a supplier of single
     values: each call returns the next element of the current batch, and
     [f ()] is called for a new batch only when the current one is used up.

     @raise Failure if [f] returns an empty batch.  The buffer is left empty
     in that case, so the next call asks [f] again. *)
  let bufferize f =
    let pending = ref [] in
    fun () ->
      let batch = match !pending with [] -> f () | l -> l in
      match batch with
      | x :: rest -> pending := rest; x
      | [] -> failwith "bufferize: producer returned an empty batch"

  (* Run [WorkersPool.worker_handshake] over the current worker channels. *)
  let slave_handshake () =
    let ic = !slave_ic and oc = !slave_oc in
    WorkersPool.worker_handshake ic oc

  (* Main loop of a worker process; it never returns normally.

     After installing the feedback feeder and the remote universe-level
     supplier and performing the handshake, it repeatedly reads a request
     from [!slave_ic], computes the response with [slave_respond], sends the
     queued feedback and then the response on [!slave_oc], and finally calls
     [reset ()].

     Any exception ends the process: exit code 2 on a marshalling error or a
     lost connection, exit code 1 otherwise. *)
  let slave_main_loop reset =
    (* [Goals] feedback is held back until the request is done; every other
       feedback message is sent immediately. *)
    let feedback_queue = ref [] in
    let slave_feeder oc fb =
      match fb.Feedback.content with
      | Feedback.Goals _ -> feedback_queue := fb :: !feedback_queue
      | _ -> Marshal.to_channel oc (RespFeedback fb) []; flush oc in
    (* NOTE(review): the queue is built by consing, so the queued messages
       are sent most recent first -- confirm that this order is intended. *)
    let flush_feeder oc =
      List.iter (fun fb -> Marshal.to_channel oc (RespFeedback fb) [])
        !feedback_queue;
      feedback_queue := [];
      flush oc in
    Pp.set_feeder (slave_feeder !slave_oc);
    Pp.log_via_feedback ();
    (* Fresh universe levels are requested from the master in batches and
       handed out one at a time by [bufferize]. *)
    Universes.set_remote_new_univ_level (bufferize (fun () ->
      marshal_response !slave_oc RespGetCounterNewUnivLevel;
      match unmarshal_more_data !slave_ic with
      | MoreDataUnivLevel l -> l));
    slave_handshake ();
    while true do
      try
        let request = unmarshal_request !slave_ic in
        report_status (name_of_request request);
        let response = slave_respond request in
        flush_feeder !slave_oc;
        report_status "Idle";
        marshal_response !slave_oc response;
        reset ()
      with
      | MarshalError s ->
        pr_err ("Fatal marshal error: " ^ s); flush_all (); exit 2
      | End_of_file ->
        (* This is the debug-only [prerr_endline] defined at the top of the
           file. *)
        prerr_endline "connection lost"; flush_all (); exit 2
      | e ->
        pr_err ("Slave: critical exception: " ^ Pp.string_of_ppcmds (print e));
        flush_all (); exit 1
    done

  (* Return the tasks currently in [queue], without their cancellation
     switches.  Asserts that the worker pool is empty. *)
  let dump () =
    assert (WorkersPool.is_empty ()); (* ATM, we allow that only if no slaves *)
    let pending = TQueue.dump queue in
    List.map (fun (task, _switch) -> task) pending

  (* Initialise the pool with [n], [manage_slave] as manager, and a naming
     function that maps an integer [i] to "<T.name>:<i>". *)
  let init n =
    let worker_name i = Printf.sprintf "%s:%d" T.name i in
    WorkersPool.init n manage_slave worker_name
  (* Destroy the worker pool first, then the task queue. *)
  let destroy () =
    let () = WorkersPool.destroy () in
    TQueue.destroy queue

  (* Run [f ~join ~cancel_all] between [init n] and [destroy ()] and return
     its result.  On any exception, [destroy ()] is called and the exception,
     after going through [Errors.push], is raised again. *)
  let with_n_workers n f =
    try
      init n;
      let result = f ~join ~cancel_all in
      destroy ();
      result
    with exn ->
      let exn = Errors.push exn in
      destroy ();
      raise exn

  (* Alias of [WorkersPool.n_workers]. *)
  let n_workers = WorkersPool.n_workers

end