From 06ada5122c78de515d2f179ce0650821504cf116 Mon Sep 17 00:00:00 2001 From: Justin Ethier Date: Thu, 11 Jul 2019 13:07:20 -0400 Subject: [PATCH] WIP, adding delay/promise --- libs/cyclone/concurrent.sld | 8 ++--- libs/cyclone/delay-promise.scm | 57 ++++++++++++++++++++++++++++------ 2 files changed, 52 insertions(+), 13 deletions(-) diff --git a/libs/cyclone/concurrent.sld b/libs/cyclone/concurrent.sld index 55541dc7..4b735bb8 100644 --- a/libs/cyclone/concurrent.sld +++ b/libs/cyclone/concurrent.sld @@ -69,9 +69,9 @@ ;; Returns true if a value has been produced for a promise, delay, future or lazy sequence. (define (realized? obj) (cond - ;; TODO: ((future? obj) - ;; TODO: ((shared-delay? obj) - ;; TODO: ((shared-promise? obj) + ;; TODO: ((future? obj) (future-done? obj)) + ;; TODO: ((shared-delay? obj) (shared-delay-realized? obj)) + ;; TODO: ((shared-promise? obj) (shared-promise-realized? obj)) (else #f))) (define-c atom? @@ -225,7 +225,7 @@ (t (make-thread tfnc)) ) (thread-start! t) - ftr)) + (make-shared ftr))) (define (future-done? ftr) (when (not (future? ftr)) diff --git a/libs/cyclone/delay-promise.scm b/libs/cyclone/delay-promise.scm index 3bd26af7..00d5080f 100644 --- a/libs/cyclone/delay-promise.scm +++ b/libs/cyclone/delay-promise.scm @@ -11,7 +11,8 @@ (lock sd:lock sd:set-lock!)) (define (make-shared-delay thunk) - (%make-shared-delay #f thunk (make-mutex))) + (make-shared + (%make-shared-delay #f thunk (make-mutex)))) (define (shared-delay-deref d) (when (not (shared-delay? d)) @@ -27,6 +28,13 @@ (mutex-unlock! (sd:lock d)) ) +(define (shared-delay-realized? obj) + (let ((rv #f)) + (mutex-lock! (sd:lock obj)) + (set! rv (sd:done obj)) + (mutex-unlock! (sd:lock obj)) + rv)) + (define-syntax shared-delay (er-macro-transformer (lambda (expr rename compare) @@ -54,15 +62,46 @@ (cv sp:cv sp:set-cv!)) (define (make-shared-promise) - (%make-shared-promise #f #f (make-mutex) (make-condition-variable))) + (make-shared + (%make-shared-promise #f #f (make-mutex) (make-condition-variable)))) +;; Blocks until given promise has a value, and returns that value. (define (shared-promise-deref obj) - ;; TODO: block on CV until ready - ;; return sp:value -) + (when (not (shared-promise? obj)) + (error "Expected shared promise but received" obj)) + (mutex-lock! (sp:lock obj)) + (if (sp:done obj) + (mutex-unlock! (sp:lock obj)) + (mutex-unlock! (sp:lock obj) (sp:cv obj))) ;; wait until value is ready + (sp:value obj)) -(define (deliver obj) - ;; TODO: if not delivered, compute value and signal all on the cv - ;; else, do nothing -) +(define (shared-promise-realized? obj) + (let ((rv #f)) + (mutex-lock! (sp:lock obj)) + (set! rv (sp:done obj)) + (mutex-unlock! (sp:lock obj)) + rv)) + +;; Delivers `value` to shared promise `obj` and unblocks waiting threads. +;; Has no effect if a value has already been delivered. +(define (deliver obj value) + (when (not (shared-promise? obj)) + (error "Expected shared promise but received" obj)) + (mutex-lock! (sp:lock obj)) + (when (not (sp:done obj)) + (sp:set-value! obj (make-shared value)) + (sp:set-done! obj #t)) + (mutex-unlock! (sp:lock obj)) + (condition-variable-broadcast! (sp:cv obj))) + + +(define tp (make-thread-pool 4)) +(define sp (make-shared-promise)) +(thread-pool-push-task! tp + (lambda () + (thread-sleep! 1) + (deliver sp (list (+ 1 2 3))))) +(write + `(received promised value of ,(shared-promise-deref sp))) +(newline)