1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
|
(***********************************************************************)
(* ocamlbuild *)
(* *)
(* Nicolas Pouillard, Berke Durak, projet Gallium, INRIA Rocquencourt *)
(* *)
(* Copyright 2007 Institut National de Recherche en Informatique et *)
(* en Automatique. All rights reserved. This file is distributed *)
(* under the terms of the Q Public License version 1.0. *)
(* *)
(***********************************************************************)
(* Original author: Berke Durak *)
(* Ocamlbuild_executor *)
open Unix;;
type error =
| Subcommand_failed
| Subcommand_got_signal
| Io_error
| Exceptionl_condition
type task = unit -> string;;
type job = {
job_id : int * int;
job_command : string;
job_next : task list;
job_result : bool ref; (* Result of this sequence group *)
job_stdout : in_channel;
job_stdin : out_channel;
job_stderr : in_channel;
job_buffer : Buffer.t;
mutable job_dying : bool;
};;
module JS = Set.Make(struct type t = job let compare = compare end);;
module FDM = Map.Make(struct type t = file_descr let compare = compare end);;
let sf = Printf.sprintf;;
let fp = Printf.fprintf;;
(*** print_unix_status *)
(* FIXME never called *)
let print_unix_status oc = function
| WEXITED x -> fp oc "exit %d" x
| WSIGNALED i -> fp oc "signal %d" i
| WSTOPPED i -> fp oc "stop %d" i
;;
(* ***)
(*** print_job_id *)
let print_job_id oc (x,y) = fp oc "%d.%d" x y;;
(* ***)
(*** output_lines *)
let output_lines prefix oc buffer =
let u = Buffer.contents buffer in
let m = String.length u in
let output_line i j =
output_string oc prefix;
output oc u i (j - i);
output_char oc '\n'
in
let rec loop i =
if i = m then
()
else
begin
try
let j = String.index_from u i '\n' in
output_line i j;
loop (j + 1)
with
| Not_found ->
output_line i m
end
in
loop 0
;;
(* ***)
(*** execute *)
(* XXX: Add test for non reentrancy *)
let execute
?(max_jobs=max_int)
?(ticker=ignore)
?(period=0.1)
?(display=(fun f -> f Pervasives.stdout))
~exit
(commands : task list list)
=
let batch_id = ref 0 in
let env = environment () in
let jobs = ref JS.empty in
let jobs_active = ref 0 in
let jobs_to_terminate = Queue.create () in
let commands_to_execute = Queue.create () in
let all_ok = ref true in
let results =
List.map (fun tasks ->
let result = ref false in
Queue.add (tasks, result) commands_to_execute;
result)
commands
in
let outputs = ref FDM.empty in
let doi = descr_of_in_channel in
let doo = descr_of_out_channel in
(*** compute_fds *)
let compute_fds =
let fds = ref ([], [], []) in
let prev_jobs = ref JS.empty in
fun () ->
if not (!prev_jobs == !jobs) then
begin
prev_jobs := !jobs;
fds :=
JS.fold
begin fun job (rfds, wfds, xfds) ->
let ofd = doi job.job_stdout
and ifd = doo job.job_stdin
and efd = doi job.job_stderr
in
(ofd :: efd :: rfds, wfds, ofd :: ifd :: efd :: xfds)
end
!jobs
([], [], [])
end;
!fds
in
(* ***)
(*** add_job *)
let add_job cmd rest result id =
(*display begin fun oc -> fp oc "Job %a is %s\n%!" print_job_id id cmd; end;*)
let (stdout', stdin', stderr') = open_process_full cmd env in
incr jobs_active;
set_nonblock (doi stdout');
set_nonblock (doi stderr');
let job =
{ job_id = id;
job_command = cmd;
job_next = rest;
job_result = result;
job_stdout = stdout';
job_stdin = stdin';
job_stderr = stderr';
job_buffer = Buffer.create 1024;
job_dying = false }
in
outputs := FDM.add (doi stdout') job (FDM.add (doi stderr') job !outputs);
jobs := JS.add job !jobs;
in
(* ***)
(*** skip_empty_tasks *)
let rec skip_empty_tasks = function
| [] -> None
| task :: tasks ->
let cmd = task () in
if cmd = "" then skip_empty_tasks tasks else Some(cmd, tasks)
in
(* ***)
(*** add_some_jobs *)
let add_some_jobs () =
let (tasks, result) = Queue.take commands_to_execute in
match skip_empty_tasks tasks with
| None -> result := false
| Some(cmd, rest) ->
let b_id = !batch_id in
incr batch_id;
add_job cmd rest result (b_id, 0)
in
(* ***)
(*** terminate *)
let terminate ?(continue=true) job =
if not job.job_dying then
begin
job.job_dying <- true;
Queue.add (job, continue) jobs_to_terminate
end
else
()
in
(* ***)
(*** add_more_jobs_if_possible *)
let add_more_jobs_if_possible () =
while !jobs_active < max_jobs && not (Queue.is_empty commands_to_execute) do
add_some_jobs ()
done
in
(* ***)
(*** do_read *)
let do_read =
let u = String.create 4096 in
fun ?(loop=false) fd job ->
(*if job.job_dying then
()
else*)
try
let rec iteration () =
let m =
try
read fd u 0 (String.length u)
with
| Unix.Unix_error(_,_,_) -> 0
in
if m = 0 then
if job.job_dying then
()
else
terminate job
else
begin
Buffer.add_substring job.job_buffer u 0 m;
if loop then
iteration ()
else
()
end
in
iteration ()
with
| x ->
display
begin fun oc ->
fp oc "Exception %s while reading output of command %S\n%!" job.job_command
(Printexc.to_string x);
end;
exit Io_error
in
(* ***)
(*** process_jobs_to_terminate *)
let process_jobs_to_terminate () =
while not (Queue.is_empty jobs_to_terminate) do
ticker ();
let (job, continue) = Queue.take jobs_to_terminate in
(*display begin fun oc -> fp oc "Terminating job %a\n%!" print_job_id job.job_id; end;*)
decr jobs_active;
do_read ~loop:true (doi job.job_stdout) job;
do_read ~loop:true (doi job.job_stderr) job;
outputs := FDM.remove (doi job.job_stdout) (FDM.remove (doi job.job_stderr) !outputs);
jobs := JS.remove job !jobs;
let status = close_process_full (job.job_stdout, job.job_stdin, job.job_stderr) in
let shown = ref false in
let show_command () =
if !shown then
()
else
display
begin fun oc ->
shown := true;
fp oc "+ %s\n" job.job_command;
output_lines "" oc job.job_buffer
end
in
if Buffer.length job.job_buffer > 0 then show_command ();
begin
match status with
| Unix.WEXITED 0 ->
begin
if continue then
begin
match skip_empty_tasks job.job_next with
| None -> job.job_result := true
| Some(cmd, rest) ->
let (b_id, s_id) = job.job_id in
add_job cmd rest job.job_result (b_id, s_id + 1)
end
else
all_ok := false;
end
| Unix.WEXITED rc ->
show_command ();
display (fun oc -> fp oc "Command exited with code %d.\n" rc);
all_ok := false;
exit Subcommand_failed
| Unix.WSTOPPED s | Unix.WSIGNALED s ->
show_command ();
all_ok := false;
display (fun oc -> fp oc "Command got signal %d.\n" s);
exit Subcommand_got_signal
end
done
in
(* ***)
(*** terminate_all_jobs *)
let terminate_all_jobs () =
JS.iter (terminate ~continue:false) !jobs
in
(* ***)
(*** loop *)
let rec loop () =
(*display (fun oc -> fp oc "Total %d jobs\n" !jobs_active);*)
process_jobs_to_terminate ();
add_more_jobs_if_possible ();
if JS.is_empty !jobs then
()
else
begin
let (rfds, wfds, xfds) = compute_fds () in
ticker ();
let (chrfds, chwfds, chxfds) = select rfds wfds xfds period in
List.iter
begin fun (fdlist, hook) ->
List.iter
begin fun fd ->
try
let job = FDM.find fd !outputs in
ticker ();
hook fd job
with
| Not_found -> () (* XXX *)
end
fdlist
end
[chrfds, do_read ~loop:false;
chwfds, (fun _ _ -> ());
chxfds,
begin fun _ _job ->
(*display (fun oc -> fp oc "Exceptional condition on command %S\n%!" job.job_command);
exit Exceptional_condition*)
() (* FIXME *)
end];
loop ()
end
in
try
loop ();
None
with
| x ->
begin
try
terminate_all_jobs ()
with
| x' ->
display (fun oc -> fp oc "Extra exception %s\n%!" (Printexc.to_string x'))
end;
Some(List.map (!) results, x)
;;
(* ***)
|