Adding non-blocking versions of send and receive.

This commit is contained in:
Alex Shinn 2014-09-27 20:53:33 +09:00
parent 7a2f0e553f
commit fbacd0ff6d
8 changed files with 62 additions and 25 deletions

View file

@ -13,8 +13,8 @@ sexp sexp_accept (sexp ctx, sexp self, int sock, struct sockaddr* addr, socklen_
#if SEXP_USE_GREEN_THREADS #if SEXP_USE_GREEN_THREADS
if (res < 0 && errno == EWOULDBLOCK) { if (res < 0 && errno == EWOULDBLOCK) {
f = sexp_global(ctx, SEXP_G_THREADS_BLOCKER); f = sexp_global(ctx, SEXP_G_THREADS_BLOCKER);
if (sexp_opcodep(f)) { if (sexp_applicablep(f)) {
((sexp_proc2)sexp_opcode_func(f))(ctx, f, 1, sexp_make_fixnum(sock)); sexp_apply2(ctx, f, sexp_make_fixnum(sock), SEXP_FALSE);
return sexp_global(ctx, SEXP_G_IO_BLOCK_ERROR); return sexp_global(ctx, SEXP_G_IO_BLOCK_ERROR);
} }
} }
@ -26,17 +26,19 @@ sexp sexp_accept (sexp ctx, sexp self, int sock, struct sockaddr* addr, socklen_
/* likewise sendto and recvfrom should suspend the thread gracefully */ /* likewise sendto and recvfrom should suspend the thread gracefully */
sexp sexp_sendto (sexp ctx, sexp self, int sock, const void* buffer, size_t len, int flags, struct sockaddr* addr, socklen_t addr_len) { #define sexp_zerop(x) ((x) == SEXP_ZERO || (sexp_flonump(x) && sexp_flonum_value(x) == 0.0))
sexp sexp_sendto (sexp ctx, sexp self, int sock, const void* buffer, size_t len, int flags, struct sockaddr* addr, socklen_t addr_len, sexp timeout) {
#if SEXP_USE_GREEN_THREADS #if SEXP_USE_GREEN_THREADS
sexp f; sexp f;
#endif #endif
ssize_t res; ssize_t res;
res = sendto(sock, buffer, len, flags, addr, addr_len); res = sendto(sock, buffer, len, flags, addr, addr_len);
#if SEXP_USE_GREEN_THREADS #if SEXP_USE_GREEN_THREADS
if (res < 0 && errno == EWOULDBLOCK) { if (res < 0 && errno == EWOULDBLOCK && !sexp_zerop(timeout)) {
f = sexp_global(ctx, SEXP_G_THREADS_BLOCKER); f = sexp_global(ctx, SEXP_G_THREADS_BLOCKER);
if (sexp_opcodep(f)) { if (sexp_applicablep(f)) {
((sexp_proc2)sexp_opcode_func(f))(ctx, f, 1, sexp_make_fixnum(sock)); sexp_apply2(ctx, f, sexp_make_fixnum(sock), timeout);
return sexp_global(ctx, SEXP_G_IO_BLOCK_ERROR); return sexp_global(ctx, SEXP_G_IO_BLOCK_ERROR);
} }
} }
@ -44,17 +46,17 @@ sexp sexp_sendto (sexp ctx, sexp self, int sock, const void* buffer, size_t len,
return sexp_make_fixnum(res); return sexp_make_fixnum(res);
} }
sexp sexp_recvfrom (sexp ctx, sexp self, int sock, void* buffer, size_t len, int flags, struct sockaddr* addr, socklen_t addr_len) { sexp sexp_recvfrom (sexp ctx, sexp self, int sock, void* buffer, size_t len, int flags, struct sockaddr* addr, socklen_t addr_len, sexp timeout) {
#if SEXP_USE_GREEN_THREADS #if SEXP_USE_GREEN_THREADS
sexp f; sexp f;
#endif #endif
ssize_t res; ssize_t res;
res = recvfrom(sock, buffer, len, flags, addr, &addr_len); res = recvfrom(sock, buffer, len, flags, addr, &addr_len);
#if SEXP_USE_GREEN_THREADS #if SEXP_USE_GREEN_THREADS
if (res < 0 && errno == EWOULDBLOCK) { if (res < 0 && errno == EWOULDBLOCK && !sexp_zerop(timeout)) {
f = sexp_global(ctx, SEXP_G_THREADS_BLOCKER); f = sexp_global(ctx, SEXP_G_THREADS_BLOCKER);
if (sexp_opcodep(f)) { if (sexp_applicablep(f)) {
((sexp_proc2)sexp_opcode_func(f))(ctx, f, 1, sexp_make_fixnum(sock)); sexp_apply2(ctx, f, sexp_make_fixnum(sock), timeout);
return sexp_global(ctx, SEXP_G_IO_BLOCK_ERROR); return sexp_global(ctx, SEXP_G_IO_BLOCK_ERROR);
} }
} }

View file

@ -358,7 +358,7 @@ sexp sexp_write_u8 (sexp ctx, sexp self, sexp u8, sexp out) {
#if SEXP_USE_GREEN_THREADS #if SEXP_USE_GREEN_THREADS
if (errno == EAGAIN) { if (errno == EAGAIN) {
if (sexp_applicablep(sexp_global(ctx, SEXP_G_THREADS_BLOCKER))) if (sexp_applicablep(sexp_global(ctx, SEXP_G_THREADS_BLOCKER)))
sexp_apply1(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), out); sexp_apply2(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), out, SEXP_FALSE);
return sexp_global(ctx, SEXP_G_IO_BLOCK_ERROR); return sexp_global(ctx, SEXP_G_IO_BLOCK_ERROR);
} }
#endif #endif
@ -381,7 +381,7 @@ sexp sexp_read_u8 (sexp ctx, sexp self, sexp in) {
if (sexp_port_stream(in)) if (sexp_port_stream(in))
clearerr(sexp_port_stream(in)); clearerr(sexp_port_stream(in));
if (sexp_applicablep(sexp_global(ctx, SEXP_G_THREADS_BLOCKER))) if (sexp_applicablep(sexp_global(ctx, SEXP_G_THREADS_BLOCKER)))
sexp_apply1(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), in); sexp_apply2(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), in, SEXP_FALSE);
return sexp_global(ctx, SEXP_G_IO_BLOCK_ERROR); return sexp_global(ctx, SEXP_G_IO_BLOCK_ERROR);
} }
#endif #endif

View file

@ -113,11 +113,17 @@
;;> have had a default address specified with \scheme{connect}. ;;> have had a default address specified with \scheme{connect}.
(define (send socket bv . o) (define (send socket bv . o)
(apply send/non-blocking socket bv #f o))
;;> Equivalent to \scheme{send} but gives up and returns false if the
;;> packet can't be sent within \var{timeout} seconds.
(define (send/non-blocking socket bv timeout . o)
(let* ((flags (if (pair? o) (car o) 0)) (let* ((flags (if (pair? o) (car o) 0))
(addrinfo (and (pair? o) (pair? (cdr o)) (cadr o))) (addrinfo (and (pair? o) (pair? (cdr o)) (cadr o)))
(sockaddr (and addrinfo (address-info-address addrinfo))) (sockaddr (and addrinfo (address-info-address addrinfo)))
(sockaddr-len (if addrinfo (address-info-address-length addrinfo) 0))) (sockaddr-len (if addrinfo (address-info-address-length addrinfo) 0)))
(%send socket bv flags sockaddr sockaddr-len))) (%send socket bv flags sockaddr sockaddr-len timeout)))
;;> Recieves data from \var{socket} to fill the bytevector \var{bv} by ;;> Recieves data from \var{socket} to fill the bytevector \var{bv} by
;;> calling recvfrom. Returns the number of bytes read, or a negative ;;> calling recvfrom. Returns the number of bytes read, or a negative
@ -126,11 +132,17 @@
;;> \scheme{connect}. ;;> \scheme{connect}.
(define (receive! socket bv . o) (define (receive! socket bv . o)
(apply receive!/non-blocking socket bv #f o))
;;> Equivalent to \scheme{receive!} but gives up and returns false if
;;> no packets are received within \var{timeout} seconds.
(define (receive!/non-blocking socket bv timeout . o)
(let* ((flags (if (pair? o) (car o) 0)) (let* ((flags (if (pair? o) (car o) 0))
(addrinfo (and (pair? o) (pair? (cdr o)) (cadr o))) (addrinfo (and (pair? o) (pair? (cdr o)) (cadr o)))
(sockaddr (and addrinfo (address-info-address addrinfo))) (sockaddr (and addrinfo (address-info-address addrinfo)))
(sockaddr-len (if addrinfo (address-info-address-length addrinfo) 0))) (sockaddr-len (if addrinfo (address-info-address-length addrinfo) 0)))
(%receive! socket bv flags sockaddr sockaddr-len))) (%receive! socket bv flags sockaddr sockaddr-len timeout)))
;;> Shortcut for \scheme{receive}, returning a newly created ;;> Shortcut for \scheme{receive}, returning a newly created
;;> bytevector of length \var{n} on success and \scheme{#f} on ;;> bytevector of length \var{n} on success and \scheme{#f} on
@ -141,3 +153,12 @@
(m (apply receive! socket bv o))) (m (apply receive! socket bv o)))
(and (>= m 0) (and (>= m 0)
(subbytes bv 0 m)))) (subbytes bv 0 m))))
;;> Equivalent to \scheme{receive} but gives up and returns false if
;;> no packets are received within \var{timeout} seconds.
(define (receive/non-blocking socket n timeout . o)
(let* ((bv (make-bytevector n))
(m (apply receive!/non-blocking socket bv timeout o)))
(and (>= m 0)
(subbytes bv 0 m))))

