/*  threads.c -- SRFI-18 thread primitives                    */
/*  Copyright (c) 2010 Alex Shinn.  All rights reserved.      */
/*  BSD-style license: http://synthcode.com/license.txt       */

#include <chibi/eval.h>
#include <time.h>
#include <sys/time.h>
#include <unistd.h>

#define sexp_mutex_name(x)       sexp_slot_ref(x, 0)
#define sexp_mutex_specific(x)   sexp_slot_ref(x, 1)
#define sexp_mutex_thread(x)     sexp_slot_ref(x, 2)
#define sexp_mutex_lockp(x)       sexp_slot_ref(x, 3)

#define sexp_condvar_name(x)     sexp_slot_ref(x, 0)
#define sexp_condvar_specific(x) sexp_slot_ref(x, 1)
#define sexp_condvar_threads(x)  sexp_slot_ref(x, 2)

#define timeval_le(a, b) (((a).tv_sec < (b).tv_sec) || (((a).tv_sec == (b).tv_sec) && ((a).tv_usec < (b).tv_usec)))
#define sexp_context_before(c, t) (((sexp_context_timeval(c).tv_sec != 0) || (sexp_context_timeval(c).tv_usec != 0)) && timeval_le(sexp_context_timeval(c), t))

/* static int mutex_id, condvar_id; */

/**************************** threads *************************************/

static void sexp_define_type_predicate (sexp ctx, sexp env, char *cname, sexp_uint_t type) {
  sexp_gc_var2(name, op);
  sexp_gc_preserve2(ctx, name, op);
  name = sexp_c_string(ctx, cname, -1);
  op = sexp_make_type_predicate(ctx, name, sexp_make_fixnum(type));
  sexp_env_define(ctx, env, name=sexp_intern(ctx, cname, -1), op);
  sexp_gc_release2(ctx);
}

sexp sexp_thread_timeoutp (sexp ctx sexp_api_params(self, n)) {
  return sexp_make_boolean(sexp_context_timeoutp(ctx));
}

sexp sexp_thread_name (sexp ctx sexp_api_params(self, n), sexp thread) {
  sexp_assert_type(ctx, sexp_contextp, SEXP_CONTEXT, thread);
  return sexp_context_name(thread);
}

sexp sexp_thread_specific (sexp ctx sexp_api_params(self, n), sexp thread) {
  sexp_assert_type(ctx, sexp_contextp, SEXP_CONTEXT, thread);
  return sexp_context_specific(thread);
}

sexp sexp_thread_specific_set (sexp ctx sexp_api_params(self, n), sexp thread, sexp val) {
  sexp_assert_type(ctx, sexp_contextp, SEXP_CONTEXT, thread);
  sexp_context_specific(thread) = val;
  return SEXP_VOID;
}

sexp sexp_current_thread (sexp ctx sexp_api_params(self, n)) {
  return ctx;
}

sexp sexp_make_thread (sexp ctx sexp_api_params(self, n), sexp thunk, sexp name) {
  sexp res, *stack;
  sexp_assert_type(ctx, sexp_procedurep, SEXP_PROCEDURE, thunk);
  res = sexp_make_eval_context(ctx, SEXP_FALSE, sexp_context_env(ctx), 0);
  sexp_context_proc(res) = thunk;
  sexp_context_ip(res) = sexp_bytecode_data(sexp_procedure_code(thunk));
  stack = sexp_stack_data(sexp_context_stack(res));
  stack[0] = stack[1] = stack[3] = SEXP_ZERO;
  stack[2] = sexp_global(ctx, SEXP_G_FINAL_RESUMER);
  sexp_context_top(res) = 4;
  sexp_context_last_fp(res) = 0;
  return res;
}

