mirror of
https://github.com/justinethier/cyclone.git
synced 2025-05-19 13:49:16 +02:00
Build-out the library using code from atomics.scm
This commit is contained in:
parent
a5066eceaf
commit
eb83b51bc1
1 changed files with 125 additions and 31 deletions
|
@ -1,40 +1,134 @@
|
|||
(define-library (cyclone concurrency)
|
||||
;; TODO:
|
||||
;how to determine if an object can be shared between threads (EG: on the heap)?
|
||||
;
|
||||
;what data type to use for concurrency objects?
|
||||
;what data structures to support - queue, ??
|
||||
;what operations to support - promises, ??
|
||||
;what objects to support - atomics, refs, ??
|
||||
;
|
||||
;what role do atomics play? how does that affect GC?
|
||||
|
||||
;; Some additional notes:
|
||||
;; Add atomic bit field to cvar, maybe even an atomic type field too (?)
|
||||
;; Notes:
|
||||
;; see: https://clojure.github.io/clojure/clojure.core-api.html#clojure.core/compare-and-set!
|
||||
;; initial operations to support:
|
||||
;; - create atomic
|
||||
;; - ref atomic
|
||||
;; - swap, see https://clojure.github.io/clojure/clojure.core-api.html#clojure.core/swap!
|
||||
;; - compare and swap?
|
||||
;;
|
||||
;; (Make-shared exprs)
|
||||
;; Safe way to allocate vars on heap. Tbd if an obj with pointers always necessitates a minor GC
|
||||
;;
|
||||
;;
|
||||
;; once that starts going, double-back to how to allocate shared objects effectively.
|
||||
;; probably want a (make-shared)
|
||||
;; may also way a way to allocate multiple shared objects at once (since a minor GC will likely be req'd)
|
||||
(define-library (cyclone concurrency)
|
||||
(import
|
||||
(scheme base)
|
||||
)
|
||||
(include-c-header "<ck_pr.h>")
|
||||
(export
|
||||
immutable?
|
||||
;; Atoms
|
||||
make-atom
|
||||
atom
|
||||
atom?
|
||||
ref
|
||||
swap!
|
||||
compare-and-set!
|
||||
;; Shared objects
|
||||
make-shared
|
||||
share-all!
|
||||
)
|
||||
(begin
|
||||
(define (dummy) #f)
|
||||
(define-c immutable?
|
||||
"(void *data, int argc, closure _, object k, object obj)"
|
||||
"object result = boolean_f;
|
||||
if (is_object_type(obj) &&
|
||||
(type_of(obj) == pair_tag ||
|
||||
type_of(obj) == vector_tag ||
|
||||
type_of(obj) == bytevector_tag ||
|
||||
type_of(obj) == string_tag
|
||||
) &&
|
||||
immutable(obj) ) {
|
||||
result = boolean_t;
|
||||
}
|
||||
return_closcall1(data, k, result); ")
|
||||
|
||||
(define-c atom?
|
||||
"(void *data, int argc, closure _, object k, object obj)"
|
||||
" object result = Cyc_is_atomic(obj);
|
||||
return_closcall1(data, k, result); ")
|
||||
;;
|
||||
;; Alloc on the heap since by definition atoms are used by multiple threads
|
||||
;;
|
||||
;; We also enforce that an object added to an atom must be an immutable and shared (IE, not thread-local).
|
||||
;; Because we enforce these guarantees:
|
||||
;; - The only way for application-code to change the object is via atomic calls. No other synchronization is required
|
||||
;; - Atoms do not need to have a write barrier because the atom's heap object can never refer to a stack object
|
||||
;; - We do not need to travese an entire pair/vector to determine all members are shared. Only the first object needs to be checked.
|
||||
(define-c %make-atom
|
||||
"(void *data, int argc, closure _, object k, object obj)"
|
||||
" int heap_grown;
|
||||
atomic atm;
|
||||
atomic_type tmp;
|
||||
Cyc_verify_immutable(data, obj); // TODO: verify obj is not on local stack???
|
||||
if (gc_is_stack_obj(data, obj)){
|
||||
Cyc_rt_raise2(data, \"Atom cannot contain a thread-local object\", obj);
|
||||
}
|
||||
tmp.hdr.mark = gc_color_red;
|
||||
tmp.hdr.grayed = 0;
|
||||
tmp.tag = atomic_tag;
|
||||
tmp.obj = obj;
|
||||
atm = gc_alloc(((gc_thread_data *)data)->heap, sizeof(atomic_type), (char *)(&tmp), (gc_thread_data *)data, &heap_grown);
|
||||
ck_pr_store_ptr(&(atm->obj), obj); // Needed??
|
||||
return_closcall1(data, k, atm); ")
|
||||
|
||||
(define (make-atom obj)
|
||||
(%make-atom (make-shared obj)))
|
||||
|
||||
(define (atom . obj)
|
||||
(if (pair? obj)
|
||||
(%make-atom (make-shared (car obj)))
|
||||
(%make-atom #f)))
|
||||
|
||||
;; - ref atomic
|
||||
(define-c ref
|
||||
"(void *data, int argc, closure _, object k, object obj)"
|
||||
" atomic a;
|
||||
Cyc_check_atomic(data, obj);
|
||||
a = (atomic) obj;
|
||||
return_closcall1(data, k, ck_pr_load_ptr(&(a->obj)));")
|
||||
|
||||
;; - swap, see https://clojure.github.io/clojure/clojure.core-api.html#clojure.core/swap!
|
||||
;; Clojure docs:
|
||||
;; Atomically swaps the value of atom to be:
|
||||
;; (apply f current-value-of-atom args). Note that f may be called
|
||||
;; multiple times, and thus should be free of side effects. Returns
|
||||
;; the value that was swapped in.
|
||||
;; (swap! atom f)(swap! atom f x)(swap! atom f x y)(swap! atom f x y & args)
|
||||
;;
|
||||
;; Notes:
|
||||
;; swap! takes the current value of the Atom, calls the function on it (in this case, inc), and sets the new value. However, just before setting the new value, it checks to make sure the old value is still in there. If it's different, it starts over. It calls the function again with the new value it found. It keeps doing this until it finally writes the value. Because it can check the value and write it in one go, it's an atomic operation, hence the name.
|
||||
;;
|
||||
;; That means the function can get called multiple times. That means it needs to be a pure function. Another thing is that you can't control the order of the function calls. If multiple threads are swapping to an Atom at the same time, order is out of the window. So make sure your functions are independent of order, like we talked about before.
|
||||
;;
|
||||
(define (swap! atom f . args)
|
||||
(let* ((oldval (ref atom))
|
||||
(newval (make-shared (apply f oldval args))))
|
||||
(if (compare-and-set! atom oldval newval)
|
||||
newval ;; value did not change, return new one
|
||||
(apply swap! atom f args) ;; Value changed, try again
|
||||
)))
|
||||
|
||||
;; Return a reference to an object that can be safely shared by many threads.
|
||||
;;
|
||||
;; If the given object is atomic or already shared it it simply returned.
|
||||
;; Otherwise it is necessary to create a copy of the object.
|
||||
;;
|
||||
;; Note this function may trigger a minor GC if a thread-local pair or vector
|
||||
;; is passed.
|
||||
(define-c make-shared
|
||||
"(void *data, int argc, closure _, object k, object obj)"
|
||||
" Cyc_make_shared_object(data, k, obj); ")
|
||||
|
||||
;; Allow all objects currently on the calling thread's local stack to be shared
|
||||
;; with other threads.
|
||||
(define-c share-all!
|
||||
"(void *data, int argc, closure _, object k)"
|
||||
" Cyc_trigger_minor_gc(data, k); ")
|
||||
|
||||
;; (compare-and-set! atom oldval newval)
|
||||
;; https://clojuredocs.org/clojure.core/compare-and-set!
|
||||
;; Atomically sets the value of atom to newval if and only if the
|
||||
;; current value of the atom is identical to oldval. Returns true if
|
||||
;; set happened, else false
|
||||
(define-c compare-and-set!
|
||||
"(void *data, int argc, closure _, object k, object obj, object oldval, object newval)"
|
||||
" atomic a;
|
||||
Cyc_check_atomic(data, obj);
|
||||
Cyc_verify_immutable(data, newval);
|
||||
if (gc_is_stack_obj(data, obj)){
|
||||
Cyc_rt_raise2(data, \"Atom cannot contain a thread-local object\", obj);
|
||||
}
|
||||
a = (atomic) obj;
|
||||
bool result = ck_pr_cas_ptr(&(a->obj), oldval, newval);
|
||||
object rv = result ? boolean_t : boolean_f;
|
||||
return_closcall1(data, k, rv); ")
|
||||
|
||||
)
|
||||
)
|
||||
|
|
Loading…
Add table
Reference in a new issue