View file

@ -5,6 +5,7 @@
sockaddr-name sockaddr-port make-sockaddr sockaddr-name sockaddr-port make-sockaddr
with-net-io open-net-io make-listener-socket with-net-io open-net-io make-listener-socket
send receive! receive send receive! receive
send/non-blocking receive!/non-blocking receive/non-blocking
address-info-family address-info-socket-type address-info-protocol address-info-family address-info-socket-type address-info-protocol
address-info-address address-info-address-length address-info-next address-info-address address-info-address-length address-info-next
address-family/unix address-family/inet address-family/inet6 address-family/unix address-family/inet address-family/inet6

View file

@ -55,12 +55,12 @@
(define-c sexp (%send "sexp_sendto") (define-c sexp (%send "sexp_sendto")
((value ctx sexp) (value self sexp) ((value ctx sexp) (value self sexp)
fileno bytevector (value (bytevector-length arg3) size_t) int fileno bytevector (value (bytevector-length arg3) size_t) int
(maybe-null sockaddr) socklen_t)) (maybe-null sockaddr) socklen_t sexp))
(define-c sexp (%receive! "sexp_recvfrom") (define-c sexp (%receive! "sexp_recvfrom")
((value ctx sexp) (value self sexp) ((value ctx sexp) (value self sexp)
fileno bytevector (value (bytevector-length arg3) size_t) int fileno bytevector (value (bytevector-length arg3) size_t) int
(maybe-null sockaddr) socklen_t)) (maybe-null sockaddr) socklen_t sexp))
;;> Returns a list of 2 new sockets, the input and output end of a new ;;> Returns a list of 2 new sockets, the input and output end of a new
;;> pipe, respectively. ;;> pipe, respectively.

