| Index: mojo/edk/system/channel_endpoint.cc
|
| diff --git a/mojo/edk/system/channel_endpoint.cc b/mojo/edk/system/channel_endpoint.cc
|
| index a8bbbb2b1531df24c4e4cc6b10744f2be451570d..8ef1f40f52e97df1091f991ad152c25bf8460337 100644
|
| --- a/mojo/edk/system/channel_endpoint.cc
|
| +++ b/mojo/edk/system/channel_endpoint.cc
|
| @@ -5,6 +5,7 @@
|
| #include "mojo/edk/system/channel_endpoint.h"
|
|
|
| #include "base/logging.h"
|
| +#include "base/threading/platform_thread.h"
|
| #include "mojo/edk/system/channel.h"
|
| #include "mojo/edk/system/channel_endpoint_client.h"
|
|
|
| @@ -14,8 +15,11 @@ namespace system {
|
| ChannelEndpoint::ChannelEndpoint(ChannelEndpointClient* client,
|
| unsigned client_port,
|
| MessageInTransitQueue* message_queue)
|
| - : client_(client), client_port_(client_port), channel_(nullptr) {
|
| - DCHECK(client_.get() || message_queue);
|
| + : client_(client),
|
| + client_port_(client_port),
|
| + channel_(nullptr),
|
| + is_detached_from_channel_(false) {
|
| + DCHECK(client_ || message_queue);
|
|
|
| if (message_queue)
|
| channel_message_queue_.Swap(message_queue);
|
| @@ -39,21 +43,27 @@ bool ChannelEndpoint::EnqueueMessage(scoped_ptr<MessageInTransit> message) {
|
| return WriteMessageNoLock(message.Pass());
|
| }
|
|
|
| +bool ChannelEndpoint::ReplaceClient(ChannelEndpointClient* client,
|
| + unsigned client_port) {
|
| + DCHECK(client);
|
| +
|
| + base::AutoLock locker(lock_);
|
| + DCHECK(client_);
|
| + DCHECK(client != client_.get() || client_port != client_port_);
|
| + client_ = client;
|
| + client_port_ = client_port;
|
| + return !is_detached_from_channel_;
|
| +}
|
| +
|
| void ChannelEndpoint::DetachFromClient() {
|
| - {
|
| - base::AutoLock locker(lock_);
|
| - DCHECK(client_.get());
|
| - client_ = nullptr;
|
| + base::AutoLock locker(lock_);
|
| + DCHECK(client_);
|
| + client_ = nullptr;
|
|
|
| - if (!channel_)
|
| - return;
|
| - DCHECK(local_id_.is_valid());
|
| - DCHECK(remote_id_.is_valid());
|
| - channel_->DetachEndpoint(this, local_id_, remote_id_);
|
| - channel_ = nullptr;
|
| - local_id_ = ChannelEndpointId();
|
| - remote_id_ = ChannelEndpointId();
|
| - }
|
| + if (!channel_)
|
| + return;
|
| + channel_->DetachEndpoint(this, local_id_, remote_id_);
|
| + ResetChannelNoLock();
|
| }
|
|
|
| void ChannelEndpoint::AttachAndRun(Channel* channel,
|
| @@ -76,32 +86,52 @@ void ChannelEndpoint::AttachAndRun(Channel* channel,
|
| << "Failed to write enqueue message to channel";
|
| }
|
|
|
| - if (!client_.get()) {
|
| + if (!client_) {
|
| channel_->DetachEndpoint(this, local_id_, remote_id_);
|
| - channel_ = nullptr;
|
| - local_id_ = ChannelEndpointId();
|
| - remote_id_ = ChannelEndpointId();
|
| + ResetChannelNoLock();
|
| }
|
| }
|
|
|
| void ChannelEndpoint::OnReadMessage(scoped_ptr<MessageInTransit> message) {
|
| scoped_refptr<ChannelEndpointClient> client;
|
| - unsigned client_port;
|
| - {
|
| - base::AutoLock locker(lock_);
|
| - DCHECK(channel_);
|
| - if (!client_.get()) {
|
| - // This isn't a failure per se. (It just means that, e.g., the other end
|
| - // of the message point closed first.)
|
| - return;
|
| + unsigned client_port = 0;
|
| +
|
| + // This loop is to make |ReplaceClient()| work. We can't call the client's
|
| + // |OnReadMessage()| under our lock, so by the time we do that, |client| may
|
| + // no longer be our client.
|
| + //
|
| + // In that case, |client| must return false. We'll then yield, and retry with
|
| + // the new client. (Theoretically, the client could be replaced again.)
|
| + //
|
| + // This solution isn't terribly elegant, but it's the least costly way of
|
| + // handling/avoiding this (very unlikely) race. (Other solutions -- e.g.,
|
| + // adding a client message queue, which the client only fetches messages from
|
| + // -- impose significant cost in the common case.)
|
| + for (;;) {
|
| + {
|
| + base::AutoLock locker(lock_);
|
| + if (!channel_ || !client_) {
|
| + // This isn't a failure per se. (It just means that, e.g., the other end
|
| + // of the message point closed first.)
|
| + return;
|
| + }
|
| +
|
| + // If we get here in a second (third, etc.) iteration of the loop, it's
|
| + // because |ReplaceClient()| was called.
|
| + DCHECK(client_ != client || client_port_ != client_port);
|
| +
|
| + // Take a ref, and call |OnReadMessage()| outside the lock.
|
| + client = client_;
|
| + client_port = client_port_;
|
| }
|
|
|
| - // Take a ref, and call |OnReadMessage()| outside the lock.
|
| - client = client_;
|
| - client_port = client_port_;
|
| - }
|
| + if (client->OnReadMessage(client_port, message.get())) {
|
| + ignore_result(message.release());
|
| + break;
|
| + }
|
|
|
| - client->OnReadMessage(client_port, message.Pass());
|
| + base::PlatformThread::YieldCurrentThread();
|
| + }
|
| }
|
|
|
| void ChannelEndpoint::DetachFromChannel() {
|
| @@ -110,7 +140,7 @@ void ChannelEndpoint::DetachFromChannel() {
|
| {
|
| base::AutoLock locker(lock_);
|
|
|
| - if (client_.get()) {
|
| + if (client_) {
|
| // Take a ref, and call |OnDetachFromChannel()| outside the lock.
|
| client = client_;
|
| client_port = client_port_;
|
| @@ -119,21 +149,23 @@ void ChannelEndpoint::DetachFromChannel() {
|
| // |channel_| may already be null if we already detached from the channel in
|
| // |DetachFromClient()| by calling |Channel::DetachEndpoint()| (and there
|
| // are racing detaches).
|
| - if (channel_) {
|
| - DCHECK(local_id_.is_valid());
|
| - DCHECK(remote_id_.is_valid());
|
| - channel_ = nullptr;
|
| - local_id_ = ChannelEndpointId();
|
| - remote_id_ = ChannelEndpointId();
|
| - }
|
| + if (channel_)
|
| + ResetChannelNoLock();
|
| + else
|
| + DCHECK(is_detached_from_channel_);
|
| }
|
|
|
| - if (client.get())
|
| + // If |ReplaceClient()| is called (from another thread) after the above locked
|
| + // section but before we call |OnDetachFromChannel()|, |ReplaceClient()|
|
| + // return false to notify the caller that the channel was already detached.
|
| + // (The old client has to accept the arguably-spurious call to
|
| + // |OnDetachFromChannel()|.)
|
| + if (client)
|
| client->OnDetachFromChannel(client_port);
|
| }
|
|
|
| ChannelEndpoint::~ChannelEndpoint() {
|
| - DCHECK(!client_.get());
|
| + DCHECK(!client_);
|
| DCHECK(!channel_);
|
| DCHECK(!local_id_.is_valid());
|
| DCHECK(!remote_id_.is_valid());
|
| @@ -154,5 +186,17 @@ bool ChannelEndpoint::WriteMessageNoLock(scoped_ptr<MessageInTransit> message) {
|
| return channel_->WriteMessage(message.Pass());
|
| }
|
|
|
| +void ChannelEndpoint::ResetChannelNoLock() {
|
| + DCHECK(channel_);
|
| + DCHECK(local_id_.is_valid());
|
| + DCHECK(remote_id_.is_valid());
|
| + DCHECK(!is_detached_from_channel_);
|
| +
|
| + channel_ = nullptr;
|
| + local_id_ = ChannelEndpointId();
|
| + remote_id_ = ChannelEndpointId();
|
| + is_detached_from_channel_ = true;
|
| +}
|
| +
|
| } // namespace system
|
| } // namespace mojo
|
|
|