summaryrefslogtreecommitdiffstats
path: root/ocamlbuild/ocamlbuild_executor.ml
blob: 587f4210e5b7463dd7b9e9de3c4ef12ba2c6576f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
(***********************************************************************)
(*                             ocamlbuild                              *)
(*                                                                     *)
(*  Nicolas Pouillard, Berke Durak, projet Gallium, INRIA Rocquencourt *)
(*                                                                     *)
(*  Copyright 2007 Institut National de Recherche en Informatique et   *)
(*  en Automatique.  All rights reserved.  This file is distributed    *)
(*  under the terms of the Q Public License version 1.0.               *)
(*                                                                     *)
(***********************************************************************)


(* Original author: Berke Durak *)
(* Ocamlbuild_executor *)

open Unix;;

(* Failure categories handed to the [exit] callback of [execute]. *)
type error =
  | Subcommand_failed (* a command exited with a non-zero code *)
  | Subcommand_got_signal (* a command was stopped or killed by a signal *)
  | Io_error (* an exception was raised while reading the output of a command *)
  | Exceptionl_condition (* not produced by the code visible in this file; NOTE(review): the name looks like a misspelling of Exceptional_condition, kept as is because it is part of the interface *)

(* A task is a thunk that yields the shell command line to run.  An empty
   string means there is nothing to run; [execute] skips such tasks. *)
type task = unit -> string;;

(* Bookkeeping for one subprocess started by [execute]. *)
type job = {
  job_id      : int * int; (* (batch number, position of the command inside its batch) *)
  job_command : string; (* command line the subprocess was started with *)
  job_next    : task list; (* tasks of the same batch still to be run after this one *)
  job_result  : bool ref; (* Result flag of the batch this job belongs to, shared by all its jobs *)
  job_stdout  : in_channel; (* reading end connected to the standard output of the subprocess *)
  job_stdin   : out_channel; (* writing end connected to the standard input of the subprocess *)
  job_stderr  : in_channel; (* reading end connected to the standard error of the subprocess *)
  job_buffer  : Buffer.t; (* everything read so far from both job_stdout and job_stderr *)
  mutable job_dying : bool; (* set once the job has been queued for termination *)
};;

(* Sets of jobs.  Jobs are ordered by [job_id] alone.  The generic
   comparison on whole records only worked because [job_id] is the first
   field and [execute] gives every job a distinct id: had it ever gone
   further it would have reached the closures of [job_next], on which the
   polymorphic comparison raises. *)
module JS = Set.Make(struct
  type t = job
  let compare a b = compare a.job_id b.job_id
end);;
(* Finite maps keyed by Unix file descriptors, ordered by the generic
   comparison. *)
module FDM = Map.Make(struct
  type t = file_descr
  let compare (x : t) (y : t) = compare x y
end);;

(* Shorthand for [Printf.sprintf]. *)
let sf fmt = Printf.sprintf fmt;;
(* Shorthand for [Printf.fprintf]. *)
let fp oc fmt = Printf.fprintf oc fmt;;

(*** print_unix_status *)
(* FIXME never called *)
(* [print_unix_status oc status] writes a short description of the process
   status [status] on [oc]: the exit code, the killing signal or the
   stopping signal. *)
let print_unix_status oc status =
  match status with
  | WEXITED code -> Printf.fprintf oc "exit %d" code
  | WSIGNALED signum -> Printf.fprintf oc "signal %d" signum
  | WSTOPPED signum -> Printf.fprintf oc "stop %d" signum
;;
(* ***)
(*** print_job_id *)
(* [print_job_id oc id] writes the two components of a job identifier on
   [oc], separated by a dot. *)
let print_job_id oc (batch, step) = Printf.fprintf oc "%d.%d" batch step;;
(* ***)
(*** output_lines *)
(* [output_lines prefix oc buffer] writes the contents of [buffer] on [oc]
   line by line, each line preceded by [prefix] and followed by a newline.
   A last line that lacks its newline still gets one; an empty buffer
   produces no output at all. *)