sexp sexp_thread_start (sexp ctx sexp_api_params(self, n), sexp thread) {
  sexp cell;
  sexp_assert_type(ctx, sexp_contextp, SEXP_CONTEXT, thread);
  cell = sexp_cons(ctx, thread, SEXP_NULL);
  if (sexp_pairp(sexp_global(ctx, SEXP_G_THREADS_BACK))) {
    sexp_cdr(sexp_global(ctx, SEXP_G_THREADS_BACK)) = cell;
    sexp_global(ctx, SEXP_G_THREADS_BACK) = cell;
  } else {			/* init queue */
    sexp_global(ctx, SEXP_G_THREADS_BACK) = sexp_global(ctx, SEXP_G_THREADS_FRONT) = cell;
  }
  return SEXP_VOID;
}

sexp sexp_thread_terminate (sexp ctx sexp_api_params(self, n), sexp thread) {
  sexp ls1=SEXP_NULL, ls2=sexp_global(ctx, SEXP_G_THREADS_FRONT);
  sexp_context_refuel(thread) = 0;
  for ( ; sexp_pairp(ls2) && (sexp_car(ls2) != thread); ls2=sexp_cdr(ls2))
    ls1 = ls2;
  if (sexp_pairp(ls2)) {
    if (ls1 == SEXP_NULL)
      sexp_global(ctx, SEXP_G_THREADS_FRONT) = sexp_cdr(ls2);
    else			/* splice */
      sexp_cdr(ls1) = sexp_cdr(ls2);
    if (ls2 == sexp_global(ctx, SEXP_G_THREADS_BACK))
      sexp_global(ctx, SEXP_G_THREADS_BACK) = ls1;
  } else {                      /* check for paused threads */
    ls1=SEXP_NULL, ls2=sexp_global(ctx, SEXP_G_THREADS_PAUSED);
    for ( ; sexp_pairp(ls2) && (sexp_car(ls2) != thread); ls2=sexp_cdr(ls2))
      ls1 = ls2;
    if (sexp_pairp(ls2)) {
      if (ls1 == SEXP_NULL)
        sexp_global(ctx, SEXP_G_THREADS_PAUSED) = sexp_cdr(ls2);
      else			/* splice */
        sexp_cdr(ls1) = sexp_cdr(ls2);
    }
  }
  /* return true if terminating self */
  return sexp_make_boolean(ctx == thread);
}

static void sexp_insert_timed (sexp ctx, sexp thread, sexp timeout) {
#if SEXP_USE_FLONUMS
  double d;
#endif
  sexp ls1=SEXP_NULL, ls2=sexp_global(ctx, SEXP_G_THREADS_PAUSED);
  if (sexp_integerp(timeout) || sexp_flonump(timeout))
    gettimeofday(&sexp_context_timeval(ctx), NULL);
  if (sexp_integerp(timeout)) {
    sexp_context_timeval(ctx).tv_sec += sexp_unbox_fixnum(timeout);
#if SEXP_USE_FLONUMS
  } else if (sexp_flonump(timeout)) {
    d = sexp_flonum_value(timeout);
    sexp_context_timeval(ctx).tv_sec += trunc(d);
    sexp_context_timeval(ctx).tv_usec += (d-trunc(d))*1000000;
#endif
  } else {
    sexp_context_timeval(ctx).tv_sec = 0;
    sexp_context_timeval(ctx).tv_usec = 0;
  }
  if (sexp_numberp(timeout))
    while (sexp_pairp(ls2)
           && sexp_context_before(sexp_car(ls2), sexp_context_timeval(ctx)))
      ls1=ls2, ls2=sexp_cdr(ls2);
  else
    while (sexp_pairp(ls2) && sexp_context_timeval(sexp_car(ls2)).tv_sec)
      ls1=ls2, ls2=sexp_cdr(ls2);
  if (ls1 == SEXP_NULL)
    sexp_global(ctx, SEXP_G_THREADS_PAUSED) = sexp_cons(ctx, thread, ls2);
  else
    sexp_cdr(ls1) = sexp_cons(ctx, thread, ls2);
}

sexp sexp_thread_join (sexp ctx sexp_api_params(self, n), sexp thread, sexp timeout) {
  sexp_assert_type(ctx, sexp_contextp, SEXP_CONTEXT, thread);
  if (sexp_context_refuel(thread) <= 0) /* return true if already terminated */ {
    return SEXP_TRUE;
  }
  sexp_context_timeoutp(ctx) = 0;
  sexp_context_waitp(ctx) = 1;
  sexp_context_event(ctx) = thread;
  sexp_insert_timed(ctx, ctx, timeout);
  return SEXP_FALSE;
}

