aboutsummaryrefslogtreecommitdiffhomepage
path: root/tools
diff options
context:
space:
mode:
authorGravatar Enrico Tassi <Enrico.Tassi@inria.fr>2014-09-01 14:54:49 +0200
committerGravatar Enrico Tassi <Enrico.Tassi@inria.fr>2014-09-02 11:29:42 +0200
commitcf6b12cb3a88fb3af6a7b3e91d17db8b06d23c81 (patch)
tree46ec306afd1ebf29b735e7f6679c8e1b8d9c5679 /tools
parent7befcc7ea63ea4bd6e45e6f4b8ec01a69b586cc7 (diff)
coqworkmgr
Diffstat (limited to 'tools')
-rw-r--r--tools/coqworkmgr.ml222
1 files changed, 222 insertions, 0 deletions
diff --git a/tools/coqworkmgr.ml b/tools/coqworkmgr.ml
new file mode 100644
index 000000000..23f452166
--- /dev/null
+++ b/tools/coqworkmgr.ml
@@ -0,0 +1,222 @@
+(************************************************************************)
+(* v * The Coq Proof Assistant / The Coq Development Team *)
+(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2014 *)
+(* \VV/ **************************************************************)
+(* // * This file is distributed under the terms of the *)
+(* * GNU Lesser General Public License Version 2.1 *)
+(************************************************************************)
+
+open CoqworkmgrApi
+
+let debug = ref false
+
+type party = {
+ sock : Unix.file_descr;
+ cout : out_channel;
+ mutable tokens : int;
+ priority : Flags.priority;
+}
+
+let answer party msg =
+ output_string party.cout (print_response msg); flush party.cout
+
+let mk_socket_channel () =
+ let open Unix in
+ let s = socket PF_INET SOCK_STREAM 0 in
+ bind s (ADDR_INET (inet_addr_loopback,0));
+ listen s 1;
+ match getsockname s with
+ | ADDR_INET(host, port) ->
+ s, string_of_inet_addr host ^":"^ string_of_int port
+ | _ -> assert false
+
+module Queue : sig
+ type t
+ val is_empty : t -> bool
+ val push : int * party -> t -> unit
+ val pop : t -> int * party
+ val create : unit -> t
+end = struct
+ type t = (int * party) list ref
+ let create () = ref []
+ let is_empty q = !q = []
+ let rec split acc = function
+ | [] -> List.rev acc, []
+ | (_, { priority = Flags.Low }) :: _ as l -> List.rev acc, l
+ | x :: xs -> split (x :: acc) xs
+ let push (_,{ priority } as item) q =
+ if priority = Flags.Low then q := !q @ [item]
+ else
+ let high, low = split [] !q in
+ q := high @ (item :: low)
+ let pop q = match !q with x :: xs -> q := xs; x | _ -> assert false
+end
+
+let read_fd fd s ~off ~len =
+ let rec loop () =
+ try Unix.read fd s off len
+ with Unix.Unix_error(Unix.EAGAIN,_,_) -> loop ()
+ in
+ loop ()
+
+let really_read_fd fd s off len =
+ let i = ref 0 in
+ while !i < len do
+ let off = off + !i in
+ let len = len - !i in
+ let r = read_fd fd s ~off ~len in
+ if r = 0 then raise End_of_file;
+ i := !i + r
+ done
+
+let raw_input_line fd =
+ try
+ let b = Buffer.create 80 in
+ let s = String.make 1 '\000' in
+ while s <> "\n" do
+ really_read_fd fd s 0 1;
+ if s <> "\n" && s <> "\r" then Buffer.add_string b s;
+ done;
+ Buffer.contents b
+ with Unix.Unix_error _ -> raise End_of_file
+
+let accept s =
+ let cs, _ = Unix.accept s in
+ let cout = Unix.out_channel_of_descr cs in
+ set_binary_mode_out cout true;
+ match parse_request (raw_input_line cs) with
+ | Hello p -> { sock=cs; cout; tokens=0; priority=p }
+ | _ -> (try Unix.close cs with _ -> ()); raise End_of_file
+
+let parse s = ()
+
+let parties = ref []
+
+let max_tokens = ref 2
+let cur_tokens = ref 0
+
+let queue = Queue.create ()
+
+let rec allocate n party =
+ let extra = min n (!max_tokens - !cur_tokens) in
+ cur_tokens := !cur_tokens + extra;
+ party.tokens <- party.tokens + extra;
+ answer party (Tokens extra)
+
+and de_allocate n party =
+ let back = min party.tokens n in
+ party.tokens <- party.tokens - back;
+ cur_tokens := min (!cur_tokens - back) !max_tokens;
+ eventually_dequeue ()
+
+and eventually_dequeue () =
+ if Queue.is_empty queue || !cur_tokens >= !max_tokens then ()
+ else
+ let req, party = Queue.pop queue in
+ if List.exists (fun { sock } -> sock = party.sock) !parties
+ then allocate req party
+ else eventually_dequeue ()
+
+let chat s =
+ let party =
+ try List.find (fun { sock } -> sock = s) !parties
+ with Not_found -> Printf.eprintf "Internal error"; exit 1 in
+ try
+ match parse_request (raw_input_line party.sock) with
+ | Get n ->
+ if !cur_tokens < !max_tokens then allocate n party
+ else Queue.push (n,party) queue
+ | TryGet n ->
+ if !cur_tokens < !max_tokens then allocate n party
+ else answer party Noluck
+ | GiveBack n -> de_allocate n party
+ | Ping ->
+ answer party (Pong (!cur_tokens,!max_tokens,Unix.getpid ()));
+ raise End_of_file
+ | Hello _ -> raise End_of_file
+ with Failure _ | ParseError | Sys_error _ | End_of_file ->
+ (try Unix.close party.sock with _ -> ());
+ parties := List.filter (fun { sock } -> sock <> s) !parties;
+ de_allocate party.tokens party;
+ eventually_dequeue ()
+
+let check_alive s =
+ match CoqworkmgrApi.connect s with
+ | Some s ->
+ let cout = Unix.out_channel_of_descr s in
+ set_binary_mode_out cout true;
+ output_string cout (print_request (Hello Flags.Low)); flush cout;
+ output_string cout (print_request Ping); flush cout;
+ begin match Unix.select [s] [] [] 1.0 with
+ | [s],_,_ ->
+ let cin = Unix.in_channel_of_descr s in
+ set_binary_mode_in cin true;
+ begin match parse_response (input_line cin) with
+ | Pong (n,m,pid) -> n, m, pid
+ | _ -> raise Not_found
+ end
+ | _ -> raise Not_found
+ end
+ | _ -> raise Not_found
+
+let main () =
+ let args = [
+ "-j",Arg.Set_int max_tokens, "max number of concurrent jobs";
+ "-d",Arg.Set debug, "do not detach (debug)"] in
+ let usage =
+ "Prints on stdout an env variable assignement to be picked up by coq\n"^
+ "instances in order to limit the maximum number of concurrent workers.\n"^
+ "The default value is 2.\n"^
+ "Usage:" in
+ Arg.parse args (fun extra ->
+ Arg.usage args ("Unexpected argument "^extra^".\n"^usage))
+ usage;
+ try
+ let sock = Sys.getenv "COQWORKMGR_SOCK" in
+ if !debug then Printf.eprintf "Contacting %s\n%!" sock;
+ let cur, max, pid = check_alive sock in
+ Printf.printf "COQWORKMGR_SOCK=%s\n%!" sock;
+ Printf.eprintf
+ "coqworkmgr already up and running (pid=%d, socket=%s, j=%d/%d)\n%!"
+ pid sock cur max;
+ exit 0
+ with Not_found | Failure _ | Invalid_argument _ | Unix.Unix_error _ ->
+ if !debug then Printf.eprintf "No running instance. Starting a new one\n%!";
+ let master, str = mk_socket_channel () in
+ if not !debug then begin
+ let pid = Unix.fork () in
+ if pid <> 0 then begin
+ Printf.printf "COQWORKMGR_SOCK=%s\n%!" str;
+ exit 0
+ end else begin
+ ignore(Unix.setsid ());
+ Unix.close Unix.stdin;
+ Unix.close Unix.stdout;
+ end;
+ end else begin
+ Printf.printf "COQWORKMGR_SOCK=%s\n%!" str;
+ end;
+ Sys.catch_break true;
+ try
+ while true do
+ if !debug then
+ Printf.eprintf "Status: #parties=%d tokens=%d/%d \n%!"
+ (List.length !parties) !cur_tokens !max_tokens;
+ let socks = master :: List.map (fun { sock } -> sock) !parties in
+ let r, _, _ = Unix.select socks [] [] (-1.0) in
+ List.iter (fun s ->
+ if s = master then begin
+ try parties := accept master :: !parties
+ with _ -> ()
+ end else chat s)
+ r
+ done;
+ exit 0
+ with Sys.Break ->
+ if !parties <> [] then begin
+ Printf.eprintf "Some coq processes still need me\n%!";
+ exit 1;
+ end else
+ exit 0
+
+let () = main ()