aboutsummaryrefslogtreecommitdiffhomepage
path: root/stm/workerPool.ml
blob: 65fb755be7d47f291a6ae364d9914a8fec1f74c8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
(************************************************************************)
(*  v      *   The Coq Proof Assistant  /  The Coq Development Team     *)
(* <O___,, *   INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2012     *)
(*   \VV/  **************************************************************)
(*    //   *      This file is distributed under the terms of the       *)
(*         *       GNU Lesser General Public License Version 2.1        *)
(************************************************************************)

type worker_id = string
type 'a spawn =
  args:string array -> env:string array -> unit -> in_channel * out_channel * 'a
type active
type parking

module type WorkerModel = sig
  type process
  val spawn :
    ?prefer_sock:bool -> ?env:string array -> string -> string array ->
      process * in_channel * out_channel
end

module type ManagerModel = sig
  type process
  type extra (* extra stuff to pass to the manager *)
  val manager :
    extra -> cancel:bool ref -> die:bool ref -> worker_id -> process spawn ->
      unit
  val naming : int -> worker_id
end


module Make(Worker : WorkerModel)
           (Manager : ManagerModel with type process = Worker.process) = struct

type worker = {
  name : worker_id;
  cancel : bool ref;
  die : bool ref;
  manager : Thread.t }

type 'a pool = {
  workers : worker list ref;
  count : int ref;
  extra : Manager.extra option;
  lock : Mutex.t
}

let n_workers { workers } = List.length !workers
let is_empty { workers } = !workers = []

let magic_no = 17

let master_handshake worker_id ic oc =
  try
    Marshal.to_channel oc magic_no [];  flush oc;
    let n = (Marshal.from_channel ic : int) in
    if n <> magic_no then begin
      Printf.eprintf "Handshake with %s failed: protocol mismatch\n" worker_id;
      exit 1;
    end
  with e when Errors.noncritical e ->
    Printf.eprintf "Handshake with %s failed: %s\n"
      worker_id (Printexc.to_string e);
    exit 1

let worker_handshake slave_ic slave_oc =
  try
    let v = (Marshal.from_channel slave_ic : int) in
    if v <> magic_no then begin
      prerr_endline "Handshake failed: protocol mismatch\n";
      exit 1;
    end;
    Marshal.to_channel slave_oc v []; flush slave_oc;
  with e when Errors.noncritical e ->
    prerr_endline ("Handshake failed: " ^ Printexc.to_string e);
    exit 1

let respawn n ~args ~env () =
  let proc, ic, oc = Worker.spawn ~env Sys.argv.(0) args in
  master_handshake n ic oc;
  ic, oc, proc

let create1 e x =
  let name = Manager.naming x in
  let cancel = ref false in
  let die = ref false in
  let manager =
    Thread.create (Manager.manager e ~cancel ~die name) (respawn name) in
  { name; cancel; die; manager }

let create_active e n = {
  lock = Mutex.create ();
  extra = Some e;
  workers = ref (CList.init n (create1 e));
  count = ref n
}

let create_parking () = {
  lock = Mutex.create ();
  extra = None;
  workers = ref [];
  count = ref 0
}

let cancel { lock; workers } n =
  Mutex.lock lock;
  List.iter (fun { name; cancel } -> if n = name then cancel := true) !workers;
  Mutex.unlock lock

let cancel_all { lock; workers } =
  Mutex.lock lock;
  List.iter (fun { cancel } -> cancel := true) !workers;
  Mutex.unlock lock

let kill_all { lock; workers } =
  Mutex.lock lock;
  List.iter (fun { die } -> die := true) !workers;
  Mutex.unlock lock

let destroy { lock; workers } =
  Mutex.lock lock;
  List.iter (fun { die } -> die := true) !workers;
  workers := [];
  Mutex.unlock lock

let move oldq newq n =
  Mutex.lock oldq.lock; Mutex.lock newq.lock;
  let rec find acc = function
    | [] -> Mutex.unlock oldq.lock; Mutex.unlock newq.lock; assert false
    | { name } as w :: rest when name = n -> w, List.rev acc @ rest
    | x :: xs -> find (x :: acc) xs in
  let w, rest = find [] !(oldq.workers) in
  oldq.workers := create1 (Option.get oldq.extra) !(oldq.count) :: rest;
  oldq.count := !(oldq.count) + 1;
  newq.workers := w :: !(newq.workers);
  Mutex.unlock oldq.lock; Mutex.unlock newq.lock

end