src/HOL/Tools/ATP/watcher.ML
author paulson
Fri Oct 07 11:29:24 2005 +0200 (2005-10-07)
changeset 17773 a7258e1020b7
parent 17772 818cec5f82a4
child 17774 0ecfb66ea072
permissions -rw-r--r--
more tidying. Fixed process management bugs and race condition
     1 (*  Title:      Watcher.ML
     2     ID:         $Id$
     3     Author:     Claire Quigley
     4     Copyright   2004  University of Cambridge
     5  *)
     6 
     7 (*  The watcher process starts a resolution process when it receives a     *)
     8 (*  message from Isabelle                                                  *)
     9 (*  Signals Isabelle, puts output of child into pipe to Isabelle,          *)
    10 (*  and removes dead processes.  Also possible to kill all the resolution  *)
    11 (*  processes currently running.                                           *)
    12 
    13 signature WATCHER =
    14 sig
    15 
    16 (*  Send request to Watcher for multiple spasses to be called for filenames in arg       *)
    17 (* callResProvers (outstreamtoWatcher, prover name,prover-command, (settings,file) list *)
    18 
    19 val callResProvers : TextIO.outstream * (string*string*string*string) list -> unit
    20 
    21 (* Send message to watcher to kill resolution provers *)
    22 val callSlayer : TextIO.outstream -> unit
    23 
    24 (* Start a watcher and set up signal handlers             *)
    25 val createWatcher : 
    26     thm * (ResClause.clause * thm) Array.array -> 
    27     TextIO.instream * TextIO.outstream * Posix.Process.pid
    28 val killWatcher : Posix.Process.pid -> unit
    29 val setting_sep : char
    30 end
    31 
    32 
    33 
    34 structure Watcher: WATCHER =
    35 struct
    36 
    37 (*Field separators, used to pack items onto a text line*)
    38 val command_sep = #"\t"
    39 and setting_sep = #"%";
    40 
    41 val goals_being_watched = ref 0;
    42 
    43 val trace_path = Path.basic "watcher_trace";
    44 
    45 fun trace s = if !Output.show_debug_msgs then File.append (File.tmp_path trace_path) s 
    46               else ();
    47 
    48 (*Representation of a watcher process*)
    49 type proc = {pid : Posix.Process.pid,
    50              instr : TextIO.instream,
    51              outstr : TextIO.outstream};
    52 
    53 (*Representation of a child (ATP) process*)
    54 type cmdproc = {
    55         prover: string,       (* Name of the resolution prover used, e.g. "spass"*)
    56         file:  string,        (* The file containing the goal for the ATP to prove *)     
    57         proc_handle : (TextIO.instream,TextIO.outstream) Unix.proc,
    58         instr : TextIO.instream,     (*Output of the child process *)
    59         outstr : TextIO.outstream};  (*Input to the child process *)
    60 
    61 
    62 fun fdReader (name : string, fd : Posix.IO.file_desc) =
    63 	  Posix.IO.mkTextReader {initBlkMode = true,name = name,fd = fd };
    64 
    65 fun fdWriter (name, fd) =
    66           Posix.IO.mkTextWriter {
    67 	      appendMode = false,
    68               initBlkMode = true,
    69               name = name,  
    70               chunkSize=4096,
    71               fd = fd};
    72 
    73 fun openOutFD (name, fd) =
    74 	  TextIO.mkOutstream (
    75 	    TextIO.StreamIO.mkOutstream (
    76 	      fdWriter (name, fd), IO.BLOCK_BUF));
    77 
    78 fun openInFD (name, fd) =
    79 	  TextIO.mkInstream (
    80 	    TextIO.StreamIO.mkInstream (
    81 	      fdReader (name, fd), ""));
    82 
    83                             
    84 (*  Send request to Watcher for a vampire to be called for filename in arg*)
    85 fun callResProver (toWatcherStr,  arg) = 
    86       (TextIO.output (toWatcherStr, arg^"\n"); 
    87        TextIO.flushOut toWatcherStr)
    88 
    89 (*  Send request to Watcher for multiple provers to be called*)
    90 fun callResProvers (toWatcherStr,  []) = 
    91       (TextIO.output (toWatcherStr, "End of calls\n");  TextIO.flushOut toWatcherStr)
    92   | callResProvers (toWatcherStr,
    93                     (prover,proverCmd,settings,probfile)  ::  args) =
    94       (trace (space_implode ", " (["\ncallResProvers:", prover, proverCmd, probfile]));
    95        (*Uses a special character to separate items sent to watcher*)
    96        TextIO.output (toWatcherStr,
    97           space_implode (str command_sep) [prover, proverCmd, settings, probfile, "\n"]);
    98        goals_being_watched := (!goals_being_watched) + 1;
    99        TextIO.flushOut toWatcherStr;
   100        callResProvers (toWatcherStr,args))
   101                                                 
   102 
   103 (*Send message to watcher to kill currently running vampires. NOT USED and possibly
   104   buggy. Note that killWatcher kills the entire process group anyway.*)
   105 fun callSlayer toWatcherStr = (TextIO.output (toWatcherStr, "Kill children\n"); 
   106                             TextIO.flushOut toWatcherStr)
   107 
   108                     
   109 (* Get commands from Isabelle*)
   110 fun getCmds (toParentStr, fromParentStr, cmdList) = 
   111   let val thisLine = TextIO.inputLine fromParentStr 
   112   in
   113      trace("\nGot command from parent: " ^ thisLine);
   114      if thisLine = "End of calls\n" orelse thisLine = "" then cmdList
   115      else if thisLine = "Kill children\n"
   116      then (TextIO.output (toParentStr,thisLine); 
   117 	   TextIO.flushOut toParentStr;
   118 	   [("","Kill children",[],"")])
   119      else
   120        let val [prover,proverCmd,settingstr,probfile,_] = 
   121                    String.tokens (fn c => c = command_sep) thisLine
   122            val settings = String.tokens (fn c => c = setting_sep) settingstr
   123        in
   124            trace ("\nprover: " ^ prover ^ "  prover path: " ^ proverCmd ^
   125                   "\n  problem file: " ^ probfile);
   126            getCmds (toParentStr, fromParentStr, 
   127                     (prover, proverCmd, settings, probfile)::cmdList) 
   128        end
   129        handle Bind => 
   130           (trace "getCmds: command parsing failed!";
   131            getCmds (toParentStr, fromParentStr, cmdList))
   132   end
   133 	    
   134                                                                   
   135 (*  Get Io-descriptor for polling of an input stream          *)
   136 fun getInIoDesc someInstr = 
   137     let val (rd, buf) = TextIO.StreamIO.getReader(TextIO.getInstream someInstr)
   138         val _ = TextIO.output (TextIO.stdOut, buf)
   139         val ioDesc = 
   140 	    case rd
   141 	      of TextPrimIO.RD{ioDesc = SOME iod, ...} =>SOME iod
   142 	       | _ => NONE
   143      in (* since getting the reader will have terminated the stream, we need
   144 	 * to build a new stream. *)
   145 	TextIO.setInstream(someInstr, TextIO.StreamIO.mkInstream(rd, buf));
   146 	ioDesc
   147     end
   148 
   149 
   150 (*************************************)
   151 (*  Set up a Watcher Process         *)
   152 (*************************************)
   153 
   154 fun killChild proc = (Unix.kill(proc, Posix.Signal.kill); Unix.reap proc);
   155 
   156 fun killChildren procs = List.app (ignore o killChild) procs;
   157 
   158  (* take an instream and poll its underlying reader for input *)
   159  fun pollParentInput (fromParentIOD, fromParentStr, toParentStr) = 
   160    case OS.IO.pollDesc fromParentIOD of
   161       SOME pd =>
   162 	 (case OS.IO.poll ([OS.IO.pollIn pd], SOME (Time.fromSeconds 2)) of
   163 	      [] => NONE
   164 	    | pd''::_ => if OS.IO.isIn pd''
   165 	 	         then SOME (getCmds (toParentStr, fromParentStr, []))
   166 	 	         else NONE)
   167    | NONE => NONE;
   168 
   169 (*get the number of the subgoal from the filename: the last digit string*)
   170 fun number_from_filename s =
   171   case String.tokens (not o Char.isDigit) s of
   172       [] => (trace ("\nWatcher could not read subgoal nunber! " ^ s); 
   173              raise ERROR)
   174     | numbers => valOf (Int.fromString (List.last numbers));
   175 
   176 fun setupWatcher (thm,clause_arr) = 
   177   let
   178     val p1 = Posix.IO.pipe()   (*pipes for communication between parent and watcher*)
   179     val p2 = Posix.IO.pipe()
   180     fun closep () = 
   181 	 (Posix.IO.close (#outfd p1); Posix.IO.close (#infd p1);
   182 	  Posix.IO.close (#outfd p2); Posix.IO.close (#infd p2))
   183     (****** fork a watcher process and get it set up and going ******)
   184     fun startWatcher procList =
   185      (case  Posix.Process.fork() 
   186       of SOME pid => pid (* parent - i.e. main Isabelle process *)
   187        | NONE => let                (* child - i.e. watcher  *)
   188 	  val oldchildin = #infd p1  
   189 	  val fromParent = Posix.FileSys.wordToFD 0w0
   190 	  val oldchildout = #outfd p2
   191 	  val toParent = Posix.FileSys.wordToFD 0w1
   192 	  val fromParentIOD = Posix.FileSys.fdToIOD fromParent
   193 	  val fromParentStr = openInFD ("_exec_in_parent", fromParent)
   194 	  val toParentStr = openOutFD ("_exec_out_parent", toParent)
   195 	  val pid = Posix.ProcEnv.getpid()
   196 	  val () = Posix.ProcEnv.setpgid {pid = SOME pid, pgid = SOME pid}
   197                    (*set process group id: allows killing all children*)
   198 	  val () = trace "\nsubgoals forked to startWatcher"
   199 	  fun pollChild fromStr = 
   200 	     case getInIoDesc fromStr of
   201 	       SOME iod => 
   202 		 (case OS.IO.pollDesc iod of
   203 		    SOME pd =>
   204 			let val pd' = OS.IO.pollIn pd in
   205 			  case OS.IO.poll ([pd'], SOME (Time.fromSeconds 2)) of
   206 			      [] => false
   207 			    | pd''::_ => OS.IO.isIn pd''
   208 			end
   209 		   | NONE => false)
   210 	     | NONE => false
   211 	  (* Check all ATP processes currently running for output                 *)
   212 	  fun checkChildren ([], toParentStr) = []  (* no children to check *)
   213 	  |   checkChildren (childProc::otherChildren, toParentStr) = 
   214 	       let val _ = trace ("\nIn check child, length of queue:"^
   215 			          Int.toString (length (childProc::otherChildren)))
   216 		   val {prover, file, proc_handle, instr=childIn, ...} : cmdproc =
   217 		       childProc
   218 		   val _ = trace ("\nprobfile = " ^ file)
   219 		   val sgno = number_from_filename file
   220 		   val ppid = Posix.ProcEnv.getppid()
   221 	       in 
   222 		 if pollChild childIn
   223 		 then (* check here for prover label on child*)
   224 		   let val _ = trace ("\nInput available from child: " ^ file)
   225 		       val childDone = (case prover of
   226 			   "vampire" => AtpCommunication.checkVampProofFound
   227 			        (childIn, toParentStr, ppid, file, clause_arr)  
   228 		         | "E" => AtpCommunication.checkEProofFound
   229 		                (childIn, toParentStr, ppid, file, clause_arr)             
   230 			 | "spass" => AtpCommunication.checkSpassProofFound
   231 			        (childIn, toParentStr, ppid, file, thm, sgno,clause_arr)  
   232 			 | _ => (trace ("\nBad prover! " ^ prover); true) )
   233 		    in
   234 		     if childDone (*child has found a proof and transferred it*)
   235 		     then (*Remove this child and go on to check others*)
   236 		          (Unix.reap proc_handle; OS.FileSys.remove file;
   237 			   checkChildren(otherChildren, toParentStr))
   238 		     else (* Keep this child and go on to check others  *)
   239 		       childProc :: checkChildren (otherChildren, toParentStr)
   240 		  end
   241 		else (trace "\nNo child output";
   242 		      childProc::(checkChildren (otherChildren, toParentStr)))
   243 	       end
   244 	
   245 	(* call resolution processes                                        *)
   246 	(* settings should be a list of strings  ["-t300", "-m100000"]    *)
   247 	fun execCmds [] procList = procList
   248 	|   execCmds ((prover,proverCmd,settings,file)::cmds) procList  =
   249 	      let val _ = trace ("\nAbout to execute command: " ^ proverCmd ^ " " ^ file)
   250 	          val childhandle:(TextIO.instream,TextIO.outstream) Unix.proc  = 
   251 		       Unix.execute(proverCmd, settings@[file])
   252 		  val (instr, outstr) = Unix.streamsOf childhandle
   253 		  val newProcList = {prover = prover,
   254 					    file = file,
   255 					    proc_handle = childhandle,
   256 					    instr = instr,
   257 					    outstr = outstr} :: procList
   258      		  val _ = trace ("\nFinished at " ^
   259 			         Date.toString(Date.fromTimeLocal(Time.now())))
   260 	      in execCmds cmds newProcList end
   261 
   262          (******** Watcher Loop ********)
   263          val limit = ref 200;  (*don't let it run forever*)
   264 
   265 	 fun keepWatching procList = 
   266 	   let val _ = trace ("\npollParentInput. Limit = " ^ Int.toString (!limit) ^ 
   267 			      "  length(procList) = " ^ Int.toString (length procList));
   268 	       val cmdsFromIsa = pollParentInput 
   269 				  (fromParentIOD, fromParentStr, toParentStr)
   270 	   in 
   271 	     OS.Process.sleep (Time.fromMilliseconds 100);
   272 	     limit := !limit - 1;
   273 	     if !limit < 0 
   274 	     then 
   275 	      (trace "\nTimeout: Killing proof processes";
   276 	       TextIO.output(toParentStr, "Timeout: Killing proof processes!\n");
   277 	       TextIO.flushOut toParentStr;
   278 	       killChildren (map #proc_handle procList);
   279 	       Posix.Process.exit 0w0)
   280 	     else 
   281 	       case cmdsFromIsa of
   282 		 SOME [(_,"Kill children",_,_)] => 
   283 		   let val child_handles = map #proc_handle procList 
   284 		   in  trace "\nReceived command to kill children";
   285 		       killChildren child_handles; keepWatching [] 
   286 		   end
   287 	       | SOME cmds => (*  deal with commands from Isabelle process *)
   288 		   if length procList < 40
   289 		   then                        (* Execute locally  *)
   290 		     let 
   291 		       val _ = trace("\nCommands from parent: " ^ 
   292 		                     Int.toString(length cmds))
   293 		       val newProcList = execCmds cmds procList
   294 		       val newProcList' = checkChildren (newProcList, toParentStr) 
   295 		     in
   296 		       trace "\nCommands executed"; keepWatching newProcList'
   297 		     end
   298 		   else  (* Execute remotely [FIXME: NOT REALLY]  *)
   299 		     let 
   300 		       val newProcList = execCmds cmds procList
   301 		       val newProcList' = checkChildren (newProcList, toParentStr) 
   302 		     in keepWatching newProcList' end
   303 	       | NONE => (* No new input from Isabelle *)
   304 		   let val newProcList = checkChildren (procList, toParentStr)
   305 		   in trace "\nNothing from parent, still watching"; 
   306 		      keepWatching newProcList
   307 		   end
   308 	   end
   309 	   handle exn => (*FIXME: exn handler is too general!*)
   310 	     (trace ("\nkeepWatching: In exception handler: " ^ Toplevel.exn_message exn);
   311 	      keepWatching procList);
   312 	 in
   313 	   (*** Sort out pipes ********)
   314 	   Posix.IO.close (#outfd p1);  Posix.IO.close (#infd p2);
   315 	   Posix.IO.dup2{old = oldchildin, new = fromParent};
   316 	   Posix.IO.close oldchildin;
   317 	   Posix.IO.dup2{old = oldchildout, new = toParent};
   318 	   Posix.IO.close oldchildout;
   319 	   keepWatching (procList)
   320 	 end);   (* end case *)
   321 
   322     val _ = TextIO.flushOut TextIO.stdOut
   323     val pid = startWatcher []
   324     (* communication streams to watcher*)
   325     val instr = openInFD ("_exec_in", #infd p2)
   326     val outstr = openOutFD ("_exec_out", #outfd p1)
   327   in
   328    (* close the child-side fds*)
   329     Posix.IO.close (#outfd p2);  Posix.IO.close (#infd p1);
   330     (* set the fds close on exec *)
   331     Posix.IO.setfd (#infd p2, Posix.IO.FD.flags [Posix.IO.FD.cloexec]);
   332     Posix.IO.setfd (#outfd p1, Posix.IO.FD.flags [Posix.IO.FD.cloexec]);
   333     {pid = pid, instr = instr, outstr = outstr}
   334   end;
   335 
   336 
   337 
   338 (**********************************************************)
   339 (* Start a watcher and set up signal handlers             *)
   340 (**********************************************************)
   341 
   342 fun reapAll s = (*Signal handler to tidy away dead processes*)
   343      (case Posix.Process.waitpid_nh(Posix.Process.W_ANY_CHILD, []) of
   344 	  SOME _ => reapAll s | NONE => ()) 
   345      handle OS.SysErr _ => ()
   346 
   347 (*FIXME: does the main process need something like this?
   348     IsaSignal.signal (IsaSignal.chld, IsaSignal.SIG_HANDLE reap)??*)
   349 
   350 fun killWatcher pid = 
   351   (goals_being_watched := 0;
   352    Posix.Process.kill(Posix.Process.K_GROUP pid, Posix.Signal.kill);
   353    reapAll());
   354 
   355 fun reapWatcher(pid, instr, outstr) = ignore
   356   (TextIO.closeIn instr; TextIO.closeOut outstr;
   357    Posix.Process.waitpid(Posix.Process.W_CHILD pid, []))
   358   handle OS.SysErr _ => ()
   359 
   360 fun string_of_subgoal th i =
   361     string_of_cterm (List.nth(cprems_of th, i-1))
   362     handle Subscript => "Subgoal number out of range!"
   363 
   364 fun prems_string_of th = space_implode "\n" (map string_of_cterm (cprems_of th))
   365 
   366 fun createWatcher (th, clause_arr) =
   367  let val {pid=childpid, instr=childin, outstr=childout} = setupWatcher (th,clause_arr)
   368      fun decr_watched() =
   369 	  (goals_being_watched := !goals_being_watched - 1;
   370 	   if !goals_being_watched = 0
   371 	   then 
   372 	     (debug ("\nReaping a watcher, childpid = "^
   373 		     Int.toString (ResLib.intOfPid childpid));
   374 	      killWatcher childpid (*???; reapWatcher (childpid, childin, childout)*) )
   375 	    else ())
   376      val _ = debug ("subgoals forked to createWatcher: "^ prems_string_of th);
   377      fun proofHandler n = 
   378        let val outcome = TextIO.inputLine childin
   379 	   val probfile = TextIO.inputLine childin
   380 	   val sgno = number_from_filename probfile
   381 	   val text = string_of_subgoal th sgno
   382 	   val _ = debug ("In signal handler. outcome = \"" ^ outcome ^ 
   383 		        "\"\nprobfile = " ^ probfile ^
   384 		        "\nGoals being watched: "^  Int.toString (!goals_being_watched))
   385        in 
   386 	 if String.isPrefix "[" outcome (*indicates a proof reconstruction*)
   387 	 then (priority (Recon_Transfer.apply_res_thm outcome);
   388 	       decr_watched())
   389 	 else if String.isPrefix "Invalid" outcome
   390 	 then (priority ("Subgoal is not provable:\n" ^ text);
   391 	       decr_watched())
   392 	 else if String.isPrefix "Failure" outcome
   393 	 then (priority ("Proof attempt failed:\n" ^ text);
   394 	       decr_watched()) 
   395 	(* print out a list of rules used from clasimpset*)
   396 	 else if String.isPrefix "Success" outcome
   397 	 then (priority (outcome ^ text);
   398 	       decr_watched())
   399 	  (* if proof translation failed *)
   400 	 else if String.isPrefix "Translation failed" outcome
   401 	 then (priority (outcome ^ text);
   402 	       decr_watched())
   403 	 else (priority "System error in proof handler";
   404 	       decr_watched())
   405        end
   406  in IsaSignal.signal (IsaSignal.usr2, IsaSignal.SIG_HANDLE proofHandler);
   407     (childin, childout, childpid)
   408   end
   409 
   410 end (* structure Watcher *)