From 71e5aa2dd6346436b75665542321d75cdef44829 Mon Sep 17 00:00:00 2001
From: yorickhardy <45015436+yorickhardy@users.noreply.github.com>
Date: Tue, 21 Jan 2025 04:10:49 +0200
Subject: [PATCH] Improve garbage collection for terminated threads (#550)

* gc: add a function to force the collector to run

This requires adding a "forced" stage for the collector, which is the
initial stage for a forced collection. Thereafter, the collector
continues to the usual stages of collection.

* runtime: force the garbage collector to run when a thread exits

This is a first attempt to improve the memory usage reported in
issue #534.

* srfi-18: call Cyc_end_thread on thread exits

This ensures that the collector has a chance to run whenever a thread
exits. Attempts to partially address issue #534.

* gc: free unused parts of the heap before merging

When a thread exits, its heap is merged into the main thread's heap.
Before doing so, free any unused parts of the heap to reduce memory
usage. Attempts to partially address issue #534.

* srfi-18: thread-terminate! takes a thread as argument

* gc: revert adding STAGE_FORCING

Use gc_start_major_collection() instead. Partial work towards
addressing issue #534.

* gc: free empty pages in gc_heap_merge()

Moving the code from gc_merge_all_heaps to gc_heap_merge removes
special handling of the start of the list and is (hopefully) easier
to read. Partial work towards addressing issue #534.

* gc: oops, forgot the "freed" count

Partial work towards addressing issue #534.

* gc: oops, forgot the "freed" count (again)

Partial work towards addressing issue #534.

* types: update forward declaration of gc_heap_merge()

Partial work towards addressing issue #534.

* gc: remove accidental double counting

* runtime: small (cosmetic) simplification

* srfi-18: add a slot for thread context in the thread object

Partial work towards addressing issue #534.

* srfi-18: do a minor gc when terminating a thread

This ensures that any objects which are part of the thread context
are transferred to the heap. Partial work towards addressing
issue #534.

* types.h: make gc_alloc_pair public

This will be used to create the thread context. Partial work towards
addressing issue #534.

* gc: prepare heap objects for sweeping

Also introduce a global variable to track whether merged heaps need
to be swept. Partial work towards addressing issue #534.

* gc: create a context for terminated thread objects

The context ensures that parameterised objects, continuations and
exception handlers can still be traced but are no longer root objects
(after thread termination) and can eventually be garbage collected.
Partial work towards addressing issue #534.

* gc: sweep and free empty heaps for the primordial thread

The primordial thread may not have an opportunity to sweep heap pages
which have been merged from terminated threads. So sweep any unswept
pages during the cooperation phase. Partial work towards addressing
issue #534.

* srfi-18: revert thread-terminate! changes

These changes need to be revisited, and are not suitable for the
threads garbage collection pull request.
--- gc.c | 101 ++++++++++++++++++++++++++++++++++++++-- include/cyclone/types.h | 5 +- runtime.c | 3 +- srfi/18.sld | 13 +++++- 4 files changed, 114 insertions(+), 8 deletions(-) diff --git a/gc.c b/gc.c index 434c2bdb..0c1aad9b 100644 --- a/gc.c +++ b/gc.c @@ -55,6 +55,7 @@ static unsigned char gc_color_purple = 1; // There are many "shades" of pu static int gc_status_col = STATUS_SYNC1; static int gc_stage = STAGE_RESTING; +static int gc_threads_merged = 0; // Does not need sync, only used by collector thread static void **mark_stack = NULL; @@ -1901,6 +1902,37 @@ void gc_mut_update(gc_thread_data * thd, object old_obj, object value) } } +static void gc_sweep_primordial_thread_heap() { + int heap_type, must_free; + gc_heap *h, *prev, *next, *sweep; + pthread_mutex_lock(&(primordial_thread->lock)); + for (heap_type = 0; heap_type < NUM_HEAP_TYPES; heap_type++) { + prev = primordial_thread->heap->heap[heap_type]; + h = prev->next; + while(h != NULL) { + next = h->next; + must_free = 0; + if (h->is_unswept) { + if (h->type <= LAST_FIXED_SIZE_HEAP_TYPE) { + sweep = gc_sweep_fixed_size(h, primordial_thread); + } else { + sweep = gc_sweep(h, primordial_thread); + } + must_free = (sweep == NULL); + } else { + must_free = gc_is_heap_empty(h); + } + if (must_free) { + gc_heap_free(h, prev); + } else { + prev = h; + } + h = next; + } + } + pthread_mutex_unlock(&(primordial_thread->lock)); +} + /** * @brief Called by a mutator to cooperate with the collector thread * @param thd Mutator's thread data @@ -1911,11 +1943,22 @@ void gc_mut_update(gc_thread_data * thd, object old_obj, object value) */ void gc_mut_cooperate(gc_thread_data * thd, int buf_len) { - int i, status_c, status_m; + int i, status_c, status_m, stage, merged; #if GC_DEBUG_VERBOSE int debug_print = 0; #endif + // Since terminated threads' heap pages are merged into + // the primordial thread's heap, it may be that a sweep + // for the primordeal thread is never triggered even though + // the heep keeps 
growing. Perform a sweep here if necessary. + stage = ck_pr_load_int(&gc_stage); + merged = ck_pr_load_int(&gc_threads_merged); + if ((thd == primordial_thread) && (merged == 1) && ((stage == STAGE_SWEEPING) || (stage == STAGE_RESTING))) { + gc_sweep_primordial_thread_heap(); + ck_pr_cas_int(&gc_threads_merged, 1, 0); + } + // Handle any pending marks from write barrier gc_sum_pending_writes(thd, 0); @@ -2739,10 +2782,28 @@ void gc_thread_data_free(gc_thread_data * thd) * * This function assumes appropriate locks are already held. */ -void gc_heap_merge(gc_heap * hdest, gc_heap * hsrc) +int gc_heap_merge(gc_heap * hdest, gc_heap * hsrc) { + int freed = 0; gc_heap *last = gc_heap_last(hdest); + gc_heap *cur = hsrc, *prev = last, *next; last->next = hsrc; + // free any empty heaps and convert remaining heaps + // to free list so that they can be swept + while (cur != NULL) { + cur->is_unswept = 1; + next = cur->next; + if (gc_is_heap_empty(cur)) { + freed += cur->size; + gc_heap_free(cur, prev); + } else { + gc_convert_heap_page_to_free_list(cur, primordial_thread); + ck_pr_cas_int(&gc_threads_merged, 0, 1); + prev = cur; + } + cur = next; + } + return freed; } /** @@ -2755,15 +2816,45 @@ void gc_heap_merge(gc_heap * hdest, gc_heap * hsrc) void gc_merge_all_heaps(gc_thread_data * dest, gc_thread_data * src) { gc_heap *hdest, *hsrc; - int heap_type; + int freed, heap_type, i; + pair_type *context = NULL; + vector_type *v = src->scm_thread_obj; + + // The following objects are part of the thread context and should + // be stored on the primordial thread's heap. Make this explicit by + // including it in the thread object. 
+ if (src->gc_num_args > 0) { + for (i=src->gc_num_args-1; i>=0; --i) { + context = gc_alloc_pair(dest, (src->gc_args)[i], context); + } + } + if (src->gc_cont != NULL && is_object_type(src->gc_cont)) { + context = gc_alloc_pair(dest, src->gc_cont, context); + } + if (src->exception_handler_stack != NULL) { + context = gc_alloc_pair(dest, src->exception_handler_stack, context); + } + if (src->param_objs != NULL) { + context = gc_alloc_pair(dest, src->param_objs, context); + } + + if (context != NULL) { + gc_mark_black(context); + v->elements[8] = context; + } for (heap_type = 0; heap_type < NUM_HEAP_TYPES; heap_type++) { hdest = dest->heap->heap[heap_type]; hsrc = src->heap->heap[heap_type]; + if (!hdest) { + fprintf(stderr, "WARNING !!!!! merging heap type %d does not happen: hdest = %p hsrc = %p size = %d\n", + heap_type, hdest, hsrc, hsrc->size); + fflush(stderr); + } if (hdest && hsrc) { - gc_heap_merge(hdest, hsrc); + freed = gc_heap_merge(hdest, hsrc); ck_pr_add_ptr(&(dest->cached_heap_total_sizes[heap_type]), - ck_pr_load_ptr(&(src->cached_heap_total_sizes[heap_type]))); + ck_pr_load_ptr(&(src->cached_heap_total_sizes[heap_type]))-freed); ck_pr_add_ptr(&(dest->cached_heap_free_sizes[heap_type]), ck_pr_load_ptr(&(src->cached_heap_free_sizes[heap_type]))); } diff --git a/include/cyclone/types.h b/include/cyclone/types.h index c37ae282..486dc818 100644 --- a/include/cyclone/types.h +++ b/include/cyclone/types.h @@ -385,7 +385,7 @@ int gc_is_mutator_new(gc_thread_data * thd); void gc_sleep_ms(int ms); gc_heap *gc_heap_create(int heap_type, size_t size, gc_thread_data * thd); gc_heap *gc_heap_free(gc_heap * page, gc_heap * prev_page); -void gc_heap_merge(gc_heap * hdest, gc_heap * hsrc); +int gc_heap_merge(gc_heap * hdest, gc_heap * hsrc); void gc_merge_all_heaps(gc_thread_data * dest, gc_thread_data * src); void gc_print_stats(gc_heap * h); gc_heap *gc_grow_heap(gc_heap * h, size_t size, gc_thread_data * thd); @@ -1261,6 +1261,9 @@ typedef pair_type *pair; 
n->pair_car = a; \ n->pair_cdr = d; +/** Create a new pair in the thread's heap */ +void *gc_alloc_pair(gc_thread_data * data, object head, object tail); + /** * Set members of the given pair * @param n - Pointer to a pair object diff --git a/runtime.c b/runtime.c index b192dc8d..2a29de26 100644 --- a/runtime.c +++ b/runtime.c @@ -5364,7 +5364,7 @@ void _Cyc_91end_91thread_67(void *data, object clo, int argc, object * args) vector_type *v = d->scm_thread_obj; v->elements[7] = args[0]; // Store thread result - Cyc_end_thread((gc_thread_data *) data); + Cyc_end_thread(d); object cont = args[0]; return_closcall1(data, cont, boolean_f); } @@ -7197,6 +7197,7 @@ void Cyc_exit_thread(void *data, object _, int argc, object * args) gc_remove_mutator(thd); ck_pr_cas_int((int *)&(thd->thread_state), CYC_THREAD_STATE_RUNNABLE, CYC_THREAD_STATE_TERMINATED); + gc_start_major_collection(thd); pthread_exit(NULL); } diff --git a/srfi/18.sld b/srfi/18.sld index 6dc100d6..bcda0cf8 100644 --- a/srfi/18.sld +++ b/srfi/18.sld @@ -74,6 +74,7 @@ ;; - internal ;; - end of thread cont (or #f for default) ;; - end-result - Result of thread that terminates successfully + ;; - internal thread context at termination, e.g. parameterised objects (vector 'cyc-thread-obj thunk @@ -82,6 +83,7 @@ #f #f #f + #f #f))) (define (thread-name t) (vector-ref t 3)) @@ -118,13 +120,21 @@ make_c_opaque(co, td); return_closcall1(data, k, &co); ") + (define-c %end-thread! + "(void *data, int argc, closure _, object k, object ret)" + " gc_thread_data *d = data; + vector_type *v = d->scm_thread_obj; + v->elements[7] = ret; // Store thread result + Cyc_end_thread(d); + return_closcall1(data, k, boolean_f);") + (define (thread-start! t) ;; Initiate a GC prior to running the thread, in case ;; it contains any closures on the "parent" thread's stack (let* ((thunk (vector-ref t 1)) (thread-params (cons t (lambda () (vector-set! t 5 #f) - (thunk))))) + (let ((r (thunk))) (%end-thread! r)))))) (vector-set! 
t 5 (%get-thread-data)) ;; Temporarily make parent thread ;; data available for child init (Cyc-minor-gc) @@ -180,6 +190,7 @@ (cond ((and (thread? t) (Cyc-opaque? (vector-ref t 2))) (%thread-join! (vector-ref t 2)) + (Cyc-minor-gc) (vector-ref t 7)) (else #f))) ;; TODO: raise an error instead?