Index: mozilla/nsprpub/pr/src/misc/prtpool.c |
=================================================================== |
--- mozilla/nsprpub/pr/src/misc/prtpool.c (revision 191424) |
+++ mozilla/nsprpub/pr/src/misc/prtpool.c (working copy) |
@@ -1,1187 +0,0 @@ |
-/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ |
-/* This Source Code Form is subject to the terms of the Mozilla Public |
- * License, v. 2.0. If a copy of the MPL was not distributed with this |
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ |
- |
-#include "nspr.h" |
- |
-/* |
- * Thread pools |
- * Thread pools create and manage threads to provide support for |
- * scheduling jobs onto one or more threads. |
- * |
- */ |
-#ifdef OPT_WINNT |
-#include <windows.h> |
-#endif |
- |
-/* |
- * worker thread |
- */ |
-typedef struct wthread { |
- PRCList links; |
- PRThread *thread; |
-} wthread; |
- |
-/* |
- * queue of timer jobs |
- */ |
-typedef struct timer_jobq { |
- PRCList list; |
- PRLock *lock; |
- PRCondVar *cv; |
- PRInt32 cnt; |
- PRCList wthreads; |
-} timer_jobq; |
- |
-/* |
- * queue of jobs |
- */ |
-typedef struct tp_jobq { |
- PRCList list; |
- PRInt32 cnt; |
- PRLock *lock; |
- PRCondVar *cv; |
- PRCList wthreads; |
-#ifdef OPT_WINNT |
- HANDLE nt_completion_port; |
-#endif |
-} tp_jobq; |
- |
-/* |
- * queue of IO jobs |
- */ |
-typedef struct io_jobq { |
- PRCList list; |
- PRPollDesc *pollfds; |
- PRInt32 npollfds; |
- PRJob **polljobs; |
- PRLock *lock; |
- PRInt32 cnt; |
- PRFileDesc *notify_fd; |
- PRCList wthreads; |
-} io_jobq; |
- |
-/* |
- * Threadpool |
- */ |
-struct PRThreadPool { |
- PRInt32 init_threads; |
- PRInt32 max_threads; |
- PRInt32 current_threads; |
- PRInt32 idle_threads; |
- PRUint32 stacksize; |
- tp_jobq jobq; |
- io_jobq ioq; |
- timer_jobq timerq; |
- PRLock *join_lock; /* used with jobp->join_cv */ |
- PRCondVar *shutdown_cv; |
- PRBool shutdown; |
-}; |
- |
-typedef enum io_op_type |
- { JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type; |
- |
-#ifdef OPT_WINNT |
-typedef struct NT_notifier { |
- OVERLAPPED overlapped; /* must be first */ |
- PRJob *jobp; |
-} NT_notifier; |
-#endif |
- |
-struct PRJob { |
- PRCList links; /* for linking jobs */ |
- PRBool on_ioq; /* job on ioq */ |
- PRBool on_timerq; /* job on timerq */ |
- PRJobFn job_func; |
- void *job_arg; |
- PRCondVar *join_cv; |
- PRBool join_wait; /* == PR_TRUE, when waiting to join */ |
- PRCondVar *cancel_cv; /* for cancelling IO jobs */ |
- PRBool cancel_io; /* for cancelling IO jobs */ |
- PRThreadPool *tpool; /* back pointer to thread pool */ |
- PRJobIoDesc *iod; |
- io_op_type io_op; |
- PRInt16 io_poll_flags; |
- PRNetAddr *netaddr; |
- PRIntervalTime timeout; /* relative value */ |
- PRIntervalTime absolute; |
-#ifdef OPT_WINNT |
- NT_notifier nt_notifier; |
-#endif |
-}; |
- |
-#define JOB_LINKS_PTR(_qp) \ |
- ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links))) |
- |
-#define WTHREAD_LINKS_PTR(_qp) \ |
- ((wthread *) ((char *) (_qp) - offsetof(wthread, links))) |
- |
-#define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv) |
- |
-#define JOIN_NOTIFY(_jobp) \ |
- PR_BEGIN_MACRO \ |
- PR_Lock(_jobp->tpool->join_lock); \ |
- _jobp->join_wait = PR_FALSE; \ |
- PR_NotifyCondVar(_jobp->join_cv); \ |
- PR_Unlock(_jobp->tpool->join_lock); \ |
- PR_END_MACRO |
- |
-#define CANCEL_IO_JOB(jobp) \ |
- PR_BEGIN_MACRO \ |
- jobp->cancel_io = PR_FALSE; \ |
- jobp->on_ioq = PR_FALSE; \ |
- PR_REMOVE_AND_INIT_LINK(&jobp->links); \ |
- tp->ioq.cnt--; \ |
- PR_NotifyCondVar(jobp->cancel_cv); \ |
- PR_END_MACRO |
- |
-static void delete_job(PRJob *jobp); |
-static PRThreadPool * alloc_threadpool(void); |
-static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp); |
-static void notify_ioq(PRThreadPool *tp); |
-static void notify_timerq(PRThreadPool *tp); |
- |
-/* |
- * locks are acquired in the following order |
- * |
- * tp->ioq.lock,tp->timerq.lock |
- * | |
- * V |
- * tp->jobq->lock |
- */ |
- |
-/* |
- * worker thread function |
- */ |
-static void wstart(void *arg) |
-{ |
-PRThreadPool *tp = (PRThreadPool *) arg; |
-PRCList *head; |
- |
- /* |
- * execute jobs until shutdown |
- */ |
- while (!tp->shutdown) { |
- PRJob *jobp; |
-#ifdef OPT_WINNT |
- BOOL rv; |
- DWORD unused, shutdown; |
- LPOVERLAPPED olp; |
- |
- PR_Lock(tp->jobq.lock); |
- tp->idle_threads++; |
- PR_Unlock(tp->jobq.lock); |
- rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port, |
- &unused, &shutdown, &olp, INFINITE); |
- |
- PR_ASSERT(rv); |
- if (shutdown) |
- break; |
- jobp = ((NT_notifier *) olp)->jobp; |
- PR_Lock(tp->jobq.lock); |
- tp->idle_threads--; |
- tp->jobq.cnt--; |
- PR_Unlock(tp->jobq.lock); |
-#else |
- |
- PR_Lock(tp->jobq.lock); |
- while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) { |
- tp->idle_threads++; |
- PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT); |
- tp->idle_threads--; |
- } |
- if (tp->shutdown) { |
- PR_Unlock(tp->jobq.lock); |
- break; |
- } |
- head = PR_LIST_HEAD(&tp->jobq.list); |
- /* |
- * remove job from queue |
- */ |
- PR_REMOVE_AND_INIT_LINK(head); |
- tp->jobq.cnt--; |
- jobp = JOB_LINKS_PTR(head); |
- PR_Unlock(tp->jobq.lock); |
-#endif |
- |
- jobp->job_func(jobp->job_arg); |
- if (!JOINABLE_JOB(jobp)) { |
- delete_job(jobp); |
- } else { |
- JOIN_NOTIFY(jobp); |
- } |
- } |
- PR_Lock(tp->jobq.lock); |
- tp->current_threads--; |
- PR_Unlock(tp->jobq.lock); |
-} |
- |
-/* |
- * add a job to the work queue |
- */ |
-static void |
-add_to_jobq(PRThreadPool *tp, PRJob *jobp) |
-{ |
- /* |
- * add to jobq |
- */ |
-#ifdef OPT_WINNT |
- PR_Lock(tp->jobq.lock); |
- tp->jobq.cnt++; |
- PR_Unlock(tp->jobq.lock); |
- /* |
- * notify worker thread(s) |
- */ |
- PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0, |
- FALSE, &jobp->nt_notifier.overlapped); |
-#else |
- PR_Lock(tp->jobq.lock); |
- PR_APPEND_LINK(&jobp->links,&tp->jobq.list); |
- tp->jobq.cnt++; |
- if ((tp->idle_threads < tp->jobq.cnt) && |
- (tp->current_threads < tp->max_threads)) { |
- wthread *wthrp; |
- /* |
- * increment thread count and unlock the jobq lock |
- */ |
- tp->current_threads++; |
- PR_Unlock(tp->jobq.lock); |
- /* create new worker thread */ |
- wthrp = PR_NEWZAP(wthread); |
- if (wthrp) { |
- wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart, |
- tp, PR_PRIORITY_NORMAL, |
- PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize); |
- if (NULL == wthrp->thread) { |
- PR_DELETE(wthrp); /* this sets wthrp to NULL */ |
- } |
- } |
- PR_Lock(tp->jobq.lock); |
- if (NULL == wthrp) { |
- tp->current_threads--; |
- } else { |
- PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); |
- } |
- } |
- /* |
- * wakeup a worker thread |
- */ |
- PR_NotifyCondVar(tp->jobq.cv); |
- PR_Unlock(tp->jobq.lock); |
-#endif |
-} |
- |
-/* |
- * io worker thread function |
- */ |
-static void io_wstart(void *arg) |
-{ |
-PRThreadPool *tp = (PRThreadPool *) arg; |
-int pollfd_cnt, pollfds_used; |
-int rv; |
-PRCList *qp, *nextqp; |
-PRPollDesc *pollfds; |
-PRJob **polljobs; |
-int poll_timeout; |
-PRIntervalTime now; |
- |
- /* |
- * scan io_jobq |
- * construct poll list |
- * call PR_Poll |
- * for all fds, for which poll returns true, move the job to |
- * jobq and wakeup worker thread. |
- */ |
- while (!tp->shutdown) { |
- PRJob *jobp; |
- |
- pollfd_cnt = tp->ioq.cnt + 10; |
- if (pollfd_cnt > tp->ioq.npollfds) { |
- |
- /* |
- * re-allocate pollfd array if the current one is not large |
- * enough |
- */ |
- if (NULL != tp->ioq.pollfds) |
- PR_Free(tp->ioq.pollfds); |
- tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt * |
- (sizeof(PRPollDesc) + sizeof(PRJob *))); |
- PR_ASSERT(NULL != tp->ioq.pollfds); |
- /* |
- * array of pollfds |
- */ |
- pollfds = tp->ioq.pollfds; |
- tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]); |
- /* |
- * parallel array of jobs |
- */ |
- polljobs = tp->ioq.polljobs; |
- tp->ioq.npollfds = pollfd_cnt; |
- } |
- |
- pollfds_used = 0; |
- /* |
- * add the notify fd; used for unblocking io thread(s) |
- */ |
- pollfds[pollfds_used].fd = tp->ioq.notify_fd; |
- pollfds[pollfds_used].in_flags = PR_POLL_READ; |
- pollfds[pollfds_used].out_flags = 0; |
- polljobs[pollfds_used] = NULL; |
- pollfds_used++; |
- /* |
- * fill in the pollfd array |
- */ |
- PR_Lock(tp->ioq.lock); |
- for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { |
- nextqp = qp->next; |
- jobp = JOB_LINKS_PTR(qp); |
- if (jobp->cancel_io) { |
- CANCEL_IO_JOB(jobp); |
- continue; |
- } |
- if (pollfds_used == (pollfd_cnt)) |
- break; |
- pollfds[pollfds_used].fd = jobp->iod->socket; |
- pollfds[pollfds_used].in_flags = jobp->io_poll_flags; |
- pollfds[pollfds_used].out_flags = 0; |
- polljobs[pollfds_used] = jobp; |
- |
- pollfds_used++; |
- } |
- if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) { |
- qp = tp->ioq.list.next; |
- jobp = JOB_LINKS_PTR(qp); |
- if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) |
- poll_timeout = PR_INTERVAL_NO_TIMEOUT; |
- else if (PR_INTERVAL_NO_WAIT == jobp->timeout) |
- poll_timeout = PR_INTERVAL_NO_WAIT; |
- else { |
- poll_timeout = jobp->absolute - PR_IntervalNow(); |
- if (poll_timeout <= 0) /* already timed out */ |
- poll_timeout = PR_INTERVAL_NO_WAIT; |
- } |
- } else { |
- poll_timeout = PR_INTERVAL_NO_TIMEOUT; |
- } |
- PR_Unlock(tp->ioq.lock); |
- |
- /* |
- * XXXX |
- * should retry if more jobs have been added to the queue? |
- * |
- */ |
- PR_ASSERT(pollfds_used <= pollfd_cnt); |
- rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout); |
- |
- if (tp->shutdown) { |
- break; |
- } |
- |
- if (rv > 0) { |
- /* |
- * at least one io event is set |
- */ |
- PRStatus rval_status; |
- PRInt32 index; |
- |
- PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd); |
- /* |
- * reset the pollable event, if notified |
- */ |
- if (pollfds[0].out_flags & PR_POLL_READ) { |
- rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd); |
- PR_ASSERT(PR_SUCCESS == rval_status); |
- } |
- |
- for(index = 1; index < (pollfds_used); index++) { |
- PRInt16 events = pollfds[index].in_flags; |
- PRInt16 revents = pollfds[index].out_flags; |
- jobp = polljobs[index]; |
- |
- if ((revents & PR_POLL_NVAL) || /* busted in all cases */ |
- (revents & PR_POLL_ERR) || |
- ((events & PR_POLL_WRITE) && |
- (revents & PR_POLL_HUP))) { /* write op & hup */ |
- PR_Lock(tp->ioq.lock); |
- if (jobp->cancel_io) { |
- CANCEL_IO_JOB(jobp); |
- PR_Unlock(tp->ioq.lock); |
- continue; |
- } |
- PR_REMOVE_AND_INIT_LINK(&jobp->links); |
- tp->ioq.cnt--; |
- jobp->on_ioq = PR_FALSE; |
- PR_Unlock(tp->ioq.lock); |
- |
- /* set error */ |
- if (PR_POLL_NVAL & revents) |
- jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR; |
- else if (PR_POLL_HUP & revents) |
- jobp->iod->error = PR_CONNECT_RESET_ERROR; |
- else |
- jobp->iod->error = PR_IO_ERROR; |
- |
- /* |
- * add to jobq |
- */ |
- add_to_jobq(tp, jobp); |
- } else if (revents) { |
- /* |
- * add to jobq |
- */ |
- PR_Lock(tp->ioq.lock); |
- if (jobp->cancel_io) { |
- CANCEL_IO_JOB(jobp); |
- PR_Unlock(tp->ioq.lock); |
- continue; |
- } |
- PR_REMOVE_AND_INIT_LINK(&jobp->links); |
- tp->ioq.cnt--; |
- jobp->on_ioq = PR_FALSE; |
- PR_Unlock(tp->ioq.lock); |
- |
- if (jobp->io_op == JOB_IO_CONNECT) { |
- if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS) |
- jobp->iod->error = 0; |
- else |
- jobp->iod->error = PR_GetError(); |
- } else |
- jobp->iod->error = 0; |
- |
- add_to_jobq(tp, jobp); |
- } |
- } |
- } |
- /* |
- * timeout processing |
- */ |
- now = PR_IntervalNow(); |
- PR_Lock(tp->ioq.lock); |
- for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { |
- nextqp = qp->next; |
- jobp = JOB_LINKS_PTR(qp); |
- if (jobp->cancel_io) { |
- CANCEL_IO_JOB(jobp); |
- continue; |
- } |
- if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) |
- break; |
- if ((PR_INTERVAL_NO_WAIT != jobp->timeout) && |
- ((PRInt32)(jobp->absolute - now) > 0)) |
- break; |
- PR_REMOVE_AND_INIT_LINK(&jobp->links); |
- tp->ioq.cnt--; |
- jobp->on_ioq = PR_FALSE; |
- jobp->iod->error = PR_IO_TIMEOUT_ERROR; |
- add_to_jobq(tp, jobp); |
- } |
- PR_Unlock(tp->ioq.lock); |
- } |
-} |
- |
-/* |
- * timer worker thread function |
- */ |
-static void timer_wstart(void *arg) |
-{ |
-PRThreadPool *tp = (PRThreadPool *) arg; |
-PRCList *qp; |
-PRIntervalTime timeout; |
-PRIntervalTime now; |
- |
- /* |
- * call PR_WaitCondVar with minimum value of all timeouts |
- */ |
- while (!tp->shutdown) { |
- PRJob *jobp; |
- |
- PR_Lock(tp->timerq.lock); |
- if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) { |
- timeout = PR_INTERVAL_NO_TIMEOUT; |
- } else { |
- PRCList *qp; |
- |
- qp = tp->timerq.list.next; |
- jobp = JOB_LINKS_PTR(qp); |
- |
- timeout = jobp->absolute - PR_IntervalNow(); |
- if (timeout <= 0) |
- timeout = PR_INTERVAL_NO_WAIT; /* already timed out */ |
- } |
- if (PR_INTERVAL_NO_WAIT != timeout) |
- PR_WaitCondVar(tp->timerq.cv, timeout); |
- if (tp->shutdown) { |
- PR_Unlock(tp->timerq.lock); |
- break; |
- } |
- /* |
- * move expired-timer jobs to jobq |
- */ |
- now = PR_IntervalNow(); |
- while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) { |
- qp = tp->timerq.list.next; |
- jobp = JOB_LINKS_PTR(qp); |
- |
- if ((PRInt32)(jobp->absolute - now) > 0) { |
- break; |
- } |
- /* |
- * job timed out |
- */ |
- PR_REMOVE_AND_INIT_LINK(&jobp->links); |
- tp->timerq.cnt--; |
- jobp->on_timerq = PR_FALSE; |
- add_to_jobq(tp, jobp); |
- } |
- PR_Unlock(tp->timerq.lock); |
- } |
-} |
- |
-static void |
-delete_threadpool(PRThreadPool *tp) |
-{ |
- if (NULL != tp) { |
- if (NULL != tp->shutdown_cv) |
- PR_DestroyCondVar(tp->shutdown_cv); |
- if (NULL != tp->jobq.cv) |
- PR_DestroyCondVar(tp->jobq.cv); |
- if (NULL != tp->jobq.lock) |
- PR_DestroyLock(tp->jobq.lock); |
- if (NULL != tp->join_lock) |
- PR_DestroyLock(tp->join_lock); |
-#ifdef OPT_WINNT |
- if (NULL != tp->jobq.nt_completion_port) |
- CloseHandle(tp->jobq.nt_completion_port); |
-#endif |
- /* Timer queue */ |
- if (NULL != tp->timerq.cv) |
- PR_DestroyCondVar(tp->timerq.cv); |
- if (NULL != tp->timerq.lock) |
- PR_DestroyLock(tp->timerq.lock); |
- |
- if (NULL != tp->ioq.lock) |
- PR_DestroyLock(tp->ioq.lock); |
- if (NULL != tp->ioq.pollfds) |
- PR_Free(tp->ioq.pollfds); |
- if (NULL != tp->ioq.notify_fd) |
- PR_DestroyPollableEvent(tp->ioq.notify_fd); |
- PR_Free(tp); |
- } |
- return; |
-} |
- |
-static PRThreadPool * |
-alloc_threadpool(void) |
-{ |
-PRThreadPool *tp; |
- |
- tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp)); |
- if (NULL == tp) |
- goto failed; |
- tp->jobq.lock = PR_NewLock(); |
- if (NULL == tp->jobq.lock) |
- goto failed; |
- tp->jobq.cv = PR_NewCondVar(tp->jobq.lock); |
- if (NULL == tp->jobq.cv) |
- goto failed; |
- tp->join_lock = PR_NewLock(); |
- if (NULL == tp->join_lock) |
- goto failed; |
-#ifdef OPT_WINNT |
- tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, |
- NULL, 0, 0); |
- if (NULL == tp->jobq.nt_completion_port) |
- goto failed; |
-#endif |
- |
- tp->ioq.lock = PR_NewLock(); |
- if (NULL == tp->ioq.lock) |
- goto failed; |
- |
- /* Timer queue */ |
- |
- tp->timerq.lock = PR_NewLock(); |
- if (NULL == tp->timerq.lock) |
- goto failed; |
- tp->timerq.cv = PR_NewCondVar(tp->timerq.lock); |
- if (NULL == tp->timerq.cv) |
- goto failed; |
- |
- tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock); |
- if (NULL == tp->shutdown_cv) |
- goto failed; |
- tp->ioq.notify_fd = PR_NewPollableEvent(); |
- if (NULL == tp->ioq.notify_fd) |
- goto failed; |
- return tp; |
-failed: |
- delete_threadpool(tp); |
- PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); |
- return NULL; |
-} |
- |
-/* Create thread pool */ |
-PR_IMPLEMENT(PRThreadPool *) |
-PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads, |
- PRUint32 stacksize) |
-{ |
-PRThreadPool *tp; |
-PRThread *thr; |
-int i; |
-wthread *wthrp; |
- |
- tp = alloc_threadpool(); |
- if (NULL == tp) |
- return NULL; |
- |
- tp->init_threads = initial_threads; |
- tp->max_threads = max_threads; |
- tp->stacksize = stacksize; |
- PR_INIT_CLIST(&tp->jobq.list); |
- PR_INIT_CLIST(&tp->ioq.list); |
- PR_INIT_CLIST(&tp->timerq.list); |
- PR_INIT_CLIST(&tp->jobq.wthreads); |
- PR_INIT_CLIST(&tp->ioq.wthreads); |
- PR_INIT_CLIST(&tp->timerq.wthreads); |
- tp->shutdown = PR_FALSE; |
- |
- PR_Lock(tp->jobq.lock); |
- for(i=0; i < initial_threads; ++i) { |
- |
- thr = PR_CreateThread(PR_USER_THREAD, wstart, |
- tp, PR_PRIORITY_NORMAL, |
- PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize); |
- PR_ASSERT(thr); |
- wthrp = PR_NEWZAP(wthread); |
- PR_ASSERT(wthrp); |
- wthrp->thread = thr; |
- PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); |
- } |
- tp->current_threads = initial_threads; |
- |
- thr = PR_CreateThread(PR_USER_THREAD, io_wstart, |
- tp, PR_PRIORITY_NORMAL, |
- PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); |
- PR_ASSERT(thr); |
- wthrp = PR_NEWZAP(wthread); |
- PR_ASSERT(wthrp); |
- wthrp->thread = thr; |
- PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads); |
- |
- thr = PR_CreateThread(PR_USER_THREAD, timer_wstart, |
- tp, PR_PRIORITY_NORMAL, |
- PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); |
- PR_ASSERT(thr); |
- wthrp = PR_NEWZAP(wthread); |
- PR_ASSERT(wthrp); |
- wthrp->thread = thr; |
- PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads); |
- |
- PR_Unlock(tp->jobq.lock); |
- return tp; |
-} |
- |
-static void |
-delete_job(PRJob *jobp) |
-{ |
- if (NULL != jobp) { |
- if (NULL != jobp->join_cv) { |
- PR_DestroyCondVar(jobp->join_cv); |
- jobp->join_cv = NULL; |
- } |
- if (NULL != jobp->cancel_cv) { |
- PR_DestroyCondVar(jobp->cancel_cv); |
- jobp->cancel_cv = NULL; |
- } |
- PR_DELETE(jobp); |
- } |
-} |
- |
-static PRJob * |
-alloc_job(PRBool joinable, PRThreadPool *tp) |
-{ |
- PRJob *jobp; |
- |
- jobp = PR_NEWZAP(PRJob); |
- if (NULL == jobp) |
- goto failed; |
- if (joinable) { |
- jobp->join_cv = PR_NewCondVar(tp->join_lock); |
- jobp->join_wait = PR_TRUE; |
- if (NULL == jobp->join_cv) |
- goto failed; |
- } else { |
- jobp->join_cv = NULL; |
- } |
-#ifdef OPT_WINNT |
- jobp->nt_notifier.jobp = jobp; |
-#endif |
- return jobp; |
-failed: |
- delete_job(jobp); |
- PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); |
- return NULL; |
-} |
- |
-/* queue a job */ |
-PR_IMPLEMENT(PRJob *) |
-PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable) |
-{ |
- PRJob *jobp; |
- |
- jobp = alloc_job(joinable, tpool); |
- if (NULL == jobp) |
- return NULL; |
- |
- jobp->job_func = fn; |
- jobp->job_arg = arg; |
- jobp->tpool = tpool; |
- |
- add_to_jobq(tpool, jobp); |
- return jobp; |
-} |
- |
-/* queue a job, when a socket is readable or writeable */ |
-static PRJob * |
-queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, |
- PRBool joinable, io_op_type op) |
-{ |
- PRJob *jobp; |
- PRIntervalTime now; |
- |
- jobp = alloc_job(joinable, tpool); |
- if (NULL == jobp) { |
- return NULL; |
- } |
- |
- /* |
- * Add a new job to io_jobq |
- * wakeup io worker thread |
- */ |
- |
- jobp->job_func = fn; |
- jobp->job_arg = arg; |
- jobp->tpool = tpool; |
- jobp->iod = iod; |
- if (JOB_IO_READ == op) { |
- jobp->io_op = JOB_IO_READ; |
- jobp->io_poll_flags = PR_POLL_READ; |
- } else if (JOB_IO_WRITE == op) { |
- jobp->io_op = JOB_IO_WRITE; |
- jobp->io_poll_flags = PR_POLL_WRITE; |
- } else if (JOB_IO_ACCEPT == op) { |
- jobp->io_op = JOB_IO_ACCEPT; |
- jobp->io_poll_flags = PR_POLL_READ; |
- } else if (JOB_IO_CONNECT == op) { |
- jobp->io_op = JOB_IO_CONNECT; |
- jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT; |
- } else { |
- delete_job(jobp); |
- PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); |
- return NULL; |
- } |
- |
- jobp->timeout = iod->timeout; |
- if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) || |
- (PR_INTERVAL_NO_WAIT == iod->timeout)) { |
- jobp->absolute = iod->timeout; |
- } else { |
- now = PR_IntervalNow(); |
- jobp->absolute = now + iod->timeout; |
- } |
- |
- |
- PR_Lock(tpool->ioq.lock); |
- |
- if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) || |
- (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) { |
- PR_APPEND_LINK(&jobp->links,&tpool->ioq.list); |
- } else if (PR_INTERVAL_NO_WAIT == iod->timeout) { |
- PR_INSERT_LINK(&jobp->links,&tpool->ioq.list); |
- } else { |
- PRCList *qp; |
- PRJob *tmp_jobp; |
- /* |
- * insert into the timeout-sorted ioq |
- */ |
- for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list; |
- qp = qp->prev) { |
- tmp_jobp = JOB_LINKS_PTR(qp); |
- if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { |
- break; |
- } |
- } |
- PR_INSERT_AFTER(&jobp->links,qp); |
- } |
- |
- jobp->on_ioq = PR_TRUE; |
- tpool->ioq.cnt++; |
- /* |
- * notify io worker thread(s) |
- */ |
- PR_Unlock(tpool->ioq.lock); |
- notify_ioq(tpool); |
- return jobp; |
-} |
- |
-/* queue a job, when a socket is readable */ |
-PR_IMPLEMENT(PRJob *) |
-PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, |
- PRBool joinable) |
-{ |
- return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ)); |
-} |
- |
-/* queue a job, when a socket is writeable */ |
-PR_IMPLEMENT(PRJob *) |
-PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg, |
- PRBool joinable) |
-{ |
- return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE)); |
-} |
- |
- |
-/* queue a job, when a socket has a pending connection */ |
-PR_IMPLEMENT(PRJob *) |
-PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, |
- void * arg, PRBool joinable) |
-{ |
- return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT)); |
-} |
- |
-/* queue a job, when a socket can be connected */ |
-PR_IMPLEMENT(PRJob *) |
-PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod, |
- const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable) |
-{ |
- PRStatus rv; |
- PRErrorCode err; |
- |
- rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT); |
- if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){ |
- /* connection pending */ |
- return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT)); |
- } else { |
- /* |
- * connection succeeded or failed; add to jobq right away |
- */ |
- if (rv == PR_FAILURE) |
- iod->error = err; |
- else |
- iod->error = 0; |
- return(PR_QueueJob(tpool, fn, arg, joinable)); |
- } |
-} |
- |
-/* queue a job, when a timer expires */ |
-PR_IMPLEMENT(PRJob *) |
-PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout, |
- PRJobFn fn, void * arg, PRBool joinable) |
-{ |
- PRIntervalTime now; |
- PRJob *jobp; |
- |
- if (PR_INTERVAL_NO_TIMEOUT == timeout) { |
- PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); |
- return NULL; |
- } |
- if (PR_INTERVAL_NO_WAIT == timeout) { |
- /* |
- * no waiting; add to jobq right away |
- */ |
- return(PR_QueueJob(tpool, fn, arg, joinable)); |
- } |
- jobp = alloc_job(joinable, tpool); |
- if (NULL == jobp) { |
- return NULL; |
- } |
- |
- /* |
- * Add a new job to timer_jobq |
- * wakeup timer worker thread |
- */ |
- |
- jobp->job_func = fn; |
- jobp->job_arg = arg; |
- jobp->tpool = tpool; |
- jobp->timeout = timeout; |
- |
- now = PR_IntervalNow(); |
- jobp->absolute = now + timeout; |
- |
- |
- PR_Lock(tpool->timerq.lock); |
- jobp->on_timerq = PR_TRUE; |
- if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)) |
- PR_APPEND_LINK(&jobp->links,&tpool->timerq.list); |
- else { |
- PRCList *qp; |
- PRJob *tmp_jobp; |
- /* |
- * insert into the sorted timer jobq |
- */ |
- for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list; |
- qp = qp->prev) { |
- tmp_jobp = JOB_LINKS_PTR(qp); |
- if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { |
- break; |
- } |
- } |
- PR_INSERT_AFTER(&jobp->links,qp); |
- } |
- tpool->timerq.cnt++; |
- /* |
- * notify timer worker thread(s) |
- */ |
- notify_timerq(tpool); |
- PR_Unlock(tpool->timerq.lock); |
- return jobp; |
-} |
- |
-static void |
-notify_timerq(PRThreadPool *tp) |
-{ |
- /* |
- * wakeup the timer thread(s) |
- */ |
- PR_NotifyCondVar(tp->timerq.cv); |
-} |
- |
-static void |
-notify_ioq(PRThreadPool *tp) |
-{ |
-PRStatus rval_status; |
- |
- /* |
- * wakeup the io thread(s) |
- */ |
- rval_status = PR_SetPollableEvent(tp->ioq.notify_fd); |
- PR_ASSERT(PR_SUCCESS == rval_status); |
-} |
- |
-/* |
- * cancel a job |
- * |
- * XXXX: is this needed? likely to be removed |
- */ |
-PR_IMPLEMENT(PRStatus) |
-PR_CancelJob(PRJob *jobp) { |
- |
- PRStatus rval = PR_FAILURE; |
- PRThreadPool *tp; |
- |
- if (jobp->on_timerq) { |
- /* |
- * now, check again while holding the timerq lock |
- */ |
- tp = jobp->tpool; |
- PR_Lock(tp->timerq.lock); |
- if (jobp->on_timerq) { |
- jobp->on_timerq = PR_FALSE; |
- PR_REMOVE_AND_INIT_LINK(&jobp->links); |
- tp->timerq.cnt--; |
- PR_Unlock(tp->timerq.lock); |
- if (!JOINABLE_JOB(jobp)) { |
- delete_job(jobp); |
- } else { |
- JOIN_NOTIFY(jobp); |
- } |
- rval = PR_SUCCESS; |
- } else |
- PR_Unlock(tp->timerq.lock); |
- } else if (jobp->on_ioq) { |
- /* |
- * now, check again while holding the ioq lock |
- */ |
- tp = jobp->tpool; |
- PR_Lock(tp->ioq.lock); |
- if (jobp->on_ioq) { |
- jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock); |
- if (NULL == jobp->cancel_cv) { |
- PR_Unlock(tp->ioq.lock); |
- PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0); |
- return PR_FAILURE; |
- } |
- /* |
- * mark job 'cancelled' and notify io thread(s) |
- * XXXX: |
- * this assumes there is only one io thread; when there |
- * are multiple threads, the io thread processing this job |
- * must be notified. |
- */ |
- jobp->cancel_io = PR_TRUE; |
- PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */ |
- notify_ioq(tp); |
- PR_Lock(tp->ioq.lock); |
- while (jobp->cancel_io) |
- PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT); |
- PR_Unlock(tp->ioq.lock); |
- PR_ASSERT(!jobp->on_ioq); |
- if (!JOINABLE_JOB(jobp)) { |
- delete_job(jobp); |
- } else { |
- JOIN_NOTIFY(jobp); |
- } |
- rval = PR_SUCCESS; |
- } else |
- PR_Unlock(tp->ioq.lock); |
- } |
- if (PR_FAILURE == rval) |
- PR_SetError(PR_INVALID_STATE_ERROR, 0); |
- return rval; |
-} |
- |
-/* join a job, wait until completion */ |
-PR_IMPLEMENT(PRStatus) |
-PR_JoinJob(PRJob *jobp) |
-{ |
- if (!JOINABLE_JOB(jobp)) { |
- PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); |
- return PR_FAILURE; |
- } |
- PR_Lock(jobp->tpool->join_lock); |
- while(jobp->join_wait) |
- PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT); |
- PR_Unlock(jobp->tpool->join_lock); |
- delete_job(jobp); |
- return PR_SUCCESS; |
-} |
- |
-/* shutdown threadpool */ |
-PR_IMPLEMENT(PRStatus) |
-PR_ShutdownThreadPool(PRThreadPool *tpool) |
-{ |
-PRStatus rval = PR_SUCCESS; |
- |
- PR_Lock(tpool->jobq.lock); |
- tpool->shutdown = PR_TRUE; |
- PR_NotifyAllCondVar(tpool->shutdown_cv); |
- PR_Unlock(tpool->jobq.lock); |
- |
- return rval; |
-} |
- |
-/* |
- * join thread pool |
- * wait for termination of worker threads |
- * reclaim threadpool resources |
- */ |
-PR_IMPLEMENT(PRStatus) |
-PR_JoinThreadPool(PRThreadPool *tpool) |
-{ |
-PRStatus rval = PR_SUCCESS; |
-PRCList *head; |
-PRStatus rval_status; |
- |
- PR_Lock(tpool->jobq.lock); |
- while (!tpool->shutdown) |
- PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT); |
- |
- /* |
- * wakeup worker threads |
- */ |
-#ifdef OPT_WINNT |
- /* |
- * post shutdown notification for all threads |
- */ |
- { |
- int i; |
- for(i=0; i < tpool->current_threads; i++) { |
- PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0, |
- TRUE, NULL); |
- } |
- } |
-#else |
- PR_NotifyAllCondVar(tpool->jobq.cv); |
-#endif |
- |
- /* |
- * wakeup io thread(s) |
- */ |
- notify_ioq(tpool); |
- |
- /* |
- * wakeup timer thread(s) |
- */ |
- PR_Lock(tpool->timerq.lock); |
- notify_timerq(tpool); |
- PR_Unlock(tpool->timerq.lock); |
- |
- while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) { |
- wthread *wthrp; |
- |
- head = PR_LIST_HEAD(&tpool->jobq.wthreads); |
- PR_REMOVE_AND_INIT_LINK(head); |
- PR_Unlock(tpool->jobq.lock); |
- wthrp = WTHREAD_LINKS_PTR(head); |
- rval_status = PR_JoinThread(wthrp->thread); |
- PR_ASSERT(PR_SUCCESS == rval_status); |
- PR_DELETE(wthrp); |
- PR_Lock(tpool->jobq.lock); |
- } |
- PR_Unlock(tpool->jobq.lock); |
- while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) { |
- wthread *wthrp; |
- |
- head = PR_LIST_HEAD(&tpool->ioq.wthreads); |
- PR_REMOVE_AND_INIT_LINK(head); |
- wthrp = WTHREAD_LINKS_PTR(head); |
- rval_status = PR_JoinThread(wthrp->thread); |
- PR_ASSERT(PR_SUCCESS == rval_status); |
- PR_DELETE(wthrp); |
- } |
- |
- while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) { |
- wthread *wthrp; |
- |
- head = PR_LIST_HEAD(&tpool->timerq.wthreads); |
- PR_REMOVE_AND_INIT_LINK(head); |
- wthrp = WTHREAD_LINKS_PTR(head); |
- rval_status = PR_JoinThread(wthrp->thread); |
- PR_ASSERT(PR_SUCCESS == rval_status); |
- PR_DELETE(wthrp); |
- } |
- |
- /* |
- * Delete queued jobs |
- */ |
- while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) { |
- PRJob *jobp; |
- |
- head = PR_LIST_HEAD(&tpool->jobq.list); |
- PR_REMOVE_AND_INIT_LINK(head); |
- jobp = JOB_LINKS_PTR(head); |
- tpool->jobq.cnt--; |
- delete_job(jobp); |
- } |
- |
- /* delete io jobs */ |
- while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) { |
- PRJob *jobp; |
- |
- head = PR_LIST_HEAD(&tpool->ioq.list); |
- PR_REMOVE_AND_INIT_LINK(head); |
- tpool->ioq.cnt--; |
- jobp = JOB_LINKS_PTR(head); |
- delete_job(jobp); |
- } |
- |
- /* delete timer jobs */ |
- while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) { |
- PRJob *jobp; |
- |
- head = PR_LIST_HEAD(&tpool->timerq.list); |
- PR_REMOVE_AND_INIT_LINK(head); |
- tpool->timerq.cnt--; |
- jobp = JOB_LINKS_PTR(head); |
- delete_job(jobp); |
- } |
- |
- PR_ASSERT(0 == tpool->jobq.cnt); |
- PR_ASSERT(0 == tpool->ioq.cnt); |
- PR_ASSERT(0 == tpool->timerq.cnt); |
- |
- delete_threadpool(tpool); |
- return rval; |
-} |