| Index: ipc/ipc_mojo_bootstrap.cc
|
| diff --git a/ipc/ipc_mojo_bootstrap.cc b/ipc/ipc_mojo_bootstrap.cc
|
| index 5fa88ea5cc25bc003bf0e4b53ec8cd4f5a547aba..68454aea73348bf2a8e6676616da5d73cc9fc8e5 100644
|
| --- a/ipc/ipc_mojo_bootstrap.cc
|
| +++ b/ipc/ipc_mojo_bootstrap.cc
|
| @@ -107,6 +107,8 @@ class ChannelAssociatedGroupController
|
| Endpoint* receiver_endpoint = new Endpoint(this, receiver_id);
|
| endpoints_.insert({ sender_id, sender_endpoint });
|
| endpoints_.insert({ receiver_id, receiver_endpoint });
|
| + sender_endpoint->set_handle_created();
|
| + receiver_endpoint->set_handle_created();
|
| }
|
|
|
| mojo::ScopedInterfaceEndpointHandle sender_handle =
|
| @@ -144,6 +146,7 @@ class ChannelAssociatedGroupController
|
| endpoint->set_peer_closed();
|
| endpoints_.insert({ id, endpoint });
|
|
|
| + endpoint->set_handle_created();
|
| *local_endpoint = CreateScopedInterfaceEndpointHandle(id, true);
|
| *remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false);
|
| }
|
| @@ -247,63 +250,45 @@ class ChannelAssociatedGroupController
|
| friend class Endpoint;
|
| friend class ControlMessageProxyThunk;
|
|
|
| - // Message objects cannot be destroyed under the controller's lock, if they
|
| - // contain ScopedInterfaceEndpointHandle objects.
|
| - // IncomingMessageWrapper is used to wrap messages which haven't got the
|
| - // payload interface IDs deserialized into ScopedInterfaceEndpointHandles.
|
| - // Wrapper objects are always destroyed under the controller's lock. When a
|
| - // wrapper is destroyed and the message hasn't been consumed, the wrapper is
|
| - // responsible to send endpoint closed notifications.
|
| - class IncomingMessageWrapper {
|
| + // MessageWrapper objects are always destroyed under the controller's lock. On
|
| + // destruction, if the message it wrappers contains
|
| + // ScopedInterfaceEndpointHandles (which cannot be destructed under the
|
| + // controller's lock), the wrapper unlocks to clean them up.
|
| + class MessageWrapper {
|
| public:
|
| - IncomingMessageWrapper() = default;
|
| + MessageWrapper() = default;
|
|
|
| - IncomingMessageWrapper(ChannelAssociatedGroupController* controller,
|
| - mojo::Message* message)
|
| - : controller_(controller), value_(std::move(*message)) {
|
| - DCHECK(value_.associated_endpoint_handles()->empty());
|
| - }
|
| + MessageWrapper(ChannelAssociatedGroupController* controller,
|
| + mojo::Message message)
|
| + : controller_(controller), value_(std::move(message)) {}
|
|
|
| - IncomingMessageWrapper(IncomingMessageWrapper&& other)
|
| + MessageWrapper(MessageWrapper&& other)
|
| : controller_(other.controller_), value_(std::move(other.value_)) {}
|
|
|
| - ~IncomingMessageWrapper() {
|
| - if (value_.IsNull())
|
| + ~MessageWrapper() {
|
| + if (value_.associated_endpoint_handles()->empty())
|
| return;
|
|
|
| controller_->lock_.AssertAcquired();
|
| -
|
| - uint32_t num_ids = value_.payload_num_interface_ids();
|
| - const uint32_t* ids = value_.payload_interface_ids();
|
| - for (uint32_t i = 0; i < num_ids; ++i) {
|
| + {
|
| base::AutoUnlock unlocker(controller_->lock_);
|
| - controller_->control_message_proxy_.NotifyPeerEndpointClosed(
|
| - ids[i], base::nullopt);
|
| + value_.mutable_associated_endpoint_handles()->clear();
|
| }
|
| }
|
|
|
| - IncomingMessageWrapper& operator=(IncomingMessageWrapper&& other) {
|
| + MessageWrapper& operator=(MessageWrapper&& other) {
|
| controller_ = other.controller_;
|
| value_ = std::move(other.value_);
|
| return *this;
|
| }
|
|
|
| - // Must be called outside of the controller's lock.
|
| - bool TakeMessage(mojo::Message* output) {
|
| - DCHECK(!value_.IsNull());
|
| -
|
| - *output = std::move(value_);
|
| - return output->DeserializeAssociatedEndpointHandles(controller_);
|
| - }
|
| -
|
| - const mojo::Message& value() const { return value_; }
|
| + mojo::Message& value() { return value_; }
|
|
|
| private:
|
| ChannelAssociatedGroupController* controller_ = nullptr;
|
| - // It must not hold any ScopedInterfaceEndpointHandle objects.
|
| mojo::Message value_;
|
|
|
| - DISALLOW_COPY_AND_ASSIGN(IncomingMessageWrapper);
|
| + DISALLOW_COPY_AND_ASSIGN(MessageWrapper);
|
| };
|
|
|
| class Endpoint : public base::RefCountedThreadSafe<Endpoint>,
|
| @@ -384,7 +369,7 @@ class ChannelAssociatedGroupController
|
| sync_watcher_.reset();
|
| }
|
|
|
| - uint32_t EnqueueSyncMessage(IncomingMessageWrapper message) {
|
| + uint32_t EnqueueSyncMessage(MessageWrapper message) {
|
| controller_->lock_.AssertAcquired();
|
| uint32_t id = GenerateSyncMessageId();
|
| sync_messages_.emplace(id, std::move(message));
|
| @@ -398,11 +383,11 @@ class ChannelAssociatedGroupController
|
| sync_message_event_->Signal();
|
| }
|
|
|
| - IncomingMessageWrapper PopSyncMessage(uint32_t id) {
|
| + MessageWrapper PopSyncMessage(uint32_t id) {
|
| controller_->lock_.AssertAcquired();
|
| if (sync_messages_.empty() || sync_messages_.front().first != id)
|
| - return IncomingMessageWrapper();
|
| - IncomingMessageWrapper message = std::move(sync_messages_.front().second);
|
| + return MessageWrapper();
|
| + MessageWrapper message = std::move(sync_messages_.front().second);
|
| sync_messages_.pop();
|
| return message;
|
| }
|
| @@ -457,7 +442,7 @@ class ChannelAssociatedGroupController
|
| base::AutoLock locker(controller_->lock_);
|
| bool more_to_process = false;
|
| if (!sync_messages_.empty()) {
|
| - IncomingMessageWrapper message_wrapper =
|
| + MessageWrapper message_wrapper =
|
| std::move(sync_messages_.front().second);
|
| sync_messages_.pop();
|
|
|
| @@ -465,9 +450,8 @@ class ChannelAssociatedGroupController
|
| mojo::InterfaceEndpointClient* client = client_;
|
| {
|
| base::AutoUnlock unlocker(controller_->lock_);
|
| - mojo::Message message;
|
| - dispatch_succeeded = message_wrapper.TakeMessage(&message) &&
|
| - client->HandleIncomingMessage(&message);
|
| + dispatch_succeeded =
|
| + client->HandleIncomingMessage(&message_wrapper.value());
|
| }
|
|
|
| if (!sync_messages_.empty())
|
| @@ -535,7 +519,7 @@ class ChannelAssociatedGroupController
|
| scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
|
| std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_;
|
| std::unique_ptr<MojoEvent> sync_message_event_;
|
| - std::queue<std::pair<uint32_t, IncomingMessageWrapper>> sync_messages_;
|
| + std::queue<std::pair<uint32_t, MessageWrapper>> sync_messages_;
|
| uint32_t next_sync_message_id_ = 0;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(Endpoint);
|
| @@ -689,33 +673,41 @@ class ChannelAssociatedGroupController
|
| lock_.AssertAcquired();
|
| DCHECK(!inserted || !*inserted);
|
|
|
| - auto iter = endpoints_.find(id);
|
| - if (iter != endpoints_.end())
|
| - return iter->second.get();
|
| -
|
| - Endpoint* endpoint = new Endpoint(this, id);
|
| - endpoints_.insert({ id, endpoint });
|
| - if (inserted)
|
| - *inserted = true;
|
| + Endpoint* endpoint = FindEndpoint(id);
|
| + if (!endpoint) {
|
| + endpoint = new Endpoint(this, id);
|
| + endpoints_.insert({id, endpoint});
|
| + if (inserted)
|
| + *inserted = true;
|
| + }
|
| return endpoint;
|
| }
|
|
|
| + Endpoint* FindEndpoint(mojo::InterfaceId id) {
|
| + lock_.AssertAcquired();
|
| + auto iter = endpoints_.find(id);
|
| + return iter != endpoints_.end() ? iter->second.get() : nullptr;
|
| + }
|
| +
|
| // mojo::MessageReceiver:
|
| bool Accept(mojo::Message* message) override {
|
| DCHECK(thread_checker_.CalledOnValidThread());
|
|
|
| - if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) {
|
| - return message->DeserializeAssociatedEndpointHandles(this) &&
|
| - control_message_handler_.Accept(message);
|
| - }
|
| + if (!message->DeserializeAssociatedEndpointHandles(this))
|
| + return false;
|
| +
|
| + if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message))
|
| + return control_message_handler_.Accept(message);
|
|
|
| mojo::InterfaceId id = message->interface_id();
|
| DCHECK(mojo::IsValidInterfaceId(id));
|
|
|
| base::AutoLock locker(lock_);
|
| - Endpoint* endpoint = GetEndpointForDispatch(id, true /* create */);
|
| - mojo::InterfaceEndpointClient* client =
|
| - endpoint ? endpoint->client() : nullptr;
|
| + Endpoint* endpoint = FindEndpoint(id);
|
| + if (!endpoint)
|
| + return true;
|
| +
|
| + mojo::InterfaceEndpointClient* client = endpoint->client();
|
| if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) {
|
| // No client has been bound yet or the client runs tasks on another
|
| // thread. We assume the other thread must always be the one on which
|
| @@ -726,7 +718,7 @@ class ChannelAssociatedGroupController
|
| DCHECK(proxy_task_runner_);
|
|
|
| if (message->has_flag(mojo::Message::kFlagIsSync)) {
|
| - IncomingMessageWrapper message_wrapper(this, message);
|
| + MessageWrapper message_wrapper(this, std::move(*message));
|
| // Sync messages may need to be handled by the endpoint if it's blocking
|
| // on a sync reply. We pass ownership of the message to the endpoint's
|
| // sync message queue. If the endpoint was blocking, it will dequeue the
|
| @@ -754,8 +746,7 @@ class ChannelAssociatedGroupController
|
| !message->has_flag(mojo::Message::kFlagIsResponse));
|
|
|
| base::AutoUnlock unlocker(lock_);
|
| - return message->DeserializeAssociatedEndpointHandles(this) &&
|
| - client->HandleIncomingMessage(message);
|
| + return client->HandleIncomingMessage(message);
|
| }
|
|
|
| void AcceptOnProxyThread(mojo::Message message) {
|
| @@ -765,9 +756,7 @@ class ChannelAssociatedGroupController
|
| DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id));
|
|
|
| base::AutoLock locker(lock_);
|
| - IncomingMessageWrapper message_wrapper(this, &message);
|
| -
|
| - Endpoint* endpoint = GetEndpointForDispatch(id, false /* create */);
|
| + Endpoint* endpoint = FindEndpoint(id);
|
| if (!endpoint)
|
| return;
|
|
|
| @@ -778,14 +767,12 @@ class ChannelAssociatedGroupController
|
| DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
|
|
|
| // Sync messages should never make their way to this method.
|
| - DCHECK(!message_wrapper.value().has_flag(mojo::Message::kFlagIsSync));
|
| + DCHECK(!message.has_flag(mojo::Message::kFlagIsSync));
|
|
|
| bool result = false;
|
| {
|
| base::AutoUnlock unlocker(lock_);
|
| - mojo::Message message;
|
| - result = message_wrapper.TakeMessage(&message) &&
|
| - client->HandleIncomingMessage(&message);
|
| + result = client->HandleIncomingMessage(&message);
|
| }
|
|
|
| if (!result)
|
| @@ -796,14 +783,12 @@ class ChannelAssociatedGroupController
|
| DCHECK(proxy_task_runner_->BelongsToCurrentThread());
|
|
|
| base::AutoLock locker(lock_);
|
| - Endpoint* endpoint =
|
| - GetEndpointForDispatch(interface_id, false /* create */);
|
| + Endpoint* endpoint = FindEndpoint(interface_id);
|
| if (!endpoint)
|
| return;
|
|
|
| DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
|
| - IncomingMessageWrapper message_wrapper =
|
| - endpoint->PopSyncMessage(message_id);
|
| + MessageWrapper message_wrapper = endpoint->PopSyncMessage(message_id);
|
|
|
| // The message must have already been dequeued by the endpoint waking up
|
| // from a sync wait. Nothing to do.
|
| @@ -817,28 +802,13 @@ class ChannelAssociatedGroupController
|
| bool result = false;
|
| {
|
| base::AutoUnlock unlocker(lock_);
|
| - mojo::Message message;
|
| - result = message_wrapper.TakeMessage(&message) &&
|
| - client->HandleIncomingMessage(&message);
|
| + result = client->HandleIncomingMessage(&message_wrapper.value());
|
| }
|
|
|
| if (!result)
|
| RaiseError();
|
| }
|
|
|
| - Endpoint* GetEndpointForDispatch(mojo::InterfaceId id, bool create) {
|
| - lock_.AssertAcquired();
|
| - auto iter = endpoints_.find(id);
|
| - if (iter != endpoints_.end())
|
| - return iter->second.get();
|
| - if (!create)
|
| - return nullptr;
|
| - bool inserted = false;
|
| - Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
|
| - DCHECK(inserted);
|
| - return endpoint;
|
| - }
|
| -
|
| // mojo::PipeControlMessageHandlerDelegate:
|
| bool OnPeerAssociatedEndpointClosed(
|
| mojo::InterfaceId id,
|
|
|