Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(502)

Unified Diff: ipc/ipc_mojo_bootstrap.cc

Issue 2674483002: Mojo C++ bindings: fix MultiplexRouter and ChannelAssociatedGroupController. (Closed)
Patch Set: . Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | mojo/public/cpp/bindings/lib/multiplex_router.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: ipc/ipc_mojo_bootstrap.cc
diff --git a/ipc/ipc_mojo_bootstrap.cc b/ipc/ipc_mojo_bootstrap.cc
index 5fa88ea5cc25bc003bf0e4b53ec8cd4f5a547aba..68454aea73348bf2a8e6676616da5d73cc9fc8e5 100644
--- a/ipc/ipc_mojo_bootstrap.cc
+++ b/ipc/ipc_mojo_bootstrap.cc
@@ -107,6 +107,8 @@ class ChannelAssociatedGroupController
Endpoint* receiver_endpoint = new Endpoint(this, receiver_id);
endpoints_.insert({ sender_id, sender_endpoint });
endpoints_.insert({ receiver_id, receiver_endpoint });
+ sender_endpoint->set_handle_created();
+ receiver_endpoint->set_handle_created();
}
mojo::ScopedInterfaceEndpointHandle sender_handle =
@@ -144,6 +146,7 @@ class ChannelAssociatedGroupController
endpoint->set_peer_closed();
endpoints_.insert({ id, endpoint });
+ endpoint->set_handle_created();
*local_endpoint = CreateScopedInterfaceEndpointHandle(id, true);
*remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false);
}
@@ -247,63 +250,45 @@ class ChannelAssociatedGroupController
friend class Endpoint;
friend class ControlMessageProxyThunk;
- // Message objects cannot be destroyed under the controller's lock, if they
- // contain ScopedInterfaceEndpointHandle objects.
- // IncomingMessageWrapper is used to wrap messages which haven't got the
- // payload interface IDs deserialized into ScopedInterfaceEndpointHandles.
- // Wrapper objects are always destroyed under the controller's lock. When a
- // wrapper is destroyed and the message hasn't been consumed, the wrapper is
- // responsible to send endpoint closed notifications.
- class IncomingMessageWrapper {
+ // MessageWrapper objects are always destroyed under the controller's lock. On
+ // destruction, if the message it wrappers contains
+ // ScopedInterfaceEndpointHandles (which cannot be destructed under the
+ // controller's lock), the wrapper unlocks to clean them up.
+ class MessageWrapper {
public:
- IncomingMessageWrapper() = default;
+ MessageWrapper() = default;
- IncomingMessageWrapper(ChannelAssociatedGroupController* controller,
- mojo::Message* message)
- : controller_(controller), value_(std::move(*message)) {
- DCHECK(value_.associated_endpoint_handles()->empty());
- }
+ MessageWrapper(ChannelAssociatedGroupController* controller,
+ mojo::Message message)
+ : controller_(controller), value_(std::move(message)) {}
- IncomingMessageWrapper(IncomingMessageWrapper&& other)
+ MessageWrapper(MessageWrapper&& other)
: controller_(other.controller_), value_(std::move(other.value_)) {}
- ~IncomingMessageWrapper() {
- if (value_.IsNull())
+ ~MessageWrapper() {
+ if (value_.associated_endpoint_handles()->empty())
return;
controller_->lock_.AssertAcquired();
-
- uint32_t num_ids = value_.payload_num_interface_ids();
- const uint32_t* ids = value_.payload_interface_ids();
- for (uint32_t i = 0; i < num_ids; ++i) {
+ {
base::AutoUnlock unlocker(controller_->lock_);
- controller_->control_message_proxy_.NotifyPeerEndpointClosed(
- ids[i], base::nullopt);
+ value_.mutable_associated_endpoint_handles()->clear();
}
}
- IncomingMessageWrapper& operator=(IncomingMessageWrapper&& other) {
+ MessageWrapper& operator=(MessageWrapper&& other) {
controller_ = other.controller_;
value_ = std::move(other.value_);
return *this;
}
- // Must be called outside of the controller's lock.
- bool TakeMessage(mojo::Message* output) {
- DCHECK(!value_.IsNull());
-
- *output = std::move(value_);
- return output->DeserializeAssociatedEndpointHandles(controller_);
- }
-
- const mojo::Message& value() const { return value_; }
+ mojo::Message& value() { return value_; }
private:
ChannelAssociatedGroupController* controller_ = nullptr;
- // It must not hold any ScopedInterfaceEndpointHandle objects.
mojo::Message value_;
- DISALLOW_COPY_AND_ASSIGN(IncomingMessageWrapper);
+ DISALLOW_COPY_AND_ASSIGN(MessageWrapper);
};
class Endpoint : public base::RefCountedThreadSafe<Endpoint>,
@@ -384,7 +369,7 @@ class ChannelAssociatedGroupController
sync_watcher_.reset();
}
- uint32_t EnqueueSyncMessage(IncomingMessageWrapper message) {
+ uint32_t EnqueueSyncMessage(MessageWrapper message) {
controller_->lock_.AssertAcquired();
uint32_t id = GenerateSyncMessageId();
sync_messages_.emplace(id, std::move(message));
@@ -398,11 +383,11 @@ class ChannelAssociatedGroupController
sync_message_event_->Signal();
}
- IncomingMessageWrapper PopSyncMessage(uint32_t id) {
+ MessageWrapper PopSyncMessage(uint32_t id) {
controller_->lock_.AssertAcquired();
if (sync_messages_.empty() || sync_messages_.front().first != id)
- return IncomingMessageWrapper();
- IncomingMessageWrapper message = std::move(sync_messages_.front().second);
+ return MessageWrapper();
+ MessageWrapper message = std::move(sync_messages_.front().second);
sync_messages_.pop();
return message;
}
@@ -457,7 +442,7 @@ class ChannelAssociatedGroupController
base::AutoLock locker(controller_->lock_);
bool more_to_process = false;
if (!sync_messages_.empty()) {
- IncomingMessageWrapper message_wrapper =
+ MessageWrapper message_wrapper =
std::move(sync_messages_.front().second);
sync_messages_.pop();
@@ -465,9 +450,8 @@ class ChannelAssociatedGroupController
mojo::InterfaceEndpointClient* client = client_;
{
base::AutoUnlock unlocker(controller_->lock_);
- mojo::Message message;
- dispatch_succeeded = message_wrapper.TakeMessage(&message) &&
- client->HandleIncomingMessage(&message);
+ dispatch_succeeded =
+ client->HandleIncomingMessage(&message_wrapper.value());
}
if (!sync_messages_.empty())
@@ -535,7 +519,7 @@ class ChannelAssociatedGroupController
scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_;
std::unique_ptr<MojoEvent> sync_message_event_;
- std::queue<std::pair<uint32_t, IncomingMessageWrapper>> sync_messages_;
+ std::queue<std::pair<uint32_t, MessageWrapper>> sync_messages_;
uint32_t next_sync_message_id_ = 0;
DISALLOW_COPY_AND_ASSIGN(Endpoint);
@@ -689,33 +673,41 @@ class ChannelAssociatedGroupController
lock_.AssertAcquired();
DCHECK(!inserted || !*inserted);
- auto iter = endpoints_.find(id);
- if (iter != endpoints_.end())
- return iter->second.get();
-
- Endpoint* endpoint = new Endpoint(this, id);
- endpoints_.insert({ id, endpoint });
- if (inserted)
- *inserted = true;
+ Endpoint* endpoint = FindEndpoint(id);
+ if (!endpoint) {
+ endpoint = new Endpoint(this, id);
+ endpoints_.insert({id, endpoint});
+ if (inserted)
+ *inserted = true;
+ }
return endpoint;
}
+ Endpoint* FindEndpoint(mojo::InterfaceId id) {
+ lock_.AssertAcquired();
+ auto iter = endpoints_.find(id);
+ return iter != endpoints_.end() ? iter->second.get() : nullptr;
+ }
+
// mojo::MessageReceiver:
bool Accept(mojo::Message* message) override {
DCHECK(thread_checker_.CalledOnValidThread());
- if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) {
- return message->DeserializeAssociatedEndpointHandles(this) &&
- control_message_handler_.Accept(message);
- }
+ if (!message->DeserializeAssociatedEndpointHandles(this))
+ return false;
+
+ if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message))
+ return control_message_handler_.Accept(message);
mojo::InterfaceId id = message->interface_id();
DCHECK(mojo::IsValidInterfaceId(id));
base::AutoLock locker(lock_);
- Endpoint* endpoint = GetEndpointForDispatch(id, true /* create */);
- mojo::InterfaceEndpointClient* client =
- endpoint ? endpoint->client() : nullptr;
+ Endpoint* endpoint = FindEndpoint(id);
+ if (!endpoint)
+ return true;
+
+ mojo::InterfaceEndpointClient* client = endpoint->client();
if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) {
// No client has been bound yet or the client runs tasks on another
// thread. We assume the other thread must always be the one on which
@@ -726,7 +718,7 @@ class ChannelAssociatedGroupController
DCHECK(proxy_task_runner_);
if (message->has_flag(mojo::Message::kFlagIsSync)) {
- IncomingMessageWrapper message_wrapper(this, message);
+ MessageWrapper message_wrapper(this, std::move(*message));
// Sync messages may need to be handled by the endpoint if it's blocking
// on a sync reply. We pass ownership of the message to the endpoint's
// sync message queue. If the endpoint was blocking, it will dequeue the
@@ -754,8 +746,7 @@ class ChannelAssociatedGroupController
!message->has_flag(mojo::Message::kFlagIsResponse));
base::AutoUnlock unlocker(lock_);
- return message->DeserializeAssociatedEndpointHandles(this) &&
- client->HandleIncomingMessage(message);
+ return client->HandleIncomingMessage(message);
}
void AcceptOnProxyThread(mojo::Message message) {
@@ -765,9 +756,7 @@ class ChannelAssociatedGroupController
DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id));
base::AutoLock locker(lock_);
- IncomingMessageWrapper message_wrapper(this, &message);
-
- Endpoint* endpoint = GetEndpointForDispatch(id, false /* create */);
+ Endpoint* endpoint = FindEndpoint(id);
if (!endpoint)
return;
@@ -778,14 +767,12 @@ class ChannelAssociatedGroupController
DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
// Sync messages should never make their way to this method.
- DCHECK(!message_wrapper.value().has_flag(mojo::Message::kFlagIsSync));
+ DCHECK(!message.has_flag(mojo::Message::kFlagIsSync));
bool result = false;
{
base::AutoUnlock unlocker(lock_);
- mojo::Message message;
- result = message_wrapper.TakeMessage(&message) &&
- client->HandleIncomingMessage(&message);
+ result = client->HandleIncomingMessage(&message);
}
if (!result)
@@ -796,14 +783,12 @@ class ChannelAssociatedGroupController
DCHECK(proxy_task_runner_->BelongsToCurrentThread());
base::AutoLock locker(lock_);
- Endpoint* endpoint =
- GetEndpointForDispatch(interface_id, false /* create */);
+ Endpoint* endpoint = FindEndpoint(interface_id);
if (!endpoint)
return;
DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
- IncomingMessageWrapper message_wrapper =
- endpoint->PopSyncMessage(message_id);
+ MessageWrapper message_wrapper = endpoint->PopSyncMessage(message_id);
// The message must have already been dequeued by the endpoint waking up
// from a sync wait. Nothing to do.
@@ -817,28 +802,13 @@ class ChannelAssociatedGroupController
bool result = false;
{
base::AutoUnlock unlocker(lock_);
- mojo::Message message;
- result = message_wrapper.TakeMessage(&message) &&
- client->HandleIncomingMessage(&message);
+ result = client->HandleIncomingMessage(&message_wrapper.value());
}
if (!result)
RaiseError();
}
- Endpoint* GetEndpointForDispatch(mojo::InterfaceId id, bool create) {
- lock_.AssertAcquired();
- auto iter = endpoints_.find(id);
- if (iter != endpoints_.end())
- return iter->second.get();
- if (!create)
- return nullptr;
- bool inserted = false;
- Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
- DCHECK(inserted);
- return endpoint;
- }
-
// mojo::PipeControlMessageHandlerDelegate:
bool OnPeerAssociatedEndpointClosed(
mojo::InterfaceId id,
« no previous file with comments | « no previous file | mojo/public/cpp/bindings/lib/multiplex_router.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698