diff --git a/Makefile b/Makefile index 8f47357e..ea3bd2f6 100644 --- a/Makefile +++ b/Makefile @@ -161,6 +161,9 @@ test-basic: chibi-scheme$(EXE) test-build: ./tests/build/build-tests.sh +test-threads: chibi-scheme$(EXE) + LD_LIBRARY_PATH=".:$(LD_LIBRARY_PATH)" ./chibi-scheme$(EXE) tests/thread-tests.scm + test-numbers: chibi-scheme$(EXE) LD_LIBRARY_PATH=".:$(LD_LIBRARY_PATH)" ./chibi-scheme$(EXE) tests/numeric-tests.scm diff --git a/eval.c b/eval.c index baa28e8a..eb438301 100644 --- a/eval.c +++ b/eval.c @@ -8,46 +8,6 @@ static int scheme_initialized_p = 0; -#if SEXP_USE_DEBUG_VM -static void sexp_print_stack (sexp ctx, sexp *stack, int top, int fp, sexp out) { - int i; - if (! sexp_oportp(out)) out = sexp_current_error_port(ctx); - for (i=0; i4; i=sexp_unbox_fixnum(stack[i+3])) { - self = stack[i+2]; - if (sexp_procedurep(self)) { - sexp_write_string(ctx, " called from ", out); - bc = sexp_procedure_code(self); - if (sexp_truep(sexp_bytecode_name(bc))) - sexp_write(ctx, sexp_bytecode_name(bc), out); - else - sexp_printf(ctx, out, "anon: %p", bc); - if ((ls=sexp_bytecode_source(bc)) && sexp_pairp(ls)) { - if (sexp_fixnump(sexp_cdr(ls)) && (sexp_cdr(ls) >= SEXP_ZERO)) { - sexp_write_string(ctx, " on line ", out); - sexp_write(ctx, sexp_cdr(ls), out); - } - if (sexp_stringp(sexp_car(ls))) { - sexp_write_string(ctx, " of file ", out); - sexp_write_string(ctx, sexp_string_data(sexp_car(ls)), out); - } - } - sexp_write_char(ctx, '\n', out); - } - } -} - static sexp analyze (sexp ctx, sexp x); static void generate (sexp ctx, sexp x); diff --git a/include/chibi/sexp.h b/include/chibi/sexp.h index d21ea569..3e66a297 100644 --- a/include/chibi/sexp.h +++ b/include/chibi/sexp.h @@ -694,7 +694,7 @@ SEXP_API sexp sexp_make_unsigned_integer(sexp ctx, sexp_luint_t x); #define sexp_context_parent(x) ((x)->value.context.parent) #define sexp_context_saves(x) ((x)->value.context.saves) #define sexp_context_tailp(x) ((x)->value.context.tailp) -#define sexp_context_tracep(x) ((x)->value.context.tailp) +#define sexp_context_tracep(x) ((x)->value.context.tracep) #define sexp_context_globals(x) ((x)->value.context.globals) #define sexp_context_last_fp(x) ((x)->value.context.last_fp) #define sexp_context_refuel(x) ((x)->value.context.refuel) diff --git a/lib/srfi/18/interface.scm b/lib/srfi/18/interface.scm index d917cf25..7dde92aa 100644 --- a/lib/srfi/18/interface.scm +++ b/lib/srfi/18/interface.scm @@ -8,7 +8,8 @@ (if (thread-timeout?) (if (and (pair? o) (pair? (cdr o))) (cadr o) - (error "timed out waiting for thread" thread))))))) + (error "timed out waiting for thread" thread)) + #t))))) (define (thread-terminate! thread) (if (%thread-terminate! thread) ;; need to yield if terminating ourself @@ -21,11 +22,18 @@ (define (mutex-lock! mutex . o) (let ((timeout (and (pair? o) (car o))) (thread (if (and (pair? o) (pair? (cdr o))) (cadr o) #t))) - (if (not (%mutex-lock! mutex timeout thread)) - (thread-yield!)))) + (cond ((%mutex-lock! mutex timeout thread)) + (else + (thread-yield!) + (not (thread-timeout?)))))) (define (mutex-unlock! mutex . o) - #f) + (let ((condvar (and (pair? o) (car o))) + (timeout (if (and (pair? o) (pair? (cdr o))) (cadr o) #f))) + (cond ((%mutex-unlock! mutex condvar timeout)) + (else + (thread-yield!) + (not (thread-timeout?)))))) (define current-time get-time-of-day) (define time? timeval?) diff --git a/lib/srfi/18/threads.c b/lib/srfi/18/threads.c index 24c57050..046d8bf4 100644 --- a/lib/srfi/18/threads.c +++ b/lib/srfi/18/threads.c @@ -5,6 +5,7 @@ #include #include #include +#include #define sexp_mutex_name(x) sexp_slot_ref(x, 0) #define sexp_mutex_specific(x) sexp_slot_ref(x, 1) @@ -16,7 +17,7 @@ #define sexp_condvar_threads(x) sexp_slot_ref(x, 2) #define timeval_le(a, b) (((a).tv_sec < (b).tv_sec) || (((a).tv_sec == (b).tv_sec) && ((a).tv_usec < (b).tv_usec))) -#define sexp_context_before(c, t) ((sexp_context_timeval(c).tv_sec != 0) && timeval_le(sexp_context_timeval(c), t)) +#define sexp_context_before(c, t) (((sexp_context_timeval(c).tv_sec != 0) || (sexp_context_timeval(c).tv_usec != 0)) && timeval_le(sexp_context_timeval(c), t)) /* static int mutex_id, condvar_id; */ @@ -72,13 +73,12 @@ sexp sexp_make_thread (sexp ctx sexp_api_params(self, n), sexp thunk, sexp name) sexp sexp_thread_start (sexp ctx sexp_api_params(self, n), sexp thread) { sexp cell; sexp_assert_type(ctx, sexp_contextp, SEXP_CONTEXT, thread); + cell = sexp_cons(ctx, thread, SEXP_NULL); if (sexp_pairp(sexp_global(ctx, SEXP_G_THREADS_BACK))) { - cell = sexp_cons(ctx, thread, SEXP_NULL); sexp_cdr(sexp_global(ctx, SEXP_G_THREADS_BACK)) = cell; sexp_global(ctx, SEXP_G_THREADS_BACK) = cell; } else { /* init queue */ - sexp_global(ctx, SEXP_G_THREADS_BACK) = sexp_global(ctx, SEXP_G_THREADS_FRONT) - = sexp_cons(ctx, thread, SEXP_NULL); + sexp_global(ctx, SEXP_G_THREADS_BACK) = sexp_global(ctx, SEXP_G_THREADS_FRONT) = cell; } return SEXP_VOID; } @@ -115,14 +115,15 @@ static void sexp_insert_timed (sexp ctx, sexp thread, sexp timeout) { double d; #endif sexp ls1=SEXP_NULL, ls2=sexp_global(ctx, SEXP_G_THREADS_PAUSED); + if (sexp_integerp(timeout) || sexp_flonump(timeout)) + gettimeofday(&sexp_context_timeval(ctx), NULL); if (sexp_integerp(timeout)) { - sexp_context_timeval(ctx).tv_sec = sexp_unbox_fixnum(timeout); - sexp_context_timeval(ctx).tv_usec = 0; + sexp_context_timeval(ctx).tv_sec += sexp_unbox_fixnum(timeout); #if SEXP_USE_FLONUMS } else if (sexp_flonump(timeout)) { d = sexp_flonum_value(timeout); - sexp_context_timeval(ctx).tv_sec = trunc(d); - sexp_context_timeval(ctx).tv_usec = (d-trunc(d))*1000000; + sexp_context_timeval(ctx).tv_sec += trunc(d); + sexp_context_timeval(ctx).tv_usec += (d-trunc(d))*1000000; #endif } else { sexp_context_timeval(ctx).tv_sec = 0; @@ -143,8 +144,10 @@ static void sexp_insert_timed (sexp ctx, sexp thread, sexp timeout) { sexp sexp_thread_join (sexp ctx sexp_api_params(self, n), sexp thread, sexp timeout) { sexp_assert_type(ctx, sexp_contextp, SEXP_CONTEXT, thread); - if (sexp_context_refuel(thread) <= 0) /* return true if already terminated */ + if (sexp_context_refuel(thread) <= 0) /* return true if already terminated */ { return SEXP_TRUE; + } + sexp_context_timeoutp(ctx) = 0; sexp_context_waitp(ctx) = 1; sexp_context_event(ctx) = thread; sexp_insert_timed(ctx, ctx, timeout); @@ -188,31 +191,79 @@ sexp sexp_mutex_lock (sexp ctx sexp_api_params(self, n), sexp mutex, sexp timeou } sexp sexp_mutex_unlock (sexp ctx sexp_api_params(self, n), sexp mutex, sexp condvar, sexp timeout) { + sexp ls1, ls2; if (sexp_not(condvar)) { - /* normal unlock */ + /* normal unlock - always succeeds, just need to unblock threads */ if (sexp_truep(sexp_mutex_lockp(mutex))) { sexp_mutex_lockp(mutex) = SEXP_FALSE; sexp_mutex_thread(mutex) = ctx; - /* XXXX search for threads blocked on this mutex */ + /* search for threads blocked on this mutex */ + for (ls1=SEXP_NULL, ls2=sexp_global(ctx, SEXP_G_THREADS_PAUSED); + sexp_pairp(ls2); ls1=ls2, ls2=sexp_cdr(ls2)) + if (sexp_context_event(sexp_car(ls2)) == mutex) { + if (ls1==SEXP_NULL) + sexp_global(ctx, SEXP_G_THREADS_PAUSED) = sexp_cdr(ls2); + else + sexp_cdr(ls1) = sexp_cdr(ls2); + sexp_cdr(ls2) = sexp_global(ctx, SEXP_G_THREADS_FRONT); + sexp_global(ctx, SEXP_G_THREADS_FRONT) = ls2; + if (! sexp_pairp(sexp_cdr(ls2))) + sexp_global(ctx, SEXP_G_THREADS_BACK) = ls2; + sexp_context_waitp(sexp_car(ls2)) + = sexp_context_timeoutp(sexp_car(ls2)) = 0; + break; + } } + return SEXP_TRUE; } else { /* wait on condition var */ - + sexp_context_waitp(ctx) = 1; + sexp_context_event(ctx) = condvar; + sexp_insert_timed(ctx, ctx, timeout); + return SEXP_FALSE; } } /**************************** condition variables *************************/ sexp sexp_condition_variable_signal (sexp ctx sexp_api_params(self, n), sexp condvar) { - return SEXP_VOID; + sexp ls1=SEXP_NULL, ls2=sexp_global(ctx, SEXP_G_THREADS_PAUSED); + for ( ; sexp_pairp(ls2); ls1=ls2, ls2=sexp_cdr(ls2)) + if (sexp_context_event(sexp_car(ls2)) == condvar) { + if (ls1==SEXP_NULL) + sexp_global(ctx, SEXP_G_THREADS_PAUSED) = sexp_cdr(ls2); + else + sexp_cdr(ls1) = sexp_cdr(ls2); + sexp_cdr(ls2) = sexp_global(ctx, SEXP_G_THREADS_FRONT); + sexp_global(ctx, SEXP_G_THREADS_FRONT) = ls2; + if (! sexp_pairp(sexp_cdr(ls2))) + sexp_global(ctx, SEXP_G_THREADS_BACK) = ls2; + sexp_context_waitp(sexp_car(ls2)) = sexp_context_timeoutp(sexp_car(ls2)) = 0; + return SEXP_TRUE; + } + return SEXP_FALSE; } sexp sexp_condition_variable_broadcast (sexp ctx sexp_api_params(self, n), sexp condvar) { - return SEXP_VOID; + sexp res = SEXP_FALSE; + while (sexp_truep(sexp_condition_variable_signal(ctx, self, n, condvar))) + res = SEXP_TRUE; + return res; } /**************************** the scheduler *******************************/ +void sexp_wait_on_single_thread (sexp ctx) { + struct timeval tval; + useconds_t usecs = 0; + gettimeofday(&tval, NULL); + if (tval.tv_sec < sexp_context_timeval(ctx).tv_sec) + usecs = (sexp_context_timeval(ctx).tv_sec - tval.tv_sec) * 1000000; + if (tval.tv_usec < sexp_context_timeval(ctx).tv_usec) + usecs += sexp_context_timeval(ctx).tv_usec - tval.tv_usec; + usleep(usecs); +} + sexp sexp_scheduler (sexp ctx sexp_api_params(self, n), sexp root_thread) { struct timeval tval; sexp res, ls1, ls2, tmp, paused, front=sexp_global(ctx, SEXP_G_THREADS_FRONT); @@ -221,27 +272,31 @@ sexp sexp_scheduler (sexp ctx sexp_api_params(self, n), sexp root_thread) { /* if we've terminated, check threads joining us */ if (sexp_context_refuel(ctx) <= 0) { - for (ls1=SEXP_NULL, ls2=paused; sexp_pairp(ls2); ls2=sexp_cdr(ls2)) + for (ls1=SEXP_NULL, ls2=paused; sexp_pairp(ls2); ) { if (sexp_context_event(sexp_car(ls2)) == ctx) { - sexp_context_waitp(ctx) = 0; + sexp_context_waitp(sexp_car(ls2)) = 0; + sexp_context_timeoutp(sexp_car(ls2)) = 0; if (ls1==SEXP_NULL) sexp_global(ctx, SEXP_G_THREADS_PAUSED) = paused = sexp_cdr(ls2); else sexp_cdr(ls1) = sexp_cdr(ls2); tmp = sexp_cdr(ls2); - sexp_cdr(ls2) = front; - sexp_global(ctx, SEXP_G_THREADS_FRONT) = front = ls2; + sexp_cdr(ls2) = SEXP_NULL; + if (! sexp_pairp(sexp_global(ctx, SEXP_G_THREADS_BACK))) { + sexp_global(ctx, SEXP_G_THREADS_FRONT) = front = ls2; + } else { + sexp_cdr(sexp_global(ctx, SEXP_G_THREADS_BACK)) = ls2; + } + sexp_global(ctx, SEXP_G_THREADS_BACK) = ls2; ls2 = tmp; } else { ls1 = ls2; ls2 = sexp_cdr(ls2); } + } } - /* TODO: check threads blocked on I/O */ - /* ... */ - - /* check timeouts (must be _after_ previous checks) */ + /* check timeouts */ if (sexp_pairp(paused)) { if (gettimeofday(&tval, NULL) == 0) { ls1 = SEXP_NULL; @@ -253,9 +308,14 @@ sexp sexp_scheduler (sexp ctx sexp_api_params(self, n), sexp root_thread) { ls2 = sexp_cdr(ls2); } if (sexp_pairp(ls1)) { - sexp_cdr(ls1) = front; - sexp_global(ctx, SEXP_G_THREADS_FRONT) = front = paused; - sexp_global(ctx, SEXP_G_THREADS_PAUSED) = ls2; + sexp_cdr(ls1) = SEXP_NULL; + if (! sexp_pairp(sexp_global(ctx, SEXP_G_THREADS_BACK))) { + sexp_global(ctx, SEXP_G_THREADS_FRONT) = front = paused; + } else { + sexp_cdr(sexp_global(ctx, SEXP_G_THREADS_BACK)) = paused; + } + sexp_global(ctx, SEXP_G_THREADS_BACK) = ls1; + sexp_global(ctx, SEXP_G_THREADS_PAUSED) = paused = ls2; } } } @@ -266,7 +326,7 @@ sexp sexp_scheduler (sexp ctx sexp_api_params(self, n), sexp root_thread) { if ((sexp_context_refuel(ctx) <= 0) || sexp_context_waitp(ctx)) { /* either terminated or paused */ sexp_global(ctx, SEXP_G_THREADS_FRONT) = sexp_cdr(front); - if (ctx == sexp_car(sexp_global(ctx, SEXP_G_THREADS_BACK))) + if (! sexp_pairp(sexp_cdr(front))) sexp_global(ctx, SEXP_G_THREADS_BACK) = SEXP_NULL; } else { /* swap with front of queue */ @@ -284,6 +344,13 @@ sexp sexp_scheduler (sexp ctx sexp_api_params(self, n), sexp root_thread) { res = ctx; } + if (sexp_context_waitp(res)) { + /* the only thread available was waiting */ + sexp_wait_on_single_thread(res); + sexp_context_timeoutp(res) = 1; + sexp_context_waitp(res) = 0; + } + return res; } diff --git a/opt/opcode_names.h b/opt/opcode_names.h index d4c44632..a8c06e9a 100644 --- a/opt/opcode_names.h +++ b/opt/opcode_names.h @@ -1,7 +1,7 @@ static const char* reverse_opcode_names[] = {"NOOP", "RAISE", "RESUMECC", "CALLCC", "APPLY1", "TAIL-CALL", "CALL", - "FCALL0", "FCALL1", "FCALL2", "FCALL3", "FCALL4", "FCALL5", "FCALL6", + "FCALL0", "FCALL1", "FCALL2", "FCALL3", "FCALL4", "FCALL5", "FCALL6", "FCALLN", "JUMP-UNLESS", "JUMP", "PUSH", "DROP", "GLOBAL-REF", "GLOBAL-KNOWN-REF", "STACK-REF", "LOCAL-REF", "LOCAL-SET", "CLOSURE-REF", "VECTOR-REF", "VECTOR-SET", "VECTOR-LENGTH", "STRING-REF", diff --git a/sexp.c b/sexp.c index b2390075..1af3d9a0 100644 --- a/sexp.c +++ b/sexp.c @@ -421,6 +421,7 @@ sexp sexp_print_exception_op (sexp ctx sexp_api_params(self, n), sexp exn, sexp } ls = sexp_exception_source(exn); if ((! (ls && sexp_pairp(ls))) + && sexp_exception_procedure(exn) && sexp_procedurep(sexp_exception_procedure(exn))) ls = sexp_bytecode_source(sexp_procedure_code(sexp_exception_procedure(exn))); if (ls && sexp_pairp(ls)) { diff --git a/tests/thread-tests.scm b/tests/thread-tests.scm new file mode 100644 index 00000000..df6d8a69 --- /dev/null +++ b/tests/thread-tests.scm @@ -0,0 +1,58 @@ + +(import (srfi 18)) + +(define *tests-run* 0) +(define *tests-passed* 0) + +(define-syntax test + (syntax-rules () + ((test name expr expect) + (begin + (set! *tests-run* (+ *tests-run* 1)) + (let ((str (call-with-output-string (lambda (out) (display name out)))) + (res expr)) + (display str) + (write-char #\space) + (display (make-string (max 0 (- 72 (string-length str))) #\.)) + (flush-output) + (cond + ((equal? res expect) + (set! *tests-passed* (+ *tests-passed* 1)) + (display " [PASS]\n")) + (else + (display " [FAIL]\n") + (display " expected ") (write expect) + (display " but got ") (write res) (newline)))))))) + +(define (test-report) + (write *tests-passed*) + (display " out of ") + (write *tests-run*) + (display " passed (") + (write (* (/ *tests-passed* *tests-run*) 100)) + (display "%)") + (newline)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; run tests + +(test "no threads" (begin 'ok) 'ok) +(test "unstarted thread" (let ((t (make-thread (lambda () (error "oops"))))) 'ok) 'ok) +(test "ignored thread terminates" (let ((t (make-thread (lambda () 'oops)))) (thread-start! t) 'ok) 'ok) +(test "ignored thread hangs" (let ((t (make-thread (lambda () (let lp () (lp)))))) (thread-start! t) 'ok) 'ok) +(test "joined thread terminates" (let ((t (make-thread (lambda () 'oops)))) (thread-start! t) (thread-join! t) 'ok) 'ok) +(test "joined thread hangs, timeout" (let ((t (make-thread (lambda () (let lp () (lp)))))) (thread-start! t) (thread-join! t 1 'timeout)) 'timeout) + +(test "basic mutex" (let ((m (make-mutex))) (and (mutex? m) 'ok)) 'ok) +(test "mutex unlock" (let ((m (make-mutex))) (and (mutex-unlock! m) 'ok)) 'ok) +(test "mutex lock/unlock" (let ((m (make-mutex))) (and (mutex-lock! m) (mutex-unlock! m) 'ok)) 'ok) +(test "mutex lock timeout" (let* ((m (make-mutex)) (t (make-thread (lambda () (mutex-lock! m))))) (thread-start! t) (thread-yield!) (if (mutex-lock! m 1) 'fail 'timeout)) 'timeout) + +;(test "basic condition-variable" () 'ok) +;(test "condition-variable signal" () 'ok) +;(test "condition-variable broadcast" () 'ok) + +;(test "mailbox") + +(test-report) + diff --git a/vm.c b/vm.c index 1ebad747..88e4e494 100644 --- a/vm.c +++ b/vm.c @@ -4,6 +4,48 @@ static sexp sexp_apply1 (sexp ctx, sexp f, sexp x); +#if SEXP_USE_DEBUG_VM > 1 +static void sexp_print_stack (sexp ctx, sexp *stack, int top, int fp, sexp out) { + int i; + if (! sexp_oportp(out)) out = sexp_current_error_port(ctx); + for (i=0; i4; i=sexp_unbox_fixnum(stack[i+3])) { + self = stack[i+2]; + if (sexp_procedurep(self)) { + sexp_write_string(ctx, " called from ", out); + bc = sexp_procedure_code(self); + if (sexp_truep(sexp_bytecode_name(bc))) + sexp_write(ctx, sexp_bytecode_name(bc), out); + else + sexp_printf(ctx, out, "anon: %p", bc); + if ((ls=sexp_bytecode_source(bc)) && sexp_pairp(ls)) { + if (sexp_fixnump(sexp_cdr(ls)) && (sexp_cdr(ls) >= SEXP_ZERO)) { + sexp_write_string(ctx, " on line ", out); + sexp_write(ctx, sexp_cdr(ls), out); + } + if (sexp_stringp(sexp_car(ls))) { + sexp_write_string(ctx, " of file ", out); + sexp_write_string(ctx, sexp_string_data(sexp_car(ls)), out); + } + } + sexp_write_char(ctx, '\n', out); + } + } +} + /************************* code generation ****************************/ static void emit_word (sexp ctx, sexp_uint_t val) { @@ -503,8 +545,8 @@ sexp sexp_vm (sexp ctx, sexp proc) { #if SEXP_USE_DEBUG_VM if (sexp_context_tracep(ctx)) { sexp_print_stack(ctx, stack, top, fp, SEXP_FALSE); - fprintf(stderr, "%s\n", (*ip<=SEXP_OP_NUM_OPCODES) ? - reverse_opcode_names[*ip] : "UNKNOWN"); + fprintf(stderr, "%s ip: %p stack: %p top: %d fp: %d\n", (*ip<=SEXP_OP_NUM_OPCODES) ? + reverse_opcode_names[*ip] : "UNKNOWN", ip, stack, top, fp); } #endif switch (*ip++) { @@ -515,7 +557,7 @@ sexp sexp_vm (sexp ctx, sexp proc) { tmp1 = sexp_cdr(sexp_global(ctx, SEXP_G_ERR_HANDLER)); sexp_context_last_fp(ctx) = fp; if (! sexp_procedurep(tmp1)) goto end_loop; - stack[top] = (sexp) 1; + stack[top] = SEXP_ONE; stack[top+1] = sexp_make_fixnum(ip-sexp_bytecode_data(bc)); stack[top+2] = self; stack[top+3] = sexp_make_fixnum(fp); @@ -643,8 +685,10 @@ sexp sexp_vm (sexp ctx, sexp proc) { fp = top-4; break; case SEXP_OP_FCALL0: + tmp1 = _WORD0; _ALIGN_IP(); sexp_context_top(ctx) = top; + sexp_context_last_fp(ctx) = fp; _PUSH(((sexp_proc1)sexp_opcode_func(_WORD0))(ctx sexp_api_pass(_WORD0, 0))); ip += sizeof(sexp); sexp_check_exception(); @@ -1225,6 +1269,7 @@ sexp sexp_vm (sexp ctx, sexp proc) { break; case SEXP_OP_YIELD: fuel = 0; + _PUSH(SEXP_VOID); break; case SEXP_OP_RET: i = sexp_unbox_fixnum(stack[fp]);