+(* ocamlbuild *)
+(* *)
+(* Nicolas Pouillard, Berke Durak, projet Gallium, INRIA Rocquencourt *)
+(* *)
+(* Copyright 2007 Institut National de Recherche en Informatique et *)
+(* en Automatique. All rights reserved. This file is distributed *)
+(* under the terms of the Q Public License version 1.0. *)
+(* *)
+(* $Id$ *)
+(* Original author: Berke Durak *)
+(* Executor *)
+open Unix;;
+module Exit_codes = struct
+ let rc_subcommand_failed = 10
+ let rc_subcommand_got_signal = 11
+ let rc_io_error = 12
+ let rc_exceptional_condition = 13
+type task = (string * (unit -> unit));;
+type job = {
+ job_id : int * int;
+ job_command : string;
+ job_next : (string * (unit -> unit)) list;
+ job_result : bool ref; (* Result of this sequence group *)
+ job_stdout : in_channel;
+ job_stdin : out_channel;
+ job_stderr : in_channel;
+ job_buffer : Buffer.t;
+ mutable job_dying : bool;
+module JS = Set.Make(struct type t = job let compare = compare end);;
+module FDM = Map.Make(struct type t = file_descr let compare = compare end);;
+let sf = Printf.sprintf;;
+let fp = Printf.fprintf;;
+(*** print_unix_status *)
+(* FIXME never called *)
+let print_unix_status oc = function
+ | WEXITED x -> fp oc "exit %d" x
+ | WSIGNALED i -> fp oc "signal %d" i
+ | WSTOPPED i -> fp oc "stop %d" i
+(* ***)
+(*** print_job_id *)
+let print_job_id oc (x,y) = fp oc "%d.%d" x y;;
+(* ***)
+(*** output_lines *)
+let output_lines prefix oc buffer =
+ let u = Buffer.contents buffer in
+ let m = String.length u in
+ let output_line i j =
+ output_string oc prefix;
+ output oc u i (j - i);
+ output_char oc '\n'
+ in
+ let rec loop i =
+ if i = m then
+ ()
+ else
+ begin
+ try
+ let j = String.index_from u i '\n' in
+ output_line i j;
+ loop (j + 1)
+ with
+ | Not_found ->
+ output_line i m
+ end
+ in
+ loop 0
+(* ***)
+(*** execute *)
+(* XXX: Add test for non reentrancy *)
+let execute
+ ?(max_jobs=max_int)
+ ?(ticker=ignore)
+ ?(period=0.1)
+ ?(display=(fun f -> f Pervasives.stdout))
+ (commands : task list list)
+ =
+ let batch_id = ref 0 in
+ let env = environment () in
+ let jobs = ref JS.empty in
+ let jobs_active = ref 0 in
+ let jobs_to_terminate = Queue.create () in
+ let commands_to_execute = Queue.create () in
+ let all_ok = ref true in
+ let results =
+ (fun tasks ->
+ let result = ref false in
+ Queue.add (tasks, result) commands_to_execute;
+ result)
+ commands
+ in
+ let outputs = ref FDM.empty in
+ let doi = descr_of_in_channel in
+ let doo = descr_of_out_channel in
+ (*** compute_fds *)
+ let compute_fds =
+ let fds = ref ([], [], []) in
+ let prev_jobs = ref JS.empty in
+ fun () ->
+ if not (!prev_jobs == !jobs) then
+ begin
+ prev_jobs := !jobs;
+ fds :=
+ JS.fold
+ begin fun job (rfds, wfds, xfds) ->
+ let ofd = doi job.job_stdout
+ and ifd = doo job.job_stdin
+ and efd = doi job.job_stderr
+ in
+ (ofd :: efd :: rfds, wfds, ofd :: ifd :: efd :: xfds)
+ end
+ !jobs
+ ([], [], [])
+ end;
+ !fds
+ in
+ (* ***)
+ (*** add_job *)
+ let add_job (cmd, action) rest result id =
+ (*display begin fun oc -> fp oc "Job %a is %s\n%!" print_job_id id cmd; end;*)
+ action ();
+ let (stdout', stdin', stderr') = open_process_full cmd env in
+ incr jobs_active;
+ set_nonblock (doi stdout');
+ set_nonblock (doi stderr');
+ let job =
+ { job_id = id;
+ job_command = cmd;
+ job_next = rest;
+ job_result = result;
+ job_stdout = stdout';
+ job_stdin = stdin';
+ job_stderr = stderr';
+ job_buffer = Buffer.create 1024;
+ job_dying = false }
+ in
+ outputs := FDM.add (doi stdout') job (FDM.add (doi stderr') job !outputs);
+ jobs := JS.add job !jobs;
+ in
+ (* ***)
+ (*** add_some_jobs *)
+ let add_some_jobs () =
+ let (tasks, result) = Queue.take commands_to_execute in
+ match tasks with
+ | [] -> result := false
+ | task :: rest ->
+ let b_id = !batch_id in
+ incr batch_id;
+ add_job task rest result (b_id, 0)
+ in
+ (* ***)
+ (*** terminate *)
+ let terminate ?(continue=true) job =
+ if not job.job_dying then
+ begin
+ job.job_dying <- true;
+ Queue.add (job, continue) jobs_to_terminate
+ end
+ else
+ ()
+ in
+ (* ***)
+ (*** add_more_jobs_if_possible *)
+ let add_more_jobs_if_possible () =
+ while !jobs_active < max_jobs && not (Queue.is_empty commands_to_execute) do
+ add_some_jobs ()
+ done
+ in
+ (* ***)
+ (*** process_jobs_to_terminate *)
+ let process_jobs_to_terminate () =
+ while not (Queue.is_empty jobs_to_terminate) do
+ ticker ();
+ let (job, continue) = Queue.take jobs_to_terminate in
+ (*display begin fun oc -> fp oc "Terminating job %a\n%!" print_job_id job.job_id; end;*)
+ decr jobs_active;
+ outputs := FDM.remove (doi job.job_stdout) (FDM.remove (doi job.job_stderr) !outputs);
+ jobs := JS.remove job !jobs;
+ let status = close_process_full (job.job_stdout, job.job_stdin, job.job_stderr) in
+ let shown = ref false in
+ let show_command () =
+ if !shown then
+ ()
+ else
+ display
+ begin fun oc ->
+ shown := true;
+ fp oc "+ %s\n" job.job_command;
+ output_lines "" oc job.job_buffer
+ end
+ in
+ if Buffer.length job.job_buffer > 0 then show_command ();
+ begin
+ match status with
+ | Unix.WEXITED 0 ->
+ begin
+ if continue then
+ begin
+ match job.job_next with
+ | [] -> job.job_result := true
+ | task :: rest ->
+ let (b_id, s_id) = job.job_id in
+ add_job task rest job.job_result (b_id, s_id + 1)
+ end
+ else
+ all_ok := false;
+ end
+ | Unix.WEXITED rc ->
+ show_command ();
+ display (fun oc -> fp oc "Command exited with code %d.\n" rc);
+ all_ok := false;
+ exit Exit_codes.rc_subcommand_failed
+ | Unix.WSTOPPED s | Unix.WSIGNALED s ->
+ show_command ();
+ all_ok := false;
+ display (fun oc -> fp oc "Command got signal %d.\n" s);
+ exit Exit_codes.rc_subcommand_got_signal
+ end
+ done
+ in
+ (* ***)
+ (*** do_read *)
+ let do_read =
+ let u = String.create 4096 in
+ fun fd job ->
+ if job.job_dying then
+ ()
+ else
+ try
+ let m = read fd u 0 (String.length u) in
+ if m = 0 then
+ terminate job
+ else
+ begin
+ Buffer.add_substring job.job_buffer u 0 m
+ end
+ with
+ | x ->
+ display
+ begin fun oc ->
+ fp oc "Exception %s while reading output of command %S\n%!" job.job_command
+ (Printexc.to_string x);
+ end;
+ exit Exit_codes.rc_io_error
+ in
+ (* ***)
+ (*** terminate_all_jobs *)
+ let terminate_all_jobs () =
+ JS.iter (terminate ~continue:false) !jobs
+ in
+ (* ***)
+ (*** loop *)
+ let rec loop () =
+ (*display (fun oc -> fp oc "Total %d jobs\n" !jobs_active);*)
+ process_jobs_to_terminate ();
+ add_more_jobs_if_possible ();
+ if JS.is_empty !jobs then
+ ()
+ else
+ begin
+ let (rfds, wfds, xfds) = compute_fds () in
+ ticker ();
+ let (chrfds, chwfds, chxfds) = select rfds wfds xfds period in
+ List.iter
+ begin fun (fdlist, hook) ->
+ List.iter
+ begin fun fd ->
+ let job = FDM.find fd !outputs in
+ ticker ();
+ hook fd job
+ end
+ fdlist
+ end
+ [chrfds, do_read;
+ chwfds, (fun _ _ -> ());
+ chxfds,
+ begin fun _ job ->
+ display (fun oc -> fp oc "Exceptional condition on command %S\n%!" job.job_command);
+ exit Exit_codes.rc_exceptional_condition
+ end];
+ loop ()
+ end
+ in
+ try
+ loop ();
+ None
+ with
+ | x ->
+ begin
+ try
+ terminate_all_jobs ()
+ with
+ | x' ->
+ display (fun oc -> fp oc "Extra exception %s\n%!" (Printexc.to_string x'))
+ end;
+ Some( (!) results, x)
+(* ***)