| Index: mojo/system/channel.cc
|
| diff --git a/mojo/system/channel.cc b/mojo/system/channel.cc
|
| index 770275a1a8ff24fa2ecc7307d7855e4ec275ce83..2408a469a101931fc28c7669024aab241c44f8ab 100644
|
| --- a/mojo/system/channel.cc
|
| +++ b/mojo/system/channel.cc
|
| @@ -21,12 +21,15 @@ COMPILE_ASSERT(Channel::kBootstrapEndpointId !=
|
| STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
|
| Channel::kBootstrapEndpointId;
|
|
|
| -Channel::EndpointInfo::EndpointInfo() {
|
| +Channel::EndpointInfo::EndpointInfo()
|
| + : state(STATE_NORMAL),
|
| + port() {
|
| }
|
|
|
| Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe,
|
| unsigned port)
|
| - : message_pipe(message_pipe),
|
| + : state(STATE_NORMAL),
|
| + message_pipe(message_pipe),
|
| port(port) {
|
| }
|
|
|
| @@ -62,23 +65,34 @@ void Channel::Shutdown() {
|
| raw_channel_->Shutdown();
|
| raw_channel_.reset();
|
|
|
| - // This should not occur, but it probably mostly results in leaking;
|
| - // (Explicitly clearing the |local_id_to_endpoint_info_map_| would likely put
|
| - // things in an inconsistent state, which is worse. Note that if the map is
|
| - // nonempty, we probably won't be destroyed, since the endpoints have a
|
| - // reference to us.)
|
| - LOG_IF(ERROR, !local_id_to_endpoint_info_map_.empty())
|
| - << "Channel shutting down with endpoints still attached";
|
| - // TODO(vtl): This currently blows up, but the fix will be nontrivial.
|
| - // crbug.com/360081
|
| - //DCHECK(local_id_to_endpoint_info_map_.empty());
|
| + // This shouldn't usually occur, but it should be okay if all the endpoints
|
| + // are zombies (i.e., waiting to be removed, and not actually having any
|
| + // references to |MessagePipe|s).
|
| + // TODO(vtl): To make this actually okay, we need to make sure that the
|
| + // other side's channels are being killed off properly.
|
| + LOG_IF(WARNING, !local_id_to_endpoint_info_map_.empty())
|
| + << "Channel shutting down with endpoints still attached "
|
| + "(hopefully all zombies)";
|
| +
|
| +#ifndef NDEBUG
|
| + // Check that everything left is a zombie. (Note: We don't explicitly clear
|
| + // |local_id_to_endpoint_info_map_|, since that would likely put us in an
|
| + // inconsistent state if we have non-zombies.)
|
| + for (IdToEndpointInfoMap::const_iterator it =
|
| + local_id_to_endpoint_info_map_.begin();
|
| + it != local_id_to_endpoint_info_map_.end();
|
| + ++it) {
|
| + DCHECK_NE(it->second.state, EndpointInfo::STATE_NORMAL);
|
| + DCHECK(!it->second.message_pipe.get());
|
| + }
|
| +#endif
|
| }
|
|
|
| MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
|
| - scoped_refptr<MessagePipe> message_pipe, unsigned port) {
|
| + scoped_refptr<MessagePipe> message_pipe,
|
| + unsigned port) {
|
| + DCHECK(message_pipe);
|
| DCHECK(port == 0 || port == 1);
|
| - // Note: This assertion must *not* be done under |lock_|.
|
| - DCHECK_EQ(message_pipe->GetType(port), MessagePipeEndpoint::kTypeProxy);
|
|
|
| MessageInTransit::EndpointId local_id;
|
| {
|
| @@ -98,8 +112,32 @@ MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
|
| local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port);
|
| }
|
|
|
| - message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id);
|
| - return local_id;
|
| + // This might fail if that port got an |OnPeerClose()| before attaching.
|
| + if (message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id))
|
| + return local_id;
|
| +
|
| + // Note: If it failed, quite possibly the endpoint info was removed from that
|
| + // map (there's a race between us adding it to the map above and calling
|
| + // |Attach()|). And even if an entry exists for |local_id|, we need to check
|
| + // that it's the one we added (and not some other one that was added since).
|
| + {
|
| + base::AutoLock locker(lock_);
|
| + IdToEndpointInfoMap::iterator it =
|
| + local_id_to_endpoint_info_map_.find(local_id);
|
| + if (it != local_id_to_endpoint_info_map_.end() &&
|
| + it->second.message_pipe.get() == message_pipe.get() &&
|
| + it->second.port == port) {
|
| + DCHECK_EQ(it->second.state, EndpointInfo::STATE_NORMAL);
|
| + // TODO(vtl): FIXME -- This is wrong. We need to specify (to
|
| + // |AttachMessagePipeEndpoint()|) who's going to be responsible for calling
|
| + // |RunMessagePipeEndpoint()| ("us", or the remote by sending us a
|
| + // |kSubtypeChannelRunMessagePipeEndpoint|). If the remote is going to
|
| + // run, then we'll get messages to an "invalid" local ID (for running, for
|
| + // removal).
|
| + local_id_to_endpoint_info_map_.erase(it);
|
| + }
|
| + }
|
| + return MessageInTransit::kInvalidEndpointId;
|
| }
|
|
|
| bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
|
| @@ -115,6 +153,14 @@ bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
|
| endpoint_info = it->second;
|
| }
|
|
|
| + // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint|
|
| + // and ignore it.
|
| + if (endpoint_info.state != EndpointInfo::STATE_NORMAL) {
|
| + DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint "
|
| + "(local ID " << local_id << ", remote ID " << remote_id << ")";
|
| + return true;
|
| + }
|
| +
|
| // TODO(vtl): FIXME -- We need to handle the case that message pipe is already
|
| // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|).
|
| endpoint_info.message_pipe->Run(endpoint_info.port, remote_id);
|
| @@ -124,21 +170,21 @@ bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
|
| void Channel::RunRemoteMessagePipeEndpoint(
|
| MessageInTransit::EndpointId local_id,
|
| MessageInTransit::EndpointId remote_id) {
|
| - base::AutoLock locker(lock_);
|
| -
|
| - DCHECK(local_id_to_endpoint_info_map_.find(local_id) !=
|
| - local_id_to_endpoint_info_map_.end());
|
| +#if DCHECK_IS_ON
|
| + {
|
| + base::AutoLock locker(lock_);
|
| + DCHECK(local_id_to_endpoint_info_map_.find(local_id) !=
|
| + local_id_to_endpoint_info_map_.end());
|
| + }
|
| +#endif
|
|
|
| - scoped_ptr<MessageInTransit> message(new MessageInTransit(
|
| - MessageInTransit::kTypeChannel,
|
| - MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint,
|
| - 0, 0, NULL));
|
| - message->set_source_id(local_id);
|
| - message->set_destination_id(remote_id);
|
| - if (!raw_channel_->WriteMessage(message.Pass())) {
|
| - // TODO(vtl): FIXME -- I guess we should report the error back somehow so
|
| - // that the dispatcher can be closed?
|
| - CHECK(false) << "Not yet handled";
|
| + if (!SendControlMessage(
|
| + MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint,
|
| + local_id, remote_id)) {
|
| + HandleLocalError(base::StringPrintf(
|
| + "Failed to send message to run remote message pipe endpoint (local ID "
|
| + "%u, remote ID %u)",
|
| + static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id)));
|
| }
|
| }
|
|
|
| @@ -160,20 +206,52 @@ bool Channel::IsWriteBufferEmpty() {
|
| return raw_channel_->IsWriteBufferEmpty();
|
| }
|
|
|
| -void Channel::DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id) {
|
| +void Channel::DetachMessagePipeEndpoint(
|
| + MessageInTransit::EndpointId local_id,
|
| + MessageInTransit::EndpointId remote_id) {
|
| DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
|
|
|
| - base::AutoLock locker_(lock_);
|
| - local_id_to_endpoint_info_map_.erase(local_id);
|
| + bool should_send_remove_message = false;
|
| + {
|
| + base::AutoLock locker_(lock_);
|
| + IdToEndpointInfoMap::iterator it =
|
| + local_id_to_endpoint_info_map_.find(local_id);
|
| + DCHECK(it != local_id_to_endpoint_info_map_.end());
|
| +
|
| + switch (it->second.state) {
|
| + case EndpointInfo::STATE_NORMAL:
|
| + it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK;
|
| + it->second.message_pipe = NULL;
|
| + should_send_remove_message =
|
| + (remote_id != MessageInTransit::kInvalidEndpointId);
|
| + break;
|
| + case EndpointInfo::STATE_WAIT_LOCAL_DETACH:
|
| + local_id_to_endpoint_info_map_.erase(it);
|
| + break;
|
| + case EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK:
|
| + NOTREACHED();
|
| + break;
|
| + case EndpointInfo::STATE_WAIT_LOCAL_DETACH_AND_REMOTE_REMOVE_ACK:
|
| + it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK;
|
| + break;
|
| + }
|
| + }
|
| + if (!should_send_remove_message)
|
| + return;
|
| +
|
| + if (!SendControlMessage(
|
| + MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint,
|
| + local_id, remote_id)) {
|
| + HandleLocalError(base::StringPrintf(
|
| + "Failed to send message to remove remote message pipe endpoint (local "
|
| + "ID %u, remote ID %u)",
|
| + static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id)));
|
| + }
|
| }
|
|
|
| Channel::~Channel() {
|
| // The channel should have been shut down first.
|
| DCHECK(!raw_channel_.get());
|
| -
|
| - DLOG_IF(WARNING, !local_id_to_endpoint_info_map_.empty())
|
| - << "Destroying Channel with " << local_id_to_endpoint_info_map_.size()
|
| - << " endpoints still present";
|
| }
|
|
|
| void Channel::OnReadMessage(const MessageInTransit::View& message_view) {
|
| @@ -249,6 +327,13 @@ void Channel::OnReadMessageForDownstream(
|
| endpoint_info = it->second;
|
| }
|
|
|
| + // Ignore messages for zombie endpoints (not an error).
|
| + if (endpoint_info.state != EndpointInfo::STATE_NORMAL) {
|
| + DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = "
|
| + << local_id << ", remote ID = " << message_view.source_id() << ")";
|
| + return;
|
| + }
|
| +
|
| // We need to duplicate the message, because |EnqueueMessage()| will take
|
| // ownership of it.
|
| scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
|
| @@ -261,7 +346,7 @@ void Channel::OnReadMessageForDownstream(
|
| // error, e.g., if the remote side is sending invalid control messages (to
|
| // the message pipe).
|
| HandleLocalError(base::StringPrintf(
|
| - "Failed to enqueue message to local destination ID %u (result %d)",
|
| + "Failed to enqueue message to local ID %u (result %d)",
|
| static_cast<unsigned>(local_id), static_cast<int>(result)));
|
| return;
|
| }
|
| @@ -273,14 +358,35 @@ void Channel::OnReadMessageForChannel(
|
|
|
| switch (message_view.subtype()) {
|
| case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint:
|
| - // TODO(vtl): FIXME -- Error handling (also validation of
|
| - // source/destination IDs).
|
| - DVLOG(2) << "Handling channel message to run message pipe (local ID = "
|
| - << message_view.destination_id() << ", remote ID = "
|
| + DVLOG(2) << "Handling channel message to run message pipe (local ID "
|
| + << message_view.destination_id() << ", remote ID "
|
| << message_view.source_id() << ")";
|
| if (!RunMessagePipeEndpoint(message_view.destination_id(),
|
| - message_view.source_id()))
|
| - HandleRemoteError("Received invalid channel run message pipe message");
|
| + message_view.source_id())) {
|
| + HandleRemoteError(
|
| + "Received invalid channel message to run message pipe");
|
| + }
|
| + break;
|
| + case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint:
|
| + DVLOG(2) << "Handling channel message to remove message pipe (local ID "
|
| + << message_view.destination_id() << ", remote ID "
|
| + << message_view.source_id() << ")";
|
| + if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
|
| + message_view.source_id())) {
|
| + HandleRemoteError(
|
| + "Received invalid channel message to remove message pipe");
|
| + }
|
| + break;
|
| + case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck:
|
| + DVLOG(2) << "Handling channel message to ack remove message pipe (local "
|
| + "ID "
|
| + << message_view.destination_id() << ", remote ID "
|
| + << message_view.source_id() << ")";
|
| + if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
|
| + message_view.source_id())) {
|
| + HandleRemoteError(
|
| + "Received invalid channel message to ack remove message pipe");
|
| + }
|
| break;
|
| default:
|
| HandleRemoteError("Received invalid channel message");
|
| @@ -289,6 +395,62 @@ void Channel::OnReadMessageForChannel(
|
| }
|
| }
|
|
|
| +bool Channel::RemoveMessagePipeEndpoint(
|
| + MessageInTransit::EndpointId local_id,
|
| + MessageInTransit::EndpointId remote_id) {
|
| + EndpointInfo endpoint_info;
|
| + {
|
| + base::AutoLock locker(lock_);
|
| +
|
| + IdToEndpointInfoMap::iterator it =
|
| + local_id_to_endpoint_info_map_.find(local_id);
|
| + if (it == local_id_to_endpoint_info_map_.end()) {
|
| + DVLOG(2) << "Remove message pipe error: not found";
|
| + return false;
|
| + }
|
| +
|
| + // If it's waiting for the remove ack, just do it and return.
|
| + if (it->second.state == EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK) {
|
| + local_id_to_endpoint_info_map_.erase(it);
|
| + return true;
|
| + }
|
| +
|
| + if (it->second.state != EndpointInfo::STATE_NORMAL) {
|
| + DVLOG(2) << "Remove message pipe error: wrong state";
|
| + return false;
|
| + }
|
| +
|
| + it->second.state = EndpointInfo::STATE_WAIT_LOCAL_DETACH;
|
| + endpoint_info = it->second;
|
| + it->second.message_pipe = NULL;
|
| + }
|
| +
|
| + if (!SendControlMessage(
|
| + MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck,
|
| + local_id, remote_id)) {
|
| + HandleLocalError(base::StringPrintf(
|
| + "Failed to send message to remove remote message pipe endpoint ack "
|
| + "(local ID %u, remote ID %u)",
|
| + static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id)));
|
| + }
|
| +
|
| + endpoint_info.message_pipe->OnRemove(endpoint_info.port);
|
| +
|
| + return true;
|
| +}
|
| +
|
| +bool Channel::SendControlMessage(MessageInTransit::Subtype subtype,
|
| + MessageInTransit::EndpointId local_id,
|
| + MessageInTransit::EndpointId remote_id) {
|
| + DVLOG(2) << "Sending channel control message: subtype " << subtype
|
| + << ", local ID " << local_id << ", remote ID " << remote_id;
|
| + scoped_ptr<MessageInTransit> message(new MessageInTransit(
|
| + MessageInTransit::kTypeChannel, subtype, 0, 0, NULL));
|
| + message->set_source_id(local_id);
|
| + message->set_destination_id(remote_id);
|
| + return WriteMessage(message.Pass());
|
| +}
|
| +
|
| void Channel::HandleRemoteError(const base::StringPiece& error_message) {
|
| // TODO(vtl): Is this how we really want to handle this? Probably we want to
|
| // terminate the connection, since it's spewing invalid stuff.
|
| @@ -297,6 +459,10 @@ void Channel::HandleRemoteError(const base::StringPiece& error_message) {
|
|
|
| void Channel::HandleLocalError(const base::StringPiece& error_message) {
|
| // TODO(vtl): Is this how we really want to handle this?
|
| + // Sometimes we'll want to propagate the error back to the message pipe
|
| + // (endpoint), and notify it that the remote is (effectively) closed.
|
| + // Sometimes we'll want to kill the channel (and notify all the endpoints that
|
| + // their remotes are dead).
|
| LOG(WARNING) << error_message;
|
| }
|
|
|
|
|