diff --git a/libs/cyclone/shared-queue.sld b/libs/cyclone/shared-queue.sld index 2d854be7..a30ef63f 100644 --- a/libs/cyclone/shared-queue.sld +++ b/libs/cyclone/shared-queue.sld @@ -31,13 +31,23 @@ queue-remove! queue-clear! queue-size + queue-wait-count queue-capacity queue-empty? + + make-thread-pool + thread-pool? + thread-pool-size + thread-pool-idling-count + thread-pool-idling? +; thread-pool-push-task! +; ;;thread-pool-wait-all! +; ;;thread-pool-release! ) (begin -(define *default-table-size* 4) ;; TODO: 64) +(define *default-table-size* 64) ;TODO: how will data structure work? ;probably want a circular queue, add at end and remove from start @@ -93,7 +103,7 @@ (define (queue-add! q obj) (mutex-lock! (q:lock q)) - (%queue-add! q obj) + (%queue-add! q (make-shared obj)) (mutex-unlock! (q:lock q)) (condition-variable-signal! (q:cv q)) ) @@ -147,6 +157,13 @@ (q:set-end! q 0) (mutex-unlock! (q:lock q))) +(define (queue-wait-count q) + (define result 0) + (mutex-lock! (q:lock q)) + (set! result (q:wait-count q)) + (mutex-unlock! (q:lock q)) + result) + ;; Return current length of the queue (define (queue-size q) (define result 0) @@ -176,4 +193,51 @@ result) ;- queue->list + + + + + + (define-record-type + (%make-thread-pool jobq threads num-threads ) + thread-pool? + (jobq tp:jobq tp-set-jobq!) + (threads tp:threads tp:set-threads!) + (num-threads tp:num-threads tp:set-num-threads!) + ) + +(define (default-handler err) #f) +(define (%make-thread-pool-thread q) ;; TODO: optional exception handler + (make-thread + (lambda () + (let loop () + (with-handler + default-handler ;; TODO: allow passing this in + (let ((thunk (queue-remove! q))) + (thunk)) + ))))) + +(define (make-thread-pool size) + (let ((tp (%make-thread-pool (make-queue) '() size))) + (do ((i size (- i 1))) + ((zero? i)) + (let ((t (%make-thread-pool-thread (tp:jobq tp)))) + (tp:set-threads! (cons t (tp:threads))) + (thread-start! t))) + (share-all!) + tp)) + +(define (thread-pool-size tp) + (queue-size (tp:jobq tp))) + +(define (thread-pool-idling-count tp) + (queue-wait-count (tp:jobq tp))) + +(define (thread-pool-idling? tp) + (> (thread-pool-idling-count tp) 0)) + +; TODO: thread-pool-push-task! + +; ?? - thread-pool-wait-all! + ))