let output_lines prefix oc buffer =
  let u = Buffer.contents buffer in
  let m = String.length u in
  let output_line i j =
    output_string oc prefix;
    (* Goes through [String.sub] and [output_string] instead of [output]:
       the buffer argument of [output] is not a string on every compiler
       version, whereas these two functions accept strings everywhere. *)
    output_string oc (String.sub u i (j - i));
    output_char oc '\n'
  in
  let rec loop i =
    if i < m then
      begin
        (* Only the lookup is protected by the handler, so that the
           recursive call below remains a tail call and the stack does
           not grow with the number of lines. *)
        let j = try String.index_from u i '\n' with Not_found -> m in
        output_line i j;
        if j < m then loop (j + 1)
      end
  in
  loop 0
;;
(* ***)
(*** execute *)
(* XXX: Add test for non reentrancy *)
(* [execute ?max_jobs ?ticker ?period ?display ~exit commands] runs batches
   of shell commands as subprocesses and collects their output.

   - [commands] is a list of batches.  The tasks of one batch run one after
     the other, each of them started only once the previous one has exited
     with code 0; distinct batches may run at the same time.  A task that
     yields the empty string is skipped.
   - [max_jobs] bounds the number of subprocesses alive at once
     (default [max_int]).
   - [ticker] is called regularly while jobs are being handled
     (default [ignore]).
   - [period] is the timeout handed to [select] (default [0.1]).
   - [display] receives a printing function and is expected to apply it to
     the channel on which messages must appear (by default the standard
     output).
   - [exit] is called with the relevant [error] when a command exits with a
     non-zero code, is stopped or killed by a signal, or when reading its
     output raises.  NOTE(review): it is presumably expected to raise; if
     it returns, scheduling simply carries on.

   Standard output and standard error of a command are gathered in one
   buffer and shown through [display], after a line giving the command,
   once the command has terminated.  Nothing is shown for a command that
   succeeds silently.

   Returns [None] when the main loop finishes without an exception.
   Otherwise returns [Some (results, exn)] where [exn] is the exception
   caught and [results] holds one flag per batch, in the order of
   [commands]; a flag is [true] only if its batch ran to completion. *)
