Index: chrome/common/ipc_sync_channel.cc |
=================================================================== |
--- chrome/common/ipc_sync_channel.cc (revision 3707) |
+++ chrome/common/ipc_sync_channel.cc (working copy) |
@@ -37,17 +37,21 @@ |
class SyncChannel::ReceivedSyncMsgQueue : |
public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> { |
public: |
- ReceivedSyncMsgQueue() : |
- blocking_event_(CreateEvent(NULL, FALSE, FALSE, NULL)), |
- task_pending_(false), |
- listener_message_loop_(MessageLoop::current()) { |
+ // Returns the ReceivedSyncMsgQueue instance for this thread, creating one |
+ // if necessary. Call RemoveListener on the same thread when done. |
+ static ReceivedSyncMsgQueue* AddListener() { |
+ // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple |
+ // SyncChannel objects can block the same thread). |
+ ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get(); |
+ if (!rv) { |
+ rv = new ReceivedSyncMsgQueue(); |
+ ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv); |
+ } |
+ rv->listener_count_++; |
+ return rv; |
} |
~ReceivedSyncMsgQueue() { |
- DCHECK(lazy_tls_ptr_.Pointer()->Get()); |
- DCHECK(MessageLoop::current() == listener_message_loop_); |
- CloseHandle(blocking_event_); |
- lazy_tls_ptr_.Pointer()->Set(NULL); |
} |
// Called on IPC thread when a synchronous message or reply arrives. |
@@ -66,7 +70,7 @@ |
channel_id)); |
} |
- SetEvent(blocking_event_); |
+ SetEvent(dispatch_event_); |
if (!was_task_pending) { |
listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( |
this, &ReceivedSyncMsgQueue::DispatchMessagesTask)); |
@@ -105,7 +109,7 @@ |
} |
#ifdef IPC_MESSAGE_LOG_ENABLED |
- IPC::Logging* logger = IPC::Logging::current(); |
+ Logging* logger = Logging::current(); |
if (logger->Enabled()) |
logger->OnPreDispatchMessage(*message); |
#endif |
@@ -123,7 +127,7 @@ |
} |
// Called on the IPC thread when the current sync Send() call is unblocked. |
- void OnUnblock() { |
+ void DidUnblock() { |
if (!received_replies_.empty()) { |
MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod( |
this, &ReceivedSyncMsgQueue::DispatchReplies)); |
@@ -149,9 +153,14 @@ |
message_queue_.push(temp_queue.front()); |
temp_queue.pop(); |
} |
+ |
+ if (--listener_count_ == 0) { |
+ DCHECK(lazy_tls_ptr_.Pointer()->Get()); |
+ lazy_tls_ptr_.Pointer()->Set(NULL); |
+ } |
} |
- HANDLE blocking_event() { return blocking_event_; } |
+ HANDLE dispatch_event() { return dispatch_event_; } |
MessageLoop* listener_message_loop() { return listener_message_loop_; } |
// Holds a pointer to the per-thread ReceivedSyncMsgQueue object. |
@@ -159,12 +168,19 @@ |
lazy_tls_ptr_; |
private: |
+ ReceivedSyncMsgQueue() : |
+ dispatch_event_(CreateEvent(NULL, TRUE, FALSE, NULL)), |
+ task_pending_(false), |
+ listener_message_loop_(MessageLoop::current()), |
+ listener_count_(0) { |
+ } |
+ |
// Called on the ipc thread to check if we can unblock any current Send() |
// calls based on a queued reply. |
void DispatchReplies() { |
for (size_t i = 0; i < received_replies_.size(); ++i) { |
Message* message = received_replies_[i].message; |
- if (received_replies_[i].context->UnblockListener(message)) { |
+ if (received_replies_[i].context->TryToUnblockListener(message)) { |
delete message; |
received_replies_.erase(received_replies_.begin() + i); |
return; |
@@ -172,13 +188,6 @@ |
} |
} |
- // Set when we got a synchronous message that we must respond to as the |
- // sender needs its reply before it can reply to our original synchronous |
- // message. |
- HANDLE blocking_event_; |
- |
- MessageLoop* listener_message_loop_; |
- |
// Holds information about a queued synchronous message. |
struct ReceivedMessage { |
ReceivedMessage(Message* m, Channel::Listener* l, const std::wstring& i) |
@@ -190,8 +199,6 @@ |
typedef std::queue<ReceivedMessage> SyncMessageQueue; |
SyncMessageQueue message_queue_; |
- Lock message_lock_; |
- bool task_pending_; |
// Holds information about a queued reply message. |
struct Reply { |
@@ -204,6 +211,15 @@ |
}; |
std::vector<Reply> received_replies_; |
+ |
+ // Set when we got a synchronous message that we must respond to as the |
+ // sender needs its reply before it can reply to our original synchronous |
+ // message. |
+ ScopedHandle dispatch_event_; |
+ MessageLoop* listener_message_loop_; |
+ Lock message_lock_; |
+ bool task_pending_; |
+ int listener_count_; |
}; |
base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > |
@@ -212,120 +228,89 @@ |
SyncChannel::SyncContext::SyncContext( |
Channel::Listener* listener, |
MessageFilter* filter, |
- MessageLoop* ipc_thread) |
+ MessageLoop* ipc_thread, |
+ HANDLE shutdown_event) |
: ChannelProxy::Context(listener, filter, ipc_thread), |
- channel_closed_(false), |
- reply_deserialize_result_(false) { |
- // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple |
- // SyncChannel objects that can block the same thread). |
- received_sync_msgs_ = ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Get(); |
- |
- if (!received_sync_msgs_) { |
- // Stash a pointer to the listener thread's ReceivedSyncMsgQueue, as we |
- // need to be able to access it in the IPC thread. |
- received_sync_msgs_ = new ReceivedSyncMsgQueue(); |
- ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(received_sync_msgs_); |
- } |
- |
- // Addref manually so that we can ensure destruction on the listener thread |
- // (so that the TLS object is NULLd). |
- received_sync_msgs_->AddRef(); |
+ shutdown_event_(shutdown_event), |
+ received_sync_msgs_(ReceivedSyncMsgQueue::AddListener()){ |
} |
SyncChannel::SyncContext::~SyncContext() { |
while (!deserializers_.empty()) |
- PopDeserializer(true); |
- |
- received_sync_msgs_->listener_message_loop()->ReleaseSoon( |
- FROM_HERE, received_sync_msgs_); |
+ Pop(); |
} |
// Adds information about an outgoing sync message to the context so that |
// we know how to deserialize the reply. Returns a handle that's set when |
// the reply has arrived. |
-HANDLE SyncChannel::SyncContext::Push(IPC::SyncMessage* sync_msg) { |
- PendingSyncMsg pending(IPC::SyncMessage::GetMessageId(*sync_msg), |
+void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { |
+ PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg), |
sync_msg->GetReplyDeserializer(), |
CreateEvent(NULL, FALSE, FALSE, NULL)); |
AutoLock auto_lock(deserializers_lock_); |
- deserializers_.push(pending); |
+ deserializers_.push_back(pending); |
+} |
- return pending.reply_event; |
+bool SyncChannel::SyncContext::Pop() { |
+ AutoLock auto_lock(deserializers_lock_); |
+ PendingSyncMsg msg = deserializers_.back(); |
+ delete msg.deserializer; |
+ CloseHandle(msg.done_event); |
+ deserializers_.pop_back(); |
+ return msg.send_result; |
} |
-HANDLE SyncChannel::SyncContext::blocking_event() { |
- return received_sync_msgs_->blocking_event(); |
+HANDLE SyncChannel::SyncContext::GetSendDoneEvent() { |
+ AutoLock auto_lock(deserializers_lock_); |
+ return deserializers_.back().done_event; |
} |
+HANDLE SyncChannel::SyncContext::GetDispatchEvent() { |
+ return received_sync_msgs_->dispatch_event(); |
+} |
+ |
void SyncChannel::SyncContext::DispatchMessages() { |
received_sync_msgs_->DispatchMessages(); |
} |
-void SyncChannel::SyncContext::RemoveListener(Channel::Listener* listener) { |
- received_sync_msgs_->RemoveListener(listener); |
-} |
- |
-bool SyncChannel::SyncContext::UnblockListener(const Message* msg) { |
- bool rv = false; |
- HANDLE reply_event = NULL; |
+bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { |
{ |
- if (channel_closed_) { |
- // The channel is closed, or we couldn't connect, so cancel all Send() |
- // calls. |
- reply_deserialize_result_ = false; |
- { |
- AutoLock auto_lock(deserializers_lock_); |
- if (!deserializers_.empty()) |
- reply_event = deserializers_.top().reply_event; |
- } |
+ AutoLock auto_lock(deserializers_lock_); |
+ if (deserializers_.empty() || |
+ !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { |
+ return false; |
+ } |
- if (reply_event) |
- PopDeserializer(false); |
- } else { |
- { |
- AutoLock auto_lock(deserializers_lock_); |
- if (deserializers_.empty()) |
- return false; |
- |
- if (!IPC::SyncMessage::IsMessageReplyTo(*msg, deserializers_.top().id)) |
- return false; |
- |
- rv = true; |
- if (msg->is_reply_error()) { |
- reply_deserialize_result_ = false; |
- } else { |
- reply_deserialize_result_ = deserializers_.top().deserializer-> |
- SerializeOutputParameters(*msg); |
- } |
- |
- // Can't CloseHandle the event just yet, since doing so might cause the |
- // Wait call above to never return. |
- reply_event = deserializers_.top().reply_event; |
- } |
- PopDeserializer(false); |
+ if (!msg->is_reply_error()) { |
+ deserializers_.back().send_result = deserializers_.back().deserializer-> |
+ SerializeOutputParameters(*msg); |
} |
+ SetEvent(deserializers_.back().done_event); |
} |
- if (reply_event) |
- SetEvent(reply_event); |
- |
// We got a reply to a synchronous Send() call that's blocking the listener |
// thread. However, further down the call stack there could be another |
// blocking Send() call, whose reply we received after we made this last |
// Send() call. So check if we have any queued replies available that |
// can now unblock the listener thread. |
- received_sync_msgs_->OnUnblock(); |
+ received_sync_msgs_->DidUnblock(); |
- return rv; |
+ return true; |
} |
-// Called on the IPC thread. |
+void SyncChannel::SyncContext::Clear() { |
+ CancelPendingSends(); |
+ received_sync_msgs_->RemoveListener(listener()); |
+ |
+ Context::Clear(); |
+} |
+ |
void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) { |
// Give the filters a chance at processing this message. |
if (TryFilters(msg)) |
return; |
- if (UnblockListener(&msg)) |
+ if (TryToUnblockListener(&msg)) |
return; |
if (msg.should_unblock()) { |
@@ -338,149 +323,158 @@ |
return; |
} |
- return Context::OnMessageReceived(msg); |
+ return Context::OnMessageReceivedNoFilter(msg); |
} |
-// Called on the IPC thread. |
void SyncChannel::SyncContext::OnChannelError() { |
- channel_closed_ = true; |
- UnblockListener(NULL); |
- |
+ CancelPendingSends(); |
Context::OnChannelError(); |
} |
-void SyncChannel::SyncContext::PopDeserializer(bool close_reply_event) { |
- PendingSyncMsg msg = deserializers_.top(); |
- delete msg.deserializer; |
- if (close_reply_event) |
- CloseHandle(msg.reply_event); |
- deserializers_.pop(); |
+void SyncChannel::SyncContext::OnChannelOpened() { |
+ shutdown_watcher_.StartWatching(shutdown_event_, this); |
+ Context::OnChannelOpened(); |
} |
-SyncChannel::SyncChannel(const std::wstring& channel_id, Channel::Mode mode, |
- Channel::Listener* listener, MessageFilter* filter, |
- MessageLoop* ipc_message_loop, |
- bool create_pipe_now, HANDLE shutdown_event) |
- : ChannelProxy(channel_id, mode, ipc_message_loop, |
- new SyncContext(listener, filter, ipc_message_loop), |
- create_pipe_now), |
- shutdown_event_(shutdown_event), |
+void SyncChannel::SyncContext::OnChannelClosed() { |
+ shutdown_watcher_.StopWatching(); |
+ Context::OnChannelClosed(); |
+} |
+ |
+void SyncChannel::SyncContext::OnSendTimeout(int message_id) { |
+ AutoLock auto_lock(deserializers_lock_); |
+ PendingSyncMessageQueue::iterator iter; |
+ for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { |
+ if ((*iter).id == message_id) { |
+ SetEvent((*iter).done_event); |
+ break; |
+ } |
+ } |
+} |
+ |
+void SyncChannel::SyncContext::CancelPendingSends() { |
+ AutoLock auto_lock(deserializers_lock_); |
+ PendingSyncMessageQueue::iterator iter; |
+ for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) |
+ SetEvent((*iter).done_event); |
+} |
+ |
+void SyncChannel::SyncContext::OnObjectSignaled(HANDLE object) { |
+ DCHECK(object == shutdown_event_); |
+ // Process shut down before we can get a reply to a synchronous message. |
+ // Cancel pending Send calls, which will end up setting the send done event. |
+ CancelPendingSends(); |
+} |
+ |
+ |
+SyncChannel::SyncChannel( |
+ const std::wstring& channel_id, Channel::Mode mode, |
+ Channel::Listener* listener, MessageFilter* filter, |
+ MessageLoop* ipc_message_loop, bool create_pipe_now, HANDLE shutdown_event) |
+ : ChannelProxy( |
+ channel_id, mode, ipc_message_loop, |
+ new SyncContext(listener, filter, ipc_message_loop, shutdown_event), |
+ create_pipe_now), |
sync_messages_with_no_timeout_allowed_(true) { |
- DCHECK(shutdown_event_ != NULL); |
+ // Ideally we only want to watch this object when running a nested message |
+ // loop. However, we don't know when it exits if there's another nested |
+ // message loop running under it or not, so we wouldn't know whether to |
+ // stop or keep watching. So we always watch it, and create the event as |
+ // manual reset since the object watcher might otherwise reset the event |
+ // when we're doing a WaitForMultipleObjects. |
+ dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this); |
} |
SyncChannel::~SyncChannel() { |
- // The listener ensures that its lifetime is greater than SyncChannel. But |
- // after SyncChannel is destructed there's no guarantee that the listener is |
- // still around, so we wouldn't want ReceivedSyncMsgQueue to call the |
- // listener. |
- sync_context()->RemoveListener(listener()); |
} |
-bool SyncChannel::Send(IPC::Message* message) { |
+bool SyncChannel::Send(Message* message) { |
return SendWithTimeout(message, INFINITE); |
} |
-bool SyncChannel::SendWithTimeout(IPC::Message* message, int timeout_ms) { |
- bool message_is_sync = message->is_sync(); |
- HANDLE pump_messages_event = NULL; |
+bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) { |
+ if (!message->is_sync()) { |
+ ChannelProxy::Send(message); |
+ return true; |
+ } |
- HANDLE reply_event = NULL; |
- if (message_is_sync) { |
- DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE); |
- IPC::SyncMessage* sync_msg = static_cast<IPC::SyncMessage*>(message); |
- reply_event = sync_context()->Push(sync_msg); |
- pump_messages_event = sync_msg->pump_messages_event(); |
+ // *this* might get deleted in WaitForReply. |
+ scoped_refptr<SyncContext> context(sync_context()); |
+ if (WaitForSingleObject(context->shutdown_event(), 0) == WAIT_OBJECT_0) { |
+ delete message; |
+ return false; |
} |
- // Send the message using the ChannelProxy |
+ DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE); |
+ SyncMessage* sync_msg = static_cast<SyncMessage*>(message); |
+ context->Push(sync_msg); |
+ int message_id = SyncMessage::GetMessageId(*sync_msg); |
+ HANDLE pump_messages_event = sync_msg->pump_messages_event(); |
+ |
ChannelProxy::Send(message); |
- if (!message_is_sync) |
- return true; |
- do { |
- // Wait for reply, or for any other incoming synchronous message. |
- DCHECK(reply_event != NULL); |
- HANDLE objects[] = { shutdown_event_, |
- reply_event, |
- sync_context()->blocking_event(), |
- pump_messages_event}; |
+ if (timeout_ms != INFINITE) { |
+ // We use the sync message id so that when a message times out, we don't |
+ // confuse it with another send that is either above/below this Send in |
+ // the call stack. |
+ context->ipc_message_loop()->PostDelayedTask(FROM_HERE, |
+ NewRunnableMethod(context.get(), |
+ &SyncContext::OnSendTimeout, message_id), timeout_ms); |
+ } |
- DWORD result; |
- TimeTicks before = TimeTicks::Now(); |
- if (pump_messages_event == NULL) { |
- // No need to pump messages since we didn't get an event to check. |
- result = WaitForMultipleObjects(3, objects, FALSE, timeout_ms); |
- } else { |
- // If the event is set, then we pump messages. Otherwise we also wait on |
- // it so that if it gets set we start pumping messages. |
- if (WaitForSingleObject(pump_messages_event, 0) == WAIT_OBJECT_0) { |
- // Before calling MsgWaitForMultipleObjects() we check that our events |
- // are not signaled. The windows message queue might always have events |
- // starving the checking of our events otherwise. |
- result = WaitForMultipleObjects(3, objects, FALSE, 0); |
- if (result == WAIT_TIMEOUT) { |
- result = MsgWaitForMultipleObjects(3, objects, FALSE, timeout_ms, |
- QS_ALLINPUT); |
- } |
- } else { |
- result = WaitForMultipleObjects(4, objects, FALSE, timeout_ms); |
- } |
- } |
+ // Wait for reply, or for any other incoming synchronous messages. |
+ WaitForReply(pump_messages_event); |
- if (result == WAIT_OBJECT_0 || result == WAIT_TIMEOUT) { |
- // Process shut down before we can get a reply to a synchronous message, |
- // or timed-out. Unblock the thread. |
- sync_context()->PopDeserializer(true); |
- return false; |
- } |
+ return context->Pop(); |
+} |
- if (result == WAIT_OBJECT_0 + 1) { |
- // We got the reply to our synchronous message. |
- CloseHandle(reply_event); |
- return sync_context()->reply_deserialize_result(); |
- } |
- |
- if (result == WAIT_OBJECT_0 + 2) { |
+void SyncChannel::WaitForReply(HANDLE pump_messages_event) { |
+ while (true) { |
+ HANDLE objects[] = { sync_context()->GetDispatchEvent(), |
+ sync_context()->GetSendDoneEvent(), |
+ pump_messages_event }; |
+ uint32 count = pump_messages_event ? 3: 2; |
+ DWORD result = WaitForMultipleObjects(count, objects, FALSE, INFINITE); |
+ if (result == WAIT_OBJECT_0) { |
// We're waiting for a reply, but we received a blocking synchronous |
// call. We must process it or otherwise a deadlock might occur. |
+ ResetEvent(sync_context()->GetDispatchEvent()); |
sync_context()->DispatchMessages(); |
- } else if (result == WAIT_OBJECT_0 + 3) { |
- // Run a nested messsage loop to pump all the thread's messages. We |
- // shutdown the nested loop when there are no more messages. |
- pump_messages_events_.push(pump_messages_event); |
- bool old_state = MessageLoop::current()->NestableTasksAllowed(); |
- MessageLoop::current()->SetNestableTasksAllowed(true); |
- // Process a message, but come right back out of the MessageLoop (don't |
- // loop, sleep, or wait for a kMsgQuit). |
- MessageLoop::current()->RunAllPending(); |
- MessageLoop::current()->SetNestableTasksAllowed(old_state); |
- pump_messages_events_.pop(); |
- } else { |
- DCHECK(result == WAIT_OBJECT_0 + 4); |
- // We were doing a WaitForMultipleObjects, but now the pump messages |
- // event is set, so the next time we loop we'll use |
- // MsgWaitForMultipleObjects instead. |
+ continue; |
} |
- if (timeout_ms != INFINITE) { |
- TimeDelta time_delta = TimeTicks::Now() - before; |
- timeout_ms -= static_cast<int>(time_delta.InMilliseconds()); |
- if (timeout_ms <= 0) { |
- // We timed-out while processing messages. |
- sync_context()->PopDeserializer(true); |
- return false; |
- } |
- } |
+ if (result == WAIT_OBJECT_0 + 2) |
+ WaitForReplyWithNestedMessageLoop(); // Start a nested message loop. |
- // Continue looping until we either get the reply to our synchronous message |
- // or we time-out. |
- } while (true); |
+ break; |
+ } |
} |
-bool SyncChannel::UnblockListener(Message* message) { |
- return sync_context()->UnblockListener(message); |
+void SyncChannel::WaitForReplyWithNestedMessageLoop() { |
+ HANDLE old_done_event = send_done_watcher_.GetWatchedObject(); |
+ send_done_watcher_.StopWatching(); |
+ send_done_watcher_.StartWatching(sync_context()->GetSendDoneEvent(), this); |
+ bool old_state = MessageLoop::current()->NestableTasksAllowed(); |
+ MessageLoop::current()->SetNestableTasksAllowed(true); |
+ MessageLoop::current()->Run(); |
+ MessageLoop::current()->SetNestableTasksAllowed(old_state); |
+ if (old_done_event) |
+ send_done_watcher_.StartWatching(old_done_event, this); |
} |
-} // namespace IPC |
+void SyncChannel::OnObjectSignaled(HANDLE object) { |
+ HANDLE dispatch_event = sync_context()->GetDispatchEvent(); |
+ if (object == dispatch_event) { |
+ // The call to DispatchMessages might delete this object, so reregister |
+ // the object watcher first. |
+ ResetEvent(dispatch_event); |
+ dispatch_watcher_.StartWatching(dispatch_event, this); |
+ sync_context()->DispatchMessages(); |
+ } else { |
+ // We got the reply, timed out or the process shutdown. |
+ DCHECK(object == sync_context()->GetSendDoneEvent()); |
+ MessageLoop::current()->Quit(); |
+ } |
+} |
+} // namespace IPC |