Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(396)

Side by Side Diff: base/message_loop/message_loop.cc

Issue 17567007: Made MessagePump a non-thread safe class. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: - Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/message_loop/message_loop.h" 5 #include "base/message_loop/message_loop.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/compiler_specific.h" 10 #include "base/compiler_specific.h"
(...skipping 149 matching lines...) Expand 10 before | Expand all | Expand 10 after
160 #define MESSAGE_PUMP_IO new MessagePumpIOSForIO() 160 #define MESSAGE_PUMP_IO new MessagePumpIOSForIO()
161 #elif defined(OS_MACOSX) 161 #elif defined(OS_MACOSX)
162 #define MESSAGE_PUMP_UI MessagePumpMac::Create() 162 #define MESSAGE_PUMP_UI MessagePumpMac::Create()
163 #define MESSAGE_PUMP_IO new MessagePumpLibevent() 163 #define MESSAGE_PUMP_IO new MessagePumpLibevent()
164 #elif defined(OS_NACL) 164 #elif defined(OS_NACL)
165 // Currently NaCl doesn't have a UI MessageLoop. 165 // Currently NaCl doesn't have a UI MessageLoop.
166 // TODO(abarth): Figure out if we need this. 166 // TODO(abarth): Figure out if we need this.
167 #define MESSAGE_PUMP_UI NULL 167 #define MESSAGE_PUMP_UI NULL
168 // ipc_channel_nacl.cc uses a worker thread to do socket reads currently, and 168 // ipc_channel_nacl.cc uses a worker thread to do socket reads currently, and
169 // doesn't require extra support for watching file descriptors. 169 // doesn't require extra support for watching file descriptors.
170 #define MESSAGE_PUMP_IO new MessagePumpDefault(); 170 #define MESSAGE_PUMP_IO new MessagePumpDefault()
171 #elif defined(OS_POSIX) // POSIX but not MACOSX. 171 #elif defined(OS_POSIX) // POSIX but not MACOSX.
172 #define MESSAGE_PUMP_UI new MessagePumpForUI() 172 #define MESSAGE_PUMP_UI new MessagePumpForUI()
173 #define MESSAGE_PUMP_IO new MessagePumpLibevent() 173 #define MESSAGE_PUMP_IO new MessagePumpLibevent()
174 #else 174 #else
175 #error Not implemented 175 #error Not implemented
176 #endif 176 #endif
177 177
178 if (type_ == TYPE_UI) { 178 if (type_ == TYPE_UI) {
179 if (message_pump_for_ui_factory_) 179 if (message_pump_for_ui_factory_)
180 pump_ = message_pump_for_ui_factory_(); 180 pump_.reset(message_pump_for_ui_factory_());
181 else 181 else
182 pump_ = MESSAGE_PUMP_UI; 182 pump_.reset(MESSAGE_PUMP_UI);
183 } else if (type_ == TYPE_IO) { 183 } else if (type_ == TYPE_IO) {
184 pump_ = MESSAGE_PUMP_IO; 184 pump_.reset(MESSAGE_PUMP_IO);
185 } else { 185 } else {
186 DCHECK_EQ(TYPE_DEFAULT, type_); 186 DCHECK_EQ(TYPE_DEFAULT, type_);
187 pump_ = new MessagePumpDefault(); 187 pump_.reset(new MessagePumpDefault());
188 } 188 }
189 } 189 }
190 190
191 MessageLoop::~MessageLoop() { 191 MessageLoop::~MessageLoop() {
192 DCHECK_EQ(this, current()); 192 DCHECK_EQ(this, current());
193 193
194 DCHECK(!run_loop_); 194 DCHECK(!run_loop_);
195 195
196 // Clean up any unprocessed tasks, but take care: deleting a task could 196 // Clean up any unprocessed tasks, but take care: deleting a task could
197 // result in the addition of more tasks (e.g., via DeleteSoon). We set a 197 // result in the addition of more tasks (e.g., via DeleteSoon). We set a
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
266 void MessageLoop::RemoveDestructionObserver( 266 void MessageLoop::RemoveDestructionObserver(
267 DestructionObserver* destruction_observer) { 267 DestructionObserver* destruction_observer) {
268 DCHECK_EQ(this, current()); 268 DCHECK_EQ(this, current());
269 destruction_observers_.RemoveObserver(destruction_observer); 269 destruction_observers_.RemoveObserver(destruction_observer);
270 } 270 }
271 271
272 void MessageLoop::PostTask( 272 void MessageLoop::PostTask(
273 const tracked_objects::Location& from_here, 273 const tracked_objects::Location& from_here,
274 const Closure& task) { 274 const Closure& task) {
275 DCHECK(!task.is_null()) << from_here.ToString(); 275 DCHECK(!task.is_null()) << from_here.ToString();
276 PendingTask pending_task( 276 message_loop_proxy_->AddToIncomingQueue(from_here, task, TimeDelta(), true);
277 from_here, task, CalculateDelayedRuntime(TimeDelta()), true);
278 AddToIncomingQueue(&pending_task, false);
279 } 277 }
280 278
281 bool MessageLoop::TryPostTask( 279 bool MessageLoop::TryPostTask(
282 const tracked_objects::Location& from_here, 280 const tracked_objects::Location& from_here,
283 const Closure& task) { 281 const Closure& task) {
284 DCHECK(!task.is_null()) << from_here.ToString(); 282 DCHECK(!task.is_null()) << from_here.ToString();
285 PendingTask pending_task( 283 return message_loop_proxy_->TryAddToIncomingQueue(from_here, task);
286 from_here, task, CalculateDelayedRuntime(TimeDelta()), true);
287 return AddToIncomingQueue(&pending_task, true);
288 } 284 }
289 285
290 void MessageLoop::PostDelayedTask( 286 void MessageLoop::PostDelayedTask(
291 const tracked_objects::Location& from_here, 287 const tracked_objects::Location& from_here,
292 const Closure& task, 288 const Closure& task,
293 TimeDelta delay) { 289 TimeDelta delay) {
294 DCHECK(!task.is_null()) << from_here.ToString(); 290 DCHECK(!task.is_null()) << from_here.ToString();
295 PendingTask pending_task( 291 message_loop_proxy_->AddToIncomingQueue(from_here, task, delay, true);
296 from_here, task, CalculateDelayedRuntime(delay), true);
297 AddToIncomingQueue(&pending_task, false);
298 } 292 }
299 293
300 void MessageLoop::PostNonNestableTask( 294 void MessageLoop::PostNonNestableTask(
301 const tracked_objects::Location& from_here, 295 const tracked_objects::Location& from_here,
302 const Closure& task) { 296 const Closure& task) {
303 DCHECK(!task.is_null()) << from_here.ToString(); 297 DCHECK(!task.is_null()) << from_here.ToString();
304 PendingTask pending_task( 298 message_loop_proxy_->AddToIncomingQueue(from_here, task, TimeDelta(), false);
305 from_here, task, CalculateDelayedRuntime(TimeDelta()), false);
306 AddToIncomingQueue(&pending_task, false);
307 } 299 }
308 300
309 void MessageLoop::PostNonNestableDelayedTask( 301 void MessageLoop::PostNonNestableDelayedTask(
310 const tracked_objects::Location& from_here, 302 const tracked_objects::Location& from_here,
311 const Closure& task, 303 const Closure& task,
312 TimeDelta delay) { 304 TimeDelta delay) {
313 DCHECK(!task.is_null()) << from_here.ToString(); 305 DCHECK(!task.is_null()) << from_here.ToString();
314 PendingTask pending_task( 306 message_loop_proxy_->AddToIncomingQueue(from_here, task, delay, false);
315 from_here, task, CalculateDelayedRuntime(delay), false);
316 AddToIncomingQueue(&pending_task, false);
317 } 307 }
318 308
319 void MessageLoop::Run() { 309 void MessageLoop::Run() {
320 RunLoop run_loop; 310 RunLoop run_loop;
321 run_loop.Run(); 311 run_loop.Run();
322 } 312 }
323 313
324 void MessageLoop::RunUntilIdle() { 314 void MessageLoop::RunUntilIdle() {
325 RunLoop run_loop; 315 RunLoop run_loop;
326 run_loop.RunUntilIdle(); 316 run_loop.RunUntilIdle();
(...skipping 23 matching lines...) Expand all
350 340
351 static void QuitCurrentWhenIdle() { 341 static void QuitCurrentWhenIdle() {
352 MessageLoop::current()->QuitWhenIdle(); 342 MessageLoop::current()->QuitWhenIdle();
353 } 343 }
354 344
355 // static 345 // static
356 Closure MessageLoop::QuitWhenIdleClosure() { 346 Closure MessageLoop::QuitWhenIdleClosure() {
357 return Bind(&QuitCurrentWhenIdle); 347 return Bind(&QuitCurrentWhenIdle);
358 } 348 }
359 349
350 scoped_refptr<MessageLoopProxy> MessageLoop::message_loop_proxy() {
rvargas (doing something else) 2013/06/28 03:00:51 why de-inline this?
alexeypa (please no reviews) 2013/06/28 17:00:57 "base/message_loop/message_loop_proxy_impl.h" cann
351 return message_loop_proxy_;
352 }
353
360 void MessageLoop::SetNestableTasksAllowed(bool allowed) { 354 void MessageLoop::SetNestableTasksAllowed(bool allowed) {
361 if (nestable_tasks_allowed_ != allowed) { 355 if (nestable_tasks_allowed_ != allowed) {
362 nestable_tasks_allowed_ = allowed; 356 nestable_tasks_allowed_ = allowed;
363 if (!nestable_tasks_allowed_) 357 if (!nestable_tasks_allowed_)
364 return; 358 return;
365 // Start the native pump if we are not already pumping. 359 // Start the native pump if we are not already pumping.
366 pump_->ScheduleWork(); 360 pump_->ScheduleWork();
367 } 361 }
368 } 362 }
369 363
370 bool MessageLoop::NestableTasksAllowed() const { 364 bool MessageLoop::NestableTasksAllowed() const {
371 return nestable_tasks_allowed_; 365 return nestable_tasks_allowed_;
372 } 366 }
373 367
374 bool MessageLoop::IsNested() { 368 bool MessageLoop::IsNested() {
375 return run_loop_->run_depth_ > 1; 369 return run_loop_->run_depth_ > 1;
376 } 370 }
377 371
378 void MessageLoop::AddTaskObserver(TaskObserver* task_observer) { 372 void MessageLoop::AddTaskObserver(TaskObserver* task_observer) {
379 DCHECK_EQ(this, current()); 373 DCHECK_EQ(this, current());
380 task_observers_.AddObserver(task_observer); 374 task_observers_.AddObserver(task_observer);
381 } 375 }
382 376
383 void MessageLoop::RemoveTaskObserver(TaskObserver* task_observer) { 377 void MessageLoop::RemoveTaskObserver(TaskObserver* task_observer) {
384 DCHECK_EQ(this, current()); 378 DCHECK_EQ(this, current());
385 task_observers_.RemoveObserver(task_observer); 379 task_observers_.RemoveObserver(task_observer);
386 } 380 }
387 381
388 void MessageLoop::AssertIdle() const { 382 void MessageLoop::AssertIdle() const {
rvargas (doing something else) 2013/06/28 03:00:51 This should probably be a ForTest method.
alexeypa (please no reviews) 2013/06/28 17:00:57 Done.
389 // We only check |incoming_queue_|, since we don't want to lock |work_queue_|. 383 // We only check |incoming_queue_|, since we don't want to lock |work_queue_|.
390 AutoLock lock(incoming_queue_lock_); 384 AutoLock lock(message_loop_proxy_->message_loop_lock_);
rvargas (doing something else) 2013/06/28 03:00:51 we should do something to avoid grabbing this lock
alexeypa (please no reviews) 2013/06/28 17:00:57 Done.
391 DCHECK(incoming_queue_.empty()); 385 DCHECK(incoming_queue_.empty());
392 } 386 }
393 387
394 bool MessageLoop::is_running() const { 388 bool MessageLoop::is_running() const {
395 DCHECK_EQ(this, current()); 389 DCHECK_EQ(this, current());
396 return run_loop_ != NULL; 390 return run_loop_ != NULL;
397 } 391 }
398 392
399 //------------------------------------------------------------------------------ 393 //------------------------------------------------------------------------------
400 394
(...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after
513 void MessageLoop::ReloadWorkQueue() { 507 void MessageLoop::ReloadWorkQueue() {
514 // We can improve performance of our loading tasks from incoming_queue_ to 508 // We can improve performance of our loading tasks from incoming_queue_ to
515 // work_queue_ by waiting until the last minute (work_queue_ is empty) to 509 // work_queue_ by waiting until the last minute (work_queue_ is empty) to
516 // load. That reduces the number of locks-per-task significantly when our 510 // load. That reduces the number of locks-per-task significantly when our
517 // queues get large. 511 // queues get large.
518 if (!work_queue_.empty()) 512 if (!work_queue_.empty())
519 return; // Wait till we *really* need to lock and load. 513 return; // Wait till we *really* need to lock and load.
520 514
521 // Acquire all we can from the inter-thread queue with one lock acquisition. 515 // Acquire all we can from the inter-thread queue with one lock acquisition.
522 { 516 {
523 AutoLock lock(incoming_queue_lock_); 517 AutoLock lock(message_loop_proxy_->message_loop_lock_);
rvargas (doing something else) 2013/06/28 03:00:51 Maybe message_loop_proxy_->LoadPendingTasks(); M
alexeypa (please no reviews) 2013/06/28 17:00:57 Done.
524 if (incoming_queue_.empty()) 518 if (incoming_queue_.empty())
525 return; 519 return;
526 incoming_queue_.Swap(&work_queue_); // Constant time 520 incoming_queue_.Swap(&work_queue_); // Constant time
527 DCHECK(incoming_queue_.empty()); 521 DCHECK(incoming_queue_.empty());
528 } 522 }
529 } 523 }
530 524
531 bool MessageLoop::DeletePendingTasks() { 525 bool MessageLoop::DeletePendingTasks() {
532 bool did_work = !work_queue_.empty(); 526 bool did_work = !work_queue_.empty();
533 while (!work_queue_.empty()) { 527 while (!work_queue_.empty()) {
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
589 Time::ActivateHighResolutionTimer(false); 583 Time::ActivateHighResolutionTimer(false);
590 high_resolution_timer_expiration_ = TimeTicks(); 584 high_resolution_timer_expiration_ = TimeTicks();
591 } 585 }
592 } 586 }
593 #endif 587 #endif
594 588
595 return delayed_run_time; 589 return delayed_run_time;
596 } 590 }
597 591
598 // Possibly called on a background thread! 592 // Possibly called on a background thread!
599 bool MessageLoop::AddToIncomingQueue(PendingTask* pending_task, 593 void MessageLoop::AddToIncomingQueue(const tracked_objects::Location& from_here,
600 bool use_try_lock) { 594 const Closure& task,
595 TimeDelta delay,
596 bool nestable) {
601 // Warning: Don't try to short-circuit, and handle this thread's tasks more 597 // Warning: Don't try to short-circuit, and handle this thread's tasks more
602 // directly, as it could starve handling of foreign threads. Put every task 598 // directly, as it could starve handling of foreign threads. Put every task
603 // into this queue. 599 // into this queue.
604 600
605 scoped_refptr<MessagePump> pump; 601 // This should only be called while the lock is taken.
606 { 602 message_loop_proxy_->message_loop_lock_.AssertAcquired();
607 if (use_try_lock) {
608 if (!incoming_queue_lock_.Try()) {
609 pending_task->task.Reset();
610 return false;
611 }
612 } else {
613 incoming_queue_lock_.Acquire();
614 }
615 AutoLock locked(incoming_queue_lock_, AutoLock::AlreadyAcquired());
616 // Initialize the sequence number. The sequence number is used for delayed
617 // tasks (to faciliate FIFO sorting when two tasks have the same
618 // delayed_run_time value) and for identifying the task in about:tracing.
619 pending_task->sequence_num = next_sequence_num_++;
620 603
621 TRACE_EVENT_FLOW_BEGIN0("task", "MessageLoop::PostTask", 604 PendingTask pending_task(
622 TRACE_ID_MANGLE(GetTaskTraceID(*pending_task, this))); 605 from_here, task, CalculateDelayedRuntime(delay), nestable);
623 606
624 bool was_empty = incoming_queue_.empty(); 607 // Initialize the sequence number. The sequence number is used for delayed
625 incoming_queue_.push(*pending_task); 608 // tasks (to faciliate FIFO sorting when two tasks have the same
626 pending_task->task.Reset(); 609 // delayed_run_time value) and for identifying the task in about:tracing.
627 if (!was_empty) 610 pending_task.sequence_num = next_sequence_num_++;
628 return true; // Someone else should have started the sub-pump.
629 611
630 pump = pump_; 612 TRACE_EVENT_FLOW_BEGIN0("task", "MessageLoop::PostTask",
631 } 613 TRACE_ID_MANGLE(GetTaskTraceID(pending_task, this)));
632 // Since the incoming_queue_ may contain a task that destroys this message
633 // loop, we cannot exit incoming_queue_lock_ until we are done with |this|.
634 // We use a stack-based reference to the message pump so that we can call
635 // ScheduleWork outside of incoming_queue_lock_.
636 614
637 pump->ScheduleWork(); 615 bool was_empty = incoming_queue_.empty();
638 return true; 616 incoming_queue_.push(pending_task);
617 pending_task.task.Reset();
618
619 // Wake up the pump.
620 if (was_empty)
621 pump_->ScheduleWork();
639 } 622 }
640 623
641 //------------------------------------------------------------------------------ 624 //------------------------------------------------------------------------------
642 // Method and data for histogramming events and actions taken by each instance 625 // Method and data for histogramming events and actions taken by each instance
643 // on each thread. 626 // on each thread.
644 627
645 void MessageLoop::StartHistogrammer() { 628 void MessageLoop::StartHistogrammer() {
646 #if !defined(OS_NACL) // NaCl build has no metrics code. 629 #if !defined(OS_NACL) // NaCl build has no metrics code.
647 if (enable_histogrammer_ && !message_histogram_ 630 if (enable_histogrammer_ && !message_histogram_
648 && StatisticsRecorder::IsActive()) { 631 && StatisticsRecorder::IsActive()) {
(...skipping 176 matching lines...) Expand 10 before | Expand all | Expand 10 after
825 fd, 808 fd,
826 persistent, 809 persistent,
827 mode, 810 mode,
828 controller, 811 controller,
829 delegate); 812 delegate);
830 } 813 }
831 814
832 #endif 815 #endif
833 816
834 } // namespace base 817 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698