Added multi-threaded (shared) versions of delays and promises

This commit is contained in:
Justin Ethier 2019-07-11 13:32:39 -04:00
parent 06ada5122c
commit 0b1c4662fb

View file

@ -24,6 +24,12 @@
swap! swap!
compare-and-set! compare-and-set!
atom-deref atom-deref
;; Delays
make-shared-delay
shared-delay
;; Promises
make-shared-promise
deliver
;; Futures ;; Futures
future? future?
future future
@ -62,16 +68,17 @@
(cond (cond
((atom? obj) (atom-deref obj)) ((atom? obj) (atom-deref obj))
((future? obj) (future-deref obj)) ((future? obj) (future-deref obj))
((shared-delay? obj) (shared-delay-deref obj))
((shared-promise? obj) (shared-promise-deref obj))
((shared-queue? obj) (shared-queue-remove! obj)) ((shared-queue? obj) (shared-queue-remove! obj))
(else obj))) (else obj)))
;; TODO: (realized? obj) - see clojure docs ;; Returns true if a value has been produced for a promise, delay, or future.
;; Returns true if a value has been produced for a promise, delay, future or lazy sequence.
(define (realized? obj) (define (realized? obj)
(cond (cond
;; TODO: ((future? obj) (future-done? obj)) ((future? obj) (future-done? obj))
;; TODO: ((shared-delay? obj) (shared-delay-realized? obj)) ((shared-delay? obj) (shared-delay-realized? obj))
;; TODO: ((shared-promise? obj) (shared-promise-realized? obj)) ((shared-promise? obj) (shared-promise-realized? obj))
(else #f))) (else #f)))
(define-c atom? (define-c atom?
@ -190,6 +197,87 @@
} }
return_closcall1(data, k, result); ") return_closcall1(data, k, result); ")
;; Delays
(define-record-type <shared-delay>
(%make-shared-delay done result lock)
shared-delay?
(done sd:done sd:set-done!)
(value sd:value sd:set-value!) ;; Either thunk or result
(lock sd:lock sd:set-lock!))
(define (make-shared-delay thunk)
(make-shared
(%make-shared-delay #f thunk (make-mutex))))
(define (shared-delay-deref d)
(when (not (shared-delay? d))
(error "Expected future but received" d))
(mutex-lock! (sd:lock d))
(cond
((sd:done d)
(sd:value d))
(else
(sd:set-value! d
(make-shared ((sd:value d)))) ;; Exec thunk and store result
(sd:set-done! d #t)))
(mutex-unlock! (sd:lock d))
)
(define (shared-delay-realized? obj)
(let ((rv #f))
(mutex-lock! (sd:lock obj))
(set! rv (sd:done obj))
(mutex-unlock! (sd:lock obj))
rv))
(define-syntax shared-delay
(er-macro-transformer
(lambda (expr rename compare)
`(make-shared-delay (lambda () ,(cadr expr))))))
;; Promises
(define-record-type <shared-promise>
(%make-shared-promise done value lock cv)
shared-promise?
(done sp:done sp:set-done!)
(value sp:value sp:set-value!)
(lock sp:lock sp:set-lock!)
(cv sp:cv sp:set-cv!))
(define (make-shared-promise)
(make-shared
(%make-shared-promise #f #f (make-mutex) (make-condition-variable))))
;; Blocks until given promise has a value, and returns that value.
(define (shared-promise-deref obj)
(when (not (shared-promise? obj))
(error "Expected shared promise but received" obj))
(mutex-lock! (sp:lock obj))
(if (sp:done obj)
(mutex-unlock! (sp:lock obj))
(mutex-unlock! (sp:lock obj) (sp:cv obj))) ;; wait until value is ready
(sp:value obj))
(define (shared-promise-realized? obj)
(let ((rv #f))
(mutex-lock! (sp:lock obj))
(set! rv (sp:done obj))
(mutex-unlock! (sp:lock obj))
rv))
;; Delivers `value` to shared promise `obj` and unblocks waiting threads.
;; Has no effect if a value has already been delivered.
(define (deliver obj value)
(when (not (shared-promise? obj))
(error "Expected shared promise but received" obj))
(mutex-lock! (sp:lock obj))
(when (not (sp:done obj))
(sp:set-value! obj (make-shared value))
(sp:set-done! obj #t))
(mutex-unlock! (sp:lock obj))
(condition-variable-broadcast! (sp:cv obj)))
;; Futures ;; Futures
(define-record-type <future> (define-record-type <future>
(make-future done result lock) (make-future done result lock)