Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(125)

Unified Diff: mojo/public/cpp/bindings/lib/multiplex_router.cc

Issue 2660733002: Mojo C++ bindings: introduce an optional array to store transferred interface IDs in messages. (Closed)
Patch Set: . Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/public/cpp/bindings/lib/multiplex_router.cc
diff --git a/mojo/public/cpp/bindings/lib/multiplex_router.cc b/mojo/public/cpp/bindings/lib/multiplex_router.cc
index 17f225839a28ecba222b1a1e121e219a37d533fa..d8e67f7c6f950415489a4777bae298794657ea9d 100644
--- a/mojo/public/cpp/bindings/lib/multiplex_router.cc
+++ b/mojo/public/cpp/bindings/lib/multiplex_router.cc
@@ -37,6 +37,7 @@ class MultiplexRouter::InterfaceEndpoint
id_(id),
closed_(false),
peer_closed_(false),
+ handle_created_(false),
client_(nullptr),
event_signalled_(false) {}
@@ -61,6 +62,12 @@ class MultiplexRouter::InterfaceEndpoint
peer_closed_ = true;
}
+ bool handle_created() const { return handle_created_; }
+ void set_handle_created() {
+ router_->AssertLockAcquired();
+ handle_created_ = true;
+ }
+
const base::Optional<DisconnectReason>& disconnect_reason() const {
return disconnect_reason_;
}
@@ -134,6 +141,7 @@ class MultiplexRouter::InterfaceEndpoint
bool SendMessage(Message* message) override {
DCHECK(task_runner_->BelongsToCurrentThread());
+ message->SerializeAssociatedEndpointHandles(router_);
message->set_interface_id(id_);
return router_->connector_.Accept(message);
}
@@ -237,6 +245,10 @@ class MultiplexRouter::InterfaceEndpoint
// Whether the peer endpoint has been closed.
bool peer_closed_;
+ // Whether there is already a ScopedInterfaceEndpointHandle created for this
+ // endpoint.
+ bool handle_created_;
+
base::Optional<DisconnectReason> disconnect_reason_;
// The task runner on which |client_|'s methods can be called.
@@ -262,12 +274,71 @@ class MultiplexRouter::InterfaceEndpoint
DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
};
+// Message objects cannot be destroyed under the router's lock, if they contain
+// ScopedInterfaceEndpointHandle objects.
+// IncomingMessageWrapper is used to wrap messages which haven't got the payload
+// interface IDs deserialized into ScopedInterfaceEndpointHandles. Wrapper
+// objects are always destroyed under the router's lock. When a wrapper is
+// destroyed and the message hasn't been consumed, the wrapper is responsible
+// to send endpoint closed notifications.
+class MultiplexRouter::IncomingMessageWrapper {
+ public:
+ IncomingMessageWrapper() = default;
+
+ IncomingMessageWrapper(MultiplexRouter* router, Message* message)
+ : router_(router), value_(std::move(*message)) {
+ DCHECK(value_.associated_endpoint_handles()->empty());
+ }
+
+ IncomingMessageWrapper(IncomingMessageWrapper&& other)
+ : router_(other.router_), value_(std::move(other.value_)) {}
+
+ ~IncomingMessageWrapper() {
+ if (value_.IsNull())
+ return;
+
+ router_->AssertLockAcquired();
+
+ uint32_t num_ids = value_.payload_num_interface_ids();
+ const uint32_t* ids = value_.payload_interface_ids();
+ for (uint32_t i = 0; i < num_ids; ++i) {
+ MayAutoUnlock unlocker(router_->lock_.get());
+ router_->control_message_proxy_.NotifyPeerEndpointClosed(ids[i],
+ base::nullopt);
+ }
+ }
+
+ IncomingMessageWrapper& operator=(IncomingMessageWrapper&& other) {
+ router_ = other.router_;
+ value_ = std::move(other.value_);
+ return *this;
+ }
+
+ // Must be called outside of the router's lock.
+ bool TakeMessage(Message* output) {
+ DCHECK(!value_.IsNull());
+
+ *output = std::move(value_);
+ return output->DeserializeAssociatedEndpointHandles(router_);
+ }
+
+ const Message& value() const { return value_; }
+
+ private:
+ MultiplexRouter* router_ = nullptr;
+ // It must not hold any ScopedInterfaceEndpointHandle objects.
+ Message value_;
+
+ DISALLOW_COPY_AND_ASSIGN(IncomingMessageWrapper);
+};
+
struct MultiplexRouter::Task {
public:
// Doesn't take ownership of |message| but takes its contents.
- static std::unique_ptr<Task> CreateMessageTask(Message* message) {
+ static std::unique_ptr<Task> CreateMessageTask(
+ IncomingMessageWrapper message_wrapper) {
Task* task = new Task(MESSAGE);
- task->message = std::move(*message);
+ task->message_wrapper = std::move(message_wrapper);
return base::WrapUnique(task);
}
static std::unique_ptr<Task> CreateNotifyErrorTask(
@@ -282,7 +353,7 @@ struct MultiplexRouter::Task {
bool IsMessageTask() const { return type == MESSAGE; }
bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }
- Message message;
+ IncomingMessageWrapper message_wrapper;
scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
enum Type { MESSAGE, NOTIFY_ERROR };
@@ -290,6 +361,8 @@ struct MultiplexRouter::Task {
private:
explicit Task(Type in_type) : type(in_type) {}
+
+ DISALLOW_COPY_AND_ASSIGN(Task);
};
MultiplexRouter::MultiplexRouter(
@@ -401,6 +474,8 @@ ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
bool inserted = false;
InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
if (inserted) {
+ DCHECK(!endpoint->handle_created());
+
if (encountered_error_)
UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
} else {
@@ -408,7 +483,12 @@ ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
// notification that the peer endpoint has closed.
CHECK(!endpoint->closed());
CHECK(endpoint->peer_closed());
+
+ if (endpoint->handle_created())
+ return ScopedInterfaceEndpointHandle();
}
+
+ endpoint->set_handle_created();
return CreateScopedInterfaceEndpointHandle(id, true);
}
@@ -554,23 +634,25 @@ bool MultiplexRouter::Accept(Message* message) {
DCHECK(!paused_);
+ IncomingMessageWrapper message_wrapper(this, message);
+
ClientCallBehavior client_call_behavior =
connector_.during_sync_handle_watcher_callback()
? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
: ALLOW_DIRECT_CLIENT_CALLS;
- bool processed =
- tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior,
- connector_.task_runner());
+ bool processed = tasks_.empty() && ProcessIncomingMessage(
+ &message_wrapper, client_call_behavior,
+ connector_.task_runner());
if (!processed) {
// Either the task queue is not empty or we cannot process the message
// directly. In both cases, there is no need to call ProcessTasks().
- tasks_.push_back(Task::CreateMessageTask(message));
+ tasks_.push_back(Task::CreateMessageTask(std::move(message_wrapper)));
Task* task = tasks_.back().get();
- if (task->message.has_flag(Message::kFlagIsSync)) {
- InterfaceId id = task->message.interface_id();
+ if (task->message_wrapper.value().has_flag(Message::kFlagIsSync)) {
+ InterfaceId id = task->message_wrapper.value().interface_id();
sync_message_tasks_[id].push_back(task);
auto iter = endpoints_.find(id);
if (iter != endpoints_.end())
@@ -591,10 +673,9 @@ bool MultiplexRouter::Accept(Message* message) {
bool MultiplexRouter::OnPeerAssociatedEndpointClosed(
InterfaceId id,
const base::Optional<DisconnectReason>& reason) {
- AssertLockAcquired();
-
DCHECK(!IsMasterInterfaceId(id) || reason);
+ MayAutoLock locker(lock_.get());
InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
if (reason)
@@ -618,18 +699,18 @@ bool MultiplexRouter::OnPeerAssociatedEndpointClosed(
}
bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) {
- AssertLockAcquired();
-
if (IsMasterInterfaceId(id))
return false;
- InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
- DCHECK(!endpoint->closed());
- UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
+ {
+ MayAutoLock locker(lock_.get());
- MayAutoUnlock unlocker(lock_.get());
- control_message_proxy_.NotifyPeerEndpointClosed(id, base::nullopt);
+ InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
+ DCHECK(!endpoint->closed());
+ UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
+ }
+ control_message_proxy_.NotifyPeerEndpointClosed(id, base::nullopt);
return true;
}
@@ -672,10 +753,11 @@ void MultiplexRouter::ProcessTasks(
tasks_.pop_front();
InterfaceId id = kInvalidInterfaceId;
- bool sync_message = task->IsMessageTask() && !task->message.IsNull() &&
- task->message.has_flag(Message::kFlagIsSync);
+ bool sync_message =
+ task->IsMessageTask() && !task->message_wrapper.value().IsNull() &&
+ task->message_wrapper.value().has_flag(Message::kFlagIsSync);
if (sync_message) {
- id = task->message.interface_id();
+ id = task->message_wrapper.value().interface_id();
auto& sync_message_queue = sync_message_tasks_[id];
DCHECK_EQ(task.get(), sync_message_queue.front());
sync_message_queue.pop_front();
@@ -685,8 +767,8 @@ void MultiplexRouter::ProcessTasks(
task->IsNotifyErrorTask()
? ProcessNotifyErrorTask(task.get(), client_call_behavior,
current_task_runner)
- : ProcessIncomingMessage(&task->message, client_call_behavior,
- current_task_runner);
+ : ProcessIncomingMessage(&task->message_wrapper,
+ client_call_behavior, current_task_runner);
if (!processed) {
if (sync_message) {
@@ -719,11 +801,11 @@ bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
iter->second.pop_front();
DCHECK(task->IsMessageTask());
- Message message = std::move(task->message);
+ IncomingMessageWrapper message_wrapper = std::move(task->message_wrapper);
// Note: after this call, |task| and |iter| may be invalidated.
bool processed = ProcessIncomingMessage(
- &message, ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr);
+ &message_wrapper, ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr);
DCHECK(processed);
iter = sync_message_tasks_.find(id);
@@ -775,27 +857,38 @@ bool MultiplexRouter::ProcessNotifyErrorTask(
}
bool MultiplexRouter::ProcessIncomingMessage(
- Message* message,
+ IncomingMessageWrapper* message_wrapper,
ClientCallBehavior client_call_behavior,
base::SingleThreadTaskRunner* current_task_runner) {
DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
DCHECK(!paused_);
- DCHECK(message);
+ DCHECK(message_wrapper);
AssertLockAcquired();
- if (message->IsNull()) {
+ if (message_wrapper->value().IsNull()) {
// This is a sync message and has been processed during sync handle
// watching.
return true;
}
- if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
- if (!control_message_handler_.Accept(message))
+ if (PipeControlMessageHandler::IsPipeControlMessage(
+ &message_wrapper->value())) {
+ bool result = false;
+
+ {
+ MayAutoUnlock unlocker(lock_.get());
+ Message message;
+ result = message_wrapper->TakeMessage(&message) &&
+ control_message_handler_.Accept(&message);
+ }
+
+ if (!result)
RaiseErrorInNonTestingMode();
+
return true;
}
- InterfaceId id = message->interface_id();
+ InterfaceId id = message_wrapper->value().interface_id();
DCHECK(IsValidInterfaceId(id));
bool inserted = false;
@@ -832,7 +925,7 @@ bool MultiplexRouter::ProcessIncomingMessage(
}
bool can_direct_call;
- if (message->has_flag(Message::kFlagIsSync)) {
+ if (message_wrapper->value().has_flag(Message::kFlagIsSync)) {
can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS &&
endpoint->task_runner()->BelongsToCurrentThread();
} else {
@@ -857,7 +950,9 @@ bool MultiplexRouter::ProcessIncomingMessage(
// It is safe to call into |client| without the lock. Because |client| is
// always accessed on the same thread, including DetachEndpointClient().
MayAutoUnlock unlocker(lock_.get());
- result = client->HandleIncomingMessage(message);
+ Message message;
+ result = message_wrapper->TakeMessage(&message) &&
+ client->HandleIncomingMessage(&message);
}
if (!result)
RaiseErrorInNonTestingMode();
« no previous file with comments | « mojo/public/cpp/bindings/lib/multiplex_router.h ('k') | mojo/public/cpp/bindings/lib/pipe_control_message_handler.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698