View file

@ -390,7 +390,7 @@ static sexp sexp_insert_pollfd (sexp ctx, int fd, int events) {
} }
/* block the current thread on the specified port */ /* block the current thread on the specified port */
static sexp sexp_blocker (sexp ctx, sexp self, sexp_sint_t n, sexp portorfd) { static sexp sexp_blocker (sexp ctx, sexp self, sexp_sint_t n, sexp portorfd, sexp timeout) {
int fd; int fd;
/* register the fd */ /* register the fd */
if (sexp_portp(portorfd)) if (sexp_portp(portorfd))
@ -406,7 +406,7 @@ static sexp sexp_blocker (sexp ctx, sexp self, sexp_sint_t n, sexp portorfd) {
/* pause the current thread */ /* pause the current thread */
sexp_context_waitp(ctx) = 1; sexp_context_waitp(ctx) = 1;
sexp_context_event(ctx) = portorfd; sexp_context_event(ctx) = portorfd;
sexp_insert_timed(ctx, ctx, SEXP_FALSE); sexp_insert_timed(ctx, ctx, timeout);
return SEXP_VOID; return SEXP_VOID;
} }
@ -676,7 +676,7 @@ sexp sexp_init_library (sexp ctx, sexp self, sexp_sint_t n, sexp env, const char
sexp_global(ctx, SEXP_G_THREADS_SCHEDULER) sexp_global(ctx, SEXP_G_THREADS_SCHEDULER)
= sexp_make_foreign(ctx, "scheduler", 1, 0, (sexp_proc1)sexp_scheduler, SEXP_FALSE); = sexp_make_foreign(ctx, "scheduler", 1, 0, (sexp_proc1)sexp_scheduler, SEXP_FALSE);
sexp_global(ctx, SEXP_G_THREADS_BLOCKER) sexp_global(ctx, SEXP_G_THREADS_BLOCKER)
= sexp_make_foreign(ctx, "blocker", 1, 0, (sexp_proc1)sexp_blocker, SEXP_FALSE); = sexp_make_foreign(ctx, "blocker", 2, 0, (sexp_proc1)sexp_blocker, SEXP_FALSE);
/* remember the env to lookup the runner later */ /* remember the env to lookup the runner later */
sexp_global(ctx, SEXP_G_THREADS_SIGNAL_RUNNER) = env; sexp_global(ctx, SEXP_G_THREADS_SIGNAL_RUNNER) = env;

4
sexp.c
View file

@ -1840,8 +1840,8 @@ int sexp_maybe_block_port (sexp ctx, sexp in, int forcep) {
if (sexp_port_stream(in)) if (sexp_port_stream(in))
clearerr(sexp_port_stream(in)); clearerr(sexp_port_stream(in));
f = sexp_global(ctx, SEXP_G_THREADS_BLOCKER); f = sexp_global(ctx, SEXP_G_THREADS_BLOCKER);
if (sexp_opcodep(f)) { if (sexp_applicablep(f)) {
((sexp_proc2)sexp_opcode_func(f))(ctx, f, 1, in); sexp_apply2(ctx, f, in, SEXP_FALSE);
return 1; return 1;
} }
} }

21
vm.c
View file

@ -1906,7 +1906,7 @@ sexp sexp_apply (sexp ctx, sexp proc, sexp args) {
&& (errno == EAGAIN)) { && (errno == EAGAIN)) {
if (sexp_port_stream(_ARG2)) clearerr(sexp_port_stream(_ARG2)); if (sexp_port_stream(_ARG2)) clearerr(sexp_port_stream(_ARG2));
if (sexp_applicablep(sexp_global(ctx, SEXP_G_THREADS_BLOCKER))) if (sexp_applicablep(sexp_global(ctx, SEXP_G_THREADS_BLOCKER)))
sexp_apply1(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), _ARG2); sexp_apply2(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), _ARG2, SEXP_FALSE);
else /* no scheduler but output full, wait 5ms */ else /* no scheduler but output full, wait 5ms */
usleep(5*1000); usleep(5*1000);
fuel = 0; fuel = 0;
@ -1959,7 +1959,7 @@ sexp sexp_apply (sexp ctx, sexp proc, sexp args) {
/* yield if threads are enabled (otherwise busy loop) */ /* yield if threads are enabled (otherwise busy loop) */
/* TODO: the wait seems necessary on OS X to stop a print loop to ptys */ /* TODO: the wait seems necessary on OS X to stop a print loop to ptys */
if (sexp_applicablep(sexp_global(ctx, SEXP_G_THREADS_BLOCKER))) if (sexp_applicablep(sexp_global(ctx, SEXP_G_THREADS_BLOCKER)))
sexp_apply1(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), _ARG3); sexp_apply2(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), _ARG3, SEXP_FALSE);
else /* no scheduler but output full, wait 5ms */ else /* no scheduler but output full, wait 5ms */
usleep(5*1000); usleep(5*1000);
fuel = 0; fuel = 0;
@ -1991,7 +1991,7 @@ sexp sexp_apply (sexp ctx, sexp proc, sexp args) {
if (sexp_port_stream(_ARG1)) clearerr(sexp_port_stream(_ARG1)); if (sexp_port_stream(_ARG1)) clearerr(sexp_port_stream(_ARG1));
/* TODO: block and unblock */ /* TODO: block and unblock */
if (sexp_applicablep(sexp_global(ctx, SEXP_G_THREADS_BLOCKER))) if (sexp_applicablep(sexp_global(ctx, SEXP_G_THREADS_BLOCKER)))
sexp_apply1(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), _ARG1); sexp_apply2(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), _ARG1, SEXP_FALSE);
fuel = 0; fuel = 0;
ip--; /* try again */ ip--; /* try again */
} else } else
@ -2017,7 +2017,7 @@ sexp sexp_apply (sexp ctx, sexp proc, sexp args) {
&& (errno == EAGAIN)) { && (errno == EAGAIN)) {
if (sexp_port_stream(_ARG1)) clearerr(sexp_port_stream(_ARG1)); if (sexp_port_stream(_ARG1)) clearerr(sexp_port_stream(_ARG1));
if (sexp_applicablep(sexp_global(ctx, SEXP_G_THREADS_BLOCKER))) if (sexp_applicablep(sexp_global(ctx, SEXP_G_THREADS_BLOCKER)))
sexp_apply1(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), _ARG1); sexp_apply2(ctx, sexp_global(ctx, SEXP_G_THREADS_BLOCKER), _ARG1, SEXP_FALSE);
fuel = 0; fuel = 0;
ip--; /* try again */ ip--; /* try again */
} else } else
@ -2120,4 +2120,17 @@ sexp sexp_apply1 (sexp ctx, sexp f, sexp x) {
return res; return res;
} }
sexp sexp_apply2 (sexp ctx, sexp f, sexp x, sexp y) {
sexp res;
sexp_gc_var1(args);
if (sexp_opcodep(f) && sexp_opcode_func(f)) {
res = ((sexp_proc3)sexp_opcode_func(f))(ctx, f, 2, x, y);
} else {
sexp_gc_preserve1(ctx, args);
res = sexp_apply(ctx, f, args=sexp_list2(ctx, x, y));
sexp_gc_release1(ctx);
}
return res;
}
#endif #endif