sexp sexp_thread_sleep (sexp ctx sexp_api_params(self, n), sexp timeout) {
  sexp_assert_type(ctx, sexp_numberp, SEXP_NUMBER, timeout);
  sexp_context_waitp(ctx) = 1;
  sexp_insert_timed(ctx, ctx, timeout);
  return SEXP_FALSE;
}

/**************************** mutexes *************************************/

sexp sexp_mutex_state (sexp ctx sexp_api_params(self, n), sexp mutex) {
  /* sexp_assert_type(ctx, sexp_mutexp, mutex_id, timeout); */
  if (sexp_truep(sexp_mutex_lockp(mutex))) {
    if (sexp_contextp(sexp_mutex_thread(mutex)))
      return sexp_mutex_thread(mutex);
    else
      return sexp_intern(ctx, "not-owned", -1);
  } else {
    return sexp_intern(ctx, (sexp_mutex_thread(mutex) ? "not-abandoned" : "abandoned"), -1);
  }
}

sexp sexp_mutex_lock (sexp ctx sexp_api_params(self, n), sexp mutex, sexp timeout, sexp thread) {
  if (thread == SEXP_TRUE)
    thread = ctx;
  if (sexp_not(sexp_mutex_lockp(mutex))) {
    sexp_mutex_lockp(mutex) = SEXP_TRUE;
    sexp_mutex_thread(mutex) = thread;
    return SEXP_TRUE;
  } else {
    sexp_context_waitp(ctx) = 1;
    sexp_context_event(ctx) = mutex;
    sexp_insert_timed(ctx, ctx, timeout);
    return SEXP_FALSE;
  }
}

sexp sexp_mutex_unlock (sexp ctx sexp_api_params(self, n), sexp mutex, sexp condvar, sexp timeout) {
  sexp ls1, ls2;
  if (sexp_not(condvar)) {
    /* normal unlock - always succeeds, just need to unblock threads */
    if (sexp_truep(sexp_mutex_lockp(mutex))) {
      sexp_mutex_lockp(mutex) = SEXP_FALSE;
      sexp_mutex_thread(mutex) = ctx;
      /* search for threads blocked on this mutex */
      for (ls1=SEXP_NULL, ls2=sexp_global(ctx, SEXP_G_THREADS_PAUSED);
           sexp_pairp(ls2); ls1=ls2, ls2=sexp_cdr(ls2))
        if (sexp_context_event(sexp_car(ls2)) == mutex) {
          if (ls1==SEXP_NULL)
            sexp_global(ctx, SEXP_G_THREADS_PAUSED) = sexp_cdr(ls2);
          else
            sexp_cdr(ls1) = sexp_cdr(ls2);
          sexp_cdr(ls2) = sexp_global(ctx, SEXP_G_THREADS_FRONT);
          sexp_global(ctx, SEXP_G_THREADS_FRONT) = ls2;
          if (! sexp_pairp(sexp_cdr(ls2)))
            sexp_global(ctx, SEXP_G_THREADS_BACK) = ls2;
          sexp_context_waitp(sexp_car(ls2))
            = sexp_context_timeoutp(sexp_car(ls2)) = 0;
          break;
        }
    }
    return SEXP_TRUE;
  } else {
    /* wait on condition var */
    sexp_context_waitp(ctx) = 1;
    sexp_context_event(ctx) = condvar;
    sexp_insert_timed(ctx, ctx, timeout);
    return SEXP_FALSE;
  }
}

/**************************** condition variables *************************/

