initial support for non-blocking i/o

This commit is contained in:
Alex Shinn 2010-09-08 13:54:52 +00:00
parent e474561f70
commit e4a7224a2d
10 changed files with 278 additions and 39 deletions

View file

@ -844,9 +844,9 @@ SEXP_API struct sexp_struct *sexp_type_specs;
#define sexp_type_name(x) ((x)->value.type.name)
#define sexp_type_finalize(x) ((x)->value.type.finalize)
#define sexp_bignum_sign(x) ((x)->value.bignum.sign)
#define sexp_bignum_length(x) ((x)->value.bignum.length)
#define sexp_bignum_data(x) ((x)->value.bignum.data)
#define sexp_bignum_sign(x) ((x)->value.bignum.sign)
#define sexp_bignum_length(x) ((x)->value.bignum.length)
#define sexp_bignum_data(x) ((x)->value.bignum.data)
/****************************** arithmetic ****************************/
@ -904,6 +904,7 @@ enum sexp_context_globals {
SEXP_G_THREADS_SIGNALS,
SEXP_G_THREADS_SIGNAL_RUNNER,
SEXP_G_THREADS_POLL_FDS,
SEXP_G_THREADS_FD_THREADS,
SEXP_G_THREADS_BLOCKER,
#endif
SEXP_G_NUM_GLOBALS
@ -961,8 +962,9 @@ SEXP_API sexp sexp_buffered_flush (sexp ctx, sexp p);
#endif
#define sexp_newline(ctx, p) sexp_write_char(ctx, '\n', (p))
#define sexp_at_eofp(p) (feof(sexp_port_stream(p)))
#define sexp_newline(ctx, p) sexp_write_char((ctx), '\n', (p))
#define sexp_at_eofp(p) (feof(sexp_port_stream(p)))
#define sexp_port_fileno(p) (fileno(sexp_port_stream(p)))
SEXP_API sexp sexp_make_context(sexp ctx, size_t size);
SEXP_API sexp sexp_alloc_tagged(sexp ctx, size_t size, sexp_uint_t tag);

View file

@ -1,6 +1,7 @@
(define-module (chibi net)
(export sockaddr? address-info? get-address-info socket connect
(export sockaddr? address-info? get-address-info
socket connect bind accept listen
with-net-io open-net-io
address-info-family address-info-socket-type address-info-protocol
address-info-address address-info-address-length address-info-next)

View file

@ -15,13 +15,18 @@
(address-info-protocol addr))))
(if (negative? sock)
(lp (address-info-next addr))
(if (negative?
(connect sock
(address-info-address addr)
(address-info-address-length addr)))
(lp (address-info-next addr))
(list (open-input-file-descriptor sock)
(open-output-file-descriptor sock))))))))
(cond
((negative?
(connect sock
(address-info-address addr)
(address-info-address-length addr)))
(lp (address-info-next addr)))
(else
(cond-expand
(threads (set-file-descriptor-flags! sock open/non-block))
(else #f))
(list (open-input-file-descriptor sock)
(open-output-file-descriptor sock)))))))))
(define (with-net-io host service proc)
(let ((io (open-net-io host service)))

View file

@ -23,3 +23,4 @@
(define-c int listen (int int))
(define-c int socket (int int int))
(define-c int connect (int sockaddr int))
(define-c int accept (int sockaddr int))

View file

@ -1,6 +1,7 @@
(define-module (chibi process)
(export exit sleep alarm fork kill execute waitpid
process-command-line process-running?
set-signal-action! make-signal-set signal-set-contains?
signal-set-fill! signal-set-add! signal-set-delete!
current-signal-mask
@ -14,5 +15,34 @@
signal/tty-output)
(import-immutable (scheme))
(cond-expand (threads (import (srfi 18))) (else #f))
(include-shared "process"))
(include-shared "process")
(cond-expand
(unix
(body
(define (process-command-line pid)
(call-with-current-continuation
(lambda (return)
(with-exception-handler
(lambda (exn) (return #f))
(lambda ()
(let ((file (string-append "/proc/" (number->string pid) "/cmdline")))
(call-with-input-file file
(lambda (in)
(let lp ((arg '()) (res '()))
(let ((ch (read-char in)))
(if (or (eof-object? ch) (eqv? (char->integer ch) 0))
(let ((res (cons (list->string (reverse arg)) res))
(ch2 (peek-char in)))
(if (or (eof-object? ch2) (eqv? (char->integer ch2) 0))
(reverse res)
(lp '() res)))
(lp (cons ch arg) res))))))))))))))
(else #f))
(body
(define (process-running? pid . o)
(let ((cmdline (process-command-line pid)))
(and (pair? cmdline)
(or (null? o)
(not (car o))
(equal? (car o) (car cmdline))))))))

View file

@ -70,4 +70,9 @@
(define-c void exit (int))
(define-c int (execute execvp) (string (array string)))
(cond-expand
(unix)
(else
(define-c sexp (process-command-line sexp_pid_cmdline) ((value ctx sexp) int))))
(c-init "sexp_init_signals(ctx, env);")

View file

@ -62,6 +62,51 @@ static sexp sexp_set_signal_action (sexp ctx, sexp self, sexp signum, sexp newac
return oldaction;
}
#if SEXP_BSD
#include <sys/sysctl.h>
#include <sys/proc.h>
static sexp sexp_pid_cmdline (sexp ctx, int pid) {
unsigned long reslen = sizeof(struct kinfo_proc);
struct kinfo_proc res;
int name[4] = {CTL_KERN, KERN_PROC, KERN_PROC_PID, pid};
if (sysctl(name, 4, &res, &reslen, NULL, 0) >= 0) {
return sexp_c_string(ctx, res.kp_proc.p_comm, -1);
} else {
return SEXP_FALSE;
}
}
#else
/* #include <sys/syscall.h> */
/* #include <linux/sysctl.h> */
/* #define CMDLINE_LENGTH 512 */
/* static sexp sexp_pid_cmdline (sexp ctx, int pid) { */
/* struct __sysctl_args args; */
/* char cmdline[CMDLINE_LENGTH]; */
/* size_t cmdline_length; */
/* int name[] = { CTL_KERN, KERN_OSTYPE }; */
/* memset(&args, 0, sizeof(struct __sysctl_args)); */
/* args.name = name; */
/* args.nlen = sizeof(name)/sizeof(name[0]); */
/* args.oldval = cmdline; */
/* args.oldlenp = &cmdline_length; */
/* cmdline_length = sizeof(cmdline); */
/* if (syscall(SYS__sysctl, &args) == -1) { */
/* return SEXP_FALSE; */
/* } else { */
/* return sexp_c_string(ctx, cmdline, -1); */
/* } */
/* } */
#endif
static void sexp_init_signals (sexp ctx, sexp env) {
call_sigaction.sa_sigaction = sexp_call_sigaction;
#if SEXP_USE_GREEN_THREADS

View file

@ -19,6 +19,7 @@
(srfi 9)
(chibi ast)
(chibi time))
(include "18/types.scm")
(include-shared "18/threads")
(include "18/types.scm" "18/interface.scm"))
(include "18/interface.scm"))

View file

@ -6,20 +6,37 @@
#include <time.h>
#include <sys/time.h>
#include <unistd.h>
#include <poll.h>
#define sexp_mutexp(x) (sexp_check_tag(x, sexp_mutex_id))
#define sexp_mutex_name(x) sexp_slot_ref(x, 0)
#define sexp_mutex_specific(x) sexp_slot_ref(x, 1)
#define sexp_mutex_thread(x) sexp_slot_ref(x, 2)
#define sexp_mutex_lockp(x) sexp_slot_ref(x, 3)
#define sexp_mutex_lockp(x) sexp_slot_ref(x, 3)
#define sexp_condvarp(x) (sexp_check_tag(x, sexp_condvar_id))
#define sexp_condvar_name(x) sexp_slot_ref(x, 0)
#define sexp_condvar_specific(x) sexp_slot_ref(x, 1)
#define sexp_condvar_threads(x) sexp_slot_ref(x, 2)
struct sexp_pollfds_t {
struct pollfd *fds;
nfds_t nfds, mfds;
};
#define SEXP_INIT_POLLFDS_MAX_FDS 16
#define sexp_pollfdsp(x) (sexp_check_tag(x, sexp_pollfds_id))
#define sexp_pollfds_fds(x) (((struct sexp_pollfds_t*)(&(x)->value))->fds)
#define sexp_pollfds_num_fds(x) (((struct sexp_pollfds_t*)(&(x)->value))->nfds)
#define sexp_pollfds_max_fds(x) (((struct sexp_pollfds_t*)(&(x)->value))->mfds)
#define sexp_sizeof_pollfds (sexp_sizeof_header + sizeof(struct sexp_pollfds_t))
#define timeval_le(a, b) (((a).tv_sec < (b).tv_sec) || (((a).tv_sec == (b).tv_sec) && ((a).tv_usec < (b).tv_usec)))
#define sexp_context_before(c, t) (((sexp_context_timeval(c).tv_sec != 0) || (sexp_context_timeval(c).tv_usec != 0)) && timeval_le(sexp_context_timeval(c), t))
/* static int mutex_id, condvar_id; */
static int sexp_mutex_id, sexp_condvar_id, sexp_pollfds_id;
/**************************** threads *************************************/
@ -165,7 +182,7 @@ sexp sexp_thread_sleep (sexp ctx sexp_api_params(self, n), sexp timeout) {
/**************************** mutexes *************************************/
sexp sexp_mutex_state (sexp ctx sexp_api_params(self, n), sexp mutex) {
/* sexp_assert_type(ctx, sexp_mutexp, mutex_id, timeout); */
sexp_assert_type(ctx, sexp_mutexp, sexp_mutex_id, mutex);
if (sexp_truep(sexp_mutex_lockp(mutex))) {
if (sexp_contextp(sexp_mutex_thread(mutex)))
return sexp_mutex_thread(mutex);
@ -254,19 +271,6 @@ sexp sexp_condition_variable_broadcast (sexp ctx sexp_api_params(self, n), sexp
/**************************** the scheduler *******************************/
void sexp_wait_on_single_thread (sexp ctx) {
struct timeval tval;
useconds_t usecs = 0;
gettimeofday(&tval, NULL);
if (tval.tv_sec < sexp_context_timeval(ctx).tv_sec)
usecs = (sexp_context_timeval(ctx).tv_sec - tval.tv_sec) * 1000000;
if (tval.tv_usec < sexp_context_timeval(ctx).tv_usec)
usecs += sexp_context_timeval(ctx).tv_usec - tval.tv_usec;
else if (usecs > 0)
usecs -= tval.tv_usec - sexp_context_timeval(ctx).tv_usec;
usleep(usecs);
}
static const sexp_uint_t sexp_log2_lookup[32] = {
0, 1, 28, 2, 29, 14, 24, 3, 30, 22, 20, 15, 25, 17, 4, 8,
31, 27, 13, 23, 21, 19, 16, 7, 26, 12, 18, 6, 11, 5, 10, 9
@ -295,13 +299,73 @@ static sexp sexp_get_signal_handler (sexp ctx sexp_api_params(self, n), sexp sig
return sexp_vector_ref(sexp_global(ctx, SEXP_G_SIGNAL_HANDLERS), signum);
}
static sexp sexp_make_pollfds (sexp ctx) {
sexp res = sexp_alloc_tagged(ctx, sexp_sizeof_pollfds, sexp_pollfds_id);
sexp_pollfds_fds(res) = malloc(SEXP_INIT_POLLFDS_MAX_FDS * sizeof(struct pollfd));
sexp_pollfds_num_fds(res) = 0;
sexp_pollfds_max_fds(res) = SEXP_INIT_POLLFDS_MAX_FDS;
return res;
}
static sexp sexp_free_pollfds (sexp ctx sexp_api_params(self, n), sexp pollfds) {
if (sexp_pollfds_fds(pollfds)) {
free(sexp_pollfds_fds(pollfds));
sexp_pollfds_fds(pollfds) = NULL;
sexp_pollfds_num_fds(pollfds) = 0;
sexp_pollfds_max_fds(pollfds) = 0;
}
return SEXP_VOID;
}
/* return true if this fd was already being polled */
static sexp sexp_insert_pollfd (sexp ctx, int fd, int events) {
int i;
struct pollfd *pfd;
sexp pollfds = sexp_global(ctx, SEXP_G_THREADS_POLL_FDS);
if (! (pollfds && sexp_pollfdsp(pollfds))) {
sexp_global(ctx, SEXP_G_THREADS_POLL_FDS) = pollfds = sexp_make_pollfds(ctx);
}
for (i=0; i<sexp_pollfds_num_fds(pollfds); ++i) {
if (sexp_pollfds_fds(pollfds)[i].fd == fd) {
sexp_pollfds_fds(pollfds)[i].events |= events;
return SEXP_TRUE;
}
}
if (sexp_pollfds_num_fds(pollfds) == sexp_pollfds_max_fds(pollfds)) {
sexp_pollfds_max_fds(pollfds) = i*2;
pfd = sexp_pollfds_fds(pollfds);
sexp_pollfds_fds(pollfds) = malloc(i*2*sizeof(struct pollfd));
if (sexp_pollfds_fds(pollfds))
memcpy(sexp_pollfds_fds(pollfds), pfd, i*2*sizeof(struct pollfd));
free(pfd);
}
pfd = &(sexp_pollfds_fds(pollfds)[sexp_pollfds_num_fds(pollfds)++]);
pfd->fd = fd;
pfd->events = events;
return SEXP_FALSE;
}
/* block the current thread on the specified port */
static sexp sexp_blocker (sexp ctx sexp_api_params(self, n), sexp port) {
int fd;
sexp_assert_type(ctx, sexp_portp, SEXP_IPORT, port);
/* register the fd */
fd = sexp_port_fileno(port);
if (fd >= 0)
sexp_insert_pollfd(ctx, fd, sexp_iportp(port) ? POLLIN : POLLOUT);
/* pause the current thread */
sexp_context_waitp(ctx) = 1;
sexp_context_event(ctx) = port;
sexp_insert_timed(ctx, ctx, SEXP_FALSE);
return SEXP_VOID;
}
sexp sexp_scheduler (sexp ctx sexp_api_params(self, n), sexp root_thread) {
int i, k;
struct timeval tval;
sexp res, ls1, ls2, runner, paused, front;
struct pollfd *pfds;
useconds_t usecs = 0;
sexp res, ls1, ls2, runner, paused, front, pollfds;
sexp_gc_var1(tmp);
sexp_gc_preserve1(ctx, tmp);
@ -327,8 +391,42 @@ sexp sexp_scheduler (sexp ctx sexp_api_params(self, n), sexp root_thread) {
}
/* check blocked fds */
/* if () { */
/* } */
pollfds = sexp_global(ctx, SEXP_G_THREADS_POLL_FDS);
if (sexp_pollfdsp(pollfds) && sexp_pollfds_num_fds(pollfds) > 0) {
pfds = sexp_pollfds_fds(pollfds);
k = poll(sexp_pollfds_fds(pollfds), sexp_pollfds_num_fds(pollfds), 0);
unblock_io_threads:
for (i=sexp_pollfds_num_fds(pollfds)-1; i>=0 && k>0; --i) {
if (pfds[i].revents > 0) { /* free all threads blocked on this fd */
k--;
pfds[i].events = 0; /* FIXME: delete from queue completely */
for (ls1=SEXP_NULL, ls2=paused; sexp_pairp(ls2); ) {
/* FIXME distinguish input and output on the same fd */
if (sexp_portp(sexp_context_event(sexp_car(ls2)))
&& sexp_port_fileno(sexp_context_event(sexp_car(ls2))) == pfds[i].fd) {
sexp_context_waitp(sexp_car(ls2)) = 0;
sexp_context_timeoutp(sexp_car(ls2)) = 0;
if (ls1==SEXP_NULL)
sexp_global(ctx, SEXP_G_THREADS_PAUSED) = paused = sexp_cdr(ls2);
else
sexp_cdr(ls1) = sexp_cdr(ls2);
tmp = sexp_cdr(ls2);
sexp_cdr(ls2) = SEXP_NULL;
if (! sexp_pairp(sexp_global(ctx, SEXP_G_THREADS_BACK))) {
sexp_global(ctx, SEXP_G_THREADS_FRONT) = front = ls2;
} else {
sexp_cdr(sexp_global(ctx, SEXP_G_THREADS_BACK)) = ls2;
}
sexp_global(ctx, SEXP_G_THREADS_BACK) = ls2;
ls2 = tmp;
} else {
ls1 = ls2;
ls2 = sexp_cdr(ls2);
}
}
}
}
}
/* if we've terminated, check threads joining us */
if (sexp_context_refuel(ctx) <= 0) {
@ -414,9 +512,24 @@ sexp sexp_scheduler (sexp ctx sexp_api_params(self, n), sexp root_thread) {
if (sexp_not(sexp_memq(ctx, tmp, paused)))
sexp_insert_timed(ctx, tmp, tmp);
}
sexp_wait_on_single_thread(res);
sexp_context_timeoutp(res) = 1;
sexp_context_waitp(res) = 0;
usecs = 0;
gettimeofday(&tval, NULL);
if (tval.tv_sec < sexp_context_timeval(res).tv_sec)
usecs = (sexp_context_timeval(res).tv_sec - tval.tv_sec) * 1000000;
if (tval.tv_usec < sexp_context_timeval(res).tv_usec)
usecs += sexp_context_timeval(res).tv_usec - tval.tv_usec;
else if (usecs > 0)
usecs -= tval.tv_usec - sexp_context_timeval(res).tv_usec;
/* either wait on an fd, or just sleep */
pollfds = sexp_global(res, SEXP_G_THREADS_POLL_FDS);
if (sexp_portp(sexp_context_event(res)) && sexp_pollfdsp(pollfds)) {
if ((k = poll(sexp_pollfds_fds(pollfds), sexp_pollfds_num_fds(pollfds), usecs/1000)) > 0)
goto unblock_io_threads;
} else {
usleep(usecs);
sexp_context_timeoutp(res) = 1;
sexp_context_waitp(res) = 0;
}
}
sexp_gc_release1(ctx);
@ -425,7 +538,25 @@ sexp sexp_scheduler (sexp ctx sexp_api_params(self, n), sexp root_thread) {
/**************************************************************************/
int sexp_lookup_type (sexp ctx, sexp env, const char *name) {
sexp t = sexp_env_ref(env, sexp_intern(ctx, name, -1), SEXP_FALSE);
return (sexp_typep(t)) ? sexp_type_tag(t) : -1;
}
sexp sexp_init_library (sexp ctx sexp_api_params(self, n), sexp env) {
sexp t;
sexp_gc_var1(name);
sexp_gc_preserve1(ctx, name);
sexp_mutex_id = sexp_lookup_type(ctx, env, "mutex");
sexp_condvar_id = sexp_lookup_type(ctx, env, "condition-variable");
name = sexp_c_string(ctx, "pollfds", -1);
t = sexp_register_type(ctx, name, SEXP_ZERO, SEXP_ZERO, SEXP_ZERO, SEXP_ZERO,
SEXP_ZERO, sexp_make_fixnum(sexp_sizeof_pollfds),
SEXP_ZERO, SEXP_ZERO, SEXP_ZERO, SEXP_ZERO, SEXP_ZERO,
SEXP_ZERO, SEXP_ZERO, (sexp_proc2)sexp_free_pollfds);
if (sexp_typep(t))
sexp_pollfds_id = sexp_type_tag(t);
sexp_define_type_predicate(ctx, env, "thread?", SEXP_CONTEXT);
sexp_define_foreign(ctx, env, "thread-timeout?", 0, sexp_thread_timeoutp);
@ -454,6 +585,7 @@ sexp sexp_init_library (sexp ctx sexp_api_params(self, n), sexp env) {
/* remember the env to lookup the runner later */
sexp_global(ctx, SEXP_G_THREADS_SIGNAL_RUNNER) = env;
sexp_gc_release1(ctx);
return SEXP_VOID;
}

21
main.c
View file

@ -12,6 +12,10 @@
#define sexp_version_string "chibi-scheme "sexp_version" \""sexp_release_name"\" "
#if SEXP_USE_GREEN_THREADS
#include <fcntl.h>
#endif
#ifdef PLAN9
#define exit_failure() exits("ERROR")
#else
@ -27,7 +31,7 @@ static void repl (sexp ctx) {
sexp_env_define(ctx, sexp_context_env(ctx),
sexp_global(ctx, SEXP_G_INTERACTION_ENV_SYMBOL), env);
sexp_context_tracep(ctx) = 1;
in = sexp_env_ref(env, sexp_global(ctx, SEXP_G_CUR_IN_SYMBOL), SEXP_FALSE);
in = sexp_env_ref(env, sexp_global(ctx, SEXP_G_CUR_IN_SYMBOL), SEXP_FALSE);
out = sexp_env_ref(env, sexp_global(ctx, SEXP_G_CUR_OUT_SYMBOL), SEXP_FALSE);
err = sexp_env_ref(env, sexp_global(ctx, SEXP_G_CUR_ERR_SYMBOL), SEXP_FALSE);
sexp_port_sourcep(in) = 1;
@ -80,6 +84,19 @@ static sexp check_exception (sexp ctx, sexp res) {
return res;
}
static sexp sexp_load_standard_repl_env (sexp ctx, sexp env, sexp k) {
sexp p, res = sexp_load_standard_env(ctx, env, k);
#if SEXP_USE_GREEN_THREADS
p = sexp_env_ref(env, sexp_global(ctx, SEXP_G_CUR_IN_SYMBOL), SEXP_FALSE);
if (sexp_portp(p)) fcntl(sexp_port_fileno(p), F_SETFL, O_NONBLOCK);
p = sexp_env_ref(env, sexp_global(ctx, SEXP_G_CUR_OUT_SYMBOL), SEXP_FALSE);
if (sexp_portp(p)) fcntl(sexp_port_fileno(p), F_SETFL, O_NONBLOCK);
p = sexp_env_ref(env, sexp_global(ctx, SEXP_G_CUR_ERR_SYMBOL), SEXP_FALSE);
if (sexp_portp(p)) fcntl(sexp_port_fileno(p), F_SETFL, O_NONBLOCK);
#endif
return res;
}
#define init_context() if (! ctx) do { \
ctx = sexp_make_eval_context(NULL, NULL, NULL, heap_size); \
env = sexp_context_env(ctx); \
@ -88,7 +105,7 @@ static sexp check_exception (sexp ctx, sexp res) {
#define load_init() if (! init_loaded++) do { \
init_context(); \
check_exception(ctx, sexp_load_standard_env(ctx, env, SEXP_FIVE)); \
check_exception(ctx, sexp_load_standard_repl_env(ctx, env, SEXP_FIVE)); \
} while (0)
void run_main (int argc, char **argv) {