--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Pure/Concurrent/standard_thread.scala Tue Nov 03 13:54:34 2015 +0100
@@ -0,0 +1,110 @@
+/* Title: Pure/Concurrent/standard_thread.scala
+ Module: PIDE
+ Author: Makarius
+
+Standard thread operations.
+*/
+
+package isabelle
+
+
+import java.lang.Thread
+import java.util.concurrent.{Callable, Future => JFuture, ThreadPoolExecutor,
+ TimeUnit, LinkedBlockingQueue}
+
+
+object Standard_Thread
+{
+ /* plain thread */
+
+ def fork(name: String = "", daemon: Boolean = false)(body: => Unit): Thread =
+ {
+ val thread =
+ if (name == null || name == "") new Thread() { override def run = body }
+ else new Thread(name) { override def run = body }
+ thread.setDaemon(daemon)
+ thread.start
+ thread
+ }
+
+
+ /* future result via thread */
+
+ def future[A](name: String = "", daemon: Boolean = false)(body: => A): (Thread, Future[A]) =
+ {
+ val result = Future.promise[A]
+ val thread = fork(name, daemon) { result.fulfill_result(Exn.capture(body)) }
+ (thread, result)
+ }
+
+
+ /* thread pool */
+
+ lazy val default_pool =
+ {
+ val m = Properties.Value.Int.unapply(System.getProperty("isabelle.threads", "0")) getOrElse 0
+ val n = if (m > 0) m else (Runtime.getRuntime.availableProcessors max 1) min 8
+ new ThreadPoolExecutor(n, n, 2500L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable])
+ }
+
+ def submit_task[A](body: => A): JFuture[A] =
+ default_pool.submit(new Callable[A] { def call = body })
+
+
+ /* delayed events */
+
+ final class Delay private [Standard_Thread](
+ first: Boolean, delay: => Time, cancel: () => Unit, event: => Unit)
+ {
+ private var running: Option[Event_Timer.Request] = None
+
+ private def run: Unit =
+ {
+ val do_run = synchronized {
+ if (running.isDefined) { running = None; true } else false
+ }
+ if (do_run) event
+ }
+
+ def invoke(): Unit = synchronized
+ {
+ val new_run =
+ running match {
+ case Some(request) => if (first) false else { request.cancel; cancel(); true }
+ case None => cancel(); true
+ }
+ if (new_run)
+ running = Some(Event_Timer.request(Time.now() + delay)(run))
+ }
+
+ def revoke(): Unit = synchronized
+ {
+ running match {
+ case Some(request) => request.cancel; cancel(); running = None
+ case None => cancel()
+ }
+ }
+
+ def postpone(alt_delay: Time): Unit = synchronized
+ {
+ running match {
+ case Some(request) =>
+ val alt_time = Time.now() + alt_delay
+ if (request.time < alt_time && request.cancel) {
+ cancel()
+ running = Some(Event_Timer.request(alt_time)(run))
+ }
+ else cancel()
+ case None => cancel()
+ }
+ }
+ }
+
+ // delayed event after first invocation
+ def delay_first(delay: => Time, cancel: () => Unit = () => ())(event: => Unit): Delay =
+ new Delay(true, delay, cancel, event)
+
+ // delayed event after last invocation
+ def delay_last(delay: => Time, cancel: () => Unit = () => ())(event: => Unit): Delay =
+ new Delay(false, delay, cancel, event)
+}