Index: mojo/public/cpp/bindings/lib/multiplex_router.cc |
diff --git a/mojo/public/cpp/bindings/lib/multiplex_router.cc b/mojo/public/cpp/bindings/lib/multiplex_router.cc |
index b9e086c53e11b4371a2858ba0a808a2613cb69ad..523029b096d30f26d4aea5033a8a710653a9e56a 100644 |
--- a/mojo/public/cpp/bindings/lib/multiplex_router.cc |
+++ b/mojo/public/cpp/bindings/lib/multiplex_router.cc |
@@ -15,107 +15,268 @@ |
#include "base/stl_util.h" |
#include "mojo/public/cpp/bindings/associated_group.h" |
#include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" |
+#include "mojo/public/cpp/bindings/lib/interface_endpoint_controller.h" |
+#include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h" |
namespace mojo { |
namespace internal { |
// InterfaceEndpoint stores the information of an interface endpoint registered |
-// with the router. Always accessed under the router's lock. |
+// with the router. |
// No one other than the router's |endpoints_| and |tasks_| should hold refs to |
// this object. |
class MultiplexRouter::InterfaceEndpoint |
- : public base::RefCounted<InterfaceEndpoint> { |
+ : public base::RefCounted<InterfaceEndpoint>, |
+ public InterfaceEndpointController { |
public: |
InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) |
- : router_lock_(&router->lock_), |
+ : router_(router), |
id_(id), |
closed_(false), |
peer_closed_(false), |
- client_(nullptr) { |
- router_lock_->AssertAcquired(); |
- } |
+ client_(nullptr), |
+ event_signalled_(false) {} |
+ |
+ // --------------------------------------------------------------------------- |
+ // The following public methods are safe to call from any threads without |
+ // locking. |
InterfaceId id() const { return id_; } |
+ // --------------------------------------------------------------------------- |
+ // The following public methods are called under the router's lock. |
+ |
bool closed() const { return closed_; } |
void set_closed() { |
- router_lock_->AssertAcquired(); |
+ router_->lock_.AssertAcquired(); |
closed_ = true; |
} |
bool peer_closed() const { return peer_closed_; } |
void set_peer_closed() { |
- router_lock_->AssertAcquired(); |
+ router_->lock_.AssertAcquired(); |
peer_closed_ = true; |
} |
base::SingleThreadTaskRunner* task_runner() const { |
return task_runner_.get(); |
} |
- void set_task_runner( |
- scoped_refptr<base::SingleThreadTaskRunner> task_runner) { |
- router_lock_->AssertAcquired(); |
- task_runner_ = std::move(task_runner); |
- } |
InterfaceEndpointClient* client() const { return client_; } |
- void set_client(InterfaceEndpointClient* client) { |
- router_lock_->AssertAcquired(); |
+ |
+ void AttachClient(InterfaceEndpointClient* client) { |
+ router_->lock_.AssertAcquired(); |
+ DCHECK(!client_); |
+ DCHECK(!closed_); |
+ |
+ task_runner_ = base::MessageLoop::current()->task_runner(); |
client_ = client; |
} |
+ // It should be called on the same thread as the corresponding AttachClient() |
Ken Rockot(use gerrit already)
2016/03/29 06:36:49
nit: should = must? Also perhaps "This" or "This m
yzshen1
2016/03/29 16:19:01
Done. English is hard. :)
|
+ // call. |
+ void DetachClient() { |
+ router_->lock_.AssertAcquired(); |
+ DCHECK(client_); |
+ DCHECK(task_runner_->BelongsToCurrentThread()); |
+ DCHECK(!closed_); |
+ |
+ task_runner_ = nullptr; |
+ client_ = nullptr; |
+ sync_watcher_.reset(); |
+ } |
+ |
+ void SignalSyncMessageEvent() { |
+ router_->lock_.AssertAcquired(); |
+ if (event_signalled_) |
+ return; |
+ |
+ EnsureEventMessagePipeExists(); |
+ event_signalled_ = true; |
+ char dummy_message = '\0'; |
+ MojoResult result = |
+ WriteMessageRaw(sync_message_event_sender_.get(), &dummy_message, 1, |
Ken Rockot(use gerrit already)
2016/03/29 06:36:49
nit: It is perfectly legal to write a zero-length
yzshen1
2016/03/29 16:19:01
Done.
|
+ nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE); |
+ DCHECK_EQ(MOJO_RESULT_OK, result); |
+ } |
+ |
+ // --------------------------------------------------------------------------- |
+ // The following public methods (i.e., InterfaceEndpointController |
+ // implementation) are called by the client on the same thread as the |
+ // AttachClient() call. They are called outside of the router's lock. |
+ |
+ bool SendMessage(Message* message) override { |
+ DCHECK(task_runner_->BelongsToCurrentThread()); |
+ message->set_interface_id(id_); |
+ return router_->connector_.Accept(message); |
+ } |
+ |
+ void AllowWokenUpBySyncWatchOnSameThread() override { |
+ DCHECK(task_runner_->BelongsToCurrentThread()); |
+ |
+ EnsureSyncWatcherExists(); |
+ sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
+ } |
+ |
+ bool SyncWatch(const bool* should_stop) override { |
+ DCHECK(task_runner_->BelongsToCurrentThread()); |
+ |
+ EnsureSyncWatcherExists(); |
+ return sync_watcher_->SyncWatch(should_stop); |
+ } |
+ |
private: |
friend class base::RefCounted<InterfaceEndpoint>; |
- ~InterfaceEndpoint() { |
- router_lock_->AssertAcquired(); |
+ ~InterfaceEndpoint() override { |
+ router_->lock_.AssertAcquired(); |
DCHECK(!client_); |
DCHECK(closed_); |
DCHECK(peer_closed_); |
+ DCHECK(!sync_watcher_); |
+ } |
+ |
+ void OnHandleReady(MojoResult result) { |
+ DCHECK(task_runner_->BelongsToCurrentThread()); |
+ scoped_refptr<InterfaceEndpoint> self_protector(this); |
+ scoped_refptr<MultiplexRouter> router_protector(router_); |
+ |
+ // Because we never close |sync_message_event_{sender,receiver}_| before |
+ // destruction or set a deadline, |result| should always be MOJO_RESULT_OK. |
+ DCHECK_EQ(MOJO_RESULT_OK, result); |
+ bool reset_sync_watcher = false; |
+ { |
+ base::AutoLock locker(router_->lock_); |
+ |
+ bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); |
+ |
+ if (!more_to_process) |
+ ResetSyncMessageSignal(); |
+ |
+ // Currently there are no queued sync messages and the peer has closed so |
+ // there won't be incoming sync messages in the future. |
+ reset_sync_watcher = !more_to_process && peer_closed_; |
+ } |
+ if (reset_sync_watcher) { |
+ // If a SyncWatch() call (or multiple ones) of this interface endpoint is |
+ // on the call stack, resetting the sync watcher will allow it to exit |
+ // when the call stack unwinds to that frame. |
+ sync_watcher_.reset(); |
+ } |
+ } |
+ |
+ void EnsureSyncWatcherExists() { |
+ DCHECK(task_runner_->BelongsToCurrentThread()); |
+ if (sync_watcher_) |
+ return; |
+ |
+ { |
+ base::AutoLock locker(router_->lock_); |
+ EnsureEventMessagePipeExists(); |
+ |
+ auto iter = router_->sync_message_tasks_.find(id_); |
+ if (iter != router_->sync_message_tasks_.end() && !iter->second.empty()) |
+ SignalSyncMessageEvent(); |
+ } |
+ |
+ sync_watcher_.reset(new SyncHandleWatcher( |
+ sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
+ base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this)))); |
+ } |
+ |
+ void EnsureEventMessagePipeExists() { |
+ router_->lock_.AssertAcquired(); |
+ |
+ if (sync_message_event_receiver_.is_valid()) |
+ return; |
+ |
+ MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_, |
+ &sync_message_event_receiver_); |
+ DCHECK_EQ(MOJO_RESULT_OK, result); |
+ } |
+ |
+ void ResetSyncMessageSignal() { |
+ router_->lock_.AssertAcquired(); |
+ |
+ if (!event_signalled_) |
+ return; |
+ |
+ DCHECK(sync_message_event_receiver_.is_valid()); |
+ char dummy_message = 0; |
+ uint32_t size = 1; |
+ MojoResult result = ReadMessageRaw(sync_message_event_receiver_.get(), |
Ken Rockot(use gerrit already)
2016/03/29 06:36:49
nit: It's fine to pass nullptr for the read buffer
yzshen1
2016/03/29 16:19:01
Done.
|
+ &dummy_message, &size, nullptr, nullptr, |
+ MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); |
+ DCHECK_EQ(MOJO_RESULT_OK, result); |
+ event_signalled_ = false; |
} |
- base::Lock* const router_lock_; |
+ // --------------------------------------------------------------------------- |
+ // The following members are safe to access from any threads. |
+ |
+ MultiplexRouter* const router_; |
const InterfaceId id_; |
+ // --------------------------------------------------------------------------- |
+ // The following members are accessed under the router's lock. |
+ |
// Whether the endpoint has been closed. |
bool closed_; |
// Whether the peer endpoint has been closed. |
bool peer_closed_; |
- // The task runner on which |client_| can be accessed. |
+ // The task runner on which |client_|'s methods can be called. |
scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
// Not owned. It is null if no client is attached to this endpoint. |
InterfaceEndpointClient* client_; |
+ // A message pipe used as an event to signal that sync messages are available. |
+ // The message pipe handles are initialized under the router's lock and remain |
+ // unchanged afterwards. They may be accessed outside of the router's lock |
+ // later. |
+ ScopedMessagePipeHandle sync_message_event_sender_; |
+ ScopedMessagePipeHandle sync_message_event_receiver_; |
+ bool event_signalled_; |
+ |
+ // --------------------------------------------------------------------------- |
+ // The following members are only valid while a client is attached. They are |
+ // used exclusively on the client's thread. They may be accessed outside of |
+ // the router's lock. |
+ |
+ scoped_ptr<SyncHandleWatcher> sync_watcher_; |
+ |
DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); |
}; |
struct MultiplexRouter::Task { |
public: |
// Doesn't take ownership of |message| but takes its contents. |
- static scoped_ptr<Task> CreateIncomingMessageTask(Message* message) { |
- Task* task = new Task(); |
+ static scoped_ptr<Task> CreateMessageTask(Message* message) { |
+ Task* task = new Task(MESSAGE); |
task->message.reset(new Message); |
message->MoveTo(task->message.get()); |
return make_scoped_ptr(task); |
} |
static scoped_ptr<Task> CreateNotifyErrorTask(InterfaceEndpoint* endpoint) { |
- Task* task = new Task(); |
+ Task* task = new Task(NOTIFY_ERROR); |
task->endpoint_to_notify = endpoint; |
return make_scoped_ptr(task); |
} |
~Task() {} |
- bool IsIncomingMessageTask() const { return !!message; } |
- bool IsNotifyErrorTask() const { return !!endpoint_to_notify; } |
+ bool IsMessageTask() const { return type == MESSAGE; } |
+ bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; } |
scoped_ptr<Message> message; |
scoped_refptr<InterfaceEndpoint> endpoint_to_notify; |
+ enum Type { MESSAGE, NOTIFY_ERROR }; |
+ Type type; |
+ |
private: |
- Task() {} |
+ explicit Task(Type in_type) : type(in_type) {} |
}; |
MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, |
@@ -125,12 +286,16 @@ MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, |
set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
header_validator_(this), |
connector_(std::move(message_pipe), Connector::MULTI_THREADED_SEND), |
- encountered_error_(false), |
control_message_handler_(this), |
control_message_proxy_(&connector_), |
next_interface_id_value_(1), |
posted_to_process_tasks_(false), |
+ encountered_error_(false), |
testing_mode_(false) { |
+ // Always participate in sync handle watching, because even if it doesn't |
+ // expect sync requests during sync handle watching, it may still need to |
+ // dispatch messages to associated endpoints on a different thread. |
+ connector_.AllowWokenUpBySyncWatchOnSameThread(); |
connector_.set_incoming_receiver(&header_validator_); |
connector_.set_connection_error_handler( |
[this]() { OnPipeConnectionError(); }); |
@@ -139,6 +304,7 @@ MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, |
MultiplexRouter::~MultiplexRouter() { |
base::AutoLock locker(lock_); |
+ sync_message_tasks_.clear(); |
tasks_.clear(); |
for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { |
@@ -221,10 +387,10 @@ void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) { |
if (!IsMasterInterfaceId(id)) |
control_message_proxy_.NotifyPeerEndpointClosed(id); |
- ProcessTasks(true); |
+ ProcessTasks(NO_DIRECT_CLIENT_CALLS); |
} |
-void MultiplexRouter::AttachEndpointClient( |
+InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( |
const ScopedInterfaceEndpointHandle& handle, |
InterfaceEndpointClient* client) { |
const InterfaceId id = handle.id(); |
@@ -236,15 +402,13 @@ void MultiplexRouter::AttachEndpointClient( |
DCHECK(ContainsKey(endpoints_, id)); |
InterfaceEndpoint* endpoint = endpoints_[id].get(); |
- DCHECK(!endpoint->client()); |
- DCHECK(!endpoint->closed()); |
- |
- endpoint->set_task_runner(base::MessageLoop::current()->task_runner()); |
- endpoint->set_client(client); |
+ endpoint->AttachClient(client); |
if (endpoint->peer_closed()) |
tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
- ProcessTasks(true); |
+ ProcessTasks(NO_DIRECT_CLIENT_CALLS); |
+ |
+ return endpoint; |
} |
void MultiplexRouter::DetachEndpointClient( |
@@ -257,18 +421,7 @@ void MultiplexRouter::DetachEndpointClient( |
DCHECK(ContainsKey(endpoints_, id)); |
InterfaceEndpoint* endpoint = endpoints_[id].get(); |
- DCHECK(endpoint->client()); |
- DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
- DCHECK(!endpoint->closed()); |
- |
- endpoint->set_task_runner(nullptr); |
- endpoint->set_client(nullptr); |
-} |
- |
-bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle, |
- Message* message) { |
- message->set_interface_id(handle.id()); |
- return connector_.Accept(message); |
+ endpoint->DetachClient(); |
} |
void MultiplexRouter::RaiseError() { |
@@ -291,6 +444,15 @@ MultiplexRouter* MultiplexRouter::GetRouter(AssociatedGroup* associated_group) { |
return associated_group->router_.get(); |
} |
+void MultiplexRouter::CloseMessagePipe() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ connector_.CloseMessagePipe(); |
+ // CloseMessagePipe() above won't trigger connection error handler. |
+ // Explicitly call OnPipeConnectionError() so that associated endpoints will |
+ // get notified. |
+ OnPipeConnectionError(); |
+} |
+ |
bool MultiplexRouter::HasAssociatedEndpoints() const { |
DCHECK(thread_checker_.CalledOnValidThread()); |
base::AutoLock locker(lock_); |
@@ -317,17 +479,32 @@ bool MultiplexRouter::Accept(Message* message) { |
scoped_refptr<MultiplexRouter> protector(this); |
base::AutoLock locker(lock_); |
- bool processed = tasks_.empty() && ProcessIncomingMessage(message, false); |
+ ClientCallBehavior client_call_behavior = |
+ connector_.during_sync_handle_watcher_callback() |
+ ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
+ : ALLOW_DIRECT_CLIENT_CALLS; |
+ |
+ bool processed = |
+ tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior); |
if (!processed) { |
// Either the task queue is not empty or we cannot process the message |
// directly. In both cases, there is no need to call ProcessTasks(). |
- tasks_.push_back(Task::CreateIncomingMessageTask(message)); |
+ tasks_.push_back(Task::CreateMessageTask(message)); |
+ Task* task = tasks_.back().get(); |
+ |
+ if (task->message->has_flag(kMessageIsSync)) { |
+ InterfaceId id = task->message->interface_id(); |
+ sync_message_tasks_[id].push_back(task); |
+ auto iter = endpoints_.find(id); |
+ if (iter != endpoints_.end()) |
+ iter->second->SignalSyncMessageEvent(); |
+ } |
} else if (!tasks_.empty()) { |
// Processing the message may result in new tasks (for error notification) |
// being added to the queue. In this case, we have to attempt to process the |
// tasks. |
- ProcessTasks(false); |
+ ProcessTasks(client_call_behavior); |
} |
// Always return true. If we see errors during message processing, we will |
@@ -388,10 +565,12 @@ void MultiplexRouter::OnPipeConnectionError() { |
UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
} |
- ProcessTasks(false); |
+ ProcessTasks(connector_.during_sync_handle_watcher_callback() |
+ ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
+ : ALLOW_DIRECT_CLIENT_CALLS); |
} |
-void MultiplexRouter::ProcessTasks(bool force_async) { |
+void MultiplexRouter::ProcessTasks(ClientCallBehavior client_call_behavior) { |
lock_.AssertAcquired(); |
if (posted_to_process_tasks_) |
@@ -401,25 +580,70 @@ void MultiplexRouter::ProcessTasks(bool force_async) { |
scoped_ptr<Task> task(std::move(tasks_.front())); |
tasks_.pop_front(); |
+ InterfaceId id = kInvalidInterfaceId; |
+ bool sync_message = task->IsMessageTask() && task->message && |
+ task->message->has_flag(kMessageIsSync); |
+ if (sync_message) { |
+ InterfaceId id = task->message->interface_id(); |
+ auto& sync_message_queue = sync_message_tasks_[id]; |
+ DCHECK_EQ(task.get(), sync_message_queue.front()); |
+ sync_message_queue.pop_front(); |
+ } |
+ |
bool processed = |
task->IsNotifyErrorTask() |
- ? ProcessNotifyErrorTask(task.get(), force_async) |
- : ProcessIncomingMessage(task->message.get(), force_async); |
+ ? ProcessNotifyErrorTask(task.get(), client_call_behavior) |
+ : ProcessIncomingMessage(task->message.get(), client_call_behavior); |
if (!processed) { |
tasks_.push_front(std::move(task)); |
+ if (sync_message) { |
+ auto& sync_message_queue = sync_message_tasks_[id]; |
+ sync_message_queue.push_front(task.get()); |
+ } |
break; |
+ } else { |
+ if (sync_message) { |
+ auto iter = sync_message_tasks_.find(id); |
+ if (iter != sync_message_tasks_.end() && iter->second.empty()) |
+ sync_message_tasks_.erase(iter); |
+ } |
} |
} |
} |
-bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) { |
+bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { |
+ lock_.AssertAcquired(); |
+ |
+ auto iter = sync_message_tasks_.find(id); |
+ if (iter == sync_message_tasks_.end()) |
+ return false; |
+ |
+ MultiplexRouter::Task* task = iter->second.front(); |
+ iter->second.pop_front(); |
+ |
+ DCHECK(task->IsMessageTask()); |
+ scoped_ptr<Message> message(std::move(task->message)); |
+ |
+ // Note: after this call, |task| and |iter| may be invalidated. |
+ bool processed = ProcessIncomingMessage( |
+ message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES); |
+ DCHECK(processed); |
+ |
+ iter = sync_message_tasks_.find(id); |
+ return iter != sync_message_tasks_.end() && !iter->second.empty(); |
+} |
+ |
+bool MultiplexRouter::ProcessNotifyErrorTask( |
+ Task* task, |
+ ClientCallBehavior client_call_behavior) { |
lock_.AssertAcquired(); |
InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); |
if (!endpoint->client()) |
return true; |
- if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { |
+ if (!endpoint->task_runner()->BelongsToCurrentThread() || |
+ client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS) { |
MaybePostToProcessTasks(endpoint->task_runner()); |
return false; |
} |
@@ -437,9 +661,17 @@ bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) { |
return true; |
} |
-bool MultiplexRouter::ProcessIncomingMessage(Message* message, |
- bool force_async) { |
+bool MultiplexRouter::ProcessIncomingMessage( |
+ Message* message, |
+ ClientCallBehavior client_call_behavior) { |
lock_.AssertAcquired(); |
+ |
+ if (!message) { |
+ // This is a sync message and has been processed during sync handle |
+ // watching. |
+ return true; |
+ } |
+ |
if (PipeControlMessageHandler::IsPipeControlMessage(message)) { |
if (!control_message_handler_.Accept(message)) |
RaiseErrorInNonTestingMode(); |
@@ -474,7 +706,12 @@ bool MultiplexRouter::ProcessIncomingMessage(Message* message, |
return false; |
} |
- if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { |
+ bool can_direct_call = |
+ (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS) || |
+ (client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES && |
+ message->has_flag(kMessageIsSync)); |
+ |
+ if (!endpoint->task_runner()->BelongsToCurrentThread() || !can_direct_call) { |
MaybePostToProcessTasks(endpoint->task_runner()); |
return false; |
} |
@@ -513,7 +750,7 @@ void MultiplexRouter::LockAndCallProcessTasks() { |
// always called using base::Bind(), which holds a ref. |
base::AutoLock locker(lock_); |
posted_to_process_tasks_ = false; |
- ProcessTasks(false); |
+ ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS); |
} |
void MultiplexRouter::UpdateEndpointStateMayRemove( |
@@ -525,6 +762,9 @@ void MultiplexRouter::UpdateEndpointStateMayRemove( |
break; |
case PEER_ENDPOINT_CLOSED: |
endpoint->set_peer_closed(); |
+ // If the interface endpoint is performing a sync watch, this makes sure |
+ // it is notified and eventually exits the sync watch. |
+ endpoint->SignalSyncMessageEvent(); |
break; |
} |
if (endpoint->closed() && endpoint->peer_closed()) |