Index: mojo/system/channel.cc |
diff --git a/mojo/system/channel.cc b/mojo/system/channel.cc |
index 770275a1a8ff24fa2ecc7307d7855e4ec275ce83..2408a469a101931fc28c7669024aab241c44f8ab 100644 |
--- a/mojo/system/channel.cc |
+++ b/mojo/system/channel.cc |
@@ -21,12 +21,15 @@ COMPILE_ASSERT(Channel::kBootstrapEndpointId != |
STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId |
Channel::kBootstrapEndpointId; |
-Channel::EndpointInfo::EndpointInfo() { |
+Channel::EndpointInfo::EndpointInfo() |
+ : state(STATE_NORMAL), |
+ port() { |
} |
Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe, |
unsigned port) |
- : message_pipe(message_pipe), |
+ : state(STATE_NORMAL), |
+ message_pipe(message_pipe), |
port(port) { |
} |
@@ -62,23 +65,34 @@ void Channel::Shutdown() { |
raw_channel_->Shutdown(); |
raw_channel_.reset(); |
- // This should not occur, but it probably mostly results in leaking; |
- // (Explicitly clearing the |local_id_to_endpoint_info_map_| would likely put |
- // things in an inconsistent state, which is worse. Note that if the map is |
- // nonempty, we probably won't be destroyed, since the endpoints have a |
- // reference to us.) |
- LOG_IF(ERROR, !local_id_to_endpoint_info_map_.empty()) |
- << "Channel shutting down with endpoints still attached"; |
- // TODO(vtl): This currently blows up, but the fix will be nontrivial. |
- // crbug.com/360081 |
- //DCHECK(local_id_to_endpoint_info_map_.empty()); |
+ // This shouldn't usually occur, but it should be okay if all the endpoints |
+ // are zombies (i.e., waiting to be removed, and not actually having any |
+ // references to |MessagePipe|s). |
+ // TODO(vtl): To make this actually okay, we need to make sure the other side |
+ // channels being killed off properly. |
+ LOG_IF(WARNING, !local_id_to_endpoint_info_map_.empty()) |
+ << "Channel shutting down with endpoints still attached " |
+ "(hopefully all zombies)"; |
+ |
+#ifndef NDEBUG |
+ // Check that everything left is a zombie. (Note: We don't explicitly clear |
+ // |local_id_to_endpoint_info_map_|, since that would likely put us in an |
+ // inconsistent state if we have non-zombies.) |
+ for (IdToEndpointInfoMap::const_iterator it = |
+ local_id_to_endpoint_info_map_.begin(); |
+ it != local_id_to_endpoint_info_map_.end(); |
+ ++it) { |
+ DCHECK_NE(it->second.state, EndpointInfo::STATE_NORMAL); |
+ DCHECK(!it->second.message_pipe.get()); |
+ } |
+#endif |
} |
MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint( |
- scoped_refptr<MessagePipe> message_pipe, unsigned port) { |
+ scoped_refptr<MessagePipe> message_pipe, |
+ unsigned port) { |
+ DCHECK(message_pipe); |
DCHECK(port == 0 || port == 1); |
- // Note: This assertion must *not* be done under |lock_|. |
- DCHECK_EQ(message_pipe->GetType(port), MessagePipeEndpoint::kTypeProxy); |
MessageInTransit::EndpointId local_id; |
{ |
@@ -98,8 +112,32 @@ MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint( |
local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port); |
} |
- message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id); |
- return local_id; |
+ // This might fail if that port got an |OnPeerClose()| before attaching. |
+ if (message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id)) |
+ return local_id; |
+ |
+ // Note: If it failed, quite possibly the endpoint info was removed from that |
+ // map (there's a race between us adding it to the map above and calling |
+ // |Attach()|). And even if an entry exists for |local_id|, we need to check |
+ // that it's the one we added (and not some other one that was added since). |
+ { |
+ base::AutoLock locker(lock_); |
+ IdToEndpointInfoMap::iterator it = |
+ local_id_to_endpoint_info_map_.find(local_id); |
+ if (it != local_id_to_endpoint_info_map_.end() && |
+ it->second.message_pipe.get() == message_pipe.get() && |
+ it->second.port == port) { |
+ DCHECK_EQ(it->second.state, EndpointInfo::STATE_NORMAL); |
+ // TODO(vtl): FIXME -- This is wrong. We need to specify (to |
+ // |AttachMessagePipeEndpoint()| who's going to be responsible for calling |
+ // |RunMessagePipeEndpoint()| ("us", or the remote by sending us a |
+ // |kSubtypeChannelRunMessagePipeEndpoint|). If the remote is going to |
+ // run, then we'll get messages to an "invalid" local ID (for running, for |
+ // removal). |
+ local_id_to_endpoint_info_map_.erase(it); |
+ } |
+ } |
+ return MessageInTransit::kInvalidEndpointId; |
} |
bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, |
@@ -115,6 +153,14 @@ bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, |
endpoint_info = it->second; |
} |
+ // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint| |
+ // and ignore it. |
+ if (endpoint_info.state != EndpointInfo::STATE_NORMAL) { |
+ DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint " |
+ "(local ID " << local_id << ", remote ID " << remote_id << ")"; |
+ return true; |
+ } |
+ |
// TODO(vtl): FIXME -- We need to handle the case that message pipe is already |
// running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|). |
endpoint_info.message_pipe->Run(endpoint_info.port, remote_id); |
@@ -124,21 +170,21 @@ bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, |
void Channel::RunRemoteMessagePipeEndpoint( |
MessageInTransit::EndpointId local_id, |
MessageInTransit::EndpointId remote_id) { |
- base::AutoLock locker(lock_); |
- |
- DCHECK(local_id_to_endpoint_info_map_.find(local_id) != |
- local_id_to_endpoint_info_map_.end()); |
+#if DCHECK_IS_ON |
+ { |
+ base::AutoLock locker(lock_); |
+ DCHECK(local_id_to_endpoint_info_map_.find(local_id) != |
+ local_id_to_endpoint_info_map_.end()); |
+ } |
+#endif |
- scoped_ptr<MessageInTransit> message(new MessageInTransit( |
- MessageInTransit::kTypeChannel, |
- MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint, |
- 0, 0, NULL)); |
- message->set_source_id(local_id); |
- message->set_destination_id(remote_id); |
- if (!raw_channel_->WriteMessage(message.Pass())) { |
- // TODO(vtl): FIXME -- I guess we should report the error back somehow so |
- // that the dispatcher can be closed? |
- CHECK(false) << "Not yet handled"; |
+ if (!SendControlMessage( |
+ MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint, |
+ local_id, remote_id)) { |
+ HandleLocalError(base::StringPrintf( |
+ "Failed to send message to run remote message pipe endpoint (local ID " |
+ "%u, remote ID %u)", |
+ static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id))); |
} |
} |
@@ -160,20 +206,52 @@ bool Channel::IsWriteBufferEmpty() { |
return raw_channel_->IsWriteBufferEmpty(); |
} |
-void Channel::DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id) { |
+void Channel::DetachMessagePipeEndpoint( |
+ MessageInTransit::EndpointId local_id, |
+ MessageInTransit::EndpointId remote_id) { |
DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); |
- base::AutoLock locker_(lock_); |
- local_id_to_endpoint_info_map_.erase(local_id); |
+ bool should_send_remove_message = false; |
+ { |
+ base::AutoLock locker_(lock_); |
+ IdToEndpointInfoMap::iterator it = |
+ local_id_to_endpoint_info_map_.find(local_id); |
+ DCHECK(it != local_id_to_endpoint_info_map_.end()); |
+ |
+ switch (it->second.state) { |
+ case EndpointInfo::STATE_NORMAL: |
+ it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK; |
+ it->second.message_pipe = NULL; |
+ should_send_remove_message = |
+ (remote_id != MessageInTransit::kInvalidEndpointId); |
+ break; |
+ case EndpointInfo::STATE_WAIT_LOCAL_DETACH: |
+ local_id_to_endpoint_info_map_.erase(it); |
+ break; |
+ case EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK: |
+ NOTREACHED(); |
+ break; |
+ case EndpointInfo::STATE_WAIT_LOCAL_DETACH_AND_REMOTE_REMOVE_ACK: |
+ it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK; |
+ break; |
+ } |
+ } |
+ if (!should_send_remove_message) |
+ return; |
+ |
+ if (!SendControlMessage( |
+ MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint, |
+ local_id, remote_id)) { |
+ HandleLocalError(base::StringPrintf( |
+ "Failed to send message to remove remote message pipe endpoint (local " |
+ "ID %u, remote ID %u)", |
+ static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id))); |
+ } |
} |
Channel::~Channel() { |
// The channel should have been shut down first. |
DCHECK(!raw_channel_.get()); |
- |
- DLOG_IF(WARNING, !local_id_to_endpoint_info_map_.empty()) |
- << "Destroying Channel with " << local_id_to_endpoint_info_map_.size() |
- << " endpoints still present"; |
} |
void Channel::OnReadMessage(const MessageInTransit::View& message_view) { |
@@ -249,6 +327,13 @@ void Channel::OnReadMessageForDownstream( |
endpoint_info = it->second; |
} |
+ // Ignore messages for zombie endpoints (not an error). |
+ if (endpoint_info.state != EndpointInfo::STATE_NORMAL) { |
+ DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = " |
+ << local_id << ", remote ID = " << message_view.source_id() << ")"; |
+ return; |
+ } |
+ |
// We need to duplicate the message, because |EnqueueMessage()| will take |
// ownership of it. |
scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); |
@@ -261,7 +346,7 @@ void Channel::OnReadMessageForDownstream( |
// error, e.g., if the remote side is sending invalid control messages (to |
// the message pipe). |
HandleLocalError(base::StringPrintf( |
- "Failed to enqueue message to local destination ID %u (result %d)", |
+ "Failed to enqueue message to local ID %u (result %d)", |
static_cast<unsigned>(local_id), static_cast<int>(result))); |
return; |
} |
@@ -273,14 +358,35 @@ void Channel::OnReadMessageForChannel( |
switch (message_view.subtype()) { |
case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint: |
- // TODO(vtl): FIXME -- Error handling (also validation of |
- // source/destination IDs). |
- DVLOG(2) << "Handling channel message to run message pipe (local ID = " |
- << message_view.destination_id() << ", remote ID = " |
+ DVLOG(2) << "Handling channel message to run message pipe (local ID " |
+ << message_view.destination_id() << ", remote ID " |
<< message_view.source_id() << ")"; |
if (!RunMessagePipeEndpoint(message_view.destination_id(), |
- message_view.source_id())) |
- HandleRemoteError("Received invalid channel run message pipe message"); |
+ message_view.source_id())) { |
+ HandleRemoteError( |
+ "Received invalid channel message to run message pipe"); |
+ } |
+ break; |
+ case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint: |
+ DVLOG(2) << "Handling channel message to remove message pipe (local ID " |
+ << message_view.destination_id() << ", remote ID " |
+ << message_view.source_id() << ")"; |
+ if (!RemoveMessagePipeEndpoint(message_view.destination_id(), |
+ message_view.source_id())) { |
+ HandleRemoteError( |
+ "Received invalid channel message to remove message pipe"); |
+ } |
+ break; |
+ case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck: |
+ DVLOG(2) << "Handling channel message to ack remove message pipe (local " |
+ "ID " |
+ << message_view.destination_id() << ", remote ID " |
+ << message_view.source_id() << ")"; |
+ if (!RemoveMessagePipeEndpoint(message_view.destination_id(), |
+ message_view.source_id())) { |
+ HandleRemoteError( |
+ "Received invalid channel message to ack remove message pipe"); |
+ } |
break; |
default: |
HandleRemoteError("Received invalid channel message"); |
@@ -289,6 +395,62 @@ void Channel::OnReadMessageForChannel( |
} |
} |
+bool Channel::RemoveMessagePipeEndpoint( |
+ MessageInTransit::EndpointId local_id, |
+ MessageInTransit::EndpointId remote_id) { |
+ EndpointInfo endpoint_info; |
+ { |
+ base::AutoLock locker(lock_); |
+ |
+ IdToEndpointInfoMap::iterator it = |
+ local_id_to_endpoint_info_map_.find(local_id); |
+ if (it == local_id_to_endpoint_info_map_.end()) { |
+ DVLOG(2) << "Remove message pipe error: not found"; |
+ return false; |
+ } |
+ |
+ // If it's waiting for the remove ack, just do it and return. |
+ if (it->second.state == EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK) { |
+ local_id_to_endpoint_info_map_.erase(it); |
+ return true; |
+ } |
+ |
+ if (it->second.state != EndpointInfo::STATE_NORMAL) { |
+ DVLOG(2) << "Remove message pipe error: wrong state"; |
+ return false; |
+ } |
+ |
+ it->second.state = EndpointInfo::STATE_WAIT_LOCAL_DETACH; |
+ endpoint_info = it->second; |
+ it->second.message_pipe = NULL; |
+ } |
+ |
+ if (!SendControlMessage( |
+ MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck, |
+ local_id, remote_id)) { |
+ HandleLocalError(base::StringPrintf( |
+ "Failed to send message to remove remote message pipe endpoint ack " |
+ "(local ID %u, remote ID %u)", |
+ static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id))); |
+ } |
+ |
+ endpoint_info.message_pipe->OnRemove(endpoint_info.port); |
+ |
+ return true; |
+} |
+ |
+bool Channel::SendControlMessage(MessageInTransit::Subtype subtype, |
+ MessageInTransit::EndpointId local_id, |
+ MessageInTransit::EndpointId remote_id) { |
+ DVLOG(2) << "Sending channel control message: subtype " << subtype |
+ << ", local ID " << local_id << ", remote ID " << remote_id; |
+ scoped_ptr<MessageInTransit> message(new MessageInTransit( |
+ MessageInTransit::kTypeChannel, subtype, 0, 0, NULL)); |
+ message->set_source_id(local_id); |
+ message->set_destination_id(remote_id); |
+ return WriteMessage(message.Pass()); |
+} |
+ |
void Channel::HandleRemoteError(const base::StringPiece& error_message) { |
// TODO(vtl): Is this how we really want to handle this? Probably we want to |
// terminate the connection, since it's spewing invalid stuff. |
@@ -297,6 +459,10 @@ void Channel::HandleRemoteError(const base::StringPiece& error_message) { |
void Channel::HandleLocalError(const base::StringPiece& error_message) { |
// TODO(vtl): Is this how we really want to handle this? |
+ // Sometimes we'll want to propagate the error back to the message pipe |
+ // (endpoint), and notify it that the remote is (effectively) closed. |
+ // Sometimes we'll want to kill the channel (and notify all the endpoints that |
+ // their remotes are dead. |
LOG(WARNING) << error_message; |
} |