mirror of
https://github.com/justinethier/cyclone.git
synced 2025-07-17 01:37:34 +02:00
Integrate SQ and TP operations
This commit is contained in:
parent
a18f0cc2b3
commit
46c8809287
2 changed files with 252 additions and 35 deletions
|
@ -25,6 +25,24 @@
|
||||||
future-call
|
future-call
|
||||||
future-deref
|
future-deref
|
||||||
future-done?
|
future-done?
|
||||||
|
;; Shared Queues
|
||||||
|
shared-queue?
|
||||||
|
make-shared-queue
|
||||||
|
shared-queue
|
||||||
|
shared-queue-add!
|
||||||
|
shared-queue-remove!
|
||||||
|
shared-queue-clear!
|
||||||
|
shared-queue-size
|
||||||
|
shared-queue-wait-count
|
||||||
|
shared-queue-capacity
|
||||||
|
shared-queue-empty?
|
||||||
|
;; Thread Pool
|
||||||
|
make-thread-pool
|
||||||
|
thread-pool?
|
||||||
|
thread-pool-size
|
||||||
|
thread-pool-idling-count
|
||||||
|
thread-pool-idling?
|
||||||
|
thread-pool-push-task!
|
||||||
;; Immutable objects
|
;; Immutable objects
|
||||||
immutable?
|
immutable?
|
||||||
;; Shared objects
|
;; Shared objects
|
||||||
|
@ -214,5 +232,205 @@
|
||||||
result))
|
result))
|
||||||
;; END Futures
|
;; END Futures
|
||||||
|
|
||||||
|
;; Shared Queues
|
||||||
|
;;
|
||||||
|
;; Each is a vector containing a circular buffer of objects that are intended
|
||||||
|
;; to be shared among many threads. All operations are locked and thread-safe,
|
||||||
|
;; and the queue will ensure any objects added are made into shared objects for
|
||||||
|
;; use by other threads.
|
||||||
|
;;
|
||||||
|
;; Removal from a queue is a blocking operation, so threads can easily wait for
|
||||||
|
;; new data to arrive.
|
||||||
|
(define *default-sq-table-size* 64)
|
||||||
|
|
||||||
|
(define-record-type <shared-queue>
|
||||||
|
(%make-shared-queue store start end wait-count lock cv)
|
||||||
|
shared-queue?
|
||||||
|
(store q:store q:set-store!)
|
||||||
|
(start q:start q:set-start!)
|
||||||
|
(end q:end q:set-end!)
|
||||||
|
(wait-count q:wait-count q:set-wait-count)
|
||||||
|
(lock q:lock q:set-lock!)
|
||||||
|
(cv q:cv q:set-cv!)
|
||||||
|
)
|
||||||
|
|
||||||
|
(define (make-shared-queue)
|
||||||
|
(make-shared
|
||||||
|
(%make-shared-queue
|
||||||
|
(make-vector *default-sq-table-size* #f)
|
||||||
|
0
|
||||||
|
0
|
||||||
|
0
|
||||||
|
(make-mutex)
|
||||||
|
(make-condition-variable)
|
||||||
|
)))
|
||||||
|
|
||||||
|
(define (shared-queue . elems)
|
||||||
|
(let ((q (make-shared-queue)))
|
||||||
|
(for-each
|
||||||
|
(lambda (elem)
|
||||||
|
(%shared-queue-add! q elem))
|
||||||
|
(reverse elems))))
|
||||||
|
|
||||||
|
;; Increment an index, possibly back around to the beginning of the queue
|
||||||
|
(define (inc index capacity)
|
||||||
|
(if (= index (- capacity 1))
|
||||||
|
0
|
||||||
|
(+ index 1)))
|
||||||
|
|
||||||
|
;; Inner add, assumes we already have the lock
|
||||||
|
(define (%shared-queue-add! q obj)
|
||||||
|
(vector-set! (q:store q) (q:end q) (make-shared obj))
|
||||||
|
(q:set-end! q (inc (q:end q) (vector-length (q:store q))))
|
||||||
|
(when (= (q:start q) (q:end q))
|
||||||
|
(%shared-queue-resize! q))
|
||||||
|
)
|
||||||
|
|
||||||
|
(define (shared-queue-add! q obj)
|
||||||
|
(mutex-lock! (q:lock q))
|
||||||
|
(%shared-queue-add! q (make-shared obj))
|
||||||
|
(mutex-unlock! (q:lock q))
|
||||||
|
(condition-variable-signal! (q:cv q))
|
||||||
|
)
|
||||||
|
|
||||||
|
(define (%shared-queue-resize! q)
|
||||||
|
;; (write "TODO: resize the queue")(newline)
|
||||||
|
;; TODO: assumes we already have the lock
|
||||||
|
;; TODO: error if size is larger than fixnum??
|
||||||
|
(let* ((old-start (q:start q))
|
||||||
|
(old-end (q:end q))
|
||||||
|
(old-store (q:store q))
|
||||||
|
(new-store (make-vector (* (vector-length old-store) 2) #f)))
|
||||||
|
(q:set-start! q 0)
|
||||||
|
(q:set-end! q 0)
|
||||||
|
(q:set-store! q new-store)
|
||||||
|
(let loop ((i 0)
|
||||||
|
(start old-start))
|
||||||
|
(when (not (= i (vector-length old-store)))
|
||||||
|
(%shared-queue-add! q (vector-ref old-store start))
|
||||||
|
(loop (+ i 1) (inc start (vector-length old-store))))))
|
||||||
|
)
|
||||||
|
|
||||||
|
;; Blocks if queue is empty (!)
|
||||||
|
;; should we have a failsafe if the same thread that is doing adds, then
|
||||||
|
;; does a blocking remove??
|
||||||
|
(define (shared-queue-remove! q)
|
||||||
|
(let loop ((waiting #f))
|
||||||
|
(mutex-lock! (q:lock q))
|
||||||
|
;; If thread was previously waiting, clear that status
|
||||||
|
(when waiting
|
||||||
|
(set! waiting #f)
|
||||||
|
(q:set-wait-count q (- (q:wait-count q) 1)))
|
||||||
|
|
||||||
|
(cond
|
||||||
|
((= (q:start q) (q:end q))
|
||||||
|
;; Let Q know we are waiting
|
||||||
|
(set! waiting #t)
|
||||||
|
(q:set-wait-count q (+ (q:wait-count q) 1))
|
||||||
|
;; Wait for CV, indicating data is ready
|
||||||
|
(mutex-unlock! (q:lock q) (q:cv q))
|
||||||
|
(loop waiting))
|
||||||
|
(else
|
||||||
|
(let ((result (vector-ref (q:store q) (q:start q))))
|
||||||
|
(q:set-start! q (inc (q:start q) (vector-length (q:store q))))
|
||||||
|
(mutex-unlock! (q:lock q))
|
||||||
|
result)))))
|
||||||
|
|
||||||
|
(define (shared-queue-clear! q)
|
||||||
|
(mutex-lock! (q:lock q))
|
||||||
|
(q:set-start! q 0)
|
||||||
|
(q:set-end! q 0)
|
||||||
|
(mutex-unlock! (q:lock q)))
|
||||||
|
|
||||||
|
(define (shared-queue-wait-count q)
|
||||||
|
(define result 0)
|
||||||
|
(mutex-lock! (q:lock q))
|
||||||
|
(set! result (q:wait-count q))
|
||||||
|
(mutex-unlock! (q:lock q))
|
||||||
|
result)
|
||||||
|
|
||||||
|
;; Return current length of the queue
|
||||||
|
(define (shared-queue-size q)
|
||||||
|
(define result 0)
|
||||||
|
(mutex-lock! (q:lock q))
|
||||||
|
(set! result (%shared-queue-size q))
|
||||||
|
(mutex-unlock! (q:lock q))
|
||||||
|
result)
|
||||||
|
|
||||||
|
(define (%shared-queue-size q)
|
||||||
|
(let ((start (q:start q))
|
||||||
|
(end (q:end q))
|
||||||
|
(capacity (vector-length (q:store q))))
|
||||||
|
(cond
|
||||||
|
((< end start) (+ (- capacity start) end))
|
||||||
|
((> end start) (- end start))
|
||||||
|
(else 0)))) ;; (= end start)
|
||||||
|
|
||||||
|
(define (shared-queue-empty? q)
|
||||||
|
(= 0 (shared-queue-size q)))
|
||||||
|
|
||||||
|
;; Return max size of the queue (until resize occurs)
|
||||||
|
(define (shared-queue-capacity q)
|
||||||
|
(define result 0)
|
||||||
|
(mutex-lock! (q:lock q))
|
||||||
|
(set! result (vector-length (q:store q)))
|
||||||
|
(mutex-unlock! (q:lock q))
|
||||||
|
result)
|
||||||
|
|
||||||
|
;- shared-queue->list
|
||||||
|
|
||||||
|
;; END Shared Queues
|
||||||
|
|
||||||
|
;; Thread Pool
|
||||||
|
(define-record-type <thread-pool>
|
||||||
|
(%make-thread-pool jobq threads )
|
||||||
|
thread-pool?
|
||||||
|
(jobq tp:jobq tp-set-jobq!)
|
||||||
|
(threads tp:threads tp:set-threads!)
|
||||||
|
)
|
||||||
|
|
||||||
|
(define (default-handler err) #f)
|
||||||
|
(define (%make-thread-pool-thread q) ;; TODO: optional exception handler
|
||||||
|
(make-thread
|
||||||
|
(lambda ()
|
||||||
|
(let loop ()
|
||||||
|
(with-handler
|
||||||
|
(lambda (e)
|
||||||
|
;(write `(error ,e))
|
||||||
|
(newline)
|
||||||
|
)
|
||||||
|
; default-handler ;; TODO: allow passing this in
|
||||||
|
(let ((thunk (shared-queue-remove! q)))
|
||||||
|
(thunk))
|
||||||
|
)
|
||||||
|
(loop)
|
||||||
|
))))
|
||||||
|
|
||||||
|
(define (make-thread-pool size) ;; TODO: optional exception handler
|
||||||
|
(let ((tp (%make-thread-pool (make-shared-queue) '() size)))
|
||||||
|
(do ((i size (- i 1)))
|
||||||
|
((zero? i))
|
||||||
|
(let ((t (%make-thread-pool-thread (tp:jobq tp))))
|
||||||
|
(tp:set-threads! tp (cons t (tp:threads tp)))
|
||||||
|
(thread-start! t)))
|
||||||
|
(share-all!)
|
||||||
|
tp))
|
||||||
|
|
||||||
|
(define (thread-pool-size tp)
|
||||||
|
(shared-queue-size (tp:jobq tp)))
|
||||||
|
|
||||||
|
(define (thread-pool-idling-count tp)
|
||||||
|
(shared-queue-wait-count (tp:jobq tp)))
|
||||||
|
|
||||||
|
(define (thread-pool-idling? tp)
|
||||||
|
(> (thread-pool-idling-count tp) 0))
|
||||||
|
|
||||||
|
(define (thread-pool-push-task! tp thunk)
|
||||||
|
(shared-queue-add! (tp:jobq tp) (make-shared thunk)))
|
||||||
|
|
||||||
|
; ?? - thread-pool-wait-all!
|
||||||
|
|
||||||
|
;; END Thread Pool
|
||||||
|
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
|
@ -1,46 +1,45 @@
|
||||||
(import
|
(import
|
||||||
(scheme base)
|
(scheme base)
|
||||||
;(cyclone concurrent)
|
(cyclone concurrent)
|
||||||
;(srfi 18)
|
;(srfi 18)
|
||||||
(shared-queue)
|
|
||||||
(cyclone test))
|
(cyclone test))
|
||||||
|
|
||||||
(test-group "basic"
|
(test-group "basic"
|
||||||
(define q (make-queue))
|
(define q (make-shared-queue))
|
||||||
(test "predicate" #t (queue? q))
|
(test "predicate" #t (shared-queue? q))
|
||||||
(test "empty" #t (queue-empty? q))
|
(test "empty" #t (shared-queue-empty? q))
|
||||||
(test "empty" 0 (queue-size q))
|
(test "empty" 0 (shared-queue-size q))
|
||||||
(queue-add! q 'a)
|
(shared-queue-add! q 'a)
|
||||||
(test "add a" #f (queue-empty? q))
|
(test "add a" #f (shared-queue-empty? q))
|
||||||
(test "add a" 1 (queue-size q))
|
(test "add a" 1 (shared-queue-size q))
|
||||||
(queue-add! q 'b)
|
(shared-queue-add! q 'b)
|
||||||
(test "add b" #f (queue-empty? q))
|
(test "add b" #f (shared-queue-empty? q))
|
||||||
(test "add b" 2 (queue-size q))
|
(test "add b" 2 (shared-queue-size q))
|
||||||
(queue-add! q 'c)
|
(shared-queue-add! q 'c)
|
||||||
(queue-add! q 'd)
|
(shared-queue-add! q 'd)
|
||||||
(queue-add! q 'e)
|
(shared-queue-add! q 'e)
|
||||||
(queue-add! q 'f)
|
(shared-queue-add! q 'f)
|
||||||
(queue-add! q 'g)
|
(shared-queue-add! q 'g)
|
||||||
(queue-add! q 'h)
|
(shared-queue-add! q 'h)
|
||||||
(queue-add! q 'i)
|
(shared-queue-add! q 'i)
|
||||||
(queue-add! q 'j)
|
(shared-queue-add! q 'j)
|
||||||
(test "add many" #f (queue-empty? q))
|
(test "add many" #f (shared-queue-empty? q))
|
||||||
(test "add many" 10 (queue-size q))
|
(test "add many" 10 (shared-queue-size q))
|
||||||
(test "remove" 'a (queue-remove! q))
|
(test "remove" 'a (shared-queue-remove! q))
|
||||||
(test "remove a - size" 9 (queue-size q))
|
(test "remove a - size" 9 (shared-queue-size q))
|
||||||
(test "remove" 'b (queue-remove! q))
|
(test "remove" 'b (shared-queue-remove! q))
|
||||||
(test "remove" 'c (queue-remove! q))
|
(test "remove" 'c (shared-queue-remove! q))
|
||||||
(test "remove" 'd (queue-remove! q))
|
(test "remove" 'd (shared-queue-remove! q))
|
||||||
(test "remove d - size" 6 (queue-size q))
|
(test "remove d - size" 6 (shared-queue-size q))
|
||||||
|
|
||||||
(queue-add! q 'a)
|
(shared-queue-add! q 'a)
|
||||||
(queue-add! q 'b)
|
(shared-queue-add! q 'b)
|
||||||
(queue-add! q 'c)
|
(shared-queue-add! q 'c)
|
||||||
(queue-add! q 'd)
|
(shared-queue-add! q 'd)
|
||||||
(test "add many back" 10 (queue-size q))
|
(test "add many back" 10 (shared-queue-size q))
|
||||||
|
|
||||||
(queue-clear! q)
|
(shared-queue-clear! q)
|
||||||
(test "clear" 0 (queue-size q))
|
(test "clear" 0 (shared-queue-size q))
|
||||||
)
|
)
|
||||||
|
|
||||||
(test-exit)
|
(test-exit)
|
||||||
|
|
Loading…
Add table
Reference in a new issue