From e4a7224a2daffb17f1ce7d5de28ed6c8db51130d Mon Sep 17 00:00:00 2001 From: Alex Shinn Date: Wed, 8 Sep 2010 13:54:52 +0000 Subject: [PATCH] initial support for non-blocking i/o --- include/chibi/sexp.h | 12 +-- lib/chibi/net.module | 3 +- lib/chibi/net.scm | 19 +++-- lib/chibi/net.stub | 1 + lib/chibi/process.module | 32 ++++++- lib/chibi/process.stub | 5 ++ lib/chibi/signal.c | 45 ++++++++++ lib/srfi/18.module | 3 +- lib/srfi/18/threads.c | 176 ++++++++++++++++++++++++++++++++++----- main.c | 21 ++++- 10 files changed, 278 insertions(+), 39 deletions(-) diff --git a/include/chibi/sexp.h b/include/chibi/sexp.h index afa11fbc..da0597b0 100644 --- a/include/chibi/sexp.h +++ b/include/chibi/sexp.h @@ -844,9 +844,9 @@ SEXP_API struct sexp_struct *sexp_type_specs; #define sexp_type_name(x) ((x)->value.type.name) #define sexp_type_finalize(x) ((x)->value.type.finalize) -#define sexp_bignum_sign(x) ((x)->value.bignum.sign) -#define sexp_bignum_length(x) ((x)->value.bignum.length) -#define sexp_bignum_data(x) ((x)->value.bignum.data) +#define sexp_bignum_sign(x) ((x)->value.bignum.sign) +#define sexp_bignum_length(x) ((x)->value.bignum.length) +#define sexp_bignum_data(x) ((x)->value.bignum.data) /****************************** arithmetic ****************************/ @@ -904,6 +904,7 @@ enum sexp_context_globals { SEXP_G_THREADS_SIGNALS, SEXP_G_THREADS_SIGNAL_RUNNER, SEXP_G_THREADS_POLL_FDS, + SEXP_G_THREADS_FD_THREADS, SEXP_G_THREADS_BLOCKER, #endif SEXP_G_NUM_GLOBALS @@ -961,8 +962,9 @@ SEXP_API sexp sexp_buffered_flush (sexp ctx, sexp p); #endif -#define sexp_newline(ctx, p) sexp_write_char(ctx, '\n', (p)) -#define sexp_at_eofp(p) (feof(sexp_port_stream(p))) +#define sexp_newline(ctx, p) sexp_write_char((ctx), '\n', (p)) +#define sexp_at_eofp(p) (feof(sexp_port_stream(p))) +#define sexp_port_fileno(p) (fileno(sexp_port_stream(p))) SEXP_API sexp sexp_make_context(sexp ctx, size_t size); SEXP_API sexp sexp_alloc_tagged(sexp ctx, size_t size, sexp_uint_t tag); diff --git a/lib/chibi/net.module b/lib/chibi/net.module index 845a7aa8..39335033 100644 --- a/lib/chibi/net.module +++ b/lib/chibi/net.module @@ -1,6 +1,7 @@ (define-module (chibi net) - (export sockaddr? address-info? get-address-info socket connect + (export sockaddr? address-info? get-address-info + socket connect bind accept listen with-net-io open-net-io address-info-family address-info-socket-type address-info-protocol address-info-address address-info-address-length address-info-next) diff --git a/lib/chibi/net.scm b/lib/chibi/net.scm index 5f912cb5..4235c2e4 100644 --- a/lib/chibi/net.scm +++ b/lib/chibi/net.scm @@ -15,13 +15,18 @@ (address-info-protocol addr)))) (if (negative? sock) (lp (address-info-next addr)) - (if (negative? - (connect sock - (address-info-address addr) - (address-info-address-length addr))) - (lp (address-info-next addr)) - (list (open-input-file-descriptor sock) - (open-output-file-descriptor sock)))))))) + (cond + ((negative? + (connect sock + (address-info-address addr) + (address-info-address-length addr))) + (lp (address-info-next addr))) + (else + (cond-expand + (threads (set-file-descriptor-flags! sock open/non-block)) + (else #f)) + (list (open-input-file-descriptor sock) + (open-output-file-descriptor sock))))))))) (define (with-net-io host service proc) (let ((io (open-net-io host service))) diff --git a/lib/chibi/net.stub b/lib/chibi/net.stub index 0d72bc90..5b923b28 100644 --- a/lib/chibi/net.stub +++ b/lib/chibi/net.stub @@ -23,3 +23,4 @@ (define-c int listen (int int)) (define-c int socket (int int int)) (define-c int connect (int sockaddr int)) +(define-c int accept (int sockaddr int)) diff --git a/lib/chibi/process.module b/lib/chibi/process.module index 372b56e4..b487ccef 100644 --- a/lib/chibi/process.module +++ b/lib/chibi/process.module @@ -1,6 +1,7 @@ (define-module (chibi process) (export exit sleep alarm fork kill execute waitpid + process-command-line process-running? set-signal-action! make-signal-set signal-set-contains? signal-set-fill! signal-set-add! signal-set-delete! current-signal-mask @@ -14,5 +15,34 @@ signal/tty-output) (import-immutable (scheme)) (cond-expand (threads (import (srfi 18))) (else #f)) - (include-shared "process")) + (include-shared "process") + (cond-expand + (unix + (body + (define (process-command-line pid) + (call-with-current-continuation + (lambda (return) + (with-exception-handler + (lambda (exn) (return #f)) + (lambda () + (let ((file (string-append "/proc/" (number->string pid) "/cmdline"))) + (call-with-input-file file + (lambda (in) + (let lp ((arg '()) (res '())) + (let ((ch (read-char in))) + (if (or (eof-object? ch) (eqv? (char->integer ch) 0)) + (let ((res (cons (list->string (reverse arg)) res)) + (ch2 (peek-char in))) + (if (or (eof-object? ch2) (eqv? (char->integer ch2) 0)) + (reverse res) + (lp '() res))) + (lp (cons ch arg) res)))))))))))))) + (else #f)) + (body + (define (process-running? pid . o) + (let ((cmdline (process-command-line pid))) + (and (pair? cmdline) + (or (null? o) + (not (car o)) + (equal? (car o) (car cmdline)))))))) diff --git a/lib/chibi/process.stub b/lib/chibi/process.stub index 93b08d95..ed0db2eb 100644 --- a/lib/chibi/process.stub +++ b/lib/chibi/process.stub @@ -70,4 +70,9 @@ (define-c void exit (int)) (define-c int (execute execvp) (string (array string))) +(cond-expand + (unix) + (else + (define-c sexp (process-command-line sexp_pid_cmdline) ((value ctx sexp) int)))) + (c-init "sexp_init_signals(ctx, env);") diff --git a/lib/chibi/signal.c b/lib/chibi/signal.c index 7202d96e..baa4ff84 100644 --- a/lib/chibi/signal.c +++ b/lib/chibi/signal.c @@ -62,6 +62,51 @@ static sexp sexp_set_signal_action (sexp ctx, sexp self, sexp signum, sexp newac return oldaction; } +#if SEXP_BSD + +#include +#include + +static sexp sexp_pid_cmdline (sexp ctx, int pid) { + unsigned long reslen = sizeof(struct kinfo_proc); + struct kinfo_proc res; + int name[4] = {CTL_KERN, KERN_PROC, KERN_PROC_PID, pid}; + if (sysctl(name, 4, &res, &reslen, NULL, 0) >= 0) { + return sexp_c_string(ctx, res.kp_proc.p_comm, -1); + } else { + return SEXP_FALSE; + } +} + +#else + +/* #include */ +/* #include */ + +/* #define CMDLINE_LENGTH 512 */ + +/* static sexp sexp_pid_cmdline (sexp ctx, int pid) { */ +/* struct __sysctl_args args; */ +/* char cmdline[CMDLINE_LENGTH]; */ +/* size_t cmdline_length; */ +/* int name[] = { CTL_KERN, KERN_OSTYPE }; */ + +/* memset(&args, 0, sizeof(struct __sysctl_args)); */ +/* args.name = name; */ +/* args.nlen = sizeof(name)/sizeof(name[0]); */ +/* args.oldval = cmdline; */ +/* args.oldlenp = &cmdline_length; */ +/* cmdline_length = sizeof(cmdline); */ + +/* if (syscall(SYS__sysctl, &args) == -1) { */ +/* return SEXP_FALSE; */ +/* } else { */ +/* return sexp_c_string(ctx, cmdline, -1); */ +/* } */ +/* } */ + +#endif + static void sexp_init_signals (sexp ctx, sexp env) { call_sigaction.sa_sigaction = sexp_call_sigaction; #if SEXP_USE_GREEN_THREADS diff --git a/lib/srfi/18.module b/lib/srfi/18.module index 3ed564f8..dd0fa8a3 100644 --- a/lib/srfi/18.module +++ b/lib/srfi/18.module @@ -19,6 +19,7 @@ (srfi 9) (chibi ast) (chibi time)) + (include "18/types.scm") (include-shared "18/threads") - (include "18/types.scm" "18/interface.scm")) + (include "18/interface.scm")) diff --git a/lib/srfi/18/threads.c b/lib/srfi/18/threads.c index 99390353..dd5c22dd 100644 --- a/lib/srfi/18/threads.c +++ b/lib/srfi/18/threads.c @@ -6,20 +6,37 @@ #include #include #include +#include +#define sexp_mutexp(x) (sexp_check_tag(x, sexp_mutex_id)) #define sexp_mutex_name(x) sexp_slot_ref(x, 0) #define sexp_mutex_specific(x) sexp_slot_ref(x, 1) #define sexp_mutex_thread(x) sexp_slot_ref(x, 2) -#define sexp_mutex_lockp(x) sexp_slot_ref(x, 3) +#define sexp_mutex_lockp(x) sexp_slot_ref(x, 3) +#define sexp_condvarp(x) (sexp_check_tag(x, sexp_condvar_id)) #define sexp_condvar_name(x) sexp_slot_ref(x, 0) #define sexp_condvar_specific(x) sexp_slot_ref(x, 1) #define sexp_condvar_threads(x) sexp_slot_ref(x, 2) +struct sexp_pollfds_t { + struct pollfd *fds; + nfds_t nfds, mfds; +}; + +#define SEXP_INIT_POLLFDS_MAX_FDS 16 + +#define sexp_pollfdsp(x) (sexp_check_tag(x, sexp_pollfds_id)) +#define sexp_pollfds_fds(x) (((struct sexp_pollfds_t*)(&(x)->value))->fds) +#define sexp_pollfds_num_fds(x) (((struct sexp_pollfds_t*)(&(x)->value))->nfds) +#define sexp_pollfds_max_fds(x) (((struct sexp_pollfds_t*)(&(x)->value))->mfds) + +#define sexp_sizeof_pollfds (sexp_sizeof_header + sizeof(struct sexp_pollfds_t)) + #define timeval_le(a, b) (((a).tv_sec < (b).tv_sec) || (((a).tv_sec == (b).tv_sec) && ((a).tv_usec < (b).tv_usec))) #define sexp_context_before(c, t) (((sexp_context_timeval(c).tv_sec != 0) || (sexp_context_timeval(c).tv_usec != 0)) && timeval_le(sexp_context_timeval(c), t)) -/* static int mutex_id, condvar_id; */ +static int sexp_mutex_id, sexp_condvar_id, sexp_pollfds_id; /**************************** threads *************************************/ @@ -165,7 +182,7 @@ sexp sexp_thread_sleep (sexp ctx sexp_api_params(self, n), sexp timeout) { /**************************** mutexes *************************************/ sexp sexp_mutex_state (sexp ctx sexp_api_params(self, n), sexp mutex) { - /* sexp_assert_type(ctx, sexp_mutexp, mutex_id, timeout); */ + sexp_assert_type(ctx, sexp_mutexp, sexp_mutex_id, mutex); if (sexp_truep(sexp_mutex_lockp(mutex))) { if (sexp_contextp(sexp_mutex_thread(mutex))) return sexp_mutex_thread(mutex); @@ -254,19 +271,6 @@ sexp sexp_condition_variable_broadcast (sexp ctx sexp_api_params(self, n), sexp /**************************** the scheduler *******************************/ -void sexp_wait_on_single_thread (sexp ctx) { - struct timeval tval; - useconds_t usecs = 0; - gettimeofday(&tval, NULL); - if (tval.tv_sec < sexp_context_timeval(ctx).tv_sec) - usecs = (sexp_context_timeval(ctx).tv_sec - tval.tv_sec) * 1000000; - if (tval.tv_usec < sexp_context_timeval(ctx).tv_usec) - usecs += sexp_context_timeval(ctx).tv_usec - tval.tv_usec; - else if (usecs > 0) - usecs -= tval.tv_usec - sexp_context_timeval(ctx).tv_usec; - usleep(usecs); -} - static const sexp_uint_t sexp_log2_lookup[32] = { 0, 1, 28, 2, 29, 14, 24, 3, 30, 22, 20, 15, 25, 17, 4, 8, 31, 27, 13, 23, 21, 19, 16, 7, 26, 12, 18, 6, 11, 5, 10, 9 @@ -295,13 +299,73 @@ static sexp sexp_get_signal_handler (sexp ctx sexp_api_params(self, n), sexp sig return sexp_vector_ref(sexp_global(ctx, SEXP_G_SIGNAL_HANDLERS), signum); } +static sexp sexp_make_pollfds (sexp ctx) { + sexp res = sexp_alloc_tagged(ctx, sexp_sizeof_pollfds, sexp_pollfds_id); + sexp_pollfds_fds(res) = malloc(SEXP_INIT_POLLFDS_MAX_FDS * sizeof(struct pollfd)); + sexp_pollfds_num_fds(res) = 0; + sexp_pollfds_max_fds(res) = SEXP_INIT_POLLFDS_MAX_FDS; + return res; +} + +static sexp sexp_free_pollfds (sexp ctx sexp_api_params(self, n), sexp pollfds) { + if (sexp_pollfds_fds(pollfds)) { + free(sexp_pollfds_fds(pollfds)); + sexp_pollfds_fds(pollfds) = NULL; + sexp_pollfds_num_fds(pollfds) = 0; + sexp_pollfds_max_fds(pollfds) = 0; + } + return SEXP_VOID; +} + +/* return true if this fd was already being polled */ +static sexp sexp_insert_pollfd (sexp ctx, int fd, int events) { + int i; + struct pollfd *pfd; + sexp pollfds = sexp_global(ctx, SEXP_G_THREADS_POLL_FDS); + if (! (pollfds && sexp_pollfdsp(pollfds))) { + sexp_global(ctx, SEXP_G_THREADS_POLL_FDS) = pollfds = sexp_make_pollfds(ctx); + } + for (i=0; ifd = fd; + pfd->events = events; + return SEXP_FALSE; +} + +/* block the current thread on the specified port */ static sexp sexp_blocker (sexp ctx sexp_api_params(self, n), sexp port) { + int fd; + sexp_assert_type(ctx, sexp_portp, SEXP_IPORT, port); + /* register the fd */ + fd = sexp_port_fileno(port); + if (fd >= 0) + sexp_insert_pollfd(ctx, fd, sexp_iportp(port) ? POLLIN : POLLOUT); + /* pause the current thread */ + sexp_context_waitp(ctx) = 1; + sexp_context_event(ctx) = port; + sexp_insert_timed(ctx, ctx, SEXP_FALSE); return SEXP_VOID; } sexp sexp_scheduler (sexp ctx sexp_api_params(self, n), sexp root_thread) { + int i, k; struct timeval tval; - sexp res, ls1, ls2, runner, paused, front; + struct pollfd *pfds; + useconds_t usecs = 0; + sexp res, ls1, ls2, runner, paused, front, pollfds; sexp_gc_var1(tmp); sexp_gc_preserve1(ctx, tmp); @@ -327,8 +391,42 @@ sexp sexp_scheduler (sexp ctx sexp_api_params(self, n), sexp root_thread) { } /* check blocked fds */ - /* if () { */ - /* } */ + pollfds = sexp_global(ctx, SEXP_G_THREADS_POLL_FDS); + if (sexp_pollfdsp(pollfds) && sexp_pollfds_num_fds(pollfds) > 0) { + pfds = sexp_pollfds_fds(pollfds); + k = poll(sexp_pollfds_fds(pollfds), sexp_pollfds_num_fds(pollfds), 0); + unblock_io_threads: + for (i=sexp_pollfds_num_fds(pollfds)-1; i>=0 && k>0; --i) { + if (pfds[i].revents > 0) { /* free all threads blocked on this fd */ + k--; + pfds[i].events = 0; /* FIXME: delete from queue completely */ + for (ls1=SEXP_NULL, ls2=paused; sexp_pairp(ls2); ) { + /* FIXME distinguish input and output on the same fd */ + if (sexp_portp(sexp_context_event(sexp_car(ls2))) + && sexp_port_fileno(sexp_context_event(sexp_car(ls2))) == pfds[i].fd) { + sexp_context_waitp(sexp_car(ls2)) = 0; + sexp_context_timeoutp(sexp_car(ls2)) = 0; + if (ls1==SEXP_NULL) + sexp_global(ctx, SEXP_G_THREADS_PAUSED) = paused = sexp_cdr(ls2); + else + sexp_cdr(ls1) = sexp_cdr(ls2); + tmp = sexp_cdr(ls2); + sexp_cdr(ls2) = SEXP_NULL; + if (! sexp_pairp(sexp_global(ctx, SEXP_G_THREADS_BACK))) { + sexp_global(ctx, SEXP_G_THREADS_FRONT) = front = ls2; + } else { + sexp_cdr(sexp_global(ctx, SEXP_G_THREADS_BACK)) = ls2; + } + sexp_global(ctx, SEXP_G_THREADS_BACK) = ls2; + ls2 = tmp; + } else { + ls1 = ls2; + ls2 = sexp_cdr(ls2); + } + } + } + } + } /* if we've terminated, check threads joining us */ if (sexp_context_refuel(ctx) <= 0) { @@ -414,9 +512,24 @@ sexp sexp_scheduler (sexp ctx sexp_api_params(self, n), sexp root_thread) { if (sexp_not(sexp_memq(ctx, tmp, paused))) sexp_insert_timed(ctx, tmp, tmp); } - sexp_wait_on_single_thread(res); - sexp_context_timeoutp(res) = 1; - sexp_context_waitp(res) = 0; + usecs = 0; + gettimeofday(&tval, NULL); + if (tval.tv_sec < sexp_context_timeval(res).tv_sec) + usecs = (sexp_context_timeval(res).tv_sec - tval.tv_sec) * 1000000; + if (tval.tv_usec < sexp_context_timeval(res).tv_usec) + usecs += sexp_context_timeval(res).tv_usec - tval.tv_usec; + else if (usecs > 0) + usecs -= tval.tv_usec - sexp_context_timeval(res).tv_usec; + /* either wait on an fd, or just sleep */ + pollfds = sexp_global(res, SEXP_G_THREADS_POLL_FDS); + if (sexp_portp(sexp_context_event(res)) && sexp_pollfdsp(pollfds)) { + if ((k = poll(sexp_pollfds_fds(pollfds), sexp_pollfds_num_fds(pollfds), usecs/1000)) > 0) + goto unblock_io_threads; + } else { + usleep(usecs); + sexp_context_timeoutp(res) = 1; + sexp_context_waitp(res) = 0; + } } sexp_gc_release1(ctx); @@ -425,7 +538,25 @@ sexp sexp_scheduler (sexp ctx sexp_api_params(self, n), sexp root_thread) { /**************************************************************************/ +int sexp_lookup_type (sexp ctx, sexp env, const char *name) { + sexp t = sexp_env_ref(env, sexp_intern(ctx, name, -1), SEXP_FALSE); + return (sexp_typep(t)) ? sexp_type_tag(t) : -1; +} + sexp sexp_init_library (sexp ctx sexp_api_params(self, n), sexp env) { + sexp t; + sexp_gc_var1(name); + sexp_gc_preserve1(ctx, name); + + sexp_mutex_id = sexp_lookup_type(ctx, env, "mutex"); + sexp_condvar_id = sexp_lookup_type(ctx, env, "condition-variable"); + name = sexp_c_string(ctx, "pollfds", -1); + t = sexp_register_type(ctx, name, SEXP_ZERO, SEXP_ZERO, SEXP_ZERO, SEXP_ZERO, + SEXP_ZERO, sexp_make_fixnum(sexp_sizeof_pollfds), + SEXP_ZERO, SEXP_ZERO, SEXP_ZERO, SEXP_ZERO, SEXP_ZERO, + SEXP_ZERO, SEXP_ZERO, (sexp_proc2)sexp_free_pollfds); + if (sexp_typep(t)) + sexp_pollfds_id = sexp_type_tag(t); sexp_define_type_predicate(ctx, env, "thread?", SEXP_CONTEXT); sexp_define_foreign(ctx, env, "thread-timeout?", 0, sexp_thread_timeoutp); @@ -454,6 +585,7 @@ sexp sexp_init_library (sexp ctx sexp_api_params(self, n), sexp env) { /* remember the env to lookup the runner later */ sexp_global(ctx, SEXP_G_THREADS_SIGNAL_RUNNER) = env; + sexp_gc_release1(ctx); return SEXP_VOID; } diff --git a/main.c b/main.c index d07a9767..cbdd2bff 100644 --- a/main.c +++ b/main.c @@ -12,6 +12,10 @@ #define sexp_version_string "chibi-scheme "sexp_version" \""sexp_release_name"\" " +#if SEXP_USE_GREEN_THREADS +#include +#endif + #ifdef PLAN9 #define exit_failure() exits("ERROR") #else @@ -27,7 +31,7 @@ static void repl (sexp ctx) { sexp_env_define(ctx, sexp_context_env(ctx), sexp_global(ctx, SEXP_G_INTERACTION_ENV_SYMBOL), env); sexp_context_tracep(ctx) = 1; - in = sexp_env_ref(env, sexp_global(ctx, SEXP_G_CUR_IN_SYMBOL), SEXP_FALSE); + in = sexp_env_ref(env, sexp_global(ctx, SEXP_G_CUR_IN_SYMBOL), SEXP_FALSE); out = sexp_env_ref(env, sexp_global(ctx, SEXP_G_CUR_OUT_SYMBOL), SEXP_FALSE); err = sexp_env_ref(env, sexp_global(ctx, SEXP_G_CUR_ERR_SYMBOL), SEXP_FALSE); sexp_port_sourcep(in) = 1; @@ -80,6 +84,19 @@ static sexp check_exception (sexp ctx, sexp res) { return res; } +static sexp sexp_load_standard_repl_env (sexp ctx, sexp env, sexp k) { + sexp p, res = sexp_load_standard_env(ctx, env, k); +#if SEXP_USE_GREEN_THREADS + p = sexp_env_ref(env, sexp_global(ctx, SEXP_G_CUR_IN_SYMBOL), SEXP_FALSE); + if (sexp_portp(p)) fcntl(sexp_port_fileno(p), F_SETFL, O_NONBLOCK); + p = sexp_env_ref(env, sexp_global(ctx, SEXP_G_CUR_OUT_SYMBOL), SEXP_FALSE); + if (sexp_portp(p)) fcntl(sexp_port_fileno(p), F_SETFL, O_NONBLOCK); + p = sexp_env_ref(env, sexp_global(ctx, SEXP_G_CUR_ERR_SYMBOL), SEXP_FALSE); + if (sexp_portp(p)) fcntl(sexp_port_fileno(p), F_SETFL, O_NONBLOCK); +#endif + return res; +} + #define init_context() if (! ctx) do { \ ctx = sexp_make_eval_context(NULL, NULL, NULL, heap_size); \ env = sexp_context_env(ctx); \ @@ -88,7 +105,7 @@ static sexp check_exception (sexp ctx, sexp res) { #define load_init() if (! init_loaded++) do { \ init_context(); \ - check_exception(ctx, sexp_load_standard_env(ctx, env, SEXP_FIVE)); \ + check_exception(ctx, sexp_load_standard_repl_env(ctx, env, SEXP_FIVE)); \ } while (0) void run_main (int argc, char **argv) {