Chromium Code Reviews| Index: ipc/ipc_mojo_bootstrap.cc |
| diff --git a/ipc/ipc_mojo_bootstrap.cc b/ipc/ipc_mojo_bootstrap.cc |
| index 02354e3fb0d425c2b521ce106b4a003de16bdd0d..3177d4d55b37ddc88c3438460ab215ce8cb77796 100644 |
| --- a/ipc/ipc_mojo_bootstrap.cc |
| +++ b/ipc/ipc_mojo_bootstrap.cc |
| @@ -8,6 +8,7 @@ |
| #include <map> |
| #include <memory> |
| +#include <queue> |
| #include <utility> |
| #include <vector> |
| @@ -18,6 +19,7 @@ |
| #include "base/single_thread_task_runner.h" |
| #include "base/synchronization/lock.h" |
| #include "base/threading/thread_task_runner_handle.h" |
| +#include "ipc/mojo_event.h" |
| #include "mojo/public/cpp/bindings/associated_group.h" |
| #include "mojo/public/cpp/bindings/associated_group_controller.h" |
| #include "mojo/public/cpp/bindings/connector.h" |
| @@ -29,6 +31,7 @@ |
| #include "mojo/public/cpp/bindings/pipe_control_message_handler.h" |
| #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h" |
| #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h" |
| +#include "mojo/public/cpp/bindings/sync_handle_watcher.h" |
| namespace IPC { |
| @@ -273,6 +276,29 @@ class ChannelAssociatedGroupController |
| task_runner_ = nullptr; |
| client_ = nullptr; |
| + sync_watcher_.reset(); |
| + } |
| + |
| + void EnqueueSyncMessage(std::unique_ptr<mojo::Message> message) { |
| + controller_->lock_.AssertAcquired(); |
| + sync_messages_.emplace(std::move(message)); |
| + SignalSyncMessageEvent(); |
| + } |
| + |
| + void SignalSyncMessageEvent() { |
| + controller_->lock_.AssertAcquired(); |
| + EnsureSyncMessageEventExists(); |
| + sync_message_event_->Signal(); |
| + } |
| + |
| + std::unique_ptr<mojo::Message> PopSyncMessage(mojo::Message* raw_message) { |
| + controller_->lock_.AssertAcquired(); |
| + if (sync_messages_.empty() || sync_messages_.front().get() != raw_message) |
| + return nullptr; |
| + std::unique_ptr<mojo::Message> message = |
| + std::move(sync_messages_.front()); |
| + sync_messages_.pop(); |
| + return message; |
| } |
| // mojo::InterfaceEndpointController: |
| @@ -285,8 +311,8 @@ class ChannelAssociatedGroupController |
| void AllowWokenUpBySyncWatchOnSameThread() override { |
| DCHECK(task_runner_->BelongsToCurrentThread()); |
| - // TODO(rockot): Implement sync waiting. |
| - NOTREACHED(); |
| + EnsureSyncWatcherExists(); |
| + sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
| } |
| bool SyncWatch(const bool* should_stop) override { |
| @@ -297,15 +323,90 @@ class ChannelAssociatedGroupController |
| DCHECK(!controller_->task_runner_->BelongsToCurrentThread()); |
| DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread()); |
| - // TODO(rockot): Implement sync waiting. |
| - NOTREACHED(); |
| - return false; |
| + EnsureSyncWatcherExists(); |
| + return sync_watcher_->SyncWatch(should_stop); |
| } |
| private: |
| friend class base::RefCountedThreadSafe<Endpoint>; |
| - ~Endpoint() override {} |
| + ~Endpoint() override { |
| + controller_->lock_.AssertAcquired(); |
| + DCHECK(!client_); |
| + DCHECK(closed_); |
| + DCHECK(peer_closed_); |
| + DCHECK(!sync_watcher_); |
| + } |
| + |
| + void OnSyncMessageEventHandleReady(MojoResult result) { |
| + DCHECK(task_runner_->BelongsToCurrentThread()); |
| + |
| + scoped_refptr<Endpoint> keepalive(this); |
| + scoped_refptr<AssociatedGroupController> controller_keepalive( |
| + controller_); |
| + |
| + bool reset_sync_watcher = false; |
| + { |
| + base::AutoLock locker(controller_->lock_); |
| + bool more_to_process = false; |
| + if (!sync_messages_.empty()) { |
| + std::unique_ptr<mojo::Message> message( |
| + std::move(sync_messages_.front())); |
| + sync_messages_.pop(); |
| + |
| + bool dispatch_succeeded; |
| + mojo::InterfaceEndpointClient* client = client_; |
| + { |
| + base::AutoUnlock unlocker(controller_->lock_); |
| + dispatch_succeeded = client->HandleIncomingMessage(message.get()); |
| + } |
| + |
| + if (!sync_messages_.empty()) |
| + more_to_process = true; |
| + |
| + if (!dispatch_succeeded) |
| + controller_->RaiseError(); |
| + } |
| + |
| + if (!more_to_process) |
| + sync_message_event_->Reset(); |
| + |
| + // If there are no queued sync messages and the peer has closed, there |
| + // there won't be incoming sync messages in the future. |
| + reset_sync_watcher = !more_to_process && peer_closed_; |
| + } |
| + |
| + if (reset_sync_watcher) { |
| + // If a SyncWatch() call (or multiple ones) of this interface endpoint |
| + // is on the call stack, resetting the sync watcher will allow it to |
| + // exit when the call stack unwinds to that frame. |
| + sync_watcher_.reset(); |
| + } |
| + } |
| + |
| + void EnsureSyncWatcherExists() { |
| + DCHECK(task_runner_->BelongsToCurrentThread()); |
| + if (sync_watcher_) |
| + return; |
| + |
| + { |
| + base::AutoLock locker(controller_->lock_); |
| + EnsureSyncMessageEventExists(); |
| + if (!sync_messages_.empty()) |
| + SignalSyncMessageEvent(); |
| + } |
| + |
| + sync_watcher_.reset(new mojo::SyncHandleWatcher( |
| + sync_message_event_->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
| + base::Bind(&Endpoint::OnSyncMessageEventHandleReady, |
| + base::Unretained(this)))); |
| + } |
| + |
| + void EnsureSyncMessageEventExists() { |
| + controller_->lock_.AssertAcquired(); |
| + if (!sync_message_event_) |
| + sync_message_event_.reset(new MojoEvent); |
| + } |
| ChannelAssociatedGroupController* const controller_; |
| const mojo::InterfaceId id_; |
| @@ -314,6 +415,9 @@ class ChannelAssociatedGroupController |
| bool peer_closed_ = false; |
| mojo::InterfaceEndpointClient* client_ = nullptr; |
| scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
| + std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_; |
| + std::unique_ptr<MojoEvent> sync_message_event_; |
| + std::queue<std::unique_ptr<mojo::Message>> sync_messages_; |
| DISALLOW_COPY_AND_ASSIGN(Endpoint); |
| }; |
| @@ -423,17 +527,22 @@ class ChannelAssociatedGroupController |
| endpoint->task_runner()->PostTask( |
| FROM_HERE, |
| base::Bind(&ChannelAssociatedGroupController |
| - ::NotifyEndpointOfErrorOnEndpointThread, this, |
| - make_scoped_refptr(endpoint))); |
| + ::NotifyEndpointOfErrorOnEndpointThread, this, endpoint->id(), |
| + endpoint)); |
| } |
| } |
| - void NotifyEndpointOfErrorOnEndpointThread(scoped_refptr<Endpoint> endpoint) { |
| + void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id, |
| + Endpoint* endpoint) { |
| base::AutoLock locker(lock_); |
| + auto iter = endpoints_.find(id); |
| + if (iter == endpoints_.end() || iter->second.get() != endpoint) |
| + return; |
| if (!endpoint->client()) |
| return; |
| + |
| DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| - NotifyEndpointOfError(endpoint.get(), false /* force_async */); |
| + NotifyEndpointOfError(endpoint, false /* force_async */); |
| } |
| void MarkClosedAndMaybeRemove(Endpoint* endpoint) { |
| @@ -446,6 +555,7 @@ class ChannelAssociatedGroupController |
| void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) { |
| lock_.AssertAcquired(); |
| endpoint->set_peer_closed(); |
| + endpoint->SignalSyncMessageEvent(); |
| if (endpoint->closed() && endpoint->peer_closed()) |
| endpoints_.erase(endpoint->id()); |
| } |
| @@ -489,8 +599,25 @@ class ChannelAssociatedGroupController |
| // If the client is not yet bound, it must be bound by the time this task |
| // runs or else it's programmer error. |
| DCHECK(proxy_task_runner_); |
| + |
| std::unique_ptr<mojo::Message> passed_message(new mojo::Message); |
| message->MoveTo(passed_message.get()); |
| + |
| + if (passed_message->has_flag(mojo::Message::kFlagIsSync)) { |
| + // Sync messages may need to be handled by the endpoint if it's blocking |
| + // on a sync reply. We pass ownership of the message to the endpoint's |
| + // sync message queue. If the endpoint was blocking, it will dequeue the |
| + // message and dispatch it. Otherwise the posted |AcceptSyncMessage()| |
| + // call will dequeue the message and dispatch it. |
| + mojo::Message* raw_message = passed_message.get(); |
| + endpoint->EnqueueSyncMessage(std::move(passed_message)); |
| + proxy_task_runner_->PostTask( |
| + FROM_HERE, |
| + base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage, |
| + this, id, raw_message)); |
|
yzshen1
2016/08/02 17:38:37
Raw message pointer may not be an appropriate ID:
Ken Rockot(use gerrit already)
2016/08/02 18:21:24
I've switched to using sequential integers. Overfl
yzshen1
2016/08/02 18:34:11
SGTM
|
| + return true; |
| + } |
| + |
| proxy_task_runner_->PostTask( |
| FROM_HERE, |
| base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread, |
| @@ -500,7 +627,8 @@ class ChannelAssociatedGroupController |
| // We do not expect to receive sync responses on the master endpoint thread. |
| // If it's happening, it's a bug. |
| - DCHECK(!message->has_flag(mojo::Message::kFlagIsSync)); |
| + DCHECK(!message->has_flag(mojo::Message::kFlagIsSync) || |
| + !message->has_flag(mojo::Message::kFlagIsResponse)); |
| base::AutoUnlock unlocker(lock_); |
| return client->HandleIncomingMessage(message); |
| @@ -512,6 +640,7 @@ class ChannelAssociatedGroupController |
| mojo::InterfaceId id = message->interface_id(); |
| DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id)); |
| + scoped_refptr<ChannelAssociatedGroupController> keepalive(this); |
|
yzshen1
2016/08/02 17:38:37
I think this line is not needed because the base::
Ken Rockot(use gerrit already)
2016/08/02 18:21:24
Done
|
| base::AutoLock locker(lock_); |
| Endpoint* endpoint = GetEndpointForDispatch(id); |
| if (!endpoint) |
| @@ -523,8 +652,7 @@ class ChannelAssociatedGroupController |
| DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| - // TODO(rockot): Implement sync dispatch. For now, sync messages are |
| - // unsupported here. |
| + // Sync messages should never make their way to this method. |
| DCHECK(!message->has_flag(mojo::Message::kFlagIsSync)); |
| bool result = false; |
| @@ -537,6 +665,39 @@ class ChannelAssociatedGroupController |
| RaiseError(); |
| } |
| + void AcceptSyncMessage(mojo::InterfaceId interface_id, |
| + mojo::Message* raw_message) { |
| + DCHECK(proxy_task_runner_->BelongsToCurrentThread()); |
| + |
| + scoped_refptr<ChannelAssociatedGroupController> keepalive(this); |
| + base::AutoLock locker(lock_); |
| + Endpoint* endpoint = GetEndpointForDispatch(interface_id); |
| + if (!endpoint) |
| + return; |
| + |
| + mojo::InterfaceEndpointClient* client = endpoint->client(); |
| + if (!client) |
|
yzshen1
2016/08/02 17:38:37
If there is not client, what should we do here? Sh
Ken Rockot(use gerrit already)
2016/08/02 18:21:24
Fixed to drop the message in this case
yzshen1
2016/08/02 18:34:11
Btw, I think we don't have code to disallow re-att
|
| + return; |
| + |
| + DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| + std::unique_ptr<mojo::Message> message = |
| + endpoint->PopSyncMessage(raw_message); |
| + |
| + // The message must have already been dequeued by the endpoint waking up |
| + // from a sync wait. Nothing to do. |
| + if (!message) |
| + return; |
| + |
| + bool result = false; |
| + { |
| + base::AutoUnlock unlocker(lock_); |
| + result = client->HandleIncomingMessage(message.get()); |
| + } |
| + |
| + if (!result) |
| + RaiseError(); |
| + } |
| + |
| Endpoint* GetEndpointForDispatch(mojo::InterfaceId id) { |
| lock_.AssertAcquired(); |
| bool inserted = false; |