WIP - thread pool

This commit is contained in:
Justin Ethier 2019-06-28 13:33:13 -04:00
parent 46c8809287
commit 84f46476b4

View file

@ -7,6 +7,7 @@
(import (import
(scheme base) (scheme base)
(srfi 18) (srfi 18)
(scheme write) ;; TODO: debugging only!
) )
(include-c-header "<ck_pr.h>") (include-c-header "<ck_pr.h>")
(export (export
@ -43,6 +44,7 @@
thread-pool-idling-count thread-pool-idling-count
thread-pool-idling? thread-pool-idling?
thread-pool-push-task! thread-pool-push-task!
;thread-pool-release!
;; Immutable objects ;; Immutable objects
immutable? immutable?
;; Shared objects ;; Shared objects
@ -389,28 +391,32 @@
(threads tp:threads tp:set-threads!) (threads tp:threads tp:set-threads!)
) )
(define (default-handler err) #f) (define (thread-pool-default-handler err)
(define (%make-thread-pool-thread q) ;; TODO: optional exception handler ;; TODO: why is this never being called??
(write "called default error handler") (newline)
#f)
(define (%make-thread-pool-thread q handler)
(make-thread (make-thread
(lambda () (lambda ()
(let loop () (let loop ()
(with-handler (with-handler
(lambda (e) handler
;(write `(error ,e))
(newline)
)
; default-handler ;; TODO: allow passing this in
(let ((thunk (shared-queue-remove! q))) (let ((thunk (shared-queue-remove! q)))
(thunk)) (thunk))
) )
(loop) (loop)
)))) ))))
(define (make-thread-pool size) ;; TODO: optional exception handler (define (make-thread-pool size . opts)
(let ((tp (%make-thread-pool (make-shared-queue) '() size))) (let ((tp (%make-thread-pool (make-shared-queue) '() size))
(handler (if (and (pair? opts)
(procedure? (car opts)))
(car opts)
thread-pool-default-handler)))
(do ((i size (- i 1))) (do ((i size (- i 1)))
((zero? i)) ((zero? i))
(let ((t (%make-thread-pool-thread (tp:jobq tp)))) (let ((t (%make-thread-pool-thread (tp:jobq tp) (make-shared handler))))
(tp:set-threads! tp (cons t (tp:threads tp))) (tp:set-threads! tp (cons t (tp:threads tp)))
(thread-start! t))) (thread-start! t)))
(share-all!) (share-all!)
@ -428,6 +434,9 @@
(define (thread-pool-push-task! tp thunk) (define (thread-pool-push-task! tp thunk)
(shared-queue-add! (tp:jobq tp) (make-shared thunk))) (shared-queue-add! (tp:jobq tp) (make-shared thunk)))
;; Stop all thread pool threads, effectively GC'ing the thread pool
;; TODO: (define (thread-pool-release! tp . opts) ;; opt is how - 'terminate (unsafe) / join(safe)
; ?? - thread-pool-wait-all! ; ?? - thread-pool-wait-all!
;; END Thread Pool ;; END Thread Pool