diff --git a/lib/chibi/accept.c b/lib/chibi/accept.c index e9642153..02ddd45a 100644 --- a/lib/chibi/accept.c +++ b/lib/chibi/accept.c @@ -13,8 +13,8 @@ sexp sexp_accept (sexp ctx, sexp self, int sock, struct sockaddr* addr, socklen_ #if SEXP_USE_GREEN_THREADS if (res < 0 && errno == EWOULDBLOCK) { f = sexp_global(ctx, SEXP_G_THREADS_BLOCKER); - if (sexp_opcodep(f)) { - ((sexp_proc2)sexp_opcode_func(f))(ctx, f, 1, sexp_make_fixnum(sock)); + if (sexp_applicablep(f)) { + sexp_apply2(ctx, f, sexp_make_fixnum(sock), SEXP_FALSE); return sexp_global(ctx, SEXP_G_IO_BLOCK_ERROR); } } @@ -26,17 +26,19 @@ sexp sexp_accept (sexp ctx, sexp self, int sock, struct sockaddr* addr, socklen_ /* likewise sendto and recvfrom should suspend the thread gracefully */ -sexp sexp_sendto (sexp ctx, sexp self, int sock, const void* buffer, size_t len, int flags, struct sockaddr* addr, socklen_t addr_len) { +#define sexp_zerop(x) ((x) == SEXP_ZERO || (sexp_flonump(x) && sexp_flonum_value(x) == 0.0)) + +sexp sexp_sendto (sexp ctx, sexp self, int sock, const void* buffer, size_t len, int flags, struct sockaddr* addr, socklen_t addr_len, sexp timeout) { #if SEXP_USE_GREEN_THREADS sexp f; #endif ssize_t res; res = sendto(sock, buffer, len, flags, addr, addr_len); #if SEXP_USE_GREEN_THREADS - if (res < 0 && errno == EWOULDBLOCK) { + if (res < 0 && errno == EWOULDBLOCK && !sexp_zerop(timeout)) { f = sexp_global(ctx, SEXP_G_THREADS_BLOCKER); - if (sexp_opcodep(f)) { - ((sexp_proc2)sexp_opcode_func(f))(ctx, f, 1, sexp_make_fixnum(sock)); + if (sexp_applicablep(f)) { + sexp_apply2(ctx, f, sexp_make_fixnum(sock), timeout); return sexp_global(ctx, SEXP_G_IO_BLOCK_ERROR); } } @@ -44,17 +46,17 @@ sexp sexp_sendto (sexp ctx, sexp self, int sock, const void* buffer, size_t len, return sexp_make_fixnum(res); } -sexp sexp_recvfrom (sexp ctx, sexp self, int sock, void* buffer, size_t len, int flags, struct sockaddr* addr, socklen_t addr_len) { +sexp sexp_recvfrom (sexp ctx, sexp self, int sock, void* buffer, size_t len, int flags, struct sockaddr* addr, socklen_t addr_len, sexp timeout) { #if SEXP_USE_GREEN_THREADS sexp f; #endif ssize_t res; res = recvfrom(sock, buffer, len, flags, addr, &addr_len); #if SEXP_USE_GREEN_THREADS - if (res < 0 && errno == EWOULDBLOCK) { + if (res < 0 && errno == EWOULDBLOCK && !sexp_zerop(timeout)) { f = sexp_global(ctx, SEXP_G_THREADS_BLOCKER); - if (sexp_opcodep(f)) { - ((sexp_proc2)sexp_opcode_func(f))(ctx, f, 1, sexp_make_fixnum(sock)); + if (sexp_applicablep(f)) { + sexp_apply2(ctx, f, sexp_make_fixnum(sock), timeout); return sexp_global(ctx, SEXP_G_IO_BLOCK_ERROR); } } diff --git a/lib/chibi/io/port.c b/lib/chibi/io/port.c index f6ab8640..5d53bab2 100644 --- a/lib/chibi/io/port.c +++ b/lib/chibi/io/port.c @@ -358,7 +358,7 @@ sexp sexp_write_u8 (sexp ctx, sexp self, sexp u8, sexp out) { #if SEXP_USE_GREEN_THREADS if (errno == EAGAIN) { if (sexp_applicablep(sexp_global(ctx, SEXP_G_THREADS_BLOCKER))) - sexp_apply1(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), out); + sexp_apply2(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), out, SEXP_FALSE); return sexp_global(ctx, SEXP_G_IO_BLOCK_ERROR); } #endif @@ -381,7 +381,7 @@ sexp sexp_read_u8 (sexp ctx, sexp self, sexp in) { if (sexp_port_stream(in)) clearerr(sexp_port_stream(in)); if (sexp_applicablep(sexp_global(ctx, SEXP_G_THREADS_BLOCKER))) - sexp_apply1(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), in); + sexp_apply2(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), in, SEXP_FALSE); return sexp_global(ctx, SEXP_G_IO_BLOCK_ERROR); } #endif diff --git a/lib/chibi/net.scm b/lib/chibi/net.scm index e3ef3436..7f1d3947 100644 --- a/lib/chibi/net.scm +++ b/lib/chibi/net.scm @@ -113,11 +113,17 @@ ;;> have had a default address specified with \scheme{connect}. (define (send socket bv . o) + (apply send/non-blocking socket bv #f o)) + +;;> Equivalent to \scheme{send} but gives up and returns false if the +;;> packet can't be sent within \var{timeout} seconds. + +(define (send/non-blocking socket bv timeout . o) (let* ((flags (if (pair? o) (car o) 0)) (addrinfo (and (pair? o) (pair? (cdr o)) (cadr o))) (sockaddr (and addrinfo (address-info-address addrinfo))) (sockaddr-len (if addrinfo (address-info-address-length addrinfo) 0))) - (%send socket bv flags sockaddr sockaddr-len))) + (%send socket bv flags sockaddr sockaddr-len timeout))) ;;> Recieves data from \var{socket} to fill the bytevector \var{bv} by ;;> calling recvfrom. Returns the number of bytes read, or a negative @@ -126,11 +132,17 @@ ;;> \scheme{connect}. (define (receive! socket bv . o) + (apply receive!/non-blocking socket bv #f o)) + +;;> Equivalent to \scheme{receive!} but gives up and returns false if +;;> no packets are received within \var{timeout} seconds. + +(define (receive!/non-blocking socket bv timeout . o) (let* ((flags (if (pair? o) (car o) 0)) (addrinfo (and (pair? o) (pair? (cdr o)) (cadr o))) (sockaddr (and addrinfo (address-info-address addrinfo))) (sockaddr-len (if addrinfo (address-info-address-length addrinfo) 0))) - (%receive! socket bv flags sockaddr sockaddr-len))) + (%receive! socket bv flags sockaddr sockaddr-len timeout))) ;;> Shortcut for \scheme{receive}, returning a newly created ;;> bytevector of length \var{n} on success and \scheme{#f} on @@ -141,3 +153,12 @@ (m (apply receive! socket bv o))) (and (>= m 0) (subbytes bv 0 m)))) + +;;> Equivalent to \scheme{receive} but gives up and returns false if +;;> no packets are received within \var{timeout} seconds. + +(define (receive/non-blocking socket n timeout . o) + (let* ((bv (make-bytevector n)) + (m (apply receive!/non-blocking socket bv timeout o))) + (and (>= m 0) + (subbytes bv 0 m)))) diff --git a/lib/chibi/net.sld b/lib/chibi/net.sld index 1522a5b2..09542fe7 100644 --- a/lib/chibi/net.sld +++ b/lib/chibi/net.sld @@ -5,6 +5,7 @@ sockaddr-name sockaddr-port make-sockaddr with-net-io open-net-io make-listener-socket send receive! receive + send/non-blocking receive!/non-blocking receive/non-blocking address-info-family address-info-socket-type address-info-protocol address-info-address address-info-address-length address-info-next address-family/unix address-family/inet address-family/inet6 diff --git a/lib/chibi/net.stub b/lib/chibi/net.stub index e319ba16..43578758 100644 --- a/lib/chibi/net.stub +++ b/lib/chibi/net.stub @@ -55,12 +55,12 @@ (define-c sexp (%send "sexp_sendto") ((value ctx sexp) (value self sexp) fileno bytevector (value (bytevector-length arg3) size_t) int - (maybe-null sockaddr) socklen_t)) + (maybe-null sockaddr) socklen_t sexp)) (define-c sexp (%receive! "sexp_recvfrom") ((value ctx sexp) (value self sexp) fileno bytevector (value (bytevector-length arg3) size_t) int - (maybe-null sockaddr) socklen_t)) + (maybe-null sockaddr) socklen_t sexp)) ;;> Returns a list of 2 new sockets, the input and output end of a new ;;> pipe, respectively. diff --git a/lib/srfi/18/threads.c b/lib/srfi/18/threads.c index e5e11503..b5f5beb8 100644 --- a/lib/srfi/18/threads.c +++ b/lib/srfi/18/threads.c @@ -390,7 +390,7 @@ static sexp sexp_insert_pollfd (sexp ctx, int fd, int events) { } /* block the current thread on the specified port */ -static sexp sexp_blocker (sexp ctx, sexp self, sexp_sint_t n, sexp portorfd) { +static sexp sexp_blocker (sexp ctx, sexp self, sexp_sint_t n, sexp portorfd, sexp timeout) { int fd; /* register the fd */ if (sexp_portp(portorfd)) @@ -406,7 +406,7 @@ static sexp sexp_blocker (sexp ctx, sexp self, sexp_sint_t n, sexp portorfd) { /* pause the current thread */ sexp_context_waitp(ctx) = 1; sexp_context_event(ctx) = portorfd; - sexp_insert_timed(ctx, ctx, SEXP_FALSE); + sexp_insert_timed(ctx, ctx, timeout); return SEXP_VOID; } @@ -676,7 +676,7 @@ sexp sexp_init_library (sexp ctx, sexp self, sexp_sint_t n, sexp env, const char sexp_global(ctx, SEXP_G_THREADS_SCHEDULER) = sexp_make_foreign(ctx, "scheduler", 1, 0, (sexp_proc1)sexp_scheduler, SEXP_FALSE); sexp_global(ctx, SEXP_G_THREADS_BLOCKER) - = sexp_make_foreign(ctx, "blocker", 1, 0, (sexp_proc1)sexp_blocker, SEXP_FALSE); + = sexp_make_foreign(ctx, "blocker", 2, 0, (sexp_proc1)sexp_blocker, SEXP_FALSE); /* remember the env to lookup the runner later */ sexp_global(ctx, SEXP_G_THREADS_SIGNAL_RUNNER) = env; diff --git a/sexp.c b/sexp.c index c850611d..6f2bb6b9 100644 --- a/sexp.c +++ b/sexp.c @@ -1840,8 +1840,8 @@ int sexp_maybe_block_port (sexp ctx, sexp in, int forcep) { if (sexp_port_stream(in)) clearerr(sexp_port_stream(in)); f = sexp_global(ctx, SEXP_G_THREADS_BLOCKER); - if (sexp_opcodep(f)) { - ((sexp_proc2)sexp_opcode_func(f))(ctx, f, 1, in); + if (sexp_applicablep(f)) { + sexp_apply2(ctx, f, in, SEXP_FALSE); return 1; } } diff --git a/vm.c b/vm.c index 963b73fc..48e2172a 100644 --- a/vm.c +++ b/vm.c @@ -1906,7 +1906,7 @@ sexp sexp_apply (sexp ctx, sexp proc, sexp args) { && (errno == EAGAIN)) { if (sexp_port_stream(_ARG2)) clearerr(sexp_port_stream(_ARG2)); if (sexp_applicablep(sexp_global(ctx, SEXP_G_THREADS_BLOCKER))) - sexp_apply1(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), _ARG2); + sexp_apply2(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), _ARG2, SEXP_FALSE); else /* no scheduler but output full, wait 5ms */ usleep(5*1000); fuel = 0; @@ -1959,7 +1959,7 @@ sexp sexp_apply (sexp ctx, sexp proc, sexp args) { /* yield if threads are enabled (otherwise busy loop) */ /* TODO: the wait seems necessary on OS X to stop a print loop to ptys */ if (sexp_applicablep(sexp_global(ctx, SEXP_G_THREADS_BLOCKER))) - sexp_apply1(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), _ARG3); + sexp_apply2(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), _ARG3, SEXP_FALSE); else /* no scheduler but output full, wait 5ms */ usleep(5*1000); fuel = 0; @@ -1991,7 +1991,7 @@ sexp sexp_apply (sexp ctx, sexp proc, sexp args) { if (sexp_port_stream(_ARG1)) clearerr(sexp_port_stream(_ARG1)); /* TODO: block and unblock */ if (sexp_applicablep(sexp_global(ctx, SEXP_G_THREADS_BLOCKER))) - sexp_apply1(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), _ARG1); + sexp_apply2(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), _ARG1, SEXP_FALSE); fuel = 0; ip--; /* try again */ } else @@ -2017,7 +2017,7 @@ sexp sexp_apply (sexp ctx, sexp proc, sexp args) { && (errno == EAGAIN)) { if (sexp_port_stream(_ARG1)) clearerr(sexp_port_stream(_ARG1)); if (sexp_applicablep(sexp_global(ctx, SEXP_G_THREADS_BLOCKER))) - sexp_apply1(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), _ARG1); + sexp_apply2(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), _ARG1, SEXP_FALSE); fuel = 0; ip--; /* try again */ } else @@ -2120,4 +2120,17 @@ sexp sexp_apply1 (sexp ctx, sexp f, sexp x) { return res; } +sexp sexp_apply2 (sexp ctx, sexp f, sexp x, sexp y) { + sexp res; + sexp_gc_var1(args); + if (sexp_opcodep(f) && sexp_opcode_func(f)) { + res = ((sexp_proc3)sexp_opcode_func(f))(ctx, f, 2, x, y); + } else { + sexp_gc_preserve1(ctx, args); + res = sexp_apply(ctx, f, args=sexp_list2(ctx, x, y)); + sexp_gc_release1(ctx); + } + return res; +} + #endif