let execute
  ?(max_jobs=max_int)
  ?(ticker=ignore)
  ?(period=0.1)
  ?(display=(fun f -> f Pervasives.stdout))
  ~exit
  (commands : task list list)
    =
  let batch_id = ref 0 in
  let env = environment () in
  let jobs = ref JS.empty in
  let jobs_active = ref 0 in
  let jobs_to_terminate = Queue.create () in
  let commands_to_execute = Queue.create () in
  (* One result flag per batch, initially false; every batch is queued
     together with its flag. *)
  let results =
    List.map (fun tasks ->
      let result = ref false in
      Queue.add (tasks, result) commands_to_execute;
      result)
      commands
  in
  (* Maps the stdout and stderr descriptors of every live job to that job. *)
  let outputs = ref FDM.empty in
  let doi = descr_of_in_channel in
  let doo = descr_of_out_channel in
  (*** compute_fds *)
  (* Descriptor lists for [select].  They are cached and rebuilt only when
     the job set is no longer physically the same value as last time. *)
  let compute_fds =
    let fds = ref ([], [], []) in
    let prev_jobs = ref JS.empty in
    fun () ->
      if not (!prev_jobs == !jobs) then
        begin
          prev_jobs := !jobs;
          fds :=
            JS.fold
              begin fun job (rfds, wfds, xfds) ->
                let ofd = doi job.job_stdout
                and ifd = doo job.job_stdin
                and efd = doi job.job_stderr
                in
                (* Nothing is ever written to the subprocesses, hence the
                   untouched write list. *)
                (ofd :: efd :: rfds, wfds, ofd :: ifd :: efd :: xfds)
              end
              !jobs
              ([], [], [])
        end;
      !fds
  in
  (* ***)
  (*** add_job *)
  (* Starts [cmd] as a subprocess and registers the resulting job.  [rest]
     are the tasks to run after it, [result] the flag of its batch. *)
  let add_job cmd rest result id =
    (*display begin fun oc -> fp oc "Job %a is %s\n%!" print_job_id id cmd; end;*)
    let (stdout', stdin', stderr') = open_process_full cmd env in
    incr jobs_active;
    (* Reads must never block: several jobs are served by the same loop. *)
    set_nonblock (doi stdout');
    set_nonblock (doi stderr');
    let job =
      { job_id          = id;
        job_command     = cmd;
        job_next        = rest;
        job_result      = result;
        job_stdout      = stdout';
        job_stdin       = stdin';
        job_stderr      = stderr';
        job_buffer      = Buffer.create 1024;
        job_dying       = false }
    in
    outputs := FDM.add (doi stdout') job (FDM.add (doi stderr') job !outputs);
    jobs := JS.add job !jobs;
  in
  (* ***)
  (*** skip_empty_tasks *)
  (* Forces tasks in order until one yields a non-empty command; returns
     that command with the tasks that follow it, or [None] if none does. *)
  let rec skip_empty_tasks = function
    | [] -> None
    | task :: tasks ->
        let cmd = task () in
        if cmd = "" then skip_empty_tasks tasks else Some(cmd, tasks)
  in
  (* ***)
  (*** add_some_jobs *)
  (* Takes the next batch off the queue and starts its first real command. *)
  let add_some_jobs () =
    let (tasks, result) = Queue.take commands_to_execute in
    match skip_empty_tasks tasks with
    (* NOTE(review): a batch made only of empty tasks is recorded as false
       here, whereas a batch whose trailing tasks are empty ends up true;
       confirm that this is intended. *)
    | None -> result := false
    | Some(cmd, rest) ->
      let b_id = !batch_id in
      incr batch_id;
      add_job cmd rest result (b_id, 0)
  in
  (* ***)
  (*** terminate *)
  (* Queues [job] for termination, at most once.  [continue] tells whether
     the rest of its batch may be started after it. *)
  let terminate ?(continue=true) job =
    if not job.job_dying then
      begin
        job.job_dying <- true;
        Queue.add (job, continue) jobs_to_terminate
      end
    else
      ()
  in
  (* ***)
  (*** add_more_jobs_if_possible *)
  (* Starts queued batches while fewer than [max_jobs] jobs are active. *)
  let add_more_jobs_if_possible () =
    while !jobs_active < max_jobs && not (Queue.is_empty commands_to_execute) do
      add_some_jobs ()
    done
  in
  (* ***)
  (*** do_read *)
  (* Reads from [fd] into the buffer of [job]: once by default, or until
     nothing more comes when [loop] is true.  A read of zero bytes, or a
     Unix error on the read, queues the job for termination unless it is
     already dying. *)
  let do_read =
    let u = String.create 4096 in
    fun ?(loop=false) fd job ->
      (*if job.job_dying then
        ()
      else*)
        try
          let rec iteration () =
            let m =
              try
                read fd u 0 (String.length u)
              with
              | Unix.Unix_error(_,_,_) -> 0
            in
            if m = 0 then
              if job.job_dying then
                ()
              else
                terminate job
            else
              begin
                Buffer.add_substring job.job_buffer u 0 m;
                if loop then
                  iteration ()
                else
                  ()
              end
          in
          iteration ()
        with
        | x ->
            display
              begin fun oc ->
                (* The exception text goes with the first directive and the
                   command with the second, quoted one; they used to be
                   passed the other way round. *)
                fp oc "Exception %s while reading output of command %S\n%!"
                  (Printexc.to_string x) job.job_command;
              end;
            exit Io_error
  in
  (* ***)
  (*** process_jobs_to_terminate *)
  (* Empties the termination queue: drains the remaining output of each
     job, unregisters it, closes the subprocess, shows its output and
     either starts the next command of the batch or reports the failure
     through [exit]. *)
  let process_jobs_to_terminate () =
    while not (Queue.is_empty jobs_to_terminate) do
      ticker ();
      let (job, continue) = Queue.take jobs_to_terminate in

      (*display begin fun oc -> fp oc "Terminating job %a\n%!" print_job_id job.job_id; end;*)

      decr jobs_active;
      do_read ~loop:true (doi job.job_stdout) job;
      do_read ~loop:true (doi job.job_stderr) job;
      outputs := FDM.remove (doi job.job_stdout) (FDM.remove (doi job.job_stderr) !outputs);
      jobs := JS.remove job !jobs;
      let status = close_process_full (job.job_stdout, job.job_stdin, job.job_stderr) in

      (* The command and its output are shown at most once. *)
      let shown = ref false in

      let show_command () =
        if !shown then
          ()
        else
        display
          begin fun oc ->
            shown := true;
            fp oc "+ %s\n" job.job_command;
            output_lines "" oc job.job_buffer
          end
      in
      if Buffer.length job.job_buffer > 0 then show_command ();
      begin
        match status with
        | Unix.WEXITED 0 ->
            begin
              (* On success the batch goes on, unless the job was
                 terminated with [continue] set to false. *)
              if continue then
                begin
                  match skip_empty_tasks job.job_next with
                  | None -> job.job_result := true
                  | Some(cmd, rest) ->
                      let (b_id, s_id) = job.job_id in
                      add_job cmd rest job.job_result (b_id, s_id + 1)
                end
            end
        | Unix.WEXITED rc ->
            show_command ();
            display (fun oc -> fp oc "Command exited with code %d.\n" rc);
            exit Subcommand_failed
        | Unix.WSTOPPED s | Unix.WSIGNALED s ->
            show_command ();
            display (fun oc -> fp oc "Command got signal %d.\n" s);
            exit Subcommand_got_signal
      end
    done
  in
  (* ***)
  (*** terminate_all_jobs *)
  (* Queues every live job for termination without continuation.  This only
     fills the queue; it does not process it. *)
  let terminate_all_jobs () =
    JS.iter (terminate ~continue:false) !jobs
  in
  (* ***)
  (*** loop *)
  (* Main loop: reap finished jobs, start new ones, then wait on the
     descriptors of the live jobs until no job is left. *)
  let rec loop () =
    (*display (fun oc -> fp oc "Total %d jobs\n" !jobs_active);*)
    process_jobs_to_terminate ();
    add_more_jobs_if_possible ();
    if JS.is_empty !jobs then
      ()
    else
      begin
        let (rfds, wfds, xfds) = compute_fds () in
        ticker ();
        let (chrfds, chwfds, chxfds) = select rfds wfds xfds period in
        List.iter
          begin fun (fdlist, hook) ->
            List.iter
              begin fun fd ->
                try
                  let job = FDM.find fd !outputs in
                  ticker ();
                  hook fd job
                with
                | Not_found -> () (* XXX *)
              end
              fdlist
          end
          [chrfds, do_read ~loop:false;
           chwfds, (fun _ _ -> ());
           chxfds,
             begin fun _ _job ->
               (*display (fun oc -> fp oc "Exceptional condition on command %S\n%!" job.job_command);
               exit Exceptional_condition*)
               () (* FIXME *)
             end];
        loop ()
      end
  in
  try
    loop ();
    None
  with
  | x ->
      begin
        (* NOTE(review): the jobs are only queued for termination here; the
           queue is not processed before returning. *)
        try
          terminate_all_jobs ()
        with
        | x' ->
            display (fun oc -> fp oc "Extra exception %s\n%!" (Printexc.to_string x'))
      end;
      Some(List.map (!) results, x)
;;
(* ***)