OLD | NEW |
---|---|
1 /* | 1 /* |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 | 10 |
(...skipping 141 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
152 return; | 152 return; |
153 } | 153 } |
154 | 154 |
155 fDestroyed_ = true; | 155 fDestroyed_ = true; |
156 // The signal is done from here to ensure | 156 // The signal is done from here to ensure |
157 // that it always gets called when the queue | 157 // that it always gets called when the queue |
158 // is going away. | 158 // is going away. |
159 SignalQueueDestroyed(); | 159 SignalQueueDestroyed(); |
160 MessageQueueManager::Remove(this); | 160 MessageQueueManager::Remove(this); |
161 Clear(NULL); | 161 Clear(NULL); |
162 | |
163 SharedScope ss(&ss_lock_); | |
162 if (ss_) { | 164 if (ss_) { |
163 ss_->SetMessageQueue(NULL); | 165 ss_->SetMessageQueue(NULL); |
164 } | 166 } |
165 } | 167 } |
166 | 168 |
169 SocketServer* MessageQueue::socketserver() { | |
170 SharedScope ss(&ss_lock_); | |
171 return ss_; | |
172 } | |
173 | |
167 void MessageQueue::set_socketserver(SocketServer* ss) { | 174 void MessageQueue::set_socketserver(SocketServer* ss) { |
175 ExclusiveScope es(&ss_lock_); | |
pthatcher1
2016/02/12 00:16:33
Can you leave a comment explaining why this place
joachim
2016/02/12 15:09:52
Done.
| |
168 ss_ = ss ? ss : default_ss_.get(); | 176 ss_ = ss ? ss : default_ss_.get(); |
169 ss_->SetMessageQueue(this); | 177 ss_->SetMessageQueue(this); |
170 } | 178 } |
171 | 179 |
172 void MessageQueue::Quit() { | 180 void MessageQueue::Quit() { |
173 fStop_ = true; | 181 fStop_ = true; |
182 SharedScope ss(&ss_lock_); | |
174 ss_->WakeUp(); | 183 ss_->WakeUp(); |
175 } | 184 } |
176 | 185 |
177 bool MessageQueue::IsQuitting() { | 186 bool MessageQueue::IsQuitting() { |
178 return fStop_; | 187 return fStop_; |
179 } | 188 } |
180 | 189 |
181 void MessageQueue::Restart() { | 190 void MessageQueue::Restart() { |
182 fStop_ = false; | 191 fStop_ = false; |
183 } | 192 } |
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
270 | 279 |
271 int cmsNext; | 280 int cmsNext; |
272 if (cmsWait == kForever) { | 281 if (cmsWait == kForever) { |
273 cmsNext = cmsDelayNext; | 282 cmsNext = cmsDelayNext; |
274 } else { | 283 } else { |
275 cmsNext = std::max(0, cmsTotal - cmsElapsed); | 284 cmsNext = std::max(0, cmsTotal - cmsElapsed); |
276 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) | 285 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) |
277 cmsNext = cmsDelayNext; | 286 cmsNext = cmsDelayNext; |
278 } | 287 } |
279 | 288 |
280 // Wait and multiplex in the meantime | 289 { |
281 if (!ss_->Wait(cmsNext, process_io)) | 290 // Wait and multiplex in the meantime |
282 return false; | 291 SharedScope ss(&ss_lock_); |
292 if (!ss_->Wait(cmsNext, process_io)) | |
293 return false; | |
294 } | |
283 | 295 |
284 // If the specified timeout expired, return | 296 // If the specified timeout expired, return |
285 | 297 |
286 msCurrent = Time(); | 298 msCurrent = Time(); |
287 cmsElapsed = TimeDiff(msCurrent, msStart); | 299 cmsElapsed = TimeDiff(msCurrent, msStart); |
288 if (cmsWait != kForever) { | 300 if (cmsWait != kForever) { |
289 if (cmsElapsed >= cmsWait) | 301 if (cmsElapsed >= cmsWait) |
290 return false; | 302 return false; |
291 } | 303 } |
292 } | 304 } |
293 return false; | 305 return false; |
294 } | 306 } |
295 | 307 |
296 void MessageQueue::ReceiveSends() { | 308 void MessageQueue::ReceiveSends() { |
297 } | 309 } |
298 | 310 |
299 void MessageQueue::Post(MessageHandler* phandler, | 311 void MessageQueue::Post(MessageHandler* phandler, |
300 uint32_t id, | 312 uint32_t id, |
301 MessageData* pdata, | 313 MessageData* pdata, |
302 bool time_sensitive) { | 314 bool time_sensitive) { |
303 if (fStop_) | 315 if (fStop_) |
304 return; | 316 return; |
305 | 317 |
306 // Keep thread safe | 318 // Keep thread safe |
307 // Add the message to the end of the queue | 319 // Add the message to the end of the queue |
308 // Signal for the multiplexer to return | 320 // Signal for the multiplexer to return |
309 | 321 |
310 CritScope cs(&crit_); | 322 { |
311 Message msg; | 323 CritScope cs(&crit_); |
312 msg.phandler = phandler; | 324 Message msg; |
313 msg.message_id = id; | 325 msg.phandler = phandler; |
314 msg.pdata = pdata; | 326 msg.message_id = id; |
315 if (time_sensitive) { | 327 msg.pdata = pdata; |
316 msg.ts_sensitive = Time() + kMaxMsgLatency; | 328 if (time_sensitive) { |
329 msg.ts_sensitive = Time() + kMaxMsgLatency; | |
330 } | |
331 msgq_.push_back(msg); | |
317 } | 332 } |
318 msgq_.push_back(msg); | 333 { |
319 ss_->WakeUp(); | 334 SharedScope ss(&ss_lock_); |
335 ss_->WakeUp(); | |
336 } | |
320 } | 337 } |
321 | 338 |
322 void MessageQueue::PostDelayed(int cmsDelay, | 339 void MessageQueue::PostDelayed(int cmsDelay, |
323 MessageHandler* phandler, | 340 MessageHandler* phandler, |
324 uint32_t id, | 341 uint32_t id, |
325 MessageData* pdata) { | 342 MessageData* pdata) { |
326 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); | 343 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); |
327 } | 344 } |
328 | 345 |
329 void MessageQueue::PostAt(uint32_t tstamp, | 346 void MessageQueue::PostAt(uint32_t tstamp, |
330 MessageHandler* phandler, | 347 MessageHandler* phandler, |
331 uint32_t id, | 348 uint32_t id, |
332 MessageData* pdata) { | 349 MessageData* pdata) { |
333 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); | 350 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); |
334 } | 351 } |
335 | 352 |
336 void MessageQueue::DoDelayPost(int cmsDelay, | 353 void MessageQueue::DoDelayPost(int cmsDelay, |
337 uint32_t tstamp, | 354 uint32_t tstamp, |
338 MessageHandler* phandler, | 355 MessageHandler* phandler, |
339 uint32_t id, | 356 uint32_t id, |
340 MessageData* pdata) { | 357 MessageData* pdata) { |
341 if (fStop_) | 358 if (fStop_) |
342 return; | 359 return; |
343 | 360 |
344 // Keep thread safe | 361 // Keep thread safe |
345 // Add to the priority queue. Gets sorted soonest first. | 362 // Add to the priority queue. Gets sorted soonest first. |
346 // Signal for the multiplexer to return. | 363 // Signal for the multiplexer to return. |
347 | 364 |
348 CritScope cs(&crit_); | 365 { |
349 Message msg; | 366 CritScope cs(&crit_); |
350 msg.phandler = phandler; | 367 Message msg; |
351 msg.message_id = id; | 368 msg.phandler = phandler; |
352 msg.pdata = pdata; | 369 msg.message_id = id; |
353 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); | 370 msg.pdata = pdata; |
354 dmsgq_.push(dmsg); | 371 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); |
355 // If this message queue processes 1 message every millisecond for 50 days, | 372 dmsgq_.push(dmsg); |
356 // we will wrap this number. Even then, only messages with identical times | 373 // If this message queue processes 1 message every millisecond for 50 days, |
357 // will be misordered, and then only briefly. This is probably ok. | 374 // we will wrap this number. Even then, only messages with identical times |
358 VERIFY(0 != ++dmsgq_next_num_); | 375 // will be misordered, and then only briefly. This is probably ok. |
359 ss_->WakeUp(); | 376 VERIFY(0 != ++dmsgq_next_num_); |
377 } | |
378 { | |
379 SharedScope ss(&ss_lock_); | |
380 ss_->WakeUp(); | |
381 } | |
360 } | 382 } |
361 | 383 |
362 int MessageQueue::GetDelay() { | 384 int MessageQueue::GetDelay() { |
363 CritScope cs(&crit_); | 385 CritScope cs(&crit_); |
364 | 386 |
365 if (!msgq_.empty()) | 387 if (!msgq_.empty()) |
366 return 0; | 388 return 0; |
367 | 389 |
368 if (!dmsgq_.empty()) { | 390 if (!dmsgq_.empty()) { |
369 int delay = TimeUntil(dmsgq_.top().msTrigger_); | 391 int delay = TimeUntil(dmsgq_.top().msTrigger_); |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
423 } | 445 } |
424 dmsgq_.container().erase(new_end, dmsgq_.container().end()); | 446 dmsgq_.container().erase(new_end, dmsgq_.container().end()); |
425 dmsgq_.reheap(); | 447 dmsgq_.reheap(); |
426 } | 448 } |
427 | 449 |
428 void MessageQueue::Dispatch(Message *pmsg) { | 450 void MessageQueue::Dispatch(Message *pmsg) { |
429 pmsg->phandler->OnMessage(pmsg); | 451 pmsg->phandler->OnMessage(pmsg); |
430 } | 452 } |
431 | 453 |
432 } // namespace rtc | 454 } // namespace rtc |
OLD | NEW |