Index: trunk/src/base/message_loop/message_loop.cc |
=================================================================== |
--- trunk/src/base/message_loop/message_loop.cc (revision 212951) |
+++ trunk/src/base/message_loop/message_loop.cc (working copy) |
@@ -13,6 +13,7 @@ |
#include "base/lazy_instance.h" |
#include "base/logging.h" |
#include "base/memory/scoped_ptr.h" |
+#include "base/message_loop/message_loop_proxy_impl.h" |
#include "base/message_loop/message_pump_default.h" |
#include "base/metrics/histogram.h" |
#include "base/metrics/statistics_recorder.h" |
@@ -88,6 +89,14 @@ |
MessageLoop::MessagePumpFactory* message_pump_for_ui_factory_ = NULL; |
+// Create a process-wide unique ID to represent this task in trace events. This |
+// will be mangled with a Process ID hash to reduce the likelyhood of colliding |
+// with MessageLoop pointers on other processes. |
+uint64 GetTaskTraceID(const PendingTask& task, MessageLoop* loop) { |
+ return (static_cast<uint64>(task.sequence_num) << 32) | |
+ static_cast<uint64>(reinterpret_cast<intptr_t>(loop)); |
+} |
+ |
// Returns true if MessagePump::ScheduleWork() must be called one |
// time for every task that is added to the MessageLoop incoming queue. |
bool AlwaysNotifyPump(MessageLoop::Type type) { |
@@ -137,19 +146,18 @@ |
MessageLoop::MessageLoop(Type type) |
: type_(type), |
+ nestable_tasks_allowed_(true), |
exception_restoration_(false), |
- nestable_tasks_allowed_(true), |
+ message_histogram_(NULL), |
+ run_loop_(NULL), |
#if defined(OS_WIN) |
os_modal_loop_(false), |
#endif // OS_WIN |
- message_histogram_(NULL), |
- run_loop_(NULL) { |
+ next_sequence_num_(0) { |
DCHECK(!current()) << "should only have one message loop per thread"; |
lazy_tls_ptr.Pointer()->Set(this); |
- incoming_task_queue_ = new internal::IncomingTaskQueue(this); |
- message_loop_proxy_ = |
- new internal::MessageLoopProxyImpl(incoming_task_queue_); |
+ message_loop_proxy_ = new MessageLoopProxyImpl(); |
thread_task_runner_handle_.reset( |
new ThreadTaskRunnerHandle(message_loop_proxy_)); |
@@ -169,7 +177,7 @@ |
#define MESSAGE_PUMP_UI NULL |
// ipc_channel_nacl.cc uses a worker thread to do socket reads currently, and |
// doesn't require extra support for watching file descriptors. |
-#define MESSAGE_PUMP_IO new MessagePumpDefault() |
+#define MESSAGE_PUMP_IO new MessagePumpDefault(); |
#elif defined(OS_POSIX) // POSIX but not MACOSX. |
#define MESSAGE_PUMP_UI new MessagePumpForUI() |
#define MESSAGE_PUMP_IO new MessagePumpLibevent() |
@@ -179,14 +187,14 @@ |
if (type_ == TYPE_UI) { |
if (message_pump_for_ui_factory_) |
- pump_.reset(message_pump_for_ui_factory_()); |
+ pump_ = message_pump_for_ui_factory_(); |
else |
- pump_.reset(MESSAGE_PUMP_UI); |
+ pump_ = MESSAGE_PUMP_UI; |
} else if (type_ == TYPE_IO) { |
- pump_.reset(MESSAGE_PUMP_IO); |
+ pump_ = MESSAGE_PUMP_IO; |
} else { |
DCHECK_EQ(TYPE_DEFAULT, type_); |
- pump_.reset(new MessagePumpDefault()); |
+ pump_ = new MessagePumpDefault(); |
} |
} |
@@ -218,13 +226,23 @@ |
thread_task_runner_handle_.reset(); |
- // Tell the incoming queue that we are dying. |
- incoming_task_queue_->WillDestroyCurrentMessageLoop(); |
- incoming_task_queue_ = NULL; |
+ // Tell the message_loop_proxy that we are dying. |
+ static_cast<MessageLoopProxyImpl*>(message_loop_proxy_.get())-> |
+ WillDestroyCurrentMessageLoop(); |
message_loop_proxy_ = NULL; |
// OK, now make it so that no one can find us. |
lazy_tls_ptr.Pointer()->Set(NULL); |
+ |
+#if defined(OS_WIN) |
+ // If we left the high-resolution timer activated, deactivate it now. |
+ // Doing this is not-critical, it is mainly to make sure we track |
+ // the high resolution timer activations properly in our unit tests. |
+ if (!high_resolution_timer_expiration_.is_null()) { |
+ Time::ActivateHighResolutionTimer(false); |
+ high_resolution_timer_expiration_ = TimeTicks(); |
+ } |
+#endif |
} |
// static |
@@ -265,14 +283,18 @@ |
const tracked_objects::Location& from_here, |
const Closure& task) { |
DCHECK(!task.is_null()) << from_here.ToString(); |
- incoming_task_queue_->AddToIncomingQueue(from_here, task, TimeDelta(), true); |
+ PendingTask pending_task( |
+ from_here, task, CalculateDelayedRuntime(TimeDelta()), true); |
+ AddToIncomingQueue(&pending_task, false); |
} |
bool MessageLoop::TryPostTask( |
const tracked_objects::Location& from_here, |
const Closure& task) { |
DCHECK(!task.is_null()) << from_here.ToString(); |
- return incoming_task_queue_->TryAddToIncomingQueue(from_here, task); |
+ PendingTask pending_task( |
+ from_here, task, CalculateDelayedRuntime(TimeDelta()), true); |
+ return AddToIncomingQueue(&pending_task, true); |
} |
void MessageLoop::PostDelayedTask( |
@@ -280,14 +302,18 @@ |
const Closure& task, |
TimeDelta delay) { |
DCHECK(!task.is_null()) << from_here.ToString(); |
- incoming_task_queue_->AddToIncomingQueue(from_here, task, delay, true); |
+ PendingTask pending_task( |
+ from_here, task, CalculateDelayedRuntime(delay), true); |
+ AddToIncomingQueue(&pending_task, false); |
} |
void MessageLoop::PostNonNestableTask( |
const tracked_objects::Location& from_here, |
const Closure& task) { |
DCHECK(!task.is_null()) << from_here.ToString(); |
- incoming_task_queue_->AddToIncomingQueue(from_here, task, TimeDelta(), false); |
+ PendingTask pending_task( |
+ from_here, task, CalculateDelayedRuntime(TimeDelta()), false); |
+ AddToIncomingQueue(&pending_task, false); |
} |
void MessageLoop::PostNonNestableDelayedTask( |
@@ -295,7 +321,9 @@ |
const Closure& task, |
TimeDelta delay) { |
DCHECK(!task.is_null()) << from_here.ToString(); |
- incoming_task_queue_->AddToIncomingQueue(from_here, task, delay, false); |
+ PendingTask pending_task( |
+ from_here, task, CalculateDelayedRuntime(delay), false); |
+ AddToIncomingQueue(&pending_task, false); |
} |
void MessageLoop::Run() { |
@@ -367,26 +395,17 @@ |
task_observers_.RemoveObserver(task_observer); |
} |
+void MessageLoop::AssertIdle() const { |
+ // We only check |incoming_queue_|, since we don't want to lock |work_queue_|. |
+ AutoLock lock(incoming_queue_lock_); |
+ DCHECK(incoming_queue_.empty()); |
+} |
+ |
bool MessageLoop::is_running() const { |
DCHECK_EQ(this, current()); |
return run_loop_ != NULL; |
} |
-bool MessageLoop::IsHighResolutionTimerEnabledForTesting() { |
- return incoming_task_queue_->IsHighResolutionTimerEnabledForTesting(); |
-} |
- |
-bool MessageLoop::IsIdleForTesting() { |
- // We only check the imcoming queue|, since we don't want to lock the work |
- // queue. |
- return incoming_task_queue_->IsIdleForTesting(); |
-} |
- |
-void MessageLoop::LockWaitUnLockForTesting(WaitableEvent* caller_wait, |
- WaitableEvent* caller_signal) { |
- incoming_task_queue_->LockWaitUnLockForTesting(caller_wait, caller_signal); |
-} |
- |
//------------------------------------------------------------------------------ |
// Runs the loop in two different SEH modes: |
@@ -451,7 +470,7 @@ |
tracked_objects::ThreadData::NowForStartOfRun(pending_task.birth_tally); |
TRACE_EVENT_FLOW_END1("task", "MessageLoop::PostTask", |
- TRACE_ID_MANGLE(GetTaskTraceID(pending_task)), |
+ TRACE_ID_MANGLE(GetTaskTraceID(pending_task, this)), |
"queue_duration", |
(start_time - pending_task.EffectiveTimePosted()).InMilliseconds()); |
TRACE_EVENT2("task", "MessageLoop::RunTask", |
@@ -504,6 +523,24 @@ |
delayed_work_queue_.push(pending_task); |
} |
+void MessageLoop::ReloadWorkQueue() { |
+ // We can improve performance of our loading tasks from incoming_queue_ to |
+ // work_queue_ by waiting until the last minute (work_queue_ is empty) to |
+ // load. That reduces the number of locks-per-task significantly when our |
+ // queues get large. |
+ if (!work_queue_.empty()) |
+ return; // Wait till we *really* need to lock and load. |
+ |
+ // Acquire all we can from the inter-thread queue with one lock acquisition. |
+ { |
+ AutoLock lock(incoming_queue_lock_); |
+ if (incoming_queue_.empty()) |
+ return; |
+ incoming_queue_.Swap(&work_queue_); // Constant time |
+ DCHECK(incoming_queue_.empty()); |
+ } |
+} |
+ |
bool MessageLoop::DeletePendingTasks() { |
bool did_work = !work_queue_.empty(); |
while (!work_queue_.empty()) { |
@@ -533,25 +570,87 @@ |
return did_work; |
} |
-uint64 MessageLoop::GetTaskTraceID(const PendingTask& task) { |
- return (static_cast<uint64>(task.sequence_num) << 32) | |
- static_cast<uint64>(reinterpret_cast<intptr_t>(this)); |
-} |
+TimeTicks MessageLoop::CalculateDelayedRuntime(TimeDelta delay) { |
+ TimeTicks delayed_run_time; |
+ if (delay > TimeDelta()) { |
+ delayed_run_time = TimeTicks::Now() + delay; |
-void MessageLoop::ReloadWorkQueue() { |
- // We can improve performance of our loading tasks from the incoming queue to |
- // |*work_queue| by waiting until the last minute (|*work_queue| is empty) to |
- // load. That reduces the number of locks-per-task significantly when our |
- // queues get large. |
- if (work_queue_.empty()) |
- incoming_task_queue_->ReloadWorkQueue(&work_queue_); |
+#if defined(OS_WIN) |
+ if (high_resolution_timer_expiration_.is_null()) { |
+ // Windows timers are granular to 15.6ms. If we only set high-res |
+ // timers for those under 15.6ms, then a 18ms timer ticks at ~32ms, |
+ // which as a percentage is pretty inaccurate. So enable high |
+ // res timers for any timer which is within 2x of the granularity. |
+ // This is a tradeoff between accuracy and power management. |
+ bool needs_high_res_timers = delay.InMilliseconds() < |
+ (2 * Time::kMinLowResolutionThresholdMs); |
+ if (needs_high_res_timers) { |
+ if (Time::ActivateHighResolutionTimer(true)) { |
+ high_resolution_timer_expiration_ = TimeTicks::Now() + |
+ TimeDelta::FromMilliseconds(kHighResolutionTimerModeLeaseTimeMs); |
+ } |
+ } |
+ } |
+#endif |
+ } else { |
+ DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative"; |
+ } |
+ |
+#if defined(OS_WIN) |
+ if (!high_resolution_timer_expiration_.is_null()) { |
+ if (TimeTicks::Now() > high_resolution_timer_expiration_) { |
+ Time::ActivateHighResolutionTimer(false); |
+ high_resolution_timer_expiration_ = TimeTicks(); |
+ } |
+ } |
+#endif |
+ |
+ return delayed_run_time; |
} |
-void MessageLoop::ScheduleWork(bool was_empty) { |
- // The Android UI message loop needs to get notified each time |
- // a task is added to the incoming queue. |
- if (was_empty || AlwaysNotifyPump(type_)) |
- pump_->ScheduleWork(); |
+// Possibly called on a background thread! |
+bool MessageLoop::AddToIncomingQueue(PendingTask* pending_task, |
+ bool use_try_lock) { |
+ // Warning: Don't try to short-circuit, and handle this thread's tasks more |
+ // directly, as it could starve handling of foreign threads. Put every task |
+ // into this queue. |
+ |
+ scoped_refptr<MessagePump> pump; |
+ { |
+ if (use_try_lock) { |
+ if (!incoming_queue_lock_.Try()) { |
+ pending_task->task.Reset(); |
+ return false; |
+ } |
+ } else { |
+ incoming_queue_lock_.Acquire(); |
+ } |
+ AutoLock locked(incoming_queue_lock_, AutoLock::AlreadyAcquired()); |
+ // Initialize the sequence number. The sequence number is used for delayed |
+ // tasks (to faciliate FIFO sorting when two tasks have the same |
+ // delayed_run_time value) and for identifying the task in about:tracing. |
+ pending_task->sequence_num = next_sequence_num_++; |
+ |
+ TRACE_EVENT_FLOW_BEGIN0("task", "MessageLoop::PostTask", |
+ TRACE_ID_MANGLE(GetTaskTraceID(*pending_task, this))); |
+ |
+ bool was_empty = incoming_queue_.empty(); |
+ incoming_queue_.push(*pending_task); |
+ pending_task->task.Reset(); |
+ // The Android UI message loop needs to get notified each time |
+ // a task is added to the incoming queue. |
+ if (!was_empty && !AlwaysNotifyPump(type_)) |
+ return true; // Someone else should have started the sub-pump. |
+ |
+ pump = pump_; |
+ } |
+ // Since the incoming_queue_ may contain a task that destroys this message |
+ // loop, we cannot exit incoming_queue_lock_ until we are done with |this|. |
+ // We use a stack-based reference to the message pump so that we can call |
+ // ScheduleWork outside of incoming_queue_lock_. |
+ |
+ pump->ScheduleWork(); |
+ return true; |
} |
//------------------------------------------------------------------------------ |