diff options
author | Xavier Leroy <xavier.leroy@inria.fr> | 1996-04-03 10:02:34 +0000 |
---|---|---|
committer | Xavier Leroy <xavier.leroy@inria.fr> | 1996-04-03 10:02:34 +0000 |
commit | 173cb4075f3f04fce2d1fc486c4ad6d1fa22cdce (patch) | |
tree | 5d53f20fb2afd24b16786c9cedff0d448540bd72 | |
parent | 13d82083c2a69ed2bd616299c4ab35798675f777 (diff) |
Retour en arriere sur l'emploi des threads POSIX. On revient a
l'ancienne implementation.
git-svn-id: http://caml.inria.fr/svn/ocaml/trunk@732 f963ae5c-01c2-4b8c-9fe0-0dff7051ff02
-rw-r--r-- | otherlibs/threads/.depend | 22 | ||||
-rw-r--r-- | otherlibs/threads/Makefile | 6 | ||||
-rw-r--r-- | otherlibs/threads/condition.ml | 26 | ||||
-rw-r--r-- | otherlibs/threads/condition.mli | 8 | ||||
-rw-r--r-- | otherlibs/threads/mutex.ml | 29 | ||||
-rw-r--r-- | otherlibs/threads/mutex.mli | 8 | ||||
-rw-r--r-- | otherlibs/threads/scheduler.c | 454 | ||||
-rw-r--r-- | otherlibs/threads/thread.ml | 58 | ||||
-rw-r--r-- | otherlibs/threads/thread.mli | 72 | ||||
-rw-r--r-- | otherlibs/threads/threadIO.ml | 39 | ||||
-rw-r--r-- | otherlibs/threads/threadIO.mli | 46 | ||||
-rw-r--r-- | otherlibs/threads/threadstubs.c | 452 |
12 files changed, 703 insertions, 517 deletions
diff --git a/otherlibs/threads/.depend b/otherlibs/threads/.depend index 803c7d4bd..ddba64d86 100644 --- a/otherlibs/threads/.depend +++ b/otherlibs/threads/.depend @@ -1,18 +1,16 @@ -scheduler.o: scheduler.c ../../byterun/config.h \ +threadstubs.o: threadstubs.c ../../byterun/alloc.h \ + ../../byterun/misc.h ../../byterun/config.h \ ../../byterun/../config/m.h ../../byterun/../config/s.h \ - ../../byterun/misc.h ../../byterun/mlvalues.h ../../byterun/stacks.h \ - ../../byterun/memory.h ../../byterun/gc.h ../../byterun/major_gc.h \ - ../../byterun/freelist.h ../../byterun/minor_gc.h \ - ../../byterun/fail.h ../../byterun/io.h ../../byterun/roots.h \ - ../../byterun/alloc.h + ../../byterun/mlvalues.h ../../byterun/roots.h \ + ../../byterun/signals.h ../../byterun/stacks.h ../../byterun/memory.h \ + ../../byterun/gc.h ../../byterun/major_gc.h ../../byterun/freelist.h \ + ../../byterun/minor_gc.h condition.cmi: mutex.cmi -condition.cmo: mutex.cmi thread.cmi condition.cmi -condition.cmx: mutex.cmx thread.cmx condition.cmi +condition.cmo: mutex.cmi condition.cmi +condition.cmx: mutex.cmx condition.cmi event.cmo: condition.cmi mutex.cmi event.cmi event.cmx: condition.cmx mutex.cmx event.cmi -mutex.cmo: thread.cmi mutex.cmi -mutex.cmx: thread.cmx mutex.cmi +mutex.cmo: mutex.cmi +mutex.cmx: mutex.cmi thread.cmo: thread.cmi thread.cmx: thread.cmi -threadIO.cmo: thread.cmi threadIO.cmi -threadIO.cmx: thread.cmx threadIO.cmi diff --git a/otherlibs/threads/Makefile b/otherlibs/threads/Makefile index e10197af4..c0fad43ec 100644 --- a/otherlibs/threads/Makefile +++ b/otherlibs/threads/Makefile @@ -2,11 +2,11 @@ include ../../config/Makefile # Compilation options CC=$(BYTECC) -CFLAGS=-I$(PTHREADS_INCLUDES) -I../../byterun -O $(BYTECCCOMPOPTS) +CFLAGS=-I../../byterun -O $(BYTECCCOMPOPTS) CAMLC=../../boot/cslrun ../../boot/cslc -I ../../boot -I ../unix -C_OBJS=threadstubs.o -CAML_OBJS=thread.cmo mutex.cmo condition.cmo event.cmo +C_OBJS=scheduler.o +CAML_OBJS=thread.cmo threadIO.cmo mutex.cmo condition.cmo event.cmo all: libthreads.a threads.cma diff --git a/otherlibs/threads/condition.ml b/otherlibs/threads/condition.ml index 5fd59a41c..f0ed4e91b 100644 --- a/otherlibs/threads/condition.ml +++ b/otherlibs/threads/condition.ml @@ -11,8 +11,24 @@ (* $Id$ *) -type t -external new: unit -> t = "csl_condition_new" -external wait: t -> Mutex.t -> unit = "csl_condition_wait" -external signal: t -> unit = "csl_condition_signal" -external broadcast: t -> unit = "csl_condition_broadcast" +type t = { mutable waiting: Thread.t list } + +let new () = { waiting = [] } + +let wait cond mut = + Thread.critical_section := true; + Mutex.unlock mut; + cond.waiting <- Thread.self() :: cond.waiting; + Thread.sleep(); + Mutex.lock mut + +let signal cond = + match cond.waiting with (* atomic *) + [] -> () + | th :: rem -> cond.waiting <- rem (* atomic *); Thread.wakeup th + +let broadcast cond = + let w = cond.waiting in (* atomic *) + cond.waiting <- []; (* atomic *) + List.iter Thread.wakeup w + diff --git a/otherlibs/threads/condition.mli b/otherlibs/threads/condition.mli index 3afb1bacc..2bb851395 100644 --- a/otherlibs/threads/condition.mli +++ b/otherlibs/threads/condition.mli @@ -32,16 +32,16 @@ type t (* The type of condition variables. *) -external new: unit -> t = "csl_condition_new" +val new: unit -> t (* Return a new condition variable. *) -external wait: t -> Mutex.t -> unit = "csl_condition_wait" +val wait: t -> Mutex.t -> unit (* [wait c m] atomically unlocks the mutex [m] and suspends the calling process on the condition variable [c]. The process will restart after the condition variable [c] has been signalled. The mutex [m] is locked again before [wait] returns. *) -external signal: t -> unit = "csl_condition_signal" +val signal: t -> unit (* [signal c] restarts one of the processes waiting on the condition variable [c]. *) -external broadcast: t -> unit = "csl_condition_broadcast" +val broadcast: t -> unit (* [broadcast c] restarts all processes waiting on the condition variable [c]. *) diff --git a/otherlibs/threads/mutex.ml b/otherlibs/threads/mutex.ml index c9eb0f820..551bc5706 100644 --- a/otherlibs/threads/mutex.ml +++ b/otherlibs/threads/mutex.ml @@ -11,8 +11,27 @@ (* $Id$ *) -type t -external new: unit -> t = "csl_mutex_new" -external lock: t -> unit = "csl_mutex_lock" -external try_lock: t -> bool = "csl_mutex_try_lock" -external unlock: t -> unit = "csl_mutex_unlock" +type t = { mutable locked: bool; mutable waiting: Thread.t list } + +let new () = { locked = false; waiting = [] } + +let rec lock m = + if m.locked then begin (* test and set atomic *) + Thread.critical_section := true; + m.waiting <- Thread.self() :: m.waiting; + Thread.sleep(); + lock m + end else begin + m.locked <- true (* test and set atomic *) + end + +let try_lock m = (* test and set atomic *) + if m.locked then false else begin m.locked <- true; true end + +let unlock m = + (* Don't play with Thread.critical_section here because of Condition.wait *) + let w = m.waiting in (* atomic *) + m.waiting <- []; (* atomic *) + m.locked <- false; (* atomic *) + List.iter Thread.wakeup w + diff --git a/otherlibs/threads/mutex.mli b/otherlibs/threads/mutex.mli index 847f8ae7b..422040a82 100644 --- a/otherlibs/threads/mutex.mli +++ b/otherlibs/threads/mutex.mli @@ -26,18 +26,18 @@ type t (* The type of mutexes. *) -external new: unit -> t = "csl_mutex_new" +val new: unit -> t (* Return a new mutex. *) -external lock: t -> unit = "csl_mutex_lock" +val lock: t -> unit (* Lock the given mutex. Only one thread can have the mutex locked at any time. A thread that attempts to lock a mutex already locked by another thread will suspend until the other thread unlocks the mutex. *) -external try_lock: t -> bool = "csl_mutex_try_lock" +val try_lock: t -> bool (* Same as [try_lock], but does not suspend the calling thread if the mutex is already locked: just return [false] immediately in that case. If the mutex is unlocked, lock it and return [true]. *) -external unlock: t -> unit = "csl_mutex_unlock" +val unlock: t -> unit (* Unlock the given mutex. Other threads suspended trying to lock the mutex will restart. *) diff --git a/otherlibs/threads/scheduler.c b/otherlibs/threads/scheduler.c new file mode 100644 index 000000000..be90f1972 --- /dev/null +++ b/otherlibs/threads/scheduler.c @@ -0,0 +1,454 @@ +#include "config.h" +#include "misc.h" +#include "mlvalues.h" +#include "stacks.h" +#include "fail.h" +#include "io.h" +#include "roots.h" +#include "alloc.h" +#include "memory.h" + +#if defined(HAS_SELECT) && defined(HAS_SETITIMER) && defined(HAS_GETTIMEOFDAY) +#else +#include "Cannot compile libthreads, system calls missing" +#endif + +#include <sys/time.h> +#include <sys/types.h> +#include <unistd.h> +#ifdef HAS_SYS_SELECT_H +#include <sys/select.h> +#endif + +#ifndef FD_ISSET +typedef int fd_set; +#define FD_SETSIZE (sizeof(int) * 8) +#define FD_SET(fd,fds) (*(fds) |= 1 << (fd)) +#define FD_CLR(fd,fds) (*(fds) &= ~(1 << (fd))) +#define FD_ISSET(fd,fds) (*(fds) & (1 << (fd))) +#define FD_ZERO(fds) (*(fds) = 0) +#endif + +/* Configuration */ + +/* Initial size of stack when a thread is created (4 Ko) */ +#define Thread_stack_size (Stack_size / 4) + +/* Max computation time before rescheduling, in microseconds (50ms) */ +#define Thread_timeout 50000 + +/* The thread descriptors */ + +struct thread_struct { + value ident; /* Unique id (for equality comparisons) */ + struct thread_struct * next; /* Double linking of threads */ + struct thread_struct * prev; + value * stack_low; /* The execution stack for this thread */ + value * stack_high; + value * stack_threshold; + value * sp; + value * trapsp; + value status; /* RUNNABLE, KILLED. etc (see below) */ + value fd; /* File descriptor on which this thread is waiting */ + value delay; /* Time until which this thread is blocked */ + value joining; /* Thread we're trying to join */ +}; + +typedef struct thread_struct * thread_t; + +#define RUNNABLE Val_int(0) +#define KILLED Val_int(1) +#define SUSPENDED Val_int(2) +#define BLOCKED_READ Val_int(3) +#define BLOCKED_WRITE Val_int(4) +#define BLOCKED_DELAY Val_int(5) +#define BLOCKED_JOIN Val_int(6) + +#define NO_FD Val_unit +#define NO_DELAY Val_unit +#define NO_JOINING Val_unit + +#define DELAY_INFTY 1E30 /* +infty, for this purpose */ + +/* The thread currently active */ +static thread_t curr_thread = NULL; +/* Identifier for next thread creation */ +static value next_ident = Val_int(0); + +#define Assign(dst,src) modify((value *)&(dst), (value)(src)) + +/* Scan the stacks of the other threads */ + +static void (*prev_scan_roots_hook) P((scanning_action)); + +static void thread_scan_roots(action) + scanning_action action; +{ + thread_t th; + register value * sp; + /* Scan all active descriptors */ + (*action)((value) curr_thread, (value *) &curr_thread); + /* Don't scan curr_thread->sp, this has already been done */ + for (th = curr_thread->next; th != curr_thread; th = th->next) { + (*action)((value) th, (value *) &th); + for (sp = th->sp; sp < th->stack_high; sp++) { + (*action)(*sp, sp); + } + } + /* Hook */ + if (prev_scan_roots_hook != NULL) (*prev_scan_roots_hook)(action); +} + +/* Initialize the thread machinery */ + +value thread_initialize(unit) /* ML */ + value unit; +{ + struct itimerval timer; + /* Create a descriptor for the current thread */ + curr_thread = + (thread_t) alloc_shr(sizeof(struct thread_struct) / sizeof(value), 0); + curr_thread->ident = next_ident; + next_ident = Val_int(Int_val(next_ident) + 1); + curr_thread->next = curr_thread; + curr_thread->prev = curr_thread; + curr_thread->stack_low = stack_low; + curr_thread->stack_high = stack_high; + curr_thread->stack_threshold = stack_threshold; + curr_thread->sp = extern_sp; + curr_thread->trapsp = trapsp; + curr_thread->status = RUNNABLE; + curr_thread->fd = NO_FD; + curr_thread->delay = NO_DELAY; + curr_thread->joining = NO_JOINING; + /* Initialize GC */ + prev_scan_roots_hook = scan_roots_hook; + scan_roots_hook = thread_scan_roots; + /* Initialize interval timer */ + timer.it_interval.tv_sec = 0; + timer.it_interval.tv_usec = Thread_timeout; + timer.it_value = timer.it_interval; + setitimer(ITIMER_VIRTUAL, &timer, NULL); + return Val_unit; +} + +/* Create a thread */ + +value thread_new(clos) /* ML */ + value clos; +{ + thread_t th; + /* Allocate the thread and its stack */ + Push_roots(r, 1); + r[0] = clos; + th = (thread_t) alloc_shr(sizeof(struct thread_struct) / sizeof(value), 0); + clos = r[0]; + Pop_roots(); + th->ident = next_ident; + next_ident = Val_int(Int_val(next_ident) + 1); + th->stack_low = (value *) stat_alloc(Thread_stack_size); + th->stack_high = th->stack_low + Thread_stack_size / sizeof(value); + th->stack_threshold = th->stack_low + Stack_threshold / sizeof(value); + th->sp = th->stack_high; + th->trapsp = th->stack_high; + /* Set up a return frame that pretends we're applying clos to (). + This way, when this thread is activated, the RETURN will take us + to the entry point of the closure. */ + th->sp -= 4; + th->sp[0] = Val_unit; + th->sp[1] = (value) Code_val(clos); + th->sp[2] = clos; + th->sp[3] = Val_long(0); /* no extra args */ + /* Fake a C call frame */ + th->sp--; + th->sp[0] = Val_unit; /* a dummy environment */ + /* The thread is initially runnable */ + th->status = RUNNABLE; + th->fd = NO_FD; + th->delay = NO_DELAY; + th->joining = NO_JOINING; + /* Insert thread in doubly linked list of threads */ + th->prev = curr_thread->prev; + th->next = curr_thread; + Assign(curr_thread->prev->next, th); + Assign(curr_thread->prev, th); + /* Return thread */ + return (value) th; +} + +/* Return the thread identifier */ + +value thread_id(th) /* ML */ + value th; +{ + return ((struct thread_struct *)th)->ident; +} + +/* Return the current time as a floating-point number */ + +static double timeofday() +{ + struct timeval tv; + gettimeofday(&tv, NULL); + return (double) tv.tv_sec + (double) tv.tv_usec * 1e-6; +} + +/* Find a runnable thread and activate it */ + +#define FOREACH_THREAD(x) x = curr_thread; do { x = x->next; +#define END_FOREACH(x) } while (x != curr_thread) + +static void schedule_thread() +{ + thread_t run_thread, th; + fd_set readfds, writefds; + double delay, now; + int need_select; + + /* Save the status of the current thread */ + curr_thread->stack_low = stack_low; + curr_thread->stack_high = stack_high; + curr_thread->stack_threshold = stack_threshold; + curr_thread->sp = extern_sp; + curr_thread->trapsp = trapsp; + +try_again: + /* Build fdsets and delay for select. + See if some join operation succeeded. */ + FD_ZERO(&readfds); + FD_ZERO(&writefds); + delay = DELAY_INFTY; + now = -1.0; + need_select = 0; + + FOREACH_THREAD(th) + switch (th->status) { + case BLOCKED_READ: + FD_SET(Int_val(th->fd), &readfds); + need_select = 1; + break; + case BLOCKED_WRITE: + FD_SET(Int_val(th->fd), &writefds); + need_select = 1; + break; + case BLOCKED_DELAY: + { double th_delay = Double_val(th->delay); + if (now < 0.0) now = timeofday(); + if (th_delay < now) { + th->status = RUNNABLE; + Assign(th->delay, NO_DELAY); + } else { + if (th_delay < delay) delay = th_delay; + } + break; + } + case BLOCKED_JOIN: + if (((thread_t)(th->joining))->status == KILLED) { + th->status = RUNNABLE; + Assign(th->joining, NO_JOINING); + } + break; + } + END_FOREACH(th); + + /* Find if a thread is runnable. */ + run_thread = NULL; + FOREACH_THREAD(th) + if (th->status == RUNNABLE) { run_thread = th; break; } + END_FOREACH(th); + + /* Do the select if needed */ + if (need_select || run_thread == NULL) { + struct timeval delay_tv, * delay_ptr; + int retcode; + /* Convert delay to a timeval */ + /* If a thread is runnable, just poll */ + if (run_thread != NULL) { + delay_tv.tv_sec = 0; + delay_tv.tv_usec = 0; + delay_ptr = &delay_tv; + } + else if (delay == DELAY_INFTY) { + delay_ptr = NULL; + } else { + delay = delay - now; + delay_tv.tv_sec = (unsigned int) delay; + delay_tv.tv_usec = (delay - (double) delay_tv.tv_sec) * 1E6; + delay_ptr = &delay_tv; + } + retcode = select(FD_SETSIZE, &readfds, &writefds, NULL, delay_ptr); + if (retcode > 0) { + /* Some descriptors are ready. + Mark the corresponding threads runnable. */ + FOREACH_THREAD(th) + switch (th->status) { + case BLOCKED_READ: + if (FD_ISSET(Int_val(th->fd), &readfds)) { + /* Wake up only one thread per fd. */ + FD_CLR(Int_val(th->fd), &readfds); + th->status = RUNNABLE; + th->fd = NO_FD; + if (run_thread == NULL) run_thread = th; /* Found one. */ + } + break; + case BLOCKED_WRITE: + if (FD_ISSET(Int_val(th->fd), &writefds)) { + /* Wake up only one thread per fd. */ + FD_CLR(Int_val(th->fd), &writefds); + th->status = RUNNABLE; + th->fd = NO_FD; + if (run_thread == NULL) run_thread = th; /* Found one. */ + } + break; + } + END_FOREACH(th); + } + /* If we get here with run_thread still NULL, some of the delays + have expired. We go through the loop once more to make the + corresponding threads runnable. */ + if (run_thread == NULL && delay != DELAY_INFTY) goto try_again; + } + + /* If we haven't something to run at that point, we're in big trouble. */ + if (run_thread == NULL) invalid_argument("Thread: deadlock"); + + /* Activate the thread */ + curr_thread = run_thread; + stack_low = curr_thread->stack_low; + stack_high = curr_thread->stack_high; + stack_threshold = curr_thread->stack_threshold; + extern_sp = curr_thread->sp; + trapsp = curr_thread->trapsp; +} + +/* Reschedule without suspending the current thread */ + +value thread_yield(unit) /* ML */ + value unit; +{ + schedule_thread(); + return Val_unit; +} + +/* Suspend the current thread */ + +value thread_sleep(unit) /* ML */ + value unit; +{ + curr_thread->status = SUSPENDED; + schedule_thread(); + return Val_unit; +} + +/* Suspend the current thread on a Unix file descriptor */ + +value thread_wait_read(fd) /* ML */ + value fd; +{ + curr_thread->status = BLOCKED_READ; + curr_thread->fd = fd; + schedule_thread(); + return Val_unit; +} + +value thread_wait_write(fd) /* ML */ + value fd; +{ + curr_thread->status = BLOCKED_WRITE; + curr_thread->fd = fd; + schedule_thread(); + return Val_unit; +} + +/* Suspend the current thread on a buffered input channel */ + +value thread_wait_inchan(vchan) /* ML */ + value vchan; +{ + struct channel * chan = (struct channel *) vchan; + if (chan->curr < chan->max) return Val_unit; + curr_thread->status = BLOCKED_READ; + curr_thread->fd = Val_int(chan->fd); + schedule_thread(); + return Val_unit; +} + +/* Suspend the current thread for some time */ + +value thread_wait_for(time) /* ML */ + value time; +{ + double date = timeofday() + Double_val(time); + curr_thread->status = BLOCKED_DELAY; + Assign(curr_thread->delay, copy_double(date)); + schedule_thread(); + return Val_unit; +} + +/* Suspend the current thread until another thread terminates */ + +value thread_join(th) /* ML */ + value th; +{ + if (((thread_t)th)->status == KILLED) return Val_unit; + curr_thread->status = BLOCKED_JOIN; + Assign(curr_thread->joining, th); + schedule_thread(); + return Val_unit; +} + +/* Reactivate another thread */ + +value thread_wakeup(thread) /* ML */ + value thread; +{ + thread_t th = (thread_t) thread; + switch (th->status) { + case SUSPENDED: + th->status = RUNNABLE; + break; + case KILLED: + failwith("Thread.wakeup: killed thread"); + break; + default: + failwith("Thread.wakeup: thread was not suspended"); + break; + } + return Val_unit; +} + +/* Return the current thread */ + +value thread_self(unit) /* ML */ + value unit; +{ + return (value) curr_thread; +} + +/* Kill a thread */ + +value thread_kill(thread) /* ML */ + value thread; +{ + thread_t th = (thread_t) thread; + /* Don't paint ourselves in a corner */ + if (th == th->next) failwith("Thread.kill: cannot kill the last thread"); + /* This thread is no longer waiting on anything */ + th->status = KILLED; + Assign(th->delay, NO_DELAY); + Assign(th->joining, NO_JOINING); + /* If this is the current thread, activate another one */ + if (th == curr_thread) schedule_thread(); + /* Remove thread from the doubly-linked list */ + Assign(th->prev->next, th->next); + Assign(th->next->prev, th->prev); + /* Free its resources */ + stat_free((char *) th->stack_low); + th->stack_low = NULL; + th->stack_high = NULL; + th->stack_threshold = NULL; + th->sp = NULL; + th->trapsp = NULL; + return Val_unit; +} + diff --git a/otherlibs/threads/thread.ml b/otherlibs/threads/thread.ml index 2fa59ffa6..04161cca0 100644 --- a/otherlibs/threads/thread.ml +++ b/otherlibs/threads/thread.ml @@ -15,30 +15,62 @@ type t -external thread_initialize : unit -> unit = "csl_thread_initialize" -external thread_new : (unit -> unit) -> t = "csl_thread_new" +let critical_section = ref false -external yield : unit -> unit = "csl_thread_yield" -external self : unit -> t = "csl_thread_self" -external id : t -> int = "csl_thread_id" -external exit : unit -> unit = "csl_thread_exit" -external join : t -> unit = "csl_thread_join" -external detach : t -> unit = "csl_thread_detach" +(* It is mucho important that the primitives that reschedule are called + through an ML function call, not directly. That's because when such a + primitive returns, the bytecode interpreter is only semi-obedient: + it takes sp from the new thread, but keeps pc from the old thread. + But that's OK if all calls to rescheduling primitives are immediately + followed by a RETURN operation, which will restore the correct pc + from the stack. *) -(* For new, make sure the function passed to thread_new never - raises an exception. *) +external thread_initialize : unit -> unit = "thread_initialize" +external thread_new : (unit -> unit) -> t = "thread_new" +external thread_yield : unit -> unit = "thread_yield" +external thread_sleep : unit -> unit = "thread_sleep" +external thread_wait_read : Unix.file_descr -> unit = "thread_wait_read" +external thread_wait_write : Unix.file_descr -> unit = + "thread_wait_write" +external thread_wait_inchan : in_channel -> unit = "thread_wait_inchan" +external thread_join : t -> unit = "thread_join" +external thread_delay : float -> unit = "thread_wait_for" +external thread_wakeup : t -> unit = "thread_wakeup" +external thread_self : unit -> t = "thread_self" +external thread_kill : t -> unit = "thread_kill" + +external id : t -> int = "thread_id" + +(* In sleep() below, we rely on the fact that signals are detected + only at function applications and beginning of loops, + making all other operations atomic. *) + +let sleep () = critical_section := false; thread_sleep() +let wait_read fd = thread_wait_read fd +let wait_write fd = thread_wait_write fd +let wait_inchan ic = thread_wait_inchan ic +let delay duration = thread_delay duration +let join th = thread_join th +let wakeup pid = thread_wakeup pid +let self () = thread_self() +let kill pid = thread_kill pid +let exit () = thread_kill(thread_self()) + +(* For new, make sure the function passed to thread_new always terminates + by calling exit. *) let new fn arg = thread_new (fun () -> try - Printexc.print fn arg; () + Printexc.print fn arg; exit() with x -> - flush stdout; flush stderr) + flush stdout; flush stderr; exit()) (* Preemption *) -let preempt signal = yield() +let preempt signal = + if !critical_section then () else thread_yield() (* Initialization of the scheduler *) diff --git a/otherlibs/threads/thread.mli b/otherlibs/threads/thread.mli index a236bc176..6b14a4a1a 100644 --- a/otherlibs/threads/thread.mli +++ b/otherlibs/threads/thread.mli @@ -29,31 +29,65 @@ val new : ('a -> 'b) -> 'a -> t but not propagated back to the parent thread. Similarly, the result of the application [funct arg] is discarded and not directly accessible to the parent thread. *) -external self : unit -> t = "csl_thread_self" +val self : unit -> t (* Return the thread currently executing. *) -external id : t -> int = "csl_thread_id" +external id : t -> int = "thread_id" (* Return the identifier of the given thread. A thread identifier is an integer that identifies uniquely the thread. It can be used to build data structures indexed by threads. *) -external exit : unit -> unit = "csl_thread_exit" +val exit : unit -> unit (* Terminate prematurely the currently executing thread. *) +val kill : t -> unit + (* Terminate prematurely the thread whose handle is given. *) -(** Thread synchronization *) +(** Suspending threads *) -external join : t -> unit = "csl_thread_join" +val delay: float -> unit + (* [delay d] suspends the execution of the calling thread for + [d] seconds. The other program threads continue to run during + this time. *) +val wait_inchan : in_channel -> unit + (* [wait_inchan ic] suspends the execution of the calling thread + until at least one character is available for reading on the + input channel [ic]. The other program threads continue to run + during this time. In contrast, calling an input function directly + on [ic] would block all threads in the program until data is + available on the channel. See the module [ThreadIO] for + higher-level input functions compatible with threads. *) +val wait_read : Unix.file_descr -> unit +val wait_write : Unix.file_descr -> unit + (* Similar to [wait_inchan], but operates on a file descriptor + from the [Unix] library instead of an input channel. + [wait_read] suspends the thread until at least one + character is available for reading; [wait_write] suspends the + thread until at least one character can be written without + blocking. *) +val join : t -> unit (* [join th] suspends the execution of the calling thread until the thread [th] has terminated. *) -external detach : t -> unit = "csl_thread_detach" - (* [detach th] indicates that the thread [th] will never - be joined, and that its resources can be freed as soon - as it terminates. *) - -(** Thread scheduling *) - -external yield : unit -> unit = "csl_thread_yield" - (* [yield()] suggests the scheduler that this is a good point - to suspend the current thread and reactivate another one. - This is just a hint: the scheduler preempts periodically - long-running threads even if they never execute [yield()]. - Using [yield()] may improve the responsiveness of the program, - though. *) + +(** Low-level thread synchronization primitives *) + +(* The following primitives provide the basis for implementing + synchronization functions between threads. Their direct use is + discouraged, as they are very low-level and prone to race conditions + and deadlocks. The modules [Mutex], [Condition] and [Event] + provide higher-level synchronization primitives. *) + +val critical_section: bool ref + (* Setting this reference to [true] deactivate thread preemption + (the timer interrupt that transfers control from thread to thread), + causing the current thread to run uninterrupted until + [critical_section] is reset to [false] or the current thread + explicitely relinquishes control using [sleep], [delay], + [wait_inchan] or [wait_descr]. *) +val sleep : unit -> unit + (* Suspend the calling thread until another thread reactivates it + using [wakeup]. Just before suspending the thread, + [critical_section] is reset to [false]. Resetting + [critical_section] and suspending the calling thread is an + atomic operation. *) +val wakeup : t -> unit + (* Reactivate the given thread. This thread is assumed to + be suspended on a call to [sleep]. After the call to [wakeup], + the suspended thread will resume execution at some future time. *) diff --git a/otherlibs/threads/threadIO.ml b/otherlibs/threads/threadIO.ml new file mode 100644 index 000000000..eccce0c99 --- /dev/null +++ b/otherlibs/threads/threadIO.ml @@ -0,0 +1,39 @@ +(***********************************************************************) +(* *) +(* Caml Special Light *) +(* *) +(* Xavier Leroy, projet Cristal, INRIA Rocquencourt *) +(* *) +(* Copyright 1995 Institut National de Recherche en Informatique et *) +(* Automatique. Distributed only by permission. *) +(* *) +(***********************************************************************) + +(* $Id$ *) + +(* Module [ThreadIO]: thread-compatible input-output operations *) + +let input_char ic = Thread.wait_inchan ic; input_char ic +let input_line ic = Thread.wait_inchan ic; input_line ic +let input ic buf ofs len = Thread.wait_inchan ic; input ic buf ofs len + +let rec really_input ic s ofs len = + if ofs < 0 or ofs + len > String.length s then invalid_arg "really_input" + else if len <= 0 then () + else begin + let r = input ic s ofs len in + if r = 0 + then raise End_of_file + else really_input ic s (ofs+r) (len-r) + end + +let input_byte ic = Thread.wait_inchan ic; input_byte ic +let input_binary_int ic = Thread.wait_inchan ic; input_binary_int ic +let input_value ic = Thread.wait_inchan ic; input_value ic + +let read_line () = flush stdout; input_line stdin +let read_int () = int_of_string(read_line()) +let read_float () = float_of_string(read_line()) + +let lexing_from_channel ic = + Lexing.from_function (fun buf n -> input ic buf 0 n) diff --git a/otherlibs/threads/threadIO.mli b/otherlibs/threads/threadIO.mli new file mode 100644 index 000000000..f9773336d --- /dev/null +++ b/otherlibs/threads/threadIO.mli @@ -0,0 +1,46 @@ +(***********************************************************************) +(* *) +(* Caml Special Light *) +(* *) +(* Xavier Leroy, projet Cristal, INRIA Rocquencourt *) +(* *) +(* Copyright 1995 Institut National de Recherche en Informatique et *) +(* Automatique. Distributed only by permission. *) +(* *) +(***********************************************************************) + +(* $Id$ *) + +(* Module [ThreadIO]: thread-compatible input-output operations *) + +(* This module reimplements some of the input functions from [Pervasives] + and [Lexing] so that they only block the calling thread, not all threads + in the program, if data is not immediately available on the input. *) + +(** General input functions *) + +val input_char : in_channel -> char +val input : in_channel -> string -> int -> int -> int +val really_input : in_channel -> string -> int -> int -> unit +val input_byte : in_channel -> int + (* See the corresponding functions in module [Pervasives]. *) +val input_line : in_channel -> string +val input_binary_int : in_channel -> int +val input_value : in_channel -> 'a + (* See the corresponding functions in module [Pervasives]. + For the three functions above, the other program threads continue + to run until the first character of the input is available; + all threads are blocked while the remaining characters of the + input are being read. *) + +(** Input functions on standard input *) + +val read_line : unit -> string +val read_int : unit -> int +val read_float : unit -> float + (* See the corresponding functions in module [Pervasives]. *) + +(** Lexer buffers *) + +val lexing_from_channel: in_channel -> Lexing.lexbuf + (* See [Lexing.from_channel]. *) diff --git a/otherlibs/threads/threadstubs.c b/otherlibs/threads/threadstubs.c deleted file mode 100644 index ae2c9536b..000000000 --- a/otherlibs/threads/threadstubs.c +++ /dev/null @@ -1,452 +0,0 @@ -/***********************************************************************/ -/* */ -/* Caml Special Light */ -/* */ -/* Xavier Leroy and Damien Doligez, INRIA Rocquencourt */ -/* */ -/* Copyright 1995 Institut National de Recherche en Informatique et */ -/* Automatique. Distributed only by permission. */ -/* */ -/***********************************************************************/ - -/* $Id$ */ - -#include <pthread.h> -#include <signal.h> -#include <sys/time.h> -#include "alloc.h" -#include "misc.h" -#include "mlvalues.h" -#include "roots.h" -#include "signals.h" -#include "stacks.h" -#include "sys.h" - -#ifdef HAS_THR_YIELD -#include <thread.h> -#endif - -/* Max computation time before rescheduling, in microseconds (50ms) */ -#define Thread_timeout 50000 - -/* Initial size of stack when a thread is created (4 Ko) */ -#define Thread_stack_size (Stack_size / 4) - -/* The thread descriptors */ - -struct csl_thread_struct { - pthread_t pthread; /* The corresponding Posix thread */ - value ident; /* Unique id */ - value * stack_low; /* The execution stack for this thread */ - value * stack_high; - value * stack_threshold; - value * sp; /* Saved value of extern_sp for this thread */ - value * trapsp; /* Saved value of trapsp for this thread */ - struct csl_thread_struct * next; /* Double linking of threads */ - struct csl_thread_struct * prev; -}; - -typedef struct csl_thread_struct * csl_thread_t; - -#define Assign(dst,src) modify((value *)&(dst), (value)(src)) - -/* The global mutex used to ensure that at most one thread is running - Caml code */ -pthread_mutex_t csl_mutex; - -/* Head of the list of thread descriptors */ -csl_thread_t thread_list = NULL; - -/* The key used for storing the thread descriptor in the specific data - of the corresponding Posix thread. */ -pthread_key_t thread_descriptor_key; - -/* Identifier for next thread creation */ -static value thread_next_ident = 0; - -/* Compatibility code for DEC OSF1 */ - -#ifdef __osf__ -#define Attr_default pthread_attr_default -#define Mutexattr_default pthread_mutexattr_default -#define Condattr_default pthread_condattr_default -#define Getspecific(res,key) pthread_getspecific(key, (void **) &(res)) -#define Pthread_key_create pthread_keycreate -#define Pthread_detach(th) pthread_detach(&(th)) -#else -#define Attr_default NULL -#define Mutexattr_default NULL -#define Condattr_default NULL -#define Getspecific(res,key) (res) = pthread_getspecific(key) -#define Pthread_key_create pthread_key_create -#define Pthread_detach pthread_detach -#endif - -/* Hook for scanning the stacks of the other threads */ - -static void (*prev_scan_roots_hook) P((scanning_action)); - -static void csl_thread_scan_roots(action) - scanning_action action; -{ - csl_thread_t th; - register value * sp; - /* Scan all thread descriptors */ - (*action)((value) thread_list, (value *) &thread_list); - /* Scan the stacks */ - for (th = thread_list; th != NULL; th = th->next) { - /* If this is the current thread, don't scan its stack, this - has already been done */ - if (th->stack_low == stack_low) continue; - for (sp = th->sp; sp < th->stack_high; sp++) { - (*action)(*sp, sp); - } - } - /* Hook */ - if (prev_scan_roots_hook != NULL) (*prev_scan_roots_hook)(action); -} - -/* Hooks for enter_blocking_section and leave_blocking_section */ - -static void (*prev_enter_blocking_section_hook) (); -static void (*prev_leave_blocking_section_hook) (); - -static void csl_thread_enter_blocking_section() -{ - csl_thread_t curr_thread; - if (prev_enter_blocking_section_hook != NULL) - (*prev_enter_blocking_section_hook)(); - /* Save the stack-related global variables in the thread descriptor - of the current thread */ - Getspecific(curr_thread, thread_descriptor_key); - curr_thread->stack_low = stack_low; - curr_thread->stack_high = stack_high; - curr_thread->stack_threshold = stack_threshold; - curr_thread->sp = extern_sp; - curr_thread->trapsp = trapsp; - /* Release the global mutex */ - pthread_mutex_unlock(&csl_mutex); -} - -static void csl_thread_leave_blocking_section() -{ - csl_thread_t curr_thread; - /* Re-acquire the global mutex */ - pthread_mutex_lock(&csl_mutex); - /* Restore the stack-related global variables */ - Getspecific(curr_thread, thread_descriptor_key); - stack_low = curr_thread->stack_low; - stack_high = curr_thread->stack_high; - stack_threshold = curr_thread->stack_threshold; - extern_sp = curr_thread->sp; - trapsp = curr_thread->trapsp; - if (prev_leave_blocking_section_hook != NULL) - (*prev_leave_blocking_section_hook)(); -} - -/* The "tick" thread fakes a SIGVTALRM signal at regular intervals. */ - -static void * csl_thread_tick() -{ - struct timeval timeout; - while(1) { - /* select() seems to be the most efficient way to suspend the - thread for sub-second intervals */ - timeout.tv_sec = 0; - timeout.tv_usec = Thread_timeout; - select(0, NULL, NULL, NULL, &timeout); - if (!async_signal_mode) handle_signal(SIGVTALRM); - } -} - -/* Thread cleanup: remove the descriptor from the list and - free the stack space and the descriptor itself. */ - -static void csl_thread_cleanup(th) - csl_thread_t th; -{ - /* Remove th from the doubly-linked list of threads */ - if (th == thread_list) { - thread_list = th->next; - } else { - Assign(th->next->prev, th->prev); - Assign(th->prev->next, th->next); - } - /* Free the memory resources */ - stat_free((char *) th->stack_low); - th->stack_low = NULL; - th->stack_high = NULL; - th->stack_threshold = NULL; - th->sp = NULL; - th->trapsp = NULL; -} - -/* Initialize the thread machinery */ - -value csl_thread_initialize(unit) /* ML */ - value unit; -{ - pthread_t tick_pthread; - /* Initialize the mutex */ - if (pthread_mutex_init(&csl_mutex, Mutexattr_default) == -1) - sys_error("Thread.init"); - pthread_mutex_lock(&csl_mutex); - /* Initialize the key */ - Pthread_key_create(&thread_descriptor_key, NULL); - /* Create a descriptor for the current thread */ - thread_list = (csl_thread_t) - alloc_shr(sizeof(struct csl_thread_struct) / sizeof(value), 0); - thread_list->pthread = pthread_self(); - thread_list->ident = Val_long(thread_next_ident); - thread_next_ident++; - /* The stack-related fields will be filled in at the next - enter_blocking_section */ - thread_list->next = NULL; - thread_list->prev = NULL; - /* Associate the thread descriptor with the thread */ - pthread_setspecific(thread_descriptor_key, (void *) thread_list); - /* Set up the hooks */ - prev_scan_roots_hook = scan_roots_hook; - scan_roots_hook = csl_thread_scan_roots; - prev_enter_blocking_section_hook = enter_blocking_section_hook; - enter_blocking_section_hook = csl_thread_enter_blocking_section; - prev_leave_blocking_section_hook = leave_blocking_section_hook; - leave_blocking_section_hook = csl_thread_leave_blocking_section; - /* Fork the tick thread */ - if (pthread_create(&tick_pthread, Attr_default, csl_thread_tick, 0) == -1) - sys_error("Thread.init"); - Pthread_detach(tick_pthread); - return Val_unit; -} - -/* Create a thread */ - -static void * csl_thread_start(th) - csl_thread_t th; -{ - value clos; - /* Associate the thread descriptor with the thread */ - pthread_setspecific(thread_descriptor_key, (void *) th); - /* Set up termination routine */ - pthread_cleanup_push(csl_thread_cleanup, (void *) th); - /* Acquire the global mutex before running the thread */ - pthread_mutex_lock(&csl_mutex); - /* Set up the stack variables */ - stack_low = th->stack_low; - stack_high = th->stack_high; - stack_threshold = th->stack_threshold; - extern_sp = th->sp; - trapsp = th->trapsp; - /* Callback the closure */ - clos = *extern_sp++; - callback(clos, Val_unit); - /* Cleanup: free the thread resources */ - pthread_cleanup_pop(1); - /* Release the mutex and die quietly */ - pthread_mutex_unlock(&csl_mutex); - return 0; -} - -value csl_thread_new(clos) /* ML */ - value clos; -{ - csl_thread_t th; - /* Allocate the thread and its stack */ - th = (csl_thread_t) - alloc_shr(sizeof(struct csl_thread_struct) / sizeof(value), 0); - th->ident = Val_long(thread_next_ident); - thread_next_ident++; - th->stack_low = (value *) stat_alloc(Thread_stack_size); - th->stack_high = th->stack_low + Thread_stack_size / sizeof(value); - th->stack_threshold = th->stack_low + Stack_threshold / sizeof(value); - th->sp = th->stack_high; - th->trapsp = th->stack_high; - /* Add it to the list of threads */ - th->next = thread_list; - th->prev = NULL; - Assign(thread_list->prev, th); - thread_list = th; - /* Pass the closure in the newly created stack, so that it will be - preserved by garbage collection */ - *--(th->sp) = clos; - /* Fork the new thread */ - if (pthread_create(&(th->pthread), Attr_default, csl_thread_start, - (void *) th) == -1) - sys_error("Thread.new"); - return (value) th; -} - -/* Return the current thread */ - -value csl_thread_self(unit) /* ML */ - value unit; -{ - csl_thread_t curr_thread; - Getspecific(curr_thread, thread_descriptor_key); - return (value) curr_thread; -} - -/* Return the identifier of a thread */ - -value csl_thread_id(th) /* ML */ - csl_thread_t th; -{ - return th->ident; -} - -/* Allow re-scheduling */ - -value csl_thread_yield(unit) /* ML */ - value unit; -{ - enter_blocking_section(); -#if defined(HAS_PTHREAD_YIELD) - pthread_yield(); -#elif defined(HAS_THR_YIELD) - thr_yield(); -#endif - leave_blocking_section(); - return Val_unit; -} - -/* Detach a thread */ - -value csl_thread_detach(th) /* ML */ - csl_thread_t th; -{ - if (Pthread_detach(th->pthread) == -1) sys_error("Thread.detach"); - return Val_unit; -} - -/* Suspend the current thread until another thread terminates */ - -value csl_thread_join(th) /* ML */ - csl_thread_t th; -{ - void * status; - int retcode; - enter_blocking_section(); - retcode = pthread_join(th->pthread, &status); - leave_blocking_section(); - if (retcode == -1) sys_error("Thread.join"); - return Val_unit; -} - -/* Terminate the current thread */ - -value csl_thread_exit(unit) /* ML */ - value unit; -{ - enter_blocking_section(); - pthread_exit(0); - return Val_unit; /* never reached */ -} - -/* Mutex operations */ - -#define Mutex_val(v) (*((pthread_mutex_t *)(&Field(v, 1)))) -#define Max_mutex_number 1000 - -static void csl_mutex_finalize(mut) - value mut; -{ - pthread_mutex_destroy(&Mutex_val(mut)); -} - -value csl_mutex_new(unit) /* ML */ - value unit; -{ - value mut; - mut = alloc_final(1 + sizeof(pthread_mutex_t) / sizeof(value), - csl_mutex_finalize, 1, Max_mutex_number); - if (pthread_mutex_init(&Mutex_val(mut), Mutexattr_default) == -1) - sys_error("Mutex.new"); - return mut; -} - -value csl_mutex_lock(mut) /* ML */ - value mut; -{ - int retcode; - enter_blocking_section(); - retcode = pthread_mutex_lock(&(Mutex_val(mut))); - leave_blocking_section(); - if (retcode == -1) sys_error("Mutex.lock"); - return Val_unit; -} - -value csl_mutex_unlock(mut) /* ML */ - value mut; -{ - int retcode; - enter_blocking_section(); - retcode = pthread_mutex_unlock(&(Mutex_val(mut))); - leave_blocking_section(); - if (retcode == -1) sys_error("Mutex.unlock"); - return Val_unit; -} - -value csl_mutex_try_lock(mut) /* ML */ - value mut; -{ - int retcode; - retcode = pthread_mutex_trylock(&(Mutex_val(mut))); - if (retcode == -1) sys_error("Mutex.try_lock"); - return Val_bool(retcode); -} - -/* Conditions operations */ - -#define Condition_val(v) (*((pthread_cond_t *)(&Field(v, 1)))) -#define Max_condition_number 1000 - -static void csl_condition_finalize(cond) - value cond; -{ - pthread_cond_destroy(&Condition_val(cond)); -} - -value csl_condition_new(unit) /* ML */ - value unit; -{ - value cond; - cond = alloc_final(1 + sizeof(pthread_cond_t) / sizeof(value), - csl_condition_finalize, 1, Max_condition_number); - if (pthread_cond_init(&Condition_val(cond), Condattr_default) == -1) - sys_error("Condition.new"); - return cond; -} - -value csl_condition_wait(cond, mut) /* ML */ - value cond, mut; -{ - int retcode; - enter_blocking_section(); - retcode = pthread_cond_wait(&Condition_val(cond), &Mutex_val(mut)); - leave_blocking_section(); - if (retcode == -1) sys_error("Condition.wait"); - return Val_unit; -} - -value csl_condition_signal(cond) /* ML */ - value cond; -{ - int retcode; - enter_blocking_section(); - retcode = pthread_cond_signal(&Condition_val(cond)); - leave_blocking_section(); - if (retcode == -1) sys_error("Condition.signal"); - return Val_unit; -} - -value csl_condition_broadcast(cond) /* ML */ - value cond; -{ - int retcode; - enter_blocking_section(); - retcode = pthread_cond_broadcast(&Condition_val(cond)); - leave_blocking_section(); - if (retcode == -1) sys_error("Condition.broadcast"); - return Val_unit; -} - |