Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(288)

Unified Diff: mojo/system/channel.cc

Issue 240133005: Mojo: Make some attempts towards fixing remote message pipe closure. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: fix some locking issues Created 6 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/system/channel.h ('k') | mojo/system/local_message_pipe_endpoint.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/system/channel.cc
diff --git a/mojo/system/channel.cc b/mojo/system/channel.cc
index 770275a1a8ff24fa2ecc7307d7855e4ec275ce83..2408a469a101931fc28c7669024aab241c44f8ab 100644
--- a/mojo/system/channel.cc
+++ b/mojo/system/channel.cc
@@ -21,12 +21,15 @@ COMPILE_ASSERT(Channel::kBootstrapEndpointId !=
STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
Channel::kBootstrapEndpointId;
-Channel::EndpointInfo::EndpointInfo() {
+Channel::EndpointInfo::EndpointInfo()
+ : state(STATE_NORMAL),
+ port() {
}
Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe,
unsigned port)
- : message_pipe(message_pipe),
+ : state(STATE_NORMAL),
+ message_pipe(message_pipe),
port(port) {
}
@@ -62,23 +65,34 @@ void Channel::Shutdown() {
raw_channel_->Shutdown();
raw_channel_.reset();
- // This should not occur, but it probably mostly results in leaking;
- // (Explicitly clearing the |local_id_to_endpoint_info_map_| would likely put
- // things in an inconsistent state, which is worse. Note that if the map is
- // nonempty, we probably won't be destroyed, since the endpoints have a
- // reference to us.)
- LOG_IF(ERROR, !local_id_to_endpoint_info_map_.empty())
- << "Channel shutting down with endpoints still attached";
- // TODO(vtl): This currently blows up, but the fix will be nontrivial.
- // crbug.com/360081
- //DCHECK(local_id_to_endpoint_info_map_.empty());
+ // This shouldn't usually occur, but it should be okay if all the endpoints
+ // are zombies (i.e., waiting to be removed, and not actually having any
+ // references to |MessagePipe|s).
+ // TODO(vtl): To make this actually okay, we need to make sure the other side
+ // channels being killed off properly.
+ LOG_IF(WARNING, !local_id_to_endpoint_info_map_.empty())
+ << "Channel shutting down with endpoints still attached "
+ "(hopefully all zombies)";
+
+#ifndef NDEBUG
+ // Check that everything left is a zombie. (Note: We don't explicitly clear
+ // |local_id_to_endpoint_info_map_|, since that would likely put us in an
+ // inconsistent state if we have non-zombies.)
+ for (IdToEndpointInfoMap::const_iterator it =
+ local_id_to_endpoint_info_map_.begin();
+ it != local_id_to_endpoint_info_map_.end();
+ ++it) {
+ DCHECK_NE(it->second.state, EndpointInfo::STATE_NORMAL);
+ DCHECK(!it->second.message_pipe.get());
+ }
+#endif
}
MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
- scoped_refptr<MessagePipe> message_pipe, unsigned port) {
+ scoped_refptr<MessagePipe> message_pipe,
+ unsigned port) {
+ DCHECK(message_pipe);
DCHECK(port == 0 || port == 1);
- // Note: This assertion must *not* be done under |lock_|.
- DCHECK_EQ(message_pipe->GetType(port), MessagePipeEndpoint::kTypeProxy);
MessageInTransit::EndpointId local_id;
{
@@ -98,8 +112,32 @@ MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port);
}
- message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id);
- return local_id;
+ // This might fail if that port got an |OnPeerClose()| before attaching.
+ if (message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id))
+ return local_id;
+
+ // Note: If it failed, quite possibly the endpoint info was removed from that
+ // map (there's a race between us adding it to the map above and calling
+ // |Attach()|). And even if an entry exists for |local_id|, we need to check
+ // that it's the one we added (and not some other one that was added since).
+ {
+ base::AutoLock locker(lock_);
+ IdToEndpointInfoMap::iterator it =
+ local_id_to_endpoint_info_map_.find(local_id);
+ if (it != local_id_to_endpoint_info_map_.end() &&
+ it->second.message_pipe.get() == message_pipe.get() &&
+ it->second.port == port) {
+ DCHECK_EQ(it->second.state, EndpointInfo::STATE_NORMAL);
+ // TODO(vtl): FIXME -- This is wrong. We need to specify (to
+ // |AttachMessagePipeEndpoint()| who's going to be responsible for calling
+ // |RunMessagePipeEndpoint()| ("us", or the remote by sending us a
+ // |kSubtypeChannelRunMessagePipeEndpoint|). If the remote is going to
+ // run, then we'll get messages to an "invalid" local ID (for running, for
+ // removal).
+ local_id_to_endpoint_info_map_.erase(it);
+ }
+ }
+ return MessageInTransit::kInvalidEndpointId;
}
bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
@@ -115,6 +153,14 @@ bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
endpoint_info = it->second;
}
+ // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint|
+ // and ignore it.
+ if (endpoint_info.state != EndpointInfo::STATE_NORMAL) {
+ DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint "
+ "(local ID " << local_id << ", remote ID " << remote_id << ")";
+ return true;
+ }
+
// TODO(vtl): FIXME -- We need to handle the case that message pipe is already
// running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|).
endpoint_info.message_pipe->Run(endpoint_info.port, remote_id);
@@ -124,21 +170,21 @@ bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
void Channel::RunRemoteMessagePipeEndpoint(
MessageInTransit::EndpointId local_id,
MessageInTransit::EndpointId remote_id) {
- base::AutoLock locker(lock_);
-
- DCHECK(local_id_to_endpoint_info_map_.find(local_id) !=
- local_id_to_endpoint_info_map_.end());
+#if DCHECK_IS_ON
+ {
+ base::AutoLock locker(lock_);
+ DCHECK(local_id_to_endpoint_info_map_.find(local_id) !=
+ local_id_to_endpoint_info_map_.end());
+ }
+#endif
- scoped_ptr<MessageInTransit> message(new MessageInTransit(
- MessageInTransit::kTypeChannel,
- MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint,
- 0, 0, NULL));
- message->set_source_id(local_id);
- message->set_destination_id(remote_id);
- if (!raw_channel_->WriteMessage(message.Pass())) {
- // TODO(vtl): FIXME -- I guess we should report the error back somehow so
- // that the dispatcher can be closed?
- CHECK(false) << "Not yet handled";
+ if (!SendControlMessage(
+ MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint,
+ local_id, remote_id)) {
+ HandleLocalError(base::StringPrintf(
+ "Failed to send message to run remote message pipe endpoint (local ID "
+ "%u, remote ID %u)",
+ static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id)));
}
}
@@ -160,20 +206,52 @@ bool Channel::IsWriteBufferEmpty() {
return raw_channel_->IsWriteBufferEmpty();
}
-void Channel::DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id) {
+void Channel::DetachMessagePipeEndpoint(
+ MessageInTransit::EndpointId local_id,
+ MessageInTransit::EndpointId remote_id) {
DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
- base::AutoLock locker_(lock_);
- local_id_to_endpoint_info_map_.erase(local_id);
+ bool should_send_remove_message = false;
+ {
+ base::AutoLock locker_(lock_);
+ IdToEndpointInfoMap::iterator it =
+ local_id_to_endpoint_info_map_.find(local_id);
+ DCHECK(it != local_id_to_endpoint_info_map_.end());
+
+ switch (it->second.state) {
+ case EndpointInfo::STATE_NORMAL:
+ it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK;
+ it->second.message_pipe = NULL;
+ should_send_remove_message =
+ (remote_id != MessageInTransit::kInvalidEndpointId);
+ break;
+ case EndpointInfo::STATE_WAIT_LOCAL_DETACH:
+ local_id_to_endpoint_info_map_.erase(it);
+ break;
+ case EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK:
+ NOTREACHED();
+ break;
+ case EndpointInfo::STATE_WAIT_LOCAL_DETACH_AND_REMOTE_REMOVE_ACK:
+ it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK;
+ break;
+ }
+ }
+ if (!should_send_remove_message)
+ return;
+
+ if (!SendControlMessage(
+ MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint,
+ local_id, remote_id)) {
+ HandleLocalError(base::StringPrintf(
+ "Failed to send message to remove remote message pipe endpoint (local "
+ "ID %u, remote ID %u)",
+ static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id)));
+ }
}
Channel::~Channel() {
// The channel should have been shut down first.
DCHECK(!raw_channel_.get());
-
- DLOG_IF(WARNING, !local_id_to_endpoint_info_map_.empty())
- << "Destroying Channel with " << local_id_to_endpoint_info_map_.size()
- << " endpoints still present";
}
void Channel::OnReadMessage(const MessageInTransit::View& message_view) {
@@ -249,6 +327,13 @@ void Channel::OnReadMessageForDownstream(
endpoint_info = it->second;
}
+ // Ignore messages for zombie endpoints (not an error).
+ if (endpoint_info.state != EndpointInfo::STATE_NORMAL) {
+ DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = "
+ << local_id << ", remote ID = " << message_view.source_id() << ")";
+ return;
+ }
+
// We need to duplicate the message, because |EnqueueMessage()| will take
// ownership of it.
scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
@@ -261,7 +346,7 @@ void Channel::OnReadMessageForDownstream(
// error, e.g., if the remote side is sending invalid control messages (to
// the message pipe).
HandleLocalError(base::StringPrintf(
- "Failed to enqueue message to local destination ID %u (result %d)",
+ "Failed to enqueue message to local ID %u (result %d)",
static_cast<unsigned>(local_id), static_cast<int>(result)));
return;
}
@@ -273,14 +358,35 @@ void Channel::OnReadMessageForChannel(
switch (message_view.subtype()) {
case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint:
- // TODO(vtl): FIXME -- Error handling (also validation of
- // source/destination IDs).
- DVLOG(2) << "Handling channel message to run message pipe (local ID = "
- << message_view.destination_id() << ", remote ID = "
+ DVLOG(2) << "Handling channel message to run message pipe (local ID "
+ << message_view.destination_id() << ", remote ID "
<< message_view.source_id() << ")";
if (!RunMessagePipeEndpoint(message_view.destination_id(),
- message_view.source_id()))
- HandleRemoteError("Received invalid channel run message pipe message");
+ message_view.source_id())) {
+ HandleRemoteError(
+ "Received invalid channel message to run message pipe");
+ }
+ break;
+ case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint:
+ DVLOG(2) << "Handling channel message to remove message pipe (local ID "
+ << message_view.destination_id() << ", remote ID "
+ << message_view.source_id() << ")";
+ if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
+ message_view.source_id())) {
+ HandleRemoteError(
+ "Received invalid channel message to remove message pipe");
+ }
+ break;
+ case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck:
+ DVLOG(2) << "Handling channel message to ack remove message pipe (local "
+ "ID "
+ << message_view.destination_id() << ", remote ID "
+ << message_view.source_id() << ")";
+ if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
+ message_view.source_id())) {
+ HandleRemoteError(
+ "Received invalid channel message to ack remove message pipe");
+ }
break;
default:
HandleRemoteError("Received invalid channel message");
@@ -289,6 +395,62 @@ void Channel::OnReadMessageForChannel(
}
}
+bool Channel::RemoveMessagePipeEndpoint(
+ MessageInTransit::EndpointId local_id,
+ MessageInTransit::EndpointId remote_id) {
+ EndpointInfo endpoint_info;
+ {
+ base::AutoLock locker(lock_);
+
+ IdToEndpointInfoMap::iterator it =
+ local_id_to_endpoint_info_map_.find(local_id);
+ if (it == local_id_to_endpoint_info_map_.end()) {
+ DVLOG(2) << "Remove message pipe error: not found";
+ return false;
+ }
+
+ // If it's waiting for the remove ack, just do it and return.
+ if (it->second.state == EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK) {
+ local_id_to_endpoint_info_map_.erase(it);
+ return true;
+ }
+
+ if (it->second.state != EndpointInfo::STATE_NORMAL) {
+ DVLOG(2) << "Remove message pipe error: wrong state";
+ return false;
+ }
+
+ it->second.state = EndpointInfo::STATE_WAIT_LOCAL_DETACH;
+ endpoint_info = it->second;
+ it->second.message_pipe = NULL;
+ }
+
+ if (!SendControlMessage(
+ MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck,
+ local_id, remote_id)) {
+ HandleLocalError(base::StringPrintf(
+ "Failed to send message to remove remote message pipe endpoint ack "
+ "(local ID %u, remote ID %u)",
+ static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id)));
+ }
+
+ endpoint_info.message_pipe->OnRemove(endpoint_info.port);
+
+ return true;
+}
+
+bool Channel::SendControlMessage(MessageInTransit::Subtype subtype,
+ MessageInTransit::EndpointId local_id,
+ MessageInTransit::EndpointId remote_id) {
+ DVLOG(2) << "Sending channel control message: subtype " << subtype
+ << ", local ID " << local_id << ", remote ID " << remote_id;
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(
+ MessageInTransit::kTypeChannel, subtype, 0, 0, NULL));
+ message->set_source_id(local_id);
+ message->set_destination_id(remote_id);
+ return WriteMessage(message.Pass());
+}
+
void Channel::HandleRemoteError(const base::StringPiece& error_message) {
// TODO(vtl): Is this how we really want to handle this? Probably we want to
// terminate the connection, since it's spewing invalid stuff.
@@ -297,6 +459,10 @@ void Channel::HandleRemoteError(const base::StringPiece& error_message) {
void Channel::HandleLocalError(const base::StringPiece& error_message) {
// TODO(vtl): Is this how we really want to handle this?
+ // Sometimes we'll want to propagate the error back to the message pipe
+ // (endpoint), and notify it that the remote is (effectively) closed.
+ // Sometimes we'll want to kill the channel (and notify all the endpoints that
+ // their remotes are dead.
LOG(WARNING) << error_message;
}
« no previous file with comments | « mojo/system/channel.h ('k') | mojo/system/local_message_pipe_endpoint.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698