OLD | NEW |
| (Empty) |
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ | |
2 /* This Source Code Form is subject to the terms of the Mozilla Public | |
3 * License, v. 2.0. If a copy of the MPL was not distributed with this | |
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ | |
5 | |
6 #include "primpl.h" | |
7 #include "pprmwait.h" | |
8 | |
9 #define _MW_REHASH_MAX 11 | |
10 | |
11 static PRLock *mw_lock = NULL; | |
12 static _PRGlobalState *mw_state = NULL; | |
13 | |
14 static PRIntervalTime max_polling_interval; | |
15 | |
16 #ifdef WINNT | |
17 | |
18 typedef struct TimerEvent { | |
19 PRIntervalTime absolute; | |
20 void (*func)(void *); | |
21 void *arg; | |
22 LONG ref_count; | |
23 PRCList links; | |
24 } TimerEvent; | |
25 | |
26 #define TIMER_EVENT_PTR(_qp) \ | |
27 ((TimerEvent *) ((char *) (_qp) - offsetof(TimerEvent, links))) | |
28 | |
29 struct { | |
30 PRLock *ml; | |
31 PRCondVar *new_timer; | |
32 PRCondVar *cancel_timer; | |
33 PRThread *manager_thread; | |
34 PRCList timer_queue; | |
35 } tm_vars; | |
36 | |
37 static PRStatus TimerInit(void); | |
38 static void TimerManager(void *arg); | |
39 static TimerEvent *CreateTimer(PRIntervalTime timeout, | |
40 void (*func)(void *), void *arg); | |
41 static PRBool CancelTimer(TimerEvent *timer); | |
42 | |
43 static void TimerManager(void *arg) | |
44 { | |
45 PRIntervalTime now; | |
46 PRIntervalTime timeout; | |
47 PRCList *head; | |
48 TimerEvent *timer; | |
49 | |
50 PR_Lock(tm_vars.ml); | |
51 while (1) | |
52 { | |
53 if (PR_CLIST_IS_EMPTY(&tm_vars.timer_queue)) | |
54 { | |
55 PR_WaitCondVar(tm_vars.new_timer, PR_INTERVAL_NO_TIMEOUT); | |
56 } | |
57 else | |
58 { | |
59 now = PR_IntervalNow(); | |
60 head = PR_LIST_HEAD(&tm_vars.timer_queue); | |
61 timer = TIMER_EVENT_PTR(head); | |
62 if ((PRInt32) (now - timer->absolute) >= 0) | |
63 { | |
64 PR_REMOVE_LINK(head); | |
65 /* | |
66 * make its prev and next point to itself so that | |
67 * it's obvious that it's not on the timer_queue. | |
68 */ | |
69 PR_INIT_CLIST(head); | |
70 PR_ASSERT(2 == timer->ref_count); | |
71 PR_Unlock(tm_vars.ml); | |
72 timer->func(timer->arg); | |
73 PR_Lock(tm_vars.ml); | |
74 timer->ref_count -= 1; | |
75 if (0 == timer->ref_count) | |
76 { | |
77 PR_NotifyAllCondVar(tm_vars.cancel_timer); | |
78 } | |
79 } | |
80 else | |
81 { | |
82 timeout = (PRIntervalTime)(timer->absolute - now); | |
83 PR_WaitCondVar(tm_vars.new_timer, timeout); | |
84 } | |
85 } | |
86 } | |
87 PR_Unlock(tm_vars.ml); | |
88 } | |
89 | |
90 static TimerEvent *CreateTimer( | |
91 PRIntervalTime timeout, | |
92 void (*func)(void *), | |
93 void *arg) | |
94 { | |
95 TimerEvent *timer; | |
96 PRCList *links, *tail; | |
97 TimerEvent *elem; | |
98 | |
99 timer = PR_NEW(TimerEvent); | |
100 if (NULL == timer) | |
101 { | |
102 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); | |
103 return timer; | |
104 } | |
105 timer->absolute = PR_IntervalNow() + timeout; | |
106 timer->func = func; | |
107 timer->arg = arg; | |
108 timer->ref_count = 2; | |
109 PR_Lock(tm_vars.ml); | |
110 tail = links = PR_LIST_TAIL(&tm_vars.timer_queue); | |
111 while (links->prev != tail) | |
112 { | |
113 elem = TIMER_EVENT_PTR(links); | |
114 if ((PRInt32)(timer->absolute - elem->absolute) >= 0) | |
115 { | |
116 break; | |
117 } | |
118 links = links->prev; | |
119 } | |
120 PR_INSERT_AFTER(&timer->links, links); | |
121 PR_NotifyCondVar(tm_vars.new_timer); | |
122 PR_Unlock(tm_vars.ml); | |
123 return timer; | |
124 } | |
125 | |
126 static PRBool CancelTimer(TimerEvent *timer) | |
127 { | |
128 PRBool canceled = PR_FALSE; | |
129 | |
130 PR_Lock(tm_vars.ml); | |
131 timer->ref_count -= 1; | |
132 if (timer->links.prev == &timer->links) | |
133 { | |
134 while (timer->ref_count == 1) | |
135 { | |
136 PR_WaitCondVar(tm_vars.cancel_timer, PR_INTERVAL_NO_TIMEOUT); | |
137 } | |
138 } | |
139 else | |
140 { | |
141 PR_REMOVE_LINK(&timer->links); | |
142 canceled = PR_TRUE; | |
143 } | |
144 PR_Unlock(tm_vars.ml); | |
145 PR_DELETE(timer); | |
146 return canceled; | |
147 } | |
148 | |
149 static PRStatus TimerInit(void) | |
150 { | |
151 tm_vars.ml = PR_NewLock(); | |
152 if (NULL == tm_vars.ml) | |
153 { | |
154 goto failed; | |
155 } | |
156 tm_vars.new_timer = PR_NewCondVar(tm_vars.ml); | |
157 if (NULL == tm_vars.new_timer) | |
158 { | |
159 goto failed; | |
160 } | |
161 tm_vars.cancel_timer = PR_NewCondVar(tm_vars.ml); | |
162 if (NULL == tm_vars.cancel_timer) | |
163 { | |
164 goto failed; | |
165 } | |
166 PR_INIT_CLIST(&tm_vars.timer_queue); | |
167 tm_vars.manager_thread = PR_CreateThread( | |
168 PR_SYSTEM_THREAD, TimerManager, NULL, PR_PRIORITY_NORMAL, | |
169 PR_LOCAL_THREAD, PR_UNJOINABLE_THREAD, 0); | |
170 if (NULL == tm_vars.manager_thread) | |
171 { | |
172 goto failed; | |
173 } | |
174 return PR_SUCCESS; | |
175 | |
176 failed: | |
177 if (NULL != tm_vars.cancel_timer) | |
178 { | |
179 PR_DestroyCondVar(tm_vars.cancel_timer); | |
180 } | |
181 if (NULL != tm_vars.new_timer) | |
182 { | |
183 PR_DestroyCondVar(tm_vars.new_timer); | |
184 } | |
185 if (NULL != tm_vars.ml) | |
186 { | |
187 PR_DestroyLock(tm_vars.ml); | |
188 } | |
189 return PR_FAILURE; | |
190 } | |
191 | |
192 #endif /* WINNT */ | |
193 | |
194 /******************************************************************/ | |
195 /******************************************************************/ | |
196 /************************ The private portion *********************/ | |
197 /******************************************************************/ | |
198 /******************************************************************/ | |
199 void _PR_InitMW(void) | |
200 { | |
201 #ifdef WINNT | |
202 /* | |
203 * We use NT 4's InterlockedCompareExchange() to operate | |
204 * on PRMWStatus variables. | |
205 */ | |
206 PR_ASSERT(sizeof(LONG) == sizeof(PRMWStatus)); | |
207 TimerInit(); | |
208 #endif | |
209 mw_lock = PR_NewLock(); | |
210 PR_ASSERT(NULL != mw_lock); | |
211 mw_state = PR_NEWZAP(_PRGlobalState); | |
212 PR_ASSERT(NULL != mw_state); | |
213 PR_INIT_CLIST(&mw_state->group_list); | |
214 max_polling_interval = PR_MillisecondsToInterval(MAX_POLLING_INTERVAL); | |
215 } /* _PR_InitMW */ | |
216 | |
217 void _PR_CleanupMW(void) | |
218 { | |
219 PR_DestroyLock(mw_lock); | |
220 mw_lock = NULL; | |
221 if (mw_state->group) { | |
222 PR_DestroyWaitGroup(mw_state->group); | |
223 /* mw_state->group is set to NULL as a side effect. */ | |
224 } | |
225 PR_DELETE(mw_state); | |
226 } /* _PR_CleanupMW */ | |
227 | |
228 static PRWaitGroup *MW_Init2(void) | |
229 { | |
230 PRWaitGroup *group = mw_state->group; /* it's the null group */ | |
231 if (NULL == group) /* there is this special case */ | |
232 { | |
233 group = PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH); | |
234 if (NULL == group) goto failed_alloc; | |
235 PR_Lock(mw_lock); | |
236 if (NULL == mw_state->group) | |
237 { | |
238 mw_state->group = group; | |
239 group = NULL; | |
240 } | |
241 PR_Unlock(mw_lock); | |
242 if (group != NULL) (void)PR_DestroyWaitGroup(group); | |
243 group = mw_state->group; /* somebody beat us to it */ | |
244 } | |
245 failed_alloc: | |
246 return group; /* whatever */ | |
247 } /* MW_Init2 */ | |
248 | |
249 static _PR_HashStory MW_AddHashInternal(PRRecvWait *desc, _PRWaiterHash *hash) | |
250 { | |
251 /* | |
252 ** The entries are put in the table using the fd (PRFileDesc*) of | |
253 ** the receive descriptor as the key. This allows us to locate | |
254 ** the appropriate entry aqain when the poll operation finishes. | |
255 ** | |
256 ** The pointer to the file descriptor object is first divided by | |
257 ** the natural alignment of a pointer in the belief that object | |
258 ** will have at least that many zeros in the low order bits. | |
259 ** This may not be a good assuption. | |
260 ** | |
261 ** We try to put the entry in by rehashing _MW_REHASH_MAX times. After | |
262 ** that we declare defeat and force the table to be reconstructed. | |
263 ** Since some fds might be added more than once, won't that cause | |
264 ** collisions even in an empty table? | |
265 */ | |
266 PRIntn rehash = _MW_REHASH_MAX; | |
267 PRRecvWait **waiter; | |
268 PRUintn hidx = _MW_HASH(desc->fd, hash->length); | |
269 PRUintn hoffset = 0; | |
270 | |
271 while (rehash-- > 0) | |
272 { | |
273 waiter = &hash->recv_wait; | |
274 if (NULL == waiter[hidx]) | |
275 { | |
276 waiter[hidx] = desc; | |
277 hash->count += 1; | |
278 #if 0 | |
279 printf("Adding 0x%x->0x%x ", desc, desc->fd); | |
280 printf( | |
281 "table[%u:%u:*%u]: 0x%x->0x%x\n", | |
282 hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd)
; | |
283 #endif | |
284 return _prmw_success; | |
285 } | |
286 if (desc == waiter[hidx]) | |
287 { | |
288 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); /* desc already in table
*/ | |
289 return _prmw_error; | |
290 } | |
291 #if 0 | |
292 printf("Failing 0x%x->0x%x ", desc, desc->fd); | |
293 printf( | |
294 "table[*%u:%u:%u]: 0x%x->0x%x\n", | |
295 hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd); | |
296 #endif | |
297 if (0 == hoffset) | |
298 { | |
299 hoffset = _MW_HASH2(desc->fd, hash->length); | |
300 PR_ASSERT(0 != hoffset); | |
301 } | |
302 hidx = (hidx + hoffset) % (hash->length); | |
303 } | |
304 return _prmw_rehash; | |
305 } /* MW_AddHashInternal */ | |
306 | |
307 static _PR_HashStory MW_ExpandHashInternal(PRWaitGroup *group) | |
308 { | |
309 PRRecvWait **desc; | |
310 PRUint32 pidx, length; | |
311 _PRWaiterHash *newHash, *oldHash = group->waiter; | |
312 PRBool retry; | |
313 _PR_HashStory hrv; | |
314 | |
315 static const PRInt32 prime_number[] = { | |
316 _PR_DEFAULT_HASH_LENGTH, 179, 521, 907, 1427, | |
317 2711, 3917, 5021, 8219, 11549, 18911, 26711, 33749, 44771}; | |
318 PRUintn primes = (sizeof(prime_number) / sizeof(PRInt32)); | |
319 | |
320 /* look up the next size we'd like to use for the hash table */ | |
321 for (pidx = 0; pidx < primes; ++pidx) | |
322 { | |
323 if (prime_number[pidx] == oldHash->length) | |
324 { | |
325 break; | |
326 } | |
327 } | |
328 /* table size must be one of the prime numbers */ | |
329 PR_ASSERT(pidx < primes); | |
330 | |
331 /* if pidx == primes - 1, we can't expand the table any more */ | |
332 while (pidx < primes - 1) | |
333 { | |
334 /* next size */ | |
335 ++pidx; | |
336 length = prime_number[pidx]; | |
337 | |
338 /* allocate the new hash table and fill it in with the old */ | |
339 newHash = (_PRWaiterHash*)PR_CALLOC( | |
340 sizeof(_PRWaiterHash) + (length * sizeof(PRRecvWait*))); | |
341 if (NULL == newHash) | |
342 { | |
343 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); | |
344 return _prmw_error; | |
345 } | |
346 | |
347 newHash->length = length; | |
348 retry = PR_FALSE; | |
349 for (desc = &oldHash->recv_wait; | |
350 newHash->count < oldHash->count; ++desc) | |
351 { | |
352 PR_ASSERT(desc < &oldHash->recv_wait + oldHash->length); | |
353 if (NULL != *desc) | |
354 { | |
355 hrv = MW_AddHashInternal(*desc, newHash); | |
356 PR_ASSERT(_prmw_error != hrv); | |
357 if (_prmw_success != hrv) | |
358 { | |
359 PR_DELETE(newHash); | |
360 retry = PR_TRUE; | |
361 break; | |
362 } | |
363 } | |
364 } | |
365 if (retry) continue; | |
366 | |
367 PR_DELETE(group->waiter); | |
368 group->waiter = newHash; | |
369 group->p_timestamp += 1; | |
370 return _prmw_success; | |
371 } | |
372 | |
373 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); | |
374 return _prmw_error; /* we're hosed */ | |
375 } /* MW_ExpandHashInternal */ | |
376 | |
377 #ifndef WINNT | |
378 static void _MW_DoneInternal( | |
379 PRWaitGroup *group, PRRecvWait **waiter, PRMWStatus outcome) | |
380 { | |
381 /* | |
382 ** Add this receive wait object to the list of finished I/O | |
383 ** operations for this particular group. If there are other | |
384 ** threads waiting on the group, notify one. If not, arrange | |
385 ** for this thread to return. | |
386 */ | |
387 | |
388 #if 0 | |
389 printf("Removing 0x%x->0x%x\n", *waiter, (*waiter)->fd); | |
390 #endif | |
391 (*waiter)->outcome = outcome; | |
392 PR_APPEND_LINK(&((*waiter)->internal), &group->io_ready); | |
393 PR_NotifyCondVar(group->io_complete); | |
394 PR_ASSERT(0 != group->waiter->count); | |
395 group->waiter->count -= 1; | |
396 *waiter = NULL; | |
397 } /* _MW_DoneInternal */ | |
398 #endif /* WINNT */ | |
399 | |
400 static PRRecvWait **_MW_LookupInternal(PRWaitGroup *group, PRFileDesc *fd) | |
401 { | |
402 /* | |
403 ** Find the receive wait object corresponding to the file descriptor. | |
404 ** Only search the wait group specified. | |
405 */ | |
406 PRRecvWait **desc; | |
407 PRIntn rehash = _MW_REHASH_MAX; | |
408 _PRWaiterHash *hash = group->waiter; | |
409 PRUintn hidx = _MW_HASH(fd, hash->length); | |
410 PRUintn hoffset = 0; | |
411 | |
412 while (rehash-- > 0) | |
413 { | |
414 desc = (&hash->recv_wait) + hidx; | |
415 if ((*desc != NULL) && ((*desc)->fd == fd)) return desc; | |
416 if (0 == hoffset) | |
417 { | |
418 hoffset = _MW_HASH2(fd, hash->length); | |
419 PR_ASSERT(0 != hoffset); | |
420 } | |
421 hidx = (hidx + hoffset) % (hash->length); | |
422 } | |
423 return NULL; | |
424 } /* _MW_LookupInternal */ | |
425 | |
426 #ifndef WINNT | |
427 static PRStatus _MW_PollInternal(PRWaitGroup *group) | |
428 { | |
429 PRRecvWait **waiter; | |
430 PRStatus rv = PR_FAILURE; | |
431 PRInt32 count, count_ready; | |
432 PRIntervalTime polling_interval; | |
433 | |
434 group->poller = PR_GetCurrentThread(); | |
435 | |
436 while (PR_TRUE) | |
437 { | |
438 PRIntervalTime now, since_last_poll; | |
439 PRPollDesc *poll_list; | |
440 | |
441 while (0 == group->waiter->count) | |
442 { | |
443 PRStatus st; | |
444 st = PR_WaitCondVar(group->new_business, PR_INTERVAL_NO_TIMEOUT); | |
445 if (_prmw_running != group->state) | |
446 { | |
447 PR_SetError(PR_INVALID_STATE_ERROR, 0); | |
448 goto aborted; | |
449 } | |
450 if (_MW_ABORTED(st)) goto aborted; | |
451 } | |
452 | |
453 /* | |
454 ** There's something to do. See if our existing polling list | |
455 ** is large enough for what we have to do? | |
456 */ | |
457 | |
458 while (group->polling_count < group->waiter->count) | |
459 { | |
460 PRUint32 old_count = group->waiter->count; | |
461 PRUint32 new_count = PR_ROUNDUP(old_count, _PR_POLL_COUNT_FUDGE); | |
462 PRSize new_size = sizeof(PRPollDesc) * new_count; | |
463 PRPollDesc *old_polling_list = group->polling_list; | |
464 | |
465 PR_Unlock(group->ml); | |
466 poll_list = (PRPollDesc*)PR_CALLOC(new_size); | |
467 if (NULL == poll_list) | |
468 { | |
469 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); | |
470 PR_Lock(group->ml); | |
471 goto failed_alloc; | |
472 } | |
473 if (NULL != old_polling_list) | |
474 PR_DELETE(old_polling_list); | |
475 PR_Lock(group->ml); | |
476 if (_prmw_running != group->state) | |
477 { | |
478 PR_DELETE(poll_list); | |
479 PR_SetError(PR_INVALID_STATE_ERROR, 0); | |
480 goto aborted; | |
481 } | |
482 group->polling_list = poll_list; | |
483 group->polling_count = new_count; | |
484 } | |
485 | |
486 now = PR_IntervalNow(); | |
487 polling_interval = max_polling_interval; | |
488 since_last_poll = now - group->last_poll; | |
489 | |
490 waiter = &group->waiter->recv_wait; | |
491 poll_list = group->polling_list; | |
492 for (count = 0; count < group->waiter->count; ++waiter) | |
493 { | |
494 PR_ASSERT(waiter < &group->waiter->recv_wait | |
495 + group->waiter->length); | |
496 if (NULL != *waiter) /* a live one! */ | |
497 { | |
498 if ((PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout) | |
499 && (since_last_poll >= (*waiter)->timeout)) | |
500 _MW_DoneInternal(group, waiter, PR_MW_TIMEOUT); | |
501 else | |
502 { | |
503 if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout) | |
504 { | |
505 (*waiter)->timeout -= since_last_poll; | |
506 if ((*waiter)->timeout < polling_interval) | |
507 polling_interval = (*waiter)->timeout; | |
508 } | |
509 PR_ASSERT(poll_list < group->polling_list | |
510 + group->polling_count); | |
511 poll_list->fd = (*waiter)->fd; | |
512 poll_list->in_flags = PR_POLL_READ; | |
513 poll_list->out_flags = 0; | |
514 #if 0 | |
515 printf( | |
516 "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n", | |
517 poll_list, count, poll_list->fd, (*waiter)->timeout); | |
518 #endif | |
519 poll_list += 1; | |
520 count += 1; | |
521 } | |
522 } | |
523 } | |
524 | |
525 PR_ASSERT(count == group->waiter->count); | |
526 | |
527 /* | |
528 ** If there are no more threads waiting for completion, | |
529 ** we need to return. | |
530 */ | |
531 if ((!PR_CLIST_IS_EMPTY(&group->io_ready)) | |
532 && (1 == group->waiting_threads)) break; | |
533 | |
534 if (0 == count) continue; /* wait for new business */ | |
535 | |
536 group->last_poll = now; | |
537 | |
538 PR_Unlock(group->ml); | |
539 | |
540 count_ready = PR_Poll(group->polling_list, count, polling_interval); | |
541 | |
542 PR_Lock(group->ml); | |
543 | |
544 if (_prmw_running != group->state) | |
545 { | |
546 PR_SetError(PR_INVALID_STATE_ERROR, 0); | |
547 goto aborted; | |
548 } | |
549 if (-1 == count_ready) | |
550 { | |
551 goto failed_poll; /* that's a shame */ | |
552 } | |
553 else if (0 < count_ready) | |
554 { | |
555 for (poll_list = group->polling_list; count > 0; | |
556 poll_list++, count--) | |
557 { | |
558 PR_ASSERT( | |
559 poll_list < group->polling_list + group->polling_count); | |
560 if (poll_list->out_flags != 0) | |
561 { | |
562 waiter = _MW_LookupInternal(group, poll_list->fd); | |
563 /* | |
564 ** If 'waiter' is NULL, that means the wait receive | |
565 ** descriptor has been canceled. | |
566 */ | |
567 if (NULL != waiter) | |
568 _MW_DoneInternal(group, waiter, PR_MW_SUCCESS); | |
569 } | |
570 } | |
571 } | |
572 /* | |
573 ** If there are no more threads waiting for completion, | |
574 ** we need to return. | |
575 ** This thread was "borrowed" to do the polling, but it really | |
576 ** belongs to the client. | |
577 */ | |
578 if ((!PR_CLIST_IS_EMPTY(&group->io_ready)) | |
579 && (1 == group->waiting_threads)) break; | |
580 } | |
581 | |
582 rv = PR_SUCCESS; | |
583 | |
584 aborted: | |
585 failed_poll: | |
586 failed_alloc: | |
587 group->poller = NULL; /* we were that, not we ain't */ | |
588 if ((_prmw_running == group->state) && (group->waiting_threads > 1)) | |
589 { | |
590 /* Wake up one thread to become the new poller. */ | |
591 PR_NotifyCondVar(group->io_complete); | |
592 } | |
593 return rv; /* we return with the lock held */ | |
594 } /* _MW_PollInternal */ | |
595 #endif /* !WINNT */ | |
596 | |
597 static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup *group) | |
598 { | |
599 PRMWGroupState rv = group->state; | |
600 /* | |
601 ** Looking at the group's fields is safe because | |
602 ** once the group's state is no longer running, it | |
603 ** cannot revert and there is a safe check on entry | |
604 ** to make sure no more threads are made to wait. | |
605 */ | |
606 if ((_prmw_stopping == rv) | |
607 && (0 == group->waiting_threads)) | |
608 { | |
609 rv = group->state = _prmw_stopped; | |
610 PR_NotifyCondVar(group->mw_manage); | |
611 } | |
612 return rv; | |
613 } /* MW_TestForShutdownInternal */ | |
614 | |
615 #ifndef WINNT | |
616 static void _MW_InitialRecv(PRCList *io_ready) | |
617 { | |
618 PRRecvWait *desc = (PRRecvWait*)io_ready; | |
619 if ((NULL == desc->buffer.start) | |
620 || (0 == desc->buffer.length)) | |
621 desc->bytesRecv = 0; | |
622 else | |
623 { | |
624 desc->bytesRecv = (desc->fd->methods->recv)( | |
625 desc->fd, desc->buffer.start, | |
626 desc->buffer.length, 0, desc->timeout); | |
627 if (desc->bytesRecv < 0) /* SetError should already be there */ | |
628 desc->outcome = PR_MW_FAILURE; | |
629 } | |
630 } /* _MW_InitialRecv */ | |
631 #endif | |
632 | |
633 #ifdef WINNT | |
634 static void NT_TimeProc(void *arg) | |
635 { | |
636 _MDOverlapped *overlapped = (_MDOverlapped *)arg; | |
637 PRRecvWait *desc = overlapped->data.mw.desc; | |
638 PRFileDesc *bottom; | |
639 | |
640 if (InterlockedCompareExchange((LONG *)&desc->outcome, | |
641 (LONG)PR_MW_TIMEOUT, (LONG)PR_MW_PENDING) != (LONG)PR_MW_PENDING) | |
642 { | |
643 /* This wait recv descriptor has already completed. */ | |
644 return; | |
645 } | |
646 | |
647 /* close the osfd to abort the outstanding async io request */ | |
648 /* $$$$ | |
649 ** Little late to be checking if NSPR's on the bottom of stack, | |
650 ** but if we don't check, we can't assert that the private data | |
651 ** is what we think it is. | |
652 ** $$$$ | |
653 */ | |
654 bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER); | |
655 PR_ASSERT(NULL != bottom); | |
656 if (NULL != bottom) /* now what!?!?! */ | |
657 { | |
658 bottom->secret->state = _PR_FILEDESC_CLOSED; | |
659 if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) | |
660 { | |
661 fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError()); | |
662 PR_NOT_REACHED("What shall I do?"); | |
663 } | |
664 } | |
665 return; | |
666 } /* NT_TimeProc */ | |
667 | |
668 static PRStatus NT_HashRemove(PRWaitGroup *group, PRFileDesc *fd) | |
669 { | |
670 PRRecvWait **waiter; | |
671 | |
672 _PR_MD_LOCK(&group->mdlock); | |
673 waiter = _MW_LookupInternal(group, fd); | |
674 if (NULL != waiter) | |
675 { | |
676 group->waiter->count -= 1; | |
677 *waiter = NULL; | |
678 } | |
679 _PR_MD_UNLOCK(&group->mdlock); | |
680 return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE; | |
681 } | |
682 | |
683 PRStatus NT_HashRemoveInternal(PRWaitGroup *group, PRFileDesc *fd) | |
684 { | |
685 PRRecvWait **waiter; | |
686 | |
687 waiter = _MW_LookupInternal(group, fd); | |
688 if (NULL != waiter) | |
689 { | |
690 group->waiter->count -= 1; | |
691 *waiter = NULL; | |
692 } | |
693 return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE; | |
694 } | |
695 #endif /* WINNT */ | |
696 | |
697 /******************************************************************/ | |
698 /******************************************************************/ | |
699 /********************** The public API portion ********************/ | |
700 /******************************************************************/ | |
701 /******************************************************************/ | |
702 PR_IMPLEMENT(PRStatus) PR_AddWaitFileDesc( | |
703 PRWaitGroup *group, PRRecvWait *desc) | |
704 { | |
705 _PR_HashStory hrv; | |
706 PRStatus rv = PR_FAILURE; | |
707 #ifdef WINNT | |
708 _MDOverlapped *overlapped; | |
709 HANDLE hFile; | |
710 BOOL bResult; | |
711 DWORD dwError; | |
712 PRFileDesc *bottom; | |
713 #endif | |
714 | |
715 if (!_pr_initialized) _PR_ImplicitInitialization(); | |
716 if ((NULL == group) && (NULL == (group = MW_Init2()))) | |
717 { | |
718 return rv; | |
719 } | |
720 | |
721 PR_ASSERT(NULL != desc->fd); | |
722 | |
723 desc->outcome = PR_MW_PENDING; /* nice, well known value */ | |
724 desc->bytesRecv = 0; /* likewise, though this value is ambiguious */ | |
725 | |
726 PR_Lock(group->ml); | |
727 | |
728 if (_prmw_running != group->state) | |
729 { | |
730 /* Not allowed to add after cancelling the group */ | |
731 desc->outcome = PR_MW_INTERRUPT; | |
732 PR_SetError(PR_INVALID_STATE_ERROR, 0); | |
733 PR_Unlock(group->ml); | |
734 return rv; | |
735 } | |
736 | |
737 #ifdef WINNT | |
738 _PR_MD_LOCK(&group->mdlock); | |
739 #endif | |
740 | |
741 /* | |
742 ** If the waiter count is zero at this point, there's no telling | |
743 ** how long we've been idle. Therefore, initialize the beginning | |
744 ** of the timing interval. As long as the list doesn't go empty, | |
745 ** it will maintain itself. | |
746 */ | |
747 if (0 == group->waiter->count) | |
748 group->last_poll = PR_IntervalNow(); | |
749 | |
750 do | |
751 { | |
752 hrv = MW_AddHashInternal(desc, group->waiter); | |
753 if (_prmw_rehash != hrv) break; | |
754 hrv = MW_ExpandHashInternal(group); /* gruesome */ | |
755 if (_prmw_success != hrv) break; | |
756 } while (PR_TRUE); | |
757 | |
758 #ifdef WINNT | |
759 _PR_MD_UNLOCK(&group->mdlock); | |
760 #endif | |
761 | |
762 PR_NotifyCondVar(group->new_business); /* tell the world */ | |
763 rv = (_prmw_success == hrv) ? PR_SUCCESS : PR_FAILURE; | |
764 PR_Unlock(group->ml); | |
765 | |
766 #ifdef WINNT | |
767 overlapped = PR_NEWZAP(_MDOverlapped); | |
768 if (NULL == overlapped) | |
769 { | |
770 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); | |
771 NT_HashRemove(group, desc->fd); | |
772 return rv; | |
773 } | |
774 overlapped->ioModel = _MD_MultiWaitIO; | |
775 overlapped->data.mw.desc = desc; | |
776 overlapped->data.mw.group = group; | |
777 if (desc->timeout != PR_INTERVAL_NO_TIMEOUT) | |
778 { | |
779 overlapped->data.mw.timer = CreateTimer( | |
780 desc->timeout, | |
781 NT_TimeProc, | |
782 overlapped); | |
783 if (0 == overlapped->data.mw.timer) | |
784 { | |
785 NT_HashRemove(group, desc->fd); | |
786 PR_DELETE(overlapped); | |
787 /* | |
788 * XXX It appears that a maximum of 16 timer events can | |
789 * be outstanding. GetLastError() returns 0 when I try it. | |
790 */ | |
791 PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, GetLastError()); | |
792 return PR_FAILURE; | |
793 } | |
794 } | |
795 | |
796 /* Reach to the bottom layer to get the OS fd */ | |
797 bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER); | |
798 PR_ASSERT(NULL != bottom); | |
799 if (NULL == bottom) | |
800 { | |
801 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); | |
802 return PR_FAILURE; | |
803 } | |
804 hFile = (HANDLE)bottom->secret->md.osfd; | |
805 if (!bottom->secret->md.io_model_committed) | |
806 { | |
807 PRInt32 st; | |
808 st = _md_Associate(hFile); | |
809 PR_ASSERT(0 != st); | |
810 bottom->secret->md.io_model_committed = PR_TRUE; | |
811 } | |
812 bResult = ReadFile(hFile, | |
813 desc->buffer.start, | |
814 (DWORD)desc->buffer.length, | |
815 NULL, | |
816 &overlapped->overlapped); | |
817 if (FALSE == bResult && (dwError = GetLastError()) != ERROR_IO_PENDING) | |
818 { | |
819 if (desc->timeout != PR_INTERVAL_NO_TIMEOUT) | |
820 { | |
821 if (InterlockedCompareExchange((LONG *)&desc->outcome, | |
822 (LONG)PR_MW_FAILURE, (LONG)PR_MW_PENDING) | |
823 == (LONG)PR_MW_PENDING) | |
824 { | |
825 CancelTimer(overlapped->data.mw.timer); | |
826 } | |
827 NT_HashRemove(group, desc->fd); | |
828 PR_DELETE(overlapped); | |
829 } | |
830 _PR_MD_MAP_READ_ERROR(dwError); | |
831 rv = PR_FAILURE; | |
832 } | |
833 #endif | |
834 | |
835 return rv; | |
836 } /* PR_AddWaitFileDesc */ | |
837 | |
838 PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup *group) | |
839 { | |
840 PRCList *io_ready = NULL; | |
841 #ifdef WINNT | |
842 PRThread *me = _PR_MD_CURRENT_THREAD(); | |
843 _MDOverlapped *overlapped; | |
844 #endif | |
845 | |
846 if (!_pr_initialized) _PR_ImplicitInitialization(); | |
847 if ((NULL == group) && (NULL == (group = MW_Init2()))) goto failed_init; | |
848 | |
849 PR_Lock(group->ml); | |
850 | |
851 if (_prmw_running != group->state) | |
852 { | |
853 PR_SetError(PR_INVALID_STATE_ERROR, 0); | |
854 goto invalid_state; | |
855 } | |
856 | |
857 group->waiting_threads += 1; /* the polling thread is counted */ | |
858 | |
859 #ifdef WINNT | |
860 _PR_MD_LOCK(&group->mdlock); | |
861 while (PR_CLIST_IS_EMPTY(&group->io_ready)) | |
862 { | |
863 _PR_THREAD_LOCK(me); | |
864 me->state = _PR_IO_WAIT; | |
865 PR_APPEND_LINK(&me->waitQLinks, &group->wait_list); | |
866 if (!_PR_IS_NATIVE_THREAD(me)) | |
867 { | |
868 _PR_SLEEPQ_LOCK(me->cpu); | |
869 _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT); | |
870 _PR_SLEEPQ_UNLOCK(me->cpu); | |
871 } | |
872 _PR_THREAD_UNLOCK(me); | |
873 _PR_MD_UNLOCK(&group->mdlock); | |
874 PR_Unlock(group->ml); | |
875 _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT); | |
876 me->state = _PR_RUNNING; | |
877 PR_Lock(group->ml); | |
878 _PR_MD_LOCK(&group->mdlock); | |
879 if (_PR_PENDING_INTERRUPT(me)) { | |
880 PR_REMOVE_LINK(&me->waitQLinks); | |
881 _PR_MD_UNLOCK(&group->mdlock); | |
882 me->flags &= ~_PR_INTERRUPT; | |
883 me->io_suspended = PR_FALSE; | |
884 PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0); | |
885 goto aborted; | |
886 } | |
887 } | |
888 io_ready = PR_LIST_HEAD(&group->io_ready); | |
889 PR_ASSERT(io_ready != NULL); | |
890 PR_REMOVE_LINK(io_ready); | |
891 _PR_MD_UNLOCK(&group->mdlock); | |
892 overlapped = (_MDOverlapped *) | |
893 ((char *)io_ready - offsetof(_MDOverlapped, data)); | |
894 io_ready = &overlapped->data.mw.desc->internal; | |
895 #else | |
896 do | |
897 { | |
898 /* | |
899 ** If the I/O ready list isn't empty, have this thread | |
900 ** return with the first receive wait object that's available. | |
901 */ | |
902 if (PR_CLIST_IS_EMPTY(&group->io_ready)) | |
903 { | |
904 /* | |
905 ** Is there a polling thread yet? If not, grab this thread | |
906 ** and use it. | |
907 */ | |
908 if (NULL == group->poller) | |
909 { | |
910 /* | |
911 ** This thread will stay do polling until it becomes the only on
e | |
912 ** left to service a completion. Then it will return and there w
ill | |
913 ** be none left to actually poll or to run completions. | |
914 ** | |
915 ** The polling function should only return w/ failure or | |
916 ** with some I/O ready. | |
917 */ | |
918 if (PR_FAILURE == _MW_PollInternal(group)) goto failed_poll; | |
919 } | |
920 else | |
921 { | |
922 /* | |
923 ** There are four reasons a thread can be awakened from | |
924 ** a wait on the io_complete condition variable. | |
925 ** 1. Some I/O has completed, i.e., the io_ready list | |
926 ** is nonempty. | |
927 ** 2. The wait group is canceled. | |
928 ** 3. The thread is interrupted. | |
929 ** 4. The current polling thread has to leave and needs | |
930 ** a replacement. | |
931 ** The logic to find a new polling thread is made more | |
932 ** complicated by all the other possible events. | |
933 ** I tried my best to write the logic clearly, but | |
934 ** it is still full of if's with continue and goto. | |
935 */ | |
936 PRStatus st; | |
937 do | |
938 { | |
939 st = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEO
UT); | |
940 if (_prmw_running != group->state) | |
941 { | |
942 PR_SetError(PR_INVALID_STATE_ERROR, 0); | |
943 goto aborted; | |
944 } | |
945 if (_MW_ABORTED(st) || (NULL == group->poller)) break; | |
946 } while (PR_CLIST_IS_EMPTY(&group->io_ready)); | |
947 | |
948 /* | |
949 ** The thread is interrupted and has to leave. It might | |
950 ** have also been awakened to process ready i/o or be the | |
951 ** new poller. To be safe, if either condition is true, | |
952 ** we awaken another thread to take its place. | |
953 */ | |
954 if (_MW_ABORTED(st)) | |
955 { | |
956 if ((NULL == group->poller | |
957 || !PR_CLIST_IS_EMPTY(&group->io_ready)) | |
958 && group->waiting_threads > 1) | |
959 PR_NotifyCondVar(group->io_complete); | |
960 goto aborted; | |
961 } | |
962 | |
963 /* | |
964 ** A new poller is needed, but can I be the new poller? | |
965 ** If there is no i/o ready, sure. But if there is any | |
966 ** i/o ready, it has a higher priority. I want to | |
967 ** process the ready i/o first and wake up another | |
968 ** thread to be the new poller. | |
969 */ | |
970 if (NULL == group->poller) | |
971 { | |
972 if (PR_CLIST_IS_EMPTY(&group->io_ready)) | |
973 continue; | |
974 if (group->waiting_threads > 1) | |
975 PR_NotifyCondVar(group->io_complete); | |
976 } | |
977 } | |
978 PR_ASSERT(!PR_CLIST_IS_EMPTY(&group->io_ready)); | |
979 } | |
980 io_ready = PR_LIST_HEAD(&group->io_ready); | |
981 PR_NotifyCondVar(group->io_taken); | |
982 PR_ASSERT(io_ready != NULL); | |
983 PR_REMOVE_LINK(io_ready); | |
984 } while (NULL == io_ready); | |
985 | |
986 failed_poll: | |
987 | |
988 #endif | |
989 | |
990 aborted: | |
991 | |
992 group->waiting_threads -= 1; | |
993 invalid_state: | |
994 (void)MW_TestForShutdownInternal(group); | |
995 PR_Unlock(group->ml); | |
996 | |
997 failed_init: | |
998 if (NULL != io_ready) | |
999 { | |
1000 /* If the operation failed, record the reason why */ | |
1001 switch (((PRRecvWait*)io_ready)->outcome) | |
1002 { | |
1003 case PR_MW_PENDING: | |
1004 PR_ASSERT(0); | |
1005 break; | |
1006 case PR_MW_SUCCESS: | |
1007 #ifndef WINNT | |
1008 _MW_InitialRecv(io_ready); | |
1009 #endif | |
1010 break; | |
1011 #ifdef WINNT | |
1012 case PR_MW_FAILURE: | |
1013 _PR_MD_MAP_READ_ERROR(overlapped->data.mw.error); | |
1014 break; | |
1015 #endif | |
1016 case PR_MW_TIMEOUT: | |
1017 PR_SetError(PR_IO_TIMEOUT_ERROR, 0); | |
1018 break; | |
1019 case PR_MW_INTERRUPT: | |
1020 PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0); | |
1021 break; | |
1022 default: break; | |
1023 } | |
1024 #ifdef WINNT | |
1025 if (NULL != overlapped->data.mw.timer) | |
1026 { | |
1027 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT | |
1028 != overlapped->data.mw.desc->timeout); | |
1029 CancelTimer(overlapped->data.mw.timer); | |
1030 } | |
1031 else | |
1032 { | |
1033 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT | |
1034 == overlapped->data.mw.desc->timeout); | |
1035 } | |
1036 PR_DELETE(overlapped); | |
1037 #endif | |
1038 } | |
1039 return (PRRecvWait*)io_ready; | |
1040 } /* PR_WaitRecvReady */ | |
1041 | |
1042 PR_IMPLEMENT(PRStatus) PR_CancelWaitFileDesc(PRWaitGroup *group, PRRecvWait *des
c) | |
1043 { | |
1044 #if !defined(WINNT) | |
1045 PRRecvWait **recv_wait; | |
1046 #endif | |
1047 PRStatus rv = PR_SUCCESS; | |
1048 if (NULL == group) group = mw_state->group; | |
1049 PR_ASSERT(NULL != group); | |
1050 if (NULL == group) | |
1051 { | |
1052 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); | |
1053 return PR_FAILURE; | |
1054 } | |
1055 | |
1056 PR_Lock(group->ml); | |
1057 | |
1058 if (_prmw_running != group->state) | |
1059 { | |
1060 PR_SetError(PR_INVALID_STATE_ERROR, 0); | |
1061 rv = PR_FAILURE; | |
1062 goto unlock; | |
1063 } | |
1064 | |
1065 #ifdef WINNT | |
1066 if (InterlockedCompareExchange((LONG *)&desc->outcome, | |
1067 (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING) | |
1068 { | |
1069 PRFileDesc *bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER); | |
1070 PR_ASSERT(NULL != bottom); | |
1071 if (NULL == bottom) | |
1072 { | |
1073 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); | |
1074 goto unlock; | |
1075 } | |
1076 bottom->secret->state = _PR_FILEDESC_CLOSED; | |
1077 #if 0 | |
1078 fprintf(stderr, "cancel wait recv: closing socket\n"); | |
1079 #endif | |
1080 if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) | |
1081 { | |
1082 fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError()); | |
1083 exit(1); | |
1084 } | |
1085 } | |
1086 #else | |
1087 if (NULL != (recv_wait = _MW_LookupInternal(group, desc->fd))) | |
1088 { | |
1089 /* it was in the wait table */ | |
1090 _MW_DoneInternal(group, recv_wait, PR_MW_INTERRUPT); | |
1091 goto unlock; | |
1092 } | |
1093 if (!PR_CLIST_IS_EMPTY(&group->io_ready)) | |
1094 { | |
1095 /* is it already complete? */ | |
1096 PRCList *head = PR_LIST_HEAD(&group->io_ready); | |
1097 do | |
1098 { | |
1099 PRRecvWait *done = (PRRecvWait*)head; | |
1100 if (done == desc) goto unlock; | |
1101 head = PR_NEXT_LINK(head); | |
1102 } while (head != &group->io_ready); | |
1103 } | |
1104 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); | |
1105 rv = PR_FAILURE; | |
1106 | |
1107 #endif | |
1108 unlock: | |
1109 PR_Unlock(group->ml); | |
1110 return rv; | |
1111 } /* PR_CancelWaitFileDesc */ | |
1112 | |
1113 PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup *group) | |
1114 { | |
1115 PRRecvWait **desc; | |
1116 PRRecvWait *recv_wait = NULL; | |
1117 #ifdef WINNT | |
1118 _MDOverlapped *overlapped; | |
1119 PRRecvWait **end; | |
1120 PRThread *me = _PR_MD_CURRENT_THREAD(); | |
1121 #endif | |
1122 | |
1123 if (NULL == group) group = mw_state->group; | |
1124 PR_ASSERT(NULL != group); | |
1125 if (NULL == group) | |
1126 { | |
1127 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); | |
1128 return NULL; | |
1129 } | |
1130 | |
1131 PR_Lock(group->ml); | |
1132 if (_prmw_stopped != group->state) | |
1133 { | |
1134 if (_prmw_running == group->state) | |
1135 group->state = _prmw_stopping; /* so nothing new comes in */ | |
1136 if (0 == group->waiting_threads) /* is there anybody else? */ | |
1137 group->state = _prmw_stopped; /* we can stop right now */ | |
1138 else | |
1139 { | |
1140 PR_NotifyAllCondVar(group->new_business); | |
1141 PR_NotifyAllCondVar(group->io_complete); | |
1142 } | |
1143 while (_prmw_stopped != group->state) | |
1144 (void)PR_WaitCondVar(group->mw_manage, PR_INTERVAL_NO_TIMEOUT); | |
1145 } | |
1146 | |
1147 #ifdef WINNT | |
1148 _PR_MD_LOCK(&group->mdlock); | |
1149 #endif | |
1150 /* make all the existing descriptors look done/interrupted */ | |
1151 #ifdef WINNT | |
1152 end = &group->waiter->recv_wait + group->waiter->length; | |
1153 for (desc = &group->waiter->recv_wait; desc < end; ++desc) | |
1154 { | |
1155 if (NULL != *desc) | |
1156 { | |
1157 if (InterlockedCompareExchange((LONG *)&(*desc)->outcome, | |
1158 (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING) | |
1159 == (LONG)PR_MW_PENDING) | |
1160 { | |
1161 PRFileDesc *bottom = PR_GetIdentitiesLayer( | |
1162 (*desc)->fd, PR_NSPR_IO_LAYER); | |
1163 PR_ASSERT(NULL != bottom); | |
1164 if (NULL == bottom) | |
1165 { | |
1166 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); | |
1167 goto invalid_arg; | |
1168 } | |
1169 bottom->secret->state = _PR_FILEDESC_CLOSED; | |
1170 #if 0 | |
1171 fprintf(stderr, "cancel wait group: closing socket\n"); | |
1172 #endif | |
1173 if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) | |
1174 { | |
1175 fprintf(stderr, "closesocket failed: %d\n", | |
1176 WSAGetLastError()); | |
1177 exit(1); | |
1178 } | |
1179 } | |
1180 } | |
1181 } | |
1182 while (group->waiter->count > 0) | |
1183 { | |
1184 _PR_THREAD_LOCK(me); | |
1185 me->state = _PR_IO_WAIT; | |
1186 PR_APPEND_LINK(&me->waitQLinks, &group->wait_list); | |
1187 if (!_PR_IS_NATIVE_THREAD(me)) | |
1188 { | |
1189 _PR_SLEEPQ_LOCK(me->cpu); | |
1190 _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT); | |
1191 _PR_SLEEPQ_UNLOCK(me->cpu); | |
1192 } | |
1193 _PR_THREAD_UNLOCK(me); | |
1194 _PR_MD_UNLOCK(&group->mdlock); | |
1195 PR_Unlock(group->ml); | |
1196 _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT); | |
1197 me->state = _PR_RUNNING; | |
1198 PR_Lock(group->ml); | |
1199 _PR_MD_LOCK(&group->mdlock); | |
1200 } | |
1201 #else | |
1202 for (desc = &group->waiter->recv_wait; group->waiter->count > 0; ++desc) | |
1203 { | |
1204 PR_ASSERT(desc < &group->waiter->recv_wait + group->waiter->length); | |
1205 if (NULL != *desc) | |
1206 _MW_DoneInternal(group, desc, PR_MW_INTERRUPT); | |
1207 } | |
1208 #endif | |
1209 | |
1210 /* take first element of finished list and return it or NULL */ | |
1211 if (PR_CLIST_IS_EMPTY(&group->io_ready)) | |
1212 PR_SetError(PR_GROUP_EMPTY_ERROR, 0); | |
1213 else | |
1214 { | |
1215 PRCList *head = PR_LIST_HEAD(&group->io_ready); | |
1216 PR_REMOVE_AND_INIT_LINK(head); | |
1217 #ifdef WINNT | |
1218 overlapped = (_MDOverlapped *) | |
1219 ((char *)head - offsetof(_MDOverlapped, data)); | |
1220 head = &overlapped->data.mw.desc->internal; | |
1221 if (NULL != overlapped->data.mw.timer) | |
1222 { | |
1223 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT | |
1224 != overlapped->data.mw.desc->timeout); | |
1225 CancelTimer(overlapped->data.mw.timer); | |
1226 } | |
1227 else | |
1228 { | |
1229 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT | |
1230 == overlapped->data.mw.desc->timeout); | |
1231 } | |
1232 PR_DELETE(overlapped); | |
1233 #endif | |
1234 recv_wait = (PRRecvWait*)head; | |
1235 } | |
1236 #ifdef WINNT | |
1237 invalid_arg: | |
1238 _PR_MD_UNLOCK(&group->mdlock); | |
1239 #endif | |
1240 PR_Unlock(group->ml); | |
1241 | |
1242 return recv_wait; | |
1243 } /* PR_CancelWaitGroup */ | |
1244 | |
1245 PR_IMPLEMENT(PRWaitGroup*) PR_CreateWaitGroup(PRInt32 size /* ignored */) | |
1246 { | |
1247 PRWaitGroup *wg; | |
1248 | |
1249 if (NULL == (wg = PR_NEWZAP(PRWaitGroup))) | |
1250 { | |
1251 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); | |
1252 goto failed; | |
1253 } | |
1254 /* the wait group itself */ | |
1255 wg->ml = PR_NewLock(); | |
1256 if (NULL == wg->ml) goto failed_lock; | |
1257 wg->io_taken = PR_NewCondVar(wg->ml); | |
1258 if (NULL == wg->io_taken) goto failed_cvar0; | |
1259 wg->io_complete = PR_NewCondVar(wg->ml); | |
1260 if (NULL == wg->io_complete) goto failed_cvar1; | |
1261 wg->new_business = PR_NewCondVar(wg->ml); | |
1262 if (NULL == wg->new_business) goto failed_cvar2; | |
1263 wg->mw_manage = PR_NewCondVar(wg->ml); | |
1264 if (NULL == wg->mw_manage) goto failed_cvar3; | |
1265 | |
1266 PR_INIT_CLIST(&wg->group_link); | |
1267 PR_INIT_CLIST(&wg->io_ready); | |
1268 | |
1269 /* the waiters sequence */ | |
1270 wg->waiter = (_PRWaiterHash*)PR_CALLOC( | |
1271 sizeof(_PRWaiterHash) + | |
1272 (_PR_DEFAULT_HASH_LENGTH * sizeof(PRRecvWait*))); | |
1273 if (NULL == wg->waiter) | |
1274 { | |
1275 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); | |
1276 goto failed_waiter; | |
1277 } | |
1278 wg->waiter->count = 0; | |
1279 wg->waiter->length = _PR_DEFAULT_HASH_LENGTH; | |
1280 | |
1281 #ifdef WINNT | |
1282 _PR_MD_NEW_LOCK(&wg->mdlock); | |
1283 PR_INIT_CLIST(&wg->wait_list); | |
1284 #endif /* WINNT */ | |
1285 | |
1286 PR_Lock(mw_lock); | |
1287 PR_APPEND_LINK(&wg->group_link, &mw_state->group_list); | |
1288 PR_Unlock(mw_lock); | |
1289 return wg; | |
1290 | |
1291 failed_waiter: | |
1292 PR_DestroyCondVar(wg->mw_manage); | |
1293 failed_cvar3: | |
1294 PR_DestroyCondVar(wg->new_business); | |
1295 failed_cvar2: | |
1296 PR_DestroyCondVar(wg->io_complete); | |
1297 failed_cvar1: | |
1298 PR_DestroyCondVar(wg->io_taken); | |
1299 failed_cvar0: | |
1300 PR_DestroyLock(wg->ml); | |
1301 failed_lock: | |
1302 PR_DELETE(wg); | |
1303 wg = NULL; | |
1304 | |
1305 failed: | |
1306 return wg; | |
1307 } /* MW_CreateWaitGroup */ | |
1308 | |
1309 PR_IMPLEMENT(PRStatus) PR_DestroyWaitGroup(PRWaitGroup *group) | |
1310 { | |
1311 PRStatus rv = PR_SUCCESS; | |
1312 if (NULL == group) group = mw_state->group; | |
1313 PR_ASSERT(NULL != group); | |
1314 if (NULL != group) | |
1315 { | |
1316 PR_Lock(group->ml); | |
1317 if ((group->waiting_threads == 0) | |
1318 && (group->waiter->count == 0) | |
1319 && PR_CLIST_IS_EMPTY(&group->io_ready)) | |
1320 { | |
1321 group->state = _prmw_stopped; | |
1322 } | |
1323 else | |
1324 { | |
1325 PR_SetError(PR_INVALID_STATE_ERROR, 0); | |
1326 rv = PR_FAILURE; | |
1327 } | |
1328 PR_Unlock(group->ml); | |
1329 if (PR_FAILURE == rv) return rv; | |
1330 | |
1331 PR_Lock(mw_lock); | |
1332 PR_REMOVE_LINK(&group->group_link); | |
1333 PR_Unlock(mw_lock); | |
1334 | |
1335 #ifdef WINNT | |
1336 /* | |
1337 * XXX make sure wait_list is empty and waiter is empty. | |
1338 * These must be checked while holding mdlock. | |
1339 */ | |
1340 _PR_MD_FREE_LOCK(&group->mdlock); | |
1341 #endif | |
1342 | |
1343 PR_DELETE(group->waiter); | |
1344 PR_DELETE(group->polling_list); | |
1345 PR_DestroyCondVar(group->mw_manage); | |
1346 PR_DestroyCondVar(group->new_business); | |
1347 PR_DestroyCondVar(group->io_complete); | |
1348 PR_DestroyCondVar(group->io_taken); | |
1349 PR_DestroyLock(group->ml); | |
1350 if (group == mw_state->group) mw_state->group = NULL; | |
1351 PR_DELETE(group); | |
1352 } | |
1353 else | |
1354 { | |
1355 /* The default wait group is not created yet. */ | |
1356 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); | |
1357 rv = PR_FAILURE; | |
1358 } | |
1359 return rv; | |
1360 } /* PR_DestroyWaitGroup */ | |
1361 | |
1362 /********************************************************************** | |
1363 *********************************************************************** | |
1364 ******************** Wait group enumerations ************************** | |
1365 *********************************************************************** | |
1366 **********************************************************************/ | |
1367 | |
1368 PR_IMPLEMENT(PRMWaitEnumerator*) PR_CreateMWaitEnumerator(PRWaitGroup *group) | |
1369 { | |
1370 PRMWaitEnumerator *enumerator = PR_NEWZAP(PRMWaitEnumerator); | |
1371 if (NULL == enumerator) PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); | |
1372 else | |
1373 { | |
1374 enumerator->group = group; | |
1375 enumerator->seal = _PR_ENUM_SEALED; | |
1376 } | |
1377 return enumerator; | |
1378 } /* PR_CreateMWaitEnumerator */ | |
1379 | |
1380 PR_IMPLEMENT(PRStatus) PR_DestroyMWaitEnumerator(PRMWaitEnumerator* enumerator) | |
1381 { | |
1382 PR_ASSERT(NULL != enumerator); | |
1383 PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal); | |
1384 if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal)) | |
1385 { | |
1386 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); | |
1387 return PR_FAILURE; | |
1388 } | |
1389 enumerator->seal = _PR_ENUM_UNSEALED; | |
1390 PR_Free(enumerator); | |
1391 return PR_SUCCESS; | |
1392 } /* PR_DestroyMWaitEnumerator */ | |
1393 | |
1394 PR_IMPLEMENT(PRRecvWait*) PR_EnumerateWaitGroup( | |
1395 PRMWaitEnumerator *enumerator, const PRRecvWait *previous) | |
1396 { | |
1397 PRRecvWait *result = NULL; | |
1398 | |
1399 /* entry point sanity checking */ | |
1400 PR_ASSERT(NULL != enumerator); | |
1401 PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal); | |
1402 if ((NULL == enumerator) | |
1403 || (_PR_ENUM_SEALED != enumerator->seal)) goto bad_argument; | |
1404 | |
1405 /* beginning of enumeration */ | |
1406 if (NULL == previous) | |
1407 { | |
1408 if (NULL == enumerator->group) | |
1409 { | |
1410 enumerator->group = mw_state->group; | |
1411 if (NULL == enumerator->group) | |
1412 { | |
1413 PR_SetError(PR_GROUP_EMPTY_ERROR, 0); | |
1414 return NULL; | |
1415 } | |
1416 } | |
1417 enumerator->waiter = &enumerator->group->waiter->recv_wait; | |
1418 enumerator->p_timestamp = enumerator->group->p_timestamp; | |
1419 enumerator->thread = PR_GetCurrentThread(); | |
1420 enumerator->index = 0; | |
1421 } | |
1422 /* continuing an enumeration */ | |
1423 else | |
1424 { | |
1425 PRThread *me = PR_GetCurrentThread(); | |
1426 PR_ASSERT(me == enumerator->thread); | |
1427 if (me != enumerator->thread) goto bad_argument; | |
1428 | |
1429 /* need to restart the enumeration */ | |
1430 if (enumerator->p_timestamp != enumerator->group->p_timestamp) | |
1431 return PR_EnumerateWaitGroup(enumerator, NULL); | |
1432 } | |
1433 | |
1434 /* actually progress the enumeration */ | |
1435 #if defined(WINNT) | |
1436 _PR_MD_LOCK(&enumerator->group->mdlock); | |
1437 #else | |
1438 PR_Lock(enumerator->group->ml); | |
1439 #endif | |
1440 while (enumerator->index++ < enumerator->group->waiter->length) | |
1441 { | |
1442 if (NULL != (result = *(enumerator->waiter)++)) break; | |
1443 } | |
1444 #if defined(WINNT) | |
1445 _PR_MD_UNLOCK(&enumerator->group->mdlock); | |
1446 #else | |
1447 PR_Unlock(enumerator->group->ml); | |
1448 #endif | |
1449 | |
1450 return result; /* what we live for */ | |
1451 | |
1452 bad_argument: | |
1453 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); | |
1454 return NULL; /* probably ambiguous */ | |
1455 } /* PR_EnumerateWaitGroup */ | |
1456 | |
1457 /* prmwait.c */ | |
OLD | NEW |