Chromium Code Reviews| Index: ipc/ipc_mojo_bootstrap.cc |
| diff --git a/ipc/ipc_mojo_bootstrap.cc b/ipc/ipc_mojo_bootstrap.cc |
| index 5c0b1338a6336c43f512b2078e110e5bf3f00102..b7c76b1795c67d1f623bacbe8e197409ea021a1f 100644 |
| --- a/ipc/ipc_mojo_bootstrap.cc |
| +++ b/ipc/ipc_mojo_bootstrap.cc |
| @@ -156,9 +156,16 @@ class ChannelAssociatedGroupController |
| base::AutoLock locker(lock_); |
| bool inserted = false; |
| Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted); |
| - if (inserted && encountered_error_) |
| - endpoint->set_peer_closed(); |
| + if (inserted) { |
| + DCHECK(!endpoint->handle_created()); |
| + if (encountered_error_) |
| + endpoint->set_peer_closed(); |
| + } else { |
| + if (endpoint->handle_created()) |
| + return mojo::ScopedInterfaceEndpointHandle(); |
| + } |
| + endpoint->set_handle_created(); |
| return CreateScopedInterfaceEndpointHandle(id, true); |
| } |
| @@ -240,6 +247,65 @@ class ChannelAssociatedGroupController |
| friend class Endpoint; |
| friend class ControlMessageProxyThunk; |
| + // Message objects cannot be destroyed under the controller's lock, if they |
| + // contain ScopedInterfaceEndpointHandle objects. |
| + // IncomingMessageWrapper is used to wrap messages which haven't got the |
| + // payload interface IDs desreialized into ScopedInterfaceEndpointHandles. |
|
Ken Rockot(use gerrit already)
2017/01/31 18:33:42
nit: deserialized
yzshen1
2017/01/31 19:26:05
Done.
|
| + // Wrapper objects are always destroyed under the controller's lock. When a |
| + // wrapper is destroyed and the message hasn't been consumed, the wrapper is |
| + // responsible to send endpoint closed notifications. |
| + class IncomingMessageWrapper { |
| + public: |
| + IncomingMessageWrapper() = default; |
| + |
| + IncomingMessageWrapper(ChannelAssociatedGroupController* controller, |
| + mojo::Message* message) |
| + : controller_(controller), value_(std::move(*message)) { |
| + DCHECK(value_.associated_endpoint_handles()->empty()); |
| + } |
| + |
| + IncomingMessageWrapper(IncomingMessageWrapper&& other) |
| + : controller_(other.controller_), value_(std::move(other.value_)) {} |
| + |
| + ~IncomingMessageWrapper() { |
| + if (value_.IsNull()) |
| + return; |
| + |
| + controller_->lock_.AssertAcquired(); |
| + |
| + uint32_t num_ids = value_.payload_num_interface_ids(); |
| + const uint32_t* ids = value_.payload_interface_ids(); |
| + for (uint32_t i = 0; i < num_ids; ++i) { |
| + base::AutoUnlock unlocker(controller_->lock_); |
| + controller_->control_message_proxy_.NotifyPeerEndpointClosed( |
| + ids[i], base::nullopt); |
| + } |
| + } |
| + |
| + IncomingMessageWrapper& operator=(IncomingMessageWrapper&& other) { |
| + controller_ = other.controller_; |
| + value_ = std::move(other.value_); |
| + return *this; |
| + } |
| + |
| + // Must be called outside of the controller's lock. |
| + bool TakeMessage(mojo::Message* output) { |
| + DCHECK(!value_.IsNull()); |
| + |
| + *output = std::move(value_); |
| + return output->DeserializeAssociatedEndpointHandles(controller_); |
| + } |
| + |
| + const mojo::Message& value() const { return value_; } |
| + |
| + private: |
| + ChannelAssociatedGroupController* controller_ = nullptr; |
| + // It must not hold any ScopedInterfaceEndpointHandle objects. |
| + mojo::Message value_; |
| + |
| + DISALLOW_COPY_AND_ASSIGN(IncomingMessageWrapper); |
| + }; |
| + |
| class Endpoint : public base::RefCountedThreadSafe<Endpoint>, |
| public mojo::InterfaceEndpointController { |
| public: |
| @@ -268,6 +334,16 @@ class ChannelAssociatedGroupController |
| peer_closed_ = true; |
| } |
| + bool handle_created() const { |
| + controller_->lock_.AssertAcquired(); |
| + return handle_created_; |
| + } |
| + |
| + void set_handle_created() { |
| + controller_->lock_.AssertAcquired(); |
| + handle_created_ = true; |
| + } |
| + |
| const base::Optional<mojo::DisconnectReason>& disconnect_reason() const { |
| return disconnect_reason_; |
| } |
| @@ -308,7 +384,7 @@ class ChannelAssociatedGroupController |
| sync_watcher_.reset(); |
| } |
| - uint32_t EnqueueSyncMessage(mojo::Message message) { |
| + uint32_t EnqueueSyncMessage(IncomingMessageWrapper message) { |
| controller_->lock_.AssertAcquired(); |
| uint32_t id = GenerateSyncMessageId(); |
| sync_messages_.emplace(id, std::move(message)); |
| @@ -322,11 +398,11 @@ class ChannelAssociatedGroupController |
| sync_message_event_->Signal(); |
| } |
| - mojo::Message PopSyncMessage(uint32_t id) { |
| + IncomingMessageWrapper PopSyncMessage(uint32_t id) { |
| controller_->lock_.AssertAcquired(); |
| if (sync_messages_.empty() || sync_messages_.front().first != id) |
| - return mojo::Message(); |
| - mojo::Message message = std::move(sync_messages_.front().second); |
| + return IncomingMessageWrapper(); |
| + IncomingMessageWrapper message = std::move(sync_messages_.front().second); |
| sync_messages_.pop(); |
| return message; |
| } |
| @@ -335,6 +411,7 @@ class ChannelAssociatedGroupController |
| bool SendMessage(mojo::Message* message) override { |
| DCHECK(task_runner_->BelongsToCurrentThread()); |
| message->set_interface_id(id_); |
| + message->SerializeAssociatedEndpointHandles(controller_); |
| return controller_->SendMessage(message); |
| } |
| @@ -380,14 +457,17 @@ class ChannelAssociatedGroupController |
| base::AutoLock locker(controller_->lock_); |
| bool more_to_process = false; |
| if (!sync_messages_.empty()) { |
| - mojo::Message message = std::move(sync_messages_.front().second); |
| + IncomingMessageWrapper message_wrapper = |
| + std::move(sync_messages_.front().second); |
| sync_messages_.pop(); |
| bool dispatch_succeeded; |
| mojo::InterfaceEndpointClient* client = client_; |
| { |
| base::AutoUnlock unlocker(controller_->lock_); |
| - dispatch_succeeded = client->HandleIncomingMessage(&message); |
| + mojo::Message message; |
| + dispatch_succeeded = message_wrapper.TakeMessage(&message) && |
| + client->HandleIncomingMessage(&message); |
| } |
| if (!sync_messages_.empty()) |
| @@ -449,12 +529,13 @@ class ChannelAssociatedGroupController |
| bool closed_ = false; |
| bool peer_closed_ = false; |
| + bool handle_created_ = false; |
| base::Optional<mojo::DisconnectReason> disconnect_reason_; |
| mojo::InterfaceEndpointClient* client_ = nullptr; |
| scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
| std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_; |
| std::unique_ptr<MojoEvent> sync_message_event_; |
| - std::queue<std::pair<uint32_t, mojo::Message>> sync_messages_; |
| + std::queue<std::pair<uint32_t, IncomingMessageWrapper>> sync_messages_; |
| uint32_t next_sync_message_id_ = 0; |
| DISALLOW_COPY_AND_ASSIGN(Endpoint); |
| @@ -469,6 +550,7 @@ class ChannelAssociatedGroupController |
| private: |
| // MessageReceiver: |
| bool Accept(mojo::Message* message) override { |
| + message->SerializeAssociatedEndpointHandles(controller_); |
| return controller_->SendMessage(message); |
| } |
| @@ -622,8 +704,10 @@ class ChannelAssociatedGroupController |
| bool Accept(mojo::Message* message) override { |
| DCHECK(thread_checker_.CalledOnValidThread()); |
| - if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) |
| - return control_message_handler_.Accept(message); |
| + if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) { |
| + return message->DeserializeAssociatedEndpointHandles(this) && |
| + control_message_handler_.Accept(message); |
| + } |
| mojo::InterfaceId id = message->interface_id(); |
| DCHECK(mojo::IsValidInterfaceId(id)); |
| @@ -642,12 +726,14 @@ class ChannelAssociatedGroupController |
| DCHECK(proxy_task_runner_); |
| if (message->has_flag(mojo::Message::kFlagIsSync)) { |
| + IncomingMessageWrapper message_wrapper(this, message); |
| // Sync messages may need to be handled by the endpoint if it's blocking |
| // on a sync reply. We pass ownership of the message to the endpoint's |
| // sync message queue. If the endpoint was blocking, it will dequeue the |
| // message and dispatch it. Otherwise the posted |AcceptSyncMessage()| |
| // call will dequeue the message and dispatch it. |
| - uint32_t message_id = endpoint->EnqueueSyncMessage(std::move(*message)); |
| + uint32_t message_id = |
| + endpoint->EnqueueSyncMessage(std::move(message_wrapper)); |
| proxy_task_runner_->PostTask( |
| FROM_HERE, |
| base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage, |
| @@ -668,7 +754,8 @@ class ChannelAssociatedGroupController |
| !message->has_flag(mojo::Message::kFlagIsResponse)); |
| base::AutoUnlock unlocker(lock_); |
| - return client->HandleIncomingMessage(message); |
| + return message->DeserializeAssociatedEndpointHandles(this) && |
| + client->HandleIncomingMessage(message); |
| } |
| void AcceptOnProxyThread(mojo::Message message) { |
| @@ -678,6 +765,8 @@ class ChannelAssociatedGroupController |
| DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id)); |
| base::AutoLock locker(lock_); |
| + IncomingMessageWrapper message_wrapper(this, &message); |
| + |
| Endpoint* endpoint = GetEndpointForDispatch(id, false /* create */); |
| if (!endpoint) |
| return; |
| @@ -689,12 +778,14 @@ class ChannelAssociatedGroupController |
| DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| // Sync messages should never make their way to this method. |
| - DCHECK(!message.has_flag(mojo::Message::kFlagIsSync)); |
| + DCHECK(!message_wrapper.value().has_flag(mojo::Message::kFlagIsSync)); |
| bool result = false; |
| { |
| base::AutoUnlock unlocker(lock_); |
| - result = client->HandleIncomingMessage(&message); |
| + mojo::Message message; |
| + result = message_wrapper.TakeMessage(&message) && |
| + client->HandleIncomingMessage(&message); |
| } |
| if (!result) |
| @@ -711,11 +802,12 @@ class ChannelAssociatedGroupController |
| return; |
| DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| - mojo::Message message = endpoint->PopSyncMessage(message_id); |
| + IncomingMessageWrapper message_wrapper = |
| + endpoint->PopSyncMessage(message_id); |
| // The message must have already been dequeued by the endpoint waking up |
| // from a sync wait. Nothing to do. |
| - if (message.IsNull()) |
| + if (message_wrapper.value().IsNull()) |
| return; |
| mojo::InterfaceEndpointClient* client = endpoint->client(); |
| @@ -725,7 +817,9 @@ class ChannelAssociatedGroupController |
| bool result = false; |
| { |
| base::AutoUnlock unlocker(lock_); |
| - result = client->HandleIncomingMessage(&message); |
| + mojo::Message message; |
| + result = message_wrapper.TakeMessage(&message) && |
| + client->HandleIncomingMessage(&message); |
| } |
| if (!result) |