From 2affd2c6779021faef1597568d5dee79b33136b2 Mon Sep 17 00:00:00 2001 From: Alex Shinn Date: Sat, 11 Dec 2010 22:15:17 -0800 Subject: [PATCH] creating initial API for C functions to block on input ports --- eval.c | 2 ++ include/chibi/sexp.h | 22 ++++++++++++++++-- main.c | 6 ++--- sexp.c | 40 ++++++++++++++++++++++++++++---- vm.c | 55 ++++++++++++++++++++++++++------------------ 5 files changed, 92 insertions(+), 33 deletions(-) diff --git a/eval.c b/eval.c index 4683edea..d4e06c88 100644 --- a/eval.c +++ b/eval.c @@ -346,6 +346,8 @@ void sexp_init_eval_context_globals (sexp ctx) { tmp = sexp_c_string(ctx, ".", 1); sexp_push(ctx, sexp_global(ctx, SEXP_G_MODULE_PATH), tmp); #if SEXP_USE_GREEN_THREADS + sexp_global(ctx, SEXP_G_IO_BLOCK_ERROR) + = sexp_user_exception(ctx, SEXP_FALSE, "I/O would block", SEXP_NULL); sexp_global(ctx, SEXP_G_THREADS_FRONT) = SEXP_NULL; sexp_global(ctx, SEXP_G_THREADS_BACK) = SEXP_NULL; sexp_global(ctx, SEXP_G_THREADS_SIGNALS) = SEXP_ZERO; diff --git a/include/chibi/sexp.h b/include/chibi/sexp.h index cd2d4727..57722e57 100644 --- a/include/chibi/sexp.h +++ b/include/chibi/sexp.h @@ -23,6 +23,7 @@ extern "C" { #if SEXP_USE_GREEN_THREADS #include #include +#include #endif #endif @@ -266,8 +267,8 @@ struct sexp_struct { struct { FILE *stream; char *buf; - char openp, no_closep, sourcep; - sexp_uint_t offset, line; + char openp, no_closep, sourcep, blockedp; + sexp_uint_t offset, line, flags; size_t size; sexp name; sexp cookie; @@ -662,10 +663,12 @@ SEXP_API sexp sexp_make_unsigned_integer(sexp ctx, sexp_luint_t x); #define sexp_port_openp(p) (sexp_pred_field(p, port, sexp_portp, openp)) #define sexp_port_no_closep(p) (sexp_pred_field(p, port, sexp_portp, no_closep)) #define sexp_port_sourcep(p) (sexp_pred_field(p, port, sexp_portp, sourcep)) +#define sexp_port_blockedp(p) (sexp_pred_field(p, port, sexp_portp, blockedp)) #define sexp_port_cookie(p) (sexp_pred_field(p, port, sexp_portp, cookie)) #define sexp_port_buf(p) (sexp_pred_field(p, port, sexp_portp, buf)) #define sexp_port_size(p) (sexp_pred_field(p, port, sexp_portp, size)) #define sexp_port_offset(p) (sexp_pred_field(p, port, sexp_portp, offset)) +#define sexp_port_flags(p) (sexp_pred_field(p, port, sexp_portp, flags)) #define sexp_exception_kind(x) (sexp_field(x, exception, SEXP_EXCEPTION, kind)) #define sexp_exception_message(x) (sexp_field(x, exception, SEXP_EXCEPTION, message)) @@ -916,6 +919,7 @@ enum sexp_context_globals { SEXP_G_WEAK_REFERENCE_CACHE, #endif #if SEXP_USE_GREEN_THREADS + SEXP_G_IO_BLOCK_ERROR, SEXP_G_THREADS_SCHEDULER, SEXP_G_THREADS_FRONT, SEXP_G_THREADS_BACK, @@ -1040,6 +1044,20 @@ SEXP_API sexp sexp_utf8_substring_op (sexp ctx sexp_api_params(self, n), sexp st SEXP_API void sexp_utf8_encode_char (unsigned char* p, int len, int c); #endif +#if SEXP_USE_GREEN_THREADS +SEXP_API int sexp_maybe_block_port (sexp ctx, sexp in, int forcep); +SEXP_API void sexp_maybe_unblock_port (sexp ctx, sexp in); +#define sexp_check_block_port(ctx, in, forcep) \ + if (sexp_maybe_block_port(ctx, in, forcep)) \ + return sexp_global(ctx, SEXP_G_IO_BLOCK_ERROR) +#else +#define sexp_maybe_block_port(ctx, in, forcep) 0 +#define sexp_maybe_unblock_port(ctx, in) 0 +#define sexp_check_block_port(ctx, in, forcep) 0 +#endif + +#define SEXP_PORT_UNKNOWN_FLAGS -1uL + #define sexp_assert_type(ctx, pred, type_id, obj) if (! pred(obj)) return sexp_type_exception(ctx, self, type_id, obj) #define SEXP_COPY_DEFAULT SEXP_ZERO diff --git a/main.c b/main.c index f20eda87..ee6d044c 100644 --- a/main.c +++ b/main.c @@ -12,10 +12,6 @@ #define sexp_version_string "chibi-scheme "sexp_version" \""sexp_release_name"\" " -#if SEXP_USE_GREEN_THREADS -#include -#endif - #ifdef PLAN9 #define exit_failure() exits("ERROR") #else @@ -41,7 +37,9 @@ static void repl (sexp ctx) { while (1) { sexp_write_string(ctx, "> ", out); sexp_flush(ctx, out); + sexp_maybe_block_port(ctx, in, 1); obj = sexp_read(ctx, in); + sexp_maybe_unblock_port(ctx, in); if (obj == SEXP_EOF) break; if (sexp_exceptionp(obj)) { diff --git a/sexp.c b/sexp.c index 68c35bf9..6a753fc5 100644 --- a/sexp.c +++ b/sexp.c @@ -1219,10 +1219,12 @@ sexp sexp_make_input_port (sexp ctx, FILE* in, sexp name) { sexp_port_stream(p) = in; sexp_port_name(p) = name; sexp_port_line(p) = 1; + sexp_port_flags(p) = SEXP_PORT_UNKNOWN_FLAGS; sexp_port_buf(p) = NULL; sexp_port_openp(p) = 1; sexp_port_no_closep(p) = 0; sexp_port_sourcep(p) = 0; + sexp_port_blockedp(p) = 0; sexp_port_cookie(p) = SEXP_VOID; return p; } @@ -1620,20 +1622,49 @@ static int sexp_decode_utf8_char(const unsigned char* s) { } #endif +#if SEXP_USE_GREEN_THREADS +int sexp_maybe_block_port (sexp ctx, sexp in, int forcep) { + sexp f; + int c; + if (sexp_port_fileno(in) >= 0) { + if (sexp_port_flags(in) == SEXP_PORT_UNKNOWN_FLAGS) + sexp_port_flags(in) = fcntl(sexp_port_fileno(in), F_GETFL); + if (sexp_port_flags(in) & O_NONBLOCK) { + if (!forcep + && (((c = sexp_read_char(ctx, in)) == EOF) + && sexp_opcodep((f=sexp_global(ctx, SEXP_G_THREADS_BLOCKER))))) { + ((sexp_proc2)sexp_opcode_func(f))(ctx sexp_api_pass(f, 1), in); + return 1; + } else { + if (!forcep) sexp_push_char(ctx, c, in); + sexp_port_blockedp(in) = 1; + fcntl(sexp_port_fileno(in), F_SETFL, sexp_port_flags(in) & ~O_NONBLOCK); + } + } + } + return 0; +} + +void sexp_maybe_unblock_port (sexp ctx, sexp in) { + if (sexp_port_blockedp(in)) { + sexp_port_blockedp(in) = 0; + fcntl(sexp_port_fileno(in), F_SETFL, sexp_port_flags(in)); + } +} +#endif + sexp sexp_read_raw (sexp ctx, sexp in) { char *str; int c1, c2, line; sexp tmp2; sexp_gc_var2(res, tmp); + sexp_check_block_port(ctx, in, 0); sexp_gc_preserve2(ctx, res, tmp); scan_loop: switch (c1 = sexp_read_char(ctx, in)) { case EOF: - if (sexp_at_eofp(in)) - res = SEXP_EOF; - else - goto scan_loop; + res = SEXP_EOF; break; case ';': while ((c1 = sexp_read_char(ctx, in)) != EOF) @@ -1887,6 +1918,7 @@ sexp sexp_read_raw (sexp ctx, sexp in) { if (sexp_port_sourcep(in) && sexp_pointerp(res)) sexp_immutablep(res) = 1; + sexp_maybe_unblock_port(ctx, in); sexp_gc_release2(ctx); return res; } diff --git a/vm.c b/vm.c index 13b7cbcf..b06ff59d 100644 --- a/vm.c +++ b/vm.c @@ -543,6 +543,27 @@ static int sexp_check_type(sexp ctx, sexp a, sexp b) { && sexp_vector_ref(v, sexp_make_fixnum(d)) == b; } +#if SEXP_USE_GREEN_THREADS +#define sexp_fcall_return(x, i) \ + if (sexp_exceptionp(x)) { \ + if (x == sexp_global(ctx, SEXP_G_IO_BLOCK_ERROR)) { \ + fuel = 0; ip--; goto loop; \ + } else { \ + top -= i; \ + _ARG1 = x; \ + ip += sizeof(sexp); \ + goto call_error_handler; \ + } \ + } else { \ + top -= i; \ + _ARG1 = x; \ + ip += sizeof(sexp); \ + } +#else +#define sexp_fcall_return(x, i) \ + top -= i; _ARG1 = x; ip += s; sexp_check_exception(x); +#endif + #if SEXP_USE_DEBUG_VM #include "opt/opcode_names.h" #endif @@ -743,43 +764,34 @@ sexp sexp_vm (sexp ctx, sexp proc) { break; case SEXP_OP_FCALL0: _ALIGN_IP(); - tmp1 = _WORD0; sexp_context_top(ctx) = top; sexp_context_last_fp(ctx) = fp; - _PUSH(((sexp_proc1)sexp_opcode_func(_WORD0))(ctx sexp_api_pass(_WORD0, 0))); - ip += sizeof(sexp); - sexp_check_exception(); + tmp1 = ((sexp_proc1)sexp_opcode_func(_WORD0))(ctx sexp_api_pass(_WORD0, 0)); + sexp_fcall_return(tmp1, -1) break; case SEXP_OP_FCALL1: _ALIGN_IP(); sexp_context_top(ctx) = top; - _ARG1 = ((sexp_proc2)sexp_opcode_func(_WORD0))(ctx sexp_api_pass(_WORD0, 1), _ARG1); - ip += sizeof(sexp); - sexp_check_exception(); + tmp1 = ((sexp_proc2)sexp_opcode_func(_WORD0))(ctx sexp_api_pass(_WORD0, 1), _ARG1); + sexp_fcall_return(tmp1, 0) break; case SEXP_OP_FCALL2: _ALIGN_IP(); sexp_context_top(ctx) = top; - _ARG2 = ((sexp_proc3)sexp_opcode_func(_WORD0))(ctx sexp_api_pass(_WORD0, 2), _ARG1, _ARG2); - top--; - ip += sizeof(sexp); - sexp_check_exception(); + tmp1 = ((sexp_proc3)sexp_opcode_func(_WORD0))(ctx sexp_api_pass(_WORD0, 2), _ARG1, _ARG2); + sexp_fcall_return(tmp1, 1) break; case SEXP_OP_FCALL3: _ALIGN_IP(); sexp_context_top(ctx) = top; - _ARG3 = ((sexp_proc4)sexp_opcode_func(_WORD0))(ctx sexp_api_pass(_WORD0, 3), _ARG1, _ARG2, _ARG3); - top -= 2; - ip += sizeof(sexp); - sexp_check_exception(); + tmp1 = ((sexp_proc4)sexp_opcode_func(_WORD0))(ctx sexp_api_pass(_WORD0, 3), _ARG1, _ARG2, _ARG3); + sexp_fcall_return(tmp1, 2) break; case SEXP_OP_FCALL4: _ALIGN_IP(); sexp_context_top(ctx) = top; - _ARG4 = ((sexp_proc5)sexp_opcode_func(_WORD0))(ctx sexp_api_pass(_WORD0, 4), _ARG1, _ARG2, _ARG3, _ARG4); - top -= 3; - ip += sizeof(sexp); - sexp_check_exception(); + tmp1 = ((sexp_proc5)sexp_opcode_func(_WORD0))(ctx sexp_api_pass(_WORD0, 4), _ARG1, _ARG2, _ARG3, _ARG4); + sexp_fcall_return(tmp1, 3) break; #if SEXP_USE_EXTENDED_FCALL case SEXP_OP_FCALLN: @@ -787,10 +799,7 @@ sexp sexp_vm (sexp ctx, sexp proc) { sexp_context_top(ctx) = top; i = sexp_opcode_num_args(_WORD0); tmp1 = sexp_fcall(ctx, self, i, _WORD0); - top -= (i-1); - _ARG1 = tmp1; - ip += sizeof(sexp); - sexp_check_exception(); + sexp_fcall_return(tmp1, i-1) break; #endif case SEXP_OP_JUMP_UNLESS: