Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(17)

Unified Diff: mojo/public/cpp/bindings/lib/multiplex_router.cc

Issue 2674483002: Mojo C++ bindings: fix MultiplexRouter and ChannelAssociatedGroupController. (Closed)
Patch Set: . Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/public/cpp/bindings/lib/multiplex_router.h ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/public/cpp/bindings/lib/multiplex_router.cc
diff --git a/mojo/public/cpp/bindings/lib/multiplex_router.cc b/mojo/public/cpp/bindings/lib/multiplex_router.cc
index d8e67f7c6f950415489a4777bae298794657ea9d..2d8c53e33994243be49eebb5b417f1b1764157ea 100644
--- a/mojo/public/cpp/bindings/lib/multiplex_router.cc
+++ b/mojo/public/cpp/bindings/lib/multiplex_router.cc
@@ -274,69 +274,51 @@ class MultiplexRouter::InterfaceEndpoint
DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
};
-// Message objects cannot be destroyed under the router's lock, if they contain
-// ScopedInterfaceEndpointHandle objects.
-// IncomingMessageWrapper is used to wrap messages which haven't got the payload
-// interface IDs deserialized into ScopedInterfaceEndpointHandles. Wrapper
-// objects are always destroyed under the router's lock. When a wrapper is
-// destroyed and the message hasn't been consumed, the wrapper is responsible
-// to send endpoint closed notifications.
-class MultiplexRouter::IncomingMessageWrapper {
+// MessageWrapper objects are always destroyed under the router's lock. On
+// destruction, if the message it wrappers contains
+// ScopedInterfaceEndpointHandles (which cannot be destructed under the
+// router's lock), the wrapper unlocks to clean them up.
+class MultiplexRouter::MessageWrapper {
public:
- IncomingMessageWrapper() = default;
+ MessageWrapper() = default;
- IncomingMessageWrapper(MultiplexRouter* router, Message* message)
- : router_(router), value_(std::move(*message)) {
- DCHECK(value_.associated_endpoint_handles()->empty());
- }
+ MessageWrapper(MultiplexRouter* router, Message message)
+ : router_(router), value_(std::move(message)) {}
- IncomingMessageWrapper(IncomingMessageWrapper&& other)
+ MessageWrapper(MessageWrapper&& other)
: router_(other.router_), value_(std::move(other.value_)) {}
- ~IncomingMessageWrapper() {
- if (value_.IsNull())
+ ~MessageWrapper() {
+ if (value_.associated_endpoint_handles()->empty())
return;
router_->AssertLockAcquired();
-
- uint32_t num_ids = value_.payload_num_interface_ids();
- const uint32_t* ids = value_.payload_interface_ids();
- for (uint32_t i = 0; i < num_ids; ++i) {
+ {
MayAutoUnlock unlocker(router_->lock_.get());
- router_->control_message_proxy_.NotifyPeerEndpointClosed(ids[i],
- base::nullopt);
+ value_.mutable_associated_endpoint_handles()->clear();
}
}
- IncomingMessageWrapper& operator=(IncomingMessageWrapper&& other) {
+ MessageWrapper& operator=(MessageWrapper&& other) {
router_ = other.router_;
value_ = std::move(other.value_);
return *this;
}
- // Must be called outside of the router's lock.
- bool TakeMessage(Message* output) {
- DCHECK(!value_.IsNull());
-
- *output = std::move(value_);
- return output->DeserializeAssociatedEndpointHandles(router_);
- }
-
- const Message& value() const { return value_; }
+ Message& value() { return value_; }
private:
MultiplexRouter* router_ = nullptr;
- // It must not hold any ScopedInterfaceEndpointHandle objects.
Message value_;
- DISALLOW_COPY_AND_ASSIGN(IncomingMessageWrapper);
+ DISALLOW_COPY_AND_ASSIGN(MessageWrapper);
};
struct MultiplexRouter::Task {
public:
// Doesn't take ownership of |message| but takes its contents.
static std::unique_ptr<Task> CreateMessageTask(
- IncomingMessageWrapper message_wrapper) {
+ MessageWrapper message_wrapper) {
Task* task = new Task(MESSAGE);
task->message_wrapper = std::move(message_wrapper);
return base::WrapUnique(task);
@@ -353,7 +335,7 @@ struct MultiplexRouter::Task {
bool IsMessageTask() const { return type == MESSAGE; }
bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }
- IncomingMessageWrapper message_wrapper;
+ MessageWrapper message_wrapper;
scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
enum Type { MESSAGE, NOTIFY_ERROR };
@@ -461,6 +443,7 @@ void MultiplexRouter::CreateEndpointHandlePair(
if (encountered_error_)
UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
+ endpoint->set_handle_created();
*local_endpoint = CreateScopedInterfaceEndpointHandle(id, true);
*remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false);
}
@@ -629,34 +612,36 @@ void MultiplexRouter::EnableTestingMode() {
bool MultiplexRouter::Accept(Message* message) {
DCHECK(thread_checker_.CalledOnValidThread());
+ if (!message->DeserializeAssociatedEndpointHandles(this))
+ return false;
+
scoped_refptr<MultiplexRouter> protector(this);
MayAutoLock locker(lock_.get());
DCHECK(!paused_);
- IncomingMessageWrapper message_wrapper(this, message);
-
ClientCallBehavior client_call_behavior =
connector_.during_sync_handle_watcher_callback()
? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
: ALLOW_DIRECT_CLIENT_CALLS;
- bool processed = tasks_.empty() && ProcessIncomingMessage(
- &message_wrapper, client_call_behavior,
- connector_.task_runner());
+ bool processed =
+ tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior,
+ connector_.task_runner());
if (!processed) {
// Either the task queue is not empty or we cannot process the message
// directly. In both cases, there is no need to call ProcessTasks().
- tasks_.push_back(Task::CreateMessageTask(std::move(message_wrapper)));
+ tasks_.push_back(
+ Task::CreateMessageTask(MessageWrapper(this, std::move(*message))));
Task* task = tasks_.back().get();
if (task->message_wrapper.value().has_flag(Message::kFlagIsSync)) {
InterfaceId id = task->message_wrapper.value().interface_id();
sync_message_tasks_[id].push_back(task);
- auto iter = endpoints_.find(id);
- if (iter != endpoints_.end())
- iter->second->SignalSyncMessageEvent();
+ InterfaceEndpoint* endpoint = FindEndpoint(id);
+ if (endpoint)
+ endpoint->SignalSyncMessageEvent();
}
} else if (!tasks_.empty()) {
// Processing the message may result in new tasks (for error notification)
@@ -767,7 +752,7 @@ void MultiplexRouter::ProcessTasks(
task->IsNotifyErrorTask()
? ProcessNotifyErrorTask(task.get(), client_call_behavior,
current_task_runner)
- : ProcessIncomingMessage(&task->message_wrapper,
+ : ProcessIncomingMessage(&task->message_wrapper.value(),
client_call_behavior, current_task_runner);
if (!processed) {
@@ -801,11 +786,12 @@ bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
iter->second.pop_front();
DCHECK(task->IsMessageTask());
- IncomingMessageWrapper message_wrapper = std::move(task->message_wrapper);
+ MessageWrapper message_wrapper = std::move(task->message_wrapper);
// Note: after this call, |task| and |iter| may be invalidated.
bool processed = ProcessIncomingMessage(
- &message_wrapper, ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr);
+ &message_wrapper.value(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES,
+ nullptr);
DCHECK(processed);
iter = sync_message_tasks_.find(id);
@@ -857,29 +843,26 @@ bool MultiplexRouter::ProcessNotifyErrorTask(
}
bool MultiplexRouter::ProcessIncomingMessage(
- IncomingMessageWrapper* message_wrapper,
+ Message* message,
ClientCallBehavior client_call_behavior,
base::SingleThreadTaskRunner* current_task_runner) {
DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
DCHECK(!paused_);
- DCHECK(message_wrapper);
+ DCHECK(message);
AssertLockAcquired();
- if (message_wrapper->value().IsNull()) {
+ if (message->IsNull()) {
// This is a sync message and has been processed during sync handle
// watching.
return true;
}
- if (PipeControlMessageHandler::IsPipeControlMessage(
- &message_wrapper->value())) {
+ if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
bool result = false;
{
MayAutoUnlock unlocker(lock_.get());
- Message message;
- result = message_wrapper->TakeMessage(&message) &&
- control_message_handler_.Accept(&message);
+ result = control_message_handler_.Accept(message);
}
if (!result)
@@ -888,34 +871,11 @@ bool MultiplexRouter::ProcessIncomingMessage(
return true;
}
- InterfaceId id = message_wrapper->value().interface_id();
+ InterfaceId id = message->interface_id();
DCHECK(IsValidInterfaceId(id));
- bool inserted = false;
- InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
- if (inserted) {
- // Currently, it is legitimate to receive messages for an endpoint
- // that is not registered. For example, the endpoint is transferred in
- // a message that is discarded. Once we add support to specify all
- // enclosing endpoints in message header, we should be able to remove
- // this.
- UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
-
- // It is also possible that this newly-inserted endpoint is the master
- // endpoint. When the master InterfacePtr/Binding goes away, the message
- // pipe is closed and we explicitly trigger a pipe connection error. The
- // error updates all the endpoints, including the master endpoint, with
- // PEER_ENDPOINT_CLOSED and removes the master endpoint from the
- // registration. We continue to process remaining tasks in the queue, as
- // long as there are refs keeping the router alive. If there are remaining
- // messages for the master endpoint, we will get here.
- MayAutoUnlock unlocker(lock_.get());
- if (!IsMasterInterfaceId(id))
- control_message_proxy_.NotifyPeerEndpointClosed(id, base::nullopt);
- return true;
- }
-
- if (endpoint->closed())
+ InterfaceEndpoint* endpoint = FindEndpoint(id);
+ if (!endpoint || endpoint->closed())
return true;
if (!endpoint->client()) {
@@ -925,7 +885,7 @@ bool MultiplexRouter::ProcessIncomingMessage(
}
bool can_direct_call;
- if (message_wrapper->value().has_flag(Message::kFlagIsSync)) {
+ if (message->has_flag(Message::kFlagIsSync)) {
can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS &&
endpoint->task_runner()->BelongsToCurrentThread();
} else {
@@ -950,9 +910,7 @@ bool MultiplexRouter::ProcessIncomingMessage(
// It is safe to call into |client| without the lock. Because |client| is
// always accessed on the same thread, including DetachEndpointClient().
MayAutoUnlock unlocker(lock_.get());
- Message message;
- result = message_wrapper->TakeMessage(&message) &&
- client->HandleIncomingMessage(&message);
+ result = client->HandleIncomingMessage(message);
}
if (!result)
RaiseErrorInNonTestingMode();
@@ -1014,20 +972,24 @@ MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint(
// false.
DCHECK(!inserted || !*inserted);
- auto iter = endpoints_.find(id);
- InterfaceEndpoint* endpoint;
- if (iter == endpoints_.end()) {
+ InterfaceEndpoint* endpoint = FindEndpoint(id);
+ if (!endpoint) {
endpoint = new InterfaceEndpoint(this, id);
endpoints_[id] = endpoint;
if (inserted)
*inserted = true;
- } else {
- endpoint = iter->second.get();
}
return endpoint;
}
+MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindEndpoint(
+ InterfaceId id) {
+ AssertLockAcquired();
+ auto iter = endpoints_.find(id);
+ return iter != endpoints_.end() ? iter->second.get() : nullptr;
+}
+
void MultiplexRouter::AssertLockAcquired() {
#if DCHECK_IS_ON()
if (lock_)
« no previous file with comments | « mojo/public/cpp/bindings/lib/multiplex_router.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698