blob: ff4dc5c35e3a867464553a6378a62ed861a1f97d (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
(************************************************************************)
(* v * The Coq Proof Assistant / The Coq Development Team *)
(* <O___,, * INRIA - CNRS - LIX - LRI - PPS - Copyright 1999-2017 *)
(* \VV/ **************************************************************)
(* // * This file is distributed under the terms of the *)
(* * GNU Lesser General Public License Version 2.1 *)
(************************************************************************)
type worker_id = string
type 'a cpanel = {
exit : unit -> unit; (* called by manager to exit instead of Thread.exit *)
cancelled : unit -> bool; (* manager checks for a request of termination *)
extra : 'a; (* extra stuff to pass to the manager *)
}
module type PoolModel = sig
(* this shall come from a Spawn.* model *)
type process
val spawn : int -> worker_id * process * CThread.thread_ic * out_channel
(* this defines the main loop of the manager *)
type extra
val manager :
extra cpanel -> worker_id * process * CThread.thread_ic * out_channel -> unit
end
module Make(Model : PoolModel) = struct
type worker = {
name : worker_id;
cancel : bool ref;
manager : Thread.t;
process : Model.process;
}
type pre_pool = {
workers : worker list ref;
count : int ref;
extra_arg : Model.extra;
}
type pool = { lock : Mutex.t; pool : pre_pool }
let magic_no = 17
let master_handshake worker_id ic oc =
try
Marshal.to_channel oc magic_no []; flush oc;
let n = (CThread.thread_friendly_input_value ic : int) in
if n <> magic_no then begin
Printf.eprintf "Handshake with %s failed: protocol mismatch\n" worker_id;
exit 1;
end
with e when CErrors.noncritical e ->
Printf.eprintf "Handshake with %s failed: %s\n"
worker_id (Printexc.to_string e);
exit 1
let worker_handshake slave_ic slave_oc =
try
let v = (CThread.thread_friendly_input_value slave_ic : int) in
if v <> magic_no then begin
prerr_endline "Handshake failed: protocol mismatch\n";
exit 1;
end;
Marshal.to_channel slave_oc v []; flush slave_oc;
with e when CErrors.noncritical e ->
prerr_endline ("Handshake failed: " ^ Printexc.to_string e);
exit 1
let locking { lock; pool = p } f =
try
Mutex.lock lock;
let x = f p in
Mutex.unlock lock;
x
with e -> Mutex.unlock lock; raise e
let rec create_worker extra pool id =
let cancel = ref false in
let name, process, ic, oc as worker = Model.spawn id in
master_handshake name ic oc;
let exit () = cancel := true; cleanup pool; Thread.exit () in
let cancelled () = !cancel in
let cpanel = { exit; cancelled; extra } in
let manager = Thread.create (Model.manager cpanel) worker in
{ name; cancel; manager; process }
and cleanup x = locking x begin fun { workers; count; extra_arg } ->
workers := List.map (function
| { cancel } as w when !cancel = false -> w
| _ -> let n = !count in incr count; create_worker extra_arg x n)
!workers
end
let n_workers x = locking x begin fun { workers } ->
List.length !workers
end
let is_empty x = locking x begin fun { workers } -> !workers = [] end
let create extra_arg ~size = let x = {
lock = Mutex.create ();
pool = {
extra_arg;
workers = ref [];
count = ref size;
}} in
locking x begin fun { workers } ->
workers := CList.init size (create_worker extra_arg x)
end;
x
let cancel n x = locking x begin fun { workers } ->
List.iter (fun { name; cancel } -> if n = name then cancel := true) !workers
end
let cancel_all x = locking x begin fun { workers } ->
List.iter (fun { cancel } -> cancel := true) !workers
end
let destroy x = locking x begin fun { workers } ->
List.iter (fun { cancel } -> cancel := true) !workers;
workers := []
end
end
|