| Index: ipc/ipc_mojo_bootstrap.cc
|
| diff --git a/ipc/ipc_mojo_bootstrap.cc b/ipc/ipc_mojo_bootstrap.cc
|
| index 02354e3fb0d425c2b521ce106b4a003de16bdd0d..d92fd3ef10a01fb975ef35e3bc1e72a750987cfb 100644
|
| --- a/ipc/ipc_mojo_bootstrap.cc
|
| +++ b/ipc/ipc_mojo_bootstrap.cc
|
| @@ -8,6 +8,7 @@
|
|
|
| #include <map>
|
| #include <memory>
|
| +#include <queue>
|
| #include <utility>
|
| #include <vector>
|
|
|
| @@ -18,6 +19,7 @@
|
| #include "base/single_thread_task_runner.h"
|
| #include "base/synchronization/lock.h"
|
| #include "base/threading/thread_task_runner_handle.h"
|
| +#include "ipc/mojo_event.h"
|
| #include "mojo/public/cpp/bindings/associated_group.h"
|
| #include "mojo/public/cpp/bindings/associated_group_controller.h"
|
| #include "mojo/public/cpp/bindings/connector.h"
|
| @@ -29,6 +31,7 @@
|
| #include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
|
| #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
|
| #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h"
|
| +#include "mojo/public/cpp/bindings/sync_handle_watcher.h"
|
|
|
| namespace IPC {
|
|
|
| @@ -273,6 +276,31 @@ class ChannelAssociatedGroupController
|
|
|
| task_runner_ = nullptr;
|
| client_ = nullptr;
|
| + sync_watcher_.reset();
|
| + }
|
| +
|
| + uint32_t EnqueueSyncMessage(std::unique_ptr<mojo::Message> message) {
|
| + controller_->lock_.AssertAcquired();
|
| + uint32_t id = GenerateSyncMessageId();
|
| + sync_messages_.emplace(id, std::move(message));
|
| + SignalSyncMessageEvent();
|
| + return id;
|
| + }
|
| +
|
| + void SignalSyncMessageEvent() {
|
| + controller_->lock_.AssertAcquired();
|
| + EnsureSyncMessageEventExists();
|
| + sync_message_event_->Signal();
|
| + }
|
| +
|
| + std::unique_ptr<mojo::Message> PopSyncMessage(uint32_t id) {
|
| + controller_->lock_.AssertAcquired();
|
| + if (sync_messages_.empty() || sync_messages_.front().first != id)
|
| + return nullptr;
|
| + std::unique_ptr<mojo::Message> message =
|
| + std::move(sync_messages_.front().second);
|
| + sync_messages_.pop();
|
| + return message;
|
| }
|
|
|
| // mojo::InterfaceEndpointController:
|
| @@ -285,8 +313,8 @@ class ChannelAssociatedGroupController
|
| void AllowWokenUpBySyncWatchOnSameThread() override {
|
| DCHECK(task_runner_->BelongsToCurrentThread());
|
|
|
| - // TODO(rockot): Implement sync waiting.
|
| - NOTREACHED();
|
| + EnsureSyncWatcherExists();
|
| + sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
|
| }
|
|
|
| bool SyncWatch(const bool* should_stop) override {
|
| @@ -297,15 +325,97 @@ class ChannelAssociatedGroupController
|
| DCHECK(!controller_->task_runner_->BelongsToCurrentThread());
|
| DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread());
|
|
|
| - // TODO(rockot): Implement sync waiting.
|
| - NOTREACHED();
|
| - return false;
|
| + EnsureSyncWatcherExists();
|
| + return sync_watcher_->SyncWatch(should_stop);
|
| }
|
|
|
| private:
|
| friend class base::RefCountedThreadSafe<Endpoint>;
|
|
|
| - ~Endpoint() override {}
|
| + ~Endpoint() override {
|
| + controller_->lock_.AssertAcquired();
|
| + DCHECK(!client_);
|
| + DCHECK(closed_);
|
| + DCHECK(peer_closed_);
|
| + DCHECK(!sync_watcher_);
|
| + }
|
| +
|
| + void OnSyncMessageEventHandleReady(MojoResult result) {
|
| + DCHECK(task_runner_->BelongsToCurrentThread());
|
| +
|
| + scoped_refptr<Endpoint> keepalive(this);
|
| + scoped_refptr<AssociatedGroupController> controller_keepalive(
|
| + controller_);
|
| +
|
| + bool reset_sync_watcher = false;
|
| + {
|
| + base::AutoLock locker(controller_->lock_);
|
| + bool more_to_process = false;
|
| + if (!sync_messages_.empty()) {
|
| + std::unique_ptr<mojo::Message> message(
|
| + std::move(sync_messages_.front().second));
|
| + sync_messages_.pop();
|
| +
|
| + bool dispatch_succeeded;
|
| + mojo::InterfaceEndpointClient* client = client_;
|
| + {
|
| + base::AutoUnlock unlocker(controller_->lock_);
|
| + dispatch_succeeded = client->HandleIncomingMessage(message.get());
|
| + }
|
| +
|
| + if (!sync_messages_.empty())
|
| + more_to_process = true;
|
| +
|
| + if (!dispatch_succeeded)
|
| + controller_->RaiseError();
|
| + }
|
| +
|
| + if (!more_to_process)
|
| + sync_message_event_->Reset();
|
| +
|
| + // If there are no queued sync messages and the peer has closed, there
|
| + // there won't be incoming sync messages in the future.
|
| + reset_sync_watcher = !more_to_process && peer_closed_;
|
| + }
|
| +
|
| + if (reset_sync_watcher) {
|
| + // If a SyncWatch() call (or multiple ones) of this interface endpoint
|
| + // is on the call stack, resetting the sync watcher will allow it to
|
| + // exit when the call stack unwinds to that frame.
|
| + sync_watcher_.reset();
|
| + }
|
| + }
|
| +
|
| + void EnsureSyncWatcherExists() {
|
| + DCHECK(task_runner_->BelongsToCurrentThread());
|
| + if (sync_watcher_)
|
| + return;
|
| +
|
| + {
|
| + base::AutoLock locker(controller_->lock_);
|
| + EnsureSyncMessageEventExists();
|
| + if (!sync_messages_.empty())
|
| + SignalSyncMessageEvent();
|
| + }
|
| +
|
| + sync_watcher_.reset(new mojo::SyncHandleWatcher(
|
| + sync_message_event_->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
|
| + base::Bind(&Endpoint::OnSyncMessageEventHandleReady,
|
| + base::Unretained(this))));
|
| + }
|
| +
|
| + void EnsureSyncMessageEventExists() {
|
| + controller_->lock_.AssertAcquired();
|
| + if (!sync_message_event_)
|
| + sync_message_event_.reset(new MojoEvent);
|
| + }
|
| +
|
| + uint32_t GenerateSyncMessageId() {
|
| + // Overflow is fine.
|
| + uint32_t id = next_sync_message_id_++;
|
| + DCHECK(sync_messages_.empty() || sync_messages_.front().first != id);
|
| + return id;
|
| + }
|
|
|
| ChannelAssociatedGroupController* const controller_;
|
| const mojo::InterfaceId id_;
|
| @@ -314,6 +424,11 @@ class ChannelAssociatedGroupController
|
| bool peer_closed_ = false;
|
| mojo::InterfaceEndpointClient* client_ = nullptr;
|
| scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
|
| + std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_;
|
| + std::unique_ptr<MojoEvent> sync_message_event_;
|
| + std::queue<std::pair<uint32_t, std::unique_ptr<mojo::Message>>>
|
| + sync_messages_;
|
| + uint32_t next_sync_message_id_ = 0;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(Endpoint);
|
| };
|
| @@ -423,17 +538,22 @@ class ChannelAssociatedGroupController
|
| endpoint->task_runner()->PostTask(
|
| FROM_HERE,
|
| base::Bind(&ChannelAssociatedGroupController
|
| - ::NotifyEndpointOfErrorOnEndpointThread, this,
|
| - make_scoped_refptr(endpoint)));
|
| + ::NotifyEndpointOfErrorOnEndpointThread, this, endpoint->id(),
|
| + endpoint));
|
| }
|
| }
|
|
|
| - void NotifyEndpointOfErrorOnEndpointThread(scoped_refptr<Endpoint> endpoint) {
|
| + void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,
|
| + Endpoint* endpoint) {
|
| base::AutoLock locker(lock_);
|
| + auto iter = endpoints_.find(id);
|
| + if (iter == endpoints_.end() || iter->second.get() != endpoint)
|
| + return;
|
| if (!endpoint->client())
|
| return;
|
| +
|
| DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
|
| - NotifyEndpointOfError(endpoint.get(), false /* force_async */);
|
| + NotifyEndpointOfError(endpoint, false /* force_async */);
|
| }
|
|
|
| void MarkClosedAndMaybeRemove(Endpoint* endpoint) {
|
| @@ -446,6 +566,7 @@ class ChannelAssociatedGroupController
|
| void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) {
|
| lock_.AssertAcquired();
|
| endpoint->set_peer_closed();
|
| + endpoint->SignalSyncMessageEvent();
|
| if (endpoint->closed() && endpoint->peer_closed())
|
| endpoints_.erase(endpoint->id());
|
| }
|
| @@ -489,8 +610,25 @@ class ChannelAssociatedGroupController
|
| // If the client is not yet bound, it must be bound by the time this task
|
| // runs or else it's programmer error.
|
| DCHECK(proxy_task_runner_);
|
| +
|
| std::unique_ptr<mojo::Message> passed_message(new mojo::Message);
|
| message->MoveTo(passed_message.get());
|
| +
|
| + if (passed_message->has_flag(mojo::Message::kFlagIsSync)) {
|
| + // Sync messages may need to be handled by the endpoint if it's blocking
|
| + // on a sync reply. We pass ownership of the message to the endpoint's
|
| + // sync message queue. If the endpoint was blocking, it will dequeue the
|
| + // message and dispatch it. Otherwise the posted |AcceptSyncMessage()|
|
| + // call will dequeue the message and dispatch it.
|
| + uint32_t message_id =
|
| + endpoint->EnqueueSyncMessage(std::move(passed_message));
|
| + proxy_task_runner_->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage,
|
| + this, id, message_id));
|
| + return true;
|
| + }
|
| +
|
| proxy_task_runner_->PostTask(
|
| FROM_HERE,
|
| base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread,
|
| @@ -500,7 +638,8 @@ class ChannelAssociatedGroupController
|
|
|
| // We do not expect to receive sync responses on the master endpoint thread.
|
| // If it's happening, it's a bug.
|
| - DCHECK(!message->has_flag(mojo::Message::kFlagIsSync));
|
| + DCHECK(!message->has_flag(mojo::Message::kFlagIsSync) ||
|
| + !message->has_flag(mojo::Message::kFlagIsResponse));
|
|
|
| base::AutoUnlock unlocker(lock_);
|
| return client->HandleIncomingMessage(message);
|
| @@ -523,8 +662,7 @@ class ChannelAssociatedGroupController
|
|
|
| DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
|
|
|
| - // TODO(rockot): Implement sync dispatch. For now, sync messages are
|
| - // unsupported here.
|
| + // Sync messages should never make their way to this method.
|
| DCHECK(!message->has_flag(mojo::Message::kFlagIsSync));
|
|
|
| bool result = false;
|
| @@ -537,6 +675,37 @@ class ChannelAssociatedGroupController
|
| RaiseError();
|
| }
|
|
|
| + void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) {
|
| + DCHECK(proxy_task_runner_->BelongsToCurrentThread());
|
| +
|
| + base::AutoLock locker(lock_);
|
| + Endpoint* endpoint = GetEndpointForDispatch(interface_id);
|
| + if (!endpoint)
|
| + return;
|
| +
|
| + DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
|
| + std::unique_ptr<mojo::Message> message =
|
| + endpoint->PopSyncMessage(message_id);
|
| +
|
| + // The message must have already been dequeued by the endpoint waking up
|
| + // from a sync wait. Nothing to do.
|
| + if (!message)
|
| + return;
|
| +
|
| + mojo::InterfaceEndpointClient* client = endpoint->client();
|
| + if (!client)
|
| + return;
|
| +
|
| + bool result = false;
|
| + {
|
| + base::AutoUnlock unlocker(lock_);
|
| + result = client->HandleIncomingMessage(message.get());
|
| + }
|
| +
|
| + if (!result)
|
| + RaiseError();
|
| + }
|
| +
|
| Endpoint* GetEndpointForDispatch(mojo::InterfaceId id) {
|
| lock_.AssertAcquired();
|
| bool inserted = false;
|
|
|