Chromium Code Reviews| Index: mojo/system/channel.cc |
| diff --git a/mojo/system/channel.cc b/mojo/system/channel.cc |
| index 770275a1a8ff24fa2ecc7307d7855e4ec275ce83..0c74cc3d52e24b5d7b202d498700d62f76f9e28f 100644 |
| --- a/mojo/system/channel.cc |
| +++ b/mojo/system/channel.cc |
| @@ -21,12 +21,15 @@ COMPILE_ASSERT(Channel::kBootstrapEndpointId != |
| STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId |
| Channel::kBootstrapEndpointId; |
| -Channel::EndpointInfo::EndpointInfo() { |
| +Channel::EndpointInfo::EndpointInfo() |
| + : state(STATE_NORMAL), |
| + port() { |
| } |
| Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe, |
| unsigned port) |
| - : message_pipe(message_pipe), |
| + : state(STATE_NORMAL), |
| + message_pipe(message_pipe), |
| port(port) { |
| } |
| @@ -62,23 +65,34 @@ void Channel::Shutdown() { |
| raw_channel_->Shutdown(); |
| raw_channel_.reset(); |
| - // This should not occur, but it probably mostly results in leaking; |
| - // (Explicitly clearing the |local_id_to_endpoint_info_map_| would likely put |
| - // things in an inconsistent state, which is worse. Note that if the map is |
| - // nonempty, we probably won't be destroyed, since the endpoints have a |
| - // reference to us.) |
| - LOG_IF(ERROR, !local_id_to_endpoint_info_map_.empty()) |
| - << "Channel shutting down with endpoints still attached"; |
| - // TODO(vtl): This currently blows up, but the fix will be nontrivial. |
| - // crbug.com/360081 |
| - //DCHECK(local_id_to_endpoint_info_map_.empty()); |
| + // This shouldn't usually occur, but it should be okay if all the endpoints |
| + // are zombies (i.e., waiting to be removed, and not actually having any |
| + // references to |MessagePipe|s). |
| + // TODO(vtl): To make this actually okay, we need to make sure the other side |
| + // channels being killed off properly. |
| + LOG_IF(WARNING, !local_id_to_endpoint_info_map_.empty()) |
| + << "Channel shutting down with endpoints still attached " |
| + "(hopefully all zombies)"; |
|
darin (slow to review)
2014/04/16 21:33:43
nit: indentation
viettrungluu
2014/04/16 22:25:01
Done.
|
| + |
| +#ifndef NDEBUG |
| + // Check that everything left is a zombie. (Note: We don't explicitly clear |
| + // |local_id_to_endpoint_info_map_|, since that would likely put us in an |
| + // inconsistent state if we have non-zombies.) |
| + for (IdToEndpointInfoMap::const_iterator it = |
| + local_id_to_endpoint_info_map_.begin(); |
| + it != local_id_to_endpoint_info_map_.end(); |
| + ++it) { |
| + DCHECK_NE(it->second.state, EndpointInfo::STATE_NORMAL); |
| + DCHECK(!it->second.message_pipe.get()); |
| + } |
| +#endif |
| } |
| MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint( |
| - scoped_refptr<MessagePipe> message_pipe, unsigned port) { |
| + scoped_refptr<MessagePipe> message_pipe, |
| + unsigned port) { |
| + DCHECK(message_pipe); |
| DCHECK(port == 0 || port == 1); |
| - // Note: This assertion must *not* be done under |lock_|. |
| - DCHECK_EQ(message_pipe->GetType(port), MessagePipeEndpoint::kTypeProxy); |
| MessageInTransit::EndpointId local_id; |
| { |
| @@ -98,8 +112,32 @@ MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint( |
| local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port); |
| } |
| - message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id); |
| - return local_id; |
| + // This might fail if that port got an |OnPeerClose()| before attaching. |
| + if (message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id)) |
| + return local_id; |
| + |
| + // Note: If it failed, quite possibly the endpoint info was removed from that |
| + // map (there's a race between us adding it to the map above and calling |
| + // |Attach()|). And even if an entry exists for |local_id|, we need to check |
| + // that it's the one we added (and not some other one that was added since). |
| + { |
| + base::AutoLock locker(lock_); |
| + IdToEndpointInfoMap::iterator it = |
| + local_id_to_endpoint_info_map_.find(local_id); |
| + if (it != local_id_to_endpoint_info_map_.end() && |
| + it->second.message_pipe.get() == message_pipe.get() && |
| + it->second.port == port) { |
| + DCHECK_EQ(it->second.state, EndpointInfo::STATE_NORMAL); |
| + // TODO(vtl): FIXME -- This is wrong. We need to specify (to |
| + // |AttachMessagePipeEndpoint()| who's going to be responsible for calling |
| + // |RunMessagePipeEndpoint()| ("us", or the remote by sending us a |
| + // |kSubtypeChannelRunMessagePipeEndpoint|). If the remote is going to |
| + // run, then we'll get messages to an "invalid" local ID (for running, for |
| + // removal). |
| + local_id_to_endpoint_info_map_.erase(it); |
| + } |
| + } |
| + return MessageInTransit::kInvalidEndpointId; |
| } |
| bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, |
| @@ -115,6 +153,14 @@ bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, |
| endpoint_info = it->second; |
| } |
| + // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint| |
| + // and ignore it. |
| + if (endpoint_info.state != EndpointInfo::STATE_NORMAL) { |
| + DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint " |
| + "(local ID " << local_id << ", remote ID " << remote_id << ")"; |
| + return true; |
| + } |
| + |
| // TODO(vtl): FIXME -- We need to handle the case that message pipe is already |
| // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|). |
| endpoint_info.message_pipe->Run(endpoint_info.port, remote_id); |
| @@ -125,20 +171,15 @@ void Channel::RunRemoteMessagePipeEndpoint( |
| MessageInTransit::EndpointId local_id, |
| MessageInTransit::EndpointId remote_id) { |
| base::AutoLock locker(lock_); |
| - |
| DCHECK(local_id_to_endpoint_info_map_.find(local_id) != |
| local_id_to_endpoint_info_map_.end()); |
| - |
| - scoped_ptr<MessageInTransit> message(new MessageInTransit( |
| - MessageInTransit::kTypeChannel, |
| - MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint, |
| - 0, 0, NULL)); |
| - message->set_source_id(local_id); |
| - message->set_destination_id(remote_id); |
| - if (!raw_channel_->WriteMessage(message.Pass())) { |
| - // TODO(vtl): FIXME -- I guess we should report the error back somehow so |
| - // that the dispatcher can be closed? |
| - CHECK(false) << "Not yet handled"; |
| + if (!SendControlMessage( |
| + MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint, |
| + local_id, remote_id)) { |
| + HandleLocalError(base::StringPrintf( |
| + "Failed to send message to run remote message pipe endpoint (local ID " |
| + "%u, remote ID %u)", |
| + static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id))); |
| } |
| } |
| @@ -160,20 +201,52 @@ bool Channel::IsWriteBufferEmpty() { |
| return raw_channel_->IsWriteBufferEmpty(); |
| } |
| -void Channel::DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id) { |
| +void Channel::DetachMessagePipeEndpoint( |
| + MessageInTransit::EndpointId local_id, |
| + MessageInTransit::EndpointId remote_id) { |
| DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); |
| - base::AutoLock locker_(lock_); |
| - local_id_to_endpoint_info_map_.erase(local_id); |
| + bool should_send_remove_message = false; |
| + { |
| + base::AutoLock locker_(lock_); |
| + IdToEndpointInfoMap::iterator it = |
| + local_id_to_endpoint_info_map_.find(local_id); |
| + DCHECK(it != local_id_to_endpoint_info_map_.end()); |
| + |
| + switch (it->second.state) { |
| + case EndpointInfo::STATE_NORMAL: |
| + it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK; |
| + it->second.message_pipe = NULL; |
| + should_send_remove_message = |
| + (remote_id != MessageInTransit::kInvalidEndpointId); |
| + break; |
| + case EndpointInfo::STATE_WAIT_LOCAL_DETACH: |
| + local_id_to_endpoint_info_map_.erase(it); |
| + break; |
| + case EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK: |
| + NOTREACHED(); |
| + break; |
| + case EndpointInfo::STATE_WAIT_LOCAL_DETACH_AND_REMOTE_REMOVE_ACK: |
| + it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK; |
| + break; |
| + } |
| + } |
| + if (!should_send_remove_message) |
| + return; |
| + |
| + if (!SendControlMessage( |
| + MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint, |
| + local_id, remote_id)) { |
| + HandleLocalError(base::StringPrintf( |
| + "Failed to send message to remove remote message pipe endpoint (local " |
| + "ID %u, remote ID %u)", |
| + static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id))); |
| + } |
| } |
| Channel::~Channel() { |
| // The channel should have been shut down first. |
| DCHECK(!raw_channel_.get()); |
| - |
| - DLOG_IF(WARNING, !local_id_to_endpoint_info_map_.empty()) |
| - << "Destroying Channel with " << local_id_to_endpoint_info_map_.size() |
| - << " endpoints still present"; |
| } |
| void Channel::OnReadMessage(const MessageInTransit::View& message_view) { |
| @@ -249,6 +322,13 @@ void Channel::OnReadMessageForDownstream( |
| endpoint_info = it->second; |
| } |
| + // Ignore messages for zombie endpoints (not an error). |
| + if (endpoint_info.state != EndpointInfo::STATE_NORMAL) { |
| + DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = " |
| + << local_id << ", remote ID = " << message_view.source_id() << ")"; |
| + return; |
| + } |
| + |
| // We need to duplicate the message, because |EnqueueMessage()| will take |
| // ownership of it. |
| scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); |
| @@ -261,7 +341,7 @@ void Channel::OnReadMessageForDownstream( |
| // error, e.g., if the remote side is sending invalid control messages (to |
| // the message pipe). |
| HandleLocalError(base::StringPrintf( |
| - "Failed to enqueue message to local destination ID %u (result %d)", |
| + "Failed to enqueue message to local ID %u (result %d)", |
| static_cast<unsigned>(local_id), static_cast<int>(result))); |
| return; |
| } |
| @@ -273,14 +353,35 @@ void Channel::OnReadMessageForChannel( |
| switch (message_view.subtype()) { |
| case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint: |
| - // TODO(vtl): FIXME -- Error handling (also validation of |
| - // source/destination IDs). |
| - DVLOG(2) << "Handling channel message to run message pipe (local ID = " |
| - << message_view.destination_id() << ", remote ID = " |
| + DVLOG(2) << "Handling channel message to run message pipe (local ID " |
| + << message_view.destination_id() << ", remote ID " |
| << message_view.source_id() << ")"; |
| if (!RunMessagePipeEndpoint(message_view.destination_id(), |
| - message_view.source_id())) |
| - HandleRemoteError("Received invalid channel run message pipe message"); |
| + message_view.source_id())) { |
| + HandleRemoteError( |
| + "Received invalid channel message to run message pipe"); |
| + } |
| + break; |
| + case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint: |
| + DVLOG(2) << "Handling channel message to remove message pipe (local ID " |
| + << message_view.destination_id() << ", remote ID " |
| + << message_view.source_id() << ")"; |
| + if (!RemoveMessagePipeEndpoint(message_view.destination_id(), |
| + message_view.source_id())) { |
| + HandleRemoteError( |
| + "Received invalid channel message to remove message pipe"); |
| + } |
| + break; |
| + case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck: |
| + DVLOG(2) << "Handling channel message to ack remove message pipe (local " |
| + "ID " |
| + << message_view.destination_id() << ", remote ID " |
| + << message_view.source_id() << ")"; |
| + if (!RemoveMessagePipeEndpoint(message_view.destination_id(), |
| + message_view.source_id())) { |
| + HandleRemoteError( |
| + "Received invalid channel message to ack remove message pipe"); |
| + } |
| break; |
| default: |
| HandleRemoteError("Received invalid channel message"); |
| @@ -289,6 +390,62 @@ void Channel::OnReadMessageForChannel( |
| } |
| } |
| +bool Channel::RemoveMessagePipeEndpoint( |
| + MessageInTransit::EndpointId local_id, |
| + MessageInTransit::EndpointId remote_id) { |
| + EndpointInfo endpoint_info; |
| + { |
| + base::AutoLock locker(lock_); |
| + |
| + IdToEndpointInfoMap::iterator it = |
| + local_id_to_endpoint_info_map_.find(local_id); |
| + if (it == local_id_to_endpoint_info_map_.end()) { |
| + DVLOG(2) << "Remove message pipe error: not found"; |
| + return false; |
| + } |
| + |
| + // If it's waiting for the remove ack, just do it and return. |
| + if (it->second.state == EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK) { |
| + local_id_to_endpoint_info_map_.erase(it); |
| + return true; |
| + } |
| + |
| + if (it->second.state != EndpointInfo::STATE_NORMAL) { |
| + DVLOG(2) << "Remove message pipe error: wrong state"; |
| + return false; |
| + } |
| + |
| + it->second.state = EndpointInfo::STATE_WAIT_LOCAL_DETACH; |
| + endpoint_info = it->second; |
| + it->second.message_pipe = NULL; |
| + } |
| + |
| + if (!SendControlMessage( |
| + MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck, |
| + local_id, remote_id)) { |
| + HandleLocalError(base::StringPrintf( |
| + "Failed to send message to remove remote message pipe endpoint ack " |
| + "(local ID %u, remote ID %u)", |
| + static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id))); |
| + } |
| + |
| + endpoint_info.message_pipe->OnRemove(endpoint_info.port); |
| + |
| + return true; |
| +} |
| + |
| +bool Channel::SendControlMessage(MessageInTransit::Subtype subtype, |
| + MessageInTransit::EndpointId local_id, |
| + MessageInTransit::EndpointId remote_id) { |
| + DVLOG(2) << "Sending channel control message: subtype " << subtype |
| + << ", local ID " << local_id << ", remote ID " << remote_id; |
| + scoped_ptr<MessageInTransit> message(new MessageInTransit( |
| + MessageInTransit::kTypeChannel, subtype, 0, 0, NULL)); |
| + message->set_source_id(local_id); |
| + message->set_destination_id(remote_id); |
| + return raw_channel_->WriteMessage(message.Pass()); |
| +} |
| + |
| void Channel::HandleRemoteError(const base::StringPiece& error_message) { |
| // TODO(vtl): Is this how we really want to handle this? Probably we want to |
| // terminate the connection, since it's spewing invalid stuff. |
| @@ -297,6 +454,10 @@ void Channel::HandleRemoteError(const base::StringPiece& error_message) { |
| void Channel::HandleLocalError(const base::StringPiece& error_message) { |
| // TODO(vtl): Is this how we really want to handle this? |
| + // Sometimes we'll want to propagate the error back to the message pipe |
| + // (endpoint), and notify it that the remote is (effectively) closed. |
| + // Sometimes we'll want to kill the channel (and notify all the endpoints that |
| + // their remotes are dead. |
| LOG(WARNING) << error_message; |
| } |