Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(40)

Side by Side Diff: webrtc/base/messagequeue.cc

Issue 1675923002: Prevent data race in MessageQueue. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
(...skipping 141 matching lines...) Expand 10 before | Expand all | Expand 10 after
152 return; 152 return;
153 } 153 }
154 154
155 fDestroyed_ = true; 155 fDestroyed_ = true;
156 // The signal is done from here to ensure 156 // The signal is done from here to ensure
157 // that it always gets called when the queue 157 // that it always gets called when the queue
158 // is going away. 158 // is going away.
159 SignalQueueDestroyed(); 159 SignalQueueDestroyed();
160 MessageQueueManager::Remove(this); 160 MessageQueueManager::Remove(this);
161 Clear(NULL); 161 Clear(NULL);
162
163 SharedScope ss(&ss_lock_);
162 if (ss_) { 164 if (ss_) {
163 ss_->SetMessageQueue(NULL); 165 ss_->SetMessageQueue(NULL);
164 } 166 }
165 } 167 }
166 168
169 SocketServer* MessageQueue::socketserver() {
170 SharedScope ss(&ss_lock_);
171 return ss_;
172 }
173
167 void MessageQueue::set_socketserver(SocketServer* ss) { 174 void MessageQueue::set_socketserver(SocketServer* ss) {
175 ExclusiveScope es(&ss_lock_);
pthatcher1 2016/02/12 00:16:33 Can you leave a comment explaining why this place
joachim 2016/02/12 15:09:52 Done.
168 ss_ = ss ? ss : default_ss_.get(); 176 ss_ = ss ? ss : default_ss_.get();
169 ss_->SetMessageQueue(this); 177 ss_->SetMessageQueue(this);
170 } 178 }
171 179
172 void MessageQueue::Quit() { 180 void MessageQueue::Quit() {
173 fStop_ = true; 181 fStop_ = true;
182 SharedScope ss(&ss_lock_);
174 ss_->WakeUp(); 183 ss_->WakeUp();
175 } 184 }
176 185
177 bool MessageQueue::IsQuitting() { 186 bool MessageQueue::IsQuitting() {
178 return fStop_; 187 return fStop_;
179 } 188 }
180 189
181 void MessageQueue::Restart() { 190 void MessageQueue::Restart() {
182 fStop_ = false; 191 fStop_ = false;
183 } 192 }
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after
270 279
271 int cmsNext; 280 int cmsNext;
272 if (cmsWait == kForever) { 281 if (cmsWait == kForever) {
273 cmsNext = cmsDelayNext; 282 cmsNext = cmsDelayNext;
274 } else { 283 } else {
275 cmsNext = std::max(0, cmsTotal - cmsElapsed); 284 cmsNext = std::max(0, cmsTotal - cmsElapsed);
276 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) 285 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
277 cmsNext = cmsDelayNext; 286 cmsNext = cmsDelayNext;
278 } 287 }
279 288
280 // Wait and multiplex in the meantime 289 {
281 if (!ss_->Wait(cmsNext, process_io)) 290 // Wait and multiplex in the meantime
282 return false; 291 SharedScope ss(&ss_lock_);
292 if (!ss_->Wait(cmsNext, process_io))
293 return false;
294 }
283 295
284 // If the specified timeout expired, return 296 // If the specified timeout expired, return
285 297
286 msCurrent = Time(); 298 msCurrent = Time();
287 cmsElapsed = TimeDiff(msCurrent, msStart); 299 cmsElapsed = TimeDiff(msCurrent, msStart);
288 if (cmsWait != kForever) { 300 if (cmsWait != kForever) {
289 if (cmsElapsed >= cmsWait) 301 if (cmsElapsed >= cmsWait)
290 return false; 302 return false;
291 } 303 }
292 } 304 }
293 return false; 305 return false;
294 } 306 }
295 307
296 void MessageQueue::ReceiveSends() { 308 void MessageQueue::ReceiveSends() {
297 } 309 }
298 310
299 void MessageQueue::Post(MessageHandler* phandler, 311 void MessageQueue::Post(MessageHandler* phandler,
300 uint32_t id, 312 uint32_t id,
301 MessageData* pdata, 313 MessageData* pdata,
302 bool time_sensitive) { 314 bool time_sensitive) {
303 if (fStop_) 315 if (fStop_)
304 return; 316 return;
305 317
306 // Keep thread safe 318 // Keep thread safe
307 // Add the message to the end of the queue 319 // Add the message to the end of the queue
308 // Signal for the multiplexer to return 320 // Signal for the multiplexer to return
309 321
310 CritScope cs(&crit_); 322 {
311 Message msg; 323 CritScope cs(&crit_);
312 msg.phandler = phandler; 324 Message msg;
313 msg.message_id = id; 325 msg.phandler = phandler;
314 msg.pdata = pdata; 326 msg.message_id = id;
315 if (time_sensitive) { 327 msg.pdata = pdata;
316 msg.ts_sensitive = Time() + kMaxMsgLatency; 328 if (time_sensitive) {
329 msg.ts_sensitive = Time() + kMaxMsgLatency;
330 }
331 msgq_.push_back(msg);
317 } 332 }
318 msgq_.push_back(msg); 333 {
319 ss_->WakeUp(); 334 SharedScope ss(&ss_lock_);
335 ss_->WakeUp();
336 }
320 } 337 }
321 338
322 void MessageQueue::PostDelayed(int cmsDelay, 339 void MessageQueue::PostDelayed(int cmsDelay,
323 MessageHandler* phandler, 340 MessageHandler* phandler,
324 uint32_t id, 341 uint32_t id,
325 MessageData* pdata) { 342 MessageData* pdata) {
326 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); 343 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata);
327 } 344 }
328 345
329 void MessageQueue::PostAt(uint32_t tstamp, 346 void MessageQueue::PostAt(uint32_t tstamp,
330 MessageHandler* phandler, 347 MessageHandler* phandler,
331 uint32_t id, 348 uint32_t id,
332 MessageData* pdata) { 349 MessageData* pdata) {
333 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); 350 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata);
334 } 351 }
335 352
336 void MessageQueue::DoDelayPost(int cmsDelay, 353 void MessageQueue::DoDelayPost(int cmsDelay,
337 uint32_t tstamp, 354 uint32_t tstamp,
338 MessageHandler* phandler, 355 MessageHandler* phandler,
339 uint32_t id, 356 uint32_t id,
340 MessageData* pdata) { 357 MessageData* pdata) {
341 if (fStop_) 358 if (fStop_)
342 return; 359 return;
343 360
344 // Keep thread safe 361 // Keep thread safe
345 // Add to the priority queue. Gets sorted soonest first. 362 // Add to the priority queue. Gets sorted soonest first.
346 // Signal for the multiplexer to return. 363 // Signal for the multiplexer to return.
347 364
348 CritScope cs(&crit_); 365 {
349 Message msg; 366 CritScope cs(&crit_);
350 msg.phandler = phandler; 367 Message msg;
351 msg.message_id = id; 368 msg.phandler = phandler;
352 msg.pdata = pdata; 369 msg.message_id = id;
353 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); 370 msg.pdata = pdata;
354 dmsgq_.push(dmsg); 371 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
355 // If this message queue processes 1 message every millisecond for 50 days, 372 dmsgq_.push(dmsg);
356 // we will wrap this number. Even then, only messages with identical times 373 // If this message queue processes 1 message every millisecond for 50 days,
357 // will be misordered, and then only briefly. This is probably ok. 374 // we will wrap this number. Even then, only messages with identical times
358 VERIFY(0 != ++dmsgq_next_num_); 375 // will be misordered, and then only briefly. This is probably ok.
359 ss_->WakeUp(); 376 VERIFY(0 != ++dmsgq_next_num_);
377 }
378 {
379 SharedScope ss(&ss_lock_);
380 ss_->WakeUp();
381 }
360 } 382 }
361 383
362 int MessageQueue::GetDelay() { 384 int MessageQueue::GetDelay() {
363 CritScope cs(&crit_); 385 CritScope cs(&crit_);
364 386
365 if (!msgq_.empty()) 387 if (!msgq_.empty())
366 return 0; 388 return 0;
367 389
368 if (!dmsgq_.empty()) { 390 if (!dmsgq_.empty()) {
369 int delay = TimeUntil(dmsgq_.top().msTrigger_); 391 int delay = TimeUntil(dmsgq_.top().msTrigger_);
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
423 } 445 }
424 dmsgq_.container().erase(new_end, dmsgq_.container().end()); 446 dmsgq_.container().erase(new_end, dmsgq_.container().end());
425 dmsgq_.reheap(); 447 dmsgq_.reheap();
426 } 448 }
427 449
428 void MessageQueue::Dispatch(Message *pmsg) { 450 void MessageQueue::Dispatch(Message *pmsg) {
429 pmsg->phandler->OnMessage(pmsg); 451 pmsg->phandler->OnMessage(pmsg);
430 } 452 }
431 453
432 } // namespace rtc 454 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/messagequeue.h ('k') | webrtc/base/thread.cc » ('j') | webrtc/base/thread.cc » ('J')

Powered by Google App Engine
This is Rietveld 408576698