Index: mojo/system/message_pipe.cc |
diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc |
index 6f2e5cbb664a2bc070d466c2e0f929f0048942a4..975293442a25b24248551c70b8fd4d7499a38a69 100644 |
--- a/mojo/system/message_pipe.cc |
+++ b/mojo/system/message_pipe.cc |
@@ -57,8 +57,10 @@ void MessagePipe::Close(unsigned port) { |
DCHECK(endpoints_[port].get()); |
endpoints_[port]->Close(); |
- if (endpoints_[destination_port].get()) |
- endpoints_[destination_port]->OnPeerClose(); |
+ if (endpoints_[destination_port].get()) { |
+ if (!endpoints_[destination_port]->OnPeerClose()) |
+ endpoints_[destination_port].reset(); |
+ } |
endpoints_[port].reset(); |
} |
@@ -223,7 +225,7 @@ MojoResult MessagePipe::EnqueueMessage( |
return MOJO_RESULT_OK; |
} |
-void MessagePipe::Attach(unsigned port, |
+bool MessagePipe::Attach(unsigned port, |
scoped_refptr<Channel> channel, |
MessageInTransit::EndpointId local_id) { |
DCHECK(port == 0 || port == 1); |
@@ -231,9 +233,12 @@ void MessagePipe::Attach(unsigned port, |
DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); |
base::AutoLock locker(lock_); |
- DCHECK(endpoints_[port].get()); |
+ if (!endpoints_[port].get()) |
+ return false; |
+ DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeProxy); |
endpoints_[port]->Attach(channel, local_id); |
+ return true; |
} |
void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) { |
@@ -242,7 +247,24 @@ void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) { |
base::AutoLock locker(lock_); |
DCHECK(endpoints_[port].get()); |
- endpoints_[port]->Run(remote_id); |
+ if (!endpoints_[port]->Run(remote_id)) |
+ endpoints_[port].reset(); |
+} |
+ |
+void MessagePipe::OnRemove(unsigned port) { |
+ unsigned destination_port = GetPeerPort(port); |
+ |
+ base::AutoLock locker(lock_); |
+ // A |OnPeerClose()| can come in first, before |OnRemove()| gets called. |
+ if (!endpoints_[port].get()) |
+ return; |
+ |
+ endpoints_[port]->OnRemove(); |
+ if (endpoints_[destination_port].get()) { |
+ if (!endpoints_[destination_port]->OnPeerClose()) |
+ endpoints_[destination_port].reset(); |
+ } |
+ endpoints_[port].reset(); |
} |
MessagePipe::~MessagePipe() { |
@@ -254,28 +276,8 @@ MessagePipe::~MessagePipe() { |
} |
MojoResult MessagePipe::HandleControlMessage( |
- unsigned port, |
+ unsigned /*port*/, |
scoped_ptr<MessageInTransit> message) { |
- DCHECK(port == 0 || port == 1); |
- DCHECK(message.get()); |
- DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipe); |
- |
- switch (message->subtype()) { |
- case MessageInTransit::kSubtypeMessagePipePeerClosed: { |
- unsigned source_port = GetPeerPort(port); |
- |
- base::AutoLock locker(lock_); |
- DCHECK(endpoints_[source_port].get()); |
- |
- endpoints_[source_port]->Close(); |
- if (endpoints_[port].get()) |
- endpoints_[port]->OnPeerClose(); |
- |
- endpoints_[source_port].reset(); |
- return MOJO_RESULT_OK; |
- } |
- } |
- |
LOG(WARNING) << "Unrecognized MessagePipe control message subtype " |
<< message->subtype(); |
return MOJO_RESULT_UNKNOWN; |