Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1676)

Unified Diff: mojo/public/cpp/bindings/lib/multiplex_router.cc

Issue 1823683006: Mojo C++ bindings: sync call support for associated interfaces and master interfaces (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/public/cpp/bindings/lib/multiplex_router.cc
diff --git a/mojo/public/cpp/bindings/lib/multiplex_router.cc b/mojo/public/cpp/bindings/lib/multiplex_router.cc
index b9e086c53e11b4371a2858ba0a808a2613cb69ad..523029b096d30f26d4aea5033a8a710653a9e56a 100644
--- a/mojo/public/cpp/bindings/lib/multiplex_router.cc
+++ b/mojo/public/cpp/bindings/lib/multiplex_router.cc
@@ -15,107 +15,268 @@
#include "base/stl_util.h"
#include "mojo/public/cpp/bindings/associated_group.h"
#include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h"
+#include "mojo/public/cpp/bindings/lib/interface_endpoint_controller.h"
+#include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h"
namespace mojo {
namespace internal {
// InterfaceEndpoint stores the information of an interface endpoint registered
-// with the router. Always accessed under the router's lock.
+// with the router.
// No one other than the router's |endpoints_| and |tasks_| should hold refs to
// this object.
class MultiplexRouter::InterfaceEndpoint
- : public base::RefCounted<InterfaceEndpoint> {
+ : public base::RefCounted<InterfaceEndpoint>,
+ public InterfaceEndpointController {
public:
InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
- : router_lock_(&router->lock_),
+ : router_(router),
id_(id),
closed_(false),
peer_closed_(false),
- client_(nullptr) {
- router_lock_->AssertAcquired();
- }
+ client_(nullptr),
+ event_signalled_(false) {}
+
+ // ---------------------------------------------------------------------------
+ // The following public methods are safe to call from any threads without
+ // locking.
InterfaceId id() const { return id_; }
+ // ---------------------------------------------------------------------------
+ // The following public methods are called under the router's lock.
+
bool closed() const { return closed_; }
void set_closed() {
- router_lock_->AssertAcquired();
+ router_->lock_.AssertAcquired();
closed_ = true;
}
bool peer_closed() const { return peer_closed_; }
void set_peer_closed() {
- router_lock_->AssertAcquired();
+ router_->lock_.AssertAcquired();
peer_closed_ = true;
}
base::SingleThreadTaskRunner* task_runner() const {
return task_runner_.get();
}
- void set_task_runner(
- scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
- router_lock_->AssertAcquired();
- task_runner_ = std::move(task_runner);
- }
InterfaceEndpointClient* client() const { return client_; }
- void set_client(InterfaceEndpointClient* client) {
- router_lock_->AssertAcquired();
+
+ void AttachClient(InterfaceEndpointClient* client) {
+ router_->lock_.AssertAcquired();
+ DCHECK(!client_);
+ DCHECK(!closed_);
+
+ task_runner_ = base::MessageLoop::current()->task_runner();
client_ = client;
}
+ // It should be called on the same thread as the corresponding AttachClient()
Ken Rockot(use gerrit already) 2016/03/29 06:36:49 nit: should = must? Also perhaps "This" or "This m
yzshen1 2016/03/29 16:19:01 Done. English is hard. :)
+ // call.
+ void DetachClient() {
+ router_->lock_.AssertAcquired();
+ DCHECK(client_);
+ DCHECK(task_runner_->BelongsToCurrentThread());
+ DCHECK(!closed_);
+
+ task_runner_ = nullptr;
+ client_ = nullptr;
+ sync_watcher_.reset();
+ }
+
+ void SignalSyncMessageEvent() {
+ router_->lock_.AssertAcquired();
+ if (event_signalled_)
+ return;
+
+ EnsureEventMessagePipeExists();
+ event_signalled_ = true;
+ char dummy_message = '\0';
+ MojoResult result =
+ WriteMessageRaw(sync_message_event_sender_.get(), &dummy_message, 1,
Ken Rockot(use gerrit already) 2016/03/29 06:36:49 nit: It is perfectly legal to write a zero-length
yzshen1 2016/03/29 16:19:01 Done.
+ nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE);
+ DCHECK_EQ(MOJO_RESULT_OK, result);
+ }
+
+ // ---------------------------------------------------------------------------
+ // The following public methods (i.e., InterfaceEndpointController
+ // implementation) are called by the client on the same thread as the
+ // AttachClient() call. They are called outside of the router's lock.
+
+ bool SendMessage(Message* message) override {
+ DCHECK(task_runner_->BelongsToCurrentThread());
+ message->set_interface_id(id_);
+ return router_->connector_.Accept(message);
+ }
+
+ void AllowWokenUpBySyncWatchOnSameThread() override {
+ DCHECK(task_runner_->BelongsToCurrentThread());
+
+ EnsureSyncWatcherExists();
+ sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
+ }
+
+ bool SyncWatch(const bool* should_stop) override {
+ DCHECK(task_runner_->BelongsToCurrentThread());
+
+ EnsureSyncWatcherExists();
+ return sync_watcher_->SyncWatch(should_stop);
+ }
+
private:
friend class base::RefCounted<InterfaceEndpoint>;
- ~InterfaceEndpoint() {
- router_lock_->AssertAcquired();
+ ~InterfaceEndpoint() override {
+ router_->lock_.AssertAcquired();
DCHECK(!client_);
DCHECK(closed_);
DCHECK(peer_closed_);
+ DCHECK(!sync_watcher_);
+ }
+
+ void OnHandleReady(MojoResult result) {
+ DCHECK(task_runner_->BelongsToCurrentThread());
+ scoped_refptr<InterfaceEndpoint> self_protector(this);
+ scoped_refptr<MultiplexRouter> router_protector(router_);
+
+ // Because we never close |sync_message_event_{sender,receiver}_| before
+ // destruction or set a deadline, |result| should always be MOJO_RESULT_OK.
+ DCHECK_EQ(MOJO_RESULT_OK, result);
+ bool reset_sync_watcher = false;
+ {
+ base::AutoLock locker(router_->lock_);
+
+ bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_);
+
+ if (!more_to_process)
+ ResetSyncMessageSignal();
+
+ // Currently there are no queued sync messages and the peer has closed so
+ // there won't be incoming sync messages in the future.
+ reset_sync_watcher = !more_to_process && peer_closed_;
+ }
+ if (reset_sync_watcher) {
+ // If a SyncWatch() call (or multiple ones) of this interface endpoint is
+ // on the call stack, resetting the sync watcher will allow it to exit
+ // when the call stack unwinds to that frame.
+ sync_watcher_.reset();
+ }
+ }
+
+ void EnsureSyncWatcherExists() {
+ DCHECK(task_runner_->BelongsToCurrentThread());
+ if (sync_watcher_)
+ return;
+
+ {
+ base::AutoLock locker(router_->lock_);
+ EnsureEventMessagePipeExists();
+
+ auto iter = router_->sync_message_tasks_.find(id_);
+ if (iter != router_->sync_message_tasks_.end() && !iter->second.empty())
+ SignalSyncMessageEvent();
+ }
+
+ sync_watcher_.reset(new SyncHandleWatcher(
+ sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE,
+ base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this))));
+ }
+
+ void EnsureEventMessagePipeExists() {
+ router_->lock_.AssertAcquired();
+
+ if (sync_message_event_receiver_.is_valid())
+ return;
+
+ MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_,
+ &sync_message_event_receiver_);
+ DCHECK_EQ(MOJO_RESULT_OK, result);
+ }
+
+ void ResetSyncMessageSignal() {
+ router_->lock_.AssertAcquired();
+
+ if (!event_signalled_)
+ return;
+
+ DCHECK(sync_message_event_receiver_.is_valid());
+ char dummy_message = 0;
+ uint32_t size = 1;
+ MojoResult result = ReadMessageRaw(sync_message_event_receiver_.get(),
Ken Rockot(use gerrit already) 2016/03/29 06:36:49 nit: It's fine to pass nullptr for the read buffer
yzshen1 2016/03/29 16:19:01 Done.
+ &dummy_message, &size, nullptr, nullptr,
+ MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
+ DCHECK_EQ(MOJO_RESULT_OK, result);
+ event_signalled_ = false;
}
- base::Lock* const router_lock_;
+ // ---------------------------------------------------------------------------
+ // The following members are safe to access from any threads.
+
+ MultiplexRouter* const router_;
const InterfaceId id_;
+ // ---------------------------------------------------------------------------
+ // The following members are accessed under the router's lock.
+
// Whether the endpoint has been closed.
bool closed_;
// Whether the peer endpoint has been closed.
bool peer_closed_;
- // The task runner on which |client_| can be accessed.
+ // The task runner on which |client_|'s methods can be called.
scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
// Not owned. It is null if no client is attached to this endpoint.
InterfaceEndpointClient* client_;
+ // A message pipe used as an event to signal that sync messages are available.
+ // The message pipe handles are initialized under the router's lock and remain
+ // unchanged afterwards. They may be accessed outside of the router's lock
+ // later.
+ ScopedMessagePipeHandle sync_message_event_sender_;
+ ScopedMessagePipeHandle sync_message_event_receiver_;
+ bool event_signalled_;
+
+ // ---------------------------------------------------------------------------
+ // The following members are only valid while a client is attached. They are
+ // used exclusively on the client's thread. They may be accessed outside of
+ // the router's lock.
+
+ scoped_ptr<SyncHandleWatcher> sync_watcher_;
+
DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
};
struct MultiplexRouter::Task {
public:
// Doesn't take ownership of |message| but takes its contents.
- static scoped_ptr<Task> CreateIncomingMessageTask(Message* message) {
- Task* task = new Task();
+ static scoped_ptr<Task> CreateMessageTask(Message* message) {
+ Task* task = new Task(MESSAGE);
task->message.reset(new Message);
message->MoveTo(task->message.get());
return make_scoped_ptr(task);
}
static scoped_ptr<Task> CreateNotifyErrorTask(InterfaceEndpoint* endpoint) {
- Task* task = new Task();
+ Task* task = new Task(NOTIFY_ERROR);
task->endpoint_to_notify = endpoint;
return make_scoped_ptr(task);
}
~Task() {}
- bool IsIncomingMessageTask() const { return !!message; }
- bool IsNotifyErrorTask() const { return !!endpoint_to_notify; }
+ bool IsMessageTask() const { return type == MESSAGE; }
+ bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }
scoped_ptr<Message> message;
scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
+ enum Type { MESSAGE, NOTIFY_ERROR };
+ Type type;
+
private:
- Task() {}
+ explicit Task(Type in_type) : type(in_type) {}
};
MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit,
@@ -125,12 +286,16 @@ MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit,
set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
header_validator_(this),
connector_(std::move(message_pipe), Connector::MULTI_THREADED_SEND),
- encountered_error_(false),
control_message_handler_(this),
control_message_proxy_(&connector_),
next_interface_id_value_(1),
posted_to_process_tasks_(false),
+ encountered_error_(false),
testing_mode_(false) {
+ // Always participate in sync handle watching, because even if it doesn't
+ // expect sync requests during sync handle watching, it may still need to
+ // dispatch messages to associated endpoints on a different thread.
+ connector_.AllowWokenUpBySyncWatchOnSameThread();
connector_.set_incoming_receiver(&header_validator_);
connector_.set_connection_error_handler(
[this]() { OnPipeConnectionError(); });
@@ -139,6 +304,7 @@ MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit,
MultiplexRouter::~MultiplexRouter() {
base::AutoLock locker(lock_);
+ sync_message_tasks_.clear();
tasks_.clear();
for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
@@ -221,10 +387,10 @@ void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) {
if (!IsMasterInterfaceId(id))
control_message_proxy_.NotifyPeerEndpointClosed(id);
- ProcessTasks(true);
+ ProcessTasks(NO_DIRECT_CLIENT_CALLS);
}
-void MultiplexRouter::AttachEndpointClient(
+InterfaceEndpointController* MultiplexRouter::AttachEndpointClient(
const ScopedInterfaceEndpointHandle& handle,
InterfaceEndpointClient* client) {
const InterfaceId id = handle.id();
@@ -236,15 +402,13 @@ void MultiplexRouter::AttachEndpointClient(
DCHECK(ContainsKey(endpoints_, id));
InterfaceEndpoint* endpoint = endpoints_[id].get();
- DCHECK(!endpoint->client());
- DCHECK(!endpoint->closed());
-
- endpoint->set_task_runner(base::MessageLoop::current()->task_runner());
- endpoint->set_client(client);
+ endpoint->AttachClient(client);
if (endpoint->peer_closed())
tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
- ProcessTasks(true);
+ ProcessTasks(NO_DIRECT_CLIENT_CALLS);
+
+ return endpoint;
}
void MultiplexRouter::DetachEndpointClient(
@@ -257,18 +421,7 @@ void MultiplexRouter::DetachEndpointClient(
DCHECK(ContainsKey(endpoints_, id));
InterfaceEndpoint* endpoint = endpoints_[id].get();
- DCHECK(endpoint->client());
- DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
- DCHECK(!endpoint->closed());
-
- endpoint->set_task_runner(nullptr);
- endpoint->set_client(nullptr);
-}
-
-bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle,
- Message* message) {
- message->set_interface_id(handle.id());
- return connector_.Accept(message);
+ endpoint->DetachClient();
}
void MultiplexRouter::RaiseError() {
@@ -291,6 +444,15 @@ MultiplexRouter* MultiplexRouter::GetRouter(AssociatedGroup* associated_group) {
return associated_group->router_.get();
}
+void MultiplexRouter::CloseMessagePipe() {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ connector_.CloseMessagePipe();
+ // CloseMessagePipe() above won't trigger connection error handler.
+ // Explicitly call OnPipeConnectionError() so that associated endpoints will
+ // get notified.
+ OnPipeConnectionError();
+}
+
bool MultiplexRouter::HasAssociatedEndpoints() const {
DCHECK(thread_checker_.CalledOnValidThread());
base::AutoLock locker(lock_);
@@ -317,17 +479,32 @@ bool MultiplexRouter::Accept(Message* message) {
scoped_refptr<MultiplexRouter> protector(this);
base::AutoLock locker(lock_);
- bool processed = tasks_.empty() && ProcessIncomingMessage(message, false);
+ ClientCallBehavior client_call_behavior =
+ connector_.during_sync_handle_watcher_callback()
+ ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
+ : ALLOW_DIRECT_CLIENT_CALLS;
+
+ bool processed =
+ tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior);
if (!processed) {
// Either the task queue is not empty or we cannot process the message
// directly. In both cases, there is no need to call ProcessTasks().
- tasks_.push_back(Task::CreateIncomingMessageTask(message));
+ tasks_.push_back(Task::CreateMessageTask(message));
+ Task* task = tasks_.back().get();
+
+ if (task->message->has_flag(kMessageIsSync)) {
+ InterfaceId id = task->message->interface_id();
+ sync_message_tasks_[id].push_back(task);
+ auto iter = endpoints_.find(id);
+ if (iter != endpoints_.end())
+ iter->second->SignalSyncMessageEvent();
+ }
} else if (!tasks_.empty()) {
// Processing the message may result in new tasks (for error notification)
// being added to the queue. In this case, we have to attempt to process the
// tasks.
- ProcessTasks(false);
+ ProcessTasks(client_call_behavior);
}
// Always return true. If we see errors during message processing, we will
@@ -388,10 +565,12 @@ void MultiplexRouter::OnPipeConnectionError() {
UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
}
- ProcessTasks(false);
+ ProcessTasks(connector_.during_sync_handle_watcher_callback()
+ ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
+ : ALLOW_DIRECT_CLIENT_CALLS);
}
-void MultiplexRouter::ProcessTasks(bool force_async) {
+void MultiplexRouter::ProcessTasks(ClientCallBehavior client_call_behavior) {
lock_.AssertAcquired();
if (posted_to_process_tasks_)
@@ -401,25 +580,70 @@ void MultiplexRouter::ProcessTasks(bool force_async) {
scoped_ptr<Task> task(std::move(tasks_.front()));
tasks_.pop_front();
+ InterfaceId id = kInvalidInterfaceId;
+ bool sync_message = task->IsMessageTask() && task->message &&
+ task->message->has_flag(kMessageIsSync);
+ if (sync_message) {
+ InterfaceId id = task->message->interface_id();
+ auto& sync_message_queue = sync_message_tasks_[id];
+ DCHECK_EQ(task.get(), sync_message_queue.front());
+ sync_message_queue.pop_front();
+ }
+
bool processed =
task->IsNotifyErrorTask()
- ? ProcessNotifyErrorTask(task.get(), force_async)
- : ProcessIncomingMessage(task->message.get(), force_async);
+ ? ProcessNotifyErrorTask(task.get(), client_call_behavior)
+ : ProcessIncomingMessage(task->message.get(), client_call_behavior);
if (!processed) {
tasks_.push_front(std::move(task));
+ if (sync_message) {
+ auto& sync_message_queue = sync_message_tasks_[id];
+ sync_message_queue.push_front(task.get());
+ }
break;
+ } else {
+ if (sync_message) {
+ auto iter = sync_message_tasks_.find(id);
+ if (iter != sync_message_tasks_.end() && iter->second.empty())
+ sync_message_tasks_.erase(iter);
+ }
}
}
}
-bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) {
+bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
+ lock_.AssertAcquired();
+
+ auto iter = sync_message_tasks_.find(id);
+ if (iter == sync_message_tasks_.end())
+ return false;
+
+ MultiplexRouter::Task* task = iter->second.front();
+ iter->second.pop_front();
+
+ DCHECK(task->IsMessageTask());
+ scoped_ptr<Message> message(std::move(task->message));
+
+ // Note: after this call, |task| and |iter| may be invalidated.
+ bool processed = ProcessIncomingMessage(
+ message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES);
+ DCHECK(processed);
+
+ iter = sync_message_tasks_.find(id);
+ return iter != sync_message_tasks_.end() && !iter->second.empty();
+}
+
+bool MultiplexRouter::ProcessNotifyErrorTask(
+ Task* task,
+ ClientCallBehavior client_call_behavior) {
lock_.AssertAcquired();
InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
if (!endpoint->client())
return true;
- if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) {
+ if (!endpoint->task_runner()->BelongsToCurrentThread() ||
+ client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS) {
MaybePostToProcessTasks(endpoint->task_runner());
return false;
}
@@ -437,9 +661,17 @@ bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) {
return true;
}
-bool MultiplexRouter::ProcessIncomingMessage(Message* message,
- bool force_async) {
+bool MultiplexRouter::ProcessIncomingMessage(
+ Message* message,
+ ClientCallBehavior client_call_behavior) {
lock_.AssertAcquired();
+
+ if (!message) {
+ // This is a sync message and has been processed during sync handle
+ // watching.
+ return true;
+ }
+
if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
if (!control_message_handler_.Accept(message))
RaiseErrorInNonTestingMode();
@@ -474,7 +706,12 @@ bool MultiplexRouter::ProcessIncomingMessage(Message* message,
return false;
}
- if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) {
+ bool can_direct_call =
+ (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS) ||
+ (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES &&
+ message->has_flag(kMessageIsSync));
+
+ if (!endpoint->task_runner()->BelongsToCurrentThread() || !can_direct_call) {
MaybePostToProcessTasks(endpoint->task_runner());
return false;
}
@@ -513,7 +750,7 @@ void MultiplexRouter::LockAndCallProcessTasks() {
// always called using base::Bind(), which holds a ref.
base::AutoLock locker(lock_);
posted_to_process_tasks_ = false;
- ProcessTasks(false);
+ ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS);
}
void MultiplexRouter::UpdateEndpointStateMayRemove(
@@ -525,6 +762,9 @@ void MultiplexRouter::UpdateEndpointStateMayRemove(
break;
case PEER_ENDPOINT_CLOSED:
endpoint->set_peer_closed();
+ // If the interface endpoint is performing a sync watch, this makes sure
+ // it is notified and eventually exits the sync watch.
+ endpoint->SignalSyncMessageEvent();
break;
}
if (endpoint->closed() && endpoint->peer_closed())

Powered by Google App Engine
This is Rietveld 408576698