Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(175)

Side by Side Diff: nspr/pr/src/io/prmwait.c

Issue 2078763002: Delete bundled copy of NSS and replace with README. (Closed) Base URL: https://chromium.googlesource.com/chromium/deps/nss@master
Patch Set: Delete bundled copy of NSS and replace with README. Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « nspr/pr/src/io/prmmap.c ('k') | nspr/pr/src/io/prpolevt.c » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5
6 #include "primpl.h"
7 #include "pprmwait.h"
8
9 #define _MW_REHASH_MAX 11
10
11 static PRLock *mw_lock = NULL;
12 static _PRGlobalState *mw_state = NULL;
13
14 static PRIntervalTime max_polling_interval;
15
16 #ifdef WINNT
17
18 typedef struct TimerEvent {
19 PRIntervalTime absolute;
20 void (*func)(void *);
21 void *arg;
22 LONG ref_count;
23 PRCList links;
24 } TimerEvent;
25
26 #define TIMER_EVENT_PTR(_qp) \
27 ((TimerEvent *) ((char *) (_qp) - offsetof(TimerEvent, links)))
28
29 struct {
30 PRLock *ml;
31 PRCondVar *new_timer;
32 PRCondVar *cancel_timer;
33 PRThread *manager_thread;
34 PRCList timer_queue;
35 } tm_vars;
36
37 static PRStatus TimerInit(void);
38 static void TimerManager(void *arg);
39 static TimerEvent *CreateTimer(PRIntervalTime timeout,
40 void (*func)(void *), void *arg);
41 static PRBool CancelTimer(TimerEvent *timer);
42
43 static void TimerManager(void *arg)
44 {
45 PRIntervalTime now;
46 PRIntervalTime timeout;
47 PRCList *head;
48 TimerEvent *timer;
49
50 PR_Lock(tm_vars.ml);
51 while (1)
52 {
53 if (PR_CLIST_IS_EMPTY(&tm_vars.timer_queue))
54 {
55 PR_WaitCondVar(tm_vars.new_timer, PR_INTERVAL_NO_TIMEOUT);
56 }
57 else
58 {
59 now = PR_IntervalNow();
60 head = PR_LIST_HEAD(&tm_vars.timer_queue);
61 timer = TIMER_EVENT_PTR(head);
62 if ((PRInt32) (now - timer->absolute) >= 0)
63 {
64 PR_REMOVE_LINK(head);
65 /*
66 * make its prev and next point to itself so that
67 * it's obvious that it's not on the timer_queue.
68 */
69 PR_INIT_CLIST(head);
70 PR_ASSERT(2 == timer->ref_count);
71 PR_Unlock(tm_vars.ml);
72 timer->func(timer->arg);
73 PR_Lock(tm_vars.ml);
74 timer->ref_count -= 1;
75 if (0 == timer->ref_count)
76 {
77 PR_NotifyAllCondVar(tm_vars.cancel_timer);
78 }
79 }
80 else
81 {
82 timeout = (PRIntervalTime)(timer->absolute - now);
83 PR_WaitCondVar(tm_vars.new_timer, timeout);
84 }
85 }
86 }
87 PR_Unlock(tm_vars.ml);
88 }
89
90 static TimerEvent *CreateTimer(
91 PRIntervalTime timeout,
92 void (*func)(void *),
93 void *arg)
94 {
95 TimerEvent *timer;
96 PRCList *links, *tail;
97 TimerEvent *elem;
98
99 timer = PR_NEW(TimerEvent);
100 if (NULL == timer)
101 {
102 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
103 return timer;
104 }
105 timer->absolute = PR_IntervalNow() + timeout;
106 timer->func = func;
107 timer->arg = arg;
108 timer->ref_count = 2;
109 PR_Lock(tm_vars.ml);
110 tail = links = PR_LIST_TAIL(&tm_vars.timer_queue);
111 while (links->prev != tail)
112 {
113 elem = TIMER_EVENT_PTR(links);
114 if ((PRInt32)(timer->absolute - elem->absolute) >= 0)
115 {
116 break;
117 }
118 links = links->prev;
119 }
120 PR_INSERT_AFTER(&timer->links, links);
121 PR_NotifyCondVar(tm_vars.new_timer);
122 PR_Unlock(tm_vars.ml);
123 return timer;
124 }
125
126 static PRBool CancelTimer(TimerEvent *timer)
127 {
128 PRBool canceled = PR_FALSE;
129
130 PR_Lock(tm_vars.ml);
131 timer->ref_count -= 1;
132 if (timer->links.prev == &timer->links)
133 {
134 while (timer->ref_count == 1)
135 {
136 PR_WaitCondVar(tm_vars.cancel_timer, PR_INTERVAL_NO_TIMEOUT);
137 }
138 }
139 else
140 {
141 PR_REMOVE_LINK(&timer->links);
142 canceled = PR_TRUE;
143 }
144 PR_Unlock(tm_vars.ml);
145 PR_DELETE(timer);
146 return canceled;
147 }
148
149 static PRStatus TimerInit(void)
150 {
151 tm_vars.ml = PR_NewLock();
152 if (NULL == tm_vars.ml)
153 {
154 goto failed;
155 }
156 tm_vars.new_timer = PR_NewCondVar(tm_vars.ml);
157 if (NULL == tm_vars.new_timer)
158 {
159 goto failed;
160 }
161 tm_vars.cancel_timer = PR_NewCondVar(tm_vars.ml);
162 if (NULL == tm_vars.cancel_timer)
163 {
164 goto failed;
165 }
166 PR_INIT_CLIST(&tm_vars.timer_queue);
167 tm_vars.manager_thread = PR_CreateThread(
168 PR_SYSTEM_THREAD, TimerManager, NULL, PR_PRIORITY_NORMAL,
169 PR_LOCAL_THREAD, PR_UNJOINABLE_THREAD, 0);
170 if (NULL == tm_vars.manager_thread)
171 {
172 goto failed;
173 }
174 return PR_SUCCESS;
175
176 failed:
177 if (NULL != tm_vars.cancel_timer)
178 {
179 PR_DestroyCondVar(tm_vars.cancel_timer);
180 }
181 if (NULL != tm_vars.new_timer)
182 {
183 PR_DestroyCondVar(tm_vars.new_timer);
184 }
185 if (NULL != tm_vars.ml)
186 {
187 PR_DestroyLock(tm_vars.ml);
188 }
189 return PR_FAILURE;
190 }
191
192 #endif /* WINNT */
193
194 /******************************************************************/
195 /******************************************************************/
196 /************************ The private portion *********************/
197 /******************************************************************/
198 /******************************************************************/
199 void _PR_InitMW(void)
200 {
201 #ifdef WINNT
202 /*
203 * We use NT 4's InterlockedCompareExchange() to operate
204 * on PRMWStatus variables.
205 */
206 PR_ASSERT(sizeof(LONG) == sizeof(PRMWStatus));
207 TimerInit();
208 #endif
209 mw_lock = PR_NewLock();
210 PR_ASSERT(NULL != mw_lock);
211 mw_state = PR_NEWZAP(_PRGlobalState);
212 PR_ASSERT(NULL != mw_state);
213 PR_INIT_CLIST(&mw_state->group_list);
214 max_polling_interval = PR_MillisecondsToInterval(MAX_POLLING_INTERVAL);
215 } /* _PR_InitMW */
216
217 void _PR_CleanupMW(void)
218 {
219 PR_DestroyLock(mw_lock);
220 mw_lock = NULL;
221 if (mw_state->group) {
222 PR_DestroyWaitGroup(mw_state->group);
223 /* mw_state->group is set to NULL as a side effect. */
224 }
225 PR_DELETE(mw_state);
226 } /* _PR_CleanupMW */
227
228 static PRWaitGroup *MW_Init2(void)
229 {
230 PRWaitGroup *group = mw_state->group; /* it's the null group */
231 if (NULL == group) /* there is this special case */
232 {
233 group = PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH);
234 if (NULL == group) goto failed_alloc;
235 PR_Lock(mw_lock);
236 if (NULL == mw_state->group)
237 {
238 mw_state->group = group;
239 group = NULL;
240 }
241 PR_Unlock(mw_lock);
242 if (group != NULL) (void)PR_DestroyWaitGroup(group);
243 group = mw_state->group; /* somebody beat us to it */
244 }
245 failed_alloc:
246 return group; /* whatever */
247 } /* MW_Init2 */
248
249 static _PR_HashStory MW_AddHashInternal(PRRecvWait *desc, _PRWaiterHash *hash)
250 {
251 /*
252 ** The entries are put in the table using the fd (PRFileDesc*) of
253 ** the receive descriptor as the key. This allows us to locate
254 ** the appropriate entry aqain when the poll operation finishes.
255 **
256 ** The pointer to the file descriptor object is first divided by
257 ** the natural alignment of a pointer in the belief that object
258 ** will have at least that many zeros in the low order bits.
259 ** This may not be a good assuption.
260 **
261 ** We try to put the entry in by rehashing _MW_REHASH_MAX times. After
262 ** that we declare defeat and force the table to be reconstructed.
263 ** Since some fds might be added more than once, won't that cause
264 ** collisions even in an empty table?
265 */
266 PRIntn rehash = _MW_REHASH_MAX;
267 PRRecvWait **waiter;
268 PRUintn hidx = _MW_HASH(desc->fd, hash->length);
269 PRUintn hoffset = 0;
270
271 while (rehash-- > 0)
272 {
273 waiter = &hash->recv_wait;
274 if (NULL == waiter[hidx])
275 {
276 waiter[hidx] = desc;
277 hash->count += 1;
278 #if 0
279 printf("Adding 0x%x->0x%x ", desc, desc->fd);
280 printf(
281 "table[%u:%u:*%u]: 0x%x->0x%x\n",
282 hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd) ;
283 #endif
284 return _prmw_success;
285 }
286 if (desc == waiter[hidx])
287 {
288 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); /* desc already in table */
289 return _prmw_error;
290 }
291 #if 0
292 printf("Failing 0x%x->0x%x ", desc, desc->fd);
293 printf(
294 "table[*%u:%u:%u]: 0x%x->0x%x\n",
295 hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
296 #endif
297 if (0 == hoffset)
298 {
299 hoffset = _MW_HASH2(desc->fd, hash->length);
300 PR_ASSERT(0 != hoffset);
301 }
302 hidx = (hidx + hoffset) % (hash->length);
303 }
304 return _prmw_rehash;
305 } /* MW_AddHashInternal */
306
307 static _PR_HashStory MW_ExpandHashInternal(PRWaitGroup *group)
308 {
309 PRRecvWait **desc;
310 PRUint32 pidx, length;
311 _PRWaiterHash *newHash, *oldHash = group->waiter;
312 PRBool retry;
313 _PR_HashStory hrv;
314
315 static const PRInt32 prime_number[] = {
316 _PR_DEFAULT_HASH_LENGTH, 179, 521, 907, 1427,
317 2711, 3917, 5021, 8219, 11549, 18911, 26711, 33749, 44771};
318 PRUintn primes = (sizeof(prime_number) / sizeof(PRInt32));
319
320 /* look up the next size we'd like to use for the hash table */
321 for (pidx = 0; pidx < primes; ++pidx)
322 {
323 if (prime_number[pidx] == oldHash->length)
324 {
325 break;
326 }
327 }
328 /* table size must be one of the prime numbers */
329 PR_ASSERT(pidx < primes);
330
331 /* if pidx == primes - 1, we can't expand the table any more */
332 while (pidx < primes - 1)
333 {
334 /* next size */
335 ++pidx;
336 length = prime_number[pidx];
337
338 /* allocate the new hash table and fill it in with the old */
339 newHash = (_PRWaiterHash*)PR_CALLOC(
340 sizeof(_PRWaiterHash) + (length * sizeof(PRRecvWait*)));
341 if (NULL == newHash)
342 {
343 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
344 return _prmw_error;
345 }
346
347 newHash->length = length;
348 retry = PR_FALSE;
349 for (desc = &oldHash->recv_wait;
350 newHash->count < oldHash->count; ++desc)
351 {
352 PR_ASSERT(desc < &oldHash->recv_wait + oldHash->length);
353 if (NULL != *desc)
354 {
355 hrv = MW_AddHashInternal(*desc, newHash);
356 PR_ASSERT(_prmw_error != hrv);
357 if (_prmw_success != hrv)
358 {
359 PR_DELETE(newHash);
360 retry = PR_TRUE;
361 break;
362 }
363 }
364 }
365 if (retry) continue;
366
367 PR_DELETE(group->waiter);
368 group->waiter = newHash;
369 group->p_timestamp += 1;
370 return _prmw_success;
371 }
372
373 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
374 return _prmw_error; /* we're hosed */
375 } /* MW_ExpandHashInternal */
376
377 #ifndef WINNT
378 static void _MW_DoneInternal(
379 PRWaitGroup *group, PRRecvWait **waiter, PRMWStatus outcome)
380 {
381 /*
382 ** Add this receive wait object to the list of finished I/O
383 ** operations for this particular group. If there are other
384 ** threads waiting on the group, notify one. If not, arrange
385 ** for this thread to return.
386 */
387
388 #if 0
389 printf("Removing 0x%x->0x%x\n", *waiter, (*waiter)->fd);
390 #endif
391 (*waiter)->outcome = outcome;
392 PR_APPEND_LINK(&((*waiter)->internal), &group->io_ready);
393 PR_NotifyCondVar(group->io_complete);
394 PR_ASSERT(0 != group->waiter->count);
395 group->waiter->count -= 1;
396 *waiter = NULL;
397 } /* _MW_DoneInternal */
398 #endif /* WINNT */
399
400 static PRRecvWait **_MW_LookupInternal(PRWaitGroup *group, PRFileDesc *fd)
401 {
402 /*
403 ** Find the receive wait object corresponding to the file descriptor.
404 ** Only search the wait group specified.
405 */
406 PRRecvWait **desc;
407 PRIntn rehash = _MW_REHASH_MAX;
408 _PRWaiterHash *hash = group->waiter;
409 PRUintn hidx = _MW_HASH(fd, hash->length);
410 PRUintn hoffset = 0;
411
412 while (rehash-- > 0)
413 {
414 desc = (&hash->recv_wait) + hidx;
415 if ((*desc != NULL) && ((*desc)->fd == fd)) return desc;
416 if (0 == hoffset)
417 {
418 hoffset = _MW_HASH2(fd, hash->length);
419 PR_ASSERT(0 != hoffset);
420 }
421 hidx = (hidx + hoffset) % (hash->length);
422 }
423 return NULL;
424 } /* _MW_LookupInternal */
425
426 #ifndef WINNT
427 static PRStatus _MW_PollInternal(PRWaitGroup *group)
428 {
429 PRRecvWait **waiter;
430 PRStatus rv = PR_FAILURE;
431 PRInt32 count, count_ready;
432 PRIntervalTime polling_interval;
433
434 group->poller = PR_GetCurrentThread();
435
436 while (PR_TRUE)
437 {
438 PRIntervalTime now, since_last_poll;
439 PRPollDesc *poll_list;
440
441 while (0 == group->waiter->count)
442 {
443 PRStatus st;
444 st = PR_WaitCondVar(group->new_business, PR_INTERVAL_NO_TIMEOUT);
445 if (_prmw_running != group->state)
446 {
447 PR_SetError(PR_INVALID_STATE_ERROR, 0);
448 goto aborted;
449 }
450 if (_MW_ABORTED(st)) goto aborted;
451 }
452
453 /*
454 ** There's something to do. See if our existing polling list
455 ** is large enough for what we have to do?
456 */
457
458 while (group->polling_count < group->waiter->count)
459 {
460 PRUint32 old_count = group->waiter->count;
461 PRUint32 new_count = PR_ROUNDUP(old_count, _PR_POLL_COUNT_FUDGE);
462 PRSize new_size = sizeof(PRPollDesc) * new_count;
463 PRPollDesc *old_polling_list = group->polling_list;
464
465 PR_Unlock(group->ml);
466 poll_list = (PRPollDesc*)PR_CALLOC(new_size);
467 if (NULL == poll_list)
468 {
469 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
470 PR_Lock(group->ml);
471 goto failed_alloc;
472 }
473 if (NULL != old_polling_list)
474 PR_DELETE(old_polling_list);
475 PR_Lock(group->ml);
476 if (_prmw_running != group->state)
477 {
478 PR_DELETE(poll_list);
479 PR_SetError(PR_INVALID_STATE_ERROR, 0);
480 goto aborted;
481 }
482 group->polling_list = poll_list;
483 group->polling_count = new_count;
484 }
485
486 now = PR_IntervalNow();
487 polling_interval = max_polling_interval;
488 since_last_poll = now - group->last_poll;
489
490 waiter = &group->waiter->recv_wait;
491 poll_list = group->polling_list;
492 for (count = 0; count < group->waiter->count; ++waiter)
493 {
494 PR_ASSERT(waiter < &group->waiter->recv_wait
495 + group->waiter->length);
496 if (NULL != *waiter) /* a live one! */
497 {
498 if ((PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
499 && (since_last_poll >= (*waiter)->timeout))
500 _MW_DoneInternal(group, waiter, PR_MW_TIMEOUT);
501 else
502 {
503 if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
504 {
505 (*waiter)->timeout -= since_last_poll;
506 if ((*waiter)->timeout < polling_interval)
507 polling_interval = (*waiter)->timeout;
508 }
509 PR_ASSERT(poll_list < group->polling_list
510 + group->polling_count);
511 poll_list->fd = (*waiter)->fd;
512 poll_list->in_flags = PR_POLL_READ;
513 poll_list->out_flags = 0;
514 #if 0
515 printf(
516 "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n",
517 poll_list, count, poll_list->fd, (*waiter)->timeout);
518 #endif
519 poll_list += 1;
520 count += 1;
521 }
522 }
523 }
524
525 PR_ASSERT(count == group->waiter->count);
526
527 /*
528 ** If there are no more threads waiting for completion,
529 ** we need to return.
530 */
531 if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
532 && (1 == group->waiting_threads)) break;
533
534 if (0 == count) continue; /* wait for new business */
535
536 group->last_poll = now;
537
538 PR_Unlock(group->ml);
539
540 count_ready = PR_Poll(group->polling_list, count, polling_interval);
541
542 PR_Lock(group->ml);
543
544 if (_prmw_running != group->state)
545 {
546 PR_SetError(PR_INVALID_STATE_ERROR, 0);
547 goto aborted;
548 }
549 if (-1 == count_ready)
550 {
551 goto failed_poll; /* that's a shame */
552 }
553 else if (0 < count_ready)
554 {
555 for (poll_list = group->polling_list; count > 0;
556 poll_list++, count--)
557 {
558 PR_ASSERT(
559 poll_list < group->polling_list + group->polling_count);
560 if (poll_list->out_flags != 0)
561 {
562 waiter = _MW_LookupInternal(group, poll_list->fd);
563 /*
564 ** If 'waiter' is NULL, that means the wait receive
565 ** descriptor has been canceled.
566 */
567 if (NULL != waiter)
568 _MW_DoneInternal(group, waiter, PR_MW_SUCCESS);
569 }
570 }
571 }
572 /*
573 ** If there are no more threads waiting for completion,
574 ** we need to return.
575 ** This thread was "borrowed" to do the polling, but it really
576 ** belongs to the client.
577 */
578 if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
579 && (1 == group->waiting_threads)) break;
580 }
581
582 rv = PR_SUCCESS;
583
584 aborted:
585 failed_poll:
586 failed_alloc:
587 group->poller = NULL; /* we were that, not we ain't */
588 if ((_prmw_running == group->state) && (group->waiting_threads > 1))
589 {
590 /* Wake up one thread to become the new poller. */
591 PR_NotifyCondVar(group->io_complete);
592 }
593 return rv; /* we return with the lock held */
594 } /* _MW_PollInternal */
595 #endif /* !WINNT */
596
597 static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup *group)
598 {
599 PRMWGroupState rv = group->state;
600 /*
601 ** Looking at the group's fields is safe because
602 ** once the group's state is no longer running, it
603 ** cannot revert and there is a safe check on entry
604 ** to make sure no more threads are made to wait.
605 */
606 if ((_prmw_stopping == rv)
607 && (0 == group->waiting_threads))
608 {
609 rv = group->state = _prmw_stopped;
610 PR_NotifyCondVar(group->mw_manage);
611 }
612 return rv;
613 } /* MW_TestForShutdownInternal */
614
615 #ifndef WINNT
616 static void _MW_InitialRecv(PRCList *io_ready)
617 {
618 PRRecvWait *desc = (PRRecvWait*)io_ready;
619 if ((NULL == desc->buffer.start)
620 || (0 == desc->buffer.length))
621 desc->bytesRecv = 0;
622 else
623 {
624 desc->bytesRecv = (desc->fd->methods->recv)(
625 desc->fd, desc->buffer.start,
626 desc->buffer.length, 0, desc->timeout);
627 if (desc->bytesRecv < 0) /* SetError should already be there */
628 desc->outcome = PR_MW_FAILURE;
629 }
630 } /* _MW_InitialRecv */
631 #endif
632
633 #ifdef WINNT
634 static void NT_TimeProc(void *arg)
635 {
636 _MDOverlapped *overlapped = (_MDOverlapped *)arg;
637 PRRecvWait *desc = overlapped->data.mw.desc;
638 PRFileDesc *bottom;
639
640 if (InterlockedCompareExchange((LONG *)&desc->outcome,
641 (LONG)PR_MW_TIMEOUT, (LONG)PR_MW_PENDING) != (LONG)PR_MW_PENDING)
642 {
643 /* This wait recv descriptor has already completed. */
644 return;
645 }
646
647 /* close the osfd to abort the outstanding async io request */
648 /* $$$$
649 ** Little late to be checking if NSPR's on the bottom of stack,
650 ** but if we don't check, we can't assert that the private data
651 ** is what we think it is.
652 ** $$$$
653 */
654 bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
655 PR_ASSERT(NULL != bottom);
656 if (NULL != bottom) /* now what!?!?! */
657 {
658 bottom->secret->state = _PR_FILEDESC_CLOSED;
659 if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
660 {
661 fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
662 PR_NOT_REACHED("What shall I do?");
663 }
664 }
665 return;
666 } /* NT_TimeProc */
667
668 static PRStatus NT_HashRemove(PRWaitGroup *group, PRFileDesc *fd)
669 {
670 PRRecvWait **waiter;
671
672 _PR_MD_LOCK(&group->mdlock);
673 waiter = _MW_LookupInternal(group, fd);
674 if (NULL != waiter)
675 {
676 group->waiter->count -= 1;
677 *waiter = NULL;
678 }
679 _PR_MD_UNLOCK(&group->mdlock);
680 return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
681 }
682
683 PRStatus NT_HashRemoveInternal(PRWaitGroup *group, PRFileDesc *fd)
684 {
685 PRRecvWait **waiter;
686
687 waiter = _MW_LookupInternal(group, fd);
688 if (NULL != waiter)
689 {
690 group->waiter->count -= 1;
691 *waiter = NULL;
692 }
693 return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
694 }
695 #endif /* WINNT */
696
697 /******************************************************************/
698 /******************************************************************/
699 /********************** The public API portion ********************/
700 /******************************************************************/
701 /******************************************************************/
702 PR_IMPLEMENT(PRStatus) PR_AddWaitFileDesc(
703 PRWaitGroup *group, PRRecvWait *desc)
704 {
705 _PR_HashStory hrv;
706 PRStatus rv = PR_FAILURE;
707 #ifdef WINNT
708 _MDOverlapped *overlapped;
709 HANDLE hFile;
710 BOOL bResult;
711 DWORD dwError;
712 PRFileDesc *bottom;
713 #endif
714
715 if (!_pr_initialized) _PR_ImplicitInitialization();
716 if ((NULL == group) && (NULL == (group = MW_Init2())))
717 {
718 return rv;
719 }
720
721 PR_ASSERT(NULL != desc->fd);
722
723 desc->outcome = PR_MW_PENDING; /* nice, well known value */
724 desc->bytesRecv = 0; /* likewise, though this value is ambiguious */
725
726 PR_Lock(group->ml);
727
728 if (_prmw_running != group->state)
729 {
730 /* Not allowed to add after cancelling the group */
731 desc->outcome = PR_MW_INTERRUPT;
732 PR_SetError(PR_INVALID_STATE_ERROR, 0);
733 PR_Unlock(group->ml);
734 return rv;
735 }
736
737 #ifdef WINNT
738 _PR_MD_LOCK(&group->mdlock);
739 #endif
740
741 /*
742 ** If the waiter count is zero at this point, there's no telling
743 ** how long we've been idle. Therefore, initialize the beginning
744 ** of the timing interval. As long as the list doesn't go empty,
745 ** it will maintain itself.
746 */
747 if (0 == group->waiter->count)
748 group->last_poll = PR_IntervalNow();
749
750 do
751 {
752 hrv = MW_AddHashInternal(desc, group->waiter);
753 if (_prmw_rehash != hrv) break;
754 hrv = MW_ExpandHashInternal(group); /* gruesome */
755 if (_prmw_success != hrv) break;
756 } while (PR_TRUE);
757
758 #ifdef WINNT
759 _PR_MD_UNLOCK(&group->mdlock);
760 #endif
761
762 PR_NotifyCondVar(group->new_business); /* tell the world */
763 rv = (_prmw_success == hrv) ? PR_SUCCESS : PR_FAILURE;
764 PR_Unlock(group->ml);
765
766 #ifdef WINNT
767 overlapped = PR_NEWZAP(_MDOverlapped);
768 if (NULL == overlapped)
769 {
770 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
771 NT_HashRemove(group, desc->fd);
772 return rv;
773 }
774 overlapped->ioModel = _MD_MultiWaitIO;
775 overlapped->data.mw.desc = desc;
776 overlapped->data.mw.group = group;
777 if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
778 {
779 overlapped->data.mw.timer = CreateTimer(
780 desc->timeout,
781 NT_TimeProc,
782 overlapped);
783 if (0 == overlapped->data.mw.timer)
784 {
785 NT_HashRemove(group, desc->fd);
786 PR_DELETE(overlapped);
787 /*
788 * XXX It appears that a maximum of 16 timer events can
789 * be outstanding. GetLastError() returns 0 when I try it.
790 */
791 PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, GetLastError());
792 return PR_FAILURE;
793 }
794 }
795
796 /* Reach to the bottom layer to get the OS fd */
797 bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
798 PR_ASSERT(NULL != bottom);
799 if (NULL == bottom)
800 {
801 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
802 return PR_FAILURE;
803 }
804 hFile = (HANDLE)bottom->secret->md.osfd;
805 if (!bottom->secret->md.io_model_committed)
806 {
807 PRInt32 st;
808 st = _md_Associate(hFile);
809 PR_ASSERT(0 != st);
810 bottom->secret->md.io_model_committed = PR_TRUE;
811 }
812 bResult = ReadFile(hFile,
813 desc->buffer.start,
814 (DWORD)desc->buffer.length,
815 NULL,
816 &overlapped->overlapped);
817 if (FALSE == bResult && (dwError = GetLastError()) != ERROR_IO_PENDING)
818 {
819 if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
820 {
821 if (InterlockedCompareExchange((LONG *)&desc->outcome,
822 (LONG)PR_MW_FAILURE, (LONG)PR_MW_PENDING)
823 == (LONG)PR_MW_PENDING)
824 {
825 CancelTimer(overlapped->data.mw.timer);
826 }
827 NT_HashRemove(group, desc->fd);
828 PR_DELETE(overlapped);
829 }
830 _PR_MD_MAP_READ_ERROR(dwError);
831 rv = PR_FAILURE;
832 }
833 #endif
834
835 return rv;
836 } /* PR_AddWaitFileDesc */
837
838 PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup *group)
839 {
840 PRCList *io_ready = NULL;
841 #ifdef WINNT
842 PRThread *me = _PR_MD_CURRENT_THREAD();
843 _MDOverlapped *overlapped;
844 #endif
845
846 if (!_pr_initialized) _PR_ImplicitInitialization();
847 if ((NULL == group) && (NULL == (group = MW_Init2()))) goto failed_init;
848
849 PR_Lock(group->ml);
850
851 if (_prmw_running != group->state)
852 {
853 PR_SetError(PR_INVALID_STATE_ERROR, 0);
854 goto invalid_state;
855 }
856
857 group->waiting_threads += 1; /* the polling thread is counted */
858
859 #ifdef WINNT
860 _PR_MD_LOCK(&group->mdlock);
861 while (PR_CLIST_IS_EMPTY(&group->io_ready))
862 {
863 _PR_THREAD_LOCK(me);
864 me->state = _PR_IO_WAIT;
865 PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
866 if (!_PR_IS_NATIVE_THREAD(me))
867 {
868 _PR_SLEEPQ_LOCK(me->cpu);
869 _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
870 _PR_SLEEPQ_UNLOCK(me->cpu);
871 }
872 _PR_THREAD_UNLOCK(me);
873 _PR_MD_UNLOCK(&group->mdlock);
874 PR_Unlock(group->ml);
875 _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
876 me->state = _PR_RUNNING;
877 PR_Lock(group->ml);
878 _PR_MD_LOCK(&group->mdlock);
879 if (_PR_PENDING_INTERRUPT(me)) {
880 PR_REMOVE_LINK(&me->waitQLinks);
881 _PR_MD_UNLOCK(&group->mdlock);
882 me->flags &= ~_PR_INTERRUPT;
883 me->io_suspended = PR_FALSE;
884 PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
885 goto aborted;
886 }
887 }
888 io_ready = PR_LIST_HEAD(&group->io_ready);
889 PR_ASSERT(io_ready != NULL);
890 PR_REMOVE_LINK(io_ready);
891 _PR_MD_UNLOCK(&group->mdlock);
892 overlapped = (_MDOverlapped *)
893 ((char *)io_ready - offsetof(_MDOverlapped, data));
894 io_ready = &overlapped->data.mw.desc->internal;
895 #else
896 do
897 {
898 /*
899 ** If the I/O ready list isn't empty, have this thread
900 ** return with the first receive wait object that's available.
901 */
902 if (PR_CLIST_IS_EMPTY(&group->io_ready))
903 {
904 /*
905 ** Is there a polling thread yet? If not, grab this thread
906 ** and use it.
907 */
908 if (NULL == group->poller)
909 {
910 /*
911 ** This thread will stay do polling until it becomes the only on e
912 ** left to service a completion. Then it will return and there w ill
913 ** be none left to actually poll or to run completions.
914 **
915 ** The polling function should only return w/ failure or
916 ** with some I/O ready.
917 */
918 if (PR_FAILURE == _MW_PollInternal(group)) goto failed_poll;
919 }
920 else
921 {
922 /*
923 ** There are four reasons a thread can be awakened from
924 ** a wait on the io_complete condition variable.
925 ** 1. Some I/O has completed, i.e., the io_ready list
926 ** is nonempty.
927 ** 2. The wait group is canceled.
928 ** 3. The thread is interrupted.
929 ** 4. The current polling thread has to leave and needs
930 ** a replacement.
931 ** The logic to find a new polling thread is made more
932 ** complicated by all the other possible events.
933 ** I tried my best to write the logic clearly, but
934 ** it is still full of if's with continue and goto.
935 */
936 PRStatus st;
937 do
938 {
939 st = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEO UT);
940 if (_prmw_running != group->state)
941 {
942 PR_SetError(PR_INVALID_STATE_ERROR, 0);
943 goto aborted;
944 }
945 if (_MW_ABORTED(st) || (NULL == group->poller)) break;
946 } while (PR_CLIST_IS_EMPTY(&group->io_ready));
947
948 /*
949 ** The thread is interrupted and has to leave. It might
950 ** have also been awakened to process ready i/o or be the
951 ** new poller. To be safe, if either condition is true,
952 ** we awaken another thread to take its place.
953 */
954 if (_MW_ABORTED(st))
955 {
956 if ((NULL == group->poller
957 || !PR_CLIST_IS_EMPTY(&group->io_ready))
958 && group->waiting_threads > 1)
959 PR_NotifyCondVar(group->io_complete);
960 goto aborted;
961 }
962
963 /*
964 ** A new poller is needed, but can I be the new poller?
965 ** If there is no i/o ready, sure. But if there is any
966 ** i/o ready, it has a higher priority. I want to
967 ** process the ready i/o first and wake up another
968 ** thread to be the new poller.
969 */
970 if (NULL == group->poller)
971 {
972 if (PR_CLIST_IS_EMPTY(&group->io_ready))
973 continue;
974 if (group->waiting_threads > 1)
975 PR_NotifyCondVar(group->io_complete);
976 }
977 }
978 PR_ASSERT(!PR_CLIST_IS_EMPTY(&group->io_ready));
979 }
980 io_ready = PR_LIST_HEAD(&group->io_ready);
981 PR_NotifyCondVar(group->io_taken);
982 PR_ASSERT(io_ready != NULL);
983 PR_REMOVE_LINK(io_ready);
984 } while (NULL == io_ready);
985
986 failed_poll:
987
988 #endif
989
990 aborted:
991
992 group->waiting_threads -= 1;
993 invalid_state:
994 (void)MW_TestForShutdownInternal(group);
995 PR_Unlock(group->ml);
996
997 failed_init:
998 if (NULL != io_ready)
999 {
1000 /* If the operation failed, record the reason why */
1001 switch (((PRRecvWait*)io_ready)->outcome)
1002 {
1003 case PR_MW_PENDING:
1004 PR_ASSERT(0);
1005 break;
1006 case PR_MW_SUCCESS:
1007 #ifndef WINNT
1008 _MW_InitialRecv(io_ready);
1009 #endif
1010 break;
1011 #ifdef WINNT
1012 case PR_MW_FAILURE:
1013 _PR_MD_MAP_READ_ERROR(overlapped->data.mw.error);
1014 break;
1015 #endif
1016 case PR_MW_TIMEOUT:
1017 PR_SetError(PR_IO_TIMEOUT_ERROR, 0);
1018 break;
1019 case PR_MW_INTERRUPT:
1020 PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
1021 break;
1022 default: break;
1023 }
1024 #ifdef WINNT
1025 if (NULL != overlapped->data.mw.timer)
1026 {
1027 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1028 != overlapped->data.mw.desc->timeout);
1029 CancelTimer(overlapped->data.mw.timer);
1030 }
1031 else
1032 {
1033 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1034 == overlapped->data.mw.desc->timeout);
1035 }
1036 PR_DELETE(overlapped);
1037 #endif
1038 }
1039 return (PRRecvWait*)io_ready;
1040 } /* PR_WaitRecvReady */
1041
1042 PR_IMPLEMENT(PRStatus) PR_CancelWaitFileDesc(PRWaitGroup *group, PRRecvWait *des c)
1043 {
1044 #if !defined(WINNT)
1045 PRRecvWait **recv_wait;
1046 #endif
1047 PRStatus rv = PR_SUCCESS;
1048 if (NULL == group) group = mw_state->group;
1049 PR_ASSERT(NULL != group);
1050 if (NULL == group)
1051 {
1052 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1053 return PR_FAILURE;
1054 }
1055
1056 PR_Lock(group->ml);
1057
1058 if (_prmw_running != group->state)
1059 {
1060 PR_SetError(PR_INVALID_STATE_ERROR, 0);
1061 rv = PR_FAILURE;
1062 goto unlock;
1063 }
1064
1065 #ifdef WINNT
1066 if (InterlockedCompareExchange((LONG *)&desc->outcome,
1067 (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING)
1068 {
1069 PRFileDesc *bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
1070 PR_ASSERT(NULL != bottom);
1071 if (NULL == bottom)
1072 {
1073 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1074 goto unlock;
1075 }
1076 bottom->secret->state = _PR_FILEDESC_CLOSED;
1077 #if 0
1078 fprintf(stderr, "cancel wait recv: closing socket\n");
1079 #endif
1080 if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
1081 {
1082 fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
1083 exit(1);
1084 }
1085 }
1086 #else
1087 if (NULL != (recv_wait = _MW_LookupInternal(group, desc->fd)))
1088 {
1089 /* it was in the wait table */
1090 _MW_DoneInternal(group, recv_wait, PR_MW_INTERRUPT);
1091 goto unlock;
1092 }
1093 if (!PR_CLIST_IS_EMPTY(&group->io_ready))
1094 {
1095 /* is it already complete? */
1096 PRCList *head = PR_LIST_HEAD(&group->io_ready);
1097 do
1098 {
1099 PRRecvWait *done = (PRRecvWait*)head;
1100 if (done == desc) goto unlock;
1101 head = PR_NEXT_LINK(head);
1102 } while (head != &group->io_ready);
1103 }
1104 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1105 rv = PR_FAILURE;
1106
1107 #endif
1108 unlock:
1109 PR_Unlock(group->ml);
1110 return rv;
1111 } /* PR_CancelWaitFileDesc */
1112
1113 PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup *group)
1114 {
1115 PRRecvWait **desc;
1116 PRRecvWait *recv_wait = NULL;
1117 #ifdef WINNT
1118 _MDOverlapped *overlapped;
1119 PRRecvWait **end;
1120 PRThread *me = _PR_MD_CURRENT_THREAD();
1121 #endif
1122
1123 if (NULL == group) group = mw_state->group;
1124 PR_ASSERT(NULL != group);
1125 if (NULL == group)
1126 {
1127 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1128 return NULL;
1129 }
1130
1131 PR_Lock(group->ml);
1132 if (_prmw_stopped != group->state)
1133 {
1134 if (_prmw_running == group->state)
1135 group->state = _prmw_stopping; /* so nothing new comes in */
1136 if (0 == group->waiting_threads) /* is there anybody else? */
1137 group->state = _prmw_stopped; /* we can stop right now */
1138 else
1139 {
1140 PR_NotifyAllCondVar(group->new_business);
1141 PR_NotifyAllCondVar(group->io_complete);
1142 }
1143 while (_prmw_stopped != group->state)
1144 (void)PR_WaitCondVar(group->mw_manage, PR_INTERVAL_NO_TIMEOUT);
1145 }
1146
1147 #ifdef WINNT
1148 _PR_MD_LOCK(&group->mdlock);
1149 #endif
1150 /* make all the existing descriptors look done/interrupted */
1151 #ifdef WINNT
1152 end = &group->waiter->recv_wait + group->waiter->length;
1153 for (desc = &group->waiter->recv_wait; desc < end; ++desc)
1154 {
1155 if (NULL != *desc)
1156 {
1157 if (InterlockedCompareExchange((LONG *)&(*desc)->outcome,
1158 (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING)
1159 == (LONG)PR_MW_PENDING)
1160 {
1161 PRFileDesc *bottom = PR_GetIdentitiesLayer(
1162 (*desc)->fd, PR_NSPR_IO_LAYER);
1163 PR_ASSERT(NULL != bottom);
1164 if (NULL == bottom)
1165 {
1166 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1167 goto invalid_arg;
1168 }
1169 bottom->secret->state = _PR_FILEDESC_CLOSED;
1170 #if 0
1171 fprintf(stderr, "cancel wait group: closing socket\n");
1172 #endif
1173 if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
1174 {
1175 fprintf(stderr, "closesocket failed: %d\n",
1176 WSAGetLastError());
1177 exit(1);
1178 }
1179 }
1180 }
1181 }
1182 while (group->waiter->count > 0)
1183 {
1184 _PR_THREAD_LOCK(me);
1185 me->state = _PR_IO_WAIT;
1186 PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
1187 if (!_PR_IS_NATIVE_THREAD(me))
1188 {
1189 _PR_SLEEPQ_LOCK(me->cpu);
1190 _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
1191 _PR_SLEEPQ_UNLOCK(me->cpu);
1192 }
1193 _PR_THREAD_UNLOCK(me);
1194 _PR_MD_UNLOCK(&group->mdlock);
1195 PR_Unlock(group->ml);
1196 _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
1197 me->state = _PR_RUNNING;
1198 PR_Lock(group->ml);
1199 _PR_MD_LOCK(&group->mdlock);
1200 }
1201 #else
1202 for (desc = &group->waiter->recv_wait; group->waiter->count > 0; ++desc)
1203 {
1204 PR_ASSERT(desc < &group->waiter->recv_wait + group->waiter->length);
1205 if (NULL != *desc)
1206 _MW_DoneInternal(group, desc, PR_MW_INTERRUPT);
1207 }
1208 #endif
1209
1210 /* take first element of finished list and return it or NULL */
1211 if (PR_CLIST_IS_EMPTY(&group->io_ready))
1212 PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
1213 else
1214 {
1215 PRCList *head = PR_LIST_HEAD(&group->io_ready);
1216 PR_REMOVE_AND_INIT_LINK(head);
1217 #ifdef WINNT
1218 overlapped = (_MDOverlapped *)
1219 ((char *)head - offsetof(_MDOverlapped, data));
1220 head = &overlapped->data.mw.desc->internal;
1221 if (NULL != overlapped->data.mw.timer)
1222 {
1223 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1224 != overlapped->data.mw.desc->timeout);
1225 CancelTimer(overlapped->data.mw.timer);
1226 }
1227 else
1228 {
1229 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1230 == overlapped->data.mw.desc->timeout);
1231 }
1232 PR_DELETE(overlapped);
1233 #endif
1234 recv_wait = (PRRecvWait*)head;
1235 }
1236 #ifdef WINNT
1237 invalid_arg:
1238 _PR_MD_UNLOCK(&group->mdlock);
1239 #endif
1240 PR_Unlock(group->ml);
1241
1242 return recv_wait;
1243 } /* PR_CancelWaitGroup */
1244
1245 PR_IMPLEMENT(PRWaitGroup*) PR_CreateWaitGroup(PRInt32 size /* ignored */)
1246 {
1247 PRWaitGroup *wg;
1248
1249 if (NULL == (wg = PR_NEWZAP(PRWaitGroup)))
1250 {
1251 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
1252 goto failed;
1253 }
1254 /* the wait group itself */
1255 wg->ml = PR_NewLock();
1256 if (NULL == wg->ml) goto failed_lock;
1257 wg->io_taken = PR_NewCondVar(wg->ml);
1258 if (NULL == wg->io_taken) goto failed_cvar0;
1259 wg->io_complete = PR_NewCondVar(wg->ml);
1260 if (NULL == wg->io_complete) goto failed_cvar1;
1261 wg->new_business = PR_NewCondVar(wg->ml);
1262 if (NULL == wg->new_business) goto failed_cvar2;
1263 wg->mw_manage = PR_NewCondVar(wg->ml);
1264 if (NULL == wg->mw_manage) goto failed_cvar3;
1265
1266 PR_INIT_CLIST(&wg->group_link);
1267 PR_INIT_CLIST(&wg->io_ready);
1268
1269 /* the waiters sequence */
1270 wg->waiter = (_PRWaiterHash*)PR_CALLOC(
1271 sizeof(_PRWaiterHash) +
1272 (_PR_DEFAULT_HASH_LENGTH * sizeof(PRRecvWait*)));
1273 if (NULL == wg->waiter)
1274 {
1275 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
1276 goto failed_waiter;
1277 }
1278 wg->waiter->count = 0;
1279 wg->waiter->length = _PR_DEFAULT_HASH_LENGTH;
1280
1281 #ifdef WINNT
1282 _PR_MD_NEW_LOCK(&wg->mdlock);
1283 PR_INIT_CLIST(&wg->wait_list);
1284 #endif /* WINNT */
1285
1286 PR_Lock(mw_lock);
1287 PR_APPEND_LINK(&wg->group_link, &mw_state->group_list);
1288 PR_Unlock(mw_lock);
1289 return wg;
1290
1291 failed_waiter:
1292 PR_DestroyCondVar(wg->mw_manage);
1293 failed_cvar3:
1294 PR_DestroyCondVar(wg->new_business);
1295 failed_cvar2:
1296 PR_DestroyCondVar(wg->io_complete);
1297 failed_cvar1:
1298 PR_DestroyCondVar(wg->io_taken);
1299 failed_cvar0:
1300 PR_DestroyLock(wg->ml);
1301 failed_lock:
1302 PR_DELETE(wg);
1303 wg = NULL;
1304
1305 failed:
1306 return wg;
1307 } /* MW_CreateWaitGroup */
1308
1309 PR_IMPLEMENT(PRStatus) PR_DestroyWaitGroup(PRWaitGroup *group)
1310 {
1311 PRStatus rv = PR_SUCCESS;
1312 if (NULL == group) group = mw_state->group;
1313 PR_ASSERT(NULL != group);
1314 if (NULL != group)
1315 {
1316 PR_Lock(group->ml);
1317 if ((group->waiting_threads == 0)
1318 && (group->waiter->count == 0)
1319 && PR_CLIST_IS_EMPTY(&group->io_ready))
1320 {
1321 group->state = _prmw_stopped;
1322 }
1323 else
1324 {
1325 PR_SetError(PR_INVALID_STATE_ERROR, 0);
1326 rv = PR_FAILURE;
1327 }
1328 PR_Unlock(group->ml);
1329 if (PR_FAILURE == rv) return rv;
1330
1331 PR_Lock(mw_lock);
1332 PR_REMOVE_LINK(&group->group_link);
1333 PR_Unlock(mw_lock);
1334
1335 #ifdef WINNT
1336 /*
1337 * XXX make sure wait_list is empty and waiter is empty.
1338 * These must be checked while holding mdlock.
1339 */
1340 _PR_MD_FREE_LOCK(&group->mdlock);
1341 #endif
1342
1343 PR_DELETE(group->waiter);
1344 PR_DELETE(group->polling_list);
1345 PR_DestroyCondVar(group->mw_manage);
1346 PR_DestroyCondVar(group->new_business);
1347 PR_DestroyCondVar(group->io_complete);
1348 PR_DestroyCondVar(group->io_taken);
1349 PR_DestroyLock(group->ml);
1350 if (group == mw_state->group) mw_state->group = NULL;
1351 PR_DELETE(group);
1352 }
1353 else
1354 {
1355 /* The default wait group is not created yet. */
1356 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1357 rv = PR_FAILURE;
1358 }
1359 return rv;
1360 } /* PR_DestroyWaitGroup */
1361
1362 /**********************************************************************
1363 ***********************************************************************
1364 ******************** Wait group enumerations **************************
1365 ***********************************************************************
1366 **********************************************************************/
1367
1368 PR_IMPLEMENT(PRMWaitEnumerator*) PR_CreateMWaitEnumerator(PRWaitGroup *group)
1369 {
1370 PRMWaitEnumerator *enumerator = PR_NEWZAP(PRMWaitEnumerator);
1371 if (NULL == enumerator) PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
1372 else
1373 {
1374 enumerator->group = group;
1375 enumerator->seal = _PR_ENUM_SEALED;
1376 }
1377 return enumerator;
1378 } /* PR_CreateMWaitEnumerator */
1379
1380 PR_IMPLEMENT(PRStatus) PR_DestroyMWaitEnumerator(PRMWaitEnumerator* enumerator)
1381 {
1382 PR_ASSERT(NULL != enumerator);
1383 PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
1384 if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal))
1385 {
1386 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1387 return PR_FAILURE;
1388 }
1389 enumerator->seal = _PR_ENUM_UNSEALED;
1390 PR_Free(enumerator);
1391 return PR_SUCCESS;
1392 } /* PR_DestroyMWaitEnumerator */
1393
1394 PR_IMPLEMENT(PRRecvWait*) PR_EnumerateWaitGroup(
1395 PRMWaitEnumerator *enumerator, const PRRecvWait *previous)
1396 {
1397 PRRecvWait *result = NULL;
1398
1399 /* entry point sanity checking */
1400 PR_ASSERT(NULL != enumerator);
1401 PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
1402 if ((NULL == enumerator)
1403 || (_PR_ENUM_SEALED != enumerator->seal)) goto bad_argument;
1404
1405 /* beginning of enumeration */
1406 if (NULL == previous)
1407 {
1408 if (NULL == enumerator->group)
1409 {
1410 enumerator->group = mw_state->group;
1411 if (NULL == enumerator->group)
1412 {
1413 PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
1414 return NULL;
1415 }
1416 }
1417 enumerator->waiter = &enumerator->group->waiter->recv_wait;
1418 enumerator->p_timestamp = enumerator->group->p_timestamp;
1419 enumerator->thread = PR_GetCurrentThread();
1420 enumerator->index = 0;
1421 }
1422 /* continuing an enumeration */
1423 else
1424 {
1425 PRThread *me = PR_GetCurrentThread();
1426 PR_ASSERT(me == enumerator->thread);
1427 if (me != enumerator->thread) goto bad_argument;
1428
1429 /* need to restart the enumeration */
1430 if (enumerator->p_timestamp != enumerator->group->p_timestamp)
1431 return PR_EnumerateWaitGroup(enumerator, NULL);
1432 }
1433
1434 /* actually progress the enumeration */
1435 #if defined(WINNT)
1436 _PR_MD_LOCK(&enumerator->group->mdlock);
1437 #else
1438 PR_Lock(enumerator->group->ml);
1439 #endif
1440 while (enumerator->index++ < enumerator->group->waiter->length)
1441 {
1442 if (NULL != (result = *(enumerator->waiter)++)) break;
1443 }
1444 #if defined(WINNT)
1445 _PR_MD_UNLOCK(&enumerator->group->mdlock);
1446 #else
1447 PR_Unlock(enumerator->group->ml);
1448 #endif
1449
1450 return result; /* what we live for */
1451
1452 bad_argument:
1453 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1454 return NULL; /* probably ambiguous */
1455 } /* PR_EnumerateWaitGroup */
1456
1457 /* prmwait.c */
OLDNEW
« no previous file with comments | « nspr/pr/src/io/prmmap.c ('k') | nspr/pr/src/io/prpolevt.c » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698