| Index: ipc/ipc_mojo_bootstrap.cc
|
| diff --git a/ipc/ipc_mojo_bootstrap.cc b/ipc/ipc_mojo_bootstrap.cc
|
| index 5c0b1338a6336c43f512b2078e110e5bf3f00102..5fa88ea5cc25bc003bf0e4b53ec8cd4f5a547aba 100644
|
| --- a/ipc/ipc_mojo_bootstrap.cc
|
| +++ b/ipc/ipc_mojo_bootstrap.cc
|
| @@ -156,9 +156,16 @@ class ChannelAssociatedGroupController
|
| base::AutoLock locker(lock_);
|
| bool inserted = false;
|
| Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
|
| - if (inserted && encountered_error_)
|
| - endpoint->set_peer_closed();
|
| + if (inserted) {
|
| + DCHECK(!endpoint->handle_created());
|
| + if (encountered_error_)
|
| + endpoint->set_peer_closed();
|
| + } else {
|
| + if (endpoint->handle_created())
|
| + return mojo::ScopedInterfaceEndpointHandle();
|
| + }
|
|
|
| + endpoint->set_handle_created();
|
| return CreateScopedInterfaceEndpointHandle(id, true);
|
| }
|
|
|
| @@ -240,6 +247,65 @@ class ChannelAssociatedGroupController
|
| friend class Endpoint;
|
| friend class ControlMessageProxyThunk;
|
|
|
| + // Message objects cannot be destroyed under the controller's lock, if they
|
| + // contain ScopedInterfaceEndpointHandle objects.
|
| + // IncomingMessageWrapper is used to wrap messages which haven't got the
|
| + // payload interface IDs deserialized into ScopedInterfaceEndpointHandles.
|
| + // Wrapper objects are always destroyed under the controller's lock. When a
|
| + // wrapper is destroyed and the message hasn't been consumed, the wrapper is
|
| + // responsible to send endpoint closed notifications.
|
| + class IncomingMessageWrapper {
|
| + public:
|
| + IncomingMessageWrapper() = default;
|
| +
|
| + IncomingMessageWrapper(ChannelAssociatedGroupController* controller,
|
| + mojo::Message* message)
|
| + : controller_(controller), value_(std::move(*message)) {
|
| + DCHECK(value_.associated_endpoint_handles()->empty());
|
| + }
|
| +
|
| + IncomingMessageWrapper(IncomingMessageWrapper&& other)
|
| + : controller_(other.controller_), value_(std::move(other.value_)) {}
|
| +
|
| + ~IncomingMessageWrapper() {
|
| + if (value_.IsNull())
|
| + return;
|
| +
|
| + controller_->lock_.AssertAcquired();
|
| +
|
| + uint32_t num_ids = value_.payload_num_interface_ids();
|
| + const uint32_t* ids = value_.payload_interface_ids();
|
| + for (uint32_t i = 0; i < num_ids; ++i) {
|
| + base::AutoUnlock unlocker(controller_->lock_);
|
| + controller_->control_message_proxy_.NotifyPeerEndpointClosed(
|
| + ids[i], base::nullopt);
|
| + }
|
| + }
|
| +
|
| + IncomingMessageWrapper& operator=(IncomingMessageWrapper&& other) {
|
| + controller_ = other.controller_;
|
| + value_ = std::move(other.value_);
|
| + return *this;
|
| + }
|
| +
|
| + // Must be called outside of the controller's lock.
|
| + bool TakeMessage(mojo::Message* output) {
|
| + DCHECK(!value_.IsNull());
|
| +
|
| + *output = std::move(value_);
|
| + return output->DeserializeAssociatedEndpointHandles(controller_);
|
| + }
|
| +
|
| + const mojo::Message& value() const { return value_; }
|
| +
|
| + private:
|
| + ChannelAssociatedGroupController* controller_ = nullptr;
|
| + // It must not hold any ScopedInterfaceEndpointHandle objects.
|
| + mojo::Message value_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(IncomingMessageWrapper);
|
| + };
|
| +
|
| class Endpoint : public base::RefCountedThreadSafe<Endpoint>,
|
| public mojo::InterfaceEndpointController {
|
| public:
|
| @@ -268,6 +334,16 @@ class ChannelAssociatedGroupController
|
| peer_closed_ = true;
|
| }
|
|
|
| + bool handle_created() const {
|
| + controller_->lock_.AssertAcquired();
|
| + return handle_created_;
|
| + }
|
| +
|
| + void set_handle_created() {
|
| + controller_->lock_.AssertAcquired();
|
| + handle_created_ = true;
|
| + }
|
| +
|
| const base::Optional<mojo::DisconnectReason>& disconnect_reason() const {
|
| return disconnect_reason_;
|
| }
|
| @@ -308,7 +384,7 @@ class ChannelAssociatedGroupController
|
| sync_watcher_.reset();
|
| }
|
|
|
| - uint32_t EnqueueSyncMessage(mojo::Message message) {
|
| + uint32_t EnqueueSyncMessage(IncomingMessageWrapper message) {
|
| controller_->lock_.AssertAcquired();
|
| uint32_t id = GenerateSyncMessageId();
|
| sync_messages_.emplace(id, std::move(message));
|
| @@ -322,11 +398,11 @@ class ChannelAssociatedGroupController
|
| sync_message_event_->Signal();
|
| }
|
|
|
| - mojo::Message PopSyncMessage(uint32_t id) {
|
| + IncomingMessageWrapper PopSyncMessage(uint32_t id) {
|
| controller_->lock_.AssertAcquired();
|
| if (sync_messages_.empty() || sync_messages_.front().first != id)
|
| - return mojo::Message();
|
| - mojo::Message message = std::move(sync_messages_.front().second);
|
| + return IncomingMessageWrapper();
|
| + IncomingMessageWrapper message = std::move(sync_messages_.front().second);
|
| sync_messages_.pop();
|
| return message;
|
| }
|
| @@ -335,6 +411,7 @@ class ChannelAssociatedGroupController
|
| bool SendMessage(mojo::Message* message) override {
|
| DCHECK(task_runner_->BelongsToCurrentThread());
|
| message->set_interface_id(id_);
|
| + message->SerializeAssociatedEndpointHandles(controller_);
|
| return controller_->SendMessage(message);
|
| }
|
|
|
| @@ -380,14 +457,17 @@ class ChannelAssociatedGroupController
|
| base::AutoLock locker(controller_->lock_);
|
| bool more_to_process = false;
|
| if (!sync_messages_.empty()) {
|
| - mojo::Message message = std::move(sync_messages_.front().second);
|
| + IncomingMessageWrapper message_wrapper =
|
| + std::move(sync_messages_.front().second);
|
| sync_messages_.pop();
|
|
|
| bool dispatch_succeeded;
|
| mojo::InterfaceEndpointClient* client = client_;
|
| {
|
| base::AutoUnlock unlocker(controller_->lock_);
|
| - dispatch_succeeded = client->HandleIncomingMessage(&message);
|
| + mojo::Message message;
|
| + dispatch_succeeded = message_wrapper.TakeMessage(&message) &&
|
| + client->HandleIncomingMessage(&message);
|
| }
|
|
|
| if (!sync_messages_.empty())
|
| @@ -449,12 +529,13 @@ class ChannelAssociatedGroupController
|
|
|
| bool closed_ = false;
|
| bool peer_closed_ = false;
|
| + bool handle_created_ = false;
|
| base::Optional<mojo::DisconnectReason> disconnect_reason_;
|
| mojo::InterfaceEndpointClient* client_ = nullptr;
|
| scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
|
| std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_;
|
| std::unique_ptr<MojoEvent> sync_message_event_;
|
| - std::queue<std::pair<uint32_t, mojo::Message>> sync_messages_;
|
| + std::queue<std::pair<uint32_t, IncomingMessageWrapper>> sync_messages_;
|
| uint32_t next_sync_message_id_ = 0;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(Endpoint);
|
| @@ -469,6 +550,7 @@ class ChannelAssociatedGroupController
|
| private:
|
| // MessageReceiver:
|
| bool Accept(mojo::Message* message) override {
|
| + message->SerializeAssociatedEndpointHandles(controller_);
|
| return controller_->SendMessage(message);
|
| }
|
|
|
| @@ -622,8 +704,10 @@ class ChannelAssociatedGroupController
|
| bool Accept(mojo::Message* message) override {
|
| DCHECK(thread_checker_.CalledOnValidThread());
|
|
|
| - if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message))
|
| - return control_message_handler_.Accept(message);
|
| + if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) {
|
| + return message->DeserializeAssociatedEndpointHandles(this) &&
|
| + control_message_handler_.Accept(message);
|
| + }
|
|
|
| mojo::InterfaceId id = message->interface_id();
|
| DCHECK(mojo::IsValidInterfaceId(id));
|
| @@ -642,12 +726,14 @@ class ChannelAssociatedGroupController
|
| DCHECK(proxy_task_runner_);
|
|
|
| if (message->has_flag(mojo::Message::kFlagIsSync)) {
|
| + IncomingMessageWrapper message_wrapper(this, message);
|
| // Sync messages may need to be handled by the endpoint if it's blocking
|
| // on a sync reply. We pass ownership of the message to the endpoint's
|
| // sync message queue. If the endpoint was blocking, it will dequeue the
|
| // message and dispatch it. Otherwise the posted |AcceptSyncMessage()|
|
| // call will dequeue the message and dispatch it.
|
| - uint32_t message_id = endpoint->EnqueueSyncMessage(std::move(*message));
|
| + uint32_t message_id =
|
| + endpoint->EnqueueSyncMessage(std::move(message_wrapper));
|
| proxy_task_runner_->PostTask(
|
| FROM_HERE,
|
| base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage,
|
| @@ -668,7 +754,8 @@ class ChannelAssociatedGroupController
|
| !message->has_flag(mojo::Message::kFlagIsResponse));
|
|
|
| base::AutoUnlock unlocker(lock_);
|
| - return client->HandleIncomingMessage(message);
|
| + return message->DeserializeAssociatedEndpointHandles(this) &&
|
| + client->HandleIncomingMessage(message);
|
| }
|
|
|
| void AcceptOnProxyThread(mojo::Message message) {
|
| @@ -678,6 +765,8 @@ class ChannelAssociatedGroupController
|
| DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id));
|
|
|
| base::AutoLock locker(lock_);
|
| + IncomingMessageWrapper message_wrapper(this, &message);
|
| +
|
| Endpoint* endpoint = GetEndpointForDispatch(id, false /* create */);
|
| if (!endpoint)
|
| return;
|
| @@ -689,12 +778,14 @@ class ChannelAssociatedGroupController
|
| DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
|
|
|
| // Sync messages should never make their way to this method.
|
| - DCHECK(!message.has_flag(mojo::Message::kFlagIsSync));
|
| + DCHECK(!message_wrapper.value().has_flag(mojo::Message::kFlagIsSync));
|
|
|
| bool result = false;
|
| {
|
| base::AutoUnlock unlocker(lock_);
|
| - result = client->HandleIncomingMessage(&message);
|
| + mojo::Message message;
|
| + result = message_wrapper.TakeMessage(&message) &&
|
| + client->HandleIncomingMessage(&message);
|
| }
|
|
|
| if (!result)
|
| @@ -711,11 +802,12 @@ class ChannelAssociatedGroupController
|
| return;
|
|
|
| DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
|
| - mojo::Message message = endpoint->PopSyncMessage(message_id);
|
| + IncomingMessageWrapper message_wrapper =
|
| + endpoint->PopSyncMessage(message_id);
|
|
|
| // The message must have already been dequeued by the endpoint waking up
|
| // from a sync wait. Nothing to do.
|
| - if (message.IsNull())
|
| + if (message_wrapper.value().IsNull())
|
| return;
|
|
|
| mojo::InterfaceEndpointClient* client = endpoint->client();
|
| @@ -725,7 +817,9 @@ class ChannelAssociatedGroupController
|
| bool result = false;
|
| {
|
| base::AutoUnlock unlocker(lock_);
|
| - result = client->HandleIncomingMessage(&message);
|
| + mojo::Message message;
|
| + result = message_wrapper.TakeMessage(&message) &&
|
| + client->HandleIncomingMessage(&message);
|
| }
|
|
|
| if (!result)
|
|
|