diff --git a/libs/cyclone/concurrent.sld b/libs/cyclone/concurrent.sld index 4b735bb8..54364954 100644 --- a/libs/cyclone/concurrent.sld +++ b/libs/cyclone/concurrent.sld @@ -24,6 +24,12 @@ swap! compare-and-set! atom-deref + ;; Delays + make-shared-delay + shared-delay + ;; Promises + make-shared-promise + deliver ;; Futures future? future @@ -62,16 +68,17 @@ (cond ((atom? obj) (atom-deref obj)) ((future? obj) (future-deref obj)) + ((shared-delay? obj) (shared-delay-deref obj)) + ((shared-promise? obj) (shared-promise-deref obj)) ((shared-queue? obj) (shared-queue-remove! obj)) (else obj))) -;; TODO: (realized? obj) - see clojure docs -;; Returns true if a value has been produced for a promise, delay, future or lazy sequence. +;; Returns true if a value has been produced for a promise, delay, or future. (define (realized? obj) (cond - ;; TODO: ((future? obj) (future-done? obj)) - ;; TODO: ((shared-delay? obj) (shared-delay-realized? obj)) - ;; TODO: ((shared-promise? obj) (shared-promise-realized? obj)) + ((future? obj) (future-done? obj)) + ((shared-delay? obj) (shared-delay-realized? obj)) + ((shared-promise? obj) (shared-promise-realized? obj)) (else #f))) (define-c atom? @@ -190,6 +197,87 @@ } return_closcall1(data, k, result); ") +;; Delays +(define-record-type + (%make-shared-delay done result lock) + shared-delay? + (done sd:done sd:set-done!) + (value sd:value sd:set-value!) ;; Either thunk or result + (lock sd:lock sd:set-lock!)) + +(define (make-shared-delay thunk) + (make-shared + (%make-shared-delay #f thunk (make-mutex)))) + +(define (shared-delay-deref d) + (when (not (shared-delay? d)) + (error "Expected future but received" d)) + (mutex-lock! (sd:lock d)) + (cond + ((sd:done d) + (sd:value d)) + (else + (sd:set-value! d + (make-shared ((sd:value d)))) ;; Exec thunk and store result + (sd:set-done! d #t))) + (mutex-unlock! (sd:lock d)) +) + +(define (shared-delay-realized? obj) + (let ((rv #f)) + (mutex-lock! (sd:lock obj)) + (set! rv (sd:done obj)) + (mutex-unlock! (sd:lock obj)) + rv)) + +(define-syntax shared-delay + (er-macro-transformer + (lambda (expr rename compare) + `(make-shared-delay (lambda () ,(cadr expr)))))) + +;; Promises + +(define-record-type + (%make-shared-promise done value lock cv) + shared-promise? + (done sp:done sp:set-done!) + (value sp:value sp:set-value!) + (lock sp:lock sp:set-lock!) + (cv sp:cv sp:set-cv!)) + +(define (make-shared-promise) + (make-shared + (%make-shared-promise #f #f (make-mutex) (make-condition-variable)))) + +;; Blocks until given promise has a value, and returns that value. +(define (shared-promise-deref obj) + (when (not (shared-promise? obj)) + (error "Expected shared promise but received" obj)) + (mutex-lock! (sp:lock obj)) + (if (sp:done obj) + (mutex-unlock! (sp:lock obj)) + (mutex-unlock! (sp:lock obj) (sp:cv obj))) ;; wait until value is ready + (sp:value obj)) + +(define (shared-promise-realized? obj) + (let ((rv #f)) + (mutex-lock! (sp:lock obj)) + (set! rv (sp:done obj)) + (mutex-unlock! (sp:lock obj)) + rv)) + +;; Delivers `value` to shared promise `obj` and unblocks waiting threads. +;; Has no effect if a value has already been delivered. +(define (deliver obj value) + (when (not (shared-promise? obj)) + (error "Expected shared promise but received" obj)) + (mutex-lock! (sp:lock obj)) + (when (not (sp:done obj)) + (sp:set-value! obj (make-shared value)) + (sp:set-done! obj #t)) + (mutex-unlock! (sp:lock obj)) + (condition-variable-broadcast! (sp:cv obj))) + ;; Futures (define-record-type (make-future done result lock)