Use atomic operations from libck

This commit is contained in:
Justin Ethier 2015-12-29 22:05:07 -05:00
parent fa72a604fb
commit 71af17d7e7
3 changed files with 41 additions and 48 deletions

71
gc.c
View file

@ -15,6 +15,7 @@
*/
#include <ck_array.h>
#include <ck_pr.h>
#include "cyclone/types.h"
#include <time.h>
@ -773,9 +774,9 @@ heap references, so they can be marked to avoid collection.
*/
void gc_mut_update(gc_thread_data *thd, object old_obj, object value)
{
int status = ATOMIC_GET(&gc_status_col),
stage = ATOMIC_GET(&gc_stage);
if (ATOMIC_GET(&(thd->gc_status)) != STATUS_ASYNC) {
int status = ck_pr_load_int(&gc_status_col),
stage = ck_pr_load_int(&gc_stage);
if (ck_pr_load_int(&(thd->gc_status)) != STATUS_ASYNC) {
//fprintf(stderr, "DEBUG - GC sync marking heap obj %p ", old_obj);
//Cyc_display(old_obj, stderr);
//fprintf(stderr, " and new value %p ", value);
@ -856,10 +857,10 @@ void gc_mut_cooperate(gc_thread_data *thd, int buf_len)
// a race condition
//
// TODO: should use an atomic comparison here
status_c = ATOMIC_GET(&gc_status_col);
status_m = ATOMIC_GET(&(thd->gc_status));
status_c = ck_pr_load_int(&gc_status_col);
status_m = ck_pr_load_int(&(thd->gc_status));
if (status_m != status_c) {
ATOMIC_SET_IF_EQ(&(thd->gc_status), status_m, status_c);
ck_pr_cas_int(&(thd->gc_status), status_m, status_c);
if (status_m == STATUS_ASYNC) {
// Async is done, so clean up old mark data from the last collection
pthread_mutex_lock(&(thd->lock));
@ -885,7 +886,7 @@ void gc_mut_cooperate(gc_thread_data *thd, int buf_len)
gc_mark_gray(thd, thd->moveBuf[i]);
}
pthread_mutex_unlock(&(thd->lock));
thd->gc_alloc_color = ATOMIC_GET(&gc_color_mark);
thd->gc_alloc_color = ck_pr_load_int(&gc_color_mark);
}
}
#if GC_DEBUG_VERBOSE
@ -903,12 +904,12 @@ void gc_mut_cooperate(gc_thread_data *thd, int buf_len)
// Initiate collection cycle if free space is too low.
// Threshold is intentially low because we have to go through an
// entire handshake/trace/sweep cycle, ideally without growing heap.
if (ATOMIC_GET(&gc_stage) == STAGE_RESTING &&
if (ck_pr_load_int(&gc_stage) == STAGE_RESTING &&
(cached_heap_free_size < (cached_heap_total_size * 0.50))){
#if GC_DEBUG_TRACE
fprintf(stdout, "Less than 50%% of the heap is free, initiating collector\n");
#endif
ATOMIC_SET_IF_EQ(&gc_stage, STAGE_RESTING, STAGE_CLEAR_OR_MARKING);
ck_pr_cas_int(&gc_stage, STAGE_RESTING, STAGE_CLEAR_OR_MARKING);
}
}
@ -1034,7 +1035,7 @@ void gc_mark_black(object obj)
// TODO: is sync required to get colors? probably not on the collector
// thread (at least) since colors are only changed once during the clear
// phase and before the first handshake.
int markColor = ATOMIC_GET(&gc_color_mark);
int markColor = ck_pr_load_int(&gc_color_mark);
if (is_object_type(obj) && mark(obj) != markColor) {
// Gray any child objects
// Note we probably should use some form of atomics/synchronization
@ -1136,8 +1137,8 @@ void gc_handshake(gc_status_type s)
void gc_post_handshake(gc_status_type s)
{
int status = ATOMIC_GET(&gc_status_col);
while (!ATOMIC_SET_IF_EQ(&gc_status_col, status, s)){}
int status = ck_pr_load_int(&gc_status_col);
while (!ck_pr_cas_int(&gc_status_col, status, s)){}
}
void gc_wait_handshake()
@ -1152,18 +1153,18 @@ void gc_wait_handshake()
CK_ARRAY_FOREACH(&Cyc_mutators, &iterator, &m) {
while (1) {
// TODO: use an atomic comparison
statusc = ATOMIC_GET(&gc_status_col);
statusm = ATOMIC_GET(&(m->gc_status));
statusc = ck_pr_load_int(&gc_status_col);
statusm = ck_pr_load_int(&(m->gc_status));
if (statusc == statusm) {
// Handshake succeeded, check next mutator
break;
}
thread_status = ATOMIC_GET(&(m->thread_state));
thread_status = ck_pr_load_int((int *)&(m->thread_state));
if (thread_status == CYC_THREAD_STATE_BLOCKED ||
thread_status == CYC_THREAD_STATE_BLOCKED_COOPERATING) {
if (statusm == STATUS_ASYNC) { // Prev state
ATOMIC_SET_IF_EQ(&(m->gc_status), statusm, statusc);
ck_pr_cas_int(&(m->gc_status), statusm, statusc);
// Async is done, so clean up old mark data from the last collection
pthread_mutex_lock(&(m->lock));
m->last_write = 0;
@ -1171,20 +1172,20 @@ void gc_wait_handshake()
m->pending_writes = 0;
pthread_mutex_unlock(&(m->lock));
}else if (statusm == STATUS_SYNC1) {
ATOMIC_SET_IF_EQ(&(m->gc_status), statusm, statusc);
ck_pr_cas_int(&(m->gc_status), statusm, statusc);
} else if (statusm == STATUS_SYNC2) {
printf("DEBUG - is mutator still blocked?\n");
// Check again, if thread is still blocked we need to cooperate
if (ATOMIC_SET_IF_EQ(&(m->thread_state),
if (ck_pr_cas_int((int *)&(m->thread_state),
CYC_THREAD_STATE_BLOCKED,
CYC_THREAD_STATE_BLOCKED_COOPERATING)
||
ATOMIC_SET_IF_EQ(&(m->thread_state),
ck_pr_cas_int((int *)&(m->thread_state),
CYC_THREAD_STATE_BLOCKED_COOPERATING,
CYC_THREAD_STATE_BLOCKED_COOPERATING)
) {
printf("DEBUG - update mutator GC status\n");
ATOMIC_SET_IF_EQ(&(m->gc_status), statusm, statusc);
ck_pr_cas_int(&(m->gc_status), statusm, statusc);
pthread_mutex_lock(&(m->lock));
printf("DEBUG - collector is cooperating for blocked mutator\n");
buf_len = gc_minor(m, m->stack_limit, m->stack_start, m->gc_cont, NULL, 0);
@ -1197,7 +1198,7 @@ printf("DEBUG - collector is cooperating for blocked mutator\n");
for (i = 0; i < buf_len; i++) {
gc_mark_gray(m, m->moveBuf[i]);
}
m->gc_alloc_color = ATOMIC_GET(&gc_color_mark);
m->gc_alloc_color = ck_pr_load_int(&gc_color_mark);
pthread_mutex_unlock(&(m->lock));
}
}
@ -1233,12 +1234,12 @@ void gc_collector()
time_t sweep_start = time(NULL);
#endif
//clear :
ATOMIC_SET_IF_EQ(&gc_stage, STAGE_RESTING, STAGE_CLEAR_OR_MARKING);
ck_pr_cas_int(&gc_stage, STAGE_RESTING, STAGE_CLEAR_OR_MARKING);
// exchange values of markColor and clearColor
old_clear = ATOMIC_GET(&gc_color_clear);
old_mark = ATOMIC_GET(&gc_color_mark);
while(!ATOMIC_SET_IF_EQ(&gc_color_clear, old_clear, old_mark)){}
while(!ATOMIC_SET_IF_EQ(&gc_color_mark, old_mark, old_clear)){}
old_clear = ck_pr_load_int(&gc_color_clear);
old_mark = ck_pr_load_int(&gc_color_mark);
while(!ck_pr_cas_int(&gc_color_clear, old_clear, old_mark)){}
while(!ck_pr_cas_int(&gc_color_mark, old_mark, old_clear)){}
#if GC_DEBUG_TRACE
fprintf(stderr, "DEBUG - swap clear %d / mark %d\n", gc_color_clear, gc_color_mark);
#endif
@ -1251,7 +1252,7 @@ fprintf(stderr, "DEBUG - after handshake sync 1\n");
#if GC_DEBUG_TRACE
fprintf(stderr, "DEBUG - after handshake sync 2\n");
#endif
ATOMIC_SET_IF_EQ(&gc_stage, STAGE_CLEAR_OR_MARKING, STAGE_TRACING);
ck_pr_cas_int(&gc_stage, STAGE_CLEAR_OR_MARKING, STAGE_TRACING);
gc_post_handshake(STATUS_ASYNC);
#if GC_DEBUG_TRACE
fprintf(stderr, "DEBUG - after post_handshake async\n");
@ -1267,7 +1268,7 @@ fprintf(stderr, "DEBUG - after wait_handshake async\n");
fprintf(stderr, "DEBUG - after trace\n");
//debug_dump_globals();
#endif
ATOMIC_SET_IF_EQ(&gc_stage, STAGE_TRACING, STAGE_SWEEPING);
ck_pr_cas_int(&gc_stage, STAGE_TRACING, STAGE_SWEEPING);
//
//sweep :
max_freed = gc_sweep(gc_get_heap(), &freed);
@ -1291,7 +1292,7 @@ fprintf(stderr, "DEBUG - after wait_handshake async\n");
#endif
gc_free_old_thread_data();
// Idle the GC thread
ATOMIC_SET_IF_EQ(&gc_stage, STAGE_SWEEPING, STAGE_RESTING);
ck_pr_cas_int(&gc_stage, STAGE_SWEEPING, STAGE_RESTING);
}
void *collector_main(void *arg)
@ -1304,7 +1305,7 @@ void *collector_main(void *arg)
//sparse enough (would be difficult to do without relocations, though
tim.tv_nsec = 100 * NANOSECONDS_PER_MILLISECOND;
while (1) {
stage = ATOMIC_GET(&gc_stage);
stage = ck_pr_load_int(&gc_stage);
if (stage != STAGE_RESTING) {
gc_collector();
}
@ -1357,8 +1358,8 @@ void gc_thread_data_init(gc_thread_data *thd, int mut_num, char *stack_base, lon
thd->gc_num_args = 0;
thd->moveBufLen = 0;
gc_thr_grow_move_buffer(thd);
thd->gc_alloc_color = ATOMIC_GET(&gc_color_clear);
thd->gc_status = ATOMIC_GET(&gc_status_col);
thd->gc_alloc_color = ck_pr_load_int(&gc_color_clear);
thd->gc_status = ck_pr_load_int(&gc_status_col);
thd->last_write = 0;
thd->last_read = 0;
thd->mark_buffer_len = 128;
@ -1390,7 +1391,7 @@ void gc_thread_data_free(gc_thread_data *thd)
void gc_mutator_thread_blocked(gc_thread_data *thd, object cont)
{
if(!ATOMIC_SET_IF_EQ(&(thd->thread_state),
if(!ck_pr_cas_int((int *)&(thd->thread_state),
CYC_THREAD_STATE_RUNNABLE,
CYC_THREAD_STATE_BLOCKED)){
fprintf(stderr, "Unable to change thread from runnable to blocked. status = %d\n", thd->thread_state);
@ -1405,7 +1406,7 @@ void gc_mutator_thread_runnable(gc_thread_data *thd, object result)
// Transition from blocked back to runnable using CAS.
// If we are unable to transition back, assume collector
// has cooperated on behalf of this mutator thread.
if (!ATOMIC_SET_IF_EQ(&(thd->thread_state),
if (!ck_pr_cas_int((int *)&(thd->thread_state),
CYC_THREAD_STATE_BLOCKED,
CYC_THREAD_STATE_RUNNABLE)){
printf("DEBUG - Collector cooperated, wait for it to finish. status is %d\n", thd->thread_state);
@ -1413,7 +1414,7 @@ printf("DEBUG - Collector cooperated, wait for it to finish. status is %d\n", th
pthread_mutex_lock(&(thd->lock));
pthread_mutex_unlock(&(thd->lock));
// update thread status
while(!ATOMIC_SET_IF_EQ(&(thd->thread_state),
while(!ck_pr_cas_int((int *)&(thd->thread_state),
CYC_THREAD_STATE_BLOCKED_COOPERATING,
CYC_THREAD_STATE_RUNNABLE)){}
// transport result to heap, if necessary (IE, is not a value type)

View file

@ -447,13 +447,4 @@ void gc_mutator_thread_runnable(gc_thread_data *thd, object result);
gc_heap *gc_get_heap();
int gc_minor(void *data, object low_limit, object high_limit, closure cont, object *args, int num_args);
// Atomics section
// TODO: this is all compiler dependent, need to use different macros depending
// upon the compiler (and arch)
// TODO: relocate this to its own header?
#define ATOMIC_INC(ptr) __sync_fetch_and_add((ptr),1)
#define ATOMIC_DEC(ptr) __sync_fetch_and_sub((ptr),1)
#define ATOMIC_GET(ptr) __sync_fetch_and_add((ptr),0)
#define ATOMIC_SET_IF_EQ(ptr, oldv, newv) __sync_bool_compare_and_swap(ptr, oldv, newv)
#endif /* CYCLONE_TYPES_H */

View file

@ -7,6 +7,7 @@
*/
#include <ck_hs.h>
#include <ck_pr.h>
#include "cyclone/types.h"
#include "cyclone/runtime.h"
#include "cyclone/ck_ht_hash.h"
@ -2408,9 +2409,9 @@ void GC(void *data, closure cont, object *args, int num_args)
int alloci = gc_minor(data, low_limit, high_limit, cont, args, num_args);
// Cooperate with the collector thread
gc_mut_cooperate((gc_thread_data *)data, alloci);
//#if GC_DEBUG_TRACE
#if GC_DEBUG_TRACE
fprintf(stderr, "done with minor GC\n");
//#endif
#endif
// Let it all go, Neo...
longjmp(*(((gc_thread_data *)data)->jmp_start), 1);
}
@ -2796,7 +2797,7 @@ void *Cyc_init_thread(object thunk)
// returns instance so would need to malloc here
// would also need to update termination code to free that memory
gc_add_mutator(thd);
ATOMIC_SET_IF_EQ(&(thd->thread_state), CYC_THREAD_STATE_NEW, CYC_THREAD_STATE_RUNNABLE);
ck_pr_cas_int((int *)&(thd->thread_state), CYC_THREAD_STATE_NEW, CYC_THREAD_STATE_RUNNABLE);
Cyc_start_thread(thd);
return NULL;
}
@ -2860,7 +2861,7 @@ void Cyc_exit_thread(gc_thread_data *thd)
//printf("DEBUG - exiting thread\n");
// Remove thread from the list of mutators, and mark its data to be freed
gc_remove_mutator(thd);
ATOMIC_SET_IF_EQ(&(thd->thread_state), CYC_THREAD_STATE_RUNNABLE, CYC_THREAD_STATE_TERMINATED);
ck_pr_cas_int((int *)&(thd->thread_state), CYC_THREAD_STATE_RUNNABLE, CYC_THREAD_STATE_TERMINATED);
pthread_exit(NULL); // For now, just a proof of concept
}