aboutsummaryrefslogtreecommitdiffhomepage
path: root/stm
diff options
context:
space:
mode:
authorGravatar Enrico Tassi <Enrico.Tassi@inria.fr>2014-12-11 14:27:46 +0100
committerGravatar Enrico Tassi <Enrico.Tassi@inria.fr>2014-12-17 15:05:04 +0100
commit2d66c7d508f6bd198969012241082e34a5b6047c (patch)
tree33b632f3118c8166a05211aa5a05375268fb78a3 /stm
parent433f0bc4e5110db09c770022b277c419f8b35f64 (diff)
CThread: use a different type for thread friendly in_channels
Diffstat (limited to 'stm')
-rw-r--r--stm/asyncTaskQueue.ml27
-rw-r--r--stm/spawned.ml4
-rw-r--r--stm/spawned.mli2
-rw-r--r--stm/workerPool.ml2
4 files changed, 19 insertions, 16 deletions
diff --git a/stm/asyncTaskQueue.ml b/stm/asyncTaskQueue.ml
index a527675f6..662152e45 100644
--- a/stm/asyncTaskQueue.ml
+++ b/stm/asyncTaskQueue.ml
@@ -82,7 +82,7 @@ module Make(T : Task) = struct
marshal_err ("marshal_request: "^s)
let unmarshal_request ic =
- try (Marshal.from_channel ic : request)
+ try (CThread.thread_friendly_input_value ic : request)
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("unmarshal_request: "^s)
@@ -102,7 +102,7 @@ module Make(T : Task) = struct
marshal_err ("marshal_more_data: "^s)
let unmarshal_more_data ic =
- try (Marshal.from_channel ic : more_data)
+ try (CThread.thread_friendly_input_value ic : more_data)
with Failure s | Invalid_argument s | Sys_error s ->
marshal_err ("unmarshal_more_data: "^s)
@@ -281,8 +281,8 @@ module Make(T : Task) = struct
TQueue.push queue (t,c)
let cancel_worker { active; parking } n =
- Pool.cancel active n;
- Pool.cancel parking n
+ Pool.cancel n active;
+ Pool.cancel n parking
let name_of_request (Request r) = T.name_of_request r
@@ -300,12 +300,12 @@ module Make(T : Task) = struct
Pool.cancel_all active;
Pool.cancel_all parking
- let slave_ic = ref stdin
- let slave_oc = ref stdout
+ let slave_ic = ref None
+ let slave_oc = ref None
let init_stdout () =
let ic, oc = Spawned.get_channels () in
- slave_oc := oc; slave_ic := ic
+ slave_oc := Some oc; slave_ic := Some ic
let bufferize f =
let l = ref [] in
@@ -314,28 +314,29 @@ module Make(T : Task) = struct
| [] -> let data = f () in l := List.tl data; List.hd data
| x::tl -> l := tl; x
- let slave_handshake () = Pool.worker_handshake !slave_ic !slave_oc
+ let slave_handshake () =
+ Pool.worker_handshake (Option.get !slave_ic) (Option.get !slave_oc)
let main_loop () =
let slave_feeder oc fb =
Marshal.to_channel oc (RespFeedback fb) []; flush oc in
- Pp.set_feeder (slave_feeder !slave_oc);
+ Pp.set_feeder (fun x -> slave_feeder (Option.get !slave_oc) x);
Pp.log_via_feedback ();
Universes.set_remote_new_univ_level (bufferize (fun () ->
- marshal_response !slave_oc RespGetCounterNewUnivLevel;
- match unmarshal_more_data !slave_ic with
+ marshal_response (Option.get !slave_oc) RespGetCounterNewUnivLevel;
+ match unmarshal_more_data (Option.get !slave_ic) with
| MoreDataUnivLevel l -> l));
let working = ref false in
slave_handshake ();
while true do
try
working := false;
- let request = unmarshal_request !slave_ic in
+ let request = unmarshal_request (Option.get !slave_ic) in
working := true;
report_status (name_of_request request);
let response = slave_respond request in
report_status "Idle";
- marshal_response !slave_oc response;
+ marshal_response (Option.get !slave_oc) response;
Ephemeron.clear ()
with
| MarshalError s ->
diff --git a/stm/spawned.ml b/stm/spawned.ml
index d02594569..21902f71d 100644
--- a/stm/spawned.ml
+++ b/stm/spawned.ml
@@ -31,6 +31,7 @@ let open_bin_connection h p =
let cin, cout = open_connection (ADDR_INET (inet_addr_of_string h,p)) in
set_binary_mode_in cin true;
set_binary_mode_out cout true;
+ let cin = CThread.prepare_in_channel_for_thread_friendly_io cin in
cin, cout
let controller h p =
@@ -39,7 +40,7 @@ let controller h p =
let ic, oc = open_bin_connection h p in
let rec loop () =
try
- match input_value ic with
+ match CThread.thread_friendly_input_value ic with
| Hello _ -> prerr_endline "internal protocol error"; exit 1
| ReqDie -> prerr_endline "death sentence received"; exit 0
| ReqStats ->
@@ -68,6 +69,7 @@ let init_channels () =
Unix.dup2 Unix.stderr Unix.stdout;
set_binary_mode_in stdin true;
set_binary_mode_out stdout true;
+ let stdin = CThread.prepare_in_channel_for_thread_friendly_io stdin in
channels := Some (stdin, stdout);
in
match !control_channel with
diff --git a/stm/spawned.mli b/stm/spawned.mli
index 18b88dc64..d7b3243c2 100644
--- a/stm/spawned.mli
+++ b/stm/spawned.mli
@@ -18,5 +18,5 @@ val control_channel : chandescr option ref
val init_channels : unit -> unit
(* Once initialized, these are the channels to talk with our master *)
-val get_channels : unit -> in_channel * out_channel
+val get_channels : unit -> CThread.thread_ic * out_channel
diff --git a/stm/workerPool.ml b/stm/workerPool.ml
index 65fb755be..510611f05 100644
--- a/stm/workerPool.ml
+++ b/stm/workerPool.ml
@@ -65,7 +65,7 @@ let master_handshake worker_id ic oc =
let worker_handshake slave_ic slave_oc =
try
- let v = (Marshal.from_channel slave_ic : int) in
+ let v = (CThread.thread_friendly_input_value slave_ic : int) in
if v <> magic_no then begin
prerr_endline "Handshake failed: protocol mismatch\n";
exit 1;