summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorXavier Leroy <xavier.leroy@inria.fr>1996-04-03 10:02:34 +0000
committerXavier Leroy <xavier.leroy@inria.fr>1996-04-03 10:02:34 +0000
commit173cb4075f3f04fce2d1fc486c4ad6d1fa22cdce (patch)
tree5d53f20fb2afd24b16786c9cedff0d448540bd72
parent13d82083c2a69ed2bd616299c4ab35798675f777 (diff)
Retour en arriere sur l'emploi des threads POSIX. On revient a
l'ancienne implementation. git-svn-id: http://caml.inria.fr/svn/ocaml/trunk@732 f963ae5c-01c2-4b8c-9fe0-0dff7051ff02
-rw-r--r--otherlibs/threads/.depend22
-rw-r--r--otherlibs/threads/Makefile6
-rw-r--r--otherlibs/threads/condition.ml26
-rw-r--r--otherlibs/threads/condition.mli8
-rw-r--r--otherlibs/threads/mutex.ml29
-rw-r--r--otherlibs/threads/mutex.mli8
-rw-r--r--otherlibs/threads/scheduler.c454
-rw-r--r--otherlibs/threads/thread.ml58
-rw-r--r--otherlibs/threads/thread.mli72
-rw-r--r--otherlibs/threads/threadIO.ml39
-rw-r--r--otherlibs/threads/threadIO.mli46
-rw-r--r--otherlibs/threads/threadstubs.c452
12 files changed, 703 insertions, 517 deletions
diff --git a/otherlibs/threads/.depend b/otherlibs/threads/.depend
index 803c7d4bd..ddba64d86 100644
--- a/otherlibs/threads/.depend
+++ b/otherlibs/threads/.depend
@@ -1,18 +1,16 @@
-scheduler.o: scheduler.c ../../byterun/config.h \
+threadstubs.o: threadstubs.c ../../byterun/alloc.h \
+ ../../byterun/misc.h ../../byterun/config.h \
../../byterun/../config/m.h ../../byterun/../config/s.h \
- ../../byterun/misc.h ../../byterun/mlvalues.h ../../byterun/stacks.h \
- ../../byterun/memory.h ../../byterun/gc.h ../../byterun/major_gc.h \
- ../../byterun/freelist.h ../../byterun/minor_gc.h \
- ../../byterun/fail.h ../../byterun/io.h ../../byterun/roots.h \
- ../../byterun/alloc.h
+ ../../byterun/mlvalues.h ../../byterun/roots.h \
+ ../../byterun/signals.h ../../byterun/stacks.h ../../byterun/memory.h \
+ ../../byterun/gc.h ../../byterun/major_gc.h ../../byterun/freelist.h \
+ ../../byterun/minor_gc.h
condition.cmi: mutex.cmi
-condition.cmo: mutex.cmi thread.cmi condition.cmi
-condition.cmx: mutex.cmx thread.cmx condition.cmi
+condition.cmo: mutex.cmi condition.cmi
+condition.cmx: mutex.cmx condition.cmi
event.cmo: condition.cmi mutex.cmi event.cmi
event.cmx: condition.cmx mutex.cmx event.cmi
-mutex.cmo: thread.cmi mutex.cmi
-mutex.cmx: thread.cmx mutex.cmi
+mutex.cmo: mutex.cmi
+mutex.cmx: mutex.cmi
thread.cmo: thread.cmi
thread.cmx: thread.cmi
-threadIO.cmo: thread.cmi threadIO.cmi
-threadIO.cmx: thread.cmx threadIO.cmi
diff --git a/otherlibs/threads/Makefile b/otherlibs/threads/Makefile
index e10197af4..c0fad43ec 100644
--- a/otherlibs/threads/Makefile
+++ b/otherlibs/threads/Makefile
@@ -2,11 +2,11 @@ include ../../config/Makefile
# Compilation options
CC=$(BYTECC)
-CFLAGS=-I$(PTHREADS_INCLUDES) -I../../byterun -O $(BYTECCCOMPOPTS)
+CFLAGS=-I../../byterun -O $(BYTECCCOMPOPTS)
CAMLC=../../boot/cslrun ../../boot/cslc -I ../../boot -I ../unix
-C_OBJS=threadstubs.o
-CAML_OBJS=thread.cmo mutex.cmo condition.cmo event.cmo
+C_OBJS=scheduler.o
+CAML_OBJS=thread.cmo threadIO.cmo mutex.cmo condition.cmo event.cmo
all: libthreads.a threads.cma
diff --git a/otherlibs/threads/condition.ml b/otherlibs/threads/condition.ml
index 5fd59a41c..f0ed4e91b 100644
--- a/otherlibs/threads/condition.ml
+++ b/otherlibs/threads/condition.ml
@@ -11,8 +11,24 @@
(* $Id$ *)
-type t
-external new: unit -> t = "csl_condition_new"
-external wait: t -> Mutex.t -> unit = "csl_condition_wait"
-external signal: t -> unit = "csl_condition_signal"
-external broadcast: t -> unit = "csl_condition_broadcast"
+type t = { mutable waiting: Thread.t list }
+
+let new () = { waiting = [] }
+
+let wait cond mut =
+ Thread.critical_section := true;
+ Mutex.unlock mut;
+ cond.waiting <- Thread.self() :: cond.waiting;
+ Thread.sleep();
+ Mutex.lock mut
+
+let signal cond =
+ match cond.waiting with (* atomic *)
+ [] -> ()
+ | th :: rem -> cond.waiting <- rem (* atomic *); Thread.wakeup th
+
+let broadcast cond =
+ let w = cond.waiting in (* atomic *)
+ cond.waiting <- []; (* atomic *)
+ List.iter Thread.wakeup w
+
diff --git a/otherlibs/threads/condition.mli b/otherlibs/threads/condition.mli
index 3afb1bacc..2bb851395 100644
--- a/otherlibs/threads/condition.mli
+++ b/otherlibs/threads/condition.mli
@@ -32,16 +32,16 @@
type t
(* The type of condition variables. *)
-external new: unit -> t = "csl_condition_new"
+val new: unit -> t
(* Return a new condition variable. *)
-external wait: t -> Mutex.t -> unit = "csl_condition_wait"
+val wait: t -> Mutex.t -> unit
(* [wait c m] atomically unlocks the mutex [m] and suspends the
calling process on the condition variable [c]. The process will
restart after the condition variable [c] has been signalled.
The mutex [m] is locked again before [wait] returns. *)
-external signal: t -> unit = "csl_condition_signal"
+val signal: t -> unit
(* [signal c] restarts one of the processes waiting on the
condition variable [c]. *)
-external broadcast: t -> unit = "csl_condition_broadcast"
+val broadcast: t -> unit
(* [broadcast c] restarts all processes waiting on the
condition variable [c]. *)
diff --git a/otherlibs/threads/mutex.ml b/otherlibs/threads/mutex.ml
index c9eb0f820..551bc5706 100644
--- a/otherlibs/threads/mutex.ml
+++ b/otherlibs/threads/mutex.ml
@@ -11,8 +11,27 @@
(* $Id$ *)
-type t
-external new: unit -> t = "csl_mutex_new"
-external lock: t -> unit = "csl_mutex_lock"
-external try_lock: t -> bool = "csl_mutex_try_lock"
-external unlock: t -> unit = "csl_mutex_unlock"
+type t = { mutable locked: bool; mutable waiting: Thread.t list }
+
+let new () = { locked = false; waiting = [] }
+
+let rec lock m =
+ if m.locked then begin (* test and set atomic *)
+ Thread.critical_section := true;
+ m.waiting <- Thread.self() :: m.waiting;
+ Thread.sleep();
+ lock m
+ end else begin
+ m.locked <- true (* test and set atomic *)
+ end
+
+let try_lock m = (* test and set atomic *)
+ if m.locked then false else begin m.locked <- true; true end
+
+let unlock m =
+ (* Don't play with Thread.critical_section here because of Condition.wait *)
+ let w = m.waiting in (* atomic *)
+ m.waiting <- []; (* atomic *)
+ m.locked <- false; (* atomic *)
+ List.iter Thread.wakeup w
+
diff --git a/otherlibs/threads/mutex.mli b/otherlibs/threads/mutex.mli
index 847f8ae7b..422040a82 100644
--- a/otherlibs/threads/mutex.mli
+++ b/otherlibs/threads/mutex.mli
@@ -26,18 +26,18 @@
type t
(* The type of mutexes. *)
-external new: unit -> t = "csl_mutex_new"
+val new: unit -> t
(* Return a new mutex. *)
-external lock: t -> unit = "csl_mutex_lock"
+val lock: t -> unit
(* Lock the given mutex. Only one thread can have the mutex locked
at any time. A thread that attempts to lock a mutex already locked
by another thread will suspend until the other thread unlocks
the mutex. *)
-external try_lock: t -> bool = "csl_mutex_try_lock"
+val try_lock: t -> bool
(* Same as [try_lock], but does not suspend the calling thread if
the mutex is already locked: just return [false] immediately
in that case. If the mutex is unlocked, lock it and
return [true]. *)
-external unlock: t -> unit = "csl_mutex_unlock"
+val unlock: t -> unit
(* Unlock the given mutex. Other threads suspended trying to lock
the mutex will restart. *)
diff --git a/otherlibs/threads/scheduler.c b/otherlibs/threads/scheduler.c
new file mode 100644
index 000000000..be90f1972
--- /dev/null
+++ b/otherlibs/threads/scheduler.c
@@ -0,0 +1,454 @@
+#include "config.h"
+#include "misc.h"
+#include "mlvalues.h"
+#include "stacks.h"
+#include "fail.h"
+#include "io.h"
+#include "roots.h"
+#include "alloc.h"
+#include "memory.h"
+
+#if defined(HAS_SELECT) && defined(HAS_SETITIMER) && defined(HAS_GETTIMEOFDAY)
+#else
+#include "Cannot compile libthreads, system calls missing"
+#endif
+
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+#ifdef HAS_SYS_SELECT_H
+#include <sys/select.h>
+#endif
+
+#ifndef FD_ISSET
+typedef int fd_set;
+#define FD_SETSIZE (sizeof(int) * 8)
+#define FD_SET(fd,fds) (*(fds) |= 1 << (fd))
+#define FD_CLR(fd,fds) (*(fds) &= ~(1 << (fd)))
+#define FD_ISSET(fd,fds) (*(fds) & (1 << (fd)))
+#define FD_ZERO(fds) (*(fds) = 0)
+#endif
+
+/* Configuration */
+
+/* Initial size of stack when a thread is created (4 Ko) */
+#define Thread_stack_size (Stack_size / 4)
+
+/* Max computation time before rescheduling, in microseconds (50ms) */
+#define Thread_timeout 50000
+
+/* The thread descriptors */
+
+struct thread_struct {
+ value ident; /* Unique id (for equality comparisons) */
+ struct thread_struct * next; /* Double linking of threads */
+ struct thread_struct * prev;
+ value * stack_low; /* The execution stack for this thread */
+ value * stack_high;
+ value * stack_threshold;
+ value * sp;
+ value * trapsp;
+ value status; /* RUNNABLE, KILLED. etc (see below) */
+ value fd; /* File descriptor on which this thread is waiting */
+ value delay; /* Time until which this thread is blocked */
+ value joining; /* Thread we're trying to join */
+};
+
+typedef struct thread_struct * thread_t;
+
+#define RUNNABLE Val_int(0)
+#define KILLED Val_int(1)
+#define SUSPENDED Val_int(2)
+#define BLOCKED_READ Val_int(3)
+#define BLOCKED_WRITE Val_int(4)
+#define BLOCKED_DELAY Val_int(5)
+#define BLOCKED_JOIN Val_int(6)
+
+#define NO_FD Val_unit
+#define NO_DELAY Val_unit
+#define NO_JOINING Val_unit
+
+#define DELAY_INFTY 1E30 /* +infty, for this purpose */
+
+/* The thread currently active */
+static thread_t curr_thread = NULL;
+/* Identifier for next thread creation */
+static value next_ident = Val_int(0);
+
+#define Assign(dst,src) modify((value *)&(dst), (value)(src))
+
+/* Scan the stacks of the other threads */
+
+static void (*prev_scan_roots_hook) P((scanning_action));
+
+static void thread_scan_roots(action)
+ scanning_action action;
+{
+ thread_t th;
+ register value * sp;
+ /* Scan all active descriptors */
+ (*action)((value) curr_thread, (value *) &curr_thread);
+ /* Don't scan curr_thread->sp, this has already been done */
+ for (th = curr_thread->next; th != curr_thread; th = th->next) {
+ (*action)((value) th, (value *) &th);
+ for (sp = th->sp; sp < th->stack_high; sp++) {
+ (*action)(*sp, sp);
+ }
+ }
+ /* Hook */
+ if (prev_scan_roots_hook != NULL) (*prev_scan_roots_hook)(action);
+}
+
+/* Initialize the thread machinery */
+
+value thread_initialize(unit) /* ML */
+ value unit;
+{
+ struct itimerval timer;
+ /* Create a descriptor for the current thread */
+ curr_thread =
+ (thread_t) alloc_shr(sizeof(struct thread_struct) / sizeof(value), 0);
+ curr_thread->ident = next_ident;
+ next_ident = Val_int(Int_val(next_ident) + 1);
+ curr_thread->next = curr_thread;
+ curr_thread->prev = curr_thread;
+ curr_thread->stack_low = stack_low;
+ curr_thread->stack_high = stack_high;
+ curr_thread->stack_threshold = stack_threshold;
+ curr_thread->sp = extern_sp;
+ curr_thread->trapsp = trapsp;
+ curr_thread->status = RUNNABLE;
+ curr_thread->fd = NO_FD;
+ curr_thread->delay = NO_DELAY;
+ curr_thread->joining = NO_JOINING;
+ /* Initialize GC */
+ prev_scan_roots_hook = scan_roots_hook;
+ scan_roots_hook = thread_scan_roots;
+ /* Initialize interval timer */
+ timer.it_interval.tv_sec = 0;
+ timer.it_interval.tv_usec = Thread_timeout;
+ timer.it_value = timer.it_interval;
+ setitimer(ITIMER_VIRTUAL, &timer, NULL);
+ return Val_unit;
+}
+
+/* Create a thread */
+
+value thread_new(clos) /* ML */
+ value clos;
+{
+ thread_t th;
+ /* Allocate the thread and its stack */
+ Push_roots(r, 1);
+ r[0] = clos;
+ th = (thread_t) alloc_shr(sizeof(struct thread_struct) / sizeof(value), 0);
+ clos = r[0];
+ Pop_roots();
+ th->ident = next_ident;
+ next_ident = Val_int(Int_val(next_ident) + 1);
+ th->stack_low = (value *) stat_alloc(Thread_stack_size);
+ th->stack_high = th->stack_low + Thread_stack_size / sizeof(value);
+ th->stack_threshold = th->stack_low + Stack_threshold / sizeof(value);
+ th->sp = th->stack_high;
+ th->trapsp = th->stack_high;
+ /* Set up a return frame that pretends we're applying clos to ().
+ This way, when this thread is activated, the RETURN will take us
+ to the entry point of the closure. */
+ th->sp -= 4;
+ th->sp[0] = Val_unit;
+ th->sp[1] = (value) Code_val(clos);
+ th->sp[2] = clos;
+ th->sp[3] = Val_long(0); /* no extra args */
+ /* Fake a C call frame */
+ th->sp--;
+ th->sp[0] = Val_unit; /* a dummy environment */
+ /* The thread is initially runnable */
+ th->status = RUNNABLE;
+ th->fd = NO_FD;
+ th->delay = NO_DELAY;
+ th->joining = NO_JOINING;
+ /* Insert thread in doubly linked list of threads */
+ th->prev = curr_thread->prev;
+ th->next = curr_thread;
+ Assign(curr_thread->prev->next, th);
+ Assign(curr_thread->prev, th);
+ /* Return thread */
+ return (value) th;
+}
+
+/* Return the thread identifier */
+
+value thread_id(th) /* ML */
+ value th;
+{
+ return ((struct thread_struct *)th)->ident;
+}
+
+/* Return the current time as a floating-point number */
+
+static double timeofday()
+{
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+ return (double) tv.tv_sec + (double) tv.tv_usec * 1e-6;
+}
+
+/* Find a runnable thread and activate it */
+
+#define FOREACH_THREAD(x) x = curr_thread; do { x = x->next;
+#define END_FOREACH(x) } while (x != curr_thread)
+
+static void schedule_thread()
+{
+ thread_t run_thread, th;
+ fd_set readfds, writefds;
+ double delay, now;
+ int need_select;
+
+ /* Save the status of the current thread */
+ curr_thread->stack_low = stack_low;
+ curr_thread->stack_high = stack_high;
+ curr_thread->stack_threshold = stack_threshold;
+ curr_thread->sp = extern_sp;
+ curr_thread->trapsp = trapsp;
+
+try_again:
+ /* Build fdsets and delay for select.
+ See if some join operation succeeded. */
+ FD_ZERO(&readfds);
+ FD_ZERO(&writefds);
+ delay = DELAY_INFTY;
+ now = -1.0;
+ need_select = 0;
+
+ FOREACH_THREAD(th)
+ switch (th->status) {
+ case BLOCKED_READ:
+ FD_SET(Int_val(th->fd), &readfds);
+ need_select = 1;
+ break;
+ case BLOCKED_WRITE:
+ FD_SET(Int_val(th->fd), &writefds);
+ need_select = 1;
+ break;
+ case BLOCKED_DELAY:
+ { double th_delay = Double_val(th->delay);
+ if (now < 0.0) now = timeofday();
+ if (th_delay < now) {
+ th->status = RUNNABLE;
+ Assign(th->delay, NO_DELAY);
+ } else {
+ if (th_delay < delay) delay = th_delay;
+ }
+ break;
+ }
+ case BLOCKED_JOIN:
+ if (((thread_t)(th->joining))->status == KILLED) {
+ th->status = RUNNABLE;
+ Assign(th->joining, NO_JOINING);
+ }
+ break;
+ }
+ END_FOREACH(th);
+
+ /* Find if a thread is runnable. */
+ run_thread = NULL;
+ FOREACH_THREAD(th)
+ if (th->status == RUNNABLE) { run_thread = th; break; }
+ END_FOREACH(th);
+
+ /* Do the select if needed */
+ if (need_select || run_thread == NULL) {
+ struct timeval delay_tv, * delay_ptr;
+ int retcode;
+ /* Convert delay to a timeval */
+ /* If a thread is runnable, just poll */
+ if (run_thread != NULL) {
+ delay_tv.tv_sec = 0;
+ delay_tv.tv_usec = 0;
+ delay_ptr = &delay_tv;
+ }
+ else if (delay == DELAY_INFTY) {
+ delay_ptr = NULL;
+ } else {
+ delay = delay - now;
+ delay_tv.tv_sec = (unsigned int) delay;
+ delay_tv.tv_usec = (delay - (double) delay_tv.tv_sec) * 1E6;
+ delay_ptr = &delay_tv;
+ }
+ retcode = select(FD_SETSIZE, &readfds, &writefds, NULL, delay_ptr);
+ if (retcode > 0) {
+ /* Some descriptors are ready.
+ Mark the corresponding threads runnable. */
+ FOREACH_THREAD(th)
+ switch (th->status) {
+ case BLOCKED_READ:
+ if (FD_ISSET(Int_val(th->fd), &readfds)) {
+ /* Wake up only one thread per fd. */
+ FD_CLR(Int_val(th->fd), &readfds);
+ th->status = RUNNABLE;
+ th->fd = NO_FD;
+ if (run_thread == NULL) run_thread = th; /* Found one. */
+ }
+ break;
+ case BLOCKED_WRITE:
+ if (FD_ISSET(Int_val(th->fd), &writefds)) {
+ /* Wake up only one thread per fd. */
+ FD_CLR(Int_val(th->fd), &writefds);
+ th->status = RUNNABLE;
+ th->fd = NO_FD;
+ if (run_thread == NULL) run_thread = th; /* Found one. */
+ }
+ break;
+ }
+ END_FOREACH(th);
+ }
+ /* If we get here with run_thread still NULL, some of the delays
+ have expired. We go through the loop once more to make the
+ corresponding threads runnable. */
+ if (run_thread == NULL && delay != DELAY_INFTY) goto try_again;
+ }
+
+ /* If we haven't something to run at that point, we're in big trouble. */
+ if (run_thread == NULL) invalid_argument("Thread: deadlock");
+
+ /* Activate the thread */
+ curr_thread = run_thread;
+ stack_low = curr_thread->stack_low;
+ stack_high = curr_thread->stack_high;
+ stack_threshold = curr_thread->stack_threshold;
+ extern_sp = curr_thread->sp;
+ trapsp = curr_thread->trapsp;
+}
+
+/* Reschedule without suspending the current thread */
+
+value thread_yield(unit) /* ML */
+ value unit;
+{
+ schedule_thread();
+ return Val_unit;
+}
+
+/* Suspend the current thread */
+
+value thread_sleep(unit) /* ML */
+ value unit;
+{
+ curr_thread->status = SUSPENDED;
+ schedule_thread();
+ return Val_unit;
+}
+
+/* Suspend the current thread on a Unix file descriptor */
+
+value thread_wait_read(fd) /* ML */
+ value fd;
+{
+ curr_thread->status = BLOCKED_READ;
+ curr_thread->fd = fd;
+ schedule_thread();
+ return Val_unit;
+}
+
+value thread_wait_write(fd) /* ML */
+ value fd;
+{
+ curr_thread->status = BLOCKED_WRITE;
+ curr_thread->fd = fd;
+ schedule_thread();
+ return Val_unit;
+}
+
+/* Suspend the current thread on a buffered input channel */
+
+value thread_wait_inchan(vchan) /* ML */
+ value vchan;
+{
+ struct channel * chan = (struct channel *) vchan;
+ if (chan->curr < chan->max) return Val_unit;
+ curr_thread->status = BLOCKED_READ;
+ curr_thread->fd = Val_int(chan->fd);
+ schedule_thread();
+ return Val_unit;
+}
+
+/* Suspend the current thread for some time */
+
+value thread_wait_for(time) /* ML */
+ value time;
+{
+ double date = timeofday() + Double_val(time);
+ curr_thread->status = BLOCKED_DELAY;
+ Assign(curr_thread->delay, copy_double(date));
+ schedule_thread();
+ return Val_unit;
+}
+
+/* Suspend the current thread until another thread terminates */
+
+value thread_join(th) /* ML */
+ value th;
+{
+ if (((thread_t)th)->status == KILLED) return Val_unit;
+ curr_thread->status = BLOCKED_JOIN;
+ Assign(curr_thread->joining, th);
+ schedule_thread();
+ return Val_unit;
+}
+
+/* Reactivate another thread */
+
+value thread_wakeup(thread) /* ML */
+ value thread;
+{
+ thread_t th = (thread_t) thread;
+ switch (th->status) {
+ case SUSPENDED:
+ th->status = RUNNABLE;
+ break;
+ case KILLED:
+ failwith("Thread.wakeup: killed thread");
+ break;
+ default:
+ failwith("Thread.wakeup: thread was not suspended");
+ break;
+ }
+ return Val_unit;
+}
+
+/* Return the current thread */
+
+value thread_self(unit) /* ML */
+ value unit;
+{
+ return (value) curr_thread;
+}
+
+/* Kill a thread */
+
+value thread_kill(thread) /* ML */
+ value thread;
+{
+ thread_t th = (thread_t) thread;
+ /* Don't paint ourselves in a corner */
+ if (th == th->next) failwith("Thread.kill: cannot kill the last thread");
+ /* This thread is no longer waiting on anything */
+ th->status = KILLED;
+ Assign(th->delay, NO_DELAY);
+ Assign(th->joining, NO_JOINING);
+ /* If this is the current thread, activate another one */
+ if (th == curr_thread) schedule_thread();
+ /* Remove thread from the doubly-linked list */
+ Assign(th->prev->next, th->next);
+ Assign(th->next->prev, th->prev);
+ /* Free its resources */
+ stat_free((char *) th->stack_low);
+ th->stack_low = NULL;
+ th->stack_high = NULL;
+ th->stack_threshold = NULL;
+ th->sp = NULL;
+ th->trapsp = NULL;
+ return Val_unit;
+}
+
diff --git a/otherlibs/threads/thread.ml b/otherlibs/threads/thread.ml
index 2fa59ffa6..04161cca0 100644
--- a/otherlibs/threads/thread.ml
+++ b/otherlibs/threads/thread.ml
@@ -15,30 +15,62 @@
type t
-external thread_initialize : unit -> unit = "csl_thread_initialize"
-external thread_new : (unit -> unit) -> t = "csl_thread_new"
+let critical_section = ref false
-external yield : unit -> unit = "csl_thread_yield"
-external self : unit -> t = "csl_thread_self"
-external id : t -> int = "csl_thread_id"
-external exit : unit -> unit = "csl_thread_exit"
-external join : t -> unit = "csl_thread_join"
-external detach : t -> unit = "csl_thread_detach"
+(* It is mucho important that the primitives that reschedule are called
+ through an ML function call, not directly. That's because when such a
+ primitive returns, the bytecode interpreter is only semi-obedient:
+ it takes sp from the new thread, but keeps pc from the old thread.
+ But that's OK if all calls to rescheduling primitives are immediately
+ followed by a RETURN operation, which will restore the correct pc
+ from the stack. *)
-(* For new, make sure the function passed to thread_new never
- raises an exception. *)
+external thread_initialize : unit -> unit = "thread_initialize"
+external thread_new : (unit -> unit) -> t = "thread_new"
+external thread_yield : unit -> unit = "thread_yield"
+external thread_sleep : unit -> unit = "thread_sleep"
+external thread_wait_read : Unix.file_descr -> unit = "thread_wait_read"
+external thread_wait_write : Unix.file_descr -> unit =
+ "thread_wait_write"
+external thread_wait_inchan : in_channel -> unit = "thread_wait_inchan"
+external thread_join : t -> unit = "thread_join"
+external thread_delay : float -> unit = "thread_wait_for"
+external thread_wakeup : t -> unit = "thread_wakeup"
+external thread_self : unit -> t = "thread_self"
+external thread_kill : t -> unit = "thread_kill"
+
+external id : t -> int = "thread_id"
+
+(* In sleep() below, we rely on the fact that signals are detected
+ only at function applications and beginning of loops,
+ making all other operations atomic. *)
+
+let sleep () = critical_section := false; thread_sleep()
+let wait_read fd = thread_wait_read fd
+let wait_write fd = thread_wait_write fd
+let wait_inchan ic = thread_wait_inchan ic
+let delay duration = thread_delay duration
+let join th = thread_join th
+let wakeup pid = thread_wakeup pid
+let self () = thread_self()
+let kill pid = thread_kill pid
+let exit () = thread_kill(thread_self())
+
+(* For new, make sure the function passed to thread_new always terminates
+ by calling exit. *)
let new fn arg =
thread_new
(fun () ->
try
- Printexc.print fn arg; ()
+ Printexc.print fn arg; exit()
with x ->
- flush stdout; flush stderr)
+ flush stdout; flush stderr; exit())
(* Preemption *)
-let preempt signal = yield()
+let preempt signal =
+ if !critical_section then () else thread_yield()
(* Initialization of the scheduler *)
diff --git a/otherlibs/threads/thread.mli b/otherlibs/threads/thread.mli
index a236bc176..6b14a4a1a 100644
--- a/otherlibs/threads/thread.mli
+++ b/otherlibs/threads/thread.mli
@@ -29,31 +29,65 @@ val new : ('a -> 'b) -> 'a -> t
but not propagated back to the parent thread. Similarly, the
result of the application [funct arg] is discarded and not
directly accessible to the parent thread. *)
-external self : unit -> t = "csl_thread_self"
+val self : unit -> t
(* Return the thread currently executing. *)
-external id : t -> int = "csl_thread_id"
+external id : t -> int = "thread_id"
(* Return the identifier of the given thread. A thread identifier
is an integer that identifies uniquely the thread.
It can be used to build data structures indexed by threads. *)
-external exit : unit -> unit = "csl_thread_exit"
+val exit : unit -> unit
(* Terminate prematurely the currently executing thread. *)
+val kill : t -> unit
+ (* Terminate prematurely the thread whose handle is given. *)
-(** Thread synchronization *)
+(** Suspending threads *)
-external join : t -> unit = "csl_thread_join"
+val delay: float -> unit
+ (* [delay d] suspends the execution of the calling thread for
+ [d] seconds. The other program threads continue to run during
+ this time. *)
+val wait_inchan : in_channel -> unit
+ (* [wait_inchan ic] suspends the execution of the calling thread
+ until at least one character is available for reading on the
+ input channel [ic]. The other program threads continue to run
+ during this time. In contrast, calling an input function directly
+ on [ic] would block all threads in the program until data is
+ available on the channel. See the module [ThreadIO] for
+ higher-level input functions compatible with threads. *)
+val wait_read : Unix.file_descr -> unit
+val wait_write : Unix.file_descr -> unit
+ (* Similar to [wait_inchan], but operates on a file descriptor
+ from the [Unix] library instead of an input channel.
+ [wait_read] suspends the thread until at least one
+ character is available for reading; [wait_write] suspends the
+ thread until at least one character can be written without
+ blocking. *)
+val join : t -> unit
(* [join th] suspends the execution of the calling thread
until the thread [th] has terminated. *)
-external detach : t -> unit = "csl_thread_detach"
- (* [detach th] indicates that the thread [th] will never
- be joined, and that its resources can be freed as soon
- as it terminates. *)
-
-(** Thread scheduling *)
-
-external yield : unit -> unit = "csl_thread_yield"
- (* [yield()] suggests the scheduler that this is a good point
- to suspend the current thread and reactivate another one.
- This is just a hint: the scheduler preempts periodically
- long-running threads even if they never execute [yield()].
- Using [yield()] may improve the responsiveness of the program,
- though. *)
+
+(** Low-level thread synchronization primitives *)
+
+(* The following primitives provide the basis for implementing
+ synchronization functions between threads. Their direct use is
+ discouraged, as they are very low-level and prone to race conditions
+ and deadlocks. The modules [Mutex], [Condition] and [Event]
+ provide higher-level synchronization primitives. *)
+
+val critical_section: bool ref
+ (* Setting this reference to [true] deactivate thread preemption
+ (the timer interrupt that transfers control from thread to thread),
+ causing the current thread to run uninterrupted until
+ [critical_section] is reset to [false] or the current thread
+ explicitely relinquishes control using [sleep], [delay],
+ [wait_inchan] or [wait_descr]. *)
+val sleep : unit -> unit
+ (* Suspend the calling thread until another thread reactivates it
+ using [wakeup]. Just before suspending the thread,
+ [critical_section] is reset to [false]. Resetting
+ [critical_section] and suspending the calling thread is an
+ atomic operation. *)
+val wakeup : t -> unit
+ (* Reactivate the given thread. This thread is assumed to
+ be suspended on a call to [sleep]. After the call to [wakeup],
+ the suspended thread will resume execution at some future time. *)
diff --git a/otherlibs/threads/threadIO.ml b/otherlibs/threads/threadIO.ml
new file mode 100644
index 000000000..eccce0c99
--- /dev/null
+++ b/otherlibs/threads/threadIO.ml
@@ -0,0 +1,39 @@
+(***********************************************************************)
+(* *)
+(* Caml Special Light *)
+(* *)
+(* Xavier Leroy, projet Cristal, INRIA Rocquencourt *)
+(* *)
+(* Copyright 1995 Institut National de Recherche en Informatique et *)
+(* Automatique. Distributed only by permission. *)
+(* *)
+(***********************************************************************)
+
+(* $Id$ *)
+
+(* Module [ThreadIO]: thread-compatible input-output operations *)
+
+let input_char ic = Thread.wait_inchan ic; input_char ic
+let input_line ic = Thread.wait_inchan ic; input_line ic
+let input ic buf ofs len = Thread.wait_inchan ic; input ic buf ofs len
+
+let rec really_input ic s ofs len =
+ if ofs < 0 or ofs + len > String.length s then invalid_arg "really_input"
+ else if len <= 0 then ()
+ else begin
+ let r = input ic s ofs len in
+ if r = 0
+ then raise End_of_file
+ else really_input ic s (ofs+r) (len-r)
+ end
+
+let input_byte ic = Thread.wait_inchan ic; input_byte ic
+let input_binary_int ic = Thread.wait_inchan ic; input_binary_int ic
+let input_value ic = Thread.wait_inchan ic; input_value ic
+
+let read_line () = flush stdout; input_line stdin
+let read_int () = int_of_string(read_line())
+let read_float () = float_of_string(read_line())
+
+let lexing_from_channel ic =
+ Lexing.from_function (fun buf n -> input ic buf 0 n)
diff --git a/otherlibs/threads/threadIO.mli b/otherlibs/threads/threadIO.mli
new file mode 100644
index 000000000..f9773336d
--- /dev/null
+++ b/otherlibs/threads/threadIO.mli
@@ -0,0 +1,46 @@
+(***********************************************************************)
+(* *)
+(* Caml Special Light *)
+(* *)
+(* Xavier Leroy, projet Cristal, INRIA Rocquencourt *)
+(* *)
+(* Copyright 1995 Institut National de Recherche en Informatique et *)
+(* Automatique. Distributed only by permission. *)
+(* *)
+(***********************************************************************)
+
+(* $Id$ *)
+
+(* Module [ThreadIO]: thread-compatible input-output operations *)
+
+(* This module reimplements some of the input functions from [Pervasives]
+ and [Lexing] so that they only block the calling thread, not all threads
+ in the program, if data is not immediately available on the input. *)
+
+(** General input functions *)
+
+val input_char : in_channel -> char
+val input : in_channel -> string -> int -> int -> int
+val really_input : in_channel -> string -> int -> int -> unit
+val input_byte : in_channel -> int
+ (* See the corresponding functions in module [Pervasives]. *)
+val input_line : in_channel -> string
+val input_binary_int : in_channel -> int
+val input_value : in_channel -> 'a
+ (* See the corresponding functions in module [Pervasives].
+ For the three functions above, the other program threads continue
+ to run until the first character of the input is available;
+ all threads are blocked while the remaining characters of the
+ input are being read. *)
+
+(** Input functions on standard input *)
+
+val read_line : unit -> string
+val read_int : unit -> int
+val read_float : unit -> float
+ (* See the corresponding functions in module [Pervasives]. *)
+
+(** Lexer buffers *)
+
+val lexing_from_channel: in_channel -> Lexing.lexbuf
+ (* See [Lexing.from_channel]. *)
diff --git a/otherlibs/threads/threadstubs.c b/otherlibs/threads/threadstubs.c
deleted file mode 100644
index ae2c9536b..000000000
--- a/otherlibs/threads/threadstubs.c
+++ /dev/null
@@ -1,452 +0,0 @@
-/***********************************************************************/
-/* */
-/* Caml Special Light */
-/* */
-/* Xavier Leroy and Damien Doligez, INRIA Rocquencourt */
-/* */
-/* Copyright 1995 Institut National de Recherche en Informatique et */
-/* Automatique. Distributed only by permission. */
-/* */
-/***********************************************************************/
-
-/* $Id$ */
-
-#include <pthread.h>
-#include <signal.h>
-#include <sys/time.h>
-#include "alloc.h"
-#include "misc.h"
-#include "mlvalues.h"
-#include "roots.h"
-#include "signals.h"
-#include "stacks.h"
-#include "sys.h"
-
-#ifdef HAS_THR_YIELD
-#include <thread.h>
-#endif
-
-/* Max computation time before rescheduling, in microseconds (50ms) */
-#define Thread_timeout 50000
-
-/* Initial size of stack when a thread is created (4 Ko) */
-#define Thread_stack_size (Stack_size / 4)
-
-/* The thread descriptors */
-
-struct csl_thread_struct {
- pthread_t pthread; /* The corresponding Posix thread */
- value ident; /* Unique id */
- value * stack_low; /* The execution stack for this thread */
- value * stack_high;
- value * stack_threshold;
- value * sp; /* Saved value of extern_sp for this thread */
- value * trapsp; /* Saved value of trapsp for this thread */
- struct csl_thread_struct * next; /* Double linking of threads */
- struct csl_thread_struct * prev;
-};
-
-typedef struct csl_thread_struct * csl_thread_t;
-
-#define Assign(dst,src) modify((value *)&(dst), (value)(src))
-
-/* The global mutex used to ensure that at most one thread is running
- Caml code */
-pthread_mutex_t csl_mutex;
-
-/* Head of the list of thread descriptors */
-csl_thread_t thread_list = NULL;
-
-/* The key used for storing the thread descriptor in the specific data
- of the corresponding Posix thread. */
-pthread_key_t thread_descriptor_key;
-
-/* Identifier for next thread creation */
-static value thread_next_ident = 0;
-
-/* Compatibility code for DEC OSF1 */
-
-#ifdef __osf__
-#define Attr_default pthread_attr_default
-#define Mutexattr_default pthread_mutexattr_default
-#define Condattr_default pthread_condattr_default
-#define Getspecific(res,key) pthread_getspecific(key, (void **) &(res))
-#define Pthread_key_create pthread_keycreate
-#define Pthread_detach(th) pthread_detach(&(th))
-#else
-#define Attr_default NULL
-#define Mutexattr_default NULL
-#define Condattr_default NULL
-#define Getspecific(res,key) (res) = pthread_getspecific(key)
-#define Pthread_key_create pthread_key_create
-#define Pthread_detach pthread_detach
-#endif
-
-/* Hook for scanning the stacks of the other threads */
-
-static void (*prev_scan_roots_hook) P((scanning_action));
-
-static void csl_thread_scan_roots(action)
- scanning_action action;
-{
- csl_thread_t th;
- register value * sp;
- /* Scan all thread descriptors */
- (*action)((value) thread_list, (value *) &thread_list);
- /* Scan the stacks */
- for (th = thread_list; th != NULL; th = th->next) {
- /* If this is the current thread, don't scan its stack, this
- has already been done */
- if (th->stack_low == stack_low) continue;
- for (sp = th->sp; sp < th->stack_high; sp++) {
- (*action)(*sp, sp);
- }
- }
- /* Hook */
- if (prev_scan_roots_hook != NULL) (*prev_scan_roots_hook)(action);
-}
-
-/* Hooks for enter_blocking_section and leave_blocking_section */
-
-static void (*prev_enter_blocking_section_hook) ();
-static void (*prev_leave_blocking_section_hook) ();
-
-static void csl_thread_enter_blocking_section()
-{
- csl_thread_t curr_thread;
- if (prev_enter_blocking_section_hook != NULL)
- (*prev_enter_blocking_section_hook)();
- /* Save the stack-related global variables in the thread descriptor
- of the current thread */
- Getspecific(curr_thread, thread_descriptor_key);
- curr_thread->stack_low = stack_low;
- curr_thread->stack_high = stack_high;
- curr_thread->stack_threshold = stack_threshold;
- curr_thread->sp = extern_sp;
- curr_thread->trapsp = trapsp;
- /* Release the global mutex */
- pthread_mutex_unlock(&csl_mutex);
-}
-
-static void csl_thread_leave_blocking_section()
-{
- csl_thread_t curr_thread;
- /* Re-acquire the global mutex */
- pthread_mutex_lock(&csl_mutex);
- /* Restore the stack-related global variables */
- Getspecific(curr_thread, thread_descriptor_key);
- stack_low = curr_thread->stack_low;
- stack_high = curr_thread->stack_high;
- stack_threshold = curr_thread->stack_threshold;
- extern_sp = curr_thread->sp;
- trapsp = curr_thread->trapsp;
- if (prev_leave_blocking_section_hook != NULL)
- (*prev_leave_blocking_section_hook)();
-}
-
-/* The "tick" thread fakes a SIGVTALRM signal at regular intervals. */
-
-static void * csl_thread_tick()
-{
- struct timeval timeout;
- while(1) {
- /* select() seems to be the most efficient way to suspend the
- thread for sub-second intervals */
- timeout.tv_sec = 0;
- timeout.tv_usec = Thread_timeout;
- select(0, NULL, NULL, NULL, &timeout);
- if (!async_signal_mode) handle_signal(SIGVTALRM);
- }
-}
-
-/* Thread cleanup: remove the descriptor from the list and
- free the stack space and the descriptor itself. */
-
-static void csl_thread_cleanup(th)
- csl_thread_t th;
-{
- /* Remove th from the doubly-linked list of threads */
- if (th == thread_list) {
- thread_list = th->next;
- } else {
- Assign(th->next->prev, th->prev);
- Assign(th->prev->next, th->next);
- }
- /* Free the memory resources */
- stat_free((char *) th->stack_low);
- th->stack_low = NULL;
- th->stack_high = NULL;
- th->stack_threshold = NULL;
- th->sp = NULL;
- th->trapsp = NULL;
-}
-
-/* Initialize the thread machinery */
-
-value csl_thread_initialize(unit) /* ML */
- value unit;
-{
- pthread_t tick_pthread;
- /* Initialize the mutex */
- if (pthread_mutex_init(&csl_mutex, Mutexattr_default) == -1)
- sys_error("Thread.init");
- pthread_mutex_lock(&csl_mutex);
- /* Initialize the key */
- Pthread_key_create(&thread_descriptor_key, NULL);
- /* Create a descriptor for the current thread */
- thread_list = (csl_thread_t)
- alloc_shr(sizeof(struct csl_thread_struct) / sizeof(value), 0);
- thread_list->pthread = pthread_self();
- thread_list->ident = Val_long(thread_next_ident);
- thread_next_ident++;
- /* The stack-related fields will be filled in at the next
- enter_blocking_section */
- thread_list->next = NULL;
- thread_list->prev = NULL;
- /* Associate the thread descriptor with the thread */
- pthread_setspecific(thread_descriptor_key, (void *) thread_list);
- /* Set up the hooks */
- prev_scan_roots_hook = scan_roots_hook;
- scan_roots_hook = csl_thread_scan_roots;
- prev_enter_blocking_section_hook = enter_blocking_section_hook;
- enter_blocking_section_hook = csl_thread_enter_blocking_section;
- prev_leave_blocking_section_hook = leave_blocking_section_hook;
- leave_blocking_section_hook = csl_thread_leave_blocking_section;
- /* Fork the tick thread */
- if (pthread_create(&tick_pthread, Attr_default, csl_thread_tick, 0) == -1)
- sys_error("Thread.init");
- Pthread_detach(tick_pthread);
- return Val_unit;
-}
-
-/* Create a thread */
-
-static void * csl_thread_start(th)
- csl_thread_t th;
-{
- value clos;
- /* Associate the thread descriptor with the thread */
- pthread_setspecific(thread_descriptor_key, (void *) th);
- /* Set up termination routine */
- pthread_cleanup_push(csl_thread_cleanup, (void *) th);
- /* Acquire the global mutex before running the thread */
- pthread_mutex_lock(&csl_mutex);
- /* Set up the stack variables */
- stack_low = th->stack_low;
- stack_high = th->stack_high;
- stack_threshold = th->stack_threshold;
- extern_sp = th->sp;
- trapsp = th->trapsp;
- /* Callback the closure */
- clos = *extern_sp++;
- callback(clos, Val_unit);
- /* Cleanup: free the thread resources */
- pthread_cleanup_pop(1);
- /* Release the mutex and die quietly */
- pthread_mutex_unlock(&csl_mutex);
- return 0;
-}
-
-value csl_thread_new(clos) /* ML */
- value clos;
-{
- csl_thread_t th;
- /* Allocate the thread and its stack */
- th = (csl_thread_t)
- alloc_shr(sizeof(struct csl_thread_struct) / sizeof(value), 0);
- th->ident = Val_long(thread_next_ident);
- thread_next_ident++;
- th->stack_low = (value *) stat_alloc(Thread_stack_size);
- th->stack_high = th->stack_low + Thread_stack_size / sizeof(value);
- th->stack_threshold = th->stack_low + Stack_threshold / sizeof(value);
- th->sp = th->stack_high;
- th->trapsp = th->stack_high;
- /* Add it to the list of threads */
- th->next = thread_list;
- th->prev = NULL;
- Assign(thread_list->prev, th);
- thread_list = th;
- /* Pass the closure in the newly created stack, so that it will be
- preserved by garbage collection */
- *--(th->sp) = clos;
- /* Fork the new thread */
- if (pthread_create(&(th->pthread), Attr_default, csl_thread_start,
- (void *) th) == -1)
- sys_error("Thread.new");
- return (value) th;
-}
-
-/* Return the current thread */
-
-value csl_thread_self(unit) /* ML */
- value unit;
-{
- csl_thread_t curr_thread;
- Getspecific(curr_thread, thread_descriptor_key);
- return (value) curr_thread;
-}
-
-/* Return the identifier of a thread */
-
-value csl_thread_id(th) /* ML */
- csl_thread_t th;
-{
- return th->ident;
-}
-
-/* Allow re-scheduling */
-
-value csl_thread_yield(unit) /* ML */
- value unit;
-{
- enter_blocking_section();
-#if defined(HAS_PTHREAD_YIELD)
- pthread_yield();
-#elif defined(HAS_THR_YIELD)
- thr_yield();
-#endif
- leave_blocking_section();
- return Val_unit;
-}
-
-/* Detach a thread */
-
-value csl_thread_detach(th) /* ML */
- csl_thread_t th;
-{
- if (Pthread_detach(th->pthread) == -1) sys_error("Thread.detach");
- return Val_unit;
-}
-
-/* Suspend the current thread until another thread terminates */
-
-value csl_thread_join(th) /* ML */
- csl_thread_t th;
-{
- void * status;
- int retcode;
- enter_blocking_section();
- retcode = pthread_join(th->pthread, &status);
- leave_blocking_section();
- if (retcode == -1) sys_error("Thread.join");
- return Val_unit;
-}
-
-/* Terminate the current thread */
-
-value csl_thread_exit(unit) /* ML */
- value unit;
-{
- enter_blocking_section();
- pthread_exit(0);
- return Val_unit; /* never reached */
-}
-
-/* Mutex operations */
-
-#define Mutex_val(v) (*((pthread_mutex_t *)(&Field(v, 1))))
-#define Max_mutex_number 1000
-
-static void csl_mutex_finalize(mut)
- value mut;
-{
- pthread_mutex_destroy(&Mutex_val(mut));
-}
-
-value csl_mutex_new(unit) /* ML */
- value unit;
-{
- value mut;
- mut = alloc_final(1 + sizeof(pthread_mutex_t) / sizeof(value),
- csl_mutex_finalize, 1, Max_mutex_number);
- if (pthread_mutex_init(&Mutex_val(mut), Mutexattr_default) == -1)
- sys_error("Mutex.new");
- return mut;
-}
-
-value csl_mutex_lock(mut) /* ML */
- value mut;
-{
- int retcode;
- enter_blocking_section();
- retcode = pthread_mutex_lock(&(Mutex_val(mut)));
- leave_blocking_section();
- if (retcode == -1) sys_error("Mutex.lock");
- return Val_unit;
-}
-
-value csl_mutex_unlock(mut) /* ML */
- value mut;
-{
- int retcode;
- enter_blocking_section();
- retcode = pthread_mutex_unlock(&(Mutex_val(mut)));
- leave_blocking_section();
- if (retcode == -1) sys_error("Mutex.unlock");
- return Val_unit;
-}
-
-value csl_mutex_try_lock(mut) /* ML */
- value mut;
-{
- int retcode;
- retcode = pthread_mutex_trylock(&(Mutex_val(mut)));
- if (retcode == -1) sys_error("Mutex.try_lock");
- return Val_bool(retcode);
-}
-
-/* Conditions operations */
-
-#define Condition_val(v) (*((pthread_cond_t *)(&Field(v, 1))))
-#define Max_condition_number 1000
-
-static void csl_condition_finalize(cond)
- value cond;
-{
- pthread_cond_destroy(&Condition_val(cond));
-}
-
-value csl_condition_new(unit) /* ML */
- value unit;
-{
- value cond;
- cond = alloc_final(1 + sizeof(pthread_cond_t) / sizeof(value),
- csl_condition_finalize, 1, Max_condition_number);
- if (pthread_cond_init(&Condition_val(cond), Condattr_default) == -1)
- sys_error("Condition.new");
- return cond;
-}
-
-value csl_condition_wait(cond, mut) /* ML */
- value cond, mut;
-{
- int retcode;
- enter_blocking_section();
- retcode = pthread_cond_wait(&Condition_val(cond), &Mutex_val(mut));
- leave_blocking_section();
- if (retcode == -1) sys_error("Condition.wait");
- return Val_unit;
-}
-
-value csl_condition_signal(cond) /* ML */
- value cond;
-{
- int retcode;
- enter_blocking_section();
- retcode = pthread_cond_signal(&Condition_val(cond));
- leave_blocking_section();
- if (retcode == -1) sys_error("Condition.signal");
- return Val_unit;
-}
-
-value csl_condition_broadcast(cond) /* ML */
- value cond;
-{
- int retcode;
- enter_blocking_section();
- retcode = pthread_cond_broadcast(&Condition_val(cond));
- leave_blocking_section();
- if (retcode == -1) sys_error("Condition.broadcast");
- return Val_unit;
-}
-