| Index: mojo/public/cpp/bindings/lib/multiplex_router.cc
|
| diff --git a/mojo/public/cpp/bindings/lib/multiplex_router.cc b/mojo/public/cpp/bindings/lib/multiplex_router.cc
|
| index b9e086c53e11b4371a2858ba0a808a2613cb69ad..f8361bfe7fa75dd49127d2312b39cefb0544902b 100644
|
| --- a/mojo/public/cpp/bindings/lib/multiplex_router.cc
|
| +++ b/mojo/public/cpp/bindings/lib/multiplex_router.cc
|
| @@ -15,6 +15,7 @@
|
| #include "base/stl_util.h"
|
| #include "mojo/public/cpp/bindings/associated_group.h"
|
| #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h"
|
| +#include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h"
|
|
|
| namespace mojo {
|
| namespace internal {
|
| @@ -23,29 +24,36 @@ namespace internal {
|
| // with the router. Always accessed under the router's lock.
|
| // No one other than the router's |endpoints_| and |tasks_| should hold refs to
|
| // this object.
|
| +// TODO(yzshen): update the comment about lock.
|
| class MultiplexRouter::InterfaceEndpoint
|
| - : public base::RefCounted<InterfaceEndpoint> {
|
| + : public base::RefCounted<InterfaceEndpoint>,
|
| + public InterfaceEndpointController {
|
| public:
|
| InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
|
| - : router_lock_(&router->lock_),
|
| + : router_(router),
|
| id_(id),
|
| closed_(false),
|
| peer_closed_(false),
|
| - client_(nullptr) {
|
| - router_lock_->AssertAcquired();
|
| + client_(nullptr),
|
| + event_signalled_(false) {
|
| + router_->lock_.AssertAcquired();
|
| +
|
| + MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_,
|
| + &sync_message_event_receiver_);
|
| + DCHECK_EQ(MOJO_RESULT_OK, result);
|
| }
|
|
|
| InterfaceId id() const { return id_; }
|
|
|
| bool closed() const { return closed_; }
|
| void set_closed() {
|
| - router_lock_->AssertAcquired();
|
| + router_->lock_.AssertAcquired();
|
| closed_ = true;
|
| }
|
|
|
| bool peer_closed() const { return peer_closed_; }
|
| void set_peer_closed() {
|
| - router_lock_->AssertAcquired();
|
| + router_->lock_.AssertAcquired();
|
| peer_closed_ = true;
|
| }
|
|
|
| @@ -54,28 +62,102 @@ class MultiplexRouter::InterfaceEndpoint
|
| }
|
| void set_task_runner(
|
| scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
|
| - router_lock_->AssertAcquired();
|
| + router_->lock_.AssertAcquired();
|
| task_runner_ = std::move(task_runner);
|
| }
|
|
|
| InterfaceEndpointClient* client() const { return client_; }
|
| void set_client(InterfaceEndpointClient* client) {
|
| - router_lock_->AssertAcquired();
|
| + router_->lock_.AssertAcquired();
|
| client_ = client;
|
| }
|
|
|
| + void ClearSyncHandleWatcher() {
|
| + router_->lock_.AssertAcquired();
|
| + sync_watcher_.reset();
|
| + }
|
| +
|
| + void SignalSyncMessageReceived() {
|
| + router_->lock_.AssertAcquired();
|
| +
|
| + if (event_signalled_)
|
| + return;
|
| +
|
| + event_signalled_ = true;
|
| + char dummy_message = '\0';
|
| + MojoResult result =
|
| + WriteMessageRaw(sync_message_event_sender_.get(), &dummy_message, 1,
|
| + nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE);
|
| + DCHECK_EQ(MOJO_RESULT_OK, result);
|
| + }
|
| +
|
| + void ResetSyncMessageSignal() {
|
| + router_->lock_.AssertAcquired();
|
| +
|
| + if (!event_signalled_)
|
| + return;
|
| +
|
| + char dummy_message = 0;
|
| + uint32_t size = 1;
|
| + MojoResult result = ReadMessageRaw(sync_message_event_receiver_.get(),
|
| + &dummy_message, &size, nullptr, nullptr,
|
| + MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
|
| + DCHECK_EQ(MOJO_RESULT_OK, result);
|
| + event_signalled_ = false;
|
| + }
|
| +
|
| + bool SendMessage(Message* message) override {
|
| + DCHECK(task_runner_->BelongsToCurrentThread());
|
| + message->set_interface_id(id_);
|
| + return router_->connector_.Accept(message);
|
| + }
|
| +
|
| + void AllowWokenUpBySyncWatchOnSameThread() override {
|
| + DCHECK(task_runner_->BelongsToCurrentThread());
|
| +
|
| + {
|
| + base::AutoLock locker(router_->lock_);
|
| +
|
| + auto iter = router_->sync_message_tasks_.find(id_);
|
| + if (iter != router_->sync_message_tasks_.end() && !iter->second.empty())
|
| + SignalSyncMessageReceived();
|
| + }
|
| +
|
| + EnsureSyncWatcherExists();
|
| + sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
|
| + }
|
| +
|
| + bool SyncWatch(const bool* should_stop) override {
|
| + DCHECK(task_runner_->BelongsToCurrentThread());
|
| +
|
| + EnsureSyncWatcherExists();
|
| + return sync_watcher_->SyncWatch(should_stop);
|
| + }
|
| +
|
| private:
|
| friend class base::RefCounted<InterfaceEndpoint>;
|
|
|
| - ~InterfaceEndpoint() {
|
| - router_lock_->AssertAcquired();
|
| + ~InterfaceEndpoint() override {
|
| + router_->lock_.AssertAcquired();
|
|
|
| DCHECK(!client_);
|
| DCHECK(closed_);
|
| DCHECK(peer_closed_);
|
| + DCHECK(!sync_watcher_);
|
| }
|
|
|
| - base::Lock* const router_lock_;
|
| + void OnHandleReady(MojoResult result);
|
| +
|
| + void EnsureSyncWatcherExists() {
|
| + if (sync_watcher_)
|
| + return;
|
| +
|
| + sync_watcher_.reset(new SyncHandleWatcher(
|
| + sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE,
|
| + base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this))));
|
| + }
|
| +
|
| + MultiplexRouter* const router_;
|
| const InterfaceId id_;
|
|
|
| // Whether the endpoint has been closed.
|
| @@ -88,36 +170,74 @@ class MultiplexRouter::InterfaceEndpoint
|
| // Not owned. It is null if no client is attached to this endpoint.
|
| InterfaceEndpointClient* client_;
|
|
|
| + // TODO(yzshen): is it too expensive? Do I need to lazy init?
|
| + ScopedMessagePipeHandle sync_message_event_sender_;
|
| + ScopedMessagePipeHandle sync_message_event_receiver_;
|
| + bool event_signalled_;
|
| + scoped_ptr<SyncHandleWatcher> sync_watcher_;
|
| +
|
| + // TODO(yzshen): The handling of sync watching is quite similar to what
|
| + // Connector does. Consider unifying them.
|
| + //
|
| + // If non-zero, |sync_message_event_receiver_| should be registered with
|
| + // SyncHandleRegistry.
|
| + size_t register_sync_handle_watch_count_;
|
| + scoped_refptr<base::RefCountedData<bool>> should_stop_sync_handle_watch_;
|
| +
|
| DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
|
| };
|
|
|
| struct MultiplexRouter::Task {
|
| public:
|
| // Doesn't take ownership of |message| but takes its contents.
|
| - static scoped_ptr<Task> CreateIncomingMessageTask(Message* message) {
|
| - Task* task = new Task();
|
| + static scoped_ptr<Task> CreateMessageTask(Message* message) {
|
| + Task* task = new Task(MESSAGE);
|
| task->message.reset(new Message);
|
| message->MoveTo(task->message.get());
|
| return make_scoped_ptr(task);
|
| }
|
| static scoped_ptr<Task> CreateNotifyErrorTask(InterfaceEndpoint* endpoint) {
|
| - Task* task = new Task();
|
| + Task* task = new Task(NOTIFY_ERROR);
|
| task->endpoint_to_notify = endpoint;
|
| return make_scoped_ptr(task);
|
| }
|
|
|
| ~Task() {}
|
|
|
| - bool IsIncomingMessageTask() const { return !!message; }
|
| - bool IsNotifyErrorTask() const { return !!endpoint_to_notify; }
|
| + bool IsMessageTask() const { return type == MESSAGE; }
|
| + bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }
|
|
|
| scoped_ptr<Message> message;
|
| scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
|
|
|
| + enum Type { MESSAGE, NOTIFY_ERROR };
|
| +
|
| + Type type;
|
| +
|
| private:
|
| - Task() {}
|
| + explicit Task(Type in_type) : type(in_type) {}
|
| };
|
|
|
| +void MultiplexRouter::InterfaceEndpoint::OnHandleReady(MojoResult result) {
|
| + DCHECK(task_runner_->BelongsToCurrentThread());
|
| + scoped_refptr<InterfaceEndpoint> self_protector(this);
|
| + scoped_refptr<MultiplexRouter> router_protector(router_);
|
| +
|
| + {
|
| + base::AutoLock locker(router_->lock_);
|
| + bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_);
|
| +
|
| + if (!more_to_process)
|
| + ResetSyncMessageSignal();
|
| +
|
| + bool no_more_sync_messages = !more_to_process && peer_closed_;
|
| + bool sync_handle_watch_failed = result != MOJO_RESULT_OK;
|
| +
|
| + if (no_more_sync_messages || sync_handle_watch_failed)
|
| + sync_watcher_.reset();
|
| + }
|
| +}
|
| +
|
| MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit,
|
| ScopedMessagePipeHandle message_pipe)
|
| : RefCountedDeleteOnMessageLoop(
|
| @@ -131,6 +251,11 @@ MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit,
|
| next_interface_id_value_(1),
|
| posted_to_process_tasks_(false),
|
| testing_mode_(false) {
|
| + // Always participate in sync handle watch, because it may want to dispatch
|
| + // messages to associated endpoints on a different thread; or it want to
|
| + // dispatch sync requests to the master binding or associated bindings on the
|
| + // same thread.
|
| + connector_.AllowWokenUpBySyncWatchOnSameThread();
|
| connector_.set_incoming_receiver(&header_validator_);
|
| connector_.set_connection_error_handler(
|
| [this]() { OnPipeConnectionError(); });
|
| @@ -139,6 +264,7 @@ MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit,
|
| MultiplexRouter::~MultiplexRouter() {
|
| base::AutoLock locker(lock_);
|
|
|
| + sync_message_tasks_.clear();
|
| tasks_.clear();
|
|
|
| for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
|
| @@ -221,10 +347,10 @@ void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) {
|
| if (!IsMasterInterfaceId(id))
|
| control_message_proxy_.NotifyPeerEndpointClosed(id);
|
|
|
| - ProcessTasks(true);
|
| + ProcessTasks(NO_DIRECT_CLIENT_CALLS);
|
| }
|
|
|
| -void MultiplexRouter::AttachEndpointClient(
|
| +InterfaceEndpointController* MultiplexRouter::AttachEndpointClient(
|
| const ScopedInterfaceEndpointHandle& handle,
|
| InterfaceEndpointClient* client) {
|
| const InterfaceId id = handle.id();
|
| @@ -244,7 +370,9 @@ void MultiplexRouter::AttachEndpointClient(
|
|
|
| if (endpoint->peer_closed())
|
| tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
|
| - ProcessTasks(true);
|
| + ProcessTasks(NO_DIRECT_CLIENT_CALLS);
|
| +
|
| + return endpoint;
|
| }
|
|
|
| void MultiplexRouter::DetachEndpointClient(
|
| @@ -263,12 +391,7 @@ void MultiplexRouter::DetachEndpointClient(
|
|
|
| endpoint->set_task_runner(nullptr);
|
| endpoint->set_client(nullptr);
|
| -}
|
| -
|
| -bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle,
|
| - Message* message) {
|
| - message->set_interface_id(handle.id());
|
| - return connector_.Accept(message);
|
| + endpoint->ClearSyncHandleWatcher();
|
| }
|
|
|
| void MultiplexRouter::RaiseError() {
|
| @@ -317,17 +440,32 @@ bool MultiplexRouter::Accept(Message* message) {
|
| scoped_refptr<MultiplexRouter> protector(this);
|
| base::AutoLock locker(lock_);
|
|
|
| - bool processed = tasks_.empty() && ProcessIncomingMessage(message, false);
|
| + ClientCallBehavior client_call_behavior =
|
| + connector_.during_sync_handle_watcher_callback()
|
| + ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
|
| + : ALLOW_DIRECT_CLIENT_CALLS;
|
| +
|
| + bool processed =
|
| + tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior);
|
|
|
| if (!processed) {
|
| // Either the task queue is not empty or we cannot process the message
|
| // directly. In both cases, there is no need to call ProcessTasks().
|
| - tasks_.push_back(Task::CreateIncomingMessageTask(message));
|
| + tasks_.push_back(Task::CreateMessageTask(message));
|
| + Task* task = tasks_.back().get();
|
| +
|
| + if (task->message->has_flag(kMessageIsSync)) {
|
| + InterfaceId id = task->message->interface_id();
|
| + sync_message_tasks_[id].push_back(task);
|
| + auto iter = endpoints_.find(id);
|
| + if (iter != endpoints_.end())
|
| + iter->second->SignalSyncMessageReceived();
|
| + }
|
| } else if (!tasks_.empty()) {
|
| // Processing the message may result in new tasks (for error notification)
|
| // being added to the queue. In this case, we have to attempt to process the
|
| // tasks.
|
| - ProcessTasks(false);
|
| + ProcessTasks(client_call_behavior);
|
| }
|
|
|
| // Always return true. If we see errors during message processing, we will
|
| @@ -388,10 +526,12 @@ void MultiplexRouter::OnPipeConnectionError() {
|
| UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
|
| }
|
|
|
| - ProcessTasks(false);
|
| + ProcessTasks(connector_.during_sync_handle_watcher_callback()
|
| + ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
|
| + : ALLOW_DIRECT_CLIENT_CALLS);
|
| }
|
|
|
| -void MultiplexRouter::ProcessTasks(bool force_async) {
|
| +void MultiplexRouter::ProcessTasks(ClientCallBehavior client_call_behavior) {
|
| lock_.AssertAcquired();
|
|
|
| if (posted_to_process_tasks_)
|
| @@ -401,25 +541,69 @@ void MultiplexRouter::ProcessTasks(bool force_async) {
|
| scoped_ptr<Task> task(std::move(tasks_.front()));
|
| tasks_.pop_front();
|
|
|
| + InterfaceId id = kInvalidInterfaceId;
|
| + if (task->IsMessageTask() && task->message &&
|
| + task->message->has_flag(kMessageIsSync)) {
|
| + InterfaceId id = task->message->interface_id();
|
| + auto& sync_message_queue = sync_message_tasks_[id];
|
| + DCHECK_EQ(task.get(), sync_message_queue.front());
|
| + sync_message_queue.pop_front();
|
| + }
|
| +
|
| bool processed =
|
| task->IsNotifyErrorTask()
|
| - ? ProcessNotifyErrorTask(task.get(), force_async)
|
| - : ProcessIncomingMessage(task->message.get(), force_async);
|
| + ? ProcessNotifyErrorTask(task.get(), client_call_behavior)
|
| + : ProcessIncomingMessage(task->message.get(), client_call_behavior);
|
|
|
| if (!processed) {
|
| tasks_.push_front(std::move(task));
|
| + if (IsValidInterfaceId(id)) {
|
| + auto& sync_message_queue = sync_message_tasks_[id];
|
| + sync_message_queue.push_front(task.get());
|
| + }
|
| break;
|
| + } else {
|
| + if (IsValidInterfaceId(id)) {
|
| + auto iter = sync_message_tasks_.find(id);
|
| + if (iter != sync_message_tasks_.end() && iter->second.empty())
|
| + sync_message_tasks_.erase(iter);
|
| + }
|
| }
|
| }
|
| }
|
|
|
| -bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) {
|
| +bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
|
| + lock_.AssertAcquired();
|
| +
|
| + auto iter = sync_message_tasks_.find(id);
|
| + if (iter == sync_message_tasks_.end())
|
| + return false;
|
| +
|
| + MultiplexRouter::Task* task = iter->second.front();
|
| + iter->second.pop_front();
|
| +
|
| + DCHECK(task->IsMessageTask());
|
| + scoped_ptr<Message> message(std::move(task->message));
|
| +
|
| + // Note: after this call, |task|, |tasks| and |iter| may be invalidated.
|
| + bool processed = ProcessIncomingMessage(
|
| + message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES);
|
| + DCHECK(processed);
|
| +
|
| + iter = sync_message_tasks_.find(id);
|
| + return iter != sync_message_tasks_.end() && !iter->second.empty();
|
| +}
|
| +
|
| +bool MultiplexRouter::ProcessNotifyErrorTask(
|
| + Task* task,
|
| + ClientCallBehavior client_call_behavior) {
|
| lock_.AssertAcquired();
|
| InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
|
| if (!endpoint->client())
|
| return true;
|
|
|
| - if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) {
|
| + if (!endpoint->task_runner()->BelongsToCurrentThread() ||
|
| + client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS) {
|
| MaybePostToProcessTasks(endpoint->task_runner());
|
| return false;
|
| }
|
| @@ -437,9 +621,17 @@ bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) {
|
| return true;
|
| }
|
|
|
| -bool MultiplexRouter::ProcessIncomingMessage(Message* message,
|
| - bool force_async) {
|
| +bool MultiplexRouter::ProcessIncomingMessage(
|
| + Message* message,
|
| + ClientCallBehavior client_call_behavior) {
|
| lock_.AssertAcquired();
|
| +
|
| + if (!message) {
|
| + // This is a sync message and has been processed during sync handle
|
| + // watching.
|
| + return true;
|
| + }
|
| +
|
| if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
|
| if (!control_message_handler_.Accept(message))
|
| RaiseErrorInNonTestingMode();
|
| @@ -468,13 +660,15 @@ bool MultiplexRouter::ProcessIncomingMessage(Message* message,
|
| if (endpoint->closed())
|
| return true;
|
|
|
| - if (!endpoint->client()) {
|
| - // We need to wait until a client is attached in order to dispatch further
|
| - // messages.
|
| + if (!endpoint->client())
|
| return false;
|
| - }
|
|
|
| - if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) {
|
| + bool can_direct_call =
|
| + (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS) ||
|
| + (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES &&
|
| + message->has_flag(kMessageIsSync));
|
| +
|
| + if (!endpoint->task_runner()->BelongsToCurrentThread() || !can_direct_call) {
|
| MaybePostToProcessTasks(endpoint->task_runner());
|
| return false;
|
| }
|
| @@ -513,7 +707,7 @@ void MultiplexRouter::LockAndCallProcessTasks() {
|
| // always called using base::Bind(), which holds a ref.
|
| base::AutoLock locker(lock_);
|
| posted_to_process_tasks_ = false;
|
| - ProcessTasks(false);
|
| + ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS);
|
| }
|
|
|
| void MultiplexRouter::UpdateEndpointStateMayRemove(
|
| @@ -525,6 +719,7 @@ void MultiplexRouter::UpdateEndpointStateMayRemove(
|
| break;
|
| case PEER_ENDPOINT_CLOSED:
|
| endpoint->set_peer_closed();
|
| + endpoint->SignalSyncMessageReceived();
|
| break;
|
| }
|
| if (endpoint->closed() && endpoint->peer_closed())
|
|
|