diff --git a/libs/cyclone/concurrent.sld b/libs/cyclone/concurrent.sld index 12977173..aa2484da 100644 --- a/libs/cyclone/concurrent.sld +++ b/libs/cyclone/concurrent.sld @@ -25,6 +25,24 @@ future-call future-deref future-done? + ;; Shared Queues + shared-queue? + make-shared-queue + shared-queue + shared-queue-add! + shared-queue-remove! + shared-queue-clear! + shared-queue-size + shared-queue-wait-count + shared-queue-capacity + shared-queue-empty? + ;; Thread Pool + make-thread-pool + thread-pool? + thread-pool-size + thread-pool-idling-count + thread-pool-idling? + thread-pool-push-task! ;; Immutable objects immutable? ;; Shared objects @@ -214,5 +232,205 @@ result)) ;; END Futures +;; Shared Queues +;; +;; Each is a vector containing a circular buffer of objects that are intended +;; to be shared among many threads. All operations are locked and thread-safe, +;; and the queue will ensure any objects added are made into shared objects for +;; use by other threads. +;; +;; Removal from a queue is a blocking operation, so threads can easily wait for +;; new data to arrive. +(define *default-sq-table-size* 64) + +(define-record-type + (%make-shared-queue store start end wait-count lock cv) + shared-queue? + (store q:store q:set-store!) + (start q:start q:set-start!) + (end q:end q:set-end!) + (wait-count q:wait-count q:set-wait-count) + (lock q:lock q:set-lock!) + (cv q:cv q:set-cv!) + ) + +(define (make-shared-queue) + (make-shared + (%make-shared-queue + (make-vector *default-sq-table-size* #f) + 0 + 0 + 0 + (make-mutex) + (make-condition-variable) + ))) + +(define (shared-queue . elems) + (let ((q (make-shared-queue))) + (for-each + (lambda (elem) + (%shared-queue-add! q elem)) + (reverse elems)))) + +;; Increment an index, possibly back around to the beginning of the queue +(define (inc index capacity) + (if (= index (- capacity 1)) + 0 + (+ index 1))) + +;; Inner add, assumes we already have the lock +(define (%shared-queue-add! q obj) + (vector-set! (q:store q) (q:end q) (make-shared obj)) + (q:set-end! q (inc (q:end q) (vector-length (q:store q)))) + (when (= (q:start q) (q:end q)) + (%shared-queue-resize! q)) +) + +(define (shared-queue-add! q obj) + (mutex-lock! (q:lock q)) + (%shared-queue-add! q (make-shared obj)) + (mutex-unlock! (q:lock q)) + (condition-variable-signal! (q:cv q)) +) + +(define (%shared-queue-resize! q) + ;; (write "TODO: resize the queue")(newline) + ;; TODO: assumes we already have the lock + ;; TODO: error if size is larger than fixnum?? + (let* ((old-start (q:start q)) + (old-end (q:end q)) + (old-store (q:store q)) + (new-store (make-vector (* (vector-length old-store) 2) #f))) + (q:set-start! q 0) + (q:set-end! q 0) + (q:set-store! q new-store) + (let loop ((i 0) + (start old-start)) + (when (not (= i (vector-length old-store))) + (%shared-queue-add! q (vector-ref old-store start)) + (loop (+ i 1) (inc start (vector-length old-store)))))) +) + +;; Blocks if queue is empty (!) +;; should we have a failsafe if the same thread that is doing adds, then +;; does a blocking remove?? +(define (shared-queue-remove! q) + (let loop ((waiting #f)) + (mutex-lock! (q:lock q)) + ;; If thread was previously waiting, clear that status + (when waiting + (set! waiting #f) + (q:set-wait-count q (- (q:wait-count q) 1))) + + (cond + ((= (q:start q) (q:end q)) + ;; Let Q know we are waiting + (set! waiting #t) + (q:set-wait-count q (+ (q:wait-count q) 1)) + ;; Wait for CV, indicating data is ready + (mutex-unlock! (q:lock q) (q:cv q)) + (loop waiting)) + (else + (let ((result (vector-ref (q:store q) (q:start q)))) + (q:set-start! q (inc (q:start q) (vector-length (q:store q)))) + (mutex-unlock! (q:lock q)) + result))))) + +(define (shared-queue-clear! q) + (mutex-lock! (q:lock q)) + (q:set-start! q 0) + (q:set-end! q 0) + (mutex-unlock! (q:lock q))) + +(define (shared-queue-wait-count q) + (define result 0) + (mutex-lock! (q:lock q)) + (set! result (q:wait-count q)) + (mutex-unlock! (q:lock q)) + result) + +;; Return current length of the queue +(define (shared-queue-size q) + (define result 0) + (mutex-lock! (q:lock q)) + (set! result (%shared-queue-size q)) + (mutex-unlock! (q:lock q)) + result) + +(define (%shared-queue-size q) + (let ((start (q:start q)) + (end (q:end q)) + (capacity (vector-length (q:store q)))) + (cond + ((< end start) (+ (- capacity start) end)) + ((> end start) (- end start)) + (else 0)))) ;; (= end start) + +(define (shared-queue-empty? q) + (= 0 (shared-queue-size q))) + +;; Return max size of the queue (until resize occurs) +(define (shared-queue-capacity q) + (define result 0) + (mutex-lock! (q:lock q)) + (set! result (vector-length (q:store q))) + (mutex-unlock! (q:lock q)) + result) + +;- shared-queue->list + +;; END Shared Queues + +;; Thread Pool + (define-record-type + (%make-thread-pool jobq threads ) + thread-pool? + (jobq tp:jobq tp-set-jobq!) + (threads tp:threads tp:set-threads!) + ) + +(define (default-handler err) #f) +(define (%make-thread-pool-thread q) ;; TODO: optional exception handler + (make-thread + (lambda () + (let loop () + (with-handler + (lambda (e) + ;(write `(error ,e)) + (newline) + ) + ; default-handler ;; TODO: allow passing this in + (let ((thunk (shared-queue-remove! q))) + (thunk)) + ) + (loop) + )))) + +(define (make-thread-pool size) ;; TODO: optional exception handler + (let ((tp (%make-thread-pool (make-shared-queue) '() size))) + (do ((i size (- i 1))) + ((zero? i)) + (let ((t (%make-thread-pool-thread (tp:jobq tp)))) + (tp:set-threads! tp (cons t (tp:threads tp))) + (thread-start! t))) + (share-all!) + tp)) + +(define (thread-pool-size tp) + (shared-queue-size (tp:jobq tp))) + +(define (thread-pool-idling-count tp) + (shared-queue-wait-count (tp:jobq tp))) + +(define (thread-pool-idling? tp) + (> (thread-pool-idling-count tp) 0)) + +(define (thread-pool-push-task! tp thunk) + (shared-queue-add! (tp:jobq tp) (make-shared thunk))) + +; ?? - thread-pool-wait-all! + +;; END Thread Pool + ) ) diff --git a/libs/cyclone/test-shared-queue.scm b/libs/cyclone/test-shared-queue.scm index 182996ed..5c8fcf22 100644 --- a/libs/cyclone/test-shared-queue.scm +++ b/libs/cyclone/test-shared-queue.scm @@ -1,46 +1,45 @@ (import (scheme base) - ;(cyclone concurrent) + (cyclone concurrent) ;(srfi 18) - (shared-queue) (cyclone test)) (test-group "basic" - (define q (make-queue)) - (test "predicate" #t (queue? q)) - (test "empty" #t (queue-empty? q)) - (test "empty" 0 (queue-size q)) - (queue-add! q 'a) - (test "add a" #f (queue-empty? q)) - (test "add a" 1 (queue-size q)) - (queue-add! q 'b) - (test "add b" #f (queue-empty? q)) - (test "add b" 2 (queue-size q)) - (queue-add! q 'c) - (queue-add! q 'd) - (queue-add! q 'e) - (queue-add! q 'f) - (queue-add! q 'g) - (queue-add! q 'h) - (queue-add! q 'i) - (queue-add! q 'j) - (test "add many" #f (queue-empty? q)) - (test "add many" 10 (queue-size q)) - (test "remove" 'a (queue-remove! q)) - (test "remove a - size" 9 (queue-size q)) - (test "remove" 'b (queue-remove! q)) - (test "remove" 'c (queue-remove! q)) - (test "remove" 'd (queue-remove! q)) - (test "remove d - size" 6 (queue-size q)) + (define q (make-shared-queue)) + (test "predicate" #t (shared-queue? q)) + (test "empty" #t (shared-queue-empty? q)) + (test "empty" 0 (shared-queue-size q)) + (shared-queue-add! q 'a) + (test "add a" #f (shared-queue-empty? q)) + (test "add a" 1 (shared-queue-size q)) + (shared-queue-add! q 'b) + (test "add b" #f (shared-queue-empty? q)) + (test "add b" 2 (shared-queue-size q)) + (shared-queue-add! q 'c) + (shared-queue-add! q 'd) + (shared-queue-add! q 'e) + (shared-queue-add! q 'f) + (shared-queue-add! q 'g) + (shared-queue-add! q 'h) + (shared-queue-add! q 'i) + (shared-queue-add! q 'j) + (test "add many" #f (shared-queue-empty? q)) + (test "add many" 10 (shared-queue-size q)) + (test "remove" 'a (shared-queue-remove! q)) + (test "remove a - size" 9 (shared-queue-size q)) + (test "remove" 'b (shared-queue-remove! q)) + (test "remove" 'c (shared-queue-remove! q)) + (test "remove" 'd (shared-queue-remove! q)) + (test "remove d - size" 6 (shared-queue-size q)) - (queue-add! q 'a) - (queue-add! q 'b) - (queue-add! q 'c) - (queue-add! q 'd) - (test "add many back" 10 (queue-size q)) + (shared-queue-add! q 'a) + (shared-queue-add! q 'b) + (shared-queue-add! q 'c) + (shared-queue-add! q 'd) + (test "add many back" 10 (shared-queue-size q)) - (queue-clear! q) - (test "clear" 0 (queue-size q)) + (shared-queue-clear! q) + (test "clear" 0 (shared-queue-size q)) ) (test-exit)