From cf6b12cb3a88fb3af6a7b3e91d17db8b06d23c81 Mon Sep 17 00:00:00 2001 From: Enrico Tassi Date: Mon, 1 Sep 2014 14:54:49 +0200 Subject: coqworkmgr --- tools/coqworkmgr.ml | 222 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 222 insertions(+) create mode 100644 tools/coqworkmgr.ml (limited to 'tools/coqworkmgr.ml') diff --git a/tools/coqworkmgr.ml b/tools/coqworkmgr.ml new file mode 100644 index 000000000..23f452166 --- /dev/null +++ b/tools/coqworkmgr.ml @@ -0,0 +1,222 @@ +(************************************************************************) +(* v * The Coq Proof Assistant / The Coq Development Team *) +(* + s, string_of_inet_addr host ^":"^ string_of_int port + | _ -> assert false + +module Queue : sig + type t + val is_empty : t -> bool + val push : int * party -> t -> unit + val pop : t -> int * party + val create : unit -> t +end = struct + type t = (int * party) list ref + let create () = ref [] + let is_empty q = !q = [] + let rec split acc = function + | [] -> List.rev acc, [] + | (_, { priority = Flags.Low }) :: _ as l -> List.rev acc, l + | x :: xs -> split (x :: acc) xs + let push (_,{ priority } as item) q = + if priority = Flags.Low then q := !q @ [item] + else + let high, low = split [] !q in + q := high @ (item :: low) + let pop q = match !q with x :: xs -> q := xs; x | _ -> assert false +end + +let read_fd fd s ~off ~len = + let rec loop () = + try Unix.read fd s off len + with Unix.Unix_error(Unix.EAGAIN,_,_) -> loop () + in + loop () + +let really_read_fd fd s off len = + let i = ref 0 in + while !i < len do + let off = off + !i in + let len = len - !i in + let r = read_fd fd s ~off ~len in + if r = 0 then raise End_of_file; + i := !i + r + done + +let raw_input_line fd = + try + let b = Buffer.create 80 in + let s = String.make 1 '\000' in + while s <> "\n" do + really_read_fd fd s 0 1; + if s <> "\n" && s <> "\r" then Buffer.add_string b s; + done; + Buffer.contents b + with Unix.Unix_error _ -> raise End_of_file + +let accept s = + let cs, _ = Unix.accept s in + let cout = Unix.out_channel_of_descr cs in + set_binary_mode_out cout true; + match parse_request (raw_input_line cs) with + | Hello p -> { sock=cs; cout; tokens=0; priority=p } + | _ -> (try Unix.close cs with _ -> ()); raise End_of_file + +let parse s = () + +let parties = ref [] + +let max_tokens = ref 2 +let cur_tokens = ref 0 + +let queue = Queue.create () + +let rec allocate n party = + let extra = min n (!max_tokens - !cur_tokens) in + cur_tokens := !cur_tokens + extra; + party.tokens <- party.tokens + extra; + answer party (Tokens extra) + +and de_allocate n party = + let back = min party.tokens n in + party.tokens <- party.tokens - back; + cur_tokens := min (!cur_tokens - back) !max_tokens; + eventually_dequeue () + +and eventually_dequeue () = + if Queue.is_empty queue || !cur_tokens >= !max_tokens then () + else + let req, party = Queue.pop queue in + if List.exists (fun { sock } -> sock = party.sock) !parties + then allocate req party + else eventually_dequeue () + +let chat s = + let party = + try List.find (fun { sock } -> sock = s) !parties + with Not_found -> Printf.eprintf "Internal error"; exit 1 in + try + match parse_request (raw_input_line party.sock) with + | Get n -> + if !cur_tokens < !max_tokens then allocate n party + else Queue.push (n,party) queue + | TryGet n -> + if !cur_tokens < !max_tokens then allocate n party + else answer party Noluck + | GiveBack n -> de_allocate n party + | Ping -> + answer party (Pong (!cur_tokens,!max_tokens,Unix.getpid ())); + raise End_of_file + | Hello _ -> raise End_of_file + with Failure _ | ParseError | Sys_error _ | End_of_file -> + (try Unix.close party.sock with _ -> ()); + parties := List.filter (fun { sock } -> sock <> s) !parties; + de_allocate party.tokens party; + eventually_dequeue () + +let check_alive s = + match CoqworkmgrApi.connect s with + | Some s -> + let cout = Unix.out_channel_of_descr s in + set_binary_mode_out cout true; + output_string cout (print_request (Hello Flags.Low)); flush cout; + output_string cout (print_request Ping); flush cout; + begin match Unix.select [s] [] [] 1.0 with + | [s],_,_ -> + let cin = Unix.in_channel_of_descr s in + set_binary_mode_in cin true; + begin match parse_response (input_line cin) with + | Pong (n,m,pid) -> n, m, pid + | _ -> raise Not_found + end + | _ -> raise Not_found + end + | _ -> raise Not_found + +let main () = + let args = [ + "-j",Arg.Set_int max_tokens, "max number of concurrent jobs"; + "-d",Arg.Set debug, "do not detach (debug)"] in + let usage = + "Prints on stdout an env variable assignement to be picked up by coq\n"^ + "instances in order to limit the maximum number of concurrent workers.\n"^ + "The default value is 2.\n"^ + "Usage:" in + Arg.parse args (fun extra -> + Arg.usage args ("Unexpected argument "^extra^".\n"^usage)) + usage; + try + let sock = Sys.getenv "COQWORKMGR_SOCK" in + if !debug then Printf.eprintf "Contacting %s\n%!" sock; + let cur, max, pid = check_alive sock in + Printf.printf "COQWORKMGR_SOCK=%s\n%!" sock; + Printf.eprintf + "coqworkmgr already up and running (pid=%d, socket=%s, j=%d/%d)\n%!" + pid sock cur max; + exit 0 + with Not_found | Failure _ | Invalid_argument _ | Unix.Unix_error _ -> + if !debug then Printf.eprintf "No running instance. Starting a new one\n%!"; + let master, str = mk_socket_channel () in + if not !debug then begin + let pid = Unix.fork () in + if pid <> 0 then begin + Printf.printf "COQWORKMGR_SOCK=%s\n%!" str; + exit 0 + end else begin + ignore(Unix.setsid ()); + Unix.close Unix.stdin; + Unix.close Unix.stdout; + end; + end else begin + Printf.printf "COQWORKMGR_SOCK=%s\n%!" str; + end; + Sys.catch_break true; + try + while true do + if !debug then + Printf.eprintf "Status: #parties=%d tokens=%d/%d \n%!" + (List.length !parties) !cur_tokens !max_tokens; + let socks = master :: List.map (fun { sock } -> sock) !parties in + let r, _, _ = Unix.select socks [] [] (-1.0) in + List.iter (fun s -> + if s = master then begin + try parties := accept master :: !parties + with _ -> () + end else chat s) + r + done; + exit 0 + with Sys.Break -> + if !parties <> [] then begin + Printf.eprintf "Some coq processes still need me\n%!"; + exit 1; + end else + exit 0 + +let () = main () -- cgit v1.2.3