Index: mojo/edk/system/channel_endpoint.cc |
diff --git a/mojo/edk/system/channel_endpoint.cc b/mojo/edk/system/channel_endpoint.cc |
index a8bbbb2b1531df24c4e4cc6b10744f2be451570d..8ef1f40f52e97df1091f991ad152c25bf8460337 100644 |
--- a/mojo/edk/system/channel_endpoint.cc |
+++ b/mojo/edk/system/channel_endpoint.cc |
@@ -5,6 +5,7 @@ |
#include "mojo/edk/system/channel_endpoint.h" |
#include "base/logging.h" |
+#include "base/threading/platform_thread.h" |
#include "mojo/edk/system/channel.h" |
#include "mojo/edk/system/channel_endpoint_client.h" |
@@ -14,8 +15,11 @@ namespace system { |
ChannelEndpoint::ChannelEndpoint(ChannelEndpointClient* client, |
unsigned client_port, |
MessageInTransitQueue* message_queue) |
- : client_(client), client_port_(client_port), channel_(nullptr) { |
- DCHECK(client_.get() || message_queue); |
+ : client_(client), |
+ client_port_(client_port), |
+ channel_(nullptr), |
+ is_detached_from_channel_(false) { |
+ DCHECK(client_ || message_queue); |
if (message_queue) |
channel_message_queue_.Swap(message_queue); |
@@ -39,21 +43,27 @@ bool ChannelEndpoint::EnqueueMessage(scoped_ptr<MessageInTransit> message) { |
return WriteMessageNoLock(message.Pass()); |
} |
+bool ChannelEndpoint::ReplaceClient(ChannelEndpointClient* client, |
+ unsigned client_port) { |
+ DCHECK(client); |
+ |
+ base::AutoLock locker(lock_); |
+ DCHECK(client_); |
+ DCHECK(client != client_.get() || client_port != client_port_); |
+ client_ = client; |
+ client_port_ = client_port; |
+ return !is_detached_from_channel_; |
+} |
+ |
void ChannelEndpoint::DetachFromClient() { |
- { |
- base::AutoLock locker(lock_); |
- DCHECK(client_.get()); |
- client_ = nullptr; |
+ base::AutoLock locker(lock_); |
+ DCHECK(client_); |
+ client_ = nullptr; |
- if (!channel_) |
- return; |
- DCHECK(local_id_.is_valid()); |
- DCHECK(remote_id_.is_valid()); |
- channel_->DetachEndpoint(this, local_id_, remote_id_); |
- channel_ = nullptr; |
- local_id_ = ChannelEndpointId(); |
- remote_id_ = ChannelEndpointId(); |
- } |
+ if (!channel_) |
+ return; |
+ channel_->DetachEndpoint(this, local_id_, remote_id_); |
+ ResetChannelNoLock(); |
} |
void ChannelEndpoint::AttachAndRun(Channel* channel, |
@@ -76,32 +86,52 @@ void ChannelEndpoint::AttachAndRun(Channel* channel, |
<< "Failed to write enqueue message to channel"; |
} |
- if (!client_.get()) { |
+ if (!client_) { |
channel_->DetachEndpoint(this, local_id_, remote_id_); |
- channel_ = nullptr; |
- local_id_ = ChannelEndpointId(); |
- remote_id_ = ChannelEndpointId(); |
+ ResetChannelNoLock(); |
} |
} |
void ChannelEndpoint::OnReadMessage(scoped_ptr<MessageInTransit> message) { |
scoped_refptr<ChannelEndpointClient> client; |
- unsigned client_port; |
- { |
- base::AutoLock locker(lock_); |
- DCHECK(channel_); |
- if (!client_.get()) { |
- // This isn't a failure per se. (It just means that, e.g., the other end |
- // of the message point closed first.) |
- return; |
+ unsigned client_port = 0; |
+ |
+ // This loop is to make |ReplaceClient()| work. We can't call the client's |
+ // |OnReadMessage()| under our lock, so by the time we do that, |client| may |
+ // no longer be our client. |
+ // |
+ // In that case, |client| must return false. We'll then yield, and retry with |
+ // the new client. (Theoretically, the client could be replaced again.) |
+ // |
+ // This solution isn't terribly elegant, but it's the least costly way of |
+ // handling/avoiding this (very unlikely) race. (Other solutions -- e.g., |
+ // adding a client message queue, which the client only fetches messages from |
+ // -- impose significant cost in the common case.) |
+ for (;;) { |
+ { |
+ base::AutoLock locker(lock_); |
+ if (!channel_ || !client_) { |
+ // This isn't a failure per se. (It just means that, e.g., the other end |
+ // of the message point closed first.) |
+ return; |
+ } |
+ |
+ // If we get here in a second (third, etc.) iteration of the loop, it's |
+ // because |ReplaceClient()| was called. |
+ DCHECK(client_ != client || client_port_ != client_port); |
+ |
+ // Take a ref, and call |OnReadMessage()| outside the lock. |
+ client = client_; |
+ client_port = client_port_; |
} |
- // Take a ref, and call |OnReadMessage()| outside the lock. |
- client = client_; |
- client_port = client_port_; |
- } |
+ if (client->OnReadMessage(client_port, message.get())) { |
+ ignore_result(message.release()); |
+ break; |
+ } |
- client->OnReadMessage(client_port, message.Pass()); |
+ base::PlatformThread::YieldCurrentThread(); |
+ } |
} |
void ChannelEndpoint::DetachFromChannel() { |
@@ -110,7 +140,7 @@ void ChannelEndpoint::DetachFromChannel() { |
{ |
base::AutoLock locker(lock_); |
- if (client_.get()) { |
+ if (client_) { |
// Take a ref, and call |OnDetachFromChannel()| outside the lock. |
client = client_; |
client_port = client_port_; |
@@ -119,21 +149,23 @@ void ChannelEndpoint::DetachFromChannel() { |
// |channel_| may already be null if we already detached from the channel in |
// |DetachFromClient()| by calling |Channel::DetachEndpoint()| (and there |
// are racing detaches). |
- if (channel_) { |
- DCHECK(local_id_.is_valid()); |
- DCHECK(remote_id_.is_valid()); |
- channel_ = nullptr; |
- local_id_ = ChannelEndpointId(); |
- remote_id_ = ChannelEndpointId(); |
- } |
+ if (channel_) |
+ ResetChannelNoLock(); |
+ else |
+ DCHECK(is_detached_from_channel_); |
} |
- if (client.get()) |
+ // If |ReplaceClient()| is called (from another thread) after the above locked |
+ // section but before we call |OnDetachFromChannel()|, |ReplaceClient()| |
+ // return false to notify the caller that the channel was already detached. |
+ // (The old client has to accept the arguably-spurious call to |
+ // |OnDetachFromChannel()|.) |
+ if (client) |
client->OnDetachFromChannel(client_port); |
} |
ChannelEndpoint::~ChannelEndpoint() { |
- DCHECK(!client_.get()); |
+ DCHECK(!client_); |
DCHECK(!channel_); |
DCHECK(!local_id_.is_valid()); |
DCHECK(!remote_id_.is_valid()); |
@@ -154,5 +186,17 @@ bool ChannelEndpoint::WriteMessageNoLock(scoped_ptr<MessageInTransit> message) { |
return channel_->WriteMessage(message.Pass()); |
} |
+void ChannelEndpoint::ResetChannelNoLock() { |
+ DCHECK(channel_); |
+ DCHECK(local_id_.is_valid()); |
+ DCHECK(remote_id_.is_valid()); |
+ DCHECK(!is_detached_from_channel_); |
+ |
+ channel_ = nullptr; |
+ local_id_ = ChannelEndpointId(); |
+ remote_id_ = ChannelEndpointId(); |
+ is_detached_from_channel_ = true; |
+} |
+ |
} // namespace system |
} // namespace mojo |