diff options
author | Enrico Tassi <Enrico.Tassi@inria.fr> | 2014-12-11 14:27:46 +0100 |
---|---|---|
committer | Enrico Tassi <Enrico.Tassi@inria.fr> | 2014-12-17 15:05:04 +0100 |
commit | 2d66c7d508f6bd198969012241082e34a5b6047c (patch) | |
tree | 33b632f3118c8166a05211aa5a05375268fb78a3 | |
parent | 433f0bc4e5110db09c770022b277c419f8b35f64 (diff) |
CThread: use a different type for thread friendly in_channels
-rw-r--r-- | ide/ide_slave.ml | 1 | ||||
-rw-r--r-- | lib/cThread.ml | 4 | ||||
-rw-r--r-- | lib/cThread.mli | 12 | ||||
-rw-r--r-- | stm/asyncTaskQueue.ml | 27 | ||||
-rw-r--r-- | stm/spawned.ml | 4 | ||||
-rw-r--r-- | stm/spawned.mli | 2 | ||||
-rw-r--r-- | stm/workerPool.ml | 2 |
7 files changed, 29 insertions, 23 deletions
diff --git a/ide/ide_slave.ml b/ide/ide_slave.ml index 3d2676f14..9d636a8b2 100644 --- a/ide/ide_slave.ml +++ b/ide/ide_slave.ml @@ -450,7 +450,6 @@ let loop () = catch_break := false; let in_ch, out_ch = Spawned.get_channels () in let xml_oc = Xml_printer.make (Xml_printer.TChannel out_ch) in - CThread.prepare_in_channel_for_thread_friendly_io in_ch; let in_lb = Lexing.from_function (fun s len -> CThread.thread_friendly_read in_ch s ~off:0 ~len) in let xml_ic = Xml_parser.make (Xml_parser.SLexbuf in_lb) in diff --git a/lib/cThread.ml b/lib/cThread.ml index 19f14637d..7a3f15200 100644 --- a/lib/cThread.ml +++ b/lib/cThread.ml @@ -6,8 +6,10 @@ (* * GNU Lesser General Public License Version 2.1 *) (************************************************************************) +type thread_ic = in_channel + let prepare_in_channel_for_thread_friendly_io ic = - Unix.set_nonblock (Unix.descr_of_in_channel ic) + Unix.set_nonblock (Unix.descr_of_in_channel ic); ic let safe_wait_timed_read fd time = try Thread.wait_timed_read fd time diff --git a/lib/cThread.mli b/lib/cThread.mli index ada492dd4..d68a392d1 100644 --- a/lib/cThread.mli +++ b/lib/cThread.mli @@ -14,11 +14,13 @@ * an unbounded wait has the same problem. *) (* Use only the following functions on the channel *) -val prepare_in_channel_for_thread_friendly_io : in_channel -> unit -val thread_friendly_input_value : in_channel -> 'a +type thread_ic +val prepare_in_channel_for_thread_friendly_io : in_channel -> thread_ic + +val thread_friendly_input_value : thread_ic -> 'a val thread_friendly_read : - in_channel -> string -> off:int -> len:int -> int + thread_ic -> string -> off:int -> len:int -> int val thread_friendly_really_read : - in_channel -> string -> off:int -> len:int -> unit -val thread_friendly_really_read_line : in_channel -> string + thread_ic -> string -> off:int -> len:int -> unit +val thread_friendly_really_read_line : thread_ic -> string diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml index a527675f6..662152e45 100644 --- a/stm/asyncTaskQueue.ml +++ b/stm/asyncTaskQueue.ml @@ -82,7 +82,7 @@ module Make(T : Task) = struct marshal_err ("marshal_request: "^s) let unmarshal_request ic = - try (Marshal.from_channel ic : request) + try (CThread.thread_friendly_input_value ic : request) with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("unmarshal_request: "^s) @@ -102,7 +102,7 @@ module Make(T : Task) = struct marshal_err ("marshal_more_data: "^s) let unmarshal_more_data ic = - try (Marshal.from_channel ic : more_data) + try (CThread.thread_friendly_input_value ic : more_data) with Failure s | Invalid_argument s | Sys_error s -> marshal_err ("unmarshal_more_data: "^s) @@ -281,8 +281,8 @@ module Make(T : Task) = struct TQueue.push queue (t,c) let cancel_worker { active; parking } n = - Pool.cancel active n; - Pool.cancel parking n + Pool.cancel n active; + Pool.cancel n parking let name_of_request (Request r) = T.name_of_request r @@ -300,12 +300,12 @@ module Make(T : Task) = struct Pool.cancel_all active; Pool.cancel_all parking - let slave_ic = ref stdin - let slave_oc = ref stdout + let slave_ic = ref None + let slave_oc = ref None let init_stdout () = let ic, oc = Spawned.get_channels () in - slave_oc := oc; slave_ic := ic + slave_oc := Some oc; slave_ic := Some ic let bufferize f = let l = ref [] in @@ -314,28 +314,29 @@ module Make(T : Task) = struct | [] -> let data = f () in l := List.tl data; List.hd data | x::tl -> l := tl; x - let slave_handshake () = Pool.worker_handshake !slave_ic !slave_oc + let slave_handshake () = + Pool.worker_handshake (Option.get !slave_ic) (Option.get !slave_oc) let main_loop () = let slave_feeder oc fb = Marshal.to_channel oc (RespFeedback fb) []; flush oc in - Pp.set_feeder (slave_feeder !slave_oc); + Pp.set_feeder (fun x -> slave_feeder (Option.get !slave_oc) x); Pp.log_via_feedback (); Universes.set_remote_new_univ_level (bufferize (fun () -> - marshal_response !slave_oc RespGetCounterNewUnivLevel; - match unmarshal_more_data !slave_ic with + marshal_response (Option.get !slave_oc) RespGetCounterNewUnivLevel; + match unmarshal_more_data (Option.get !slave_ic) with | MoreDataUnivLevel l -> l)); let working = ref false in slave_handshake (); while true do try working := false; - let request = unmarshal_request !slave_ic in + let request = unmarshal_request (Option.get !slave_ic) in working := true; report_status (name_of_request request); let response = slave_respond request in report_status "Idle"; - marshal_response !slave_oc response; + marshal_response (Option.get !slave_oc) response; Ephemeron.clear () with | MarshalError s -> diff --git a/stm/spawned.ml b/stm/spawned.ml index d02594569..21902f71d 100644 --- a/stm/spawned.ml +++ b/stm/spawned.ml @@ -31,6 +31,7 @@ let open_bin_connection h p = let cin, cout = open_connection (ADDR_INET (inet_addr_of_string h,p)) in set_binary_mode_in cin true; set_binary_mode_out cout true; + let cin = CThread.prepare_in_channel_for_thread_friendly_io cin in cin, cout let controller h p = @@ -39,7 +40,7 @@ let controller h p = let ic, oc = open_bin_connection h p in let rec loop () = try - match input_value ic with + match CThread.thread_friendly_input_value ic with | Hello _ -> prerr_endline "internal protocol error"; exit 1 | ReqDie -> prerr_endline "death sentence received"; exit 0 | ReqStats -> @@ -68,6 +69,7 @@ let init_channels () = Unix.dup2 Unix.stderr Unix.stdout; set_binary_mode_in stdin true; set_binary_mode_out stdout true; + let stdin = CThread.prepare_in_channel_for_thread_friendly_io stdin in channels := Some (stdin, stdout); in match !control_channel with diff --git a/stm/spawned.mli b/stm/spawned.mli index 18b88dc64..d7b3243c2 100644 --- a/stm/spawned.mli +++ b/stm/spawned.mli @@ -18,5 +18,5 @@ val control_channel : chandescr option ref val init_channels : unit -> unit (* Once initialized, these are the channels to talk with our master *) -val get_channels : unit -> in_channel * out_channel +val get_channels : unit -> CThread.thread_ic * out_channel diff --git a/stm/workerPool.ml b/stm/workerPool.ml index 65fb755be..510611f05 100644 --- a/stm/workerPool.ml +++ b/stm/workerPool.ml @@ -65,7 +65,7 @@ let master_handshake worker_id ic oc = let worker_handshake slave_ic slave_oc = try - let v = (Marshal.from_channel slave_ic : int) in + let v = (CThread.thread_friendly_input_value slave_ic : int) in if v <> magic_no then begin prerr_endline "Handshake failed: protocol mismatch\n"; exit 1; |