sexp sexp_condition_variable_signal (sexp ctx sexp_api_params(self, n), sexp condvar) {
  sexp ls1=SEXP_NULL, ls2=sexp_global(ctx, SEXP_G_THREADS_PAUSED);
  for ( ; sexp_pairp(ls2); ls1=ls2, ls2=sexp_cdr(ls2))
    if (sexp_context_event(sexp_car(ls2)) == condvar) {
      if (ls1==SEXP_NULL)
	sexp_global(ctx, SEXP_G_THREADS_PAUSED) = sexp_cdr(ls2);
      else
	sexp_cdr(ls1) = sexp_cdr(ls2);
      sexp_cdr(ls2) = sexp_global(ctx, SEXP_G_THREADS_FRONT);
      sexp_global(ctx, SEXP_G_THREADS_FRONT) = ls2;
      if (! sexp_pairp(sexp_cdr(ls2)))
	sexp_global(ctx, SEXP_G_THREADS_BACK) = ls2;
      sexp_context_waitp(sexp_car(ls2)) = sexp_context_timeoutp(sexp_car(ls2)) = 0;
      return SEXP_TRUE;
    }
  return SEXP_FALSE;
}

sexp sexp_condition_variable_broadcast (sexp ctx sexp_api_params(self, n), sexp condvar) {
  sexp res = SEXP_FALSE;
  while (sexp_truep(sexp_condition_variable_signal(ctx, self, n, condvar)))
    res = SEXP_TRUE;
  return res;
}

/**************************** the scheduler *******************************/

void sexp_wait_on_single_thread (sexp ctx) {
  struct timeval tval;
  useconds_t usecs = 0;
  gettimeofday(&tval, NULL);
  if (tval.tv_sec < sexp_context_timeval(ctx).tv_sec)
    usecs = (sexp_context_timeval(ctx).tv_sec - tval.tv_sec) * 1000000;
  if (tval.tv_usec < sexp_context_timeval(ctx).tv_usec)
    usecs += sexp_context_timeval(ctx).tv_usec - tval.tv_usec;
  usleep(usecs);
}

sexp sexp_scheduler (sexp ctx sexp_api_params(self, n), sexp root_thread) {
  struct timeval tval;
  sexp res, ls1, ls2, tmp, paused, front=sexp_global(ctx, SEXP_G_THREADS_FRONT);

  paused = sexp_global(ctx, SEXP_G_THREADS_PAUSED);

  /* if we've terminated, check threads joining us */
  if (sexp_context_refuel(ctx) <= 0) {
    for (ls1=SEXP_NULL, ls2=paused; sexp_pairp(ls2); ) {
      if (sexp_context_event(sexp_car(ls2)) == ctx) {
        sexp_context_waitp(sexp_car(ls2)) = 0;
        sexp_context_timeoutp(sexp_car(ls2)) = 0;
        if (ls1==SEXP_NULL)
          sexp_global(ctx, SEXP_G_THREADS_PAUSED) = paused = sexp_cdr(ls2);
        else
          sexp_cdr(ls1) = sexp_cdr(ls2);
        tmp = sexp_cdr(ls2);
        sexp_cdr(ls2) = SEXP_NULL;
	if (! sexp_pairp(sexp_global(ctx, SEXP_G_THREADS_BACK))) {
	  sexp_global(ctx, SEXP_G_THREADS_FRONT) = front = ls2;
	} else {
	  sexp_cdr(sexp_global(ctx, SEXP_G_THREADS_BACK)) = ls2;
	}
	sexp_global(ctx, SEXP_G_THREADS_BACK) = ls2;
        ls2 = tmp;
      } else {
        ls1 = ls2;
        ls2 = sexp_cdr(ls2);
      }
    }
  }

  /* check timeouts */
  if (sexp_pairp(paused)) {
    if (gettimeofday(&tval, NULL) == 0) {
      ls1 = SEXP_NULL;
      ls2 = paused;
      while (sexp_pairp(ls2) && sexp_context_before(sexp_car(ls2), tval)) {
        sexp_context_timeoutp(sexp_car(ls2)) = 1;
        sexp_context_waitp(ctx) = 0;
        ls1 = ls2;
        ls2 = sexp_cdr(ls2);
      }
      if (sexp_pairp(ls1)) {
        sexp_cdr(ls1) = SEXP_NULL;
	if (! sexp_pairp(sexp_global(ctx, SEXP_G_THREADS_BACK))) {
	  sexp_global(ctx, SEXP_G_THREADS_FRONT) = front = paused;
	} else {
	  sexp_cdr(sexp_global(ctx, SEXP_G_THREADS_BACK)) = paused;
	}
	sexp_global(ctx, SEXP_G_THREADS_BACK) = ls1;
        sexp_global(ctx, SEXP_G_THREADS_PAUSED) = paused = ls2;
      }
    }
  }

  /* dequeue next thread */
  if (sexp_pairp(front)) {
    res = sexp_car(front);
    if ((sexp_context_refuel(ctx) <= 0) || sexp_context_waitp(ctx)) {
      /* either terminated or paused */
      sexp_global(ctx, SEXP_G_THREADS_FRONT) = sexp_cdr(front);
      if (! sexp_pairp(sexp_cdr(front)))
        sexp_global(ctx, SEXP_G_THREADS_BACK) = SEXP_NULL;
    } else {
      /* swap with front of queue */
      sexp_car(sexp_global(ctx, SEXP_G_THREADS_FRONT)) = ctx;
      /* rotate front of queue to back */
      sexp_cdr(sexp_global(ctx, SEXP_G_THREADS_BACK))
        = sexp_global(ctx, SEXP_G_THREADS_FRONT);
      sexp_global(ctx, SEXP_G_THREADS_FRONT)
        = sexp_cdr(sexp_global(ctx, SEXP_G_THREADS_FRONT));
      sexp_global(ctx, SEXP_G_THREADS_BACK)
        = sexp_cdr(sexp_global(ctx, SEXP_G_THREADS_BACK));
      sexp_cdr(sexp_global(ctx, SEXP_G_THREADS_BACK)) = SEXP_NULL;
    }
  } else {
    res = ctx;
  }

  if (sexp_context_waitp(res)) {
    /* the only thread available was waiting */
    sexp_wait_on_single_thread(res);
    sexp_context_timeoutp(res) = 1;
    sexp_context_waitp(res) = 0;
  }

  return res;
}

