Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1128)

Unified Diff: chrome/common/ipc_sync_channel.cc

Issue 8001: Make IPC::SyncChannel not duplicate the underlying MessageLoop implementation... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: '' Created 12 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « chrome/common/ipc_sync_channel.h ('k') | chrome/common/ipc_sync_channel_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: chrome/common/ipc_sync_channel.cc
===================================================================
--- chrome/common/ipc_sync_channel.cc (revision 3707)
+++ chrome/common/ipc_sync_channel.cc (working copy)
@@ -37,17 +37,21 @@
class SyncChannel::ReceivedSyncMsgQueue :
public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
public:
- ReceivedSyncMsgQueue() :
- blocking_event_(CreateEvent(NULL, FALSE, FALSE, NULL)),
- task_pending_(false),
- listener_message_loop_(MessageLoop::current()) {
+ // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
+ // if necessary. Call RemoveListener on the same thread when done.
+ static ReceivedSyncMsgQueue* AddListener() {
+ // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
+ // SyncChannel objects can block the same thread).
+ ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get();
+ if (!rv) {
+ rv = new ReceivedSyncMsgQueue();
+ ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv);
+ }
+ rv->listener_count_++;
+ return rv;
}
~ReceivedSyncMsgQueue() {
- DCHECK(lazy_tls_ptr_.Pointer()->Get());
- DCHECK(MessageLoop::current() == listener_message_loop_);
- CloseHandle(blocking_event_);
- lazy_tls_ptr_.Pointer()->Set(NULL);
}
// Called on IPC thread when a synchronous message or reply arrives.
@@ -66,7 +70,7 @@
channel_id));
}
- SetEvent(blocking_event_);
+ SetEvent(dispatch_event_);
if (!was_task_pending) {
listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
this, &ReceivedSyncMsgQueue::DispatchMessagesTask));
@@ -105,7 +109,7 @@
}
#ifdef IPC_MESSAGE_LOG_ENABLED
- IPC::Logging* logger = IPC::Logging::current();
+ Logging* logger = Logging::current();
if (logger->Enabled())
logger->OnPreDispatchMessage(*message);
#endif
@@ -123,7 +127,7 @@
}
// Called on the IPC thread when the current sync Send() call is unblocked.
- void OnUnblock() {
+ void DidUnblock() {
if (!received_replies_.empty()) {
MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod(
this, &ReceivedSyncMsgQueue::DispatchReplies));
@@ -149,9 +153,14 @@
message_queue_.push(temp_queue.front());
temp_queue.pop();
}
+
+ if (--listener_count_ == 0) {
+ DCHECK(lazy_tls_ptr_.Pointer()->Get());
+ lazy_tls_ptr_.Pointer()->Set(NULL);
+ }
}
- HANDLE blocking_event() { return blocking_event_; }
+ HANDLE dispatch_event() { return dispatch_event_; }
MessageLoop* listener_message_loop() { return listener_message_loop_; }
// Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
@@ -159,12 +168,19 @@
lazy_tls_ptr_;
private:
+ ReceivedSyncMsgQueue() :
+ dispatch_event_(CreateEvent(NULL, TRUE, FALSE, NULL)),
+ task_pending_(false),
+ listener_message_loop_(MessageLoop::current()),
+ listener_count_(0) {
+ }
+
// Called on the ipc thread to check if we can unblock any current Send()
// calls based on a queued reply.
void DispatchReplies() {
for (size_t i = 0; i < received_replies_.size(); ++i) {
Message* message = received_replies_[i].message;
- if (received_replies_[i].context->UnblockListener(message)) {
+ if (received_replies_[i].context->TryToUnblockListener(message)) {
delete message;
received_replies_.erase(received_replies_.begin() + i);
return;
@@ -172,13 +188,6 @@
}
}
- // Set when we got a synchronous message that we must respond to as the
- // sender needs its reply before it can reply to our original synchronous
- // message.
- HANDLE blocking_event_;
-
- MessageLoop* listener_message_loop_;
-
// Holds information about a queued synchronous message.
struct ReceivedMessage {
ReceivedMessage(Message* m, Channel::Listener* l, const std::wstring& i)
@@ -190,8 +199,6 @@
typedef std::queue<ReceivedMessage> SyncMessageQueue;
SyncMessageQueue message_queue_;
- Lock message_lock_;
- bool task_pending_;
// Holds information about a queued reply message.
struct Reply {
@@ -204,6 +211,15 @@
};
std::vector<Reply> received_replies_;
+
+ // Set when we got a synchronous message that we must respond to as the
+ // sender needs its reply before it can reply to our original synchronous
+ // message.
+ ScopedHandle dispatch_event_;
+ MessageLoop* listener_message_loop_;
+ Lock message_lock_;
+ bool task_pending_;
+ int listener_count_;
};
base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
@@ -212,120 +228,89 @@
SyncChannel::SyncContext::SyncContext(
Channel::Listener* listener,
MessageFilter* filter,
- MessageLoop* ipc_thread)
+ MessageLoop* ipc_thread,
+ HANDLE shutdown_event)
: ChannelProxy::Context(listener, filter, ipc_thread),
- channel_closed_(false),
- reply_deserialize_result_(false) {
- // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
- // SyncChannel objects that can block the same thread).
- received_sync_msgs_ = ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Get();
-
- if (!received_sync_msgs_) {
- // Stash a pointer to the listener thread's ReceivedSyncMsgQueue, as we
- // need to be able to access it in the IPC thread.
- received_sync_msgs_ = new ReceivedSyncMsgQueue();
- ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(received_sync_msgs_);
- }
-
- // Addref manually so that we can ensure destruction on the listener thread
- // (so that the TLS object is NULLd).
- received_sync_msgs_->AddRef();
+ shutdown_event_(shutdown_event),
+ received_sync_msgs_(ReceivedSyncMsgQueue::AddListener()){
}
SyncChannel::SyncContext::~SyncContext() {
while (!deserializers_.empty())
- PopDeserializer(true);
-
- received_sync_msgs_->listener_message_loop()->ReleaseSoon(
- FROM_HERE, received_sync_msgs_);
+ Pop();
}
// Adds information about an outgoing sync message to the context so that
// we know how to deserialize the reply. Returns a handle that's set when
// the reply has arrived.
-HANDLE SyncChannel::SyncContext::Push(IPC::SyncMessage* sync_msg) {
- PendingSyncMsg pending(IPC::SyncMessage::GetMessageId(*sync_msg),
+void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
+ PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg),
sync_msg->GetReplyDeserializer(),
CreateEvent(NULL, FALSE, FALSE, NULL));
AutoLock auto_lock(deserializers_lock_);
- deserializers_.push(pending);
+ deserializers_.push_back(pending);
+}
- return pending.reply_event;
+bool SyncChannel::SyncContext::Pop() {
+ AutoLock auto_lock(deserializers_lock_);
+ PendingSyncMsg msg = deserializers_.back();
+ delete msg.deserializer;
+ CloseHandle(msg.done_event);
+ deserializers_.pop_back();
+ return msg.send_result;
}
-HANDLE SyncChannel::SyncContext::blocking_event() {
- return received_sync_msgs_->blocking_event();
+HANDLE SyncChannel::SyncContext::GetSendDoneEvent() {
+ AutoLock auto_lock(deserializers_lock_);
+ return deserializers_.back().done_event;
}
+HANDLE SyncChannel::SyncContext::GetDispatchEvent() {
+ return received_sync_msgs_->dispatch_event();
+}
+
void SyncChannel::SyncContext::DispatchMessages() {
received_sync_msgs_->DispatchMessages();
}
-void SyncChannel::SyncContext::RemoveListener(Channel::Listener* listener) {
- received_sync_msgs_->RemoveListener(listener);
-}
-
-bool SyncChannel::SyncContext::UnblockListener(const Message* msg) {
- bool rv = false;
- HANDLE reply_event = NULL;
+bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
{
- if (channel_closed_) {
- // The channel is closed, or we couldn't connect, so cancel all Send()
- // calls.
- reply_deserialize_result_ = false;
- {
- AutoLock auto_lock(deserializers_lock_);
- if (!deserializers_.empty())
- reply_event = deserializers_.top().reply_event;
- }
+ AutoLock auto_lock(deserializers_lock_);
+ if (deserializers_.empty() ||
+ !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
+ return false;
+ }
- if (reply_event)
- PopDeserializer(false);
- } else {
- {
- AutoLock auto_lock(deserializers_lock_);
- if (deserializers_.empty())
- return false;
-
- if (!IPC::SyncMessage::IsMessageReplyTo(*msg, deserializers_.top().id))
- return false;
-
- rv = true;
- if (msg->is_reply_error()) {
- reply_deserialize_result_ = false;
- } else {
- reply_deserialize_result_ = deserializers_.top().deserializer->
- SerializeOutputParameters(*msg);
- }
-
- // Can't CloseHandle the event just yet, since doing so might cause the
- // Wait call above to never return.
- reply_event = deserializers_.top().reply_event;
- }
- PopDeserializer(false);
+ if (!msg->is_reply_error()) {
+ deserializers_.back().send_result = deserializers_.back().deserializer->
+ SerializeOutputParameters(*msg);
}
+ SetEvent(deserializers_.back().done_event);
}
- if (reply_event)
- SetEvent(reply_event);
-
// We got a reply to a synchronous Send() call that's blocking the listener
// thread. However, further down the call stack there could be another
// blocking Send() call, whose reply we received after we made this last
// Send() call. So check if we have any queued replies available that
// can now unblock the listener thread.
- received_sync_msgs_->OnUnblock();
+ received_sync_msgs_->DidUnblock();
- return rv;
+ return true;
}
-// Called on the IPC thread.
+void SyncChannel::SyncContext::Clear() {
+ CancelPendingSends();
+ received_sync_msgs_->RemoveListener(listener());
+
+ Context::Clear();
+}
+
void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
// Give the filters a chance at processing this message.
if (TryFilters(msg))
return;
- if (UnblockListener(&msg))
+ if (TryToUnblockListener(&msg))
return;
if (msg.should_unblock()) {
@@ -338,149 +323,158 @@
return;
}
- return Context::OnMessageReceived(msg);
+ return Context::OnMessageReceivedNoFilter(msg);
}
-// Called on the IPC thread.
void SyncChannel::SyncContext::OnChannelError() {
- channel_closed_ = true;
- UnblockListener(NULL);
-
+ CancelPendingSends();
Context::OnChannelError();
}
-void SyncChannel::SyncContext::PopDeserializer(bool close_reply_event) {
- PendingSyncMsg msg = deserializers_.top();
- delete msg.deserializer;
- if (close_reply_event)
- CloseHandle(msg.reply_event);
- deserializers_.pop();
+void SyncChannel::SyncContext::OnChannelOpened() {
+ shutdown_watcher_.StartWatching(shutdown_event_, this);
+ Context::OnChannelOpened();
}
-SyncChannel::SyncChannel(const std::wstring& channel_id, Channel::Mode mode,
- Channel::Listener* listener, MessageFilter* filter,
- MessageLoop* ipc_message_loop,
- bool create_pipe_now, HANDLE shutdown_event)
- : ChannelProxy(channel_id, mode, ipc_message_loop,
- new SyncContext(listener, filter, ipc_message_loop),
- create_pipe_now),
- shutdown_event_(shutdown_event),
+void SyncChannel::SyncContext::OnChannelClosed() {
+ shutdown_watcher_.StopWatching();
+ Context::OnChannelClosed();
+}
+
+void SyncChannel::SyncContext::OnSendTimeout(int message_id) {
+ AutoLock auto_lock(deserializers_lock_);
+ PendingSyncMessageQueue::iterator iter;
+ for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
+ if ((*iter).id == message_id) {
+ SetEvent((*iter).done_event);
+ break;
+ }
+ }
+}
+
+void SyncChannel::SyncContext::CancelPendingSends() {
+ AutoLock auto_lock(deserializers_lock_);
+ PendingSyncMessageQueue::iterator iter;
+ for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++)
+ SetEvent((*iter).done_event);
+}
+
+void SyncChannel::SyncContext::OnObjectSignaled(HANDLE object) {
+ DCHECK(object == shutdown_event_);
+ // Process shut down before we can get a reply to a synchronous message.
+ // Cancel pending Send calls, which will end up setting the send done event.
+ CancelPendingSends();
+}
+
+
+SyncChannel::SyncChannel(
+ const std::wstring& channel_id, Channel::Mode mode,
+ Channel::Listener* listener, MessageFilter* filter,
+ MessageLoop* ipc_message_loop, bool create_pipe_now, HANDLE shutdown_event)
+ : ChannelProxy(
+ channel_id, mode, ipc_message_loop,
+ new SyncContext(listener, filter, ipc_message_loop, shutdown_event),
+ create_pipe_now),
sync_messages_with_no_timeout_allowed_(true) {
- DCHECK(shutdown_event_ != NULL);
+ // Ideally we only want to watch this object when running a nested message
+ // loop. However, we don't know when it exits if there's another nested
+ // message loop running under it or not, so we wouldn't know whether to
+ // stop or keep watching. So we always watch it, and create the event as
+ // manual reset since the object watcher might otherwise reset the event
+ // when we're doing a WaitForMultipleObjects.
+ dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this);
}
SyncChannel::~SyncChannel() {
- // The listener ensures that its lifetime is greater than SyncChannel. But
- // after SyncChannel is destructed there's no guarantee that the listener is
- // still around, so we wouldn't want ReceivedSyncMsgQueue to call the
- // listener.
- sync_context()->RemoveListener(listener());
}
-bool SyncChannel::Send(IPC::Message* message) {
+bool SyncChannel::Send(Message* message) {
return SendWithTimeout(message, INFINITE);
}
-bool SyncChannel::SendWithTimeout(IPC::Message* message, int timeout_ms) {
- bool message_is_sync = message->is_sync();
- HANDLE pump_messages_event = NULL;
+bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) {
+ if (!message->is_sync()) {
+ ChannelProxy::Send(message);
+ return true;
+ }
- HANDLE reply_event = NULL;
- if (message_is_sync) {
- DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE);
- IPC::SyncMessage* sync_msg = static_cast<IPC::SyncMessage*>(message);
- reply_event = sync_context()->Push(sync_msg);
- pump_messages_event = sync_msg->pump_messages_event();
+ // *this* might get deleted in WaitForReply.
+ scoped_refptr<SyncContext> context(sync_context());
+ if (WaitForSingleObject(context->shutdown_event(), 0) == WAIT_OBJECT_0) {
+ delete message;
+ return false;
}
- // Send the message using the ChannelProxy
+ DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE);
+ SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
+ context->Push(sync_msg);
+ int message_id = SyncMessage::GetMessageId(*sync_msg);
+ HANDLE pump_messages_event = sync_msg->pump_messages_event();
+
ChannelProxy::Send(message);
- if (!message_is_sync)
- return true;
- do {
- // Wait for reply, or for any other incoming synchronous message.
- DCHECK(reply_event != NULL);
- HANDLE objects[] = { shutdown_event_,
- reply_event,
- sync_context()->blocking_event(),
- pump_messages_event};
+ if (timeout_ms != INFINITE) {
+ // We use the sync message id so that when a message times out, we don't
+ // confuse it with another send that is either above/below this Send in
+ // the call stack.
+ context->ipc_message_loop()->PostDelayedTask(FROM_HERE,
+ NewRunnableMethod(context.get(),
+ &SyncContext::OnSendTimeout, message_id), timeout_ms);
+ }
- DWORD result;
- TimeTicks before = TimeTicks::Now();
- if (pump_messages_event == NULL) {
- // No need to pump messages since we didn't get an event to check.
- result = WaitForMultipleObjects(3, objects, FALSE, timeout_ms);
- } else {
- // If the event is set, then we pump messages. Otherwise we also wait on
- // it so that if it gets set we start pumping messages.
- if (WaitForSingleObject(pump_messages_event, 0) == WAIT_OBJECT_0) {
- // Before calling MsgWaitForMultipleObjects() we check that our events
- // are not signaled. The windows message queue might always have events
- // starving the checking of our events otherwise.
- result = WaitForMultipleObjects(3, objects, FALSE, 0);
- if (result == WAIT_TIMEOUT) {
- result = MsgWaitForMultipleObjects(3, objects, FALSE, timeout_ms,
- QS_ALLINPUT);
- }
- } else {
- result = WaitForMultipleObjects(4, objects, FALSE, timeout_ms);
- }
- }
+ // Wait for reply, or for any other incoming synchronous messages.
+ WaitForReply(pump_messages_event);
- if (result == WAIT_OBJECT_0 || result == WAIT_TIMEOUT) {
- // Process shut down before we can get a reply to a synchronous message,
- // or timed-out. Unblock the thread.
- sync_context()->PopDeserializer(true);
- return false;
- }
+ return context->Pop();
+}
- if (result == WAIT_OBJECT_0 + 1) {
- // We got the reply to our synchronous message.
- CloseHandle(reply_event);
- return sync_context()->reply_deserialize_result();
- }
-
- if (result == WAIT_OBJECT_0 + 2) {
+void SyncChannel::WaitForReply(HANDLE pump_messages_event) {
+ while (true) {
+ HANDLE objects[] = { sync_context()->GetDispatchEvent(),
+ sync_context()->GetSendDoneEvent(),
+ pump_messages_event };
+ uint32 count = pump_messages_event ? 3: 2;
+ DWORD result = WaitForMultipleObjects(count, objects, FALSE, INFINITE);
+ if (result == WAIT_OBJECT_0) {
// We're waiting for a reply, but we received a blocking synchronous
// call. We must process it or otherwise a deadlock might occur.
+ ResetEvent(sync_context()->GetDispatchEvent());
sync_context()->DispatchMessages();
- } else if (result == WAIT_OBJECT_0 + 3) {
- // Run a nested messsage loop to pump all the thread's messages. We
- // shutdown the nested loop when there are no more messages.
- pump_messages_events_.push(pump_messages_event);
- bool old_state = MessageLoop::current()->NestableTasksAllowed();
- MessageLoop::current()->SetNestableTasksAllowed(true);
- // Process a message, but come right back out of the MessageLoop (don't
- // loop, sleep, or wait for a kMsgQuit).
- MessageLoop::current()->RunAllPending();
- MessageLoop::current()->SetNestableTasksAllowed(old_state);
- pump_messages_events_.pop();
- } else {
- DCHECK(result == WAIT_OBJECT_0 + 4);
- // We were doing a WaitForMultipleObjects, but now the pump messages
- // event is set, so the next time we loop we'll use
- // MsgWaitForMultipleObjects instead.
+ continue;
}
- if (timeout_ms != INFINITE) {
- TimeDelta time_delta = TimeTicks::Now() - before;
- timeout_ms -= static_cast<int>(time_delta.InMilliseconds());
- if (timeout_ms <= 0) {
- // We timed-out while processing messages.
- sync_context()->PopDeserializer(true);
- return false;
- }
- }
+ if (result == WAIT_OBJECT_0 + 2)
+ WaitForReplyWithNestedMessageLoop(); // Start a nested message loop.
- // Continue looping until we either get the reply to our synchronous message
- // or we time-out.
- } while (true);
+ break;
+ }
}
-bool SyncChannel::UnblockListener(Message* message) {
- return sync_context()->UnblockListener(message);
+void SyncChannel::WaitForReplyWithNestedMessageLoop() {
+ HANDLE old_done_event = send_done_watcher_.GetWatchedObject();
+ send_done_watcher_.StopWatching();
+ send_done_watcher_.StartWatching(sync_context()->GetSendDoneEvent(), this);
+ bool old_state = MessageLoop::current()->NestableTasksAllowed();
+ MessageLoop::current()->SetNestableTasksAllowed(true);
+ MessageLoop::current()->Run();
+ MessageLoop::current()->SetNestableTasksAllowed(old_state);
+ if (old_done_event)
+ send_done_watcher_.StartWatching(old_done_event, this);
}
-} // namespace IPC
+void SyncChannel::OnObjectSignaled(HANDLE object) {
+ HANDLE dispatch_event = sync_context()->GetDispatchEvent();
+ if (object == dispatch_event) {
+ // The call to DispatchMessages might delete this object, so reregister
+ // the object watcher first.
+ ResetEvent(dispatch_event);
+ dispatch_watcher_.StartWatching(dispatch_event, this);
+ sync_context()->DispatchMessages();
+ } else {
+ // We got the reply, timed out or the process shutdown.
+ DCHECK(object == sync_context()->GetSendDoneEvent());
+ MessageLoop::current()->Quit();
+ }
+}
+} // namespace IPC
« no previous file with comments | « chrome/common/ipc_sync_channel.h ('k') | chrome/common/ipc_sync_channel_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698