/**************************************************************************/

sexp sexp_init_library (sexp ctx sexp_api_params(self, n), sexp env) {

  sexp_define_type_predicate(ctx, env, "thread?", SEXP_CONTEXT);
  sexp_define_foreign(ctx, env, "thread-timeout?", 0, sexp_thread_timeoutp);
  sexp_define_foreign(ctx, env, "current-thread", 0, sexp_current_thread);
  sexp_define_foreign_opt(ctx, env, "make-thread", 2, sexp_make_thread, SEXP_FALSE);
  sexp_define_foreign(ctx, env, "thread-start!", 1, sexp_thread_start);
  sexp_define_foreign(ctx, env, "%thread-terminate!", 1, sexp_thread_terminate);
  sexp_define_foreign(ctx, env, "%thread-join!", 2, sexp_thread_join);
  sexp_define_foreign(ctx, env, "%thread-sleep!", 1, sexp_thread_sleep);
  sexp_define_foreign(ctx, env, "thread-name", 1, sexp_thread_name);
  sexp_define_foreign(ctx, env, "thread-specific", 1, sexp_thread_specific);
  sexp_define_foreign(ctx, env, "thread-specific-set!", 2, sexp_thread_specific_set);
  sexp_define_foreign(ctx, env, "mutex-state", 1, sexp_mutex_state);
  sexp_define_foreign(ctx, env, "%mutex-lock!", 3, sexp_mutex_lock);
  sexp_define_foreign(ctx, env, "%mutex-unlock!", 3, sexp_mutex_unlock);
  sexp_define_foreign(ctx, env, "condition-variable-signal!", 1, sexp_condition_variable_signal);
  sexp_define_foreign(ctx, env, "condition-variable-broadcast!", 1, sexp_condition_variable_broadcast);

  sexp_global(ctx, SEXP_G_THREADS_SCHEDULER)
    = sexp_make_foreign(ctx, "scheduler", 0, 0, (sexp_proc1)sexp_scheduler, SEXP_FALSE);

  return SEXP